/** * Stream and error primitives for extension-owned provider wrappers. * * Both pi-credential-vault and pi-multicodex build custom streamSimple * implementations that resolve credentials, forward events, and surface * errors as AssistantMessage events. This module provides the shared * building blocks so each package only owns its domain-specific logic. */ import { type Api, type AssistantMessage, type AssistantMessageEvent, type AssistantMessageEventStream, createAssistantMessageEventStream, type Model, } from "@mariozechner/pi-ai"; // --------------------------------------------------------------------------- // Error normalization // --------------------------------------------------------------------------- /** * Normalize an unknown thrown value into a human-readable error string. * * Handles Error instances, plain strings, and arbitrary values. */ export function normalizeUnknownError(error: unknown): string { if (error instanceof Error) return error.message; if (typeof error === "string") return error; return JSON.stringify(error); } // --------------------------------------------------------------------------- // AssistantMessage construction // --------------------------------------------------------------------------- /** * Build an error AssistantMessage for a model. * * Produces a zero-usage message with `stopReason: "error"` that can be * used inside error events on an AssistantMessageEventStream. */ export function createErrorAssistantMessage( model: Model, message: string, ): AssistantMessage { return { role: "assistant" as const, content: [], api: model.api, provider: model.provider, model: model.id, usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0, cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, }, stopReason: "error" as const, errorMessage: message, timestamp: Date.now(), }; } // --------------------------------------------------------------------------- // Stream helpers // --------------------------------------------------------------------------- /** * Push an error event into an AssistantMessageEventStream. */ export function pushErrorEvent( stream: AssistantMessageEventStream, model: Model, message: string, ): void { stream.push({ type: "error", reason: "error", error: createErrorAssistantMessage(model, message), }); } /** * Create a stream that immediately emits a single error event. * * Useful when an extension detects a problem before starting the real * provider request (e.g., missing credentials or unavailable backend). */ export function createImmediateErrorStream( model: Model, message: string, ): AssistantMessageEventStream { const stream = createAssistantMessageEventStream(); pushErrorEvent(stream, model, message); return stream; } /** * Pipe all events from a source stream into a target stream, then end it. * * push() marks the target as done when it sees a terminal event (done/error), * which prevents further pushes. end() is still needed to flush any * consumers waiting on the target's async iterator after the last event. */ export async function pipeAssistantStream( source: AssistantMessageEventStream, target: AssistantMessageEventStream, ): Promise { for await (const event of source) { target.push(event); } target.end(); } /** * Rewrite the provider field on an AssistantMessageEvent. * * Extensions that re-register a provider under a different ID use this * to ensure downstream consumers see the extension's provider ID instead * of the internal one used for the actual API call. */ export function rewriteProviderOnEvent( event: AssistantMessageEvent, provider: string, ): AssistantMessageEvent { if ("partial" in event) { return { ...event, partial: { ...event.partial, provider } }; } if (event.type === "done") { return { ...event, message: { ...event.message, provider } }; } if (event.type === "error") { return { ...event, error: { ...event.error, provider } }; } return event; } // --------------------------------------------------------------------------- // Abort controller helpers // --------------------------------------------------------------------------- /** * Create an AbortController that aborts when the given signal fires. * * Returns a new controller whose signal can be passed to provider calls. * Aborting the returned controller does not abort the parent signal. */ export function createLinkedAbortController( signal?: AbortSignal, ): AbortController { const controller = new AbortController(); if (signal?.aborted) { controller.abort(); return controller; } signal?.addEventListener("abort", () => controller.abort(), { once: true }); return controller; } /** * Create a linked AbortController that auto-aborts after a timeout. * * Returns the controller and a `clear()` function that cancels the timer. * Always call `clear()` when the operation finishes to prevent leaks. */ export function createTimeoutController( signal: AbortSignal | undefined, timeoutMs: number, ): { controller: AbortController; clear: () => void } { const controller = createLinkedAbortController(signal); const timeout = setTimeout(() => controller.abort(), timeoutMs); return { controller, clear: () => clearTimeout(timeout), }; }