import type { AgentMessage } from "@oh-my-pi/pi-agent-core"; import { logger } from "@oh-my-pi/pi-utils"; export interface YieldDispatcher

{ /** Drop entries already delivered through another path. Called per-entry at flush time. */ isStale?(entry: P): boolean; /** Produce one batched AgentMessage from non-stale entries. Return null to skip. */ build(survivors: P[]): AgentMessage | null; } export interface YieldQueueOptions { isStreaming: () => boolean; injectStreaming(msg: AgentMessage): void; injectIdle(messages: AgentMessage[]): Promise; scheduleIdleFlush(run: () => Promise): void; } type YieldFlushMode = "streaming" | "idle"; interface StoredDispatcher { isStale?: (entry: unknown) => boolean; build: (survivors: unknown[]) => AgentMessage | null; } function formatError(error: unknown): string { return error instanceof Error ? error.message : String(error); } export class YieldQueue { readonly #options: YieldQueueOptions; readonly #dispatchers = new Map(); readonly #entries = new Map(); #idleFlushPending = false; constructor(options: YieldQueueOptions) { this.#options = options; } register

(kind: string, dispatcher: YieldDispatcher

): () => void { const stored: StoredDispatcher = { ...(dispatcher.isStale ? { isStale: entry => dispatcher.isStale?.(entry as P) ?? false } : {}), build: survivors => dispatcher.build(survivors as P[]), }; this.#dispatchers.set(kind, stored); return () => { if (this.#dispatchers.get(kind) !== stored) return; this.#dispatchers.delete(kind); this.#entries.delete(kind); }; } enqueue

(kind: string, entry: P): void { if (!this.#dispatchers.has(kind)) { logger.warn("Yield queue entry ignored for unregistered kind", { kind }); return; } let entries = this.#entries.get(kind); if (!entries) { entries = []; this.#entries.set(kind, entries); } entries.push(entry); if (!this.#options.isStreaming()) { this.#scheduleIdleFlush(); } } has(kind?: string): boolean { if (kind !== undefined) return (this.#entries.get(kind)?.length ?? 0) > 0; for (const entries of this.#entries.values()) { if (entries.length > 0) return true; } return false; } async flush(mode: YieldFlushMode): Promise { if (mode === "idle") { this.#idleFlushPending = false; } const idleMessages: AgentMessage[] = []; for (const [kind, dispatcher] of this.#dispatchers) { const entries = this.#drain(kind); if (entries.length === 0) continue; const message = this.#build(kind, dispatcher, entries); if (!message) continue; if (mode === "streaming") { try { this.#options.injectStreaming(message); } catch (error) { logger.warn("Yield queue streaming dispatch failed", { kind, error: formatError(error) }); } } else { idleMessages.push(message); } } if (mode === "idle" && idleMessages.length > 0) { try { await this.#options.injectIdle(idleMessages); } catch (error) { logger.warn("Yield queue idle dispatch failed", { error: formatError(error) }); } } } clear(): void { this.#entries.clear(); this.#idleFlushPending = false; } #scheduleIdleFlush(): void { if (this.#idleFlushPending) return; this.#idleFlushPending = true; try { this.#options.scheduleIdleFlush(async () => { this.#idleFlushPending = false; if (this.#options.isStreaming()) return; await this.flush("idle"); }); } catch (error) { this.#idleFlushPending = false; logger.warn("Yield queue idle flush scheduling failed", { error: formatError(error) }); } } #drain(kind: string): unknown[] { const entries = this.#entries.get(kind); if (!entries || entries.length === 0) return []; this.#entries.delete(kind); return entries; } #build(kind: string, dispatcher: StoredDispatcher, entries: unknown[]): AgentMessage | null { const survivors: unknown[] = []; for (const entry of entries) { if (dispatcher.isStale) { let stale: boolean; try { stale = dispatcher.isStale(entry); } catch (error) { logger.warn("Yield queue stale check failed", { kind, error: formatError(error) }); continue; } if (stale) continue; } survivors.push(entry); } if (survivors.length === 0) return null; try { return dispatcher.build(survivors); } catch (error) { logger.warn("Yield queue build failed", { kind, error: formatError(error) }); return null; } } }