import type { McpServerConfig, McpWrapperTimeouts } from "./config.ts";
import type {
  McpServerToolList,
  McpToolSummary,
  PiToolRoute,
} from "./tool-catalog.ts";

const MILLISECONDS_PER_SECOND = 1_000;
const MANAGER_CLOSED_MESSAGE = "MCP client manager is closed";

/** Per-request options forwarded to an MCP client call. */
export interface McpRequestOptions {
  /** Aborted by the manager when its own deadline for the request elapses. */
  readonly signal?: AbortSignal;
  /** Request timeout; the manager passes a value in milliseconds. */
  readonly timeout?: number;
  /** Upper bound on the total request time; the manager passes milliseconds. */
  readonly maxTotalTimeout?: number;
}

/** The subset of an MCP client that the manager depends on. */
export interface McpClientLike {
  connect(options?: McpRequestOptions): Promise<void>;
  listTools(
    params?: { readonly cursor?: string },
    options?: McpRequestOptions,
  ): Promise<{
    readonly tools: Array<{
      readonly name: string;
      readonly description?: string | undefined;
      readonly inputSchema: unknown;
    }>;
    readonly nextCursor?: string | undefined;
  }>;
  // NOTE(review): the result is typed `unknown` here and passed through
  // untouched — confirm against the concrete client's result type.
  callTool(
    params: {
      readonly name: string;
      readonly arguments: Record<string, unknown>;
    },
    options?: McpRequestOptions,
  ): Promise<unknown>;
  getInstructions(): string | undefined;
  close(): Promise<void>;
}

interface McpClientManagerOptions {
  /** Factory invoked once per connection attempt for a server key. */
  readonly createClient: (
    serverKey: string,
    config: McpServerConfig,
  ) => McpClientLike;
  readonly timeouts: McpWrapperTimeouts;
}

interface McpConnection {
  readonly client: McpClientLike;
  readonly config: McpServerConfig;
}

interface ServerFailure {
  readonly serverKey: string;
  readonly issue: string;
}

export interface ServerInstructions {
  readonly serverKey: string;
  readonly instructions: string;
}

type ServerDiscoveryResult =
  | {
      readonly kind: "valid";
      readonly serverToolList: McpServerToolList;
      readonly serverInstructions?: ServerInstructions;
    }
  | { readonly kind: "failure"; readonly failure: ServerFailure };

type ToolPage = Awaited<ReturnType<McpClientLike["listTools"]>>;

/** Manages MCP clients, tool discovery, routing, and timeout cleanup. */
export class McpClientManager {
  /** Established connections, keyed by server key. */
  private readonly connections = new Map<string, McpConnection>();
  /** In-flight connection attempts, shared by concurrent callers. */
  private readonly connectPromises = new Map<string, Promise<McpConnection>>();
  /** Clients that are still connecting, tracked so `closeAll` can close them. */
  private readonly connectingClients = new Map<string, McpClientLike>();
  private readonly createClient: McpClientManagerOptions["createClient"];
  private readonly timeouts: McpWrapperTimeouts;
  private closed = false;

  constructor(options: McpClientManagerOptions) {
    this.createClient = options.createClient;
    this.timeouts = options.timeouts;
  }

  /**
   * Connects to every configured server concurrently and lists its tools.
   *
   * A failing server never rejects the whole call; it is reported in
   * `failures` instead.
   *
   * @param servers Server configurations keyed by server key.
   * @returns Tool lists and non-blank instructions of the servers that
   *   responded, plus one failure entry per server that did not.
   */
  async discoverServers(
    servers: Readonly<Record<string, McpServerConfig>>,
  ): Promise<{
    readonly serverToolLists: readonly McpServerToolList[];
    readonly serverInstructions: readonly ServerInstructions[];
    readonly failures: readonly ServerFailure[];
  }> {
    const results = await Promise.all(
      Object.entries(servers).map(([serverKey, config]) =>
        this.discoverServer(serverKey, config),
      ),
    );

    const serverToolLists: McpServerToolList[] = [];
    const serverInstructions: ServerInstructions[] = [];
    const failures: ServerFailure[] = [];
    for (const result of results) {
      if (result.kind === "failure") {
        failures.push(result.failure);
        continue;
      }
      serverToolLists.push(result.serverToolList);
      if (result.serverInstructions !== undefined) {
        serverInstructions.push(result.serverInstructions);
      }
    }
    return { serverToolLists, serverInstructions, failures };
  }

  /**
   * Returns the connected client for a server, connecting first if needed.
   *
   * @throws Error if the manager is closed or the connection attempt fails.
   */
  async getConnection(
    serverKey: string,
    config: McpServerConfig,
  ): Promise<McpClientLike> {
    return (await this.getOrCreateConnection(serverKey, config)).client;
  }

  /**
   * Permanently closes the manager and every client it knows about,
   * including clients that are still connecting. Close errors are ignored.
   */
  async closeAll(): Promise<void> {
    this.closed = true;
    const clients = [
      ...[...this.connections.values()].map((connection) => connection.client),
      ...this.connectingClients.values(),
    ];
    this.connections.clear();
    this.connectingClients.clear();
    this.connectPromises.clear();
    await Promise.all(clients.map((client) => client.close().catch(() => {})));
  }

  /**
   * Invokes a tool on the server identified by `route`.
   *
   * Any failure (including a timeout) closes and forgets the connection, so
   * the next call for the same server reconnects.
   *
   * @throws Whatever the client rejects with, or the connection error.
   */
  async callTool(
    route: PiToolRoute,
    config: McpServerConfig,
    args: Record<string, unknown>,
  ): Promise<unknown> {
    const connection = await this.getOrCreateConnection(
      route.serverKey,
      config,
    );
    try {
      return await withAbortTimeout(
        this.timeouts.callSeconds,
        (signal) =>
          connection.client.callTool(
            { name: route.mcpToolName, arguments: args },
            {
              signal,
              timeout: secondsToMilliseconds(this.timeouts.callSeconds),
              maxTotalTimeout: secondsToMilliseconds(
                this.timeouts.maxTotalSeconds,
              ),
            },
          ),
        async () => {
          await this.closeConnection(route.serverKey, connection);
        },
      );
    } catch (error) {
      // No-op when the timeout hook already dropped this connection.
      await this.closeConnection(route.serverKey, connection);
      throw error;
    }
  }

  /** Discovers one server, converting any error into a failure result. */
  private async discoverServer(
    serverKey: string,
    config: McpServerConfig,
  ): Promise<ServerDiscoveryResult> {
    try {
      const connection = await this.getOrCreateConnection(serverKey, config);
      const instructions = connection.client.getInstructions();
      return {
        kind: "valid",
        serverToolList: {
          serverKey,
          tools: await this.fetchAllTools(connection.client),
        },
        // Blank instructions are omitted; non-blank ones are kept untrimmed.
        ...(instructions !== undefined && instructions.trim().length > 0
          ? { serverInstructions: { serverKey, instructions } }
          : {}),
      };
    } catch (error) {
      await this.closeStoredConnection(serverKey);
      return {
        kind: "failure",
        failure: { serverKey, issue: formatError(error) },
      };
    }
  }

  /**
   * Returns the stored connection for `serverKey`, joins an in-flight
   * attempt, or starts a new one.
   */
  private async getOrCreateConnection(
    serverKey: string,
    config: McpServerConfig,
  ): Promise<McpConnection> {
    this.ensureOpen();
    const existing = this.connections.get(serverKey);
    if (existing !== undefined) {
      return existing;
    }
    const pending = this.connectPromises.get(serverKey);
    if (pending !== undefined) {
      return pending;
    }
    // The shared promise includes the closed-check and registration, so every
    // waiter observes the same outcome and none receives a closed client.
    const promise = this.connectAndRegister(serverKey, config);
    this.connectPromises.set(serverKey, promise);
    return promise;
  }

  /**
   * Connects a new client and stores it, unless the manager was closed while
   * the connection was being established.
   */
  private async connectAndRegister(
    serverKey: string,
    config: McpServerConfig,
  ): Promise<McpConnection> {
    try {
      const connection = await this.createConnection(serverKey, config);
      if (this.closed) {
        await connection.client.close().catch(() => {});
        throw new Error(MANAGER_CLOSED_MESSAGE);
      }
      this.connections.set(serverKey, connection);
      return connection;
    } finally {
      this.connectPromises.delete(serverKey);
    }
  }

  /**
   * Creates a client and connects it within the startup timeout. The client
   * is closed if connecting fails for any reason.
   */
  private async createConnection(
    serverKey: string,
    config: McpServerConfig,
  ): Promise<McpConnection> {
    this.ensureOpen();
    const client = this.createClient(serverKey, config);
    this.connectingClients.set(serverKey, client);
    try {
      await withAbortTimeout(
        this.timeouts.startupSeconds,
        (signal) =>
          client.connect({
            signal,
            timeout: secondsToMilliseconds(this.timeouts.startupSeconds),
          }),
        async () => {
          await client.close();
        },
      );
      return { client, config };
    } catch (error) {
      await client.close().catch(() => {});
      throw error;
    } finally {
      this.connectingClients.delete(serverKey);
    }
  }

  /**
   * Collects the tools from every page the server reports.
   *
   * @throws Error if the server hands back a cursor it already returned,
   *   which would otherwise make pagination loop forever.
   */
  private async fetchAllTools(
    client: McpClientLike,
  ): Promise<McpToolSummary[]> {
    const tools: McpToolSummary[] = [];
    const seenCursors = new Set<string>();
    let cursor: string | undefined;
    do {
      const page = await this.fetchToolPage(client, cursor);
      for (const tool of page.tools) {
        tools.push(tool);
      }
      cursor = page.nextCursor;
      if (cursor !== undefined) {
        if (seenCursors.has(cursor)) {
          throw new Error("MCP server returned a repeated listTools cursor");
        }
        seenCursors.add(cursor);
      }
    } while (cursor !== undefined);
    return tools;
  }

  /** Requests a single page of tools within the list-tools timeout. */
  private async fetchToolPage(
    client: McpClientLike,
    cursor: string | undefined,
  ): Promise<ToolPage> {
    return withAbortTimeout(
      this.timeouts.listToolsSeconds,
      (signal) =>
        client.listTools(cursor === undefined ? undefined : { cursor }, {
          signal,
          timeout: secondsToMilliseconds(this.timeouts.listToolsSeconds),
          maxTotalTimeout: secondsToMilliseconds(this.timeouts.maxTotalSeconds),
        }),
      // Nothing to clean up here; discovery closes the connection on failure.
      async () => {},
    );
  }

  /** @throws Error once `closeAll` has been called. */
  private ensureOpen(): void {
    if (this.closed) {
      throw new Error(MANAGER_CLOSED_MESSAGE);
    }
  }

  /** Closes whichever connection is currently stored for `serverKey`, if any. */
  private async closeStoredConnection(serverKey: string): Promise<void> {
    const connection = this.connections.get(serverKey);
    if (connection === undefined) {
      return;
    }
    await this.closeConnection(serverKey, connection);
  }

  /**
   * Forgets and closes `connection`, but only while it is still the one
   * stored for `serverKey`; a replaced or already-removed connection is left
   * alone. Close errors are ignored.
   */
  private async closeConnection(
    serverKey: string,
    connection: McpConnection,
  ): Promise<void> {
    if (this.connections.get(serverKey) !== connection) {
      return;
    }
    this.connections.delete(serverKey);
    await connection.client.close().catch(() => {});
  }
}

/**
 * Runs `operation` with an abort signal that fires after `seconds`.
 *
 * The deadline is cooperative: it only aborts the signal, so the operation
 * must honor the signal for the call to settle.
 *
 * @param seconds Deadline in seconds.
 * @param operation Work to run; receives the abort signal.
 * @param onTimeout Best-effort cleanup run when the operation fails after the
 *   deadline has fired. Its own failure is ignored so that it cannot replace
 *   the error that describes the timeout.
 * @returns The operation's result.
 * @throws Whatever `operation` throws or rejects with.
 */
async function withAbortTimeout<T>(
  seconds: number,
  operation: (signal: AbortSignal) => Promise<T>,
  onTimeout: () => Promise<void>,
): Promise<T> {
  const controller = new AbortController();
  let timedOut = false;
  const timer = setTimeout(() => {
    timedOut = true;
    controller.abort();
  }, secondsToMilliseconds(seconds));
  try {
    return await operation(controller.signal);
  } catch (error) {
    if (timedOut) {
      try {
        await onTimeout();
      } catch {
        // Keep the original error; cleanup failures are not actionable here.
      }
    }
    throw error;
  } finally {
    clearTimeout(timer);
  }
}

/** Converts a duration in seconds to milliseconds. */
function secondsToMilliseconds(seconds: number): number {
  return seconds * MILLISECONDS_PER_SECOND;
}

/**
 * Renders a thrown value as a human-readable message.
 *
 * `Error` instances yield their message and strings are returned as-is.
 * Other values are JSON-encoded when possible, so plain objects do not
 * collapse to "[object Object]"; anything JSON cannot represent falls back
 * to `String(error)`.
 */
function formatError(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }
  if (typeof error === "string") {
    return error;
  }
  try {
    return JSON.stringify(error) ?? String(error);
  } catch {
    return String(error);
  }
}