import * as Activity from "@effect/workflow/Activity"; import * as Workflow from "@effect/workflow/Workflow"; import * as WorkflowEngine from "@effect/workflow/WorkflowEngine"; import { Cause, Effect, Exit, Layer, ManagedRuntime, Schema } from "effect"; import type { SmithersDb } from "../db/adapter"; import type { TaskDescriptor } from "../TaskDescriptor"; const TaskBridgeWorkflow = Workflow.make({ name: "SmithersTaskBridge", payload: { executionId: Schema.String }, success: Schema.Unknown, idempotencyKey: ({ executionId }) => executionId, }); const adapterNamespaces = new WeakMap(); let nextAdapterNamespace = 0; const ActivityBridgeWorkflowEngineLive = Layer.suspend(() => WorkflowEngine.layerMemory); const activityBridgeRuntime = ManagedRuntime.make(ActivityBridgeWorkflowEngineLive); const getAdapterNamespace = (adapter: SmithersDb): string => { const existing = adapterNamespaces.get(adapter); if (existing) { return existing; } const created = `adapter-${++nextAdapterNamespace}`; adapterNamespaces.set(adapter, created); return created; }; export type TaskActivityContext = { attempt: number; idempotencyKey: string; }; export type TaskActivityRetryOptions = { times: number; while?: (error: unknown) => boolean; }; export type ExecuteTaskActivityOptions = { initialAttempt?: number; retry?: false | TaskActivityRetryOptions; includeAttemptInIdempotencyKey?: boolean; }; export class RetriableTaskFailure extends Error { readonly nodeId: string; readonly attempt: number; constructor(nodeId: string, attempt: number) { super(`Task ${nodeId} failed on attempt ${attempt} and should be retried`); this.name = "RetriableTaskFailure"; this.nodeId = nodeId; this.attempt = attempt; } } const isRetriableTaskFailure = ( error: unknown, ): error is RetriableTaskFailure => error instanceof RetriableTaskFailure; export const makeTaskBridgeKey = ( adapter: SmithersDb, workflowName: string, runId: string, desc: TaskDescriptor, ): string => [ "smithers-task-bridge", getAdapterNamespace(adapter), workflowName, runId, desc.nodeId, String(desc.iteration), ].join(":"); export const makeTaskActivity = ( desc: TaskDescriptor, executeFn: (context: TaskActivityContext) => Promise | A, options?: Pick, ) => Activity.make({ name: desc.nodeId, success: Schema.Unknown, error: Schema.Unknown, execute: Effect.gen(function* () { const attempt = yield* Activity.CurrentAttempt; const idempotencyKey = yield* Activity.idempotencyKey(desc.nodeId, { includeAttempt: options?.includeAttemptInIdempotencyKey, }); return yield* Effect.tryPromise({ try: () => Promise.resolve(executeFn({ attempt, idempotencyKey })), catch: (error) => error, }); }), }); const runTaskActivityAttempt = async ( activity: ReturnType>, instance: WorkflowEngine.WorkflowInstance["Type"], attempt: number, ): Promise => { const exit = await activityBridgeRuntime.runPromiseExit( activity.pipe( Effect.provide( Layer.mergeAll( Layer.succeed(Activity.CurrentAttempt, attempt), Layer.succeed(WorkflowEngine.WorkflowInstance, instance), ), ), ) as Effect.Effect, ); if (Exit.isSuccess(exit)) { return exit.value; } const failure = Cause.failureOption(exit.cause); if (failure._tag === "Some") { throw failure.value; } throw Cause.squash(exit.cause); }; export const executeTaskActivity = async ( adapter: SmithersDb, workflowName: string, runId: string, desc: TaskDescriptor, executeFn: (context: TaskActivityContext) => Promise | A, options?: ExecuteTaskActivityOptions, ): Promise => { const activity = makeTaskActivity(desc, executeFn, options); const instance = WorkflowEngine.WorkflowInstance.initial( TaskBridgeWorkflow, makeTaskBridgeKey(adapter, workflowName, runId, desc), ); const initialAttempt = Math.max(1, options?.initialAttempt ?? 1); const retry = options?.retry === undefined ? { times: desc.retries, while: isRetriableTaskFailure } : options.retry; let attempt = initialAttempt; while (true) { try { return await runTaskActivityAttempt(activity, instance, attempt); } catch (error) { if ( retry === false || attempt - initialAttempt >= retry.times || !(retry.while ?? isRetriableTaskFailure)(error) ) { throw error; } attempt += 1; } } };