import * as plugins from '../../plugins.js';
import { logger } from '../../core/utils/logger.js';
import type { IRouteContext } from '../../core/models/route-context.js';
import type { RoutePreprocessor } from './route-preprocessor.js';
import type { TDatagramHandler, IDatagramInfo } from './models/route-types.js';

/**
 * Framed message for datagram relay IPC.
 * Each message is length-prefixed: [4 bytes big-endian u32 length][JSON payload]
 */
interface IDatagramRelayMessage {
  type: 'datagram' | 'reply';
  routeKey: string;
  sourceIp: string;
  sourcePort: number;
  destPort: number;
  payloadBase64: string;
}

/** Size in bytes of the big-endian u32 length prefix that precedes every frame. */
const FRAME_HEADER_SIZE = 4;

/** Returns true when `value` is an integer in the 16-bit port range. */
function isPortNumber(value: unknown): value is number {
  return typeof value === 'number' && Number.isInteger(value) && value >= 0 && value <= 65535;
}

/**
 * Runtime check that a decoded JSON value has the shape of IDatagramRelayMessage.
 * The frame content comes from another process, so it is validated before use
 * instead of being trusted through a type annotation.
 */
function isDatagramRelayMessage(value: unknown): value is IDatagramRelayMessage {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  return (
    (candidate.type === 'datagram' || candidate.type === 'reply') &&
    typeof candidate.routeKey === 'string' &&
    typeof candidate.sourceIp === 'string' &&
    isPortNumber(candidate.sourcePort) &&
    isPortNumber(candidate.destPort) &&
    typeof candidate.payloadBase64 === 'string'
  );
}

/**
 * Server that receives UDP datagrams from Rust via Unix stream socket
 * and dispatches them to TypeScript datagramHandler callbacks.
 *
 * Protocol: length-prefixed JSON frames over a persistent Unix stream socket.
 * - Rust→TS: { type: "datagram", routeKey, sourceIp, sourcePort, destPort, payloadBase64 }
 * - TS→Rust: { type: "reply", routeKey, sourceIp, sourcePort, destPort, payloadBase64 }
 */
export class DatagramHandlerServer {
  /** Upper bound on buffered, not-yet-framed bytes before the buffer is dropped. */
  private static readonly MAX_BUFFER_SIZE = 50 * 1024 * 1024; // 50 MB
  /** Largest frame payload accepted from the peer. */
  private static readonly MAX_FRAME_SIZE = 10 * 1024 * 1024; // 10 MB

  private server: plugins.net.Server | null = null;
  private connection: plugins.net.Socket | null = null;
  private readonly socketPath: string;
  private readonly preprocessor: RoutePreprocessor;
  /** Bytes received on the current connection that have not yet been consumed as frames. */
  private readBuffer: Buffer = Buffer.alloc(0);

  constructor(socketPath: string, preprocessor: RoutePreprocessor) {
    this.socketPath = socketPath;
    this.preprocessor = preprocessor;
  }

  /**
   * Start listening on the Unix socket.
   *
   * Removes any file already present at the socket path, then resolves once the
   * server is listening. Rejects if the server emits an error.
   */
  public async start(): Promise<void> {
    // Clean up stale socket file
    try {
      await plugins.fs.promises.unlink(this.socketPath);
    } catch {
      // Ignore if doesn't exist
    }

    return new Promise<void>((resolve, reject) => {
      const server = plugins.net.createServer((socket) => {
        this.handleConnection(socket);
      });
      this.server = server;

      server.on('error', (err) => {
        logger.log('error', `DatagramHandlerServer error: ${err.message}`);
        // After the promise has settled this call is a no-op; the error is still logged.
        reject(err);
      });

      server.listen(this.socketPath, () => {
        logger.log('info', `DatagramHandlerServer listening on ${this.socketPath}`);
        resolve();
      });
    });
  }

  /**
   * Stop the server and clean up.
   *
   * Destroys the active connection, closes the listening server and removes the
   * socket file. Failures while removing the file are ignored.
   */
  public async stop(): Promise<void> {
    if (this.connection) {
      this.connection.destroy();
      this.connection = null;
    }

    const server = this.server;
    if (server) {
      await new Promise<void>((resolve) => {
        // Any error passed to the close callback is deliberately ignored.
        server.close(() => resolve());
      });
      this.server = null;
    }

    try {
      await plugins.fs.promises.unlink(this.socketPath);
    } catch {
      // Ignore
    }
  }

  /**
   * Handle a new connection from Rust.
   * Only one connection at a time (Rust maintains a persistent connection).
   * A new connection replaces (and destroys) the previous one and starts with an
   * empty read buffer.
   */
  private handleConnection(socket: plugins.net.Socket): void {
    if (this.connection) {
      logger.log('warn', 'DatagramHandlerServer: replacing existing connection');
      this.connection.destroy();
    }
    this.connection = socket;
    this.readBuffer = Buffer.alloc(0);

    socket.on('data', (chunk: Buffer) => {
      this.readBuffer = Buffer.concat([this.readBuffer, chunk]);
      if (this.readBuffer.length > DatagramHandlerServer.MAX_BUFFER_SIZE) {
        logger.log('error', `DatagramHandlerServer: buffer exceeded ${DatagramHandlerServer.MAX_BUFFER_SIZE} bytes, resetting`);
        this.readBuffer = Buffer.alloc(0);
        return;
      }
      this.processFrames();
    });

    socket.on('error', (err) => {
      logger.log('error', `DatagramHandlerServer connection error: ${err.message}`);
    });

    socket.on('close', () => {
      // Only clear the slot if it still refers to this socket; a newer
      // connection may already have replaced it.
      if (this.connection === socket) {
        this.connection = null;
      }
    });

    logger.log('info', 'DatagramHandlerServer: Rust relay connected');
  }

  /**
   * Process length-prefixed frames from the read buffer.
   *
   * Consumes every complete frame currently buffered and leaves a trailing
   * partial frame in place for the next 'data' event. Frames that fail to parse
   * or do not match the message shape are logged and skipped.
   */
  private processFrames(): void {
    while (this.readBuffer.length >= FRAME_HEADER_SIZE) {
      const frameLen = this.readBuffer.readUInt32BE(0);

      // Safety: reject absurdly large frames
      if (frameLen > DatagramHandlerServer.MAX_FRAME_SIZE) {
        logger.log('error', `DatagramHandlerServer: frame too large (${frameLen} bytes), resetting`);
        // NOTE(review): dropping the buffer also drops frame alignment; bytes of
        // this frame that arrive later will be read as a new length header.
        // Closing the connection may be the safer reaction — confirm that the
        // Rust side reconnects before changing this.
        this.readBuffer = Buffer.alloc(0);
        return;
      }

      const frameEnd = FRAME_HEADER_SIZE + frameLen;
      if (this.readBuffer.length < frameEnd) {
        // Incomplete frame, wait for more data
        return;
      }

      const frameData = this.readBuffer.subarray(FRAME_HEADER_SIZE, frameEnd);
      this.readBuffer = this.readBuffer.subarray(frameEnd);

      try {
        const parsed: unknown = JSON.parse(frameData.toString('utf8'));
        if (isDatagramRelayMessage(parsed)) {
          this.handleMessage(parsed);
        } else {
          logger.log('warn', 'DatagramHandlerServer: dropping malformed frame');
        }
      } catch (err) {
        // Also catches anything thrown synchronously while dispatching the
        // message, so one bad frame cannot escape the 'data' listener.
        logger.log('error', `DatagramHandlerServer: failed to parse frame: ${err}`);
      }
    }
  }

  /**
   * Handle a received datagram message from Rust.
   *
   * Looks up the original route for the message's routeKey and invokes its
   * datagramHandler with the decoded payload, sender info and a reply callback.
   * Messages whose type is not 'datagram' are ignored.
   */
  private handleMessage(msg: IDatagramRelayMessage): void {
    if (msg.type !== 'datagram') {
      return;
    }

    const originalRoute = this.preprocessor.getOriginalRoute(msg.routeKey);
    if (!originalRoute) {
      logger.log('warn', `DatagramHandlerServer: no handler for route '${msg.routeKey}'`);
      return;
    }

    const handler: TDatagramHandler | undefined = originalRoute.action.datagramHandler;
    if (!handler) {
      logger.log('warn', `DatagramHandlerServer: route '${msg.routeKey}' has no datagramHandler`);
      return;
    }

    const datagram = Buffer.from(msg.payloadBase64, 'base64');

    // Only the fields carried by the relay message are known here; serverIp is
    // fixed to '0.0.0.0' and the TLS/domain/path fields are left unset.
    const context: IRouteContext = {
      port: msg.destPort,
      domain: undefined,
      clientIp: msg.sourceIp,
      serverIp: '0.0.0.0',
      path: undefined,
      isTls: false,
      tlsVersion: undefined,
      routeName: originalRoute.name,
      routeId: originalRoute.id,
      timestamp: Date.now(),
      connectionId: `udp-${msg.sourceIp}:${msg.sourcePort}-${Date.now()}`,
    };

    const info: IDatagramInfo = {
      sourceIp: msg.sourceIp,
      sourcePort: msg.sourcePort,
      destPort: msg.destPort,
      context,
    };

    // Replies are addressed with the same route key and endpoints as the
    // datagram that triggered them.
    const reply = (data: Buffer): void => {
      this.sendReply({
        type: 'reply',
        routeKey: msg.routeKey,
        sourceIp: msg.sourceIp,
        sourcePort: msg.sourcePort,
        destPort: msg.destPort,
        payloadBase64: data.toString('base64'),
      });
    };

    try {
      const result: unknown = handler(datagram, info, reply);
      // Handlers may return a promise; attach a rejection handler so an async
      // failure is logged instead of becoming an unhandled rejection.
      if (result && typeof (result as Promise<unknown>).catch === 'function') {
        (result as Promise<unknown>).catch((err: unknown) => {
          logger.log('error', `DatagramHandler error for route '${msg.routeKey}': ${err}`);
        });
      }
    } catch (err) {
      logger.log('error', `DatagramHandler threw for route '${msg.routeKey}': ${err}`);
    }
  }

  /**
   * Send a reply frame back to Rust.
   *
   * The message is serialized as JSON and prefixed with its byte length as a
   * big-endian u32. If there is no live connection the reply is dropped with a
   * warning.
   */
  private sendReply(msg: IDatagramRelayMessage): void {
    if (!this.connection || this.connection.destroyed) {
      logger.log('warn', 'DatagramHandlerServer: cannot send reply, no connection');
      return;
    }

    const payload = Buffer.from(JSON.stringify(msg), 'utf8');
    const header = Buffer.alloc(FRAME_HEADER_SIZE);
    header.writeUInt32BE(payload.length, 0);
    // Header and payload go out in a single write.
    this.connection.write(Buffer.concat([header, payload]));
  }

  /**
   * Get the socket path for passing to Rust via IPC.
   */
  public getSocketPath(): string {
    return this.socketPath;
  }
}