import * as nexus from 'nexus-rpc'; import { CancelledFailure, IllegalStateError, LoadedDataConverter, Payload, SdkComponent, LoggerWithComposedMetadata, MetricMeter, MetricMeterWithComposedTags, MetricTags, } from '@temporalio/common'; import { temporal, coresdk } from '@temporalio/proto'; import { asyncLocalStorage } from '@temporalio/nexus/lib/context'; import { encodeToPayload } from '@temporalio/common/lib/internal-non-workflow'; import { isAbortError } from '@temporalio/common/lib/type-helpers'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { Client } from '@temporalio/client'; import { Logger } from '../logger'; import { NexusCancelOperationInput, NexusStartOperationInput, NexusInboundCallsInterceptor, NexusInterceptorsFactory, NexusOutboundCallsInterceptor, } from '../interceptors'; import { coerceToHandlerError, decodePayload, handlerErrorToProto, operationErrorToProto } from './conversions'; const UNINITIALIZED = Symbol(); export class NexusHandler { /** * Logger bound to `sdkComponent: worker`, with metadata from this Nexus task. * This is the logger to use for all log messages emitted by the Nexus * worker. Note this is not exactly the same thing as the Nexus context * logger, which is bound to `sdkComponent: nexus`. */ private readonly logger: Logger; /** * Metric Meter with tags from this Nexus task, including tags from interceptors. */ private readonly metricMeter: MetricMeter; /** * List of interceptors for this Nexus task. */ private readonly interceptors: { inbound: NexusInboundCallsInterceptor[]; outbound: NexusOutboundCallsInterceptor[] }; constructor( public readonly taskToken: Uint8Array, public readonly namespace: string, public readonly taskQueue: string, public readonly context: nexus.OperationContext, public readonly client: Client, public readonly abortController: AbortController, private readonly services: Map, public readonly dataConverter: LoadedDataConverter, workerLogger: Logger, workerMetricMeter: MetricMeter, interceptors: NexusInterceptorsFactory[] ) { this.logger = LoggerWithComposedMetadata.compose(workerLogger, this.getLogAttributes.bind(this)); this.metricMeter = MetricMeterWithComposedTags.compose(workerMetricMeter, this.getMetricTags.bind(this)); this.interceptors = { inbound: [], outbound: [] }; interceptors .map((factory) => factory(this.context)) .forEach(({ inbound, outbound }) => { if (inbound) this.interceptors.inbound.push(inbound); if (outbound) this.interceptors.outbound.push(outbound); }); } public getLogAttributes(): Record { const logAttributes = { namespace: this.namespace, taskQueue: this.taskQueue, service: this.context.service, operation: this.context.operation, }; // In case some interceptor uses the logger while initializing... if (this.interceptors == null) return logAttributes; return composeInterceptors(this.interceptors.outbound, 'getLogAttributes', (a) => a)(logAttributes); } protected getMetricTags(): MetricTags { const baseTags = { namespace: this.namespace, taskQueue: this.taskQueue, service: this.context.service, operation: this.context.operation, }; // In case some interceptors use the metric meter while initializing... if (this.interceptors == null) return baseTags; return composeInterceptors(this.interceptors.outbound, 'getMetricTags', (a) => a)(baseTags); } private getOperationHandler(ctx: nexus.OperationContext): nexus.OperationHandler { const serviceHandler = this.services.get(ctx.service); if (serviceHandler == null) { throw new nexus.HandlerError('NOT_FOUND', `No service handler registered for service name '${ctx.service}'`); } return serviceHandler.getOperationHandler(ctx.operation); } protected async startOperation( ctx: nexus.StartOperationContext, payload: Payload | undefined ): Promise { try { const handler = this.getOperationHandler(ctx); const input = await decodePayload(this.dataConverter, payload); const executeNextHandler = async (interceptorInput: NexusStartOperationInput) => { const result = await this.invokeUserCode( 'startOperation', handler.start.bind(handler, interceptorInput.ctx, interceptorInput.input) ); return { result }; }; const executeWithInterceptors = composeInterceptors( this.interceptors.inbound, 'startOperation', executeNextHandler ); const { result } = await executeWithInterceptors({ ctx, input }); if (result.isAsync) { return { taskToken: this.taskToken, completed: { startOperation: { asyncSuccess: { operationToken: result.token, links: ctx.outboundLinks.map(nexusLinkToProtoLink), }, }, }, }; } else { return { taskToken: this.taskToken, completed: { startOperation: { syncSuccess: { payload: await encodeToPayload(this.dataConverter, result.value), links: ctx.outboundLinks.map(nexusLinkToProtoLink), }, }, }, }; } } catch (err) { if (err instanceof nexus.OperationError) { return { taskToken: this.taskToken, completed: { startOperation: { failure: await operationErrorToProto(this.dataConverter, err), }, }, }; } return { taskToken: this.taskToken, failure: await handlerErrorToProto(this.dataConverter, coerceToHandlerError(err)), }; } } protected async cancelOperation( ctx: nexus.CancelOperationContext, token: string ): Promise { try { const handler = this.getOperationHandler(ctx); const cancelNextHandler = async (interceptorInput: NexusCancelOperationInput) => { await this.invokeUserCode( 'cancelOperation', handler.cancel.bind(handler, interceptorInput.ctx, interceptorInput.token) ); }; const cancelWithInterceptors = composeInterceptors( this.interceptors.inbound, 'cancelOperation', cancelNextHandler ); await cancelWithInterceptors({ ctx, token }); return { taskToken: this.taskToken, completed: { cancelOperation: {}, }, }; } catch (err) { return { taskToken: this.taskToken, failure: await handlerErrorToProto(this.dataConverter, coerceToHandlerError(err)), }; } } protected async invokeUserCode(method: string, fn: () => Promise): Promise { let error: any = UNINITIALIZED; // In case someone decides to throw undefined... const startTime = process.hrtime.bigint(); this.logger.debug('Nexus handler started', { method }); try { return await fn(); } catch (err: any) { error = err; throw err; } finally { const durationNanos = process.hrtime.bigint() - startTime; const durationMs = Number(durationNanos / 1_000_000n); if (error === UNINITIALIZED) { this.logger.debug('Nexus handler invocation completed', { method, durationMs }); } else if ((error instanceof CancelledFailure || isAbortError(error)) && this.abortController.signal.aborted) { this.logger.debug('Nexus handler invocation completed as cancelled', { method, durationMs }); } else { this.logger.warn('Nexus handler invocation failed', { method, error, durationMs }); } } } /** * Actually executes the Operation. * * Any call up to this function and including this one will be trimmed out of stack traces. */ protected async execute( task: temporal.api.workflowservice.v1.IPollNexusTaskQueueResponse ): Promise { if (task.request?.startOperation != null) { const variant = task.request?.startOperation; if (!variant.requestId) { throw new IllegalStateError('Missing requestId in Nexus start operation request'); } return await this.startOperation( { ...this.context, requestId: variant.requestId, inboundLinks: (variant.links ?? []).map(protoLinkToNexusLink), callbackUrl: variant.callback ?? undefined, callbackHeaders: variant.callbackHeader ?? undefined, outboundLinks: [], }, variant.payload ?? undefined ); } else if (task.request?.cancelOperation != null) { const variant = task.request?.cancelOperation; if (variant.operationToken == null) { throw new nexus.HandlerError('BAD_REQUEST', 'Request missing Operation token'); } return await this.cancelOperation( { ...this.context, }, variant.operationToken ); } else { throw new nexus.HandlerError('NOT_IMPLEMENTED', 'Request method not implemented'); } } public async run( task: temporal.api.workflowservice.v1.IPollNexusTaskQueueResponse ): Promise { // Ensure that client calls made with the worker's client in this handler's context are tied to the abort signal. // TODO: Actually support canceling requests backed by NativeConnection. Once it does, this functionality should be tested. return await this.client.withAbortSignal(this.abortController.signal, async () => { return await asyncLocalStorage.run( { client: this.client, namespace: this.namespace, taskQueue: this.taskQueue, log: LoggerWithComposedMetadata.compose(this.logger, { sdkComponent: SdkComponent.nexus }), metrics: this.metricMeter, }, this.execute.bind(this, task) ); }); } } export function constructNexusOperationContext( request: temporal.api.nexus.v1.IRequest | null | undefined, abortSignal: AbortSignal, requestDeadline?: Date ): nexus.OperationContext { const base = { abortSignal, headers: headersProxy(request?.header), requestDeadline, }; if (request?.startOperation != null) { const op = request.startOperation; if (op?.service == null) { throw new IllegalStateError('expected request service to not be empty'); } if (op?.operation == null) { throw new IllegalStateError('expected request service to not be empty'); } return { ...base, service: op.service, operation: op.operation }; } if (request?.cancelOperation != null) { const op = request.cancelOperation; if (op?.service == null) { throw new IllegalStateError('expected request service to not be empty'); } if (op?.operation == null) { throw new IllegalStateError('expected request service to not be empty'); } return { ...base, service: op.service, operation: op.operation }; } throw new nexus.HandlerError('NOT_IMPLEMENTED', 'Request method not implemented'); } // TODO: That utility should be moved to the nexus-rpc package. function headersProxy(initializer?: Record | null): Record { const headers: Record = initializer ? Object.fromEntries(Object.entries(initializer).map(([k, v]) => [k.toLowerCase(), v])) : {}; return new Proxy(headers, { get(target, p) { if (typeof p !== 'string') { throw new TypeError('header keys must be strings'); } return target[p.toLowerCase()]; }, set(target, p, newValue) { if (typeof p !== 'string') { throw new TypeError('header keys must be strings'); } if (typeof newValue !== 'string') { throw new TypeError('header values must be strings'); } target[p.toLowerCase()] = newValue; return true; }, }); } function protoLinkToNexusLink(plink: temporal.api.nexus.v1.ILink): nexus.Link { if (!plink.url) { throw new nexus.HandlerError('BAD_REQUEST', 'empty link URL'); } if (!plink.type) { throw new nexus.HandlerError('BAD_REQUEST', 'empty link type'); } return { url: new URL(plink.url), type: plink.type, }; } function nexusLinkToProtoLink(nlink: nexus.Link): temporal.api.nexus.v1.ILink { return { url: nlink.url.toString(), type: nlink.type, }; }