/** * Gateway * * Standalone daemon for multi-client, multi-app access. * Transport-agnostic: supports both WebSocket and HTTP/SSE. * * Can run standalone or embedded in an external framework. */ import { EventEmitter } from "events"; import type { IncomingMessage as NodeRequest, ServerResponse as NodeResponse } from "http"; import type { Message } from "@agentick/shared"; import { GuardError, isGuardError, isNotFoundError, isValidationError, isStateError, isAgentickError, extractText, NotFoundError, } from "@agentick/shared"; import { devToolsEmitter, type DTClientConnectedEvent, type DTClientDisconnectedEvent, type DTGatewayRequestEvent, type DTGatewayResponseEvent, } from "@agentick/shared"; import { Context, createProcedure, createGuard, Logger, type KernelContext, type Procedure, type Middleware, type UserContext, type ChannelServiceInterface, type ChannelEvent, } from "@agentick/kernel"; import type { Session } from "@agentick/core"; const log = Logger.for("Gateway"); /** * Event types that are internal to the framework and should never be * broadcast to client subscribers. These contain full prompts, model * context, and provider details. DevTools receives them via devToolsEmitter. */ const INTERNAL_EVENT_TYPES = new Set([ "compiled", "model_request", "model_response", "provider_request", "provider_response", ]); import { extractToken, validateAuth, wwwAuthenticateHeader, setSSEHeaders, type AuthResult, } from "@agentick/server"; import { AppRegistry } from "./app-registry.js"; import { SessionManager } from "./session-manager.js"; import { WSTransport } from "./ws-transport.js"; import { HTTPTransport } from "./http-transport.js"; import { EmbeddedSSETransport } from "./sse-transport.js"; import type { ClientTransport, SendInput, StreamEvent, ToolConfirmationResponse, } from "@agentick/shared"; import type { Transport, TransportClient } from "./transport.js"; import { LocalGatewayTransport } from "./local-transport.js"; import { UnixSocketTransport } from "./unix-socket-transport.js"; import { ClientEventBuffer } from "./client-event-buffer.js"; import type { GatewayConfig, GatewayEvents, GatewayHandle, GatewayPlugin, PluginContext, MethodNamespace, MethodDefinition, SimpleMethodHandler, } from "./types.js"; import { isMethodDefinition } from "./types.js"; import { toJSONSchema } from "@agentick/kernel"; import type { ConfigStore } from "./config.js"; import { createConfigStore, bindConfig, getConfigOrNull } from "./config.js"; import { loadConfig } from "./config-loader.js"; import type { RequestMessage, GatewayMethod, GatewayEventType, GatewayMessage, EventMessage, SendParams, StatusParams, HistoryParams, SubscribeParams, StatusPayload, AppsPayload, SessionsPayload, SchemaPayload, MethodSchemaEntry, ConfigPayload, ToolCatalogPayload, ToolCatalogParams, ToolConfirmParams, ToolDispatchParams, } from "./transport-protocol.js"; import { PROTOCOL_VERSION } from "./transport-protocol.js"; import { BUILT_IN_METHOD_SCHEMAS, GATEWAY_EVENTS, PROTOCOL_ERROR_CODES } from "./method-schemas.js"; const DEFAULT_PORT = 18789; const DEFAULT_HOST = "127.0.0.1"; // ============================================================================ // Guard Middleware // ============================================================================ /** Guard middleware that checks user roles */ function createRoleGuardMiddleware(roles: string[]): Middleware { return createGuard({ name: "gateway-role", guardType: "role" }, () => { const userRoles = Context.get().user?.roles ?? []; if (!roles.some((r) => userRoles.includes(r))) { throw GuardError.role(roles); } return true; }); } /** Guard middleware that runs custom guard function */ function createCustomGuardMiddleware( guard: (ctx: KernelContext) => boolean | Promise, ): Middleware { return createGuard({ name: "gateway-custom", reason: "Guard check failed" }, () => guard(Context.get()), ); } // ============================================================================ // Channel Service Helpers // ============================================================================ /** * Create a ChannelServiceInterface that wraps a Session's channel() method. * This allows gateway methods to access session channels via Context. */ function createChannelServiceFromSession( session: Session, _gatewayId: string, ): ChannelServiceInterface { return { getChannel: (_ctx: KernelContext, channelName: string) => session.channel(channelName), publish: (_ctx: KernelContext, channelName: string, event: Omit) => { session.channel(channelName).publish({ ...event, channel: channelName } as ChannelEvent); }, subscribe: ( _ctx: KernelContext, channelName: string, handler: (event: ChannelEvent) => void, ) => { return session.channel(channelName).subscribe(handler); }, waitForResponse: ( _ctx: KernelContext, channelName: string, requestId: string, timeoutMs?: number, ) => { return session.channel(channelName).waitForResponse(requestId, timeoutMs); }, }; } // ============================================================================ // Gateway Class // ============================================================================ // ============================================================================ // Helpers // ============================================================================ /** Extract first text content from SendInput for logging */ function extractTextFromInput(input: SendInput): string { if (!input.messages?.length) return "[no content]"; const texts = input.messages.map((msg) => extractText(msg.content, " ")).filter(Boolean); return texts.join(" ") || "[multimodal content]"; } // Built-in method schemas imported from method-schemas.ts (BUILT_IN_METHOD_SCHEMAS) export class Gateway extends EventEmitter { private config: Required< Pick > & GatewayConfig; private registry: AppRegistry; private sessions: SessionManager; private transports: Transport[] = []; private startTime: Date | null = null; private isRunning = false; private embedded: boolean; /** Pre-compiled map of method paths to procedures */ private methodProcedures = new Map>(); /** Stored schemas for custom methods (method path → raw schema) */ private methodSchemas = new Map(); /** Stored response schemas for custom methods (method path → raw schema) */ private methodResponseSchemas = new Map(); /** Stored metadata for custom methods (method path → { description, roles }) */ private methodMeta = new Map(); /** SSE transport for embedded mode (initialized in constructor when embedded: true) */ private sseTransport: EmbeddedSSETransport | null = null; /** Track channel subscriptions: "sessionId:channelName" -> Set of clientIds */ private channelSubscriptions = new Map>(); /** Track unsubscribe functions for core session channels */ private coreChannelUnsubscribes = new Map void>(); /** Track client connection times for duration calculation */ private clientConnectedAt = new Map(); /** Shared local transport instance (created lazily) */ private _localTransport: LocalGatewayTransport | null = null; /** Per-client event buffers for backpressure */ private clientBuffers = new Map(); /** Sequence counter for DevTools events */ private devToolsSequence = 0; /** Registered plugins: id -> { plugin, ctx } */ private plugins = new Map(); /** Configuration store (always set in constructor — start() may replace with file-loaded) */ private configStore!: ConfigStore; /** Track which plugin owns which method: method path -> pluginId */ private pluginMethodOwnership = new Map(); /** Plugin broadcast subscribers: pluginId -> Set */ private pluginSubscribers = new Map>(); /** Plugin HTTP routes: path → { pluginId, handler } */ private pluginRoutes = new Map< string, { pluginId: string; auth: boolean; absolute: boolean; handler: ( req: import("http").IncomingMessage, res: import("http").ServerResponse, ) => void | Promise; } >(); constructor(config: GatewayConfig) { super(); // Validate config if (!config.apps || Object.keys(config.apps).length === 0) { throw new Error("At least one app is required"); } if (!config.defaultApp) { throw new Error("defaultApp is required"); } this.embedded = config.embedded ?? false; // Set defaults this.config = { ...config, port: config.port ?? DEFAULT_PORT, host: config.host ?? DEFAULT_HOST, id: config.id ?? `gw-${Date.now().toString(36)}`, transport: config.transport ?? "websocket", }; // Initialize components this.registry = new AppRegistry(config.apps, config.defaultApp); this.sessions = new SessionManager(this.registry, { gatewayId: this.config.id }); // Config store: use provided, or create empty (start() may replace with file-loaded) this.configStore = config.configStore ?? createConfigStore({}); if (!getConfigOrNull()) { bindConfig(this.configStore); } // Initialize all methods as procedures if (config.methods) { this.initializeMethods(config.methods, []); } // Create transports if (this.embedded) { // Embedded mode: SSE transport for handleRequest() path this.sseTransport = new EmbeddedSSETransport(); this.setupTransportHandlers(this.sseTransport); this.transports.push(this.sseTransport); } else { // Standalone mode: WS and/or HTTP transports this.initializeTransports(); } // Initialize plugins from config (fire-and-forget — errors logged, not thrown) if (config.plugins) { for (const plugin of config.plugins) { this.use(plugin).catch((err) => console.error(`Plugin ${plugin.id} init failed:`, err)); } } } /** * Walk the methods tree and wrap all handlers as procedures. * Infers full path name (e.g., "tasks:admin:archive") automatically. */ private initializeMethods(methods: MethodNamespace, path: string[]): void { for (const [key, value] of Object.entries(methods)) { const fullPath = [...path, key]; const methodName = fullPath.join(":"); // e.g., "tasks:admin:archive" if (typeof value === "function") { // Simple function -> wrap in procedure automatically this.methodProcedures.set( methodName, createProcedure( { name: `gateway:${methodName}`, executionBoundary: "auto", metadata: { gatewayId: this.config.id, method: methodName }, }, value as (...args: any[]) => any, ), ); } else if (isMethodDefinition(value)) { // method() definition -> create procedure with guards/schema as middleware const middleware: Middleware[] = []; if (value.roles?.length) { middleware.push(createRoleGuardMiddleware(value.roles)); } if (value.guard) { middleware.push(createCustomGuardMiddleware(value.guard)); } // Store schema and metadata for discovery if (value.schema) { this.methodSchemas.set(methodName, value.schema); } if (value.response) { this.methodResponseSchemas.set(methodName, value.response); } if (value.description || value.roles) { this.methodMeta.set(methodName, { description: value.description, roles: value.roles, }); } this.methodProcedures.set( methodName, createProcedure( { name: `gateway:${methodName}`, executionBoundary: "auto", // Cast to any for Zod 3/4 compatibility - runtime uses .parse() only schema: value.schema as any, middleware, metadata: { gatewayId: this.config.id, method: methodName, description: value.description, roles: value.roles, }, }, value.handler as (...args: any[]) => any, ), ); } else { // Plain object -> namespace, recurse this.initializeMethods(value as MethodNamespace, fullPath); } } } /** * Get a method's procedure by path (supports both ":" and "." separators) */ private getMethodProcedure(path: string): Procedure | undefined { // Normalize separators to ":" const normalized = path.replace(/\./g, ":"); return this.methodProcedures.get(normalized); } private initializeTransports(): void { const { transport, port, host, auth, httpPort } = this.config; if (transport === "websocket" || transport === "both") { const wsTransport = new WSTransport({ port, host, auth, onAuthenticated: (client) => this.sendConnectedMessage(client), }); this.setupTransportHandlers(wsTransport); this.transports.push(wsTransport); } if (transport === "http" || transport === "both") { const httpTransportPort = transport === "both" ? (httpPort ?? port + 1) : port; const httpTransportInstance = new HTTPTransport({ port: httpTransportPort, host, auth, onAuthenticated: (client) => this.sendConnectedMessage(client), pathPrefix: this.config.httpPathPrefix, corsOrigin: this.config.httpCorsOrigin, onDirectSend: this.directSend.bind(this), onInvoke: this.invokeMethod.bind(this), onToolResponse: (sessionId, toolUseId, result) => this.respondToConfirmation(sessionId, toolUseId, result), onAbort: (sessionId, reason) => this.abortSession(sessionId, reason), onRouteMatch: async (path, req, res) => { return this.dispatchPluginRoute(path, req, res); }, }); this.setupTransportHandlers(httpTransportInstance); this.transports.push(httpTransportInstance); } // Unix socket — orthogonal to WS/HTTP, can run alongside them if (this.config.socketPath) { const unixTransport = new UnixSocketTransport({ socketPath: this.config.socketPath, auth, onAuthenticated: (client) => this.sendConnectedMessage(client), }); this.setupTransportHandlers(unixTransport); this.transports.push(unixTransport); } } private setupTransportHandlers(transport: Transport): void { transport.on("connection", (client) => { // Track connection time for duration calculation const connectTime = Date.now(); this.clientConnectedAt.set(client.id, connectTime); this.emit("client:connected", { clientId: client.id, }); // Emit DevTools event if (devToolsEmitter.hasSubscribers()) { devToolsEmitter.emitEvent({ type: "client_connected", executionId: this.config.id, clientId: client.id, transport: transport.type as "websocket" | "sse" | "http" | "local", sequence: this.devToolsSequence++, timestamp: connectTime, } as DTClientConnectedEvent); } }); transport.on("disconnect", (clientId, reason) => { // Calculate connection duration const connectedAt = this.clientConnectedAt.get(clientId); const durationMs = connectedAt ? Date.now() - connectedAt : 0; this.clientConnectedAt.delete(clientId); // Clean up subscriptions and buffer this.sessions.unsubscribeAll(clientId); this.cleanupClientChannelSubscriptions(clientId); this.cleanupPluginSubscriptions(clientId); const buffer = this.clientBuffers.get(clientId); if (buffer) { buffer.clear(); this.clientBuffers.delete(clientId); } this.emit("client:disconnected", { clientId, reason, }); // Emit DevTools event if (devToolsEmitter.hasSubscribers()) { devToolsEmitter.emitEvent({ type: "client_disconnected", executionId: this.config.id, clientId, reason, durationMs, sequence: this.devToolsSequence++, timestamp: Date.now(), } as DTClientDisconnectedEvent); } }); transport.on("message", async (clientId, message) => { if (message.type === "req") { await this.handleTransportRequest(transport, clientId, message); } }); transport.on("error", (error) => { this.emit("error", error); }); } /** * Start the gateway (standalone mode only) */ async start(): Promise { if (this.embedded) { throw new Error("Cannot call start() in embedded mode - use handleRequest() instead"); } if (this.isRunning) { throw new Error("Gateway is already running"); } // Load config from file if no pre-loaded configStore was provided if (!this.config.configStore) { this.configStore = await loadConfig({ path: this.config.configPath }); bindConfig(this.configStore); } // Start all transports await Promise.all(this.transports.map((t) => t.start())); this.startTime = new Date(); this.isRunning = true; this.emit("started", { port: this.config.port, host: this.config.host, }); } /** * Stop the gateway */ async stop(): Promise { if (!this.isRunning && !this.embedded) return; // Close all active sessions — triggers component unmount, sandbox teardown, etc. await this.sessions.closeAll(); // Destroy plugins in reverse registration order const entries = [...this.plugins.values()].reverse(); for (const { plugin } of entries) { await plugin .destroy?.() .catch((err) => console.error(`Plugin ${plugin.id} destroy failed:`, err)); } this.plugins.clear(); // Stop all transports (if any) await Promise.all(this.transports.map((t) => t.stop())); this.isRunning = false; this.startTime = null; this.emit("stopped", {}); } /** * Alias for stop() - useful for embedded mode cleanup */ async close(): Promise { return this.stop(); } // ══════════════════════════════════════════════════════════════════════════ // Public Session API (used by local transport and external callers) // ══════════════════════════════════════════════════════════════════════════ /** * Get or create a session with multi-app routing. * Parses session key (e.g., "coding:main") and routes to the correct app. */ async session(sessionKey: string): Promise { const managedSession = await this.sessions.getOrCreate(sessionKey); if (!managedSession.coreSession || managedSession.coreSession.isTerminal) { // Inject gateway handle into ALS context before session creation. // Session captures this in _capturedContext. All subsequent tick // executions merge it via runWithContext. const gatewayHandle: GatewayHandle = { invoke: (method: string, params: unknown) => this.invokeMethod(method, params as Record), use: (plugin: GatewayPlugin) => this.use(plugin), remove: (pluginId: string) => this.remove(pluginId), }; managedSession.coreSession = await Context.fork( { metadata: { ...Context.tryGet()?.metadata, gateway: gatewayHandle } }, () => managedSession.appInfo.app.session(managedSession.sessionName), ); } return managedSession.coreSession; } /** * Close a session and clean up managed state. */ async closeSession(sessionKey: string): Promise { await this.sessions.close(sessionKey); this.emit("session:closed", { sessionId: sessionKey }); } /** * Subscribe a client to session events. * Synthetic keys like `$plugin:` route to plugin broadcast subscribers. */ async subscribe(sessionKey: string, clientId: string): Promise { if (sessionKey.startsWith("$plugin:")) { const pluginId = sessionKey.slice("$plugin:".length); let subs = this.pluginSubscribers.get(pluginId); if (!subs) { subs = new Set(); this.pluginSubscribers.set(pluginId, subs); } subs.add(clientId); return; } await this.sessions.subscribe(sessionKey, clientId); } /** * Unsubscribe a client from session events. * Handles `$plugin:` synthetic keys. */ unsubscribe(sessionKey: string, clientId: string): void { if (sessionKey.startsWith("$plugin:")) { const pluginId = sessionKey.slice("$plugin:".length); this.pluginSubscribers.get(pluginId)?.delete(clientId); return; } this.sessions.unsubscribe(sessionKey, clientId); } /** * Send a message to a session and stream events. * Broadcasts events to all subscribers (cross-client push), excluding * the sender who iterates the handle directly. * * @param senderClientId - If provided, this client is excluded from * push broadcasts (they get events through direct handle iteration). */ async sendToSession(sessionKey: string, input: SendInput, senderClientId?: string) { const session = await this.session(sessionKey); const handle = await session.send(input); // Broadcast events to OTHER subscribers in background. // The sender iterates the handle directly — excluding them from // broadcast prevents double-dispatch. handle.events creates an // independent iterator (EventBuffer supports dual consumption). const broadcast = this.iterateWithBroadcast(sessionKey, handle.events, input, { excludeClientId: senderClientId, }); (async () => { for await (const _ of broadcast) { } })().catch((error) => { this.emit("error", error instanceof Error ? error : new Error(String(error))); }); return handle; } /** * Unified execution path: get session, send, iterate events, broadcast. * Used by WS, HTTP, and channel adapter paths. */ private async *executeSession( sessionKey: string, input: SendInput, opts?: { excludeClientId?: string; clientId?: string }, ): AsyncGenerator { const session = await this.session(sessionKey); const handle = await session.send(input); yield* this.iterateWithBroadcast(sessionKey, handle, input, opts); } /** * Core state management loop: activate session, track message, * iterate events with broadcast, deactivate on completion. * * Both executeSession (yields to caller) and sendToSession * (broadcasts in background) delegate here. The source is any * AsyncIterable — a handle or handle.events. */ private async *iterateWithBroadcast( sessionKey: string, source: AsyncIterable, input: SendInput, opts?: { excludeClientId?: string; clientId?: string }, ): AsyncGenerator { this.sessions.setActive(sessionKey, true); try { this.trackMessage(sessionKey, input, opts?.clientId); for await (const event of source) { log.trace({ sessionId: sessionKey, eventType: event.type }, "event:stream"); // Don't broadcast internal events to subscribers — these contain full prompts, // model context, provider details. DevTools receives them separately. if (!INTERNAL_EVENT_TYPES.has(event.type)) { this.sendEventToSubscribers(sessionKey, event.type, event, opts?.excludeClientId); } yield event; } log.trace( { sessionId: sessionKey }, "event:stream exhausted, sending synthetic execution_end", ); this.sendEventToSubscribers(sessionKey, "execution_end", {}, opts?.excludeClientId); } finally { this.sessions.setActive(sessionKey, false); } } /** * Track a user message for state management. * Increments message count and emits session:message event. */ private trackMessage(sessionKey: string, input: SendInput, clientId?: string): void { this.sessions.incrementMessageCount(sessionKey, clientId); this.emit("session:message", { sessionId: sessionKey, role: "user", content: extractTextFromInput(input), }); } /** * Create an in-process ClientTransport connected to this gateway. * Returns a ClientTransport for use with createClient(). * * Multiple calls create independent clients sharing the same * underlying LocalGatewayTransport. */ createLocalTransport(): ClientTransport { if (!this._localTransport) { this._localTransport = new LocalGatewayTransport(); this.setupTransportHandlers(this._localTransport); this.transports.push(this._localTransport); } return this._localTransport.createClientTransport(this); } /** * Get gateway status */ get status(): StatusPayload["gateway"] { return { id: this.config.id, uptime: this.startTime ? Math.floor((Date.now() - this.startTime.getTime()) / 1000) : 0, clients: this.transports.reduce((sum, t) => sum + t.clientCount, 0), sessions: this.sessions.size, apps: this.registry.ids(), }; } /** * Check if running */ get running(): boolean { return this.isRunning; } /** * Get the gateway ID */ get id(): string { return this.config.id; } // ══════════════════════════════════════════════════════════════════════════ // Embedded Mode: handleRequest() // ══════════════════════════════════════════════════════════════════════════ /** * Handle an HTTP request (embedded mode). * This is the main entry point when Gateway is embedded in an external framework. * * @param req - Node.js IncomingMessage (or Express/Koa/etc request) * @param res - Node.js ServerResponse (or Express/Koa/etc response) * @returns Promise that resolves when request is handled (may reject on error) * * @example * ```typescript * // Express middleware * app.use("/api", (req, res, next) => { * gateway.handleRequest(req, res).catch(next); * }); * ``` */ async handleRequest(req: NodeRequest, res: NodeResponse): Promise { // Set CORS headers const origin = this.config.httpCorsOrigin ?? "*"; res.setHeader("Access-Control-Allow-Origin", origin); res.setHeader("Access-Control-Allow-Methods", "GET, POST, PATCH, DELETE, OPTIONS"); res.setHeader("Access-Control-Allow-Headers", "Content-Type, Authorization"); res.setHeader("Access-Control-Allow-Credentials", "true"); // Handle preflight if (req.method === "OPTIONS") { res.writeHead(204); res.end(); return; } // Extract path (handle Express mounting where path is already stripped) const url = new URL(req.url ?? "/", `http://${req.headers.host}`); const prefix = this.config.httpPathPrefix ?? ""; const path = url.pathname.replace(prefix, "") || "/"; log.debug({ method: req.method, url: req.url, path }, "handleRequest"); // Authenticate once at the request boundary. SSE connections may // pass the token as a query param (EventSource can't set headers). const token = extractToken(req) ?? url.searchParams.get("token") ?? undefined; const authResult = await validateAuth(token, this.config.auth); // Set ALS context for the entire request lifetime. All downstream // handlers, session creation, tool execution, and plugin routes // inherit this context automatically. // // Use Context.child (not Context.create) so that anything an outer // caller put on the ALS context — most importantly the Agentick // instance set as `middleware` — survives into this request scope. // Without this, embedders that wrap handleRequest() in their own // Context.run({ middleware }) silently lose middleware here, and // every downstream procedure dispatch runs without it. const ctx = Context.child({ user: authResult.valid ? authResult.user : undefined, metadata: { ...Context.tryGet()?.metadata, gatewayId: this.config.id }, }); await Context.run(ctx, async () => { // Plugin routes — checked before built-in routes, longest prefix first if (await this.dispatchPluginRoute(path, req, res, authResult)) return; // All built-in routes require auth if (!authResult.valid) { res.writeHead(401, { "Content-Type": "application/json", "WWW-Authenticate": wwwAuthenticateHeader(this.config.auth), }); res.end(JSON.stringify({ error: "Authentication failed" })); return; } // Route requests - all framework-level endpoints switch (path) { case "/events": return this.handleSSE(req, res); case "/send": return this.handleSend(req, res); case "/invoke": return this.handleInvoke(req, res); case "/subscribe": return this.handleSubscribe(req, res); case "/abort": return this.handleAbort(req, res); case "/close": return this.handleCloseEndpoint(req, res); case "/channel": case "/channel/subscribe": case "/channel/publish": return this.handleChannel(req, res); case "/tool-response": return this.handleToolResponse(req, res); } res.writeHead(404, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Not found" })); }); } // ══════════════════════════════════════════════════════════════════════════ // HTTP Handlers (used by both handleRequest and HTTPTransport) // ══════════════════════════════════════════════════════════════════════════ // ── SSE ────────────────────────────────────────────────────────────────── private async handleSSE(req: NodeRequest, res: NodeResponse): Promise { const url = new URL(req.url ?? "/", `http://${req.headers.host}`); // Setup SSE response setSSEHeaders(res); const clientId = url.searchParams.get("clientId") ?? `client-${Date.now().toString(36)}`; // Register as a real transport client — gets backpressure, DevTools, // appears in gateway.status.clients, cleaned up on disconnect this.sseTransport!.registerClient(clientId, res); // Connection confirmation is written directly to the response rather than // going through client.send() because it's a handshake message — not a // GatewayMessage variant. The client resolves its connection promise on // receiving { type: "connection" }. This is the same pattern HTTP and WS // transports use for their initial handshake. res.write( `data: ${JSON.stringify({ type: "connection", connectionId: clientId, subscriptions: [] })}\n\n`, ); } /** * Clean up channel subscriptions for a disconnected client. */ private cleanupClientChannelSubscriptions(clientId: string): void { for (const [key, clientIds] of this.channelSubscriptions.entries()) { clientIds.delete(clientId); // If no more subscribers for this session:channel, unsubscribe from core channel if (clientIds.size === 0) { this.channelSubscriptions.delete(key); const unsubscribe = this.coreChannelUnsubscribes.get(key); if (unsubscribe) { unsubscribe(); this.coreChannelUnsubscribes.delete(key); } } } } private cleanupPluginSubscriptions(clientId: string): void { for (const subs of this.pluginSubscribers.values()) { subs.delete(clientId); } } private async handleSend(req: NodeRequest, res: NodeResponse): Promise { log.debug({ method: req.method, url: req.url }, "handleSend: START"); if (req.method !== "POST") { res.writeHead(405, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Method not allowed" })); return; } const body = await this.parseBody(req); log.debug({ body }, "handleSend: parsed body"); if (!body) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Invalid request body" })); return; } const sessionId = (body.sessionId as string) ?? "main"; if (!(await this.checkSessionAccess(sessionId, "send", res))) return; const rawMessages = body.messages; log.debug({ sessionId, hasMessages: !!rawMessages }, "handleSend: extracted params"); if (!Array.isArray(rawMessages) || rawMessages.length === 0) { res.writeHead(400, { "Content-Type": "application/json" }); res.end( JSON.stringify({ error: "Invalid message format. Expected { messages: Message[] }", }), ); return; } // Pass full SendInput to directSend const metadata = body.metadata as Record | undefined; const sendInput: SendInput = { messages: rawMessages as Message[], ...(metadata ? { metadata } : undefined), }; // Setup streaming response setSSEHeaders(res); try { // Exclude sending client from SSE broadcast — they get events via this response. const senderClientId = body.connectionId as string | undefined; log.debug({ sessionId }, "handleSend: calling directSend"); const events = this.directSend(sessionId, sendInput, { excludeClientId: senderClientId }); for await (const event of events) { log.debug({ eventType: event.type }, "handleSend: got event from directSend"); const message: EventMessage = { type: "event", event: event.type as GatewayEventType, sessionId, data: event.data, }; res.write(`data: ${JSON.stringify(message)}\n\n`); } log.debug({ sessionId }, "handleSend: directSend complete, sending execution_end"); res.write( `data: ${JSON.stringify({ type: "event", event: "execution_end", sessionId, data: {} } satisfies EventMessage)}\n\n`, ); } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); const errorStack = error instanceof Error ? error.stack : undefined; console.error("[Gateway handleSend ERROR]", errorMessage, "\n", errorStack); log.error({ errorMessage, errorStack, sessionId }, "handleSend: ERROR in directSend"); res.write( `data: ${JSON.stringify({ type: "event", event: "error", sessionId, data: { error: errorMessage } } satisfies EventMessage)}\n\n`, ); } finally { res.end(); } } private async handleInvoke(req: NodeRequest, res: NodeResponse): Promise { if (req.method !== "POST") { res.writeHead(405, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Method not allowed" })); return; } const body = await this.parseBody(req); if (!body) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Invalid request body" })); return; } const method = body.method as string | undefined; const params = (body.params ?? {}) as Record; if (!method || typeof method !== "string") { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "method is required" })); return; } log.debug({ method, params }, "handleInvoke"); const requestId = `req-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 6)}`; const startTime = Date.now(); const sessionKey = params.sessionId as string | undefined; // Emit DevTools request event if (devToolsEmitter.hasSubscribers()) { devToolsEmitter.emitEvent({ type: "gateway_request", executionId: this.config.id, requestId, method, sessionKey, params, sequence: this.devToolsSequence++, timestamp: startTime, } as DTGatewayRequestEvent); } try { const result = await this.invokeMethod(method, params, Context.tryGet()?.user); log.debug({ method, result }, "handleInvoke: completed"); // Emit DevTools response event if (devToolsEmitter.hasSubscribers()) { devToolsEmitter.emitEvent({ type: "gateway_response", executionId: this.config.id, requestId, method, ok: true, latencyMs: Date.now() - startTime, sequence: this.devToolsSequence++, timestamp: Date.now(), } as DTGatewayResponseEvent); } res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify(result)); } catch (error) { log.error({ method, error }, "handleInvoke: failed"); const errorMessage = error instanceof Error ? error.message : String(error); let errorCode = "METHOD_ERROR"; if (isNotFoundError(error)) { errorCode = error.code; } else if (isValidationError(error)) { errorCode = error.code; } else if (isGuardError(error)) { errorCode = error.code; } else if (isStateError(error)) { errorCode = error.code; } else if (isAgentickError(error)) { errorCode = error.code; } // Emit DevTools response event for error if (devToolsEmitter.hasSubscribers()) { devToolsEmitter.emitEvent({ type: "gateway_response", executionId: this.config.id, requestId, method, ok: false, error: { code: errorCode, message: errorMessage }, latencyMs: Date.now() - startTime, sequence: this.devToolsSequence++, timestamp: Date.now(), } as DTGatewayResponseEvent); } const statusCode = isGuardError(error) ? 403 : isNotFoundError(error) ? 404 : 400; res.writeHead(statusCode, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: errorMessage, code: errorCode })); } } private async handleSubscribe(req: NodeRequest, res: NodeResponse): Promise { if (req.method !== "POST") { res.writeHead(405, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Method not allowed" })); return; } const body = await this.parseBody(req); if (!body) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Invalid request body" })); return; } // Support both formats: // - { sessionId, clientId } - simple format // - { connectionId, add: [...], remove: [...] } - client format const clientId = (body.clientId ?? body.connectionId) as string | undefined; if (!clientId) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "clientId or connectionId is required" })); return; } // Handle additions const addSessionIds: string[] = []; if (body.sessionId) { addSessionIds.push(body.sessionId as string); } if (Array.isArray(body.add)) { addSessionIds.push(...(body.add as string[])); } // Handle removals const removeSessionIds: string[] = []; if (Array.isArray(body.remove)) { removeSessionIds.push(...(body.remove as string[])); } if (addSessionIds.length === 0 && removeSessionIds.length === 0) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "sessionId, add[], or remove[] is required" })); return; } // Process subscriptions for (const sessionId of addSessionIds) { await this.sessions.subscribe(sessionId, clientId); } for (const sessionId of removeSessionIds) { this.sessions.unsubscribe(sessionId, clientId); } res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ ok: true })); } private async handleAbort(req: NodeRequest, res: NodeResponse): Promise { if (req.method !== "POST") { res.writeHead(405, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Method not allowed" })); return; } const body = await this.parseBody(req); if (!body) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Invalid request body" })); return; } const { sessionId, reason } = body as { sessionId?: string; reason?: string }; if (!sessionId) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "sessionId is required" })); return; } if (!(await this.checkSessionAccess(sessionId, "abort", res))) return; const managed = this.sessions.get(sessionId); if (!managed?.coreSession) { res.writeHead(404, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Session not found" })); return; } managed.coreSession.interrupt(undefined, reason ?? "Aborted by client"); res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ ok: true })); } private async handleCloseEndpoint(req: NodeRequest, res: NodeResponse): Promise { if (req.method !== "POST") { res.writeHead(405, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Method not allowed" })); return; } const body = await this.parseBody(req); if (!body?.sessionId) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "sessionId is required" })); return; } const sessionId = body.sessionId as string; if (!(await this.checkSessionAccess(sessionId, "close", res))) return; await this.sessions.close(sessionId); res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ ok: true })); } /** * Channel endpoint - handles channel pub/sub operations. */ private async handleChannel(req: NodeRequest, res: NodeResponse): Promise { if (req.method !== "POST") { res.writeHead(405, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Method not allowed" })); return; } const url = new URL(req.url ?? "/", `http://${req.headers.host}`); const prefix = this.config.httpPathPrefix ?? ""; const path = url.pathname.replace(prefix, "") || "/"; const body = await this.parseBody(req); if (!body) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Invalid request body" })); return; } const sessionId = body.sessionId as string | undefined; const channelName = body.channel as string | undefined; const clientId = body.clientId as string | undefined; if (!sessionId || !channelName) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "sessionId and channel are required" })); return; } if (path === "/channel/subscribe" || path === "/channel") { await this.subscribeToChannel(sessionId, channelName, clientId); res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ ok: true })); } else if (path === "/channel/publish") { const payload = body.payload; await this.publishToChannel(sessionId, channelName, payload); res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ ok: true })); } else { res.writeHead(404, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Unknown channel operation" })); } } /** * Tool confirmation response endpoint. * Handles POST /tool-response with { sessionId, toolUseId, result }. */ private async handleToolResponse(req: NodeRequest, res: NodeResponse): Promise { if (req.method !== "POST") { res.writeHead(405, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Method not allowed" })); return; } const body = await this.parseBody(req); if (!body) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Invalid request body" })); return; } const { sessionId, toolUseId, result } = body as { sessionId?: string; toolUseId?: string; result?: { approved: boolean; always?: boolean; reason?: string }; }; if (!sessionId || !toolUseId || !result) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Missing sessionId, toolUseId, or result" })); return; } if (!(await this.checkSessionAccess(sessionId, "tool-response", res))) return; try { await this.respondToConfirmation(sessionId, toolUseId, result); res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ ok: true })); } catch (error) { res.writeHead(500, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: error instanceof Error ? error.message : "Unknown error" })); } } /** * Subscribe a client to a session's channel. * Sets up forwarding from core session channel to SSE clients. */ private async subscribeToChannel( sessionId: string, channelName: string, clientId?: string, ): Promise { const subscriptionKey = `${sessionId}:${channelName}`; // Add client to subscription list (if provided) if (clientId) { let clientIds = this.channelSubscriptions.get(subscriptionKey); if (!clientIds) { clientIds = new Set(); this.channelSubscriptions.set(subscriptionKey, clientIds); } clientIds.add(clientId); } // If we already have a core channel subscription for this session:channel, we're done if (this.coreChannelUnsubscribes.has(subscriptionKey)) { return; } // Get or create the managed session and core session const managedSession = await this.sessions.getOrCreate(sessionId); if (!managedSession.coreSession || managedSession.coreSession.isTerminal) { // Use sessionName (without app prefix) for App - Gateway handles routing managedSession.coreSession = await managedSession.appInfo.app.session( managedSession.sessionName, ); } // Subscribe to the core session's channel and forward events to SSE clients const coreChannel = managedSession.coreSession.channel(channelName); const unsubscribe = coreChannel.subscribe((event) => { this.forwardChannelEvent(subscriptionKey, event); }); this.coreChannelUnsubscribes.set(subscriptionKey, unsubscribe); log.debug({ sessionId, channelName }, "Channel forwarding established"); } /** * Forward a channel event to all subscribed clients. */ private forwardChannelEvent( subscriptionKey: string, event: { type: string; channel: string; payload: unknown; metadata?: unknown }, ): void { const clientIds = this.channelSubscriptions.get(subscriptionKey); if (!clientIds || clientIds.size === 0) { return; } // subscriptionKey format is "sessionId:channelName" where sessionId can contain ":" // e.g., "assistant:default:todo-list" → sessionId="assistant:default", channel="todo-list" // Extract sessionId by removing the last segment (channelName) const lastColonIndex = subscriptionKey.lastIndexOf(":"); const sessionId = subscriptionKey.substring(0, lastColonIndex); const message: EventMessage = { type: "event", event: "channel" as GatewayEventType, sessionId, data: { channel: event.channel, event: { type: event.type, payload: event.payload, metadata: event.metadata, }, }, }; for (const clientId of clientIds) { this.deliverToClient(clientId, message); } } /** * Publish an event to a session's channel. */ private async publishToChannel( sessionId: string, channelName: string, payload: unknown, ): Promise { const managedSession = await this.sessions.getOrCreate(sessionId); if (!managedSession.coreSession || managedSession.coreSession.isTerminal) { // Use sessionName (without app prefix) for App - Gateway handles routing managedSession.coreSession = await managedSession.appInfo.app.session( managedSession.sessionName, ); } const coreChannel = managedSession.coreSession.channel(channelName); coreChannel.publish({ type: "message", channel: channelName, payload, }); } /** * Check session-level authorization. Returns true if authorized, false if denied (sends 403). * If no authorizeSession callback is configured, always returns true. */ private async checkSessionAccess( sessionId: string, action: "send" | "subscribe" | "tool-response" | "abort" | "close" | "channel", res: NodeResponse, ): Promise { const authorizeSession = this.config.auth && "authorizeSession" in this.config.auth ? (this.config.auth as any).authorizeSession : undefined; if (!authorizeSession) return true; const user = Context.get().user; if (!user) { res.writeHead(403, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Access denied" })); return false; } const allowed = await authorizeSession(user, sessionId, action); if (!allowed) { res.writeHead(403, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Access denied to session" })); return false; } return true; } private parseBody( req: NodeRequest & { body?: unknown }, ): Promise | null> { // If body already parsed by Express middleware, use it if (req.body && typeof req.body === "object") { return Promise.resolve(req.body as Record); } // Otherwise, read from stream return new Promise((resolve) => { let body = ""; req.on("data", (chunk) => { body += chunk.toString(); }); req.on("end", () => { try { resolve(JSON.parse(body)); } catch { resolve(null); } }); req.on("error", () => { resolve(null); }); }); } // ══════════════════════════════════════════════════════════════════════════ // Transport Request Handling (standalone mode) // ══════════════════════════════════════════════════════════════════════════ private async handleTransportRequest( transport: Transport, clientId: string, request: RequestMessage, ): Promise { const client = transport.getClient(clientId); if (!client) return; const startTime = Date.now(); const requestId = request.id; const sessionKey = (request.params as Record)?.sessionId as string | undefined; // Emit DevTools request event if (devToolsEmitter.hasSubscribers()) { devToolsEmitter.emitEvent({ type: "gateway_request", executionId: this.config.id, requestId, method: request.method, sessionKey, params: request.params as Record, clientId, sequence: this.devToolsSequence++, timestamp: startTime, } as DTGatewayRequestEvent); } log.debug({ method: request.method, requestId, sessionKey, clientId }, "RPC request"); try { const result = await this.executeMethod(transport, clientId, request.method, request.params); log.debug({ method: request.method, requestId }, "RPC response ok"); client.send({ type: "res", id: request.id, ok: true, payload: result, }); // Emit DevTools response event if (devToolsEmitter.hasSubscribers()) { devToolsEmitter.emitEvent({ type: "gateway_response", executionId: this.config.id, requestId, ok: true, latencyMs: Date.now() - startTime, sequence: this.devToolsSequence++, timestamp: Date.now(), } as DTGatewayResponseEvent); } } catch (error) { let errorCode = "METHOD_ERROR"; if (isNotFoundError(error)) { errorCode = error.code; } else if (isValidationError(error)) { errorCode = error.code; } else if (isGuardError(error)) { errorCode = error.code; } else if (isStateError(error)) { errorCode = error.code; } else if (isAgentickError(error)) { errorCode = error.code; } const errorMessage = error instanceof Error ? error.message : String(error); log.error( { method: request.method, requestId, errorCode, errorMessage }, "RPC request failed", ); client.send({ type: "res", id: request.id, ok: false, error: { code: errorCode, message: errorMessage, }, }); // Emit DevTools response event with error if (devToolsEmitter.hasSubscribers()) { devToolsEmitter.emitEvent({ type: "gateway_response", executionId: this.config.id, requestId, ok: false, latencyMs: Date.now() - startTime, error: { code: errorCode, message: errorMessage, }, sequence: this.devToolsSequence++, timestamp: Date.now(), } as DTGatewayResponseEvent); } } } private async executeMethod( transport: Transport, clientId: string, method: GatewayMethod, params: Record, ): Promise { // Transport-dependent methods (require transport/clientId context) switch (method) { case "send": return this.handleSendMethod(transport, clientId, params as unknown as SendParams); case "subscribe": return this.handleSubscribeMethod( transport, clientId, params as unknown as SubscribeParams, ); case "unsubscribe": return this.handleUnsubscribeMethod( transport, clientId, params as unknown as SubscribeParams, ); case "channel-subscribe": { const { sessionId, channel } = params as { sessionId: string; channel: string }; if (!sessionId || !channel) throw new Error("sessionId and channel are required"); await this.subscribeToChannel(sessionId, channel, clientId); return { ok: true }; } case "channel": { const { sessionId, channel, payload } = params as { sessionId: string; channel: string; payload?: unknown; }; if (!sessionId || !channel) throw new Error("sessionId and channel are required"); await this.publishToChannel(sessionId, channel, payload); return { ok: true }; } } // Context-free built-in methods (shared with plugin invoke path) const builtIn = this.resolveBuiltInMethod(method, params); if (builtIn !== undefined) return builtIn; // Check custom methods const procedure = this.getMethodProcedure(method); if (procedure) { return this.executeCustomMethod(transport, clientId, method, params); } throw new NotFoundError("resource", method, `Unknown method: ${method}`); } /** * Execute a custom method within Agentick ALS context. */ private async executeCustomMethod( transport: Transport, clientId: string, method: string, params: Record, ): Promise { const client = transport.getClient(clientId); const sessionId = params.sessionId as string | undefined; // Build metadata: gateway fields + client auth metadata + per-request metadata const metadata = { sessionId, clientId, gatewayId: this.config.id, method, ...client?.state.metadata, ...(params.metadata as Record | undefined), }; // Create kernel context — Context.child preserves any outer ALS // context (e.g. middleware injected by an embedder) while layering // request-level user/metadata on top. const ctx = Context.child({ user: client?.state.user, metadata: { ...Context.tryGet()?.metadata, ...metadata }, }); // Get the procedure const procedure = this.getMethodProcedure(method); if (!procedure) { throw new NotFoundError("resource", method, `Unknown method: ${method}`); } // Execute within context // Procedure handles: context forking, middleware (guards), schema validation, metrics const result = await Context.run(ctx, async () => { const handle = await procedure(params); return handle.result; }); // Handle streaming results if (result && typeof result === "object" && Symbol.asyncIterator in result) { const generator = result as AsyncGenerator; const chunks: unknown[] = []; for await (const chunk of generator) { // Emit chunk to subscribers if (sessionId) { this.sendEventToSubscribers(sessionId, "method:chunk", { method, chunk, }); } chunks.push(chunk); } // Emit end event if (sessionId) { this.sendEventToSubscribers(sessionId, "method:end", { method }); } return { streaming: true, chunks }; } return result; } private async handleSendMethod( transport: Transport, clientId: string, params: SendParams, ): Promise<{ messageId: string }> { const { sessionId, input: rawInput, message } = params; log.debug( { sessionId, hasInput: !!rawInput, hasMessage: !!message, messageCount: rawInput?.messages?.length, }, "handleSendMethod: received", ); // Auto-subscribe sender to session events (transport concern) const client = transport.getClient(clientId); if (client) { client.state.subscriptions.add(sessionId); await this.sessions.subscribe(sessionId, clientId); } const input: SendInput = rawInput ?? { messages: [{ role: "user", content: [{ type: "text", text: message ?? "" }] }], }; log.debug( { sessionId, messageCount: input.messages?.length, firstRole: input.messages?.[0]?.role }, "handleSendMethod: dispatching to session", ); const messageId = `msg-${Date.now().toString(36)}`; // Execute in background — executeSession handles state management const gen = this.executeSession(sessionId, input, { clientId }); (async () => { for await (const _ of gen) { /* drain */ } log.debug({ sessionId, messageId }, "handleSendMethod: execution complete"); })().catch((error) => { log.error( { sessionId, messageId, error: error instanceof Error ? error.message : String(error) }, "handleSendMethod: execution error", ); this.sendEventToSubscribers(sessionId, "error", { message: error instanceof Error ? error.message : String(error), }); }); return { messageId }; } private deliverToClient(clientId: string, message: GatewayMessage): void { for (const transport of this.transports) { const client = transport.getClient(clientId); if (client) { this.getOrCreateBuffer(client).push(message); return; } } } private getOrCreateBuffer(client: TransportClient): ClientEventBuffer { let buffer = this.clientBuffers.get(client.id); if (!buffer) { buffer = new ClientEventBuffer(client); this.clientBuffers.set(client.id, buffer); } return buffer; } private sendEventToSubscribers( sessionId: string, eventType: string, data: unknown, excludeClientId?: string, ): void { const subscribers = this.sessions.getSubscribers(sessionId); const message: EventMessage = { type: "event", event: eventType as GatewayEventType, sessionId, data, }; log.trace({ sessionId, eventType, subscriberCount: subscribers.size }, "event:fanout"); for (const clientId of subscribers) { if (clientId === excludeClientId) continue; this.deliverToClient(clientId, message); } } /** * Direct send handler for HTTP transport. * Returns an async generator that yields events for streaming. * Accepts full Message object to support multimodal content (images, audio, video, docs). * * IMPORTANT: Uses the original sessionId (as provided by client) for events, * not the normalized internal ID. This ensures clients can match events to their sessions. */ private async *directSend( sessionId: string, input: SendInput, opts?: { excludeClientId?: string }, ): AsyncGenerator<{ type: string; data?: unknown }> { for await (const event of this.executeSession(sessionId, input, { excludeClientId: opts?.excludeClientId, })) { yield { type: event.type, data: event }; } } /** * Respond to a tool confirmation (for HTTP transport). */ private async respondToConfirmation( sessionId: string, toolUseId: string, result: ToolConfirmationResponse, ): Promise { const session = await this.session(sessionId); session.channel("tool_confirmation").publish({ type: "response", channel: "tool_confirmation", id: toolUseId, payload: result, }); } /** * Abort a session's current execution (for HTTP transport). */ private async abortSession(sessionId: string, reason?: string): Promise { const managed = this.sessions.get(sessionId); if (!managed?.coreSession) { throw new Error(`Session not found: ${sessionId}`); } managed.coreSession.interrupt(undefined, reason ?? "Aborted by client"); } /** * Invoke a custom method directly (for HTTP transport). * Called with pre-authenticated user context. */ private async invokeMethod( method: string, params: Record, user?: UserContext, ): Promise { // Handle built-in methods that don't require transport/client context. // These are the methods plugins most commonly need. const builtIn = this.resolveBuiltInMethod(method, params); if (builtIn !== undefined) return builtIn; const procedure = this.getMethodProcedure(method); if (!procedure) { throw new NotFoundError("resource", method, `Unknown method: ${method}`); } const sessionId = params.sessionId as string | undefined; // Get or create session to access channels (if sessionId provided) let channels: ChannelServiceInterface | undefined = undefined; if (sessionId) { const managedSession = await this.sessions.getOrCreate(sessionId); if (!managedSession.coreSession || managedSession.coreSession.isTerminal) { // Use sessionName (without app prefix) for App - Gateway handles routing managedSession.coreSession = await managedSession.appInfo.app.session( managedSession.sessionName, ); } channels = createChannelServiceFromSession(managedSession.coreSession, this.config.id); } // Build metadata const metadata = { sessionId, gatewayId: this.config.id, method, ...(params.metadata as Record | undefined), }; // Create kernel context with channels for pub/sub. Context.child // preserves outer ALS state (notably `middleware`) while layering // user/metadata/channels on top. const ctx = Context.child({ user, metadata: { ...Context.tryGet()?.metadata, ...metadata }, channels, }); // Execute within context // Procedures return ExecutionHandle by default - access .result to get the handler's return value const result = await Context.run(ctx, async () => { const handle = await procedure(params); return handle.result; }); // Handle streaming results (collect all chunks) if (result && typeof result === "object" && Symbol.asyncIterator in result) { const iterable = result as AsyncIterable; const chunks: unknown[] = []; for await (const chunk of iterable) { chunks.push(chunk); } return { streaming: true, chunks }; } return result; } /** * Resolve built-in methods that don't need transport/client context. * Returns undefined if the method is not a built-in. */ private resolveBuiltInMethod( method: string, params: Record, ): Promise | undefined { switch (method) { case "apps": return Promise.resolve(this.handleAppsMethod()); case "sessions": return Promise.resolve(this.handleSessionsMethod()); case "status": return Promise.resolve(this.handleStatusMethod(params as unknown as StatusParams)); case "history": return this.handleHistoryMethod(params as unknown as HistoryParams); case "schema": return Promise.resolve(this.handleSchemaMethod()); case "config": return Promise.resolve(this.handleConfigMethod()); case "tool-catalog": return this.handleToolCatalogMethod(params as unknown as ToolCatalogParams); case "tool-confirm": return this.handleToolConfirmMethod(params as unknown as ToolConfirmParams); case "tool-dispatch": return this.handleToolDispatchMethod(params as unknown as ToolDispatchParams); case "abort": return this.handleAbortMethod(params as unknown as { sessionId: string }); case "reset": return this.handleResetMethod(params as unknown as { sessionId: string }); case "close": return this.handleCloseMethod(params as unknown as { sessionId: string }); } return undefined; } private async handleAbortMethod(params: { sessionId: string; reason?: string }): Promise { const managed = this.sessions.get(params.sessionId); if (!managed) { throw new NotFoundError("session", params.sessionId); } if (!managed.coreSession) { throw new NotFoundError("session", params.sessionId); } managed.coreSession.interrupt(undefined, params.reason ?? "Aborted by client"); } private handleStatusMethod(params: StatusParams): StatusPayload { const result: StatusPayload = { gateway: this.status, }; if (params.sessionId) { const session = this.sessions.get(params.sessionId); if (session) { result.session = { id: session.state.id, appId: session.state.appId, messageCount: session.state.messageCount, createdAt: session.state.createdAt.toISOString(), lastActivityAt: session.state.lastActivityAt.toISOString(), isActive: session.state.isActive, }; } } return result; } private async handleHistoryMethod( _params: HistoryParams, ): Promise<{ messages: unknown[]; hasMore: boolean }> { return { messages: [], hasMore: false }; } private async handleResetMethod(params: { sessionId: string }): Promise { // SessionManager.reset() emits DevTools event await this.sessions.reset(params.sessionId); this.emit("session:closed", { sessionId: params.sessionId }); } private async handleCloseMethod(params: { sessionId: string }): Promise { // SessionManager.close() emits DevTools event await this.sessions.close(params.sessionId); this.emit("session:closed", { sessionId: params.sessionId }); } private handleAppsMethod(): AppsPayload { return { apps: this.registry.all().map((appInfo) => ({ id: appInfo.id, name: appInfo.name ?? appInfo.id, description: appInfo.description, isDefault: appInfo.isDefault, })), }; } private handleSessionsMethod(): SessionsPayload { return { sessions: this.sessions.all().map((s) => ({ id: s.state.id, appId: s.state.appId, createdAt: s.state.createdAt.toISOString(), lastActivityAt: s.state.lastActivityAt.toISOString(), messageCount: s.state.messageCount, })), }; } private async handleSubscribeMethod( transport: Transport, clientId: string, params: SubscribeParams, ): Promise { const client = transport.getClient(clientId); if (client) { client.state.subscriptions.add(params.sessionId); await this.subscribe(params.sessionId, clientId); } } private handleUnsubscribeMethod( transport: Transport, clientId: string, params: SubscribeParams, ): void { const client = transport.getClient(clientId); if (client) { client.state.subscriptions.delete(params.sessionId); this.unsubscribe(params.sessionId, clientId); } } // ══════════════════════════════════════════════════════════════════════════ // Protocol Methods (schema, tool-catalog, tool-confirm, tool-dispatch) // ══════════════════════════════════════════════════════════════════════════ /** * Send ConnectedMessage to a client after auth succeeds. * Called via onAuthenticated callback from WS/Unix transports. */ private sendConnectedMessage(client: TransportClient): void { client.send({ type: "connected", gatewayId: this.config.id, protocolVersion: PROTOCOL_VERSION, apps: this.registry.ids(), sessions: Array.from(client.state.subscriptions), }); } private async handleSchemaMethod(): Promise { const methods: Record = {}; // Built-in methods — raw JSON Schema, assign directly for (const [name, schema] of BUILT_IN_METHOD_SCHEMAS) { const entry: MethodSchemaEntry = { description: schema.description, builtin: true, }; if (schema.params) entry.params = schema.params; if (schema.response) entry.response = schema.response; if (schema.errors) entry.errors = schema.errors; methods[name] = entry; } // Custom methods — schemas may be Zod/Standard Schema, need toJSONSchema for (const path of this.methodProcedures.keys()) { if (BUILT_IN_METHOD_SCHEMAS.has(path)) continue; const entry: MethodSchemaEntry = { description: "", builtin: false }; const rawSchema = this.methodSchemas.get(path); if (rawSchema) { entry.params = await toJSONSchema(rawSchema, { stripMeta: true }); } const responseSchema = this.methodResponseSchemas.get(path); if (responseSchema) { entry.response = await toJSONSchema(responseSchema, { stripMeta: true }); } const meta = this.methodMeta.get(path); if (meta?.description) entry.description = meta.description; if (meta?.roles) entry.roles = meta.roles; methods[path] = entry; } return { protocolVersion: PROTOCOL_VERSION, methods, events: GATEWAY_EVENTS, errors: PROTOCOL_ERROR_CODES, }; } private handleConfigMethod(): ConfigPayload { return { config: this.configStore.redacted() as Record }; } private async handleToolCatalogMethod(params: ToolCatalogParams): Promise { const { sessionId } = params; if (!sessionId) throw new Error("sessionId is required"); const session = await this.session(sessionId); const definitions = await session.getToolDefinitions(); return { tools: definitions.map((def) => ({ name: def.name, description: def.description, input: def.input, ...(def.output ? { output: def.output } : {}), type: def.type, intent: def.intent, audience: def.audience, })), }; } private async handleToolConfirmMethod(params: ToolConfirmParams): Promise<{ ok: true }> { const { sessionId, callId, confirmed, reason, always } = params; if (!sessionId) throw new Error("sessionId is required"); if (!callId) throw new Error("callId is required"); // Publish directly to the session's channel — NOT through publishToChannel // which double-wraps in { type: "message" }. The confirmation coordinator // at session.ts:2808 expects { type: "response", id, payload }. const session = await this.session(sessionId); session.channel("tool_confirmation").publish({ type: "response", channel: "tool_confirmation", id: callId, payload: { approved: confirmed, reason, always }, }); return { ok: true }; } private async handleToolDispatchMethod( params: ToolDispatchParams, ): Promise<{ content: unknown[] }> { const { sessionId, tool, input } = params; if (!sessionId) throw new Error("sessionId is required"); if (!tool) throw new Error("tool is required"); const session = await this.session(sessionId); const result = await session.dispatch.exec(tool, input ?? {}).result; return { content: result }; } // ══════════════════════════════════════════════════════════════════════════ // Plugin System // ══════════════════════════════════════════════════════════════════════════ /** * Register a plugin. Calls plugin.initialize() with a PluginContext. * Throws if a plugin with the same id is already registered. */ async use(plugin: GatewayPlugin): Promise { if (this.plugins.has(plugin.id)) { throw new Error(`Plugin "${plugin.id}" is already registered`); } const ctx = this.createPluginContext(plugin.id); try { await plugin.initialize(ctx); } catch (err) { // Clean up any methods and routes registered during partial init this.cleanupPluginRegistrations(plugin.id); throw err; } this.plugins.set(plugin.id, { plugin, ctx }); this.emit("plugin:registered", { pluginId: plugin.id }); } /** * Remove a plugin by id. Calls plugin.destroy() and cleans up its methods. * No-op if plugin id is not found. */ async remove(pluginId: string): Promise { const entry = this.plugins.get(pluginId); if (!entry) return; await entry.plugin.destroy?.(); // Remove all methods and routes this plugin registered this.cleanupPluginRegistrations(pluginId); // Remove plugin broadcast subscribers this.pluginSubscribers.delete(pluginId); this.plugins.delete(pluginId); this.emit("plugin:removed", { pluginId }); } /** * Get a registered plugin by id. */ getPlugin(id: string): T | undefined { return this.plugins.get(id)?.plugin as T | undefined; } /** * List all registered plugins with their registered methods. */ listPlugins(): Array<{ id: string; methods: string[] }> { const result: Array<{ id: string; methods: string[] }> = []; for (const [id] of this.plugins) { const methods: string[] = []; for (const [method, owner] of this.pluginMethodOwnership) { if (owner === id) methods.push(method); } result.push({ id, methods }); } return result; } /** Remove all methods and routes owned by a plugin */ private cleanupPluginRegistrations(pluginId: string): void { for (const [path, owner] of this.pluginMethodOwnership.entries()) { if (owner === pluginId) { this.methodProcedures.delete(path); this.methodSchemas.delete(path); this.methodResponseSchemas.delete(path); this.methodMeta.delete(path); this.pluginMethodOwnership.delete(path); } } for (const [path, route] of this.pluginRoutes.entries()) { if (route.pluginId === pluginId) { this.pluginRoutes.delete(path); } } } /** * Create a PluginContext scoped to a specific plugin. */ private createPluginContext(pluginId: string): PluginContext { return { gatewayId: this.config.id, config: this.configStore!, sendToSession: async (sessionKey: string, input: SendInput) => { return this.sendToSession(sessionKey, input); }, respondToConfirmation: async (sessionKey: string, callId: string, response) => { // Publish directly to channel — NOT through publishToChannel which // double-wraps in { type: "message" }. The confirmation coordinator // expects { type: "response", id, payload }. const session = await this.session(sessionKey); session.channel("tool_confirmation").publish({ type: "response", channel: "tool_confirmation", id: callId, payload: response, }); }, registerMethod: (path: string, handler: SimpleMethodHandler | MethodDefinition) => { if (BUILT_IN_METHOD_SCHEMAS.has(path)) { throw new Error(`Cannot override built-in method: ${path}`); } if (this.methodProcedures.has(path)) { throw new Error(`Method "${path}" is already registered`); } const isDefinition = isMethodDefinition(handler); const actualHandler = isDefinition ? (handler as MethodDefinition).handler : handler; const middleware: Middleware[] = []; if (isDefinition) { const def = handler as MethodDefinition; if (def.roles?.length) { middleware.push(createRoleGuardMiddleware(def.roles)); } if (def.guard) { middleware.push(createCustomGuardMiddleware(def.guard)); } // Store raw schemas for discovery if (def.schema) { this.methodSchemas.set(path, def.schema); } if (def.response) { this.methodResponseSchemas.set(path, def.response); } this.methodMeta.set(path, { description: def.description, roles: def.roles, }); } this.methodProcedures.set( path, createProcedure( { name: `gateway:${path}`, executionBoundary: "auto", schema: isDefinition ? ((handler as MethodDefinition).schema as any) : undefined, middleware: middleware.length > 0 ? middleware : undefined, metadata: { gatewayId: this.config.id, method: path, pluginId, ...(isDefinition && { description: (handler as MethodDefinition).description }), }, }, actualHandler as (...args: any[]) => any, ), ); this.pluginMethodOwnership.set(path, pluginId); }, unregisterMethod: (path: string) => { // Only allow unregistering own methods if (this.pluginMethodOwnership.get(path) !== pluginId) return; this.methodProcedures.delete(path); this.methodSchemas.delete(path); this.methodResponseSchemas.delete(path); this.methodMeta.delete(path); this.pluginMethodOwnership.delete(path); }, invoke: (method: string, params: unknown) => this.invokeMethod(method, params as Record), broadcast: (event: string, data: unknown) => { const subscribers = this.pluginSubscribers.get(pluginId); if (!subscribers?.size) return; const message: EventMessage = { type: "event", event: event as GatewayEventType, sessionId: `$plugin:${pluginId}`, data, }; for (const clientId of subscribers) { this.deliverToClient(clientId, message); } }, registerRoute: (path, handler, options) => { if (this.pluginRoutes.has(path)) { throw new Error(`Route "${path}" is already registered`); } this.pluginRoutes.set(path, { pluginId, auth: options?.auth !== false, absolute: options?.absolute ?? false, handler, }); }, unregisterRoute: (path) => { const route = this.pluginRoutes.get(path); if (route?.pluginId !== pluginId) return; this.pluginRoutes.delete(path); }, validateAuth: (token) => validateAuth(token, this.config.auth), listPlugins: () => this.listPlugins(), on: (event, handler) => this.on(event, handler), off: (event, handler) => this.off(event, handler), }; } /** * Match a request path against plugin routes. * Longest prefix wins. Returns undefined if no match. * * @param strippedPath - Path with httpPathPrefix removed (for relative routes) * @param originalPath - Original URL pathname (for absolute routes) */ private matchPluginRoute(strippedPath: string, originalPath: string) { // Sort by path length descending for longest-prefix match const sorted = [...this.pluginRoutes.entries()].sort((a, b) => b[0].length - a[0].length); for (const [routePath, route] of sorted) { // Absolute routes match against the original URL pathname const matchPath = route.absolute ? originalPath : strippedPath; if (matchPath === routePath || matchPath.startsWith(routePath + "/")) { return route; } } return undefined; } /** * Match, authenticate, and dispatch a plugin route. * Single entry point used by both handleRequest (embedded) and HTTP transport. * Returns true if the route was handled (even if auth failed — response already sent). * * When called from handleRequest (embedded), authResult is pre-validated. * When called from HTTP transport, authResult is not provided and auth is * performed inline. */ private async dispatchPluginRoute( path: string, req: import("http").IncomingMessage, res: import("http").ServerResponse, authResult?: AuthResult, ): Promise { const url = new URL(req.url ?? "/", `http://${req.headers.host}`); const route = this.matchPluginRoute(path, url.pathname); if (!route) return false; let user: UserContext | undefined; if (route.auth) { // Use pre-validated result from handleRequest, or validate inline // (HTTP transport calls this without a pre-validated result) const result = authResult ?? (await validateAuth(extractToken(req), this.config.auth)); if (!result.valid) { res.writeHead(401, { "Content-Type": "application/json", "WWW-Authenticate": wwwAuthenticateHeader(this.config.auth), }); res.end(JSON.stringify({ error: "Authentication failed" })); return true; } user = result.user; } // Ensure ALS context is set for the handler. On the standalone path // this may already be inside a Context.run() — Context.child // preserves whatever the outer caller set up (middleware, user, // traceId, etc.) and layers our request-level fields on top. const ctx = Context.child({ user: Context.tryGet()?.user ?? user, metadata: { ...Context.tryGet()?.metadata, gatewayId: this.config.id }, }); await Context.run(ctx, async () => { await route.handler(req, res); }); return true; } } // Type declaration for EventEmitter export interface Gateway { on(event: K, listener: (payload: GatewayEvents[K]) => void): this; emit(event: K, payload: GatewayEvents[K]): boolean; } /** * Create a gateway instance */ export function createGateway(config: GatewayConfig): Gateway { return new Gateway(config); }