import express from 'express'
import bodyParser from 'body-parser'
import cors, { type CorsOptions } from 'cors'
import { spawn, ChildProcessWithoutNullStreams } from 'child_process'
import { Server } from '@modelcontextprotocol/sdk/server/index.js'
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'
import { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js'
import { Logger } from '../types.js'
import { getVersion } from '../lib/getVersion.js'
import { onSignals } from '../lib/onSignals.js'
import { serializeCorsOrigin } from '../lib/serializeCorsOrigin.js'

/** Options accepted by {@link stdioToSse}. */
export interface StdioToSseArgs {
  /** Command line used to spawn the child process (run through a shell). */
  stdioCmd: string
  /** TCP port the Express app listens on. */
  port: number
  /** Prefix prepended to `messagePath` when building the endpoint given to the SSE transport. */
  baseUrl: string
  /** Route (GET) on which SSE connections are accepted. */
  ssePath: string
  /** Route (POST) on which client messages are received. */
  messagePath: string
  /** Logger used for all diagnostics. */
  logger: Logger
  /** Passed as `origin` to the `cors` middleware; CORS is not installed when falsy. */
  corsOrigin: CorsOptions['origin']
  /** Routes (GET) that respond with the body `ok`. */
  healthEndpoints: string[]
  /** Extra headers set on health, SSE and message responses. */
  headers: Record<string, string>
}

/** Sets every entry of `headers` as a header on `res`. */
const setResponseHeaders = ({
  res,
  headers,
}: {
  res: express.Response
  headers: Record<string, string>
}) =>
  Object.entries(headers).forEach(([key, value]) => {
    res.setHeader(key, value)
  })

/**
 * Spawns `stdioCmd` as a child process and exposes it over HTTP:
 * messages received through SSE transports are written to the child's stdin
 * as newline-delimited JSON, and every JSON line the child prints on stdout
 * is sent to all currently registered SSE sessions.
 *
 * The process exits when the child exits, using the child's exit code
 * (or 1 when the code is null).
 *
 * @param args - See {@link StdioToSseArgs}.
 */
export async function stdioToSse(args: StdioToSseArgs) {
  const {
    stdioCmd,
    port,
    baseUrl,
    ssePath,
    messagePath,
    logger,
    corsOrigin,
    healthEndpoints,
    headers,
  } = args

  logger.info(
    // Count the keys: a plain object has no `length`, so the previous
    // `Object(headers).length` check always reported "(none)".
    ` - Headers: ${Object.keys(headers).length ? JSON.stringify(headers) : '(none)'}`,
  )
  logger.info(` - port: ${port}`)
  logger.info(` - stdio: ${stdioCmd}`)
  if (baseUrl) {
    logger.info(` - baseUrl: ${baseUrl}`)
  }
  logger.info(` - ssePath: ${ssePath}`)
  logger.info(` - messagePath: ${messagePath}`)
  logger.info(
    ` - CORS: ${corsOrigin ? `enabled (${serializeCorsOrigin({ corsOrigin })})` : 'disabled'}`,
  )
  logger.info(
    ` - Health endpoints: ${healthEndpoints.length ? healthEndpoints.join(', ') : '(none)'}`,
  )

  onSignals({ logger })

  // NOTE(review): stdioCmd is executed through a shell, so it must come from
  // a trusted source (presumably the operator's CLI arguments — confirm).
  const child: ChildProcessWithoutNullStreams = spawn(stdioCmd, { shell: true })
  child.on('exit', (code, signal) => {
    logger.error(`Child exited: code=${code}, signal=${signal}`)
    process.exit(code ?? 1)
  })

  const server = new Server(
    { name: 'supergateway', version: getVersion() },
    { capabilities: {} },
  )

  // Active SSE sessions keyed by the transport's session id.
  const sessions: Record<
    string,
    { transport: SSEServerTransport; response: express.Response }
  > = {}

  const app = express()

  if (corsOrigin) {
    app.use(cors({ origin: corsOrigin }))
  }

  // JSON body parsing is skipped for the message route; that request is
  // handed untouched to the transport's handlePostMessage below.
  app.use((req, res, next) => {
    if (req.path === messagePath) return next()
    return bodyParser.json()(req, res, next)
  })

  for (const ep of healthEndpoints) {
    app.get(ep, (_req, res) => {
      setResponseHeaders({
        res,
        headers,
      })
      res.send('ok')
    })
  }

  app.get(ssePath, async (req, res) => {
    logger.info(`New SSE connection from ${req.ip}`)

    setResponseHeaders({
      res,
      headers,
    })

    const sseTransport = new SSEServerTransport(`${baseUrl}${messagePath}`, res)
    await server.connect(sseTransport)

    const sessionId = sseTransport.sessionId
    if (sessionId) {
      sessions[sessionId] = { transport: sseTransport, response: res }
    }

    // These handlers are assigned after connect(), so they replace whatever
    // handlers connect() may have installed on the transport.
    sseTransport.onmessage = (msg: JSONRPCMessage) => {
      logger.info(`SSE → Child (session ${sessionId}): ${JSON.stringify(msg)}`)
      child.stdin.write(JSON.stringify(msg) + '\n')
    }

    sseTransport.onclose = () => {
      logger.info(`SSE connection closed (session ${sessionId})`)
      delete sessions[sessionId]
    }

    sseTransport.onerror = (err) => {
      logger.error(`SSE error (session ${sessionId}):`, err)
      delete sessions[sessionId]
    }

    req.on('close', () => {
      logger.info(`Client disconnected (session ${sessionId})`)
      delete sessions[sessionId]
    })
  })

  // NOTE(review): the suppression below presumably silences an Express
  // handler-overload complaint about the returned Response — confirm.
  // @ts-ignore
  app.post(messagePath, async (req, res) => {
    const sessionId = req.query.sessionId as string

    setResponseHeaders({
      res,
      headers,
    })

    if (!sessionId) {
      return res.status(400).send('Missing sessionId parameter')
    }

    const session = sessions[sessionId]
    if (session?.transport?.handlePostMessage) {
      logger.info(`POST to SSE transport (session ${sessionId})`)
      await session.transport.handlePostMessage(req, res)
    } else {
      res.status(503).send(`No active SSE connection for session ${sessionId}`)
    }
  })

  app.listen(port, () => {
    logger.info(`Listening on port ${port}`)
    logger.info(`SSE endpoint: http://localhost:${port}${ssePath}`)
    logger.info(`POST messages: http://localhost:${port}${messagePath}`)
  })

  // Let the stream decode UTF-8 so a multi-byte character split across two
  // chunks is reassembled instead of being corrupted by per-chunk decoding.
  child.stdout.setEncoding('utf8')

  // Holds the trailing partial line until the rest of it arrives.
  let buffer = ''
  child.stdout.on('data', (chunk: string) => {
    buffer += chunk
    const lines = buffer.split(/\r?\n/)
    buffer = lines.pop() ?? ''
    lines.forEach((line) => {
      if (!line.trim()) return
      try {
        const jsonMsg = JSON.parse(line)
        logger.info('Child → SSE:', jsonMsg)
        for (const [sid, session] of Object.entries(sessions)) {
          const dropSession = (err: unknown) => {
            logger.error(`Failed to send to session ${sid}:`, err)
            delete sessions[sid]
          }
          try {
            // send() returns a promise, so failures must be handled with
            // .catch as well; try/catch alone only sees synchronous throws.
            session.transport.send(jsonMsg).catch(dropSession)
          } catch (err) {
            dropSession(err)
          }
        }
      } catch {
        logger.error(`Child non-JSON: ${line}`)
      }
    })
  })

  child.stderr.on('data', (chunk: Buffer) => {
    logger.error(`Child stderr: ${chunk.toString('utf8')}`)
  })
}