/**
 * Unix Socket Transport
 *
 * Implements the Transport interface using Unix domain sockets.
 * Protocol: newline-delimited JSON (NDJSON) — same message types as WS.
 * JSON.stringify escapes internal newlines, so raw \n is an unambiguous delimiter.
 */

import net from "node:net";
import fs from "node:fs";
import { Logger } from "@agentick/kernel";
import type { ClientMessage, GatewayMessage, ConnectMessage } from "./transport-protocol.js";
import type { ClientState } from "./types.js";
import { BaseTransport, type TransportClient, type TransportConfig } from "./transport.js";
import { LineBuffer } from "./ndjson.js";

const log = Logger.for("UnixSocket");

/** Outbound buffer size (in bytes) above which a client is reported as pressured. */
const BACKPRESSURE_THRESHOLD_BYTES = 64 * 1024;

// ============================================================================
// Configuration
// ============================================================================

export interface UnixSocketTransportConfig extends TransportConfig {
  /** Path to the Unix domain socket file */
  socketPath: string;
}

// ============================================================================
// Unix Socket Client (server-side)
// ============================================================================

/**
 * Server-side handle for one accepted Unix socket connection.
 *
 * Wraps the raw `net.Socket` and serializes outgoing gateway messages as
 * NDJSON (one JSON document per line).
 */
class UnixSocketClientImpl implements TransportClient {
  readonly id: string;
  readonly state: ClientState;
  private socket: net.Socket;

  /**
   * @param id - Initial client identifier (may later be replaced via `_setId`).
   * @param socket - The accepted connection this client writes to.
   */
  constructor(id: string, socket: net.Socket) {
    this.id = id;
    this.socket = socket;
    this.state = {
      id,
      connectedAt: new Date(),
      authenticated: false,
      subscriptions: new Set(),
    };
  }

  /**
   * Serialize `message` as a single NDJSON line and write it to the socket.
   *
   * Silently does nothing when the socket is already destroyed or no longer
   * writable. When the write reports backpressure, a debug entry is logged;
   * the message is still queued by the socket. Any exception raised while
   * serializing or writing destroys the socket.
   */
  send(message: GatewayMessage): void {
    if (this.socket.destroyed || !this.socket.writable) return;

    try {
      const json = JSON.stringify(message) + "\n";
      const flushed = this.socket.write(json);
      if (!flushed) {
        const eventType =
          message.type === "event"
            ? (message as unknown as Record<string, unknown>).event
            : message.type;
        log.debug(
          {
            eventType,
            // Real encoded size: string length counts UTF-16 code units, not bytes.
            bytes: Buffer.byteLength(json),
            writableLength: this.socket.writableLength,
          },
          "backpressure",
        );
      }
    } catch {
      // EPIPE / ECONNRESET — client disconnected between check and write
      this.socket.destroy();
    }
  }

  /** Tear down the connection. The code and reason arguments are ignored. */
  close(_code?: number, _reason?: string): void {
    this.socket.destroy();
  }

  /** True while the socket is neither destroyed nor closed for writing. */
  get isConnected(): boolean {
    return !this.socket.destroyed && this.socket.writable;
  }

  /** True when more than 64 KiB is buffered in the socket awaiting write. */
  isPressured(): boolean {
    return this.socket.writableLength > BACKPRESSURE_THRESHOLD_BYTES;
  }

  /**
   * Register `callback` on the socket's "drain" event.
   *
   * NOTE(review): the listener is attached with `on`, so it stays registered
   * and each call adds another listener — confirm callers invoke this once
   * per client.
   */
  onDrain(callback: () => void): void {
    this.socket.on("drain", callback);
  }

  /** @internal - Update client ID (for custom client IDs) */
  _setId(newId: string): void {
    (this as { id: string }).id = newId;
    (this.state as { id: string }).id = newId;
  }
}

/**
 * Minimal structural check applied to parsed NDJSON input before it is
 * treated as a protocol message: a non-null object with a string `type`.
 */
function hasMessageType(value: unknown): value is { type: string } {
  return (
    typeof value === "object" &&
    value !== null &&
    typeof (value as { type?: unknown }).type === "string"
  );
}

// ============================================================================
// Unix Socket Transport
// ============================================================================

/**
 * Transport that accepts clients over a Unix domain socket and exchanges
 * NDJSON-framed protocol messages with them.
 */
export class UnixSocketTransport extends BaseTransport {
  readonly type = "unix" as const;

  private server: net.Server | null = null;
  protected override config: UnixSocketTransportConfig;

  constructor(config: UnixSocketTransportConfig) {
    super(config);
    this.config = config;
  }

  /**
   * Start listening on `config.socketPath`.
   *
   * Any file already present at that path is unlinked first. The returned
   * promise resolves once the server is listening and rejects if the server
   * emits an error (or creation throws) before that. Server errors are also
   * forwarded to the registered `error` handler.
   */
  override start(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      // Clean up stale socket file if it exists
      try {
        fs.unlinkSync(this.config.socketPath);
      } catch {
        // File doesn't exist — fine
      }

      try {
        const server = net.createServer((socket) => this.handleConnection(socket));
        this.server = server;

        server.on("error", (error) => {
          this.handlers.error?.(error);
          // No-op once the promise has settled (i.e. for post-startup errors).
          reject(error);
        });

        server.listen(this.config.socketPath, () => {
          resolve();
        });
      } catch (error) {
        reject(error);
      }
    });
  }

  /**
   * Stop the transport: close every client, close the server, and remove the
   * socket file. Resolves after the server's close callback has run.
   */
  override async stop(): Promise<void> {
    // Close all client connections
    for (const client of this.clients.values()) {
      client.close();
    }
    this.clients.clear();

    // Close the server
    const server = this.server;
    if (server) {
      await new Promise<void>((resolve) => {
        server.close(() => {
          this.server = null;
          resolve();
        });
      });
    }

    // Remove socket file
    try {
      fs.unlinkSync(this.config.socketPath);
    } catch {
      // Already removed or never created
    }
  }

  /**
   * Wire up a newly accepted socket: register a client for it, frame incoming
   * data into lines, and process those lines strictly in arrival order.
   */
  private handleConnection(socket: net.Socket): void {
    const clientId = this.generateClientId();
    const client = new UnixSocketClientImpl(clientId, socket);
    this.clients.set(clientId, client);

    // Decode at the stream level so a multi-byte UTF-8 character that is split
    // across two chunks is reassembled instead of being corrupted.
    socket.setEncoding("utf8");

    const lineBuffer = new LineBuffer();

    // Message processing queue — serializes async handling per client.
    // Without this, messages in the same chunk race: auth (async) for the
    // connect message hasn't finished when the first request arrives.
    let messageQueue: Promise<void> = Promise.resolve();

    socket.on("data", (chunk: Buffer | string) => {
      const text = typeof chunk === "string" ? chunk : chunk.toString("utf8");
      for (const line of lineBuffer.feed(text)) {
        messageQueue = messageQueue.then(() => this.processLine(client, line));
      }
    });

    socket.on("close", () => {
      // Only drop the registry entry if it still refers to this connection; a
      // later client may have claimed the same custom ID in the meantime.
      if (this.clients.get(client.id) === client) {
        this.clients.delete(client.id);
      }
      this.handlers.disconnect?.(client.id);
    });

    socket.on("error", () => {
      // Client socket errors (EPIPE, ECONNRESET) are routine disconnects,
      // not transport-level errors. Clean up — the "close" event follows.
      if (!socket.destroyed) socket.destroy();
    });

    // Notify handler of new connection (before auth)
    this.handlers.connection?.(client);
  }

  /**
   * Parse one NDJSON line and dispatch it. Never rejects, so one bad message
   * cannot break the per-client processing queue.
   *
   * Malformed JSON, or JSON that is not an object with a string `type`, is
   * answered with an `INVALID_MESSAGE` error. Failures raised while handling
   * a well-formed message are logged and reported to the client separately
   * from parse failures.
   */
  private async processLine(client: UnixSocketClientImpl, line: string): Promise<void> {
    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      parsed = undefined;
    }

    if (!hasMessageType(parsed)) {
      client.send({
        type: "error",
        code: "INVALID_MESSAGE",
        message: "Failed to parse message",
      });
      return;
    }

    try {
      await this.handleMessage(client, parsed as ClientMessage);
    } catch (error) {
      // Not a parse problem: authentication or a message handler threw.
      log.debug(
        { err: error, clientId: client.id, messageType: parsed.type },
        "message handling failed",
      );
      client.send({
        type: "error",
        code: "INVALID_MESSAGE",
        message: "Failed to process message",
      });
    }
  }

  /**
   * Route a parsed client message.
   *
   * `connect` runs authentication, `ping` is answered with `pong`, and every
   * other type is forwarded to the registered message handler — but only for
   * authenticated clients; otherwise an `UNAUTHORIZED` error is sent.
   */
  private async handleMessage(client: UnixSocketClientImpl, message: ClientMessage): Promise<void> {
    // Handle connect message (authentication)
    if (message.type === "connect") {
      await this.handleConnect(client, message);
      return;
    }

    // Handle ping
    if (message.type === "ping") {
      client.send({ type: "pong", timestamp: message.timestamp });
      return;
    }

    // All other messages require authentication
    if (!client.state.authenticated) {
      client.send({
        type: "error",
        code: "UNAUTHORIZED",
        message: "Authentication required. Send connect message first.",
      });
      return;
    }

    // Forward to message handler
    this.handlers.message?.(client.id, message);
  }

  /**
   * Authenticate a client from its `connect` message.
   *
   * On failure an `AUTH_FAILED` error is sent and the connection is closed.
   * On success the client state is marked authenticated, user and metadata
   * are recorded, the client is re-registered under `message.clientId` when
   * one is supplied, and `config.onAuthenticated` is invoked.
   */
  private async handleConnect(
    client: UnixSocketClientImpl,
    message: ConnectMessage,
  ): Promise<void> {
    const authResult = await this.validateAuth(message.token);

    if (!authResult.valid) {
      client.send({
        type: "error",
        code: "AUTH_FAILED",
        message: "Authentication failed",
      });
      client.close();
      return;
    }

    // Update client state; later spreads win, so message metadata overrides
    // auth metadata, which overrides anything already present.
    client.state.authenticated = true;
    client.state.user = authResult.user;
    client.state.metadata = {
      ...client.state.metadata,
      ...authResult.metadata,
      ...message.metadata,
    };

    // Client ID from message takes precedence
    if (message.clientId) {
      this.clients.delete(client.id);
      client._setId(message.clientId);
      this.clients.set(message.clientId, client);
    }

    // Notify gateway that auth succeeded — gateway sends ConnectedMessage
    this.config.onAuthenticated?.(client);
  }
}