/** * TCP socket wrapper using Bun.TCP for client-server communication */ import { logger } from '../utils/logger.js'; import type { Message } from './types.js'; const HEARTBEAT_INTERVAL = 10000; // 10 seconds const HEARTBEAT_TIMEOUT = 15000; // 15 seconds (1.5x heartbeat interval) const SERVER_HEARTBEAT_TIMEOUT = 30000; // 30 seconds - server disconnects clients that don't send heartbeat const RECONNECT_DELAY_MIN = 1000; // 1 second const RECONNECT_DELAY_MAX = 20000; // 20 seconds const TCP_KEEPALIVE_INITIAL_DELAY = 5; // 5 seconds before first keepalive probe /** * Message handler callback */ export type MessageHandler = (message: Message) => void | Promise; /** * Connection state handler */ export type ConnectionStateHandler = (connected: boolean) => void | Promise; /** * TCP Server for handling client connections */ export class TCPServer { private server: any; private clients: Map = new Map(); private clientBuffers: Map = new Map(); private clientLastHeartbeat: Map = new Map(); private heartbeatCheckInterval: Timer | null = null; private messageHandlers: MessageHandler[] = []; private clientConnectHandlers: ((clientId: string) => void)[] = []; private clientDisconnectHandlers: ((clientId: string) => void)[] = []; constructor( private host: string, private port: number ) { } /** * Start the TCP server */ async start(): Promise { this.server = Bun.listen({ hostname: this.host, port: this.port, socket: { data: (socket, data) => { this.handleData(socket, data); }, open: (socket) => { const clientId = `${socket.remoteAddress}`; this.clients.set(clientId, socket); this.clientBuffers.set(clientId, Buffer.allocUnsafe(0)); this.clientLastHeartbeat.set(clientId, Date.now()); logger.info(`Client connected: ${clientId}`); this.clientConnectHandlers.forEach((h) => h(clientId)); }, close: (socket) => { const clientId = `${socket.remoteAddress}`; this.clients.delete(clientId); this.clientBuffers.delete(clientId); this.clientLastHeartbeat.delete(clientId); logger.info(`Client disconnected: ${clientId}`); this.clientDisconnectHandlers.forEach((h) => h(clientId)); }, error: (socket, error) => { logger.error(`Socket error: ${error.message}`); }, }, }); logger.info(`TCP Server listening on ${this.host}:${this.port}`); this.startHeartbeatCheck(); } /** * Stop the TCP server */ async stop(): Promise { if (this.server) { this.stopHeartbeatCheck(); this.server.stop(); this.clients.clear(); this.clientBuffers.clear(); this.clientLastHeartbeat.clear(); logger.info('TCP Server stopped'); } } /** * Send message to specific client */ sendToClient(clientId: string, message: Message): void { const socket = this.clients.get(clientId); if (socket) { this.sendMessage(socket, message); } else { logger.warn(`Client not found: ${clientId}`); } } /** * Broadcast message to all clients */ broadcast(message: Message): void { for (const socket of this.clients.values()) { this.sendMessage(socket, message); } } /** * Register message handler */ onMessage(handler: MessageHandler): void { this.messageHandlers.push(handler); } /** * Register client connect handler */ onClientConnect(handler: (clientId: string) => void): void { this.clientConnectHandlers.push(handler); } /** * Register client disconnect handler */ onClientDisconnect(handler: (clientId: string) => void): void { this.clientDisconnectHandlers.push(handler); } private handleData(socket: any, data: Buffer): void { const clientId = `${socket.remoteAddress}`; let buffer = this.clientBuffers.get(clientId) || Buffer.allocUnsafe(0); buffer = Buffer.concat([buffer, data]); this.clientBuffers.set(clientId, buffer); while (buffer.length >= 4) { const messageLength = buffer.readUInt32BE(0); if (buffer.length < 4 + messageLength) { // Not enough data yet break; } const messageData = buffer.subarray(4, 4 + messageLength); buffer = buffer.subarray(4 + messageLength); this.clientBuffers.set(clientId, buffer); try { const message = this.deserializeMessage(messageData); // Auto-respond to heartbeat messages and track timestamp if (message.type === 'heartbeat') { this.clientLastHeartbeat.set(clientId, Date.now()); this.sendMessage(socket, { type: 'heartbeat' as any, timestamp: Date.now() }); } this.messageHandlers.forEach((h) => h(message)); } catch (error) { logger.error(`Failed to parse message: ${error}`); } } } private sendMessage(socket: any, message: Message): void { const data = this.serializeMessage(message); socket.write(data); } private serializeMessage(message: Message): Buffer { const json = JSON.stringify(message); const length = Buffer.byteLength(json); const buffer = Buffer.allocUnsafe(4 + length); buffer.writeUInt32BE(length, 0); buffer.write(json, 4); return buffer; } private deserializeMessage(data: Buffer): Message { const json = data.toString('utf-8'); return JSON.parse(json) as Message; } /** * Start periodic heartbeat checking to detect stale client connections */ private startHeartbeatCheck(): void { this.heartbeatCheckInterval = setInterval(() => { const now = Date.now(); for (const [clientId, lastHeartbeat] of this.clientLastHeartbeat.entries()) { const timeSinceHeartbeat = now - lastHeartbeat; if (timeSinceHeartbeat > SERVER_HEARTBEAT_TIMEOUT) { logger.warn( `Client ${clientId} heartbeat timeout (${timeSinceHeartbeat}ms) - force disconnecting stale connection` ); const socket = this.clients.get(clientId); if (socket) { socket.end(); // Force close the connection } } } }, HEARTBEAT_INTERVAL); // Check every 10 seconds } /** * Stop heartbeat checking */ private stopHeartbeatCheck(): void { if (this.heartbeatCheckInterval) { clearInterval(this.heartbeatCheckInterval); this.heartbeatCheckInterval = null; } } } /** * TCP Client for connecting to server */ export class TCPClient { private socket: any = null; private connected = false; private reconnectTimer: Timer | null = null; private heartbeatTimer: Timer | null = null; private heartbeatTimeoutTimer: Timer | null = null; private lastHeartbeatReceived: number = 0; private reconnectDelay = RECONNECT_DELAY_MIN; private messageHandlers: MessageHandler[] = []; private connectionStateHandlers: ConnectionStateHandler[] = []; private buffer: Buffer = Buffer.allocUnsafe(0); constructor( private host: string, private port: number, private autoReconnect = true ) { } /** * Connect to server */ async connect(): Promise { try { const socket = await Bun.connect({ hostname: this.host, port: this.port, socket: { data: (socket, data) => { this.handleData(data); }, open: (socket) => { // CRITICAL: Assign socket immediately to avoid race condition // The open callback fires before Bun.connect() returns, so we must // assign this.socket here, not after the await this.socket = socket; this.connected = true; this.reconnectDelay = RECONNECT_DELAY_MIN; this.lastHeartbeatReceived = Date.now(); // Enable TCP keepalive for faster dead connection detection try { socket.setKeepAlive?.(true, TCP_KEEPALIVE_INITIAL_DELAY); } catch (e) { logger.debug('Failed to set TCP keepalive (may not be supported)'); } logger.info(`Connected to server ${this.host}:${this.port}`); this.startHeartbeat(); this.startHeartbeatTimeout(); this.connectionStateHandlers.forEach((h) => h(true)); }, close: (socket) => { this.handleDisconnect(); }, error: (socket, error) => { logger.error(`Connection error: ${error.message}`); this.handleDisconnect(); }, timeout: (socket) => { logger.warn('Socket timeout - connection appears dead'); this.handleDisconnect(); }, }, }); // Note: this.socket is already assigned in the open callback above } catch (error) { logger.error(`Failed to connect: ${error}`); // Schedule reconnect if auto-reconnect is enabled // Don't call handleDisconnect() because we were never connected if (this.autoReconnect) { this.scheduleReconnect(); } } } /** * Disconnect from server */ disconnect(): void { this.autoReconnect = false; if (this.socket) { this.socket.end(); this.socket = null; } this.stopHeartbeat(); this.stopReconnect(); } /** * Send message to server */ send(message: Message): void { if (this.connected && this.socket) { const data = this.serializeMessage(message); this.socket.write(data); } else { logger.warn('Cannot send message: not connected'); } } /** * Check if connected */ isConnected(): boolean { return this.connected; } /** * Register message handler */ onMessage(handler: MessageHandler): void { this.messageHandlers.push(handler); } /** * Register connection state handler */ onConnectionState(handler: ConnectionStateHandler): void { this.connectionStateHandlers.push(handler); } private handleData(data: Buffer): void { this.buffer = Buffer.concat([this.buffer, data]); while (this.buffer.length >= 4) { const messageLength = this.buffer.readUInt32BE(0); if (this.buffer.length < 4 + messageLength) { // Not enough data yet break; } const messageData = this.buffer.subarray(4, 4 + messageLength); this.buffer = this.buffer.subarray(4 + messageLength); try { const message = this.deserializeMessage(messageData); // Track heartbeat messages to detect server liveness if (message.type === 'heartbeat') { this.lastHeartbeatReceived = Date.now(); } this.messageHandlers.forEach((h) => h(message)); } catch (error) { logger.error(`Failed to parse message: ${error}`); } } } private handleDisconnect(): void { if (!this.connected) return; this.connected = false; this.socket = null; this.stopHeartbeat(); this.stopHeartbeatTimeout(); logger.warn('Disconnected from server'); this.connectionStateHandlers.forEach((h) => h(false)); if (this.autoReconnect) { this.scheduleReconnect(); } } private scheduleReconnect(): void { if (this.reconnectTimer) return; logger.info(`Reconnecting in ${this.reconnectDelay}ms...`); this.reconnectTimer = setTimeout(() => { this.reconnectTimer = null; this.connect(); this.reconnectDelay = Math.min(this.reconnectDelay * 2, RECONNECT_DELAY_MAX); }, this.reconnectDelay); } private stopReconnect(): void { if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } } private startHeartbeat(): void { this.heartbeatTimer = setInterval(() => { this.send({ type: 'heartbeat' as any, timestamp: Date.now() }); }, HEARTBEAT_INTERVAL); } private stopHeartbeat(): void { if (this.heartbeatTimer) { clearInterval(this.heartbeatTimer); this.heartbeatTimer = null; } } private startHeartbeatTimeout(): void { this.heartbeatTimeoutTimer = setInterval(() => { const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeatReceived; if (timeSinceLastHeartbeat > HEARTBEAT_TIMEOUT) { logger.warn(`Heartbeat timeout: no response for ${timeSinceLastHeartbeat}ms`); this.handleDisconnect(); } }, HEARTBEAT_TIMEOUT); } private stopHeartbeatTimeout(): void { if (this.heartbeatTimeoutTimer) { clearInterval(this.heartbeatTimeoutTimer); this.heartbeatTimeoutTimer = null; } } private serializeMessage(message: Message): Buffer { const json = JSON.stringify(message); const length = Buffer.byteLength(json); const buffer = Buffer.allocUnsafe(4 + length); buffer.writeUInt32BE(length, 0); buffer.write(json, 4); return buffer; } private deserializeMessage(data: Buffer): Message { const json = data.toString('utf-8'); return JSON.parse(json) as Message; } }