/** * @license * Copyright 2026 Steven Roussey * SPDX-License-Identifier: Apache-2.0 */ import type { IExecuteContext, StreamEvent, TaskInput, TaskOutput } from "@workglow/task-graph"; import type { AiEmit } from "../../capability/AiEmit"; import type { EmitQueue } from "../../capability/emitQueue"; import type { IAiExecutionStrategy } from "../../execution/IAiExecutionStrategy"; import type { AiJobInput } from "../../job/AiJob"; /** * Factory for the emit callback that producers should use. Receives the * queue (already wired) and must return the function the strategy will * call when emitting events. Sites that need to peek at events before they * reach the consumer (e.g. AiChatTask accumulates a per-turn text and * swallows inner-turn `finish` events) use this hook; sites that don't * just `return (event) => queue.push(event as StreamEvent)`. */ export type RunWithIterableEmitFactory = (queue: EmitQueue>) => AiEmit; /** * Drive a strategy through an emit-queue and surface its events as an * `AsyncIterable>`. Centralises the abort plumbing * shared by every streaming task site: * * 1. Construct a `localAbort` AbortController. * 2. If `context.signal` is already aborted, abort `localAbort` synchronously * so the strategy sees the cancellation on its first signal check. * Otherwise wire a `once: true` listener that mirrors parent aborts * into `localAbort`. The listener is removed when the run settles. * 3. Build a shallow-cloned context with `signal` replaced (other context * fields pass through unchanged by reference) and hand that to * `strategy.execute`. A shallow clone — rather than a `Proxy` — is used * so the wrapper has a stable identity for callers that rely on * reference equality (e.g. `WeakMap`-keyed caches). * 4. Iterate the queue and yield events to the caller. * 5. In a `finally`, abort `localAbort`, fail the queue, and await the * run promise. The `await` swallows the post-abort rejection — the * caller's iteration error (if any) is preserved by the surrounding * try/finally semantics. * * Abort bond is strictly one-way: parent → child. * • context.signal.abort() → localAbort aborts → strategy sees it. * • localAbort.abort() (from the finally) does NOT abort the parent; * the parent's signal is untouched and its other consumers keep running. * * @example * // Cancellation is driven through an AbortController — AbortSignal has * // no public .abort() method, so callers must keep the controller in * // scope and call abort() on it (not on the signal). * const parentController = new AbortController(); * const ctx: IExecuteContext = { ...baseContext, signal: parentController.signal }; * * const iter = runWithIterable(strategy, jobInput, ctx, runnerId, emitFactory); * * // Parent cancels: both ctx.signal and the strategy stop. * parentController.abort(); * * // Consumer break: only the strategy stops. parentController.signal * // remains un-aborted and sibling tasks continue. * for await (const e of iter) { if (done) break; } */ export declare function runWithIterable(strategy: IAiExecutionStrategy, jobInput: AiJobInput, context: IExecuteContext, runnerId: string | undefined, emitFactory: RunWithIterableEmitFactory): AsyncIterable>; //# sourceMappingURL=runWithIterable.d.ts.map