import crypto from 'node:crypto'; import fs from 'node:fs/promises'; import * as path from 'node:path'; import * as vm from 'node:vm'; import { EventEmitter, on } from 'node:events'; import { setTimeout as setTimeoutCallback } from 'node:timers'; import { BehaviorSubject, EMPTY, from, lastValueFrom, merge, MonoTypeOperatorFunction, Observable, OperatorFunction, pipe, race, Subject, } from 'rxjs'; import { delay, filter, first, ignoreElements, last, map, mergeMap, takeUntil, takeWhile, tap } from 'rxjs/operators'; import type { RawSourceMap } from 'source-map'; import * as nexus from 'nexus-rpc'; import { Info as ActivityInfo } from '@temporalio/activity'; import { DataConverter, decompileRetryPolicy, defaultPayloadConverter, IllegalStateError, LoadedDataConverter, SdkComponent, Payload, ApplicationFailure, ensureApplicationFailure, TypedSearchAttributes, decodePriority, CancelledFailure, MetricMeter, ActivityCancellationDetails, } from '@temporalio/common'; import { decodeArrayFromPayloads, Decoded, decodeFromPayloadsAtIndex, encodeErrorToFailure, encodeToPayload, } from '@temporalio/common/lib/internal-non-workflow'; import { historyFromJSON } from '@temporalio/common/lib/proto-utils'; import { Duration, msToNumber, optionalTsToDate, optionalTsToMs, requiredTsToMs, tsToDate, tsToMs, } from '@temporalio/common/lib/time'; import { LoggerWithComposedMetadata } from '@temporalio/common/lib/logger'; import { errorMessage, NonNullableObject, OmitFirstParam } from '@temporalio/common/lib/type-helpers'; import { workflowLogAttributes } from '@temporalio/workflow/lib/logs'; import { native } from '@temporalio/core-bridge'; import { Client } from '@temporalio/client'; import { coresdk, temporal } from '@temporalio/proto'; import { type SinkCall, type WorkflowInfo } from '@temporalio/workflow'; import { throwIfReservedName } from '@temporalio/common/lib/reserved'; import { suggestContinueAsNewReasonsFromProto } from '@temporalio/common/lib/continue-as-new'; import { 
Activity, CancelReason, activityLogAttributes } from './activity'; import { extractNativeClient, extractReferenceHolders, InternalNativeConnection, NativeConnection } from './connection'; import { ActivityExecuteInput } from './interceptors'; import { Logger } from './logger'; import pkg from './pkg'; import { EvictionReason, evictionReasonToReplayError, RemoveFromCache, ReplayHistoriesIterable, ReplayResult, } from './replay'; import { History, Runtime } from './runtime'; import { CloseableGroupedObservable, closeableGroupBy, mapWithState, mergeMapWithState } from './rxutils'; import { byteArrayToBuffer, convertDeploymentVersion, convertToParentWorkflowType, convertToRootWorkflowType, } from './utils'; import { CompiledWorkerOptions, CompiledWorkerOptionsWithBuildId, compileWorkerOptions, isCodeBundleOption, isPathBundleOption, ReplayWorkerOptions, toNativeWorkerOptions, WorkerOptions, WorkerPlugin, WorkflowBundle, } from './worker-options'; import { WorkflowCodecRunner } from './workflow-codec-runner'; import { defaultWorkflowInterceptorModules, WorkflowCodeBundler } from './workflow/bundler'; import { Workflow, WorkflowCreator } from './workflow/interface'; import { ReusableVMWorkflowCreator } from './workflow/reusable-vm'; import { ThreadedVMWorkflowCreator } from './workflow/threaded-vm'; import { VMWorkflowCreator } from './workflow/vm'; import { WorkflowBundleWithSourceMapAndFilename } from './workflow/workflow-worker-thread/input'; import { CombinedWorkerRunError, GracefulShutdownPeriodExpiredError, PromiseCompletionTimeoutError, ShutdownError, UnexpectedError, } from './errors'; import { constructNexusOperationContext, NexusHandler } from './nexus'; import { handlerErrorToProto } from './nexus/conversions'; export { DataConverter, defaultPayloadConverter }; /** * The worker's possible states * * `INITIALIZED` - The initial state of the Worker after calling {@link Worker.create} and successful connection to the server * * `RUNNING` - {@link Worker.run} was 
called, polling task queues
 * * `STOPPING` - {@link Worker.shutdown} was called or received shutdown signal, worker will forcefully shutdown in {@link WorkerOptions.shutdownGraceTime | shutdownGraceTime}
 * * `DRAINING` - Core has indicated that shutdown is complete and all Workflow tasks have been drained, waiting for activities and cached workflows eviction
 * * `DRAINED` - All activities and workflows have completed, ready to shutdown
 * * `STOPPED` - Shutdown complete, {@link Worker.run} resolves
 * * `FAILED` - Worker encountered an unrecoverable error, {@link Worker.run} should reject with the error
 */
export type State = 'INITIALIZED' | 'RUNNING' | 'STOPPED' | 'STOPPING' | 'DRAINING' | 'DRAINED' | 'FAILED';

/** Local shorthand for the Core SDK Workflow activation message type. */
type WorkflowActivation = coresdk.workflow_activation.WorkflowActivation;

/**
 * An Activity task together with a string form of its task token (base64, per the field name)
 * and the raw encoded bytes of the task.
 */
export type ActivityTaskWithBase64Token = {
  task: coresdk.activity_task.ActivityTask;
  base64TaskToken: string;
  // The unaltered protobuf-encoded ActivityTask; kept so that it can be printed
  // out for analysis if decoding fails at a later step.
  protobufEncodedTask: Buffer;
};

/**
 * A Nexus task together with a string form of its task token (base64, per the field name)
 * and the raw encoded bytes of the task.
 */
export type NexusTaskWithBase64Token = {
  task: coresdk.nexus.NexusTask;
  base64TaskToken: string;
  // The unaltered protobuf-encoded NexusTask; kept so that it can be printed
  // out for analysis if decoding fails at a later step.
protobufEncodedTask: Buffer;
};

/** Pairs an eviction (remove-from-cache) job with the run ID it was emitted for. */
interface EvictionWithRunID {
  runId: string;
  evictJob: coresdk.workflow_activation.IRemoveFromCache;
}

/**
 * The surface of a native (Core bridge) worker that {@link Worker} depends on.
 *
 * Each function-valued member has the signature of the matching `native.worker*` bridge function
 * with its first parameter (the native worker handle) already bound, see {@link NativeWorker}.
 */
export interface NativeWorkerLike {
  type: 'worker';
  initiateShutdown: OmitFirstParam<typeof native.workerInitiateShutdown>;
  finalizeShutdown(): Promise<void>;
  flushCoreLogs(): void;
  pollWorkflowActivation: OmitFirstParam<typeof native.workerPollWorkflowActivation>;
  pollActivityTask: OmitFirstParam<typeof native.workerPollActivityTask>;
  pollNexusTask: OmitFirstParam<typeof native.workerPollNexusTask>;
  completeWorkflowActivation: OmitFirstParam<typeof native.workerCompleteWorkflowActivation>;
  completeActivityTask: OmitFirstParam<typeof native.workerCompleteActivityTask>;
  completeNexusTask: OmitFirstParam<typeof native.workerCompleteNexusTask>;
  recordActivityHeartbeat: OmitFirstParam<typeof native.workerRecordActivityHeartbeat>;
  replaceClient: OmitFirstParam<typeof native.workerReplaceClient>;
}

/** A replay worker together with the handle used to push histories into it. */
export interface NativeReplayHandle {
  worker: NativeWorkerLike;
  historyPusher: native.HistoryPusher;
}

/** Factory contract used by {@link Worker} to obtain regular and replay native workers. */
interface NativeWorkerConstructor {
  create(
    runtime: Runtime,
    connection: NativeConnection,
    options: CompiledWorkerOptionsWithBuildId
  ): Promise<NativeWorkerLike>;
  createReplay(runtime: Runtime, options: CompiledWorkerOptionsWithBuildId): Promise<NativeReplayHandle>;
}

/** A Workflow instance paired with the attributes to attach to its log entries. */
interface WorkflowWithLogAttributes {
  workflow: Workflow;
  logAttributes: Record<string, unknown>;
}

/**
 * Return `options` with a `buildId` guaranteed to be set.
 *
 * If the options already carry a build ID they are returned as is. Otherwise a build ID of the
 * form `<pkg.name>@<pkg.version>` is generated, suffixed with `+<sha256 hex digest>` of
 * `bundleCode` when bundle code is provided.
 *
 * @param options compiled worker options, possibly without a build ID
 * @param bundleCode workflow bundle code to fingerprint, if any
 * @returns the same object if it already had a build ID, otherwise a shallow copy with one added
 */
function addBuildIdIfMissing(options: CompiledWorkerOptions, bundleCode?: string): CompiledWorkerOptionsWithBuildId {
  const bid = options.buildId; // eslint-disable-line @typescript-eslint/no-deprecated
  if (bid != null) {
    return options as CompiledWorkerOptionsWithBuildId;
  }
  const suffix = bundleCode ? `+${crypto.createHash('sha256').update(bundleCode).digest('hex')}` : '';
  return { ...options, buildId: `${pkg.name}@${pkg.version}${suffix}` };
}

/**
 * {@link NativeWorkerLike} implementation backed by the Core bridge: every operation is the
 * corresponding `native.worker*` function bound to a single native worker handle.
 */
export class NativeWorker implements NativeWorkerLike {
  public readonly type = 'worker';
  public readonly replaceClient: OmitFirstParam<typeof native.workerReplaceClient>;
  public readonly pollWorkflowActivation: OmitFirstParam<typeof native.workerPollWorkflowActivation>;
  public readonly pollActivityTask: OmitFirstParam<typeof native.workerPollActivityTask>;
  public readonly pollNexusTask: OmitFirstParam<typeof native.workerPollNexusTask>;
  public readonly completeWorkflowActivation: OmitFirstParam<typeof native.workerCompleteWorkflowActivation>;
  public readonly completeActivityTask: OmitFirstParam<typeof native.workerCompleteActivityTask>;
  public readonly completeNexusTask: OmitFirstParam<typeof native.workerCompleteNexusTask>;
  public readonly recordActivityHeartbeat: OmitFirstParam<typeof native.workerRecordActivityHeartbeat>;
  public readonly initiateShutdown: OmitFirstParam<typeof native.workerInitiateShutdown>;

  /**
   * Register a new native worker with `runtime`, using the native client of `connection`.
   */
  public static async create(
    runtime: Runtime,
    connection: NativeConnection,
    options: CompiledWorkerOptionsWithBuildId
  ): Promise<NativeWorkerLike> {
    const nativeWorker = await runtime.registerWorker(extractNativeClient(connection), toNativeWorkerOptions(options));
    return new NativeWorker(runtime, nativeWorker);
  }

  /**
   * Create a native replay worker through `runtime` and return it with its history pusher.
   */
  public static async createReplay(
    runtime: Runtime,
    options: CompiledWorkerOptionsWithBuildId
  ): Promise<NativeReplayHandle> {
    const [worker, historyPusher] = await runtime.createReplayWorker(toNativeWorkerOptions(options));
    return {
      worker: new NativeWorker(runtime, worker),
      historyPusher,
    };
  }

  protected constructor(
    protected readonly runtime: Runtime,
    protected readonly nativeWorker: native.Worker
  ) {
    // Bind the native worker handle as the first argument of each bridge function so that
    // callers only supply the remaining parameters.
    this.replaceClient = native.workerReplaceClient.bind(undefined, nativeWorker);
    this.pollWorkflowActivation = native.workerPollWorkflowActivation.bind(undefined, nativeWorker);
    this.pollActivityTask = native.workerPollActivityTask.bind(undefined, nativeWorker);
    this.pollNexusTask = native.workerPollNexusTask.bind(undefined, nativeWorker);
    this.completeWorkflowActivation = native.workerCompleteWorkflowActivation.bind(undefined, nativeWorker);
    this.completeActivityTask = native.workerCompleteActivityTask.bind(undefined, nativeWorker);
    this.completeNexusTask = native.workerCompleteNexusTask.bind(undefined, nativeWorker);
    this.recordActivityHeartbeat = native.workerRecordActivityHeartbeat.bind(undefined, nativeWorker);
    this.initiateShutdown = native.workerInitiateShutdown.bind(undefined, nativeWorker);
  }

  /** Ask the runtime to flush its logs. */
  public flushCoreLogs(): void {
    this.runtime.flushLogs();
  }

  /** Deregister the native worker from the runtime. */
  public async finalizeShutdown(): Promise<void> {
    await this.runtime.deregisterWorker(this.nativeWorker);
  }
}

/**
 * Render a task token as a base64 string.
 */
function formatTaskToken(taskToken: Uint8Array): string {
  return Buffer.from(taskToken).toString('base64');
}

/**
 * Notify that an activity has started, used as input to {@link Worker.activityHeartbeatSubject}
 *
 * Used to detect rogue activities.
 */
interface HeartbeatCreateNotification {
  type: 'create';
  base64TaskToken: string;
}

/**
 * Heartbeat request used as input to {@link Worker.activityHeartbeatSubject}
 */
interface Heartbeat {
  type: 'heartbeat';
  info: ActivityInfo;
  base64TaskToken: string;
  taskToken: Uint8Array;
  details?: any;
  onError: () => void;
}

/**
 * Notifies that an activity has completed, used as input to {@link Worker.activityHeartbeatSubject}
 */
interface ActivityCompleteNotification {
  type: 'completion';
  flushRequired: boolean;
  callback(): void;
  base64TaskToken: string;
}

/**
 * Notifies that an Activity heartbeat has been flushed, used as input to {@link Worker.activityHeartbeatSubject}
 */
interface HeartbeatFlushNotification {
  type: 'flush';
  base64TaskToken: string;
}

/**
 * Input for the {@link Worker.activityHeartbeatSubject}
 */
type HeartbeatInput =
  | Heartbeat
  | ActivityCompleteNotification
  | HeartbeatFlushNotification
  | HeartbeatCreateNotification;

/**
 * State for managing a single Activity's heartbeat sending
 */
interface HeartbeatState {
  closed: boolean;
  processing: boolean;
  completionCallback?: () => void;
  pending?: Heartbeat;
}

/**
 * Request to send a heartbeat, used as output from the Activity heartbeat state mapper
 */
interface HeartbeatSendRequest {
  type: 'send';
  heartbeat: Heartbeat;
}

/**
 * Request to close an Activity heartbeat stream, used
as output from the Activity heartbeat state mapper
 */
interface HeartbeatGroupCloseRequest {
  type: 'close';
  completionCallback?: () => void;
}

/**
 * Output from the Activity heartbeat state mapper
 */
type HeartbeatOutput = HeartbeatSendRequest | HeartbeatGroupCloseRequest;

/**
 * Used as the return type of the Activity heartbeat state mapper
 */
interface HeartbeatStateAndOutput {
  state: HeartbeatState;
  output: HeartbeatOutput | null;
}

/**
 * State of a task poller, as reported in {@link WorkerStatus}
 */
export type PollerState = 'POLLING' | 'SHUTDOWN' | 'FAILED';

/**
 * Status overview of a Worker.
 * Useful for troubleshooting issues and overall observability.
 */
export interface WorkerStatus {
  /**
   * General run state of a Worker
   */
  runState: State;
  /**
   * General state of the Workflow poller
   */
  workflowPollerState: PollerState;
  /**
   * General state of the Activity poller
   */
  activityPollerState: PollerState;
  /**
   * Whether this Worker has an outstanding Workflow poll request
   */
  hasOutstandingWorkflowPoll: boolean;
  /**
   * Whether this Worker has an outstanding Activity poll request
   */
  hasOutstandingActivityPoll: boolean;
  /**
   * Number of in-flight (currently actively processed) Workflow activations
   */
  numInFlightWorkflowActivations: number;
  /**
   * Number of in-flight (currently actively processed) Activities
   *
   * This includes both local and non-local Activities.
   *
   * See {@link numInFlightNonLocalActivities} and {@link numInFlightLocalActivities} for a breakdown.
   */
  numInFlightActivities: number;
  /**
   * Number of in-flight (currently actively processed) non-Local Activities
   */
  numInFlightNonLocalActivities: number;
  /**
   * Number of in-flight (currently actively processed) Local Activities
   */
  numInFlightLocalActivities: number;
  /**
   * Number of Workflow executions cached in Worker memory
   */
  numCachedWorkflows: number;
  /**
   * Number of running Activities that have emitted a heartbeat
   */
  numHeartbeatingActivities: number;
}

interface RunUntilOptions {
  /**
   * Maximum time to wait for the provided Promise to complete after the Worker has stopped or failed.
   *
   * Until TS SDK 1.11.2, `Worker.runUntil(...)` would wait _indefinitely_ for both the Worker's run
   * Promise _and_ the provided Promise to resolve or fail, _even in error cases_. In most practical
   * use cases, that would create a possibility for the Worker to hang indefinitely if the Worker
   * was stopped due to "unexpected" factors.
   *
   * For example, in the common test idiom shown below, sending a `SIGINT` to the process would
   * initiate shutdown of the Worker, potentially resulting in termination of the Worker before the
   * Workflow completes; in that case, the Workflow would never complete, and consequently, the
   * `runUntil` Promise would never resolve, leaving the process in a hang state.
   *
   * ```ts
   * await Worker.runUntil(() => client.workflow.execute(...));
   * ```
   *
   * The behavior of `Worker.runUntil(...)` has therefore been changed in 1.11.3 so that if the worker
   * shuts down before the inner promise completes, `runUntil` will allow no more than a certain delay
   * (i.e. `promiseCompletionTimeout`) for the inner promise to complete, after which a
   * {@link PromiseCompletionTimeoutError} is thrown.
   *
   * In most practical use cases, no delay is actually required; `promiseCompletionTimeout` therefore
   * defaults to 0 seconds, meaning the Worker will not wait for the inner promise to complete.
* You may adjust this value in the very rare cases where waiting is pertinent; set it to a
   * very high value to mimic the previous behavior.
   *
   * This time is calculated from the moment the Worker reaches either the `STOPPED` or the `FAILED`
   * state. {@link Worker.runUntil} throws a {@link PromiseCompletionTimeoutError} if the
   * Promise still hasn't completed after that delay.
   *
   * @default 0 don't wait
   */
  promiseCompletionTimeout?: Duration;
}

/**
 * The temporal Worker connects to Temporal Server and runs Workflows and Activities.
 */
export class Worker {
  /** Receives every {@link HeartbeatInput} (heartbeats and Activity lifecycle notifications). */
  protected readonly activityHeartbeatSubject = new Subject<HeartbeatInput>();
  /** Holds the current run {@link State}; starts out as `'INITIALIZED'`. */
  protected readonly stateSubject = new BehaviorSubject<State>('INITIALIZED');
  // Pushing an error to this subject causes the Worker to initiate a graceful shutdown, after
  // which the Worker will be in FAILED state and the `run` promise will throw the first error
  // published on this subject.
  protected readonly unexpectedErrorSubject = new Subject<void>();
  // Pushing an error to this subject will cause the worker to IMMEDIATELY fall into FAILED state.
  //
  // The `run` promise will throw the first error reported on either this subject or the
  // `unexpectedErrorSubject` subject. That is, suppose that an "unexpected error" comes in,
  // which triggers graceful shutdown of the Worker, and then, while attempting to gracefully shut
  // down the Worker, we get some "instant terminate error". The Worker's `run` promise will throw
  // the _initial error_ rather than the "instant terminate error" that came later. This is so to
  // avoid masking the original error with a subsequent one that will likely be less relevant.
  // Both errors will still be reported to the logger.
protected readonly instantTerminateErrorSubject = new Subject<void>();
  protected readonly workflowPollerStateSubject = new BehaviorSubject<PollerState>('POLLING');
  protected readonly activityPollerStateSubject = new BehaviorSubject<PollerState>('POLLING');
  protected readonly nexusPollerStateSubject = new BehaviorSubject<PollerState>('POLLING');
  /**
   * Whether or not this worker has an outstanding workflow poll request
   */
  protected hasOutstandingWorkflowPoll = false;
  /**
   * Whether or not this worker has an outstanding activity poll request
   */
  protected hasOutstandingActivityPoll = false;
  /**
   * Whether or not this worker has an outstanding Nexus poll request
   */
  protected hasOutstandingNexusPoll = false;
  protected _client?: Client;
  protected readonly numInFlightActivationsSubject = new BehaviorSubject<number>(0);
  protected readonly numInFlightActivitiesSubject = new BehaviorSubject<number>(0);
  protected readonly numInFlightNonLocalActivitiesSubject = new BehaviorSubject<number>(0);
  protected readonly numInFlightLocalActivitiesSubject = new BehaviorSubject<number>(0);
  protected readonly numInFlightNexusOperationsSubject = new BehaviorSubject<number>(0);
  protected readonly numCachedWorkflowsSubject = new BehaviorSubject<number>(0);
  protected readonly numHeartbeatingActivitiesSubject = new BehaviorSubject<number>(0);
  protected readonly evictionsEmitter = new EventEmitter();
  // NOTE(review): key/value type arguments are not visible here; presumably
  // <string, NexusHandler> (task token -> handler) - confirm before tightening.
  private readonly taskTokenToNexusHandler = new Map();
  protected static nativeWorkerCtor: NativeWorkerConstructor = NativeWorker;
  // Used to add uniqueness to replay worker task queue names
  protected static replayWorkerCount = 0;
  private static readonly SELF_INDUCED_SHUTDOWN_EVICTION: RemoveFromCache = {
    message: 'Shutting down',
    reason: EvictionReason.FATAL,
  };
  protected readonly workflowCodecRunner: WorkflowCodecRunner;

  /**
   * Create a new Worker.
   * This method initiates a connection to the server and will throw (asynchronously) on connection failure.
   *
   * The passed `options` object is not modified: plugins attached to the connection are merged
   * into a copy, so the same options object can safely be reused for several Workers.
   *
   * @throws {TypeError} if no task queue name is set once all plugins have been applied
   */
  public static async create(options: WorkerOptions): Promise<Worker> {
    // Merge the connection's plugins into a copy rather than into the caller's object; mutating
    // it would append the connection plugins again on every reuse of the same options object.
    const plugins = (options.plugins ?? []).concat(options.connection?.plugins ?? []);
    options = { ...options, plugins };
    for (const plugin of plugins) {
      if (plugin.configureWorker !== undefined) {
        options = plugin.configureWorker(options);
      }
    }
    if (!options.taskQueue) {
      throw new TypeError('Task queue name is required');
    }
    throwIfReservedName('task queue', options.taskQueue);
    const runtime = Runtime.instance();
    const logger = LoggerWithComposedMetadata.compose(runtime.logger, {
      sdkComponent: SdkComponent.worker,
      taskQueue: options.taskQueue ?? 'default',
    });
    const metricMeter = runtime.metricMeter.withTags({
      namespace: options.namespace ?? 'default',
      taskQueue: options.taskQueue ?? 'default',
    });
    const nativeWorkerCtor: NativeWorkerConstructor = this.nativeWorkerCtor;
    const compiledOptions = compileWorkerOptions(options, logger, metricMeter);
    logger.debug('Creating worker', {
      options: {
        ...compiledOptions,
        ...(compiledOptions.workflowBundle && isCodeBundleOption(compiledOptions.workflowBundle)
          ? {
              // Avoid dumping workflow bundle code to the console
              workflowBundle: {
                code: `<string of length ${compiledOptions.workflowBundle.code.length}>`,
              },
            }
          : {}),
      },
    });
    const bundle = await this.getOrCreateBundle(compiledOptions, logger);
    let workflowCreator: WorkflowCreator | undefined = undefined;
    if (bundle) {
      workflowCreator = await this.createWorkflowCreator(bundle, compiledOptions, logger);
    }
    // Create a new connection if one is not provided with no CREATOR reference
    // so it can be automatically closed when this Worker shuts down.
    const connection = options.connection ?? (await InternalNativeConnection.connect());
    let nativeWorker: NativeWorkerLike;
    const compiledOptionsWithBuildId = addBuildIdIfMissing(compiledOptions, bundle?.code);
    try {
      nativeWorker = await nativeWorkerCtor.create(runtime, connection, compiledOptionsWithBuildId);
    } catch (err) {
      // We just created this connection, close it
      if (!options.connection) {
        await connection.close();
      }
      throw err;
    }
    // Record this native worker as a holder of the connection.
    extractReferenceHolders(connection).add(nativeWorker);
    return new this(
      runtime,
      nativeWorker,
      workflowCreator,
      compiledOptionsWithBuildId,
      logger,
      metricMeter,
      options.plugins ?? [],
      connection
    );
  }

  /**
   * Build the {@link WorkflowCreator} matching the compiled options.
   *
   * In debug mode Workflows are created by {@link ReusableVMWorkflowCreator} (when `reuseV8Context`
   * is set) or {@link VMWorkflowCreator}; otherwise by {@link ThreadedVMWorkflowCreator}.
   */
  protected static async createWorkflowCreator(
    workflowBundle: WorkflowBundleWithSourceMapAndFilename,
    compiledOptions: CompiledWorkerOptions,
    logger: Logger
  ): Promise<WorkflowCreator> {
    const registeredActivityNames = new Set(compiledOptions.activities.keys());
    // This isn't required for vscode, only for Chrome Dev Tools which doesn't support debugging worker threads.
    // We also rely on this in debug-replayer where we inject a global variable to be read from workflow context.
    if (compiledOptions.debugMode) {
      if (compiledOptions.reuseV8Context) {
        return await ReusableVMWorkflowCreator.create(
          workflowBundle,
          compiledOptions.isolateExecutionTimeoutMs,
          registeredActivityNames
        );
      }
      return await VMWorkflowCreator.create(
        workflowBundle,
        compiledOptions.isolateExecutionTimeoutMs,
        registeredActivityNames
      );
    } else {
      return await ThreadedVMWorkflowCreator.create({
        workflowBundle,
        threadPoolSize: compiledOptions.workflowThreadPoolSize,
        isolateExecutionTimeoutMs: compiledOptions.isolateExecutionTimeoutMs,
        reuseV8Context: compiledOptions.reuseV8Context ?? true,
        registeredActivityNames,
        logger,
      });
    }
  }

  /**
   * Create a replay Worker, and run the provided history against it. Will resolve as soon as
   * the history has finished being replayed, or if the workflow produces a nondeterminism error.
   *
   * @param workflowId If provided, use this as the workflow id during replay. Histories do not
   * contain a workflow id, so it must be provided separately if your workflow depends on it.
   * @throws {@link DeterminismViolationError} if the workflow code is not compatible with the history.
   * @throws {@link ReplayError} on any other replay related error.
   */
  public static async runReplayHistory(
    options: ReplayWorkerOptions,
    history: History | unknown,
    workflowId?: string
  ): Promise<void> {
    const validated = this.validateHistory(history);
    const result = await this.runReplayHistories(options, [
      { history: validated, workflowId: workflowId ??
'fake' },
    ]).next();
    if (result.done) throw new IllegalStateError('Expected at least one replay result');
    if (result.value.error) throw result.value.error;
  }

  /**
   * Create a replay Worker, running all histories provided by the passed in iterable.
   *
   * Returns an async iterable of results for each history replayed.
   */
  public static async *runReplayHistories(
    options: ReplayWorkerOptions,
    histories: ReplayHistoriesIterable
  ): AsyncIterableIterator<ReplayResult> {
    const [worker, pusher] = await this.constructReplayWorker(options);
    const rt = worker.runtime;
    const evictions = on(worker.evictionsEmitter, 'eviction') as AsyncIterableIterator<[EvictionWithRunID]>;
    // `runPromise` never resolves: a normal completion of `run()` is turned into a ShutdownError
    // so that racing against it below always yields either an eviction or a rejection.
    const runPromise = worker.run().then(() => {
      throw new ShutdownError('Worker was shutdown');
    });
    void runPromise.catch(() => {
      // ignore to avoid unhandled rejections
    });
    let innerError = undefined;
    try {
      try {
        for await (const { history, workflowId } of histories) {
          const validated = this.validateHistory(history);
          await rt.pushHistory(pusher, workflowId, validated);
          const next = await Promise.race([evictions.next(), runPromise]);
          if (next.done) {
            break; // This shouldn't happen, handle just in case
          }
          const [{ runId, evictJob }] = next.value;
          const error = evictionReasonToReplayError(evictJob);
          // We replay one workflow at a time so the workflow ID comes from the histories iterable.
          yield {
            workflowId,
            runId,
            error,
          };
        }
      } catch (err) {
        // Remember the failure; which error finally propagates is decided once the worker has stopped.
        innerError = err;
      }
    } finally {
      try {
        rt.closeHistoryStream(pusher);
        worker.shutdown();
      } catch {
        // ignore in case worker was already shutdown
      }
      try {
        await runPromise;
      } catch (err) {
        /* eslint-disable no-unsafe-finally */
        if (err instanceof ShutdownError) {
          // Worker stopped normally: surface the replay-loop error, if there was one.
          if (innerError !== undefined) throw innerError;
          return;
        } else if (innerError === undefined) {
          throw err;
        } else {
          throw new CombinedWorkerRunError('Worker run failed with inner error', {
            cause: {
              workerError: err,
              innerError,
            },
          });
        }
        /* eslint-enable no-unsafe-finally */
      }
    }
  }

  /**
   * Check that `history` looks like a history object and normalize it.
   *
   * A history whose first event has a string `eventId` is converted with `historyFromJSON`;
   * any other history is returned unchanged.
   *
   * @throws {TypeError} if `history` is not a non-null object, or has no non-empty `events` array
   */
  private static validateHistory(history: unknown): History {
    if (typeof history !== 'object' || history == null) {
      // `typeof null` is 'object', so report null explicitly
      throw new TypeError(`Expected a non-null history object, got ${history === null ? 'null' : typeof history}`);
    }
    const events: unknown = (history as { events?: unknown }).events;
    if (!Array.isArray(events) || events.length === 0) {
      throw new TypeError('Expected history to contain a non-empty `events` array');
    }
    const eventId: unknown = events[0]?.eventId;
    // in a "valid" history, eventId would be Long
    if (typeof eventId === 'string') {
      return historyFromJSON(history);
    } else {
      return history;
    }
  }

  /**
   * Build a replay {@link Worker} and the history pusher used to feed it.
   *
   * Plugins get a chance to adjust the options through `configureReplayWorker` first.
   */
  private static async constructReplayWorker(options: ReplayWorkerOptions): Promise<[Worker, native.HistoryPusher]> {
    const plugins = options.plugins ?? [];
    for (const plugin of plugins) {
      if (plugin.configureReplayWorker !== undefined) {
        options = plugin.configureReplayWorker(options);
      }
    }
    const nativeWorkerCtor: NativeWorkerConstructor = this.nativeWorkerCtor;
    const fixedUpOptions: WorkerOptions = {
      taskQueue: (options.replayName ??
'fake_replay_queue') + '-' + this.replayWorkerCount,
      debugMode: true,
      ...options,
    };
    this.replayWorkerCount++;
    const runtime = Runtime.instance();
    const logger = LoggerWithComposedMetadata.compose(runtime.logger, {
      sdkComponent: SdkComponent.worker,
      taskQueue: fixedUpOptions.taskQueue,
    });
    const metricMeter = runtime.metricMeter.withTags({
      namespace: 'default',
      taskQueue: fixedUpOptions.taskQueue,
    });
    const compiledOptions = compileWorkerOptions(fixedUpOptions, logger, metricMeter);
    const bundle = await this.getOrCreateBundle(compiledOptions, logger);
    if (!bundle) {
      throw new TypeError('ReplayWorkerOptions must contain workflowsPath or workflowBundle');
    }
    const workflowCreator = await this.createWorkflowCreator(bundle, compiledOptions, logger);
    const replayHandle = await nativeWorkerCtor.createReplay(
      runtime,
      addBuildIdIfMissing(compiledOptions, bundle.code)
    );
    return [
      new this(
        runtime,
        replayHandle.worker,
        workflowCreator,
        compiledOptions,
        logger,
        metricMeter,
        plugins,
        undefined,
        true
      ),
      replayHandle.historyPusher,
    ];
  }

  /**
   * Resolve the Workflow bundle described by the compiled options.
   *
   * - With `workflowBundle` set, the bundle is taken from its inline code or read from its code
   *   path; `workflowsPath`, `bundlerOptions` and custom workflow interceptor modules are ignored
   *   (a warning is logged for each).
   * - Otherwise, with `workflowsPath` set, a bundle is built using {@link WorkflowCodeBundler}.
   * - Otherwise, resolves to `undefined`.
   *
   * @throws {TypeError} if `workflowBundle` is neither a code bundle nor a path bundle
   */
  protected static async getOrCreateBundle(
    compiledOptions: CompiledWorkerOptions,
    logger: Logger
  ): Promise<WorkflowBundleWithSourceMapAndFilename | undefined> {
    if (compiledOptions.workflowBundle) {
      if (compiledOptions.workflowsPath) {
        logger.warn('Ignoring WorkerOptions.workflowsPath because WorkerOptions.workflowBundle is set');
      }
      if (compiledOptions.bundlerOptions) {
        logger.warn('Ignoring WorkerOptions.bundlerOptions because WorkerOptions.workflowBundle is set');
      }
      const modules = new Set(compiledOptions.interceptors.workflowModules);
      // Warn if user tries to customize the default set of workflow interceptor modules:
      // the union only outgrows the default list when a non-default module was added.
      if (
        new Set([...modules, ...defaultWorkflowInterceptorModules]).size !== defaultWorkflowInterceptorModules.length
      ) {
        logger.warn(
          'Ignoring WorkerOptions.interceptors.workflowModules because WorkerOptions.workflowBundle is set.\n' +
            'To use workflow interceptors with a workflowBundle, pass them in the call to bundleWorkflowCode.'
        );
      }

      if (isCodeBundleOption(compiledOptions.workflowBundle)) {
        return parseWorkflowCode(compiledOptions.workflowBundle.code);
      } else if (isPathBundleOption(compiledOptions.workflowBundle)) {
        const code = await fs.readFile(compiledOptions.workflowBundle.codePath, 'utf8');
        return parseWorkflowCode(code, compiledOptions.workflowBundle.codePath);
      } else {
        throw new TypeError('Invalid WorkerOptions.workflowBundle');
      }
    } else if (compiledOptions.workflowsPath) {
      const bundler = new WorkflowCodeBundler({
        logger,
        workflowsPath: compiledOptions.workflowsPath,
        workflowInterceptorModules: compiledOptions.interceptors.workflowModules,
        failureConverterPath: compiledOptions.dataConverter?.failureConverterPath,
        payloadConverterPath: compiledOptions.dataConverter?.payloadConverterPath,
        ignoreModules: compiledOptions.bundlerOptions?.ignoreModules,
        preloadModules: compiledOptions.bundlerOptions?.preloadModules,
        webpackConfigHook: compiledOptions.bundlerOptions?.webpackConfigHook,
        plugins: compiledOptions.plugins,
      });
      const bundle = await bundler.createBundle();
      return parseWorkflowCode(bundle.code);
    } else {
      return undefined;
    }
  }

  /**
   * Create a new Worker from nativeWorker.
*/
  protected constructor(
    protected readonly runtime: Runtime,
    protected readonly nativeWorker: NativeWorkerLike,
    /**
     * Optional WorkflowCreator - if not provided, Worker will not poll on Workflows
     */
    protected readonly workflowCreator: WorkflowCreator | undefined,
    public readonly options: CompiledWorkerOptions,
    /** Logger bound to 'sdkComponent: worker' */
    protected readonly logger: Logger,
    protected readonly metricMeter: MetricMeter,
    protected readonly plugins: WorkerPlugin[],
    protected _connection?: NativeConnection,
    protected readonly isReplayWorker: boolean = false
  ) {
    this.workflowCodecRunner = new WorkflowCodecRunner(options.loadedDataConverter.payloadCodecs);
  }

  /**
   * An Observable which emits each time the number of in flight activations changes
   */
  public get numInFlightActivations$(): Observable<number> {
    return this.numInFlightActivationsSubject;
  }

  /**
   * An Observable which emits each time the number of in flight Activity tasks changes
   */
  public get numInFlightActivities$(): Observable<number> {
    return this.numInFlightActivitiesSubject;
  }

  /**
   * An Observable which emits each time the number of cached workflows changes
   */
  public get numRunningWorkflowInstances$(): Observable<number> {
    return this.numCachedWorkflowsSubject;
  }

  /**
   * Get the current run state of this worker
   */
  public getState(): State {
    // Setters and getters require the same visibility, add this public getter function
    return this.stateSubject.getValue();
  }

  /**
   * Get a status overview of this Worker
   *
   * The returned object is a snapshot of the current values; it is not updated afterwards.
   */
  public getStatus(): WorkerStatus {
    return {
      runState: this.state,
      numHeartbeatingActivities: this.numHeartbeatingActivitiesSubject.value,
      workflowPollerState: this.workflowPollerStateSubject.value,
      activityPollerState: this.activityPollerStateSubject.value,
      hasOutstandingWorkflowPoll: this.hasOutstandingWorkflowPoll,
      hasOutstandingActivityPoll: this.hasOutstandingActivityPoll,
      numCachedWorkflows: this.numCachedWorkflowsSubject.value,
      numInFlightWorkflowActivations: this.numInFlightActivationsSubject.value,
      numInFlightActivities: this.numInFlightActivitiesSubject.value,
      numInFlightNonLocalActivities: this.numInFlightNonLocalActivitiesSubject.value,
      numInFlightLocalActivities: this.numInFlightLocalActivitiesSubject.value,
    };
  }

  /**
   * Get the client associated with this Worker.
   *
   * Returns undefined if the worker was created without a connection (e.g., replay workers).
   */
  public get client(): Client | undefined {
    // Lazily create the client if it's not already set. Note that _connection
    // (and consequently _client) will be set if and only if this is not a replay worker.
    if (this._client == null && this._connection != null) {
      this._client = new Client({
        namespace: this.options.namespace,
        connection: this._connection,
        identity: this.options.identity,
        dataConverter: this.options.dataConverter,
        interceptors: this.options.interceptors.client,
      });
    }
    return this._client;
  }

  /**
   * Get the connection associated with this Worker.
   * Returns undefined if the worker was created without a connection (e.g., replay workers).
   */
  public get connection(): NativeConnection | undefined {
    return this._connection;
  }

  /**
   * Replace the connection used by this Worker.
   * This allows the worker to switch to a different Temporal server or update connection configuration
   * without restarting the worker.
   *
   * @remarks
   * The worker must have been created with a connection for this method to work.
   * Subsequent calls by the worker to Temporal (e.g., responding to tasks) will use the new connection.
   * A new client will be created automatically from the provided connection.
*
   * @param newConnection - The new NativeConnection to use
   * @throws {IllegalStateError} If the worker was created without a connection
   * @throws {Error} If the connection replacement fails
   */
  public set connection(newConnection: NativeConnection) {
    if (!this._connection) throw new IllegalStateError('Cannot replace connection on a worker without a connection');
    // Nothing to do when both connections wrap the same native client
    if (extractNativeClient(this._connection) === extractNativeClient(newConnection)) return;
    const previousConnection = this._connection;
    // Call the native bridge to replace the client
    // (done first: if this throws, none of the bookkeeping below has been touched)
    this.nativeWorker.replaceClient(extractNativeClient(newConnection));
    // Move this worker from the old connection's reference holders to the new one's
    extractReferenceHolders(previousConnection).delete(this.nativeWorker);
    this._connection = newConnection;
    extractReferenceHolders(newConnection).add(this.nativeWorker);
    // Clear up the cached client, if any. It will be lazily recreated on demand.
    this._client = undefined;
    // Only an InternalNativeConnection is closed here; the close is not awaited and a
    // failure is logged rather than thrown.
    if (previousConnection instanceof InternalNativeConnection) {
      previousConnection.close().catch((err) => {
        this.logger.error('Error closing previous connection after replacement of worker connection', { err });
      });
    }
  }

  /** Current run state, read from the state subject. */
  protected get state(): State {
    return this.stateSubject.getValue();
  }

  /** Log the new run state at info level, then publish it on the state subject. */
  protected set state(state: State) {
    this.logger.info('Worker state changed', { state });
    this.stateSubject.next(state);
  }

  /**
   * Start shutting down the Worker. The Worker stops polling for new tasks and sends
   * {@link https://typescript.temporal.io/api/namespaces/activity#cancellation | cancellation}
   * (via a {@link CancelledFailure} with `message` set to `'WORKER_SHUTDOWN'`) to running Activities.
   * Note: if the Activity accepts cancellation (i.e. re-throws or allows the `CancelledFailure`
   * to be thrown out of the Activity function), the Activity Task will be marked as failed, not
   * cancelled. It's helpful for the Activity Task to be marked failed during shutdown because the
   * Server will retry the Activity sooner (than if the Server had to wait for the Activity Task
   * to time out).
*
   * When called, immediately transitions {@link state} to `'STOPPING'` and asks Core to shut down.
   * Once Core has confirmed that it's shutting down, the Worker enters `'DRAINING'` state. It will
   * stay in that state until both task pollers receive a `ShutdownError`, at which point we'll
   * transition to `DRAINED` state. Once all currently running Activities and Workflow Tasks have
   * completed, the Worker transitions to `'STOPPED'`.
   */
  shutdown(): void {
    if (this.state !== 'RUNNING') {
      throw new IllegalStateError(`Not running. Current state: ${this.state}`);
    }
    this.state = 'STOPPING';
    try {
      this.nativeWorker.initiateShutdown();
      this.state = 'DRAINING';
    } catch (error) {
      // This is totally unexpected, and indicates there's something horribly wrong with the Worker
      // state. Attempt to shutdown gracefully will very likely hang, so just terminate immediately.
      this.logger.error('Failed to initiate shutdown', { error });
      this.instantTerminateErrorSubject.error(error);
    }
  }

  /**
   * An observable that completes when {@link state} becomes `'DRAINED'` or throws if {@link state} transitions to
   * `'STOPPING'` and remains that way for {@link this.options.shutdownForceTimeMs}.
   *
   * Returns `EMPTY` when no `shutdownForceTimeMs` is configured. The timeout is reported by erroring
   * `instantTerminateErrorSubject`, not by erroring the returned observable itself.
   */
  protected forceShutdown$(): Observable {
    if (this.options.shutdownForceTimeMs == null) {
      return EMPTY;
    }
    return race(
      this.stateSubject.pipe(
        filter((state): state is 'STOPPING' => state === 'STOPPING'),
        delay(this.options.shutdownForceTimeMs),
        tap({
          next: () => {
            // Inject the error into the instantTerminateError subject so that we don't mask
            // any error that might have caused the Worker to shutdown in the first place.
            this.logger.debug('Shutdown force time expired, terminating worker');
            this.instantTerminateErrorSubject.error(
              new GracefulShutdownPeriodExpiredError('Timed out while waiting for worker to shutdown gracefully')
            );
          },
        })
      ),
      this.stateSubject.pipe(
        filter((state) => state === 'DRAINED'),
        first()
      )
    ).pipe(ignoreElements());
  }

  /**
   * An observable which repeatedly calls `pollFn` and emits each resolved value.
   *
   * The loop ends (and the observable completes) when `pollFn` rejects with a {@link ShutdownError};
   * any other rejection is rethrown and errors the observable.
   *
   * @param pollFn - function invoked once per iteration to obtain the next task
   */
  protected pollLoop$(pollFn: () => Promise): Observable {
    return from(
      (async function* () {
        for (;;) {
          try {
            yield await pollFn();
          } catch (err) {
            if (err instanceof ShutdownError) {
              break;
            }
            throw err;
          }
        }
      })()
    );
  }

  /**
   * Process Activity tasks
   *
   * Tasks are grouped by task token. Within a group, a stateful first stage turns each task into either
   * an immediate failure result, a request to run the Activity, or nothing (for cancel tasks); a second
   * stage runs the Activity, updates in-flight counters, coordinates heartbeat flushing, and emits the
   * encoded `ActivityTaskCompletion`. The group is closed once a completion has been emitted.
   */
  protected activityOperator(): OperatorFunction {
    return pipe(
      closeableGroupBy(({ base64TaskToken }) => base64TaskToken),
      mergeMap((group$) => {
        return group$.pipe(
          mergeMapWithState(
            async (activity: Activity | undefined, { task, base64TaskToken, protobufEncodedTask }) => {
              const { taskToken, variant } = task;
              if (!variant) {
                throw new TypeError('Got an activity task without a "variant" attribute');
              }

              // We either want to return an activity result (for failures) or pass on the activity for running at a later stage
              // If cancel is requested we ignore the result of this function
              // We don't run the activity directly in this operator because we need to return the activity in the state
              // so it can be cancelled if requested
              let output:
                | {
                    type: 'result';
                    result: coresdk.activity_result.IActivityExecutionResult;
                  }
                | {
                    type: 'run';
                    activity: Activity;
                    input: ActivityExecuteInput;
                  }
                | { type: 'ignore' };
              switch (variant) {
                case 'start': {
                  let info: ActivityInfo | undefined = undefined;
                  try {
                    if (activity !== undefined) {
                      throw new IllegalStateError(
                        `Got start event for an already running activity: ${base64TaskToken}`
                      );
                    }
                    info = await extractActivityInfo(
                      task,
                      this.options.loadedDataConverter,
                      this.options.namespace,
                      this.options.taskQueue
                    );

                    const { activityType } = info;
                    // Use the corresponding activity if it exists, otherwise, fallback to default activity function (if exists)
                    const fn =
                      this.options.activities.get(activityType) ?? this.options.activities.get('default');
                    if (typeof fn !== 'function') {
                      throw ApplicationFailure.create({
                        type: 'NotFoundError',
                        message: `Activity function ${activityType} is not registered on this Worker, available activities: ${JSON.stringify(
                          [...this.options.activities.keys()]
                        )}`,
                        nonRetryable: false,
                      });
                    }
                    let args: unknown[];
                    try {
                      args = await decodeArrayFromPayloads(this.options.loadedDataConverter, task.start?.input);
                    } catch (err) {
                      throw ApplicationFailure.fromError(err, {
                        message: `Failed to parse activity args for activity ${activityType}: ${errorMessage(err)}`,
                        nonRetryable: false,
                      });
                    }
                    const headers = task.start?.headerFields ?? {};
                    const input = {
                      args,
                      headers,
                    };

                    this.logger.trace('Starting activity', activityLogAttributes(info));

                    activity = new Activity(
                      info,
                      fn,
                      this.options.loadedDataConverter,
                      (details) =>
                        this.activityHeartbeatSubject.next({
                          type: 'heartbeat',
                          info: info!,
                          taskToken,
                          base64TaskToken,
                          details,
                          onError() {
                            // activity must be defined
                            // empty cancellation details, no corresponding detail for heartbeat detail conversion failure
                            activity?.cancel(
                              'HEARTBEAT_DETAILS_CONVERSION_FAILED',
                              ActivityCancellationDetails.fromProto(undefined)
                            );
                          },
                        }),
                      this.client,
                      this.logger,
                      this.metricMeter,
                      this.options.interceptors.activity
                    );
                    output = { type: 'run', activity, input };
                    break;
                  } catch (e) {
                    // Any failure while preparing the Activity is reported back as a failed task result.
                    const error = ensureApplicationFailure(e);
                    this.logger.error(`Error while processing ActivityTask.start: ${errorMessage(error)}`, {
                      ...(info ? activityLogAttributes(info) : {}),
                      error: e,
                      task: JSON.stringify(task.toJSON()),
                      taskEncoded: Buffer.from(protobufEncodedTask).toString('base64'),
                    });
                    output = {
                      type: 'result',
                      result: {
                        failed: {
                          failure: await encodeErrorToFailure(this.options.loadedDataConverter, error),
                        },
                      },
                    };
                    break;
                  }
                }
                case 'cancel': {
                  output = { type: 'ignore' };
                  if (activity === undefined) {
                    this.logger.trace('Tried to cancel a non-existing activity', {
                      taskToken: base64TaskToken,
                    });
                    break;
                  }
                  // NOTE: activity will not be considered cancelled until it confirms cancellation (by throwing a CancelledFailure)
                  this.logger.trace('Cancelling activity', activityLogAttributes(activity.info));
                  const reason = task.cancel?.reason;
                  const cancellationDetails = task.cancel?.details;
                  if (reason === undefined || reason === null) {
                    // Special case of Lang side cancellation during shutdown (see `activity.shutdown.evict` above)
                    activity.cancel('WORKER_SHUTDOWN', ActivityCancellationDetails.fromProto(cancellationDetails));
                  } else {
                    activity.cancel(
                      coresdk.activity_task.ActivityCancelReason[reason] as CancelReason,
                      ActivityCancellationDetails.fromProto(cancellationDetails)
                    );
                  }
                  break;
                }
              }
              return { state: activity, output: { taskToken, output } };
            },
            undefined // initial value
          ),
          mergeMap(async ({ output, taskToken }) => {
            if (output.type === 'ignore') {
              return undefined;
            }
            if (output.type === 'result') {
              return { taskToken, result: output.result };
            }
            const { base64TaskToken } = output.activity.info;

            // Open the heartbeat stream for this Activity before it starts running.
            this.activityHeartbeatSubject.next({
              type: 'create',
              base64TaskToken,
            });

            let result;

            const numInFlightBreakdownSubject = output.activity.info.isLocal
              ? this.numInFlightLocalActivitiesSubject
              : this.numInFlightNonLocalActivitiesSubject;

            this.numInFlightActivitiesSubject.next(this.numInFlightActivitiesSubject.value + 1);
            numInFlightBreakdownSubject.next(numInFlightBreakdownSubject.value + 1);
            try {
              result = await output.activity.run(output.input);
            } finally {
              // Counters are restored whether the run resolved or threw.
              numInFlightBreakdownSubject.next(numInFlightBreakdownSubject.value - 1);
              this.numInFlightActivitiesSubject.next(this.numInFlightActivitiesSubject.value - 1);
            }

            const status = result.failed ? 'failed' : result.completed ? 'completed' : 'cancelled';

            if (status === 'failed') {
              // Make sure to flush the last heartbeat
              this.logger.trace('Activity failed, waiting for heartbeats to be flushed', {
                ...activityLogAttributes(output.activity.info),
                status,
              });
              await new Promise((resolve) => {
                this.activityHeartbeatSubject.next({
                  type: 'completion',
                  flushRequired: true,
                  base64TaskToken,
                  callback: resolve,
                });
              });
            } else {
              // Notify the Activity heartbeat state mapper that the Activity has completed
              this.activityHeartbeatSubject.next({
                type: 'completion',
                flushRequired: false,
                base64TaskToken,
                callback: () => undefined,
              });
            }
            this.logger.trace('Activity resolved', {
              ...activityLogAttributes(output.activity.info),
              status,
            });
            return { taskToken, result };
          }),
          filter((result: T): result is Exclude => result !== undefined),
          map((rest) => coresdk.ActivityTaskCompletion.encodeDelimited(rest).finish()),
          tap({
            next: () => {
              group$.close();
            },
          })
        );
      })
    );
  }

  /**
   * Process Nexus tasks
   *
   * `task` variants are delegated to {@link handleNexusRunTask}; `cancelTask` variants abort the matching
   * in-flight handler (if any) and produce no completion. Non-`undefined` results are encoded as
   * `NexusTaskCompletion`.
   */
  protected nexusOperator(): OperatorFunction {
    return pipe(
      mergeMap(
        async ({
          task,
          base64TaskToken,
          protobufEncodedTask,
        }): Promise => {
          const { variant } = task;
          if (!variant) {
            throw new TypeError('Got a nexus task without a "variant" attribute');
          }

          switch (variant) {
            case 'task': {
              if (task.task == null) {
                throw new IllegalStateError(`Got empty task for task variant with token: ${base64TaskToken}`);
              }
              const requestDeadline = task.requestDeadline != null ? tsToDate(task.requestDeadline) : undefined;
              return await this.handleNexusRunTask(task.task, base64TaskToken, protobufEncodedTask, requestDeadline);
            }
            case 'cancelTask': {
              const nexusHandler = this.taskTokenToNexusHandler.get(base64TaskToken);
              if (nexusHandler == null) {
                this.logger.trace('Tried to cancel a non-existing Nexus handler', {
                  taskToken: base64TaskToken,
                });
                break;
              }
              // NOTE: Nexus handler will not be considered cancelled until it confirms cancellation (by throwing a CancelledFailure)
              this.logger.trace('Cancelling Nexus handler', nexusHandler.getLogAttributes());
              // Fallback message used when the cancel task carries no reason.
              let reason = 'unknown';
              if (task.cancelTask?.reason != null) {
                reason = coresdk.nexus.NexusTaskCancelReason[task.cancelTask.reason];
              }
              nexusHandler.abortController.abort(new CancelledFailure(reason));
              return;
            }
          }
        }
      ),
      filter((result: T): result is Exclude => result !== undefined),
      map((result) => coresdk.nexus.NexusTaskCompletion.encodeDelimited(result).finish())
    );
  }

  /**
   * Run a single Nexus task through a freshly created {@link NexusHandler}.
   *
   * The handler is registered in `taskTokenToNexusHandler` (so a later cancel task can find it) and
   * counted in `numInFlightNexusOperationsSubject` for the duration of the run. Errors raised while
   * creating or running the handler are logged and converted to a failure completion.
   *
   * @param task - the poll response describing the Nexus request
   * @param base64TaskToken - task token in base64 form, used as the handler registry key
   * @param protobufEncodedTask - raw encoded task, only used for error logging
   * @param requestDeadline - deadline passed to the operation context, if any
   * @returns the handler's result, or `{ taskToken, failure }` when an error was caught
   * @throws {nexus.HandlerError} if the task has no task token (thrown before any handler is created)
   */
  private async handleNexusRunTask(
    task: temporal.api.workflowservice.v1.IPollNexusTaskQueueResponse,
    base64TaskToken: string,
    protobufEncodedTask: ArrayBuffer,
    requestDeadline: Date | undefined
  ) {
    const { taskToken } = task;
    if (taskToken == null) {
      throw new nexus.HandlerError('INTERNAL', 'Task missing request task token');
    }

    let nexusHandler: NexusHandler | undefined = undefined;
    try {
      const abortController = new AbortController();

      nexusHandler = new NexusHandler(
        taskToken,
        this.options.namespace,
        this.options.taskQueue,
        constructNexusOperationContext(task.request, abortController.signal, requestDeadline),
        this.client!, // Must be defined if we are handling Nexus tasks.
        abortController,
        this.options.nexusServiceHandlers!, // Must be defined if we are handling Nexus tasks.
        this.options.loadedDataConverter,
        this.logger,
        this.metricMeter,
        this.options.interceptors.nexus
      );
      this.taskTokenToNexusHandler.set(base64TaskToken, nexusHandler);
      this.numInFlightNexusOperationsSubject.next(this.numInFlightNexusOperationsSubject.value + 1);
      try {
        return await nexusHandler.run(task);
      } finally {
        this.numInFlightNexusOperationsSubject.next(this.numInFlightNexusOperationsSubject.value - 1);
        this.taskTokenToNexusHandler.delete(base64TaskToken);
      }
    } catch (e) {
      this.logger.error(`Error while processing Nexus task: ${errorMessage(e)}`, {
        ...(nexusHandler?.getLogAttributes() ?? {}),
        error: e,
        taskEncoded: Buffer.from(protobufEncodedTask).toString('base64'),
      });
      // Anything that is not already a HandlerError is wrapped as an INTERNAL one.
      const handlerError =
        e instanceof nexus.HandlerError ? e : new nexus.HandlerError('INTERNAL', undefined, { cause: e });
      return {
        taskToken,
        failure: await handlerErrorToProto(this.options.loadedDataConverter, handlerError),
      };
    }
  }

  /**
   * Process activations from the same workflow execution to an observable of completions.
   *
   * Injects a synthetic eviction activation when the worker transitions to no longer polling.
   */
  protected handleWorkflowActivations(
    activations$: CloseableGroupedObservable
  ): Observable {
    const syntheticEvictionActivations$ = this.workflowPollerStateSubject.pipe(
      // Core has indicated that it will not return any more poll results; evict all cached WFs.
      filter((state) => state !== 'POLLING'),
      first(),
      map(() => ({
        activation: coresdk.workflow_activation.WorkflowActivation.create({
          runId: activations$.key,
          jobs: [{ removeFromCache: Worker.SELF_INDUCED_SHUTDOWN_EVICTION }],
        }),
        synthetic: true,
      })),
      // Stop waiting for the poller state change once the real activation stream has ended.
      takeUntil(activations$.pipe(last(undefined, null)))
    );
    const activations$$ = activations$.pipe(map((activation) => ({ activation, synthetic: false })));
    return merge(activations$$, syntheticEvictionActivations$).pipe(
      tap(() => {
        this.numInFlightActivationsSubject.next(this.numInFlightActivationsSubject.value + 1);
      }),
      mergeMapWithState(this.handleActivation.bind(this), undefined),
      tap(({ close }) => {
        this.numInFlightActivationsSubject.next(this.numInFlightActivationsSubject.value - 1);
        if (close) {
          activations$.close();
          this.numCachedWorkflowsSubject.next(this.numCachedWorkflowsSubject.value - 1);
        }
      }),
      takeWhile(({ close }) => !close, true /* inclusive */),
      map(({ completion }) => completion),
      filter((result): result is Uint8Array => result !== undefined)
    );
  }

  /**
   * Process a single activation to a completion.
   *
   * A `removeFromCache` job, if present, is stripped from the activation and announced on
   * `evictionsEmitter`. An activation left with no jobs disposes the workflow; otherwise the activation
   * is decoded, the workflow is created on first use, activated, and its sink calls are processed.
   * Any error is logged and converted into a failed completion with `close: true`.
   *
   * @param workflow - the workflow tracked for this run so far, if any
   * @returns the next tracked workflow state plus the encoded completion (if any) and whether to close
   */
  protected async handleActivation(
    workflow: WorkflowWithLogAttributes | undefined,
    { activation, synthetic }: { activation: coresdk.workflow_activation.WorkflowActivation; synthetic: boolean }
  ): Promise<{
    state: WorkflowWithLogAttributes | undefined;
    output: { completion?: Uint8Array; close: boolean };
  }> {
    try {
      const removeFromCacheIx = activation.jobs.findIndex(({ removeFromCache }) => removeFromCache);
      const close = removeFromCacheIx !== -1;
      const jobs = activation.jobs;
      if (close) {
        const asEvictJob = jobs.splice(removeFromCacheIx, 1)[0]!.removeFromCache;
        if (asEvictJob) {
          this.evictionsEmitter.emit('eviction', {
            runId: activation.runId,
            evictJob: asEvictJob,
          } as EvictionWithRunID);
        }
      }
      activation.jobs = jobs;
      if (jobs.length === 0) {
        this.logger.trace('Disposing workflow', workflow ? workflow.logAttributes : { runId: activation.runId });
        await workflow?.workflow.dispose();
        if (!close) {
          throw new IllegalStateError('Got a Workflow activation with no jobs');
        }
        // Synthetic evictions did not come from Core, so no completion is reported for them.
        const completion = synthetic
          ? undefined
          : coresdk.workflow_completion.WorkflowActivationCompletion.encodeDelimited({
              runId: activation.runId,
              successful: {},
            }).finish();
        return { state: undefined, output: { close, completion } };
      }

      const decodedActivation = await this.workflowCodecRunner.decodeActivation(activation);

      if (workflow === undefined) {
        const initWorkflowDetails = decodedActivation.jobs[0]?.initializeWorkflow;
        if (initWorkflowDetails == null)
          throw new IllegalStateError(
            'Received workflow activation for an untracked workflow with no init workflow job'
          );

        workflow = await this.createWorkflow(decodedActivation, initWorkflowDetails);
      }

      let isFatalError = false;
      try {
        const unencodedCompletion = await workflow.workflow.activate(decodedActivation);
        const completion = await this.workflowCodecRunner.encodeCompletion(unencodedCompletion);

        return { state: workflow, output: { close, completion } };
      } catch (err) {
        if (err instanceof UnexpectedError) {
          isFatalError = true;
        }
        throw err;
      } finally {
        // Fatal error means we cannot call into this workflow again unfortunately
        if (!isFatalError) {
          // When processing workflows through runReplayHistories, Core may still send non-replay
          // activations on the very last Workflow Task in some cases. Though Core is technically exact
          // here, the fact that sinks marked with callDuringReplay = false may get called on a replay
          // worker is definitely a surprising behavior. For that reason, processSinkCalls checks
          // this.isReplayWorker to suppress all non-callDuringReplay sinks on replay workers.
          const calls = await workflow.workflow.getAndResetSinkCalls();
          await this.processSinkCalls(calls, workflow.logAttributes);
        }
        this.logger.trace('Completed activation', workflow.logAttributes);
      }
    } catch (error) {
      let logMessage = 'Failed to process Workflow Activation';
      if (error instanceof UnexpectedError) {
        // Something went wrong in the workflow; we'll do our best to shut the Worker
        // down gracefully, but then we'll need to terminate the Worker ASAP.
        logMessage = 'An unexpected error occurred while processing Workflow Activation. Initiating Worker shutdown.';
        this.unexpectedErrorSubject.error(error);
      }
      this.logger.error(logMessage, {
        runId: activation.runId,
        ...workflow?.logAttributes,
        error,
        workflowExists: workflow !== undefined,
      });
      const completion = coresdk.workflow_completion.WorkflowActivationCompletion.encodeDelimited({
        runId: activation.runId,
        failed: {
          failure: await encodeErrorToFailure(this.options.loadedDataConverter, error),
        },
      }).finish();
      // We do not dispose of the Workflow yet, wait to be evicted from Core.
      // This is done to simplify the Workflow lifecycle so Core is the sole driver.
      return { state: undefined, output: { close: true, completion } };
    }
  }

  /**
   * Create a new Workflow instance from an `initializeWorkflow` job.
   *
   * Builds the {@link WorkflowInfo} from the job and the enclosing activation, asks the workflow creator
   * to instantiate the Workflow, and increments `numCachedWorkflowsSubject`.
   *
   * @param activation - the decoded activation that contains the init job
   * @param initWorkflowJob - the decoded `initializeWorkflow` job
   * @returns the created workflow together with its log attributes
   * @throws {TypeError} if the init job lacks a required field or the activation has no timestamp
   */
  protected async createWorkflow(
    activation: Decoded,
    initWorkflowJob: Decoded
  ): Promise {
    const workflowCreator = this.workflowCreator!;
    if (
      !(
        initWorkflowJob.workflowId != null &&
        initWorkflowJob.workflowType != null &&
        initWorkflowJob.randomnessSeed != null &&
        initWorkflowJob.firstExecutionRunId != null &&
        initWorkflowJob.attempt != null &&
        initWorkflowJob.startTime != null
      )
    ) {
      throw new TypeError(`Malformed InitializeWorkflow activation: ${JSON.stringify(initWorkflowJob)}`);
    }
    if (activation.timestamp == null) {
      throw new TypeError('Got activation with no timestamp, cannot create a new Workflow instance');
    }
    const {
      workflowId,
      randomnessSeed,
      workflowType,
      parentWorkflowInfo,
      rootWorkflow,
      workflowExecutionTimeout,
      workflowRunTimeout,
      workflowTaskTimeout,
      continuedFromExecutionRunId,
      firstExecutionRunId,
      retryPolicy,
      attempt,
      cronSchedule,
      workflowExecutionExpirationTime,
      cronScheduleToScheduleInterval,
      priority,
    } = initWorkflowJob;

    // Note that we can't do payload conversion here, as there's no guarantee that converted payloads would be safe to
    // transfer through the V8 message port. Those will therefore be set in the Activator's initializeWorkflow job handler.
    const workflowInfo: WorkflowInfo = {
      workflowId,
      runId: activation.runId,
      workflowType,
      searchAttributes: {},
      typedSearchAttributes: new TypedSearchAttributes(),
      parent: convertToParentWorkflowType(parentWorkflowInfo),
      root: convertToRootWorkflowType(rootWorkflow),
      taskQueue: this.options.taskQueue,
      namespace: this.options.namespace,
      firstExecutionRunId,
      continuedFromExecutionRunId: continuedFromExecutionRunId || undefined,
      startTime: tsToDate(initWorkflowJob.startTime),
      runStartTime: tsToDate(activation.timestamp),
      executionTimeoutMs: optionalTsToMs(workflowExecutionTimeout),
      executionExpirationTime: optionalTsToDate(workflowExecutionExpirationTime),
      runTimeoutMs: optionalTsToMs(workflowRunTimeout),
      taskTimeoutMs: requiredTsToMs(workflowTaskTimeout, 'workflowTaskTimeout'),
      retryPolicy: decompileRetryPolicy(retryPolicy),
      attempt,
      cronSchedule: cronSchedule || undefined,
      // 0 is the default, and not a valid value, since crons are at least a minute apart
      cronScheduleToScheduleInterval: optionalTsToMs(cronScheduleToScheduleInterval) || undefined,
      historyLength: activation.historyLength,
      // Exact truncation for multi-petabyte histories
      // A zero value means that it was not set by the server
      historySize: activation.historySizeBytes.toNumber(),
      continueAsNewSuggested: activation.continueAsNewSuggested,
      targetWorkerDeploymentVersionChanged: activation.targetWorkerDeploymentVersionChanged ?? false,
      suggestedContinueAsNewReasons: suggestContinueAsNewReasonsFromProto(activation.suggestContinueAsNewReasons),
      currentBuildId: activation.deploymentVersionForCurrentTask?.buildId ?? '',
      currentDeploymentVersion: convertDeploymentVersion(activation.deploymentVersionForCurrentTask),
      unsafe: {
        now: () => Date.now(), // re-set in initRuntime
        isReplaying: activation.isReplaying,
        isReplayingHistoryEvents: activation.isReplaying,
      },
      priority: decodePriority(priority),
    };
    const logAttributes = workflowLogAttributes(workflowInfo);
    this.logger.trace('Creating workflow', logAttributes);

    const workflow = await workflowCreator.createWorkflow({
      info: workflowInfo,
      randomnessSeed: randomnessSeed.toBytes(),
      now: tsToMs(activation.timestamp),
      showStackTraceSources: this.options.showStackTraceSources,
    });

    this.numCachedWorkflowsSubject.next(this.numCachedWorkflowsSubject.value + 1);
    return { workflow, logAttributes };
  }

  /**
   * Process extracted external calls from Workflow post activation.
   *
   * Each SinkCall is translated into a injected sink function call.
   *
   * This function does not throw, it will log in case of missing sinks
   * or failed sink function invocations.
   *
   * @param externalCalls - sink calls collected from the Workflow
   * @param logAttributes - attributes merged into every log entry emitted here
   */
  protected async processSinkCalls(externalCalls: SinkCall[], logAttributes: Record): Promise {
    const { sinks } = this.options;

    const filteredCalls = externalCalls
      // Fix deprecated usage of the 'defaultWorkerLogger' sink
      .map((call) => (call.ifaceName === 'defaultWorkerLogger' ? { ...call, ifaceName: '__temporal_logger' } : call))
      // Map sink call to the corresponding sink function definition
      .map((call) => ({ call, sink: sinks?.[call.ifaceName]?.[call.fnName] }))
      // Reject calls to undefined sink definitions
      .filter(({ call: { ifaceName, fnName }, sink }) => {
        if (sink !== undefined) return true;
        this.logger.error('Workflow referenced an unregistered external sink', {
          ...logAttributes,
          ifaceName,
          fnName,
        });
        return false;
      })
      // If appropriate, reject calls to sink functions not configured with `callDuringReplay = true`.
      // Use per-call isReplayingHistoryEvents (which is false during queries and update validators)
      // rather than per-activation isReplaying, so that logging is permitted during live read-only operations.
      // Replay workers still suppress all non-callDuringReplay sinks regardless.
      .filter(({ call, sink }) => {
        if (sink?.callDuringReplay) return true;
        if (this.isReplayWorker) return false;
        return !call.workflowInfo.unsafe.isReplayingHistoryEvents;
      });

    // Invoke all remaining sink functions concurrently; a failing sink is logged and never rejects.
    await Promise.all(
      filteredCalls.map(async ({ call, sink }) => {
        try {
          await sink?.fn(call.workflowInfo, ...call.args);
        } catch (error) {
          this.logger.error('External sink function threw an error', {
            ...logAttributes,
            ifaceName: call.ifaceName,
            fnName: call.fnName,
            error,
            workflowInfo: call.workflowInfo,
          });
        }
      })
    );
  }

  /**
   * Listen on heartbeats emitted from activities and send them to core.
   * Errors from core responses are translated to cancellation requests and fed back via the activityFeedbackSubject.
   *
   * Inputs are grouped by task token and run through a small per-Activity state machine so that at most
   * one heartbeat is being sent at a time; a heartbeat arriving meanwhile replaces the pending one.
   */
  protected activityHeartbeat$(): Observable {
    // Start sending `heartbeat` and clear any pending one.
    function process(state: HeartbeatState, heartbeat: Heartbeat): HeartbeatStateAndOutput {
      return {
        state: { ...state, processing: true, pending: undefined },
        output: { type: 'send', heartbeat },
      };
    }

    // Remember `heartbeat` to be sent once the in-flight one has been flushed.
    function storePending(state: HeartbeatState, heartbeat: Heartbeat): HeartbeatStateAndOutput {
      return { state: { ...state, pending: heartbeat }, output: null };
    }

    // Mark the stream closed and hand the completion callback downstream.
    function complete(callback: () => void): HeartbeatStateAndOutput {
      return {
        state: {
          pending: undefined,
          completionCallback: undefined,
          processing: false,
          closed: true,
        },
        output: { type: 'close', completionCallback: callback },
      };
    }

    return this.activityHeartbeatSubject.pipe(
      // The only way for this observable to be closed is by state changing to DRAINED meaning that all in-flight activities have been resolved and thus there should not be any heartbeats to send.
      this.takeUntilState('DRAINED'),
      tap({
        complete: () => this.logger.debug('Heartbeats complete'),
      }),
      closeableGroupBy(({ base64TaskToken }) => base64TaskToken),
      mergeMap((group$) =>
        group$.pipe(
          mapWithState(
            (state: HeartbeatState, input: HeartbeatInput): HeartbeatStateAndOutput => {
              if (input.type === 'create') {
                this.numHeartbeatingActivitiesSubject.next(this.numHeartbeatingActivitiesSubject.value + 1);
                return {
                  state: { processing: false, closed: false },
                  output: null,
                };
              }
              // Ignore any input if we've marked this activity heartbeat stream as closed
              // (rogue heartbeat)
              if (state.closed) return { state, output: { type: 'close' } };

              switch (input.type) {
                case 'heartbeat':
                  this.logger.trace('Got activity heartbeat', activityLogAttributes(input.info));
                  if (state.processing) {
                    // We're already processing a heartbeat, mark this one as pending
                    return storePending(state, input);
                  } else {
                    // Ready to send heartbeat, mark that we're now processing
                    return process(state, input);
                  }
                case 'flush':
                  if (state.pending) {
                    // Send pending heartbeat
                    return process(state, state.pending);
                  } else if (state.completionCallback) {
                    // We were asked to complete, fulfill that request
                    return complete(state.completionCallback);
                  } else {
                    // Nothing to do, wait for completion or heartbeat
                    return {
                      state: { ...state, processing: false },
                      output: null,
                    };
                  }
                case 'completion':
                  if (state.processing) {
                    // Store the completion callback until heartbeat has been flushed
                    return {
                      state: {
                        ...state,
                        // If flush isn't required, delete any pending heartbeat
                        pending: input.flushRequired ? state.pending : undefined,
                        completionCallback: input.callback,
                      },
                      output: null,
                    };
                  } else if (!input.flushRequired) {
                    return complete(input.callback);
                  } else {
                    if (state.pending) {
                      // Flush the final heartbeat and store the completion callback
                      return process({ ...state, completionCallback: input.callback }, state.pending);
                    } else {
                      // Nothing to flush, complete and call back
                      return complete(input.callback);
                    }
                  }
              }
            },
            // Start `closed`, wait for a `create` input to open the stream.
            // This prevents rogue activities from heartbeating and keeping
            // this stream open.
            { processing: false, closed: true }
          ),
          filter((out): out is HeartbeatOutput => out != null),
          tap((out) => {
            if (out.type === 'close') {
              this.numHeartbeatingActivitiesSubject.next(this.numHeartbeatingActivitiesSubject.value - 1);
              out.completionCallback?.();
            }
          }),
          takeWhile((out): out is HeartbeatSendRequest => out.type !== 'close'),
          mergeMap(async ({ heartbeat: { base64TaskToken, taskToken, details, onError, info } }) => {
            let payload: Payload;
            try {
              try {
                payload = await encodeToPayload(this.options.loadedDataConverter, details);
              } catch (error: any) {
                this.logger.warn('Failed to encode heartbeat details, cancelling Activity', {
                  error,
                  ...activityLogAttributes(info),
                });
                onError();
                return;
              }
              const arr = coresdk.ActivityHeartbeat.encodeDelimited({
                taskToken,
                details: [payload],
              }).finish();
              this.nativeWorker.recordActivityHeartbeat(byteArrayToBuffer(arr));
            } finally {
              // Always feed a `flush` back so the state machine can send a pending heartbeat or complete.
              this.activityHeartbeatSubject.next({
                type: 'flush',
                base64TaskToken,
              });
            }
          }),
          tap({ complete: group$.close })
        )
      )
    );
  }

  /**
   * Poll core for `WorkflowActivation`s while respecting worker state.
*
   * Each poll result is decoded into a `WorkflowActivation`. `hasOutstandingWorkflowPoll` is set for the
   * duration of the native poll call, and `workflowPollerStateSubject` is moved to `'SHUTDOWN'` or
   * `'FAILED'` when the poll loop completes or errors.
   */
  protected workflowPoll$(): Observable {
    return this.pollLoop$(async () => {
      this.hasOutstandingWorkflowPoll = true;
      let buffer: Buffer;
      try {
        buffer = await this.nativeWorker.pollWorkflowActivation();
      } finally {
        this.hasOutstandingWorkflowPoll = false;
      }
      const activation = coresdk.workflow_activation.WorkflowActivation.decode(new Uint8Array(buffer));
      this.logger.trace('Got workflow activation', activation);

      return activation;
    }).pipe(
      tap({
        complete: () => {
          this.logger.trace('Workflow Poller changed state to SHUTDOWN');
          this.workflowPollerStateSubject.next('SHUTDOWN');
        },
        error: () => {
          this.logger.trace('Workflow Poller changed state to FAILED');
          this.workflowPollerStateSubject.next('FAILED');
        },
      })
    );
  }

  /**
   * Poll for Workflow activations, handle them, and report completions.
   *
   * Returns `EMPTY` (and marks the workflow poller as `'SHUTDOWN'`) when no workflow creator is set.
   * A failure to report a completion to Core is logged and pushed to `unexpectedErrorSubject`.
   */
  protected workflow$(): Observable {
    // This Worker did not register any workflows, return early
    if (this.workflowCreator === undefined) {
      this.logger.warn('No workflows registered, not polling for workflow tasks');
      this.workflowPollerStateSubject.next('SHUTDOWN');
      return EMPTY;
    }
    return this.workflowPoll$().pipe(
      closeableGroupBy((activation) => activation.runId),
      mergeMap(this.handleWorkflowActivations.bind(this)),
      mergeMap(async (completion) => {
        try {
          await this.nativeWorker.completeWorkflowActivation(Buffer.from(completion, completion.byteOffset));
        } catch (error) {
          this.logger.error('Core reported failure in completeWorkflowActivation(). Initiating Worker shutdown.', {
            error,
          });
          this.unexpectedErrorSubject.error(error);
        }
      }),
      tap({
        complete: () => {
          this.logger.debug('Workflow Worker terminated');
        },
      })
    );
  }

  /**
   * Poll core for `ActivityTask`s while respecting worker state
   *
   * Each poll result is decoded and emitted together with its base64 task token and the raw encoded
   * buffer. `activityPollerStateSubject` is moved to `'SHUTDOWN'` or `'FAILED'` when the loop ends.
   */
  protected activityPoll$(): Observable {
    return this.pollLoop$(async () => {
      this.hasOutstandingActivityPoll = true;
      let buffer: Buffer;
      try {
        buffer = await this.nativeWorker.pollActivityTask();
      } finally {
        this.hasOutstandingActivityPoll = false;
      }
      const task = coresdk.activity_task.ActivityTask.decode(new Uint8Array(buffer));
      const { taskToken, ...rest } = task;
      const base64TaskToken = formatTaskToken(taskToken);
      this.logger.trace('Got activity task', {
        taskToken: base64TaskToken,
        ...rest,
      });
      const { variant } = task;
      if (variant === undefined) {
        throw new TypeError('Got an activity task without a "variant" attribute');
      }
      return { task, base64TaskToken, protobufEncodedTask: buffer };
    }).pipe(
      tap({
        complete: () => {
          this.activityPollerStateSubject.next('SHUTDOWN');
        },
        error: () => {
          this.activityPollerStateSubject.next('FAILED');
        },
      })
    );
  }

  /**
   * Poll for Activity tasks, run them through {@link activityOperator}, and report completions to Core.
   *
   * Returns `EMPTY` (and marks the activity poller as `'SHUTDOWN'`) when no activities are registered.
   */
  protected activity$(): Observable {
    // This Worker did not register any activities, return early
    if (!this.options.activities?.size) {
      if (!this.isReplayWorker) this.logger.warn('No activities registered, not polling for activity tasks');
      this.activityPollerStateSubject.next('SHUTDOWN');
      return EMPTY;
    }
    return this.activityPoll$().pipe(
      this.activityOperator(),
      mergeMap(async (completion) => {
        await this.nativeWorker.completeActivityTask(Buffer.from(completion, completion.byteOffset));
      }),
      tap({ complete: () => this.logger.debug('Activity Worker terminated') })
    );
  }

  /**
   * Poll core for Nexus tasks.
   *
   * Each poll result is decoded and emitted together with its base64 task token and the raw encoded
   * buffer. `nexusPollerStateSubject` is moved to `'SHUTDOWN'` or `'FAILED'` when the loop ends.
   */
  protected nexusPoll$(): Observable {
    return this.pollLoop$(async () => {
      this.hasOutstandingNexusPoll = true;
      let buffer: Buffer;
      try {
        buffer = await this.nativeWorker.pollNexusTask();
      } finally {
        this.hasOutstandingNexusPoll = false;
      }
      const task = coresdk.nexus.NexusTask.decode(new Uint8Array(buffer));
      // The token lives on whichever variant (run or cancel) this task carries.
      const taskToken = task.task?.taskToken || task.cancelTask?.taskToken;
      if (taskToken == null) {
        throw new TypeError('Got a Nexus task without a task token');
      }
      const base64TaskToken = formatTaskToken(taskToken);
      this.logger.trace('Got Nexus task', {
        taskToken: base64TaskToken,
        ...task,
      });
      return { task, base64TaskToken, protobufEncodedTask: buffer };
    }).pipe(
      tap({
        complete: () => {
          this.nexusPollerStateSubject.next('SHUTDOWN');
        },
        error: () => {
          this.nexusPollerStateSubject.next('FAILED');
        },
      })
    );
  }

  /**
   * Poll for Nexus tasks, run them through {@link nexusOperator}, and report completions to Core.
   *
   * Returns `EMPTY` (and marks the Nexus poller as `'SHUTDOWN'`) when no Nexus services are registered.
   */
  protected nexus$(): Observable {
    // This Worker did not register any Nexus services, return early.
    if (this.options.nexusServiceHandlers == null) {
      if (!this.isReplayWorker) this.logger.info('No Nexus services registered, not polling for Nexus tasks');
      this.nexusPollerStateSubject.next('SHUTDOWN');
      return EMPTY;
    }
    return this.nexusPoll$().pipe(
      this.nexusOperator(),
      mergeMap(async (completion) => {
        // Bound the view by `byteLength`: without an explicit length the Buffer would extend to the end of
        // the backing ArrayBuffer, which may be larger than the encoded completion.
        await this.nativeWorker.completeNexusTask(
          Buffer.from(completion.buffer, completion.byteOffset, completion.byteLength)
        );
      }),
      tap({ complete: () => this.logger.debug('Nexus Worker terminated') })
    );
  }

  /**
   * Operator that mirrors its source until the Worker's state subject emits `state`.
   */
  protected takeUntilState(state: State): MonoTypeOperatorFunction {
    return takeUntil(this.stateSubject.pipe(filter((value) => value === state)));
  }

  /**
   * Run the Worker until `fnOrPromise` completes, then {@link shutdown} and wait for {@link run} to complete.
   *
   * Be aware that the Worker may shutdown for reasons other than the completion of the provided promise,
   * e.g. due to the process receiving a SIGINT signal, direct call to `Worker.shutdown()`, or a critical
   * error that imposes a shutdown of the Worker.
   *
   * Throws on fatal Worker errors.
   *
   * **SDK versions `>=1.11.3`**:
   * If the worker shuts down before the inner promise completes, allow no more than
   * {@link RunUntilOptions.promiseCompletionTimeout} for the inner promise to complete,
   * after which a {@link PromiseCompletionTimeoutError} is thrown.
*
   * **SDK versions `>=1.5.0`**:
   * This method always waits for both worker shutdown and inner `fnOrPromise` to complete.
   * If one of worker run -or- the inner promise throw an error, that error is rethrown.
   * If both throw an error, a {@link CombinedWorkerRunError} with a `cause` attribute containing both errors.
   *
   * **SDK versions `< 1.5.0`**:
   * This method would not wait for worker to complete shutdown if the inner `fnOrPromise` threw an error.
   *
   * @returns the result of `fnOrPromise`
   */
  async runUntil(fnOrPromise: Promise | (() => Promise), options?: RunUntilOptions): Promise {
    const workerRunPromise = this.run();

    // Outcome of the caller-provided work; stays undefined until that work settles.
    let innerResult: PromiseSettledResult | undefined;
    const innerPromise = (async () => {
      try {
        const pending = typeof fnOrPromise === 'function' ? fnOrPromise() : fnOrPromise;
        innerResult = { status: 'fulfilled', value: await pending };
      } catch (err) {
        innerResult = { status: 'rejected', reason: err };
      } finally {
        // The provided work is done either way: bring the Worker down if it is still running.
        if (this.state === 'RUNNING') {
          this.shutdown();
        }
      }
    })();

    // Wait for the Worker itself to finish, remembering (rather than propagating) its failure.
    let workerError: Error | undefined;
    try {
      await workerRunPromise;
    } catch (err) {
      workerError = err as Error;
    }

    // The Worker may have stopped before the provided work settled; optionally grant it a grace period.
    const promiseCompletionTimeoutMs = msToNumber(options?.promiseCompletionTimeout ?? 0);
    if (innerResult === undefined && promiseCompletionTimeoutMs > 0) {
      await timeoutPromise(innerPromise, promiseCompletionTimeoutMs);
    }

    // Still nothing: record a timeout as the outcome of the provided work.
    if (innerResult === undefined) {
      innerResult = {
        status: 'rejected',
        reason: new PromiseCompletionTimeoutError(
          `Promise did not resolve within ${promiseCompletionTimeoutMs}ms after Worker completed shutdown`
        ),
      };
    }

    // Worker finished cleanly: the outcome of the provided work is the outcome of this call.
    if (workerError === undefined) {
      if (innerResult.status === 'fulfilled') {
        return innerResult.value;
      }
      throw innerResult.reason;
    }

    // Worker failed: surface its error alone, or combined with the inner failure when both failed.
    if (innerResult.status === 'fulfilled') {
      throw workerError;
    }
    throw new CombinedWorkerRunError('Worker terminated with fatal error in `runUntil`', {
      cause: {
        workerError,
        innerError: innerResult.reason,
      },
    });
  }

  /**
   * Start polling on the Task Queue for tasks. Completes after graceful {@link shutdown}, once the Worker reaches the
   * `'STOPPED'` state.
   *
   * Throws on a fatal error or failure to shutdown gracefully.
   *
   * @see {@link errors}
   *
   * To stop polling, call {@link shutdown} or send one of {@link Runtime.options.shutdownSignals}.
*/ async run(): Promise { if (this.isReplayWorker) { return this.runInternal(); } let runWorker = (w: Worker) => w.runInternal(); for (let i = this.plugins.length - 1; i >= 0; --i) { const rw = runWorker; const plugin = this.plugins[i]; if (plugin?.runWorker !== undefined) { runWorker = (w: Worker) => plugin.runWorker!(w, rw); } } return runWorker(this); } private async runInternal(): Promise { if (this.state !== 'INITIALIZED') { throw new IllegalStateError('Poller was already started'); } this.state = 'RUNNING'; const shutdownCallback = () => { if (this.state === 'RUNNING') this.shutdown(); }; this.runtime.registerShutdownSignalCallback(shutdownCallback); let fatalError: Error | undefined; const unexpectedErrorSubscription = this.unexpectedErrorSubject.subscribe({ error: (e: Error) => { if (this.state === 'RUNNING') this.shutdown(); if (fatalError === undefined) { fatalError = e; } }, }); try { try { await lastValueFrom( merge( this.instantTerminateErrorSubject.pipe(this.takeUntilState('DRAINED')), this.forceShutdown$(), this.activityHeartbeat$(), merge(this.workflow$(), this.activity$(), this.nexus$()).pipe( tap({ complete: () => { this.state = 'DRAINED'; }, }) ) ).pipe( // Reinject fatalError inside the stream, if any, // with precedence over the error from the pipe tap({ complete: () => { if (fatalError) throw fatalError; }, error: () => { if (fatalError) throw fatalError; }, }), tap({ complete: () => { this.state = 'STOPPED'; }, error: (error) => { this.logger.error('Worker failed', { error }); this.state = 'FAILED'; }, }) ), { defaultValue: undefined } ); } finally { this.runtime.deregisterShutdownSignalCallback(shutdownCallback); await this.nativeWorker.finalizeShutdown(); } } finally { unexpectedErrorSubscription.unsubscribe(); try { // Only exists in non-replay Worker if (this._connection) { extractReferenceHolders(this._connection).delete(this.nativeWorker); // Only close if this worker is the creator of the connection if (this._connection instanceof 
InternalNativeConnection) { await this._connection.close(); } } await this.workflowCreator?.destroy(); } finally { this.nativeWorker.flushCoreLogs(); } } } } export function parseWorkflowCode(code: string, codePath?: string): WorkflowBundleWithSourceMapAndFilename { const [actualCode, sourceMapJson] = extractSourceMap(code); const sourceMap: RawSourceMap = JSON.parse(sourceMapJson); // JS debuggers (at least VSCode's) have a few requirements regarding the script and its source map, notably: // - The script file name's must look like an absolute path (relative paths are treated as node internals scripts) // - If the script contains a sourceMapURL directive, the executable 'file' indicated by the source map must match the // filename of the script itself. If the source map's file is a relative path, then it gets resolved relative to cwd const filename = path.resolve(process.cwd(), codePath ?? sourceMap.file); if (filename !== codePath) { sourceMap.file = filename; const patchedSourceMapJson = Buffer.from(JSON.stringify(sourceMap)).toString('base64'); const fixedSourceMappingUrl = `\n//# sourceMappingURL=data:application/json;base64,${patchedSourceMapJson}`; code = actualCode + fixedSourceMappingUrl; } // Preloading the script makes breakpoints significantly more reliable and more responsive let script: vm.Script | undefined = new vm.Script(code, { filename }); let context: any = vm.createContext({}); try { script.runInContext(context); } catch (_e) { // Context has not been properly configured, so eventual errors are possible. Just ignore at this point } // Keep these objects from GC long enough for debugger to complete parsing the source map and reporting locations // to the node process. Otherwise, the debugger risks source mapping resolution errors, meaning breakpoints wont work. 
setTimeoutUnref(10000).then( () => { script = undefined; context = undefined; }, () => void 0 ); return { code, sourceMap, filename }; } function extractSourceMap(code: string): [string, string] { const sourceMapCommentPos = code.lastIndexOf('//# sourceMappingURL=data:'); if (sourceMapCommentPos > 0) { const base64TagIndex = code.indexOf('base64,', sourceMapCommentPos); if (base64TagIndex > 0) { const sourceMapJson = Buffer.from(code.slice(base64TagIndex + 'base64,'.length).trimEnd(), 'base64').toString(); const actualCode = code.slice(0, sourceMapCommentPos); return [actualCode, sourceMapJson]; } } throw new Error("Can't extract inlined source map from the provided Workflow Bundle"); } /** * Transform an ActivityTask into ActivityInfo to pass on into an Activity */ async function extractActivityInfo( task: coresdk.activity_task.ActivityTask, dataConverter: LoadedDataConverter, activityNamespace: string, taskQueue: string ): Promise { // NOTE: We trust core to supply all of these fields instead of checking for null and undefined everywhere const { taskToken } = task as NonNullableObject; const start = task.start as NonNullableObject; const activityId = start.activityId; let heartbeatDetails = undefined; try { heartbeatDetails = await decodeFromPayloadsAtIndex(dataConverter, 0, start.heartbeatDetails); } catch (e) { throw ApplicationFailure.fromError(e, { message: `Failed to parse heartbeat details for activity ${activityId}: ${errorMessage(e)}`, }); } return { taskToken, taskQueue, base64TaskToken: formatTaskToken(taskToken), activityId, workflowExecution: start.workflowExecution as NonNullableObject, attempt: start.attempt, isLocal: start.isLocal, activityType: start.activityType, workflowType: start.workflowType, heartbeatTimeoutMs: optionalTsToMs(start.heartbeatTimeout), heartbeatDetails, activityNamespace, workflowNamespace: start.workflowNamespace, scheduledTimestampMs: requiredTsToMs(start.scheduledTime, 'scheduledTime'), startToCloseTimeoutMs: 
requiredTsToMs(start.startToCloseTimeout, 'startToCloseTimeout'), scheduleToCloseTimeoutMs: requiredTsToMs(start.scheduleToCloseTimeout, 'scheduleToCloseTimeout'), currentAttemptScheduledTimestampMs: requiredTsToMs( start.currentAttemptScheduledTime, 'currentAttemptScheduledTime' ), priority: decodePriority(start.priority), retryPolicy: decompileRetryPolicy(start.retryPolicy), }; } /** * A utility function to await a promise with a timeout. * * This function properly cleans up the timer when the provided promise resolves or rejects. * * Returns a tuple with a boolean indicating if the promise resolved before the timeout, * and the result of the promise (if it completed). */ async function timeoutPromise(promise: Promise, timeout: number): Promise { let timer: NodeJS.Timeout | undefined; try { const timerPromise = new Promise((resolve) => { timer = setTimeoutCallback(resolve, timeout); }); return await Promise.race([ promise.then((result) => [true, result] as const), timerPromise.then(() => [false] as const), ]); } finally { if (timer) clearTimeout(timer); } } /** * A utility function that creates a timer promise, with an unrefed timer. */ async function setTimeoutUnref(timeout: number): Promise { return new Promise((resolve) => setTimeoutCallback(resolve, timeout).unref()); }