import { type FastifyReply, type FastifyRequest } from "fastify"; import { type z } from "zod"; // NOTE(Haze, 251106): context provider에서 인자를 채워주면 createSSE(events)만으로 사용 가능 export function createSSEFactory( socket: FastifyRequest["socket"], reply: FastifyReply, _events: T, ): SSEConnection { return new SSEConnectionImpl(socket, reply); } export function createMockSSEFactory(_events: T): SSEConnection { return { get closed() { return false; }, onClose: (_callback) => {}, publish: (_event, _data) => {}, end: () => Promise.resolve(), }; } export interface SSEConnection { get closed(): boolean; onClose(callback: () => void): void; publish>(event: K, data: z.infer[K]): void; end(): Promise; } class SSEConnectionImpl implements SSEConnection { private _closed = false; private _closeCallbacks: Array<() => void> = []; private readonly markClosed = () => { this._closed = true; this.fireCloseCallbacks(); }; get closed(): boolean { return this._closed; } onClose(callback: () => void): void { this._closeCallbacks.push(callback); } // 콜백을 한 번만 실행하고 배열을 비워 중복 호출을 방지 private fireCloseCallbacks(): void { const callbacks = this._closeCallbacks; this._closeCallbacks = []; for (const cb of callbacks) { cb(); } } constructor( private readonly socket: FastifyRequest["socket"], private readonly reply: FastifyReply, ) { this.socket.on("close", this.markClosed); this.socket.on("error", this.markClosed); } publish>(event: K, data: z.infer[K]): void { if (this._closed) { return; } this.reply.sse({ event: event as string, data: JSON.stringify(data), }); } async end(): Promise { if (this._closed) { return; } this._closed = true; this.socket.off("close", this.markClosed); this.socket.off("error", this.markClosed); this.fireCloseCallbacks(); this.reply.sse({ event: "end", data: "END", }); await new Promise((resolve) => setTimeout(resolve, 200)); this.reply.raw.end(); } }