---
name: workflow-engine
description: Guide for @bratsos/workflow-engine - a type-safe workflow engine with AI integration, stage pipelines, and persistence. Use when building multi-stage workflows, AI-powered pipelines, implementing workflow persistence, defining stages, or working with batch AI operations. For upgrades between published versions, route through `migrations/README.md`.
license: MIT
metadata:
  author: bratsos
  version: "0.4.0"
  repository: https://github.com/bratsos/workflow-engine
---

# @bratsos/workflow-engine Skill

Type-safe workflow engine for building AI-powered, multi-stage pipelines with persistence and batch processing support. Uses a **command kernel** architecture with environment-agnostic design.

## Upgrading between versions

When the user wants to upgrade `@bratsos/workflow-engine` in their project (e.g., "upgrade my project to the latest workflow-engine version", "I just bumped workflow-engine, walk me through the migration"), route to `migrations/README.md`, the upgrade router. It explains how to detect the installed and previous versions, find the relevant migration guides, and apply them in order. Multi-version upgrades (e.g., 0.6 → 0.8) load and apply multiple migration files sequentially.

## Architecture Overview

The engine follows a **kernel + host** pattern:

- **Core library** (`@bratsos/workflow-engine`) - Command kernel, stage/workflow definitions, persistence adapters
- **Node Host** (`@bratsos/workflow-engine-host-node`) - Long-running worker with polling loops and signal handling
- **Serverless Host** (`@bratsos/workflow-engine-host-serverless`) - Stateless single-invocation for edge/lambda/workers
- **Remote Host** (`@bratsos/workflow-engine-host-remote`) - Credential-free remote activity workers: run a stage's `execute()` on a separate, disposable machine (no DB connection, no root object-store credentials)

The **kernel** is a pure command dispatcher. All workflow operations are expressed as typed commands dispatched via `kernel.dispatch()`. Hosts wrap the kernel with environment-specific process management.
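The "pure command dispatcher" idea can be pictured with a tiny discriminated-union model. This is a hypothetical sketch, not the library's implementation: `Command`, `ToyKernel`, and `createToyKernel` are invented for illustration, and only the command names `run.create` and `run.cancel` come from the real API. The real `dispatch()` is async and goes through the persistence port; this toy is synchronous for brevity.

```typescript
// Hypothetical model of a command kernel: every operation is a typed
// command object, and a single dispatch() switches on its `type` tag.
type Command =
  | { type: "run.create"; idempotencyKey: string; workflowId: string; input: unknown }
  | { type: "run.cancel"; workflowRunId: string };

type RunStatus = "PENDING" | "CANCELLED";

interface ToyKernel {
  dispatch(command: Command): { workflowRunId: string; status: RunStatus };
}

function createToyKernel(): ToyKernel {
  // State lives in this closure: no timers, no signals, no globals.
  const runs = new Map<string, RunStatus>();
  let nextId = 1;

  return {
    dispatch(command) {
      switch (command.type) {
        case "run.create": {
          const workflowRunId = `run-${nextId++}`;
          runs.set(workflowRunId, "PENDING");
          return { workflowRunId, status: "PENDING" };
        }
        case "run.cancel": {
          runs.set(command.workflowRunId, "CANCELLED");
          return { workflowRunId: command.workflowRunId, status: "CANCELLED" };
        }
      }
    },
  };
}
```

Because the switch is exhaustive over the union, adding a new command type without handling it becomes a compile-time error, which is the main payoff of the typed-command design.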

## When to Apply

- User wants to create workflow stages or pipelines
- User mentions `defineStage`, `defineAsyncBatchStage`, `WorkflowBuilder`
- User is implementing workflow persistence with Prisma
- User needs AI integration (generateText, generateObject, embeddings, batch)
- User is building multi-stage data processing pipelines
- User mentions kernel, command dispatch, or job execution
- User wants to set up a Node.js worker or serverless worker
- User wants to run a stage on a separate / remote / disposable machine, or mentions credential-free workers, `defineRemoteStage`, the `ActivityExecutor` port, or offloading heavy stages (transcoding, ffmpeg, batch inference)
- User wants to rerun a workflow from a specific stage
- User needs to test workflows with in-memory adapters

## Quick Start

```typescript
import { randomUUID } from "node:crypto";
import { PrismaClient } from "@prisma/client";
import {
  defineStage,
  WorkflowBuilder,
  createPrismaWorkflowPersistence,
  createPrismaJobQueue,
} from "@bratsos/workflow-engine";
import { createKernel } from "@bratsos/workflow-engine/kernel";
import type { BlobStore, EventSink, Scheduler } from "@bratsos/workflow-engine/kernel";
import { createNodeHost } from "@bratsos/workflow-engine-host-node";
import { z } from "zod";

// Adapters you provide for your environment
declare const myBlobStore: BlobStore;
declare const myEventSink: EventSink;
declare const myScheduler: Scheduler;

const prisma = new PrismaClient();

// 1. Define a stage
const processStage = defineStage({
  id: "process",
  name: "Process Data",
  schemas: {
    input: z.object({ data: z.string() }),
    output: z.object({ result: z.string() }),
    config: z.object({ verbose: z.boolean().default(false) }),
  },
  async execute(ctx) {
    return { output: { result: ctx.input.data.toUpperCase() } };
  },
});

// 2. Build a workflow
const workflow = new WorkflowBuilder(
  "my-workflow", "My Workflow", "Processes data",
  z.object({ data: z.string() }),
  z.object({ result: z.string() })
)
  .pipe(processStage)
  .build();

// 3. Create kernel (reuse one job transport for the kernel and the host)
const jobTransport = createPrismaJobQueue(prisma);
const kernel = createKernel({
  persistence: createPrismaWorkflowPersistence(prisma),
  blobStore: myBlobStore,
  jobTransport,
  eventSink: myEventSink,
  scheduler: myScheduler,
  clock: { now: () => new Date() },
  registry: { getWorkflow: (id) => (id === "my-workflow" ? workflow : undefined) },
});

// 4. Start a Node host
const host = createNodeHost({
  kernel,
  jobTransport,
  workerId: "worker-1",
});
await host.start();

// 5. Dispatch a command
await kernel.dispatch({
  type: "run.create",
  idempotencyKey: randomUUID(),
  workflowId: "my-workflow",
  input: { data: "hello" },
});
```

## Core Exports Reference

| Export | Type | Import Path | Purpose |
|--------|------|-------------|---------|
| `defineStage` | Function | `@bratsos/workflow-engine` | Create sync stages |
| `defineAsyncBatchStage` | Function | `@bratsos/workflow-engine` | Create async/batch stages |
| `WorkflowBuilder` | Class | `@bratsos/workflow-engine` | Chain stages into workflows |
| `createKernel` | Function | `@bratsos/workflow-engine/kernel` | Create command kernel |
| `createNodeHost` | Function | `@bratsos/workflow-engine-host-node` | Create Node.js host |
| `createServerlessHost` | Function | `@bratsos/workflow-engine-host-serverless` | Create serverless host |
| `defineRemoteStage` / `createActivityWorker` | Function | `@bratsos/workflow-engine-host-remote` | Run a stage on a credential-free remote worker (see 11-remote-activity-workers.md) |
| `createRoutingExecutor` / `createLocalExecutor` | Function | `@bratsos/workflow-engine/kernel` | `ActivityExecutor` port: route specific stages to a remote executor / default in-process executor |
| `createAIHelper` | Function | `@bratsos/workflow-engine` | AI operations (text, object, embed, batch) |
| `registerEmbeddingProvider` | Function | `@bratsos/workflow-engine` | Register custom embedding providers (Voyage, Cohere, etc.) |
| `createStageIds` | Function | `@bratsos/workflow-engine` | Create stage ID constants from a workflow |
| `defineStageIds` | Function | `@bratsos/workflow-engine` | Define stage ID constants from a tuple |
| `isValidStageId` | Function | `@bratsos/workflow-engine` | Runtime stage ID validation |
| `assertValidStageId` | Function | `@bratsos/workflow-engine` | Assert stage ID validity (throws) |
| `definePlugin` | Function | `@bratsos/workflow-engine/kernel` | Define kernel plugins |
| `createPluginRunner` | Function | `@bratsos/workflow-engine/kernel` | Create plugin event processor |
| `typedKey` | Function | `@bratsos/workflow-engine/conventions` | Define a well-known annotation key with linked value type |
| `Trigger` / `Decision` / `Approval` / `Revision` | Constants | `@bratsos/workflow-engine/conventions` | Well-known annotation key namespaces (v0.8+) |

## Kernel Commands

All operations go through `kernel.dispatch(command)`:

| Command | Description |
|---------|-------------|
| `run.create` | Create a new workflow run |
| `run.claimPending` | Claim pending runs, enqueue first-stage jobs |
| `run.transition` | Advance to next stage group or complete |
| `run.cancel` | Cancel a running workflow (authoritative: cascades to stages + jobs) |
| `run.rerunFrom` | Rerun from a specific stage (cleans up blob artifacts by prefix) |
| `job.execute` | Execute a single stage (uses multi-phase transactions; see 08-common-patterns.md) |
| `stage.pollSuspended` | Poll suspended stages for readiness (skips cancelled runs; per-stage transactions) |
| `lease.reapStale` | Release stale job leases |
| `run.reapStuck` | Detect and fail RUNNING runs with no recent activity |
| `outbox.flush` | Publish pending outbox events |
| `plugin.replayDLQ` | Replay dead-letter queue events |
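`lease.reapStale` and `run.reapStuck` both come down to comparing timestamps against a threshold. The sketch below is a hypothetical model: the `LeasedJob` and `ActiveRun` shapes and both function names are invented, and the real commands also write the resulting state changes through persistence. The 60-second default mirrors the `staleLeaseThresholdMs: 60_000` value used in the host examples.

```typescript
// Hypothetical shapes: a job holding a worker lease, and a workflow run.
interface LeasedJob {
  jobId: string;
  workerId: string;
  lastHeartbeatMs: number; // last time the lease holder proved it was alive
}

interface ActiveRun {
  workflowRunId: string;
  status: "RUNNING" | "COMPLETED" | "FAILED";
  lastActivityMs: number; // last recorded stage/job progress for the run
}

// Jobs whose lease holder has been silent longer than the threshold can be
// released back to the queue so another worker can pick them up.
function findStaleLeases(jobs: LeasedJob[], nowMs: number, thresholdMs = 60_000): string[] {
  return jobs.filter((j) => nowMs - j.lastHeartbeatMs > thresholdMs).map((j) => j.jobId);
}

// Only RUNNING runs with no recent activity are candidates to be failed.
function findStuckRuns(runs: ActiveRun[], nowMs: number, thresholdMs: number): string[] {
  return runs
    .filter((r) => r.status === "RUNNING" && nowMs - r.lastActivityMs > thresholdMs)
    .map((r) => r.workflowRunId);
}
```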

## Stage Definition

### Sync Stage

```typescript
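import { defineStage } from "@bratsos/workflow-engine";
import { z } from "zod";

const InputSchema = z.object({ text: z.string() });
const OutputSchema = z.object({ wordCount: z.number() });
const ConfigSchema = z.object({ verbose: z.boolean().default(false) });

const myStage = defineStage({
  id: "my-stage",
  name: "My Stage",
  description: "Optional",
  dependencies: ["prev"],   // IDs of earlier stages whose outputs this stage reads

  schemas: {
    input: InputSchema,     // Zod schema or "none"
    output: OutputSchema,
    config: ConfigSchema,
  },

  async execute(ctx) {
    const { input, config, workflowContext } = ctx;
    const prevOutput = ctx.require("prev");    // output of a required dependency
    const optOutput = ctx.optional("other");   // output of a stage that may not have run

    await ctx.log("INFO", "Processing...");

    const wordCount = input.text.split(/\s+/).filter(Boolean).length;
    return {
      output: { wordCount },                   // must match OutputSchema
      customMetrics: { itemsProcessed: 10 },
    };
  },
});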
```

### Async Batch Stage

```typescript
// submitBatchJob / checkBatchStatus stand for your provider's batch API;
// OutputSchema / ConfigSchema are Zod schemas as in the sync example.
const batchStage = defineAsyncBatchStage({
  id: "batch-process",
  name: "Batch Process",
  mode: "async-batch",
  schemas: { input: "none", output: OutputSchema, config: ConfigSchema },

  async execute(ctx) {
    // If execute() is re-invoked with resume state, short-circuit with the cached result
    if (ctx.resumeState) {
      return { output: ctx.resumeState.cachedResult };
    }

    // First run: submit the batch, then suspend until it finishes
    const batchId = await submitBatchJob(ctx.input);
    return {
      suspended: true,
      state: {
        batchId,
        submittedAt: new Date().toISOString(),
        pollInterval: 60000,
        maxWaitTime: 3600000,
      },
      pollConfig: { pollInterval: 60000, maxWaitTime: 3600000, nextPollAt: new Date(Date.now() + 60000) },
    };
  },

  // Polled (via stage.pollSuspended) until it reports ready or an error
  async checkCompletion(suspendedState, ctx) {
    const { status, results } = await checkBatchStatus(suspendedState.batchId);
    if (status === "completed") return { ready: true, output: { results } };
    if (status === "failed") return { ready: false, error: "Batch failed" };
    return { ready: false, nextCheckIn: 60000 }; // not done yet: check again in 60 s
  },
});
```
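The three return shapes of `checkCompletion` can be modeled as a small decision step. This is a hypothetical sketch: `CompletionResult` mirrors the shapes in the example above, while `nextPollAfter` and its timeout rule are assumptions about how a poller could honor `nextCheckIn` and `maxWaitTime`; the engine's own scheduling may differ.

```typescript
// Mirrors the three shapes checkCompletion returns in the example above.
type CompletionResult<T> =
  | { ready: true; output: T }
  | { ready: false; error: string }
  | { ready: false; nextCheckIn: number };

type PollDecision<T> =
  | { kind: "resume"; output: T }
  | { kind: "fail"; reason: string }
  | { kind: "wait"; nextPollAt: Date };

// Hypothetical poller step: decide what to do with one checkCompletion result.
function nextPollAfter<T>(
  result: CompletionResult<T>,
  submittedAt: Date,
  now: Date,
  maxWaitTime: number,
): PollDecision<T> {
  if (result.ready) return { kind: "resume", output: result.output };
  if ("error" in result) return { kind: "fail", reason: result.error };
  // Still running: give up once the total wait exceeds maxWaitTime.
  if (now.getTime() - submittedAt.getTime() > maxWaitTime) {
    return { kind: "fail", reason: "maxWaitTime exceeded" };
  }
  return { kind: "wait", nextPollAt: new Date(now.getTime() + result.nextCheckIn) };
}
```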

## WorkflowBuilder

Workflows are linear pipelines of **execution groups**. `.pipe()` creates single-stage groups; `.parallel()` creates multi-stage groups. Parallel group outputs are keyed by stage ID in the workflow context.

```typescript
const workflow = new WorkflowBuilder(
  "workflow-id", "Workflow Name", "Description",
  InputSchema, OutputSchema
)
  .pipe(stage1)                          // Group 0
  .pipe(stage2)                          // Group 1
  .parallel([stage3a, stage3b])          // Group 2 (concurrent, output: { "stage3a-id": ..., "stage3b-id": ... })
  .pipe(stage4)                          // Group 3
  .build();

// Inside stage4's execute(ctx), read parallel outputs by stage ID:
//   ctx.require("stage3a-id")  // output of stage3a
//   ctx.require("stage3b-id")  // output of stage3b

// Introspection and config helpers
workflow.getStageIds();
workflow.getExecutionPlan();
workflow.getDefaultConfig();
workflow.validateConfig(config);
```

When a workflow completes, the final execution group's output is persisted in `WorkflowRun.output` and included in the `workflow:completed` event.
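To make the group semantics concrete, here is a hypothetical model of how stage outputs could accumulate in the workflow context across groups. `Stage`, `Group`, and `runGroups` are invented for illustration, and the stages are synchronous functions; the real builder runs stages through the kernel and persists each output. How a single-stage final group maps to the workflow output is an assumption.

```typescript
// Hypothetical model: each group is a list of stages that run together.
type StageFn = (context: Record<string, unknown>) => unknown;
interface Stage { id: string; run: StageFn }
type Group = Stage[];

// Runs groups in order; every stage output is stored in the context under
// its stage ID, and the last group's outputs become the workflow output.
function runGroups(groups: Group[]): { context: Record<string, unknown>; output: unknown } {
  const context: Record<string, unknown> = {};
  let lastGroupOutputs: Record<string, unknown> = {};
  for (const group of groups) {
    // Stages in one group only see outputs from earlier groups.
    const snapshot = { ...context };
    lastGroupOutputs = {};
    for (const stage of group) {
      lastGroupOutputs[stage.id] = stage.run(snapshot);
    }
    Object.assign(context, lastGroupOutputs);
  }
  // Assumption: a single-stage final group yields that stage's output directly.
  const ids = Object.keys(lastGroupOutputs);
  const only = ids.length === 1 ? ids[0] : undefined;
  const output = only !== undefined ? lastGroupOutputs[only] : lastGroupOutputs;
  return { context, output };
}
```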

## Kernel Setup

```typescript
import { createKernel } from "@bratsos/workflow-engine/kernel";
import type { Kernel, KernelConfig, Persistence, BlobStore, JobTransport, EventSink, Scheduler, Clock } from "@bratsos/workflow-engine/kernel";

const kernel = createKernel({
  persistence,   // Persistence port - runs, stages, logs, outbox, idempotency
  blobStore,     // BlobStore port - large payload storage
  jobTransport,  // JobTransport port - job queue
  eventSink,     // EventSink port - async event publishing
  scheduler,     // Scheduler port - deferred command triggers
  clock,         // Clock port - injectable time source
  registry,      // WorkflowRegistry - { getWorkflow(id) }
  // executor,   // optional ActivityExecutor port - defaults to in-process; inject to run stages on remote workers (see 11-remote-activity-workers.md)
});

// Dispatch typed commands
const { workflowRunId } = await kernel.dispatch({
  type: "run.create",
  idempotencyKey: "unique-key",
  workflowId: "my-workflow",
  input: { data: "hello" },
});
```

### Node Host

```typescript
import { createNodeHost } from "@bratsos/workflow-engine-host-node";

const host = createNodeHost({
  kernel,
  jobTransport,
  workerId: "worker-1",
  orchestrationIntervalMs: 10_000,
  jobPollIntervalMs: 1_000,
  staleLeaseThresholdMs: 60_000,
});

await host.start();   // Starts polling loops + signal handlers
await host.stop();    // Graceful shutdown
host.getStats();      // { workerId, jobsProcessed, orchestrationTicks, isRunning, uptimeMs }
```

### Serverless Host

```typescript
import {
  createServerlessHost,
  type ServerlessHost,
  type ServerlessHostConfig,
  type JobMessage,
  type JobResult,
  type ProcessJobsResult,
  type MaintenanceTickResult,
} from "@bratsos/workflow-engine-host-serverless";

const host = createServerlessHost({
  kernel,
  jobTransport,
  workerId: "my-worker",
  // Optional tuning (same defaults as Node host)
  staleLeaseThresholdMs: 60_000,
  maxClaimsPerTick: 10,
  maxSuspendedChecksPerTick: 10,
  maxOutboxFlushPerTick: 100,
});
```

#### `handleJob(msg: JobMessage): Promise<JobResult>`

Execute a single pre-dequeued job. Consumers wire platform-specific ack/retry around the result.

```typescript
// JobMessage shape (matches queue message body)
interface JobMessage {
  jobId: string;
  workflowRunId: string;
  workflowId: string;
  stageId: string;
  attempt: number;
  maxAttempts?: number;
  payload: Record<string, unknown>;
}

// JobResult
interface JobResult {
  outcome: "completed" | "suspended" | "failed";
  error?: string;
}

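// `message` is your platform's queue message (e.g. a Cloudflare Queues Message)
// whose body is the JobMessage; ack()/retry() come from the platform, not the engine.
const result = await host.handleJob(message.body);
if (result.outcome === "failed") message.retry();
else message.ack(); // "completed" and "suspended" are both done with this delivery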
```
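Retry policy is left to the consumer. A minimal sketch of one possible policy, assuming `attempt` counts from 1 and that you want to stop retrying once it reaches `maxAttempts`. The `QueueAction` type, `decideQueueAction`, and the fallback of 3 attempts are hypothetical, not part of the serverless host.

```typescript
// Subset of the JobMessage/JobResult shapes shown above.
interface JobMessageLike { jobId: string; attempt: number; maxAttempts?: number }
interface JobResultLike { outcome: "completed" | "suspended" | "failed"; error?: string }

type QueueAction = "ack" | "retry" | "dead-letter";

// Hypothetical policy: completed and suspended jobs are acknowledged (a suspended
// stage is picked up later by the maintenance tick, not by redelivery); failed
// jobs are retried until attempts are exhausted.
function decideQueueAction(msg: JobMessageLike, result: JobResultLike, defaultMaxAttempts = 3): QueueAction {
  if (result.outcome !== "failed") return "ack";
  const max = msg.maxAttempts ?? defaultMaxAttempts;
  return msg.attempt < max ? "retry" : "dead-letter";
}
```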

#### `processAvailableJobs(opts?): Promise<ProcessJobsResult>`

Dequeue and process jobs from the job transport. Defaults to 1 job (safe for edge runtimes with CPU limits).

```typescript
const result = await host.processAvailableJobs({ maxJobs: 5 });
// { processed: number, succeeded: number, failed: number }
```

#### `runMaintenanceTick(): Promise<MaintenanceTickResult>`

Run one bounded maintenance cycle: claim pending, poll suspended, reap stale, flush outbox, reap stuck runs.

```typescript
const tick = await host.runMaintenanceTick();
// { claimed, suspendedChecked, staleReleased, eventsFlushed, stuckReaped }
// Note: resumed suspended stages are automatically followed by run.transition.
```
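Because each tick is bounded by the `maxClaimsPerTick` / `maxSuspendedChecksPerTick` / `maxOutboxFlushPerTick` limits, a scheduled handler may want to run a few ticks in a row when there is a backlog. This is a hypothetical helper: `drainMaintenance`, the `maxTicks` cap, and the assumption that the tick result fields are numeric counts are not part of the documented API; it relies only on `runMaintenanceTick()`.

```typescript
// Assumes the tick result fields are counts, as the comment above suggests.
interface TickResult {
  claimed: number;
  suspendedChecked: number;
  staleReleased: number;
  eventsFlushed: number;
  stuckReaped: number;
}
interface MaintenanceHost { runMaintenanceTick(): Promise<TickResult> }

// Runs ticks until one makes no progress or maxTicks is reached, so a single
// cron invocation stays bounded.
async function drainMaintenance(host: MaintenanceHost, maxTicks = 5): Promise<number> {
  let ticks = 0;
  while (ticks < maxTicks) {
    const r = await host.runMaintenanceTick();
    ticks++;
    // suspendedChecked is excluded: re-checking stages that are still waiting is not progress.
    const progressed = r.claimed + r.staleReleased + r.eventsFlushed + r.stuckReaped > 0;
    if (!progressed) break;
  }
  return ticks;
}
```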

## Remote Activity Workers

Run a stage's `execute()` on a **separate, credential-free machine** (no database connection, no root object-store credentials) via the **`@bratsos/workflow-engine-host-remote`** package. The orchestrator owns all state; a remote worker leases the task, runs the real stage code, writes large artifacts directly to object storage by reference, and reports back — all driven through the engine's existing suspend/resume machinery (no new DB table).

Two wiring models:

- **Proxy stage** (recommended for long stages): `defineRemoteStage(realStage, transport, opts?)` suspends immediately (releasing the kernel job lease) and resumes when the worker reports.
- **`ActivityExecutor` port** (short stages / in-core routing): inject `createRemoteExecutor(transport)`, or `createRoutingExecutor({ remote, remoteStageIds })` to route only specific stages, via `createKernel({ executor })`. This is backward-compatible: the default `createLocalExecutor()` behaves exactly like the existing in-process execution.

```typescript
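import { WorkflowBuilder } from "@bratsos/workflow-engine";
import { defineRemoteStage } from "@bratsos/workflow-engine-host-remote";

// Orchestrator: wrap a heavy stage so it runs on a remote worker.
// heavyStage, coreStage, InputSchema, OutputSchema are defined elsewhere; oTransport is
// the orchestrator-side transport (see references/11-remote-activity-workers.md).
const workflow = new WorkflowBuilder(
  "media-pipeline", "Media Pipeline", "Runs the heavy stage remotely", // example IDs
  InputSchema, OutputSchema
)
  .pipe(defineRemoteStage(heavyStage, oTransport, { maxWaitMs: 3_600_000, stageCodeVersion: "v1" }))
  .pipe(coreStage)
  .build();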
```

The worker runs in a separate process/machine with **zero standing credentials** (`createActivityWorker` + `createHttpWorkerTransport`), receiving a presigned URL per artifact. See [`references/11-remote-activity-workers.md`](references/11-remote-activity-workers.md) for the worker, broker, HTTP transport, S3/R2 artifacts, durability, and limitations.
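The routing idea behind `createRoutingExecutor({ remote, remoteStageIds })` can be pictured as a lookup on the stage ID. This toy model is not the library's code: `Executor`, `ExecRequest`, and `routeByStageId` are hypothetical stand-ins for the `ActivityExecutor` port, whose real signature is documented in the remote-workers reference.

```typescript
// Hypothetical stand-in for an activity executor: runs one stage invocation.
interface ExecRequest { stageId: string; input: unknown }
interface Executor { name: string; execute(req: ExecRequest): Promise<unknown> }

// Routes listed stage IDs to the remote executor and everything else to the
// local (in-process) one, so unlisted stages keep their current behavior.
function routeByStageId(local: Executor, remote: Executor, remoteStageIds: Iterable<string>): Executor {
  const remoteIds = new Set(remoteStageIds);
  return {
    name: "routing",
    execute(req) {
      const target = remoteIds.has(req.stageId) ? remote : local;
      return target.execute(req);
    },
  };
}
```

Keeping the routing decision outside the stage code is what lets a deployment move a stage to remote workers without touching its `execute()`.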

## Annotations (Provenance)

Attach typed key-value facts to runs and stages for queryable provenance — trigger context, decisions, approvals, anything else you'd want to ask back later. Writes are buffered during a stage and flushed atomically with the stage outcome (durable, not fire-and-forget).

```typescript
import { Decision, Trigger } from "@bratsos/workflow-engine/conventions";

// Inside a stage's execute()
ctx.annotate(Decision.outcome, "low");                       // typed
ctx.annotate(Decision.confidence, 0.42);                     // typed
ctx.annotate("acme.compliance.signoff", "alice@acme.com");   // custom key
ctx.annotate({
  actor: { kind: "agent", id: "triage-v3" },
  attributes: {
    "decision.outcome": "low",
    "decision.rationale": "below threshold",
    "decision.used_fallback": true,
  },
});

// At run creation — captures trigger context
await kernel.dispatch({
  type: "run.create",
  idempotencyKey: crypto.randomUUID(),
  workflowId: "ticket-triage",
  input: { ticket },
  annotations: [{
    actor: { kind: "system", id: "zendesk" },
    attributes: {
      "trigger.source": "webhook:zendesk",
      "trigger.parent_run_id": previousRunId,
    },
  }],
});

// External attach (plugins, post-hoc reviews)
await kernel.annotations.attach(runId, {
  actor: { kind: "user", id: "alice@acme.com" },
  attributes: { "review.disposition": "approved-anyway" },
  idempotencyKey: "review-2026-05-24-alice",
});

// Query — flat key namespace, indexed
await kernel.annotations.list(runId);
await kernel.annotations.list(runId, { keyPrefix: "decision." });
await kernel.annotations.list(runId, { actorId: "triage-v3" });
```

Annotations replace the deprecated `WorkflowRun.metadata` column. Existing `metadata` is automatically surfaced as `legacy.metadata.*` virtual rows when consumers call `kernel.annotations.list()` (no dual-write, lazy synthesis). See [`references/10-annotations.md`](references/10-annotations.md) for the full API and conventions catalog.
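The `keyPrefix` / `actorId` filters and the lazily synthesized `legacy.metadata.*` rows can be pictured with a small in-memory model. Everything here (`AnnotationRow`, `synthesizeLegacy`, `listAnnotations`, the `"legacy"` actor ID) is hypothetical and only illustrates the behavior described above; in particular, flattening only top-level metadata fields is an assumption.

```typescript
interface AnnotationRow {
  key: string;
  value: unknown;
  actorId: string;
}

// Turn a legacy metadata object into virtual rows keyed "legacy.metadata.<field>".
// Assumption: only top-level fields become rows; nested values are kept as-is.
function synthesizeLegacy(metadata: Record<string, unknown> | null): AnnotationRow[] {
  if (!metadata) return [];
  return Object.entries(metadata).map(([field, value]) => ({
    key: `legacy.metadata.${field}`,
    value,
    actorId: "legacy",
  }));
}

// Legacy rows are built at read time only (no dual-write), then filtered.
function listAnnotations(
  stored: AnnotationRow[],
  legacyMetadata: Record<string, unknown> | null,
  filter: { keyPrefix?: string; actorId?: string } = {},
): AnnotationRow[] {
  return [...stored, ...synthesizeLegacy(legacyMetadata)].filter(
    (row) =>
      (filter.keyPrefix === undefined || row.key.startsWith(filter.keyPrefix)) &&
      (filter.actorId === undefined || row.actorId === filter.actorId),
  );
}
```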

## AI Integration & Cost Tracking

```typescript
import { createAIHelper, registerEmbeddingProvider } from "@bratsos/workflow-engine";
import { voyage } from "voyage-ai-provider";

// Custom embedding providers (Voyage, Cohere, Jina, etc.): register once at startup,
// then register models with provider: "voyage" and use ai.embed() as usual
registerEmbeddingProvider("voyage", (modelId) => voyage.embeddingModel(modelId));

// Inside a stage's execute(ctx); aiCallLogger comes from createPrismaAICallLogger
const ai = createAIHelper(
  `workflow.${ctx.workflowRunId}.stage.${ctx.stageId}`,
  aiCallLogger,
);

const { text, cost } = await ai.generateText("gemini-2.5-flash", prompt);
const { object } = await ai.generateObject("gemini-2.5-flash", prompt, schema);
const { embedding } = await ai.embed("text-embedding-004", ["text1"], { dimensions: 768 });

// OpenRouter embedding models (OpenAI, Cohere, etc.)
const { embedding: openRouterEmbedding } = await ai.embed("openai/text-embedding-3-small", ["text1"]);

// Provider-specific options passthrough (Voyage, Cohere, etc.)
const { embedding: voyageEmbedding } = await ai.embed("voyage-4-large", ["text1"], {
  providerOptions: { voyage: { outputDimension: 512, inputType: "document" } },
});
```
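The first argument to `createAIHelper` is a dot-separated topic (`workflow.<runId>.stage.<stageId>`), which turns per-run cost roll-ups into a prefix match. This is a hypothetical sketch over an invented `LoggedAICall` record shape; the real `AICall` model and logger API may expose different fields and query methods.

```typescript
// Hypothetical shape of a logged AI call; real AICall rows may differ.
interface LoggedAICall {
  topic: string; // e.g. "workflow.run-1.stage.summarize"
  model: string;
  cost: number;  // in whatever unit the logger records
}

// Sum cost per stage for one run, using the topic convention shown above.
function costByStage(calls: LoggedAICall[], workflowRunId: string): Map<string, number> {
  // Including ".stage." in the prefix keeps "run-1" from matching "run-10".
  const prefix = `workflow.${workflowRunId}.stage.`;
  const totals = new Map<string, number>();
  for (const call of calls) {
    if (!call.topic.startsWith(prefix)) continue;
    const stageId = call.topic.slice(prefix.length);
    totals.set(stageId, (totals.get(stageId) ?? 0) + call.cost);
  }
  return totals;
}
```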

## Persistence Setup

### Required Prisma Models (ALL are required)

Copy the complete schema from the [package README](../../README.md#1-database-setup). It defines these models:
`WorkflowRun`, `WorkflowStage`, `WorkflowLog`, `WorkflowArtifact`, `AICall`, `JobQueue`, `OutboxEvent`, `IdempotencyKey`.

### Create Persistence

```typescript
import {
  createPrismaWorkflowPersistence,
  createPrismaJobQueue,
  createPrismaAICallLogger,
} from "@bratsos/workflow-engine/persistence/prisma";

const persistence = createPrismaWorkflowPersistence(prisma);
const jobQueue = createPrismaJobQueue(prisma);
const aiCallLogger = createPrismaAICallLogger(prisma);

// SQLite - MUST pass databaseType option
const persistence = createPrismaWorkflowPersistence(prisma, { databaseType: "sqlite" });
const jobQueue = createPrismaJobQueue(prisma, { databaseType: "sqlite" });
```

## Testing

```typescript
import { createKernel } from "@bratsos/workflow-engine/kernel";

// In-memory persistence and job queue
import {
  InMemoryWorkflowPersistence,
  InMemoryJobQueue,
  InMemoryAICallLogger,
} from "@bratsos/workflow-engine/testing";

// Kernel-specific test adapters
import {
  FakeClock,
  InMemoryBlobStore,
  CollectingEventSink,
  NoopScheduler,
} from "@bratsos/workflow-engine/kernel/testing";

// Workflows under test, keyed by workflow ID (myWorkflow is built with WorkflowBuilder)
const workflows = new Map([["my-wf", myWorkflow]]);

// Create kernel with all in-memory adapters
const persistence = new InMemoryWorkflowPersistence();
const jobQueue = new InMemoryJobQueue();
const kernel = createKernel({
  persistence,
  blobStore: new InMemoryBlobStore(),
  jobTransport: jobQueue,
  eventSink: new CollectingEventSink(),
  scheduler: new NoopScheduler(),
  clock: new FakeClock(),
  registry: { getWorkflow: (id) => workflows.get(id) },
});

// Test a full workflow lifecycle
await kernel.dispatch({ type: "run.create", idempotencyKey: "test", workflowId: "my-wf", input: {} });
await kernel.dispatch({ type: "run.claimPending", workerId: "test-worker" });
const job = await jobQueue.dequeue();
if (!job) throw new Error("expected a queued job after run.claimPending");
await kernel.dispatch({ type: "job.execute", workflowRunId: job.workflowRunId, workflowId: job.workflowId, stageId: job.stageId, config: {} });
await kernel.dispatch({ type: "run.transition", workflowRunId: job.workflowRunId });
```
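Injecting the clock is what makes time-dependent behavior (poll intervals, lease thresholds) testable without sleeping. The class below is a hypothetical stand-in written for illustration, not the library's `FakeClock`, whose methods may differ; it only needs to satisfy the `{ now(): Date }` shape used for `clock` in the Quick Start. `isDue` is likewise an invented helper.

```typescript
// Minimal controllable clock satisfying the `{ now(): Date }` shape.
class ManualClock {
  private currentMs: number;

  constructor(start: Date = new Date(0)) {
    this.currentMs = start.getTime();
  }

  now(): Date {
    return new Date(this.currentMs);
  }

  // Move time forward instantly, e.g. past a poll interval.
  advance(ms: number): void {
    this.currentMs += ms;
  }
}

// Example consumer: is a suspended stage due for its next poll?
function isDue(nextPollAt: Date, clock: { now(): Date }): boolean {
  return clock.now().getTime() >= nextPollAt.getTime();
}
```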

## Reference Files

- [01-stage-definitions.md](references/01-stage-definitions.md) - Complete stage API
- [02-workflow-builder.md](references/02-workflow-builder.md) - WorkflowBuilder patterns
- [03-runtime-setup.md](references/03-runtime-setup.md) - Kernel & host configuration
- [04-ai-integration.md](references/04-ai-integration.md) - AI helper methods
- [05-persistence-setup.md](references/05-persistence-setup.md) - Database setup
- [06-async-batch-stages.md](references/06-async-batch-stages.md) - Async operations
- [07-testing-patterns.md](references/07-testing-patterns.md) - Testing with kernel
- [08-common-patterns.md](references/08-common-patterns.md) - Kernel patterns & best practices
- [09-troubleshooting.md](references/09-troubleshooting.md) - Debugging stuck runs, P2002 errors, ghost jobs
- [10-annotations.md](references/10-annotations.md) - First-class provenance surface: `ctx.annotate`, `kernel.annotations.*`, conventions catalog
- [11-remote-activity-workers.md](references/11-remote-activity-workers.md) - Credential-free remote workers: `defineRemoteStage`, broker, worker SDK, HTTP transport, S3/R2 artifacts, `ActivityExecutor` port

## Key Principles

1. **Type Safety**: All schemas are Zod - types flow through the entire pipeline
2. **Command Kernel**: All operations are typed commands dispatched through `kernel.dispatch()`
3. **Environment-Agnostic**: Kernel has no timers, no signals, no global state
4. **Context Access**: Use `ctx.require()` and `ctx.optional()` for type-safe stage output access
5. **Transactional Outbox**: Events are written to the outbox and published via the `outbox.flush` command. `job.execute` and `stage.pollSuspended` use multi-phase transactions to avoid holding connections during external I/O
6. **Idempotency**: `run.create` and `job.execute` replay cached results by key; concurrent same-key dispatch throws `IdempotencyInProgressError`
7. **Authoritative Cancellation**: `run.cancel` cascades to stages + jobs. Ghost jobs (jobs running against non-RUNNING runs) are detected via a `ghost: true` flag and are not retried
8. **Self-Healing**: Stage creation is idempotent (upsert), orchestration steps are isolated, stuck runs are automatically reaped
9. **Cost Tracking**: All AI calls automatically track tokens and costs
10. **BlobStore-Only Artifacts**: All artifact storage goes through the BlobStore port. `run.rerunFrom` cleans up artifacts by key prefix
11. **Durable Provenance**: `ctx.annotate(...)` writes are buffered and flushed inside the stage-completion transaction. Annotations are atomic with the stage outcome — a stage's annotations either all persist or all roll back together with the stage update and outbox events.
12. **Pluggable Execution**: stage execution goes through an injectable `ActivityExecutor` port (default in-process `LocalExecutor`). Inject a remote executor — or wrap a stage with `defineRemoteStage` — to run `execute()` on a separate credential-free machine without changing kernel internals.
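The idempotency principle (replay by key, reject concurrent same-key dispatch) can be modeled with a keyed cache that records an in-progress marker before running and the result afterward. This is a hypothetical sketch: `IdempotencyCache` and `InProgressError` are invented names standing in for the engine's behavior and its `IdempotencyInProgressError`; the engine stores keys through the Persistence port rather than in memory, and releasing the key on failure is an assumption.

```typescript
class InProgressError extends Error {
  constructor(key: string) {
    super(`Idempotency key "${key}" is already being processed`);
    this.name = "InProgressError";
  }
}

type Entry<T> = { state: "in-progress" } | { state: "done"; result: T };

class IdempotencyCache<T> {
  private entries = new Map<string, Entry<T>>();

  async run(key: string, work: () => Promise<T>): Promise<T> {
    const existing = this.entries.get(key);
    if (existing?.state === "done") return existing.result; // replay the cached result
    if (existing?.state === "in-progress") throw new InProgressError(key); // concurrent same-key call

    this.entries.set(key, { state: "in-progress" });
    try {
      const result = await work();
      this.entries.set(key, { state: "done", result });
      return result;
    } catch (err) {
      // Assumption: a failure releases the key so the command can be retried.
      this.entries.delete(key);
      throw err;
    }
  }
}
```

The in-progress marker is set before the first `await`, so a second call with the same key that arrives while the first is still running sees it and fails fast instead of doing the work twice.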
