import type { Resolve, Subscription } from "effection";
import { action, resource } from "effection";

/**
 * Create a replayable signal which exposes `send`/`close` helpers while
 * preserving the original subscription semantics. Useful for forwarding
 * child process stdio lines for wellness checks and logging.
 *
 * Every value passed to `send`/`close` is stored, and each new subscription
 * starts reading from the first stored value, so late subscribers replay the
 * full history before waiting for new values.
 *
 * @typeParam T - type of the values passed to `send`
 * @typeParam TClose - type of the value passed to `close`
 * @returns the subscribe resource (a fresh subscription is created each time
 *   it is evaluated) combined with `send` and `close` methods
 */
export function createReplaySignal<T, TClose = never>() {
  // single shared durable queue storage
  const queue = createDurableQueue<T, TClose>();

  // each subscriber gets its own cursor over the shared items by calling
  // `queue.stream()`
  const subscribe = resource<Subscription<T, TClose>>(function* (provide) {
    const queued = queue.stream();
    try {
      yield* provide({ next: queued.next });
    } finally {
      // detach from the queue so it stops tracking this subscription
      queued.release();
    }
  });

  /** Store `value` and hand it to every subscription that is waiting. */
  function send(value: T): void {
    queue.add(value);
  }

  /** Store a final (`done: true`) result carrying `value`. */
  function close(value?: TClose): void {
    queue.close(value);
  }

  return { ...subscribe, send, close };
}

/**
 * Append-only queue whose items are retained so that every subscription can
 * read them from the beginning.
 *
 * @returns `add` and `close` to append items, and `stream` to create a new
 *   subscription positioned at the first stored item
 */
function createDurableQueue<T, TClose = never>() {
  type Item = IteratorResult<T, TClose>;

  // every item ever enqueued, in order; never truncated
  const items: Item[] = [];

  // a set of active subscriptions; each one owns the resolvers of its
  // pending `next()` calls
  const subscribers = new Set<{ notify: Set<Resolve<Item>> }>();

  function enqueue(item: Item): void {
    items.push(item);
    for (const sub of subscribers) {
      if (sub.notify.size > 0) {
        // wake only the oldest pending `next()` call of this subscription
        const [wake] = sub.notify;
        wake(item);
      }
    }
  }

  function stream(): Subscription<T, TClose> & { release(): void } {
    // Index of the next item this subscription has not yet received. An
    // index is used instead of an array iterator because an array iterator
    // that has reported `done` once never yields again, even after more
    // items are pushed, which would drop every item enqueued while the
    // subscriber is not parked inside `next()`.
    let cursor = 0;
    const notify = new Set<Resolve<Item>>();
    const sub = { notify };
    subscribers.add(sub);

    return {
      *next() {
        const stored = items[cursor];
        if (stored !== undefined) {
          cursor += 1;
          return stored;
        }

        // nothing left to replay: wait until `enqueue` wakes us
        return yield* action<Item>((resolve) => {
          const deliver: Resolve<Item> = (item) => {
            // unregister first so a second enqueue cannot reuse this waiter
            notify.delete(deliver);
            // Always hand out the oldest undelivered item so nothing is
            // skipped; `enqueue` pushes before waking, so the fallback to
            // `item` only exists to satisfy the type checker.
            const head = items[cursor] ?? item;
            cursor += 1;
            resolve(head);
          };
          notify.add(deliver);
          return () => {
            notify.delete(deliver);
          };
        });
      },

      /** Stop tracking this subscription in the queue. */
      release() {
        subscribers.delete(sub);
      },
    };
  }

  return {
    add: (value: T) => enqueue({ done: false, value }),
    close: (value?: TClose) => enqueue({ done: true, value: value as TClose }),
    stream,
  };
}