import { S as StageResult, a as SuspendedResult } from './types-ByGg__Kd.js'; import { E as EnqueueJobInput, D as DequeueResult, C as CreateRunInput, W as WorkflowRunRecord, U as UpdateRunInput, S as Status, a as CreateStageInput, b as WorkflowStageRecord, c as UpsertStageInput, d as UpdateStageInput, e as CreateLogInput, f as CreateAnnotationInput, A as AnnotationFilters, g as WorkflowAnnotationRecord, h as CreateOutboxEventInput, O as OutboxRecord } from './interface-DMzwv0lD.js'; import { K as KernelEvent } from './events-B3XPPu0c.js'; /** * Kernel Port Interfaces * * These interfaces define the dependency-inversion boundary for the workflow * kernel. Every external capability the kernel needs is expressed as a port * interface here. Concrete implementations are injected at composition time, * making the kernel fully testable with in-memory fakes. * * Ports: * - Clock – injectable time source * - Persistence – metadata storage (runs, stages, logs, artifacts) * - BlobStore – large payload storage * - JobTransport – job queue abstraction * - EventSink – event publishing * - Scheduler – deferred command triggers */ /** Injectable time source. */ interface Clock { now(): Date; } /** * Metadata storage port. * * Run, stage, and log CRUD operations. Artifact payloads are handled * by the BlobStore port. */ interface Persistence { /** Execute operations within a transaction boundary. */ withTransaction(fn: (tx: Persistence) => Promise): Promise; createRun(data: CreateRunInput): Promise; updateRun(id: string, data: UpdateRunInput): Promise; getRun(id: string): Promise; getRunStatus(id: string): Promise; getRunsByStatus(status: Status): Promise; getStuckRuns(stuckSince: Date): Promise; /** * Atomically claim a pending workflow run for processing. * Uses atomic update with WHERE status = 'PENDING' to prevent race conditions. */ claimPendingRun(id: string): Promise; /** * Atomically find and claim the next pending workflow run. * Uses FOR UPDATE SKIP LOCKED pattern (in Postgres) to prevent race conditions * when multiple workers try to claim workflows simultaneously. * * Priority ordering: higher priority first, then oldest (FIFO within same priority). */ claimNextPendingRun(): Promise; createStage(data: CreateStageInput): Promise; upsertStage(data: UpsertStageInput): Promise; updateStage(id: string, data: UpdateStageInput): Promise; updateStageByRunAndStageId(workflowRunId: string, stageId: string, data: UpdateStageInput): Promise; getStage(runId: string, stageId: string): Promise; getStageById(id: string): Promise; getStagesByRun(runId: string, options?: { status?: Status; orderBy?: "asc" | "desc"; }): Promise; getSuspendedStages(beforeDate: Date): Promise; getFirstSuspendedStageReadyToResume(runId: string): Promise; getFirstFailedStage(runId: string): Promise; getLastCompletedStage(runId: string): Promise; getLastCompletedStageBefore(runId: string, executionGroup: number): Promise; deleteStage(id: string): Promise; createLog(data: CreateLogInput): Promise; /** * Append annotations. Called both from outside transactions (run.create, * external attach) and inside the stage-completion transactions in * job-execute and stage-poll-suspended. Duplicates with the same * `(workflowRunId, key, idempotencyKey)` are silently skipped. */ appendAnnotations(inputs: CreateAnnotationInput[]): Promise; /** List annotations for a run, ordered by `createdAt` ascending. */ listAnnotations(workflowRunId: string, filters?: AnnotationFilters): Promise; /** Write events to the outbox. Sequences are auto-assigned per workflowRunId. */ appendOutboxEvents(events: CreateOutboxEventInput[]): Promise; /** Read unpublished events ordered by (workflowRunId, sequence). */ getUnpublishedOutboxEvents(limit?: number): Promise; /** Mark events as published. */ markOutboxEventsPublished(ids: string[]): Promise; /** Increment retry count for a failed outbox event. Returns new count. */ incrementOutboxRetryCount(id: string): Promise; /** Move an outbox event to DLQ (sets dlqAt). */ moveOutboxEventToDLQ(id: string): Promise; /** Reset DLQ events so they can be reprocessed by outbox.flush. Returns count reset. */ replayDLQEvents(maxEvents: number): Promise; /** Atomically acquire an idempotency key for command execution. */ acquireIdempotencyKey(key: string, commandType: string): Promise<{ status: "acquired"; } | { status: "replay"; result: unknown; } | { status: "in_progress"; }>; /** Mark an idempotency key as completed and cache the command result. */ completeIdempotencyKey(key: string, commandType: string, result: unknown): Promise; /** Release an in-progress idempotency key after command failure. */ releaseIdempotencyKey(key: string, commandType: string): Promise; } /** Large payload storage. */ interface BlobStore { put(key: string, data: unknown): Promise; get(key: string): Promise; has(key: string): Promise; delete(key: string): Promise; list(prefix: string): Promise; } /** * Job queue abstraction. * * Same shape as the existing `JobQueue` interface so that current * implementations (`InMemoryJobQueue`, `PrismaJobQueue`) structurally satisfy * this port without adapters. */ interface JobTransport { /** Add a new job to the queue. */ enqueue(options: EnqueueJobInput): Promise; /** Enqueue multiple stages in parallel (same execution group). */ enqueueParallel(jobs: EnqueueJobInput[]): Promise; /** Atomically dequeue the next available job. */ dequeue(): Promise; /** Mark job as completed. */ complete(jobId: string): Promise; /** Mark job as suspended (for async-batch). */ suspend(jobId: string, nextPollAt: Date): Promise; /** Mark job as failed. */ fail(jobId: string, error: string, shouldRetry?: boolean): Promise; /** Get suspended jobs that are ready to be checked. */ getSuspendedJobsReadyToPoll(): Promise>; /** Release stale locks (for crashed workers). */ releaseStaleJobs(staleThresholdMs?: number): Promise; /** Cancel all pending/suspended jobs for a workflow run. Returns count cancelled. */ cancelByRun(workflowRunId: string): Promise; } /** Event publishing (replaces global EventBus). */ interface EventSink { emit(event: KernelEvent): Promise; } /** Deferred command triggers (minimal in Phase 1). */ interface Scheduler { schedule(commandType: string, payload: unknown, runAt: Date): Promise; cancel(commandType: string, correlationId: string): Promise; } /** * Minimal structural type for a stage definition as seen by the executor. * LocalExecutor uses the full Stage; RemoteExecutor (host-remote) ships only * ids and ignores stageDef.execute. Using `any` Zod schemas keeps the port * free of Stage's type parameters. */ interface SyncStageDefinitionLike { id: string; name: string; inputSchema: { parse(v: unknown): unknown; }; outputSchema?: { parse(v: unknown): unknown; } | null; configSchema?: { parse(v: unknown): unknown; } | null; execute(ctx: any): Promise | SuspendedResult>; } /** A log entry buffered by the executor for the handler to persist. */ interface BufferedLog { level: string; message: string; meta?: Record; } /** Input passed to ActivityExecutor.run(). */ interface ActivityRunInput { stageDef: SyncStageDefinitionLike; workflowId: string; workflowRunId: string; workflowType: string; stageId: string; stageName: string; stageNumber: number; stageRecordId: string; attempt: number; rawInput: unknown; config: Record; resumeState?: unknown; workflowContext: Record; } /** Result from ActivityExecutor.run(). Exactly one of result / error is set. */ interface ActivityRunResult { result?: StageResult | SuspendedResult; error?: string; progress: KernelEvent[]; annotations: CreateAnnotationInput[]; /** * Log entries for the handler to persist after run completes. * LocalExecutor writes logs live during execute(), so it returns []. * RemoteExecutor captures worker logs and returns them here. */ logs: BufferedLog[]; } /** * Subset of KernelDeps that ActivityExecutor.run() needs. * KernelDeps in kernel.ts structurally satisfies this interface; no adapter * is needed when passing deps from the handler. */ interface ExecutorDeps { persistence: Persistence; blobStore: BlobStore; clock: Clock; } /** * Port: controls where a stage body runs. * Default implementation = LocalExecutor (inline, current-process). * Swap for RemoteExecutor to run stages on remote workers. */ interface ActivityExecutor { run(input: ActivityRunInput, deps: ExecutorDeps): Promise; } export type { ActivityExecutor as A, BlobStore as B, Clock as C, EventSink as E, JobTransport as J, Persistence as P, Scheduler as S, ActivityRunInput as a, ActivityRunResult as b, BufferedLog as c, ExecutorDeps as d };