/**
 * Agent worker entry point — runs inside a Podman container.
 * Communicates with the Supervisor via stdin/stdout JSON lines.
 *
 * stdout is RESERVED for the JSON protocol — all console output is
 * redirected to stderr so it never corrupts the message stream.
 */
import { Console } from "node:console";

// NOTE(review): static `import` declarations are hoisted, so the modules
// imported below are evaluated before this assignment runs. Any console
// output they produce at import time would still reach stdout — confirm
// none of them log during module evaluation.
globalThis.console = new Console({ stdout: process.stderr, stderr: process.stderr }) as unknown as typeof console;

import { createOpenRouter } from "@openrouter/ai-sdk-provider";
import { Agent } from "./agent.ts";
import type { AgentConfig } from "./parser.ts";
import type { PeerInfo, SupervisorMessage, WorkerMessage } from "./protocol.ts";
import { MGMT_TIMEOUT_MS, ROUTE_TIMEOUT_MS } from "./protocol.ts";
import { toErrorMessage } from "./types.ts";

/**
 * Requests sent to the Supervisor that are still awaiting a response,
 * keyed by request id. Entries are removed by `dispatch` when the matching
 * response arrives, or by the timeout in `requestSupervisor`.
 */
const pendingRequests = new Map<
  string,
  {
    resolve: (response: unknown) => void;
    reject: (err: Error) => void;
  }
>();

/** Writes one protocol message to stdout as a single JSON line. */
function send(msg: WorkerMessage): void {
  process.stdout.write(`${JSON.stringify(msg)}\n`);
}

/**
 * Sends a request to the Supervisor and returns a promise for its response.
 *
 * @param type - Protocol message type to send.
 * @param payload - Extra fields merged into the outgoing message.
 * @param timeoutMs - How long to wait before rejecting; defaults to ROUTE_TIMEOUT_MS.
 * @returns Resolves with the value passed by `dispatch` for the matching
 *   response; rejects with the response's error or on timeout.
 */
function requestSupervisor(
  type: WorkerMessage["type"],
  payload: Record<string, unknown>,
  timeoutMs = ROUTE_TIMEOUT_MS,
): Promise<unknown> {
  const requestId = crypto.randomUUID();
  process.stdout.write(`${JSON.stringify({ type, requestId, ...payload })}\n`);

  return new Promise<unknown>((resolve, reject) => {
    const timer = setTimeout(() => {
      // `delete` returns true only if the request was still pending.
      if (pendingRequests.delete(requestId)) {
        reject(new Error(`${type} timed out after ${timeoutMs / 1000}s`));
      }
    }, timeoutMs);

    // Settling the request cancels its timeout so answered requests do not
    // leave live timers behind.
    pendingRequests.set(requestId, {
      resolve: (response) => {
        clearTimeout(timer);
        resolve(response);
      },
      reject: (err) => {
        clearTimeout(timer);
        reject(err);
      },
    });
  });
}

let agent: Agent;
let cachedPeers: PeerInfo[] = [];

/**
 * Boots the worker: reads AGENT_CONFIG and API_KEY from the environment,
 * builds the Agent, wires its callbacks to the Supervisor protocol,
 * initializes it, starts the stdin reader, and then fetches the peer roster.
 * Reports an "error" message and exits with code 1 if the environment is
 * incomplete, the config is not valid JSON, or initialization throws.
 */
async function main() {
  const configJson = process.env.AGENT_CONFIG;
  const apiKey = process.env.API_KEY;

  if (!configJson || !apiKey) {
    send({ type: "error", error: "AGENT_CONFIG and API_KEY environment variables are required" });
    process.exit(1);
  }

  let config: AgentConfig;
  try {
    config = JSON.parse(configJson) as AgentConfig;
  } catch (err) {
    send({ type: "error", error: `Failed to parse AGENT_CONFIG: ${err}` });
    process.exit(1);
  }

  const openrouter = createOpenRouter({ apiKey });
  const model = openrouter(config.model);

  agent = new Agent(config, model);

  agent.setRouter(async (_fromId, toId, message) => {
    return requestSupervisor("route_request", { toId, message }) as Promise<string>;
  });

  // A falsy event is forwarded as "bg_done"; anything else as "bg_event".
  agent.setBackgroundEventSink((source, conversationId, event) => {
    if (event) {
      send({ type: "bg_event", source, conversationId, event });
    } else {
      send({ type: "bg_done", source, conversationId });
    }
  });

  // NOTE(review): these management calls use the default ROUTE_TIMEOUT_MS,
  // while the roster fetch below passes MGMT_TIMEOUT_MS — confirm intended.
  agent.setAgentManager({
    listAgents: () => requestSupervisor("mgmt_request", { action: "list" }) as any,
    stopAgent: (id) => requestSupervisor("mgmt_request", { action: "stop", targetId: id }) as any,
    startAgent: async (id) => {
      return requestSupervisor("mgmt_request", { action: "start", targetId: id });
    },
    removeAgent: (id) => requestSupervisor("mgmt_request", { action: "remove", targetId: id }) as any,
    spawnAgent: async (name, content, _apiKey) => {
      return requestSupervisor("mgmt_request", { action: "spawn", name, content });
    },
    getApiKey: () => apiKey,
  });

  agent.setPeersProvider(() => cachedPeers);

  try {
    await agent.initialize();
    send({ type: "ready" });
  } catch (err) {
    send({ type: "error", error: `Failed to initialize: ${err}` });
    process.exit(1);
  }

  startStdinReader();

  try {
    const agents = (await requestSupervisor("mgmt_request", { action: "list" }, MGMT_TIMEOUT_MS)) as Array<{
      id: string;
      model: string;
      identity?: string;
    }>;
    cachedPeers = agents
      .filter((a) => a.id !== agent.id)
      .map((a) => ({ id: a.id, model: a.model, identity: a.identity ?? "" }));
  } catch {
    // Best-effort — agent still works without a roster
  }
}

/**
 * Stdin reader runs as an independent loop so that response messages
 * (route_response, mgmt_response) can be received while a chat handler
 * is still executing tools that await those responses.
 *
 * Input is split on "\n"; blank lines are skipped and the trailing partial
 * line is kept in the buffer until more data arrives. A line that fails to
 * parse (or whose dispatch throws synchronously) is reported as an "error"
 * message and the loop continues. If reading stdin itself fails, the error
 * is reported and the process exits with code 1.
 */
function startStdinReader(): void {
  const reader = Bun.stdin.stream().getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  const pump = async (): Promise<void> => {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split("\n");
      // The last element is the (possibly empty) unterminated remainder.
      buffer = lines.pop() ?? "";

      for (const line of lines) {
        if (!line.trim()) continue;
        try {
          const msg = JSON.parse(line) as SupervisorMessage;
          dispatch(msg);
        } catch (err) {
          send({ type: "error", error: `Failed to parse message: ${err}` });
        }
      }
    }
  };

  // Without this handler a failed read would surface only as an unhandled
  // rejection; report it on the protocol channel before exiting.
  pump().catch((err) => {
    send({ type: "error", error: `Stdin reader failed: ${err}` });
    process.exit(1);
  });
}

/**
 * Dispatch incoming messages. Response types resolve pending promises
 * synchronously. Command types (chat, route) are fired off without
 * blocking the stdin reader so responses can still be received.
 */
function dispatch(msg: SupervisorMessage): void {
  switch (msg.type) {
    case "route_response":
    case "mgmt_response": {
      const pending = pendingRequests.get(msg.requestId);
      if (pending) {
        pendingRequests.delete(msg.requestId);
        const error = "error" in msg ? msg.error : undefined;
        if (error) {
          pending.reject(new Error(error));
        } else {
          pending.resolve("response" in msg ? msg.response : undefined);
        }
      }
      break;
    }
    case "chat":
      send({ type: "ack", messageId: msg.messageId });
      // Fire-and-forget: handleChat reports its own failures.
      void handleChat(msg);
      break;
    case "route":
      send({ type: "ack", messageId: msg.messageId });
      // Fire-and-forget: handleRoute reports its own failures.
      void handleRoute(msg);
      break;
    case "query":
      // Fire-and-forget: handleQuery reports its own failures.
      void handleQuery(msg);
      break;
    case "peers_update":
      cachedPeers = msg.peers;
      break;
    case "shutdown":
      agent.shutdown();
      process.exit(0);
  }
}

/**
 * Answers a Supervisor query with a "query_response" message carrying the
 * same requestId. Supported actions: get_history, get_schedules,
 * delete_schedule (requires params.scheduleId), get_message_count and
 * get_skills. Any thrown error, including an unknown action, is returned
 * in the response's `error` field instead of `data`.
 */
async function handleQuery(msg: {
  requestId: string;
  action: string;
  params?: Record<string, unknown>;
}): Promise<void> {
  try {
    let data: unknown;
    switch (msg.action) {
      case "get_history":
        data = {
          messages: agent.memory.getRecentConversations(100),
          conversationId: agent.memory.getLastUserConversationId(),
        };
        break;
      case "get_schedules":
        data = agent.memory.getSchedules();
        break;
      case "delete_schedule": {
        const scheduleId = msg.params?.scheduleId as string;
        if (!scheduleId) throw new Error("scheduleId is required");
        data = { ok: agent.memory.deleteSchedule(scheduleId) };
        break;
      }
      case "get_message_count":
        data = { count: agent.memory.getMessageCount() };
        break;
      case "get_skills": {
        data = agent
          .getSkillsManager()
          .listInstalled()
          .map((s) => ({ name: s.name, description: s.description }));
        break;
      }
      default:
        throw new Error(`Unknown query action: ${msg.action}`);
    }
    send({ type: "query_response", requestId: msg.requestId, data });
  } catch (err) {
    send({ type: "query_response", requestId: msg.requestId, error: toErrorMessage(err) });
  }
}

/**
 * Runs a chat message through the agent, forwarding every yielded event as
 * an "event" message. Always finishes with "chat_done"; if the agent throws,
 * an error event is emitted first.
 */
async function handleChat(msg: {
  messageId: string;
  fromId: string;
  message: string;
  conversationId: string;
}): Promise<void> {
  const { conversationId, messageId } = msg;
  try {
    for await (const event of agent.handleMessage(msg.fromId, msg.message, conversationId)) {
      send({ type: "event", conversationId, event });
    }
    send({ type: "chat_done", conversationId, messageId });
  } catch (err) {
    send({ type: "event", conversationId, event: { type: "error", error: toErrorMessage(err) } });
    send({ type: "chat_done", conversationId, messageId });
  }
}

/**
 * Handles a message routed from another agent. Events are forwarded as
 * "bg_event" messages under the source `route:<fromId>` and the conversation
 * id `<fromId>-><agent.id>`, while text events are concatenated into the
 * reply. Ends with "bg_done" followed by "route_done"; the reply is the
 * accumulated text, or an "Error: …" string if an error event is seen or
 * the agent throws.
 */
async function handleRoute(msg: {
  messageId: string;
  fromId: string;
  message: string;
  requestId: string;
}): Promise<void> {
  const { messageId } = msg;
  const conversationId = `${msg.fromId}->${agent.id}`;
  const source = `route:${msg.fromId}`;
  let fullResponse = "";

  try {
    for await (const event of agent.handleMessage(msg.fromId, msg.message, conversationId)) {
      send({ type: "bg_event", source, conversationId, event });
      if (event.type === "text" && event.text) fullResponse += event.text;
      if (event.type === "error") {
        // An error event ends the route immediately with the error as reply.
        send({ type: "bg_done", source, conversationId });
        send({ type: "route_done", requestId: msg.requestId, response: `Error: ${event.error}`, messageId });
        return;
      }
    }
  } catch (err) {
    fullResponse = `Error: ${err}`;
  }

  send({ type: "bg_done", source, conversationId });
  send({ type: "route_done", requestId: msg.requestId, response: fullResponse, messageId });
}

// Last-resort handler: report any uncaught failure from main() and exit.
main().catch((err) => {
  send({ type: "error", error: `Worker crashed: ${err}` });
  process.exit(1);
});