/** * Async FIFO queue — multi-producer, single-consumer. * * Producers call `send()` (synchronous, non-blocking). * The consumer reads via `for await (const msg of mailbox)`. * `close()` terminates all pending reads. */ export class Mailbox implements AsyncIterable { private queue: T[] = []; private waiting: Array<(value: T | null) => void> = []; private _closed = false; get closed(): boolean { return this._closed; } send(value: T): void { if (this._closed) return; const waiter = this.waiting.shift(); if (waiter) { waiter(value); } else { this.queue.push(value); } } async *[Symbol.asyncIterator](): AsyncGenerator { while (true) { if (this.queue.length > 0) { yield this.queue.shift()!; continue; } if (this._closed) return; const value = await new Promise((resolve) => { if (this._closed) { resolve(null); return; } this.waiting.push(resolve); }); if (value === null) return; yield value; } } close(): void { this._closed = true; for (const w of this.waiting) w(null); this.waiting.length = 0; } } export type Subscription = AsyncIterable & { unsubscribe(): void }; /** * Fan-out event bus — multi-producer, multi-consumer. * * Each `subscribe()` call returns an independent `Mailbox` * that receives a copy of every `emit()`-ed value. * Closing the bus closes all subscriptions. */ export class EventBus { private subscribers = new Set>(); private _closed = false; get closed(): boolean { return this._closed; } emit(value: T): void { if (this._closed) return; for (const sub of this.subscribers) { sub.send(value); } } subscribe(): Subscription { if (this._closed) { const empty = new Mailbox(); empty.close(); return Object.assign(empty, { unsubscribe() {} }); } const mailbox = new Mailbox(); this.subscribers.add(mailbox); const unsubscribe = () => { this.subscribers.delete(mailbox); mailbox.close(); }; return Object.assign(mailbox, { unsubscribe }); } close(): void { this._closed = true; for (const sub of this.subscribers) sub.close(); this.subscribers.clear(); } }