/** * Persistence Interfaces for Workflow Engine * * These interfaces abstract database operations to enable: * - Testing with mock implementations * - Future extraction into @bratsos/workflow-engine package * - Alternative database backends * * Implementations: * - PrismaWorkflowPersistence (default, in ./prisma/) * - InMemoryPersistence (for testing) */ /** * Unified status type for workflows, stages, and jobs. * * - PENDING: Not started yet * - RUNNING: Currently executing * - SUSPENDED: Paused, waiting for external event (e.g., batch job completion) * - COMPLETED: Finished successfully * - FAILED: Finished with error * - CANCELLED: Manually stopped by user * - SKIPPED: Stage-specific - bypassed due to condition */ type Status = "PENDING" | "RUNNING" | "SUSPENDED" | "COMPLETED" | "FAILED" | "CANCELLED" | "SKIPPED"; /** @deprecated Use Status instead */ type WorkflowStatus = Status; /** @deprecated Use Status instead */ type WorkflowStageStatus = Status; /** @deprecated Use Status instead. Note: PROCESSING is now RUNNING. */ type JobStatus = Status; type LogLevel = "DEBUG" | "INFO" | "WARN" | "ERROR"; type ArtifactType = "STAGE_OUTPUT" | "ARTIFACT" | "METADATA"; declare class StaleVersionError extends Error { readonly entity: string; readonly id: string; readonly expected: number; readonly actual: number; constructor(entity: string, id: string, expected: number, actual: number); } interface WorkflowRunRecord { id: string; createdAt: Date; updatedAt: Date; version: number; workflowId: string; workflowName: string; workflowType: string; status: WorkflowStatus; startedAt: Date | null; completedAt: Date | null; duration: number | null; input: unknown; output: unknown | null; config: unknown; totalCost: number; totalTokens: number; priority: number; metadata: unknown | null; } interface WorkflowStageRecord { id: string; createdAt: Date; updatedAt: Date; version: number; workflowRunId: string; stageId: string; stageName: string; stageNumber: number; executionGroup: number; /** * Rerun generation. 0 for the original execution; incremented each * time `run.rerunFrom` recreates this stage. Annotations written by * `ctx.annotate(...)` during this stage inherit this value so a * future agent can distinguish decisions made on different attempts * of the same logical stage. */ attempt: number; status: WorkflowStageStatus; startedAt: Date | null; completedAt: Date | null; duration: number | null; inputData: unknown | null; outputData: unknown | null; config: unknown | null; suspendedState: unknown | null; resumeData: unknown | null; nextPollAt: Date | null; pollInterval: number | null; maxWaitUntil: Date | null; metrics: unknown | null; embeddingInfo: unknown | null; errorMessage: string | null; } interface WorkflowLogRecord { id: string; createdAt: Date; workflowStageId: string | null; workflowRunId: string | null; level: LogLevel; message: string; metadata: unknown | null; } interface WorkflowArtifactRecord { id: string; createdAt: Date; updatedAt: Date; workflowRunId: string; workflowStageId: string | null; key: string; type: ArtifactType; data: unknown; size: number; metadata: unknown | null; } /** * Annotation actor — who or what produced this annotation. * `kind` is open (recommended values: "agent", "user", "system") so consumers * can introduce custom kinds. `id` and `version` are indexed individually for * cross-version queries. */ interface AnnotationActor { kind?: string; id?: string; version?: string; } /** * Annotation scope — which entity within a run this annotation describes. * - "run": run-level (e.g., trigger context) * - "stage": tied to a specific stage execution (linked via workflowStageRecordId) * - "ai_call": tied to a specific AI call (custom; not used by the engine itself) * - other strings allowed for consumer-defined scopes */ type AnnotationScope = "run" | "stage" | "ai_call" | (string & {}); interface WorkflowAnnotationRecord { id: string; createdAt: Date; workflowRunId: string; workflowStageRecordId: string | null; attempt: number; scope: AnnotationScope; scopeId: string | null; actorKind: string | null; actorId: string | null; actorVersion: string | null; key: string; value: unknown; payload: unknown | null; idempotencyKey: string | null; } interface OutboxRecord { id: string; workflowRunId: string; sequence: number; eventType: string; payload: unknown; causationId: string; occurredAt: Date; publishedAt: Date | null; retryCount: number; dlqAt: Date | null; } interface CreateOutboxEventInput { workflowRunId: string; eventType: string; payload: unknown; causationId: string; occurredAt: Date; } interface IdempotencyRecord { key: string; commandType: string; result: unknown; createdAt: Date; } interface AICallRecord { id: string; createdAt: Date; topic: string; callType: string; modelKey: string; modelId: string; prompt: string; response: string; inputTokens: number; outputTokens: number; cost: number; metadata: unknown | null; } interface JobRecord { id: string; createdAt: Date; updatedAt: Date; workflowRunId: string; workflowId: string; stageId: string; status: JobStatus; priority: number; workerId: string | null; lockedAt: Date | null; startedAt: Date | null; completedAt: Date | null; attempt: number; maxAttempts: number; lastError: string | null; nextPollAt: Date | null; payload: Record; } interface CreateRunInput { id?: string; workflowId: string; workflowName: string; workflowType: string; input: unknown; config?: unknown; priority?: number; /** Optional metadata stored as JSON on the run record. NOT spread into Prisma fields. */ metadata?: Record; } interface UpdateRunInput { status?: WorkflowStatus; startedAt?: Date; completedAt?: Date | null; duration?: number | null; output?: unknown; totalCost?: number; totalTokens?: number; expectedVersion?: number; } interface CreateStageInput { workflowRunId: string; stageId: string; stageName: string; stageNumber: number; executionGroup: number; /** Rerun generation. Defaults to 0. Set by `run.rerunFrom` for recreated stages. */ attempt?: number; status?: WorkflowStageStatus; startedAt?: Date; config?: unknown; inputData?: unknown; } interface UpdateStageInput { status?: WorkflowStageStatus; startedAt?: Date; completedAt?: Date; duration?: number; outputData?: unknown; config?: unknown; suspendedState?: unknown; resumeData?: unknown; nextPollAt?: Date | null; pollInterval?: number; maxWaitUntil?: Date; metrics?: unknown; embeddingInfo?: unknown; artifacts?: unknown; errorMessage?: string; expectedVersion?: number; } interface UpsertStageInput { workflowRunId: string; stageId: string; create: CreateStageInput; update: UpdateStageInput; } interface CreateLogInput { workflowRunId?: string; workflowStageId?: string; level: LogLevel; message: string; metadata?: unknown; } interface SaveArtifactInput { workflowRunId: string; workflowStageId?: string; key: string; type: ArtifactType; data: unknown; size: number; metadata?: unknown; } /** * Input for appending an annotation. `attempt` defaults to 0; callers from * `job-execute.ts` / `stage-poll-suspended.ts` are responsible for computing * the correct attempt value (incremented when a stage is rerun). * * When `idempotencyKey` is set, the unique constraint on * `(workflowRunId, key, idempotencyKey)` ensures duplicates are skipped on * retry. When `idempotencyKey` is null, the constraint does not apply. */ interface CreateAnnotationInput { workflowRunId: string; workflowStageRecordId?: string | null; attempt?: number; scope: AnnotationScope; scopeId?: string | null; actor?: AnnotationActor; key: string; value: unknown; payload?: unknown; idempotencyKey?: string | null; /** * If true, the engine emits an `annotation:created` outbox event when * this row is persisted. Plumbed through the buffered-flush path so * the event lands in the same transaction as the annotation row. * Off by default — most provenance is read-only and doesn't need to * be a real-time event. */ emitEvent?: boolean; } /** * Filters for `listAnnotations`. All filters are AND-combined. * `keyPrefix` is implemented with `startsWith` (Postgres uses the * `(workflowRunId, key)` index; SQLite may table-scan unless the engine * branches to GLOB — see PrismaWorkflowPersistence). */ interface AnnotationFilters { key?: string; keyPrefix?: string; scope?: AnnotationScope; scopeId?: string | null; actorId?: string; actorKind?: string; attempt?: number; since?: Date; until?: Date; limit?: number; } interface CreateAICallInput { topic: string; callType: string; modelKey: string; modelId: string; prompt: string; response: string; inputTokens: number; outputTokens: number; cost: number; metadata?: unknown; } interface EnqueueJobInput { workflowRunId: string; workflowId: string; stageId: string; priority?: number; payload?: Record; scheduledFor?: Date; } interface DequeueResult { jobId: string; workflowRunId: string; workflowId: string; stageId: string; priority: number; attempt: number; maxAttempts: number; payload: Record; } interface WorkflowPersistence { /** Execute operations within a transaction boundary. */ withTransaction(fn: (tx: WorkflowPersistence) => Promise): Promise; createRun(data: CreateRunInput): Promise; updateRun(id: string, data: UpdateRunInput): Promise; getRun(id: string): Promise; getRunStatus(id: string): Promise; getRunsByStatus(status: WorkflowStatus): Promise; getStuckRuns(stuckSince: Date): Promise; /** * Atomically claim a pending workflow run for processing. * Uses atomic update with WHERE status = 'PENDING' to prevent race conditions. * * @param id - The workflow run ID to claim * @returns true if successfully claimed, false if already claimed by another worker */ claimPendingRun(id: string): Promise; /** * Atomically find and claim the next pending workflow run. * Uses FOR UPDATE SKIP LOCKED pattern (in Postgres) to prevent race conditions * when multiple workers try to claim workflows simultaneously. * * Priority ordering: higher priority first, then oldest (FIFO within same priority). * * @returns The claimed workflow run (now with status RUNNING), or null if no pending runs */ claimNextPendingRun(): Promise; createStage(data: CreateStageInput): Promise; upsertStage(data: UpsertStageInput): Promise; updateStage(id: string, data: UpdateStageInput): Promise; updateStageByRunAndStageId(workflowRunId: string, stageId: string, data: UpdateStageInput): Promise; getStage(runId: string, stageId: string): Promise; getStageById(id: string): Promise; getStagesByRun(runId: string, options?: { status?: WorkflowStageStatus; orderBy?: "asc" | "desc"; }): Promise; getSuspendedStages(beforeDate: Date): Promise; getFirstSuspendedStageReadyToResume(runId: string): Promise; getFirstFailedStage(runId: string): Promise; getLastCompletedStage(runId: string): Promise; getLastCompletedStageBefore(runId: string, executionGroup: number): Promise; deleteStage(id: string): Promise; createLog(data: CreateLogInput): Promise; saveArtifact(data: SaveArtifactInput): Promise; loadArtifact(runId: string, key: string): Promise; hasArtifact(runId: string, key: string): Promise; deleteArtifact(runId: string, key: string): Promise; listArtifacts(runId: string): Promise; getStageIdForArtifact(runId: string, stageId: string): Promise; /** * Append one or more annotations. Designed to be called both standalone * (fire-and-forget from external attach) and inside an existing * transaction (buffered during stage execution, flushed in the * stage-completion transaction). * * Rows with the same `(workflowRunId, key, idempotencyKey)` are deduped * via the unique constraint; duplicates are silently skipped. */ appendAnnotations(inputs: CreateAnnotationInput[]): Promise; /** * List annotations for a run, optionally filtered. Returns rows ordered * by `createdAt` ascending (so consumers get a timeline by default). */ listAnnotations(workflowRunId: string, filters?: AnnotationFilters): Promise; saveStageOutput(runId: string, workflowType: string, stageId: string, output: unknown): Promise; /** Increment retry count for a failed outbox event. Returns new count. */ incrementOutboxRetryCount(id: string): Promise; /** Move an outbox event to DLQ (sets dlqAt). */ moveOutboxEventToDLQ(id: string): Promise; /** Reset DLQ events so they can be reprocessed by outbox.flush. Returns count reset. */ replayDLQEvents(maxEvents: number): Promise; /** Write events to the outbox. Sequences are auto-assigned per workflowRunId. */ appendOutboxEvents(events: CreateOutboxEventInput[]): Promise; /** Read unpublished events ordered by (workflowRunId, sequence). */ getUnpublishedOutboxEvents(limit?: number): Promise; /** Mark events as published. */ markOutboxEventsPublished(ids: string[]): Promise; /** Atomically acquire an idempotency key for command execution. */ acquireIdempotencyKey(key: string, commandType: string): Promise<{ status: "acquired"; } | { status: "replay"; result: unknown; } | { status: "in_progress"; }>; /** Mark an idempotency key as completed and cache the command result. */ completeIdempotencyKey(key: string, commandType: string, result: unknown): Promise; /** Release an in-progress idempotency key after command failure. */ releaseIdempotencyKey(key: string, commandType: string): Promise; } interface AIHelperStats { totalCalls: number; totalInputTokens: number; totalOutputTokens: number; totalCost: number; perModel: Record; } interface AICallLogger { /** * Log a single AI call (fire and forget) */ logCall(call: CreateAICallInput): void; /** * Log batch results (for recording batch API results) */ logBatchResults(batchId: string, results: CreateAICallInput[]): Promise; /** * Get aggregated stats for a topic prefix */ getStats(topicPrefix: string): Promise; /** * Check if batch results are already recorded */ isRecorded(batchId: string): Promise; } interface JobQueue { /** * Add a new job to the queue */ enqueue(options: EnqueueJobInput): Promise; /** * Enqueue multiple stages in parallel (same execution group) */ enqueueParallel(jobs: EnqueueJobInput[]): Promise; /** * Atomically dequeue the next available job */ dequeue(): Promise; /** * Mark job as completed */ complete(jobId: string): Promise; /** * Mark job as suspended (for async-batch) */ suspend(jobId: string, nextPollAt: Date): Promise; /** * Mark job as failed */ fail(jobId: string, error: string, shouldRetry?: boolean): Promise; /** * Get suspended jobs that are ready to be checked */ getSuspendedJobsReadyToPoll(): Promise>; /** * Release stale locks (for crashed workers) */ releaseStaleJobs(staleThresholdMs?: number): Promise; /** * Cancel all pending/suspended jobs for a workflow run. * Returns count of cancelled jobs. */ cancelByRun(workflowRunId: string): Promise; } export { type AnnotationFilters as A, type CreateRunInput as C, type DequeueResult as D, type EnqueueJobInput as E, type IdempotencyRecord as I, type JobQueue as J, type LogLevel as L, type OutboxRecord as O, type Status as S, type UpdateRunInput as U, type WorkflowRunRecord as W, type CreateStageInput as a, type WorkflowStageRecord as b, type UpsertStageInput as c, type UpdateStageInput as d, type CreateLogInput as e, type CreateAnnotationInput as f, type WorkflowAnnotationRecord as g, type CreateOutboxEventInput as h, type AnnotationActor as i, type AICallLogger as j, type CreateAICallInput as k, type AIHelperStats as l, type AICallRecord as m, type JobRecord as n, type JobStatus as o, type WorkflowPersistence as p, type WorkflowStatus as q, type WorkflowStageStatus as r, type SaveArtifactInput as s, type WorkflowArtifactRecord as t, type WorkflowLogRecord as u, type AnnotationScope as v, type ArtifactType as w, StaleVersionError as x };