/** * Tests for stream and error primitives. */ import { beforeEach, describe, expect, it, vi } from "vitest"; // Mock pi-ai before importing the module under test vi.mock("@mariozechner/pi-ai", () => { function createAssistantMessageEventStream() { const events: unknown[] = []; let done = false; let resolve: (() => void) | undefined; let waitPromise: Promise | undefined; const stream = { push(event: unknown) { if (done) return; events.push(event); const ev = event as { type?: string }; if (ev.type === "done" || ev.type === "error") { done = true; } if (resolve) { resolve(); resolve = undefined; } }, end() { done = true; if (resolve) { resolve(); resolve = undefined; } }, async *[Symbol.asyncIterator]() { let index = 0; while (true) { if (index < events.length) { const event = events[index]; index++; yield event; const ev = event as { type?: string }; if (ev.type === "done" || ev.type === "error") return; } else if (done) { return; } else { waitPromise = new Promise((r) => { resolve = r; }); await waitPromise; } } }, _events: events, }; return stream; } return { createAssistantMessageEventStream }; }); import type { Api, AssistantMessageEvent, Model } from "@mariozechner/pi-ai"; import { createAssistantMessageEventStream } from "@mariozechner/pi-ai"; import { createErrorAssistantMessage, createImmediateErrorStream, createLinkedAbortController, createTimeoutController, normalizeUnknownError, pipeAssistantStream, pushErrorEvent, rewriteProviderOnEvent, } from "../streams.js"; function createMockModel(): Model { return { id: "test-model", name: "Test Model", api: "openai" as Api, provider: "test-provider", reasoning: false, input: ["text"] as ("text" | "image")[], cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, contextWindow: 128_000, maxTokens: 16_384, baseUrl: "https://api.test.com", } as Model; } beforeEach(() => { vi.clearAllMocks(); }); // --------------------------------------------------------------------------- // normalizeUnknownError // --------------------------------------------------------------------------- describe("normalizeUnknownError", () => { it("extracts message from Error instances", () => { expect(normalizeUnknownError(new Error("boom"))).toBe("boom"); }); it("returns string values as-is", () => { expect(normalizeUnknownError("something failed")).toBe("something failed"); }); it("JSON-serializes other values", () => { expect(normalizeUnknownError({ code: 500 })).toBe('{"code":500}'); }); it("handles null", () => { expect(normalizeUnknownError(null)).toBe("null"); }); it("handles undefined", () => { expect(normalizeUnknownError(undefined)).toBe(undefined); // JSON.stringify(undefined) returns undefined }); it("handles numbers", () => { expect(normalizeUnknownError(42)).toBe("42"); }); }); // --------------------------------------------------------------------------- // createErrorAssistantMessage // --------------------------------------------------------------------------- describe("createErrorAssistantMessage", () => { it("builds a zero-usage error message", () => { const model = createMockModel(); const msg = createErrorAssistantMessage(model, "test error"); expect(msg.role).toBe("assistant"); expect(msg.content).toEqual([]); expect(msg.api).toBe("openai"); expect(msg.provider).toBe("test-provider"); expect(msg.model).toBe("test-model"); expect(msg.stopReason).toBe("error"); expect(msg.errorMessage).toBe("test error"); expect(msg.usage.totalTokens).toBe(0); expect(msg.usage.cost.total).toBe(0); expect(msg.timestamp).toBeGreaterThan(0); }); }); // --------------------------------------------------------------------------- // pushErrorEvent // --------------------------------------------------------------------------- describe("pushErrorEvent", () => { it("pushes an error event into the stream", () => { const stream = createAssistantMessageEventStream(); const model = createMockModel(); pushErrorEvent(stream, model, "credential missing"); const events = (stream as unknown as { _events: unknown[] })._events; expect(events).toHaveLength(1); const event = events[0] as { type: string; reason: string }; expect(event.type).toBe("error"); expect(event.reason).toBe("error"); }); }); // --------------------------------------------------------------------------- // createImmediateErrorStream // --------------------------------------------------------------------------- describe("createImmediateErrorStream", () => { it("creates a stream with a single error event", async () => { const model = createMockModel(); const stream = createImmediateErrorStream(model, "no backend"); const collected: AssistantMessageEvent[] = []; for await (const event of stream) { collected.push(event as AssistantMessageEvent); } expect(collected).toHaveLength(1); expect(collected[0]?.type).toBe("error"); }); }); // --------------------------------------------------------------------------- // pipeAssistantStream // --------------------------------------------------------------------------- describe("pipeAssistantStream", () => { it("forwards all events from source to target", async () => { const source = createAssistantMessageEventStream(); const target = createAssistantMessageEventStream(); source.push({ type: "text_delta", contentIndex: 0, delta: "hello", } as unknown as AssistantMessageEvent); source.push({ type: "done", reason: "stop", } as unknown as AssistantMessageEvent); await pipeAssistantStream(source, target); const events = (target as unknown as { _events: unknown[] })._events; expect(events).toHaveLength(2); }); }); // --------------------------------------------------------------------------- // rewriteProviderOnEvent // --------------------------------------------------------------------------- describe("rewriteProviderOnEvent", () => { it("rewrites provider on partial events", () => { const event: AssistantMessageEvent = { type: "partial", partial: { provider: "internal", content: "hi" }, } as unknown as AssistantMessageEvent; const rewritten = rewriteProviderOnEvent(event, "public-provider"); expect( (rewritten as { partial: { provider: string } }).partial.provider, ).toBe("public-provider"); }); it("rewrites provider on done events", () => { const event: AssistantMessageEvent = { type: "done", message: { provider: "internal", content: "hi" }, } as unknown as AssistantMessageEvent; const rewritten = rewriteProviderOnEvent(event, "public-provider"); expect( (rewritten as { message: { provider: string } }).message.provider, ).toBe("public-provider"); }); it("rewrites provider on error events", () => { const event: AssistantMessageEvent = { type: "error", reason: "error", error: { provider: "internal", errorMessage: "fail" }, } as unknown as AssistantMessageEvent; const rewritten = rewriteProviderOnEvent(event, "public-provider"); expect((rewritten as { error: { provider: string } }).error.provider).toBe( "public-provider", ); }); it("returns unmodified events that have no recognized shape", () => { const event = { type: "other", data: 42, } as unknown as AssistantMessageEvent; const rewritten = rewriteProviderOnEvent(event, "public-provider"); expect(rewritten).toEqual(event); }); }); // --------------------------------------------------------------------------- // createLinkedAbortController // --------------------------------------------------------------------------- describe("createLinkedAbortController", () => { it("creates a standalone controller when no signal is provided", () => { const controller = createLinkedAbortController(); expect(controller.signal.aborted).toBe(false); }); it("creates an already-aborted controller when signal is aborted", () => { const parent = new AbortController(); parent.abort(); const linked = createLinkedAbortController(parent.signal); expect(linked.signal.aborted).toBe(true); }); it("aborts the linked controller when the parent signal fires", () => { const parent = new AbortController(); const linked = createLinkedAbortController(parent.signal); expect(linked.signal.aborted).toBe(false); parent.abort(); expect(linked.signal.aborted).toBe(true); }); it("does not abort the parent when the linked controller is aborted", () => { const parent = new AbortController(); const linked = createLinkedAbortController(parent.signal); linked.abort(); expect(parent.signal.aborted).toBe(false); expect(linked.signal.aborted).toBe(true); }); }); // --------------------------------------------------------------------------- // createTimeoutController // --------------------------------------------------------------------------- describe("createTimeoutController", () => { it("creates a linked controller with a timeout", () => { const { controller, clear } = createTimeoutController(undefined, 5000); expect(controller.signal.aborted).toBe(false); clear(); }); it("aborts after the specified timeout", async () => { const { controller, clear } = createTimeoutController(undefined, 10); await new Promise((r) => setTimeout(r, 50)); expect(controller.signal.aborted).toBe(true); clear(); }); it("can be cleared before timeout fires", async () => { const { controller, clear } = createTimeoutController(undefined, 10); clear(); await new Promise((r) => setTimeout(r, 50)); expect(controller.signal.aborted).toBe(false); }); it("links to parent signal", () => { const parent = new AbortController(); const { controller, clear } = createTimeoutController(parent.signal, 5000); parent.abort(); expect(controller.signal.aborted).toBe(true); clear(); }); });