import { afterEach, beforeEach, describe, expect, it, mock } from "bun:test";
import { mkdirSync, rmSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";

/**
 * Supervisor lifecycle tests.
 *
 * `../src/podman.ts` and `../src/agent/parser.ts` are replaced with in-memory
 * fakes so the Supervisor can be driven without podman or real soulfiles. Each
 * fake container is backed by a `makeFakeProcess()` handle that lets a test push
 * JSON lines onto the container's stdout and resolve its exit code.
 */

// Per-run scratch directory. MOZART_DIR is assigned before the dynamic
// `await import(...)` calls below so the modules under test see it.
// NOTE(review): assumes src reads MOZART_DIR at import/construction time — confirm.
const testDir = join(tmpdir(), `mozart-supervisor-test-${process.pid}-${Date.now()}`);
process.env.MOZART_DIR = testDir;

/** Every call made into the mocked podman module; cleared by `resetCalls()`. */
const calls = {
  shutdown: [] as string[],
  kill: [] as string[],
  spawn: [] as string[],
  sent: [] as Array<{ containerId: string; msg: any }>,
};

/** Handle returned by `makeFakeProcess()`. */
type FakeProcess = ReturnType<typeof makeFakeProcess>;

/** Fake processes keyed by agent name; a later spawn for the same name overwrites the earlier one. */
const fakeProcesses = new Map<string, FakeProcess>();

/** Clears recorded podman calls and forgets all fake processes. */
function resetCalls() {
  calls.shutdown = [];
  calls.kill = [];
  calls.spawn = [];
  calls.sent = [];
  fakeProcesses.clear();
}

/**
 * Builds a stand-in for a spawned container process.
 *
 * @returns `proc` — the object handed to the Supervisor (stdout stream, no-op
 *   stdin/kill, an `exited` promise, random pid); `emit` — writes one
 *   newline-terminated JSON message to stdout; `exit` — closes stdout and
 *   resolves `exited` with the given code.
 */
function makeFakeProcess() {
  let exitResolve!: (code: number) => void;
  const exited = new Promise<number>((resolve) => {
    exitResolve = resolve;
  });
  const encoder = new TextEncoder();
  let streamController!: ReadableStreamDefaultController<Uint8Array>;
  const stdout = new ReadableStream<Uint8Array>({
    start(controller) {
      streamController = controller;
    },
  });
  return {
    proc: {
      stdout,
      stdin: { write: () => {} },
      stderr: null,
      exited,
      kill: () => {},
      pid: Math.floor(Math.random() * 99999),
    } as any,
    emit: (msg: object) => streamController.enqueue(encoder.encode(`${JSON.stringify(msg)}\n`)),
    exit: (code: number) => {
      // close() throws if the stream is already closed (e.g. exit() called twice); ignore that.
      try {
        streamController.close();
      } catch {}
      exitResolve(code);
    },
  };
}

const { EventBus } = await import("../src/agent/mailbox.ts");

/** Rejects every pending query on a fake container with `reason`, then clears them. */
function rejectAllPending(container: any, reason: string) {
  if (!container.pendingQueries?.size) return;
  const err = new Error(reason);
  for (const [, pending] of container.pendingQueries) pending.reject(err);
  container.pendingQueries.clear();
}

mock.module("../src/podman.ts", () => ({
  MOZART_IMAGE: "mock",
  MOZART_DEV_IMAGE: "mock-dev",
  DEV_MODE: false,
  spawnContainer: (config: any, _apiKey: string) => {
    const fake = makeFakeProcess();
    fakeProcesses.set(config.name, fake);
    calls.spawn.push(config.name);
    return {
      id: config.name,
      config,
      process: fake.proc,
      state: "starting" as const,
      startedAt: new Date(),
      restartCount: 0,
      messageCount: 0,
      pendingQueries: new Map(),
      buffer: "",
      events: new EventBus(),
    };
  },
  startContainer: (containerName: string) => {
    const fake = makeFakeProcess();
    fakeProcesses.set(containerName.replace("mozart-", ""), fake);
    return fake.proc;
  },
  shutdownContainer: (container: any) => {
    calls.shutdown.push(container.id);
    container.state = "stopped";
    container.events?.close();
    rejectAllPending(container, "Container shut down");
  },
  sendToContainer: (container: any, msg: any) => {
    calls.sent.push({ containerId: container.id, msg });
  },
  killContainer: (names: string | string[]) => {
    const arr = Array.isArray(names) ? names : [names];
    calls.kill.push(...arr);
  },
  stopContainers: () => {},
  podmanInspect: async () => null,
  getEnvFromInspect: () => null,
  createContainer: () => "mock-container",
  copyIntoContainer: () => {},
  removeImage: () => {},
  commitContainer: () => {},
  saveImage: () => {},
  loadImage: () => "mock-loaded-image",
  rejectPendingQueries: (container: any, reason: string) => rejectAllPending(container, reason),
  isPodmanAvailable: async () => true,
  imageExists: async () => true,
}));

/** Minimal agent config returned by the mocked parser for `name`. */
function makeConfig(name: string) {
  return {
    name,
    model: "test/model",
    identity: "test agent",
    ifThen: [],
    schedule: [],
    skills: [],
    sanctums: [],
    sourcePath: `/fake/${name}.soul`,
  };
}

mock.module("../src/agent/parser.ts", () => ({
  // `.../<name>/agent.soul` maps to <name>; `.../<name>.soul` maps to <name>.
  // Split on both separators because `join()` produces `\` on Windows.
  parseSoulfile: (path: string) => {
    const parts = path.split(/[\\/]/);
    const filename = parts.pop()!;
    const name = filename === "agent.soul" ? parts.pop()! : filename.replace(/\.soul$/, "");
    return makeConfig(name);
  },
  parseAgentContent: (_content: string, name: string, _sourcePath: string) => makeConfig(name),
  findSoulfiles: () => [],
}));

const { Supervisor } = await import("../src/supervisor.ts");

/** Writes `<testDir>/agents/<name>/agent.soul` and returns its path. */
function createSoulFile(name: string): string {
  const soulDir = join(testDir, "agents", name);
  mkdirSync(soulDir, { recursive: true });
  const soulPath = join(soulDir, "agent.soul");
  writeFileSync(soulPath, `MODEL test/model\nSOUL test agent`);
  return soulPath;
}

/**
 * Creates a Supervisor, registers a fresh soulfile for `name`, sends the
 * container's `ready` message, and waits 10ms for it to be processed.
 *
 * @returns the supervisor, the soulfile path, and the agent's fake process.
 */
async function startAgent(name: string) {
  const sup = new Supervisor("test-key");
  const soulPath = createSoulFile(name);
  await sup.register(soulPath);
  const proc = fakeProcesses.get(name)!;
  proc.emit({ type: "ready" });
  await Bun.sleep(10);
  return { sup, soulPath, proc };
}

beforeEach(() => {
  mkdirSync(testDir, { recursive: true });
  resetCalls();
});

afterEach(() => {
  // Best-effort cleanup; `force: true` already tolerates a missing directory.
  try {
    rmSync(testDir, { recursive: true, force: true });
  } catch {}
});

describe("container replacement lifecycle", () => {
  it("replaceContainer shuts down existing container before spawning new one", async () => {
    const { sup, soulPath } = await startAgent("agent-a");
    expect(sup.getAgent("agent-a")?.state).toBe("running");

    resetCalls();
    // Re-registering the same soulfile replaces the running container.
    await sup.register(soulPath);

    expect(calls.shutdown).toContain("agent-a");
    expect(calls.kill).toContain("agent-a");
    expect(calls.spawn).toContain("agent-a");
  });

  it("old container exit does not trigger restart when replaced", async () => {
    const { sup, soulPath, proc: firstProcess } = await startAgent("agent-b");
    expect(sup.getAgent("agent-b")?.state).toBe("running");

    await sup.register(soulPath);
    const secondProcess = fakeProcesses.get("agent-b")!;
    secondProcess.emit({ type: "ready" });
    await Bun.sleep(10);
    expect(sup.getAgent("agent-b")?.state).toBe("running");

    // The replaced container exiting late must not disturb the new one.
    firstProcess.exit(137);
    await Bun.sleep(50);

    const agentFinal = sup.getAgent("agent-b");
    expect(agentFinal?.state).toBe("running");
    expect(agentFinal?.restartCount).toBe(0);
  });

  it("restart timer from old container is cancelled on replacement", async () => {
    const { sup, soulPath, proc: firstProcess } = await startAgent("agent-c");
    // Crash the first container so a restart is pending.
    firstProcess.exit(1);
    await Bun.sleep(50);

    resetCalls();
    await sup.register(soulPath);
    const secondProcess = fakeProcesses.get("agent-c")!;
    secondProcess.emit({ type: "ready" });
    await Bun.sleep(10);
    // Wait long enough that a surviving restart timer would have fired.
    await Bun.sleep(1500);

    const agent = sup.getAgent("agent-c");
    expect(agent?.state).toBe("running");
    expect(agent?.restartCount).toBe(0);
    expect(calls.spawn.filter((n) => n === "agent-c")).toHaveLength(1);
  });

  it("scheduleRestart is no-op when agent was replaced and new one is running", async () => {
    const { sup, soulPath, proc: firstProcess } = await startAgent("agent-d");
    firstProcess.exit(137);
    await Bun.sleep(10);

    await sup.register(soulPath);
    const secondProcess = fakeProcesses.get("agent-d")!;
    secondProcess.emit({ type: "ready" });
    await Bun.sleep(10);
    await Bun.sleep(1500);

    const agent = sup.getAgent("agent-d");
    expect(agent?.state).toBe("running");
    expect(agent?.restartCount).toBe(0);
  });
});

describe("restart behavior", () => {
  it("crashed container triggers restart and recovers", async () => {
    const { sup, proc: firstProcess } = await startAgent("agent-e");
    expect(sup.getAgent("agent-e")?.state).toBe("running");

    firstProcess.exit(1);
    await Bun.sleep(50);
    expect(sup.getAgent("agent-e")?.state).toBe("error");

    // Give the supervisor time to restart and register a new fake process.
    await Bun.sleep(1200);
    const newProcess = fakeProcesses.get("agent-e")!;
    newProcess.emit({ type: "ready" });
    await Bun.sleep(10);

    const agent = sup.getAgent("agent-e");
    expect(agent?.state).toBe("running");
    expect(agent?.restartCount).toBe(1);
  });

  it("stop prevents restart after crash", async () => {
    const { sup, proc: firstProcess } = await startAgent("agent-f");

    sup.stop("agent-f");
    firstProcess.exit(137);
    await Bun.sleep(1500);

    expect(sup.getAgent("agent-f")?.state).toBe("stopped");
  });
});

describe("journal and replay", () => {
  it("streamChat records a journal entry", async () => {
    const { sup, proc } = await startAgent("agent-j1");

    // Start consuming in background so the generator body executes
    const done = (async () => {
      for await (const _e of sup.streamChat("agent-j1", "user", "hello", "conv-1")) {
      }
    })();
    await Bun.sleep(10);

    const chatMsg = calls.sent.find((s) => s.msg.type === "chat");
    expect(chatMsg).toBeDefined();
    const messageId = chatMsg!.msg.messageId;

    proc.emit({ type: "event", conversationId: "conv-1", event: { type: "text", text: "hi" } });
    proc.emit({ type: "chat_done", conversationId: "conv-1", messageId });
    await done;

    const entry = sup.journal.get(messageId);
    expect(entry).not.toBeNull();
    expect(entry!.status).toBe("completed");
  });

  it("pending messages are replayed on container restart", async () => {
    const { sup, proc } = await startAgent("agent-j2");

    // Manually record a pending chat in the journal (simulating a crash before completion)
    const messageId = "replay-test-msg-1";
    const payload = JSON.stringify({
      type: "chat",
      messageId,
      fromId: "user",
      message: "replayed",
      conversationId: "conv-2",
    });
    sup.journal.record({ id: messageId, targetId: "agent-j2", type: "chat", payload });
    expect(sup.journal.get(messageId)!.status).toBe("pending");
    resetCalls();

    // Simulate crash + restart
    proc.exit(1);
    await Bun.sleep(50);
    await Bun.sleep(1200);
    const newProc = fakeProcesses.get("agent-j2")!;
    newProc.emit({ type: "ready" });
    await Bun.sleep(50);

    // Verify the pending message was replayed via sendToContainer
    const replayed = calls.sent.find(
      (s) => s.containerId === "agent-j2" && s.msg.type === "chat" && s.msg.messageId === messageId,
    );
    expect(replayed).toBeDefined();
    expect(replayed!.msg.message).toBe("replayed");
  });

  it("ack message marks journal entry as acknowledged", async () => {
    const { sup, proc } = await startAgent("agent-j3");

    // Start consuming in background so the generator body executes
    const done = (async () => {
      for await (const _e of sup.streamChat("agent-j3", "user", "test ack", "conv-3")) {
      }
    })();
    await Bun.sleep(10);

    const chatMsg = calls.sent.find((s) => s.msg.type === "chat");
    expect(chatMsg).toBeDefined();
    const messageId = chatMsg!.msg.messageId;

    // Worker sends ack
    proc.emit({ type: "ack", messageId });
    await Bun.sleep(10);
    expect(sup.journal.get(messageId)!.status).toBe("acknowledged");

    // Complete to clean up
    proc.emit({ type: "chat_done", conversationId: "conv-3", messageId });
    await done;
  });

  it("failed messages are not replayed after max attempts", async () => {
    const { sup, proc } = await startAgent("agent-j4");

    // Record a message that has already been retried 3 times
    const messageId = "exhausted-msg";
    const payload = JSON.stringify({
      type: "chat",
      messageId,
      fromId: "user",
      message: "no more",
      conversationId: "conv-4",
    });
    sup.journal.record({ id: messageId, targetId: "agent-j4", type: "chat", payload });
    for (let attempt = 0; attempt < 3; attempt++) sup.journal.incrementAttempts(messageId);
    resetCalls();

    // Simulate crash + restart
    proc.exit(1);
    await Bun.sleep(50);
    await Bun.sleep(1200);
    const newProc = fakeProcesses.get("agent-j4")!;
    newProc.emit({ type: "ready" });
    await Bun.sleep(50);

    // Verify the message was NOT replayed (attempt budget exhausted)
    const replayed = calls.sent.find(
      (s) => s.containerId === "agent-j4" && s.msg.type === "chat" && s.msg.messageId === messageId,
    );
    expect(replayed).toBeUndefined();
    expect(sup.journal.get(messageId)!.status).toBe("failed");
  });

  it("route_done resolves via journal instead of pendingRoutes", async () => {
    const sup = new Supervisor("test-key");
    const soulPath1 = createSoulFile("agent-src");
    const soulPath2 = createSoulFile("agent-tgt");
    await sup.register(soulPath1);
    await sup.register(soulPath2);
    const srcProc = fakeProcesses.get("agent-src")!;
    const tgtProc = fakeProcesses.get("agent-tgt")!;
    srcProc.emit({ type: "ready" });
    tgtProc.emit({ type: "ready" });
    await Bun.sleep(10);

    // Source sends a route_request
    srcProc.emit({ type: "route_request", requestId: "src-req-1", toId: "agent-tgt", message: "hello target" });
    await Bun.sleep(10);

    // Verify route was sent to target with a journal messageId
    const routeMsg = calls.sent.find((s) => s.containerId === "agent-tgt" && s.msg.type === "route");
    expect(routeMsg).toBeDefined();
    const messageId = routeMsg!.msg.messageId;
    expect(sup.journal.get(messageId)!.status).toBe("pending");

    // Target responds with route_done
    tgtProc.emit({ type: "route_done", requestId: messageId, response: "hi source", messageId });
    await Bun.sleep(50);

    // Journal should be completed
    const entry = sup.journal.get(messageId)!;
    expect(entry.status).toBe("completed");
    expect(entry.response).toBe("hi source");

    // Source should have received the route_response
    const responseMsg = calls.sent.find(
      (s) => s.containerId === "agent-src" && s.msg.type === "route_response" && s.msg.requestId === "src-req-1",
    );
    expect(responseMsg).toBeDefined();
    expect(responseMsg!.msg.response).toBe("hi source");
  });
});

describe("EventBus closure on container death", () => {
  it("streamChat terminates with error when container crashes", async () => {
    const { sup, proc } = await startAgent("agent-eb1");

    const events: any[] = [];
    const done = (async () => {
      for await (const e of sup.streamChat("agent-eb1", "user", "hello", "conv-eb1")) {
        events.push(e);
      }
    })();
    await Bun.sleep(10);

    // Crash the container
    proc.exit(1);
    await Bun.sleep(100);

    // The generator should have terminated
    await done;

    // Should have received an error event before the stream closed
    const errorEvent = events.find((e) => e.type === "error");
    expect(errorEvent).toBeDefined();
    expect(errorEvent.error).toContain("exit code 1");
  });

  it("stop closes EventBus and terminates active streams", async () => {
    const { sup } = await startAgent("agent-eb2");

    const done = (async () => {
      for await (const _e of sup.streamChat("agent-eb2", "user", "hello", "conv-eb2")) {
      }
    })();
    await Bun.sleep(10);

    // Stop the agent
    sup.stop("agent-eb2");

    // The stream should terminate (EventBus closed)
    await done;
  });

  it("restart closes old EventBus before creating new container", async () => {
    const { sup, proc } = await startAgent("agent-eb3");

    // Subscribe to the old EventBus
    const oldEvents = sup.getAgent("agent-eb3")!.events;
    const sub = oldEvents.subscribe();

    // Crash to enter error state for restart
    proc.exit(1);
    await Bun.sleep(100);
    expect(sup.getAgent("agent-eb3")?.state).toBe("error");

    // Restart manually rather than waiting for a scheduled restart
    await sup.restart("agent-eb3");

    // Draining the old subscription must finish because the bus was closed
    for await (const _msg of sub) {
    }
    expect(oldEvents.closed).toBe(true);
  });
});