import { describe, expect, it } from "bun:test"; import { EventBus, Mailbox } from "../src/agent/mailbox.ts"; describe("Mailbox", () => { it("delivers messages in FIFO order", async () => { const mb = new Mailbox(); mb.send(1); mb.send(2); mb.send(3); mb.close(); const received: number[] = []; for await (const v of mb) received.push(v); expect(received).toEqual([1, 2, 3]); }); it("wakes a waiting consumer immediately on send", async () => { const mb = new Mailbox(); const received: string[] = []; const consumer = (async () => { for await (const v of mb) { received.push(v); if (received.length === 2) break; } })(); await Bun.sleep(10); mb.send("a"); await Bun.sleep(10); mb.send("b"); await consumer; expect(received).toEqual(["a", "b"]); }); it("close() terminates a waiting consumer", async () => { const mb = new Mailbox(); const received: number[] = []; const consumer = (async () => { for await (const v of mb) received.push(v); })(); mb.send(1); await Bun.sleep(10); mb.close(); await consumer; expect(received).toEqual([1]); }); it("drains buffered messages after close", async () => { const mb = new Mailbox(); mb.send(1); mb.send(2); mb.close(); mb.send(3); // dropped — already closed const received: number[] = []; for await (const v of mb) received.push(v); expect(received).toEqual([1, 2]); }); it("send after close is a no-op", () => { const mb = new Mailbox(); mb.close(); mb.send(42); expect(mb.closed).toBe(true); }); it("multiple consumers on the same mailbox compete (single-consumer contract)", async () => { const mb = new Mailbox(); const a: number[] = []; const b: number[] = []; const ca = (async () => { for await (const v of mb) a.push(v); })(); const cb = (async () => { for await (const v of mb) b.push(v); })(); await Bun.sleep(10); mb.send(1); mb.send(2); await Bun.sleep(10); mb.close(); await Promise.all([ca, cb]); expect(a.length + b.length).toBe(2); }); }); describe("EventBus", () => { it("fans out to all subscribers", async () => { const bus = new EventBus(); const s1 = bus.subscribe(); const s2 = bus.subscribe(); bus.emit(1); bus.emit(2); bus.close(); const r1: number[] = []; const r2: number[] = []; for await (const v of s1) r1.push(v); for await (const v of s2) r2.push(v); expect(r1).toEqual([1, 2]); expect(r2).toEqual([1, 2]); }); it("unsubscribe stops delivery without affecting others", async () => { const bus = new EventBus(); const s1 = bus.subscribe(); const s2 = bus.subscribe(); bus.emit("a"); s1.unsubscribe(); bus.emit("b"); bus.close(); const r1: string[] = []; const r2: string[] = []; for await (const v of s1) r1.push(v); for await (const v of s2) r2.push(v); expect(r1).toEqual(["a"]); expect(r2).toEqual(["a", "b"]); }); it("late subscriber doesn't see past events", async () => { const bus = new EventBus(); bus.emit(1); const sub = bus.subscribe(); bus.emit(2); bus.close(); const received: number[] = []; for await (const v of sub) received.push(v); expect(received).toEqual([2]); }); it("close terminates all subscriptions", async () => { const bus = new EventBus(); const sub = bus.subscribe(); const consumer = (async () => { const received: number[] = []; for await (const v of sub) received.push(v); return received; })(); bus.emit(42); await Bun.sleep(10); bus.close(); const result = await consumer; expect(result).toEqual([42]); expect(bus.closed).toBe(true); }); it("subscribe after close returns an immediately-closed mailbox", async () => { const bus = new EventBus(); bus.close(); const sub = bus.subscribe(); const received: number[] = []; for await (const v of sub) received.push(v); expect(received).toEqual([]); }); it("emit after close is a no-op", () => { const bus = new EventBus(); const _sub = bus.subscribe(); bus.close(); bus.emit(99); // should not throw or enqueue expect(bus.closed).toBe(true); }); it("supports many concurrent subscribers", async () => { const bus = new EventBus(); const subs = Array.from({ length: 50 }, () => bus.subscribe()); for (let i = 0; i < 100; i++) bus.emit(i); bus.close(); for (const sub of subs) { const received: number[] = []; for await (const v of sub) received.push(v); expect(received.length).toBe(100); expect(received[0]).toBe(0); expect(received[99]).toBe(99); } }); });