import { randomUUID } from "node:crypto"; import { type WebSocket } from "ws"; import { z } from "zod"; import { type WebSocketAudience } from "./ws-audience"; import { type WebSocketClusterBus } from "./ws-cluster-bus"; import { type WebSocketPresenceStore } from "./ws-presence-store"; import { type ManagedWebSocketConnection, WebSocketRegistry, type WebSocketRegistryOptions, type WebSocketRoomId, type WebSocketUserId, } from "./ws-registry"; // transport-level 상수와 queue threshold를 한 파일에 모아 lifecycle/backpressure 정책을 중앙화함 const WS_CONNECTING = 0; const WS_OPEN = 1; const WS_CLOSED = 3; // RFC 6455 close codes used by Sonamu's WebSocket runtime. const WS_CLOSE_CODE_GOING_AWAY = 1001; const WS_CLOSE_CODE_INVALID_FRAME_PAYLOAD_DATA = 1007; const WS_CLOSE_CODE_POLICY_VIOLATION = 1008; const WS_CLOSE_CODE_MESSAGE_TOO_BIG = 1009; const WS_CLOSE_CODE_INTERNAL_ERROR = 1011; const WS_CLOSE_CODE_TRY_AGAIN_LATER = 1013; const MAX_PENDING_MESSAGES = 100; const MAX_PENDING_OUTBOUND_MESSAGES = 1_000; const MAX_SOCKET_BUFFERED_AMOUNT = 1_048_576; const OUTBOUND_BATCH_SIZE = 50; const OUTBOUND_RETRY_DELAY_MS = 5; // envelope을 `{event, data}` 형태로 고정해 server handler와 generated client가 같은 framing contract를 쓰게 함 const WebSocketEnvelopeSchema = z.object({ event: z.string(), data: z.unknown(), }); type MessageHandler = (data: T) => void | Promise; type CloseHandler = () => void | Promise; export type WebSocketEventMap = Record; type InferWebSocketEventMap = z.infer>; export type WebSocketOutEvents = TOut; export type WebSocketInEvents = TIn; export interface WebSocketConnection< TOut extends WebSocketEventMap = WebSocketEventMap, TIn extends WebSocketEventMap = WebSocketEventMap, > extends ManagedWebSocketConnection { transport: "ws"; onClose(callback: CloseHandler): void; onMessage>( event: K, handler: MessageHandler[K]>, ): void; publish>( event: K, data: WebSocketOutEvents[K], ): void; waitForClose(): Promise; join(roomId: WebSocketRoomId): void; leave(roomId: WebSocketRoomId): void; setUserId(userId: WebSocketUserId): void; clearUserId(): void; } export type AnyWebSocketConnection = WebSocketConnection; type ParsedEnvelope = z.infer; type WebSocketConnectionOptions = { namespace?: string; heartbeat?: number; maxPayload?: number; active?: boolean; outEvents: z.ZodObject; inEvents: z.ZodObject; registry: WebSocketRegistry; }; export type WebSocketRuntimeOptions = { nodeId?: string; presenceStore?: WebSocketPresenceStore; clusterBus?: WebSocketClusterBus; }; // registry를 소유하고 connection 생성/shutdown을 담당함. Sonamu 애플리케이션 수명주기와 같이 움직이도록 설계함 export class WebSocketRuntime { readonly registry: WebSocketRegistry; constructor(options: WebSocketRuntimeOptions = {}) { const registryOptions: WebSocketRegistryOptions = { nodeId: options.nodeId, presenceStore: options.presenceStore, clusterBus: options.clusterBus, }; this.registry = new WebSocketRegistry(registryOptions); } registerConnection( socket: WebSocket, options: Omit, "registry">, ): WebSocketConnection, InferWebSocketEventMap> { return new WebSocketConnectionImpl(socket, { ...options, registry: this.registry, }); } activateConnection(connectionId: string): void { this.registry.activate(connectionId); } broadcast(event: string, data: unknown, namespace?: string): void { this.registry.broadcast(event, data, namespace); } publishToRoom(roomId: WebSocketRoomId, event: string, data: unknown, namespace?: string): void { this.registry.publishToRoom(roomId, event, data, namespace); } publishToUser(userId: WebSocketUserId, event: string, data: unknown, namespace?: string): void { this.registry.publishToUser(userId, event, data, namespace); } publishToAudience(audience: WebSocketAudience, event: string, data: unknown): void { this.registry.publishToAudience(audience, event, data); } // 프로세스 종료 시 살아있는 연결이 남지 않도록 registry를 순회해 일괄 종료함 async shutdown( code: number = WS_CLOSE_CODE_GOING_AWAY, reason = "Server shutting down", ): Promise { await this.registry.shutdown(code, reason); } } export function createWebSocketRuntime(options: WebSocketRuntimeOptions = {}): WebSocketRuntime { return new WebSocketRuntime(options); } class WebSocketConnectionImpl< TOutSchema extends z.ZodRawShape, TInSchema extends z.ZodRawShape, > implements WebSocketConnection< InferWebSocketEventMap, InferWebSocketEventMap > { readonly id = randomUUID(); readonly transport = "ws"; readonly namespace: string; private readonly closeCallbacks: CloseHandler[] = []; private readonly messageHandlers = new Map>>(); private readonly pendingMessages: ParsedEnvelope[] = []; private readonly pendingOutboundMessages: string[] = []; private readonly closePromise: Promise; private readonly resolveClosePromise: () => void; private readonly heartbeatMs: number; private readonly maxPayload?: number; private readonly eventSchemasIn: Record; private readonly eventSchemasOut: Record; private closedInternal = false; private closeStarted = false; private awaitingPong = false; private heartbeatTimer: ReturnType | null = null; private messageQueue: Promise = Promise.resolve(); private outboundFlushScheduled = false; constructor( private readonly socket: WebSocket, private readonly options: WebSocketConnectionOptions, ) { this.namespace = options.namespace ?? "default"; this.heartbeatMs = options.heartbeat ?? 30000; this.maxPayload = options.maxPayload; this.eventSchemasIn = options.inEvents.shape as unknown as Record; this.eventSchemasOut = options.outEvents.shape as unknown as Record; let resolveClosePromise!: () => void; this.closePromise = new Promise((resolve) => { resolveClosePromise = resolve; }); this.resolveClosePromise = resolveClosePromise; this.options.registry.register(this, options.active ?? true); this.socket.on("message", this.handleMessage); this.socket.on("close", this.handleClose); this.socket.on("error", this.handleError); this.socket.on("pong", this.handlePong); this.startHeartbeat(); } get closed(): boolean { return this.closedInternal; } onClose(callback: CloseHandler): void { this.closeCallbacks.push(callback); } onMessage>( event: K, handler: MessageHandler[K]>, ): void { const eventKey = String(event); const handlers = this.messageHandlers.get(eventKey) ?? []; handlers.push(handler as MessageHandler); this.messageHandlers.set(eventKey, handlers); this.flushPendingMessages(eventKey); } publish>( event: K, data: InferWebSocketEventMap[K], ): void { this.publishValidated(String(event), data); } publishUntyped(event: string, data: unknown): void { this.publishValidated(event, data); } waitForClose(): Promise { return this.closePromise; } join(roomId: WebSocketRoomId): void { this.options.registry.join(this.id, roomId); } leave(roomId: WebSocketRoomId): void { this.options.registry.leave(this.id, roomId); } setUserId(userId: WebSocketUserId): void { this.options.registry.setUserId(this.id, userId); } clearUserId(): void { this.options.registry.clearUserId(this.id); } // transport 종료 도중 예외가 나도 markClosed가 반드시 실행되도록 try/finally로 감쌈 close(code?: number, reason?: string): void { if (this.closedInternal || this.closeStarted || this.socket.readyState === WS_CLOSED) { return; } this.closeStarted = true; try { this.closeTransport(code, reason); } finally { this.markClosed(); } } // 인바운드 메시지를 순차 처리 큐에 올림. payload size → envelope 파싱 순으로 transport 레벨 검증을 우선 수행함 private readonly handleMessage = (raw: unknown) => { this.enqueueMessageTask(async () => { const text = normalizeMessage(raw); if (this.maxPayload !== undefined && Buffer.byteLength(text) > this.maxPayload) { this.close(WS_CLOSE_CODE_MESSAGE_TOO_BIG, "Message too large"); return; } const parsedEnvelope = safeParseEnvelope(text); if (!parsedEnvelope) { this.close(WS_CLOSE_CODE_INVALID_FRAME_PAYLOAD_DATA, "Invalid message payload"); return; } this.options.registry.touch(this.id); await this.dispatchEnvelope(parsedEnvelope); }); }; private readonly handleClose = () => { this.markClosed(); }; // 소켓이 transport error를 emit하면 즉시 1011 close로 수렴시켜 상태 누락을 막음 private readonly handleError = () => { this.close(WS_CLOSE_CODE_INTERNAL_ERROR, "WebSocket transport error"); }; private readonly handlePong = () => { this.awaitingPong = false; this.options.registry.touch(this.id); }; // event 존재 여부 → schema 검증 → handler 실행 순으로 분기함 // handler가 아직 등록되지 않은 초기 메시지는 버퍼에 보관했다가 onMessage 등록 시 flush함 // handler는 `await`으로 순차 실행해 한 connection 안의 메시지 순서를 보장함 private async dispatchEnvelope(envelope: ParsedEnvelope): Promise { const handlers = this.messageHandlers.get(envelope.event); const schema = this.eventSchemasIn[envelope.event]; if (!schema) { this.close(WS_CLOSE_CODE_POLICY_VIOLATION, "Unknown event"); return; } const parsed = schema.safeParse(envelope.data); if (!parsed.success) { this.close(WS_CLOSE_CODE_INVALID_FRAME_PAYLOAD_DATA, "Invalid event data"); return; } if (!handlers || handlers.length === 0) { if (this.pendingMessages.length >= MAX_PENDING_MESSAGES) { this.pendingMessages.shift(); } this.pendingMessages.push(envelope); return; } for (const handler of handlers) { await handler(parsed.data); } } private flushPendingMessages(event: string): void { const remaining: ParsedEnvelope[] = []; const toFlush: ParsedEnvelope[] = []; for (const message of this.pendingMessages) { if (message.event !== event) { remaining.push(message); continue; } toFlush.push(message); } this.pendingMessages.length = 0; this.pendingMessages.push(...remaining); for (const message of toFlush) { this.enqueueMessageTask(async () => { await this.dispatchEnvelope(message); }); } } private publishValidated(event: string, data: unknown): void { const schema = this.eventSchemasOut[event]; if (!schema) { throw new Error(`Unknown websocket event: ${event}`); } const parsed = schema.safeParse(data); if (!parsed.success) { throw new Error(`Invalid websocket event payload: ${event}`); } if (this.closedInternal || this.socket.readyState !== WS_OPEN) { return; } this.enqueueOutboundMessage( JSON.stringify({ event, data: parsed.data, }), ); } // listener 해제 / heartbeat 중단 / pending queue 비움 / registry unregister / onClose 실행 / waitForClose resolve 을 한 곳에 모아 원자적으로 처리함 // async onClose가 reject해도 unhandled rejection으로 새지 않도록 catch로 격리함 private markClosed(): void { if (this.closedInternal) { return; } this.closedInternal = true; this.closeStarted = false; this.stopHeartbeat(); this.socket.off("message", this.handleMessage); this.socket.off("close", this.handleClose); this.socket.off("error", this.handleError); this.socket.off("pong", this.handlePong); this.awaitingPong = false; this.pendingMessages.length = 0; this.pendingOutboundMessages.length = 0; this.options.registry.unregister(this.id); for (const callback of this.closeCallbacks.splice(0)) { try { const result = callback(); if (isPromiseLike(result)) { void result.catch(() => { // async close callbacks must not escape as unhandled rejections }); } } catch { // close callbacks must not block transport cleanup } } this.resolveClosePromise(); } // pong이 오지 않은 상태에서 다음 tick이 오면 timeout close로 처리해 zombie connection을 정리함 private startHeartbeat(): void { if (this.heartbeatMs <= 0) { return; } this.heartbeatTimer = setInterval(() => { if (this.closedInternal || this.socket.readyState !== WS_OPEN) { return; } if (this.awaitingPong) { this.close(WS_CLOSE_CODE_GOING_AWAY, "Heartbeat timeout"); return; } this.awaitingPong = true; this.socket.ping(); }, this.heartbeatMs); } private stopHeartbeat(): void { if (!this.heartbeatTimer) { return; } clearInterval(this.heartbeatTimer); this.heartbeatTimer = null; } // close가 실패해도 terminate 폴백까지 시도하고, 끝내 실패하면 markClosed에 상태 정리를 위임함 private closeTransport(code?: number, reason?: string): void { try { if (this.socket.readyState === WS_OPEN || this.socket.readyState === WS_CONNECTING) { this.socket.close(code, truncateCloseReason(reason)); return; } this.socket.terminate(); } catch { try { this.socket.terminate(); } catch { // transport is already broken; state cleanup is handled by markClosed() } } } // 인바운드 handler를 promise chain으로 serialize 하고, handler 예외는 connection-local 1011 close로 축소함 private enqueueMessageTask(task: () => Promise): void { this.messageQueue = this.messageQueue .then(async () => { if (this.closedInternal) { return; } await task(); }) .catch(() => { this.close(WS_CLOSE_CODE_INTERNAL_ERROR, "Message handling failed"); }); } // queue가 한계에 도달하면 1013으로 닫아 느린 소비자가 메모리를 끝없이 잡아먹지 못하게 함 private enqueueOutboundMessage(payload: string): void { if (this.pendingOutboundMessages.length >= MAX_PENDING_OUTBOUND_MESSAGES) { this.close(WS_CLOSE_CODE_TRY_AGAIN_LATER, "WebSocket backpressure overflow"); return; } this.pendingOutboundMessages.push(payload); this.scheduleOutboundFlush(); } private scheduleOutboundFlush(delayMs: number = 0): void { if (this.outboundFlushScheduled || this.closedInternal) { return; } this.outboundFlushScheduled = true; const flush = () => { this.outboundFlushScheduled = false; this.flushOutboundMessages(); }; if (delayMs > 0) { setTimeout(flush, delayMs); return; } setImmediate(flush); } // bufferedAmount가 임계치를 넘으면 flush를 미뤄 socket 내부 큐가 터지지 않도록 backpressure를 존중함 // 한 번에 배치 단위로만 send해 동기 루프가 이벤트 루프를 장시간 점유하지 않게 함 private flushOutboundMessages(): void { if (this.closedInternal || this.socket.readyState !== WS_OPEN) { return; } if (this.socket.bufferedAmount > MAX_SOCKET_BUFFERED_AMOUNT) { this.scheduleOutboundFlush(OUTBOUND_RETRY_DELAY_MS); return; } let sent = 0; while ( sent < OUTBOUND_BATCH_SIZE && this.pendingOutboundMessages.length > 0 && this.socket.readyState === WS_OPEN ) { const payload = this.pendingOutboundMessages.shift(); if (!payload) { break; } try { this.socket.send(payload); } catch { this.close(WS_CLOSE_CODE_INTERNAL_ERROR, "Outbound publish failed"); return; } sent += 1; if (this.socket.bufferedAmount > MAX_SOCKET_BUFFERED_AMOUNT) { break; } } if (this.pendingOutboundMessages.length > 0) { this.scheduleOutboundFlush( this.socket.bufferedAmount > MAX_SOCKET_BUFFERED_AMOUNT ? OUTBOUND_RETRY_DELAY_MS : 0, ); } } } function normalizeMessage(raw: unknown): string { if (typeof raw === "string") { return raw; } if (raw instanceof Buffer) { return raw.toString("utf-8"); } if (raw instanceof ArrayBuffer) { return Buffer.from(raw).toString("utf-8"); } if (Array.isArray(raw)) { return Buffer.concat(raw.filter((chunk): chunk is Buffer => chunk instanceof Buffer)).toString( "utf-8", ); } return JSON.stringify(raw); } function safeParseEnvelope(raw: string): ParsedEnvelope | null { try { const parsed = JSON.parse(raw) as unknown; const validated = WebSocketEnvelopeSchema.safeParse(parsed); return validated.success ? validated.data : null; } catch { return null; } } // RFC 6455가 close frame reason을 123 byte로 제한하므로 초과분은 잘라 전송 실패를 방지함 function truncateCloseReason(reason?: string): string | undefined { if (!reason) { return undefined; } return Buffer.byteLength(reason, "utf-8") <= 123 ? reason : Buffer.from(reason).subarray(0, 123).toString("utf-8"); } function isPromiseLike(value: unknown): value is Promise { return typeof value === "object" && value !== null && "then" in value && "catch" in value; }