import { afterEach, describe, expect, it } from "bun:test"; import { rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { MessageJournal } from "../src/journal.ts"; const TEST_BASE = join(tmpdir(), "mozart-journal-test"); let journals: MessageJournal[] = []; function createJournal(): MessageJournal { const id = `test-${crypto.randomUUID().slice(0, 8)}`; const dbPath = join(TEST_BASE, id, "journal.db"); const j = new MessageJournal(dbPath); journals.push(j); return j; } afterEach(() => { for (const j of journals) { try { j.close(); } catch {} } journals = []; try { rmSync(TEST_BASE, { recursive: true, force: true }); } catch {} }); describe("MessageJournal CRUD", () => { it("records and retrieves a pending entry", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "chat", payload: '{"type":"chat"}' }); const entry = j.get("m1"); expect(entry).not.toBeNull(); expect(entry!.id).toBe("m1"); expect(entry!.targetId).toBe("agent-a"); expect(entry!.type).toBe("chat"); expect(entry!.status).toBe("pending"); expect(entry!.attempts).toBe(0); }); it("acknowledges a pending entry", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "chat", payload: "{}" }); j.acknowledge("m1"); expect(j.get("m1")!.status).toBe("acknowledged"); }); it("completes an entry with a response", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "route", payload: "{}", sourceId: "agent-b", sourceRequestId: "r1" }); j.acknowledge("m1"); j.complete("m1", "Hello from A"); const entry = j.get("m1")!; expect(entry.status).toBe("completed"); expect(entry.response).toBe("Hello from A"); }); it("fails an entry", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "chat", payload: "{}" }); j.fail("m1"); expect(j.get("m1")!.status).toBe("failed"); }); it("fail rejects attached resolver", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "route", payload: "{}", sourceId: "agent-b", sourceRequestId: "r1" }); let rejected: Error | null = null; j.attachResolver("m1", { resolve: () => {}, reject: (err) => { rejected = err; }, }); j.fail("m1"); expect(rejected).not.toBeNull(); expect(rejected!.message).toContain("max retries"); expect(j.get("m1")!.status).toBe("failed"); }); it("fail without resolver does not throw", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "chat", payload: "{}" }); expect(() => j.fail("m1")).not.toThrow(); expect(j.get("m1")!.status).toBe("failed"); }); it("returns null for nonexistent entries", () => { const j = createJournal(); expect(j.get("nonexistent")).toBeNull(); }); }); describe("getPending", () => { it("returns pending and acknowledged entries for a target", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "chat", payload: '{"seq":1}' }); j.record({ id: "m2", targetId: "agent-a", type: "route", payload: '{"seq":2}' }); j.record({ id: "m3", targetId: "agent-b", type: "chat", payload: '{"seq":3}' }); j.acknowledge("m1"); j.complete("m3"); const pending = j.getPending("agent-a"); expect(pending).toHaveLength(2); expect(pending[0]!.id).toBe("m1"); expect(pending[0]!.status).toBe("acknowledged"); expect(pending[1]!.id).toBe("m2"); expect(pending[1]!.status).toBe("pending"); }); it("excludes completed and failed entries", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "chat", payload: "{}" }); j.record({ id: "m2", targetId: "agent-a", type: "chat", payload: "{}" }); j.complete("m1"); j.fail("m2"); expect(j.getPending("agent-a")).toHaveLength(0); }); it("returns entries in creation order", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "chat", payload: '{"seq":1}' }); j.record({ id: "m2", targetId: "agent-a", type: "chat", payload: '{"seq":2}' }); j.record({ id: "m3", targetId: "agent-a", type: "chat", payload: '{"seq":3}' }); const pending = j.getPending("agent-a"); expect(pending.map((e) => e.id)).toEqual(["m1", "m2", "m3"]); }); }); describe("incrementAttempts", () => { it("increments and returns the new count", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "chat", payload: "{}" }); expect(j.incrementAttempts("m1")).toBe(1); expect(j.incrementAttempts("m1")).toBe(2); expect(j.incrementAttempts("m1")).toBe(3); expect(j.get("m1")!.attempts).toBe(3); }); }); describe("route resolver", () => { it("resolveRoute completes entry and calls live resolver", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "route", payload: "{}", sourceId: "agent-b", sourceRequestId: "r1" }); let resolved = ""; j.attachResolver("m1", { resolve: (r) => { resolved = r; }, reject: () => {}, }); const hadResolver = j.resolveRoute("m1", "response text"); expect(hadResolver).toBe(true); expect(resolved).toBe("response text"); const entry = j.get("m1")!; expect(entry.status).toBe("completed"); expect(entry.response).toBe("response text"); }); it("resolveRoute persists response when no live resolver", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "route", payload: "{}", sourceId: "agent-b", sourceRequestId: "r1" }); const hadResolver = j.resolveRoute("m1", "deferred response"); expect(hadResolver).toBe(false); const entry = j.get("m1")!; expect(entry.status).toBe("completed"); expect(entry.response).toBe("deferred response"); expect(entry.responseDelivered).toBe(false); }); it("detachResolver removes the live resolver without affecting persistence", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "route", payload: "{}", sourceId: "agent-b", sourceRequestId: "r1" }); let resolved = false; j.attachResolver("m1", { resolve: () => { resolved = true; }, reject: () => {}, }); j.detachResolver("m1"); const hadResolver = j.resolveRoute("m1", "response"); expect(hadResolver).toBe(false); expect(resolved).toBe(false); }); }); describe("undelivered responses", () => { it("returns completed route entries with undelivered responses", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "route", payload: "{}", sourceId: "agent-b", sourceRequestId: "r1" }); j.record({ id: "m2", targetId: "agent-a", type: "route", payload: "{}", sourceId: "agent-b", sourceRequestId: "r2" }); j.record({ id: "m3", targetId: "agent-a", type: "chat", payload: "{}" }); j.resolveRoute("m1", "response-1"); j.resolveRoute("m2", "response-2"); j.complete("m3"); const undelivered = j.getUndeliveredResponses("agent-b"); expect(undelivered).toHaveLength(2); expect(undelivered[0]!.response).toBe("response-1"); expect(undelivered[1]!.response).toBe("response-2"); }); it("markResponseDelivered excludes entry from future queries", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "route", payload: "{}", sourceId: "agent-b", sourceRequestId: "r1" }); j.resolveRoute("m1", "response"); j.markResponseDelivered("m1"); expect(j.getUndeliveredResponses("agent-b")).toHaveLength(0); expect(j.get("m1")!.responseDelivered).toBe(true); }); }); describe("cleanup", () => { it("prunes old completed entries", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "chat", payload: "{}" }); j.complete("m1"); // With 0ms age threshold, everything completed is "old" const removed = j.cleanup(0); expect(removed).toBe(1); expect(j.get("m1")).toBeNull(); }); it("does not prune undelivered route responses", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "route", payload: "{}", sourceId: "agent-b", sourceRequestId: "r1" }); j.resolveRoute("m1", "response"); const removed = j.cleanup(0); expect(removed).toBe(0); expect(j.get("m1")).not.toBeNull(); }); it("prunes delivered route responses", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "route", payload: "{}", sourceId: "agent-b", sourceRequestId: "r1" }); j.resolveRoute("m1", "response"); j.markResponseDelivered("m1"); const removed = j.cleanup(0); expect(removed).toBe(1); expect(j.get("m1")).toBeNull(); }); it("does not prune pending entries", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "chat", payload: "{}" }); const removed = j.cleanup(0); expect(removed).toBe(0); expect(j.get("m1")).not.toBeNull(); }); }); describe("close", () => { it("rejects all pending resolvers on close", () => { const j = createJournal(); j.record({ id: "m1", targetId: "agent-a", type: "route", payload: "{}", sourceId: "agent-b", sourceRequestId: "r1" }); let rejected = false; j.attachResolver("m1", { resolve: () => {}, reject: () => { rejected = true; }, }); j.close(); expect(rejected).toBe(true); // Remove from cleanup list since we already closed it journals = journals.filter((jj) => jj !== j); }); });