#!/usr/bin/env node

import http, { IncomingMessage, ServerResponse } from 'http'
import yargs from 'yargs'
import { hideBin } from 'yargs/helpers'
import httpProxy from 'http-proxy'
import { join, dirname } from 'path'
import { readFileSync } from 'fs'
import { spawn, ChildProcess } from 'child_process'
import { fileURLToPath } from 'url'

// ----------------------------------------
// Global Error Handlers
// ----------------------------------------
process.on('uncaughtException', (err) => {
  console.error('[@supermachine/proxy] Uncaught Exception:', err)
  process.exit(1)
})

process.on('unhandledRejection', (reason, promise) => {
  console.error('[@supermachine/proxy] Unhandled Rejection at:', promise, 'reason:', reason)
  process.exit(1)
})

// ----------------------------------------
// ES module path utilities
// ----------------------------------------
const __filename = fileURLToPath(import.meta.url)
const __dirname = dirname(__filename)

// ----------------------------------------
// Types
// ----------------------------------------

/** Routing modes carried in the `state` entry of the fly-replay-src header. */
enum RouteType {
  MACHINE = 'MACHINE',
  MCP_SERVER = 'MCP_SERVER',
}

/** Decoded `state` payload; which optional fields are required depends on `routeType`. */
type State = {
  routeType: RouteType
  port?: number
  machineId: string
  mcpServerId?: string
  baseUrl?: string
  command?: string
}

/** The subset of `State` that `spawnProcess` needs for an MCP_SERVER route. */
type McpServerState = {
  routeType: RouteType.MCP_SERVER
  mcpServerId: string
  baseUrl: string
  command: string
}

/** Registry record for one spawned child process. */
type MCPServer = {
  mcpServerId: string
  baseUrl: string
  command: string
  process: ChildProcess
  port: number
}

// ----------------------------------------
// Constants & Helpers
// ----------------------------------------
const RETRY_TIMEOUT = 500
const RETRY_LIMIT = 120 * 5 // 600 retries x 500ms = 5 minutes

// Registry of spawned processes, keyed by mcpServerId.
// A Map (not a plain object) because the key comes from the request header:
// ids such as "__proto__" or "constructor" must not collide with Object.prototype.
const mcpServers = new Map<string, MCPServer>()

// Start port allocation at 8001
let nextPort = 8001

/**
 * Parse the fly-replay-src header into key/value pairs.
 *
 * Entries have the form `key=value` and are separated by `;`. Keys and values
 * are trimmed; a later duplicate key overwrites an earlier one.
 */
function parseFlyReplaySrc(header: string): Record<string, string> {
  const regex = /(.*?)=(.*?)($|;)/g
  const result: Record<string, string> = {}
  for (const match of header.matchAll(regex)) {
    // Both capture groups always participate in a successful match; the
    // defaults only satisfy strict index checking.
    const [, rawKey = '', rawValue = ''] = match
    result[rawKey.trim()] = rawValue.trim()
  }
  return result
}

/**
 * Coerce a port taken from the decoded state into a usable TCP port number.
 *
 * Accepts integers and digit-only strings in the range 1-65535. Anything else
 * yields `undefined`, so a value like "80@evil.example" can never be
 * interpolated into the proxy target URL.
 */
function parsePort(value: unknown): number | undefined {
  const candidate = typeof value === 'string' && /^\d+$/.test(value) ? Number(value) : value
  if (typeof candidate !== 'number' || !Number.isInteger(candidate)) {
    return undefined
  }
  return candidate >= 1 && candidate <= 65535 ? candidate : undefined
}

/**
 * Read the version from ../package.json relative to this module.
 *
 * @returns the version string, or 'unknown' when the file cannot be read or
 *          parsed, or has no non-empty string `version`.
 */
function getVersion(): string {
  try {
    const packageJsonPath = join(__dirname, '../package.json')
    const packageJson: { version?: unknown } = JSON.parse(readFileSync(packageJsonPath, 'utf-8'))
    const { version } = packageJson
    return typeof version === 'string' && version ? version : 'unknown'
  } catch (err) {
    console.error('[@supermachine/proxy] Unable to retrieve version:', err)
    return 'unknown'
  }
}

// ----------------------------------------
// Process Spawning
// ----------------------------------------

/**
 * Spawn (or reuse) the child process for an MCP server and return its port.
 *
 * - Same `mcpServerId` and same `command`: the existing process and port are reused.
 * - Same `mcpServerId` but different `command`: a new process is spawned and
 *   registered first, then the old one is killed.
 * - Otherwise a new process is spawned on the next free port counter value.
 *
 * Every `{{supermachinePort}}` placeholder in `command` is replaced with the
 * allocated port before the command is run through the shell.
 *
 * NOTE(review): `command` originates from the request header and is executed
 * with `shell: true`. This presumably relies on the header being set only by
 * trusted infrastructure - confirm that untrusted clients cannot supply it.
 */
function spawnProcess(state: McpServerState): number {
  const { mcpServerId, baseUrl, command } = state
  const existing = mcpServers.get(mcpServerId)

  if (existing) {
    // If the command is identical, reuse existing
    if (existing.command === command) {
      console.log(
        `[@supermachine/proxy] Reusing existing process for mcpServerId "${mcpServerId}" on port ${existing.port}`
      )
      return existing.port
    }
    // Otherwise, we have a different command -> we will spawn a NEW child and then kill the old one
    console.log(
      `[@supermachine/proxy] Command changed for mcpServerId "${mcpServerId}". ` +
        `Old port=${existing.port}, new process will be assigned.`
    )
  } else {
    console.log(`[@supermachine/proxy] Spawning a new process for mcpServerId "${mcpServerId}"...`)
  }

  // --- 1) Spawn the NEW process FIRST, so there's never a gap in the registry. ---
  const port = nextPort++
  const fullCommand = command.replaceAll('{{supermachinePort}}', `${port}`)
  console.log(
    `[@supermachine/proxy] New process for mcpServerId="${mcpServerId}" -> "${fullCommand}" on port ${port}`
  )

  const newChild = spawn(fullCommand, { shell: true, env: { ...process.env } })

  // Register the new child immediately
  mcpServers.set(mcpServerId, {
    mcpServerId,
    baseUrl,
    command,
    process: newChild,
    port,
  })

  // Hook up logging
  newChild.stdout.on('data', (data) => {
    process.stdout.write(`[@supermachine/process][${mcpServerId}][stdout]: ${data}`)
  })
  newChild.stderr.on('data', (data) => {
    process.stderr.write(`[@supermachine/process][${mcpServerId}][stderr]: ${data}`)
  })

  // Without an 'error' listener a failed spawn/kill would surface as an
  // uncaught exception and take the whole proxy down via the global handler.
  newChild.on('error', (err) => {
    console.error(`[@supermachine/process][${mcpServerId}] Process error:`, err)
    // pid is undefined only when the process never started; 'exit' may not
    // fire in that case, so drop the entry here to let requests fail fast.
    // For other errors (e.g. a failed kill) the process may still be running.
    if (newChild.pid === undefined && mcpServers.get(mcpServerId)?.process === newChild) {
      mcpServers.delete(mcpServerId)
      console.error(`[@supermachine/process][${mcpServerId}] Removed from mcpServers after spawn failure.`)
    }
  })

  newChild.on('exit', (code) => {
    console.log(`[@supermachine/process][${mcpServerId}] Process exited with code ${code}`)
    // Only remove if the registry still points to this child
    if (mcpServers.get(mcpServerId)?.process === newChild) {
      mcpServers.delete(mcpServerId)
      console.log(`[@supermachine/process][${mcpServerId}] Removed from mcpServers after exit.`)
    } else {
      console.log(`[@supermachine/process][${mcpServerId}] Exit ignored: a newer process is in mcpServers.`)
    }
  })

  // --- 2) If there's an old process (necessarily with a different command,
  //        since the same-command case returned above), kill it AFTER the new one is set. ---
  if (existing) {
    console.log(
      `[@supermachine/proxy] Now killing old process on port=${existing.port} for mcpServerId="${mcpServerId}".`
    )
    existing.process.kill()
    // Do NOT delete it from the registry here; its own exit event removes the
    // entry only if the registry still references that old child.
  }

  return port
}

// ----------------------------------------
// Main entry
// ----------------------------------------

/**
 * Parse CLI options, start the proxy HTTP server and install the SIGTERM handler.
 *
 * Each request must carry a `fly-replay-src` header whose `state` entry is a
 * URI-encoded JSON object. Depending on `state.routeType` the request is
 * proxied to a local port (MACHINE) or to a spawned child process (MCP_SERVER).
 */
async function main() {
  const version = getVersion()

  // CLI options
  const argv = yargs(hideBin(process.argv))
    .option('port', {
      type: 'number',
      description: 'Port for the @supermachine/proxy server to listen on',
      default: 8000
    })
    .version()
    .help()
    .alias('help', 'h')
    .parseSync()

  const proxy = httpProxy.createProxyServer()

  /**
   * Build the proxy error callback for MCP_SERVER routes.
   * - If the connection is refused (new child not listening yet), retry up to RETRY_LIMIT times.
   * - If `mcpServerId` has no entry in mcpServers, fail immediately ("skipping retries").
   * - If the client has disconnected while waiting, stop retrying.
   * - Any other proxy error answers 502 immediately.
   */
  function onMcpServerError({
    target,
    req,
    res,
    mcpServerId,
    retry = 0
  }: {
    target: string
    req: IncomingMessage
    res: ServerResponse
    mcpServerId: string
    retry?: number
  }) {
    return async (err: NodeJS.ErrnoException) => {
      if (err.code === 'ECONNREFUSED') {
        // If there's no process entry, fail immediately
        if (!mcpServers.has(mcpServerId)) {
          console.error(
            `[@supermachine/proxy] Process for mcpServerId="${mcpServerId}" has exited; skipping retries. ` +
              `Failing request now.`
          )
          if (!res.headersSent) {
            res.writeHead(502, { 'Content-Type': 'text/plain' })
          }
          return res.end('Bad Gateway - Process not running')
        }

        // Otherwise, keep retrying until RETRY_LIMIT or success
        if (retry < RETRY_LIMIT) {
          const nextRetry = retry + 1
          console.log(
            `[@supermachine/proxy] ECONNREFUSED -> Retry ${nextRetry}/${RETRY_LIMIT} in ${RETRY_TIMEOUT}ms.`
          )
          await new Promise((resolve) => setTimeout(resolve, RETRY_TIMEOUT))

          // Nobody is left to answer once the client connection is gone.
          if (req.socket.destroyed) {
            console.log(
              `[@supermachine/proxy] Client disconnected -> abandoning retries for ${target}${req.url}`
            )
            return
          }

          proxy.web(req, res, { target }, onMcpServerError({ target, req, res, mcpServerId, retry: nextRetry }))
        } else {
          console.error(
            `[@supermachine/proxy] Exceeded retry limit -> giving up on ${target}${req.url}`
          )
          if (!res.headersSent) {
            res.writeHead(502, { 'Content-Type': 'text/plain' })
          }
          res.end('Bad Gateway')
        }
      } else {
        // Some other proxy error: fail immediately
        console.error('[@supermachine/proxy] Proxy error:', err)
        if (!res.headersSent) {
          res.writeHead(502, { 'Content-Type': 'text/plain' })
        }
        res.end('Bad Gateway')
      }
    }
  }

  /** Build the proxy error callback for MACHINE routes: log and answer 502, no retries. */
  function onMachineError({
    target,
    req,
    res
  }: {
    target: string
    req: IncomingMessage
    res: ServerResponse
  }) {
    return async (err: NodeJS.ErrnoException) => {
      console.error(`[@supermachine/proxy] Proxy error ${target}${req.url}:`, err)
      if (!res.headersSent) {
        res.writeHead(502, { 'Content-Type': 'text/plain' })
      }
      res.end('Bad Gateway')
    }
  }

  // Create the main server
  const server = http.createServer((req, res) => {
    const headerKey = 'fly-replay-src'
    const headerValue = req.headers[headerKey.toLowerCase()]

    if (!headerValue || typeof headerValue !== 'string') {
      console.error(
        `[@supermachine/proxy] Missing or invalid "${headerKey}" header:`,
        headerValue
      )
      res.writeHead(400, { 'Content-Type': 'text/plain' })
      return res.end(`Missing or invalid "${headerKey}" header`)
    }

    try {
      const parsedHeader = parseFlyReplaySrc(headerValue)
      const stateStr = parsedHeader['state']

      if (!stateStr) {
        console.error(
          '[@supermachine/proxy] Missing "state" in fly-replay-src header:',
          parsedHeader
        )
        res.writeHead(400, { 'Content-Type': 'text/plain' })
        return res.end('Missing "state" in fly-replay-src header')
      }

      // Malformed encoding or JSON throws here and is answered by the catch below.
      const decodedStateStr = decodeURIComponent(stateStr)
      const state: State = JSON.parse(decodedStateStr)

      console.error(`[@supermachine/proxy] Route type: ${state.routeType}`)

      if (state.routeType === RouteType.MACHINE) {
        if (!state.port) {
          console.error(
            '[@supermachine/proxy] Missing required field "port" in state object:',
            state
          )
          res.writeHead(400, { 'Content-Type': 'text/plain' })
          return res.end('Missing required field "port" in state object')
        }

        // The port ends up inside the target URL, so it must be a real port number.
        const machinePort = parsePort(state.port)
        if (machinePort === undefined) {
          console.error(
            '[@supermachine/proxy] Invalid field "port" in state object:',
            state
          )
          res.writeHead(400, { 'Content-Type': 'text/plain' })
          return res.end('Invalid field "port" in state object')
        }

        const target = `http://localhost:${machinePort}`
        console.log(`[@supermachine/proxy] Routing to: ${target}${req.url}`)

        proxy.web(req, res, { target }, onMachineError({ target, req, res }))
      } else if (state.routeType === RouteType.MCP_SERVER) {
        const { mcpServerId, baseUrl, command } = state

        // `command` must be a string: it is templated and handed to the shell.
        if (!mcpServerId || !baseUrl || !command || typeof command !== 'string') {
          console.error(
            '[@supermachine/proxy] Missing required fields in state object:',
            state
          )
          res.writeHead(400, { 'Content-Type': 'text/plain' })
          return res.end('Missing required fields in state object')
        }

        // Spawn or reuse process
        const port = spawnProcess({ routeType: RouteType.MCP_SERVER, mcpServerId, baseUrl, command })
        const target = `http://localhost:${port}`
        console.log(`[@supermachine/proxy] Routing to: ${target}${req.url}`)

        proxy.web(req, res, { target }, onMcpServerError({ target, req, res, mcpServerId }))
      } else {
        console.error('[@supermachine/proxy] Unknown route type:', state.routeType)
        res.writeHead(400, { 'Content-Type': 'text/plain' })
        res.end('Unknown route type')
      }
    } catch (err: unknown) {
      console.error('[@supermachine/proxy] Error processing request:', err)
      res.writeHead(500, { 'Content-Type': 'text/plain' })
      res.end('Internal Server Error')
    }
  })

  // Start listening
  server.listen(argv.port, '0.0.0.0', () => {
    console.error(`[@supermachine/proxy] Version: ${version}`)
    console.error(`[@supermachine/proxy] Server listening on port ${argv.port}`)
    console.error(`[@supermachine/proxy] Using header: "fly-replay-src"`)
    console.error(`[@supermachine/proxy] Next available port for child processes: ${nextPort}`)
  })

  // On graceful shutdown, kill all child processes
  process.on('SIGTERM', () => {
    console.error('[@supermachine/proxy] Shutting down...')
    for (const { process: childProc, mcpServerId } of mcpServers.values()) {
      console.log(
        `[@supermachine/proxy] Terminating process for mcpServerId="${mcpServerId}" (pid=${childProc.pid})`
      )
      childProc.kill()
    }
    server.close(() => process.exit(0))
  })
}

main().catch((err: unknown) => {
  console.error('[@supermachine/proxy] Fatal error:', err)
  process.exit(1)
})