import { existsSync, mkdirSync, readFileSync, unlinkSync, writeFileSync } from "node:fs";
import { basename, extname, join, resolve } from "node:path";
import { Hono } from "hono";
import { cors } from "hono/cors";
import type { Subscription } from "./agent/mailbox.ts";
import { getMozartDir, getPidPath } from "./agent/paths.ts";
import { type StreamEvent, toErrorMessage } from "./agent/types.ts";
import { PROJECT_ROOT } from "./root.ts";
import { Supervisor } from "./supervisor.ts";
import type { ContainerAgent } from "./types.ts";

/** TCP port the daemon's HTTP server listens on. */
export const PORT = 4141;

let supervisor: Supervisor;
// Flipped to true (and `readyPromise` resolved) once `startDaemon` has finished
// its restore attempt, whether or not that attempt succeeded.
let daemonReady = false;
let readyResolve: () => void;
const readyPromise = new Promise<void>((r) => {
  readyResolve = r;
});

/** Per-request variables set by the `:id` middleware of the per-agent sub-app. */
type Env = { Variables: { agent: ContainerAgent; agentId: string } };

const SSE_HEADERS = {
  "Content-Type": "text/event-stream",
  "Cache-Control": "no-cache",
  Connection: "keep-alive",
} as const;

/**
 * Exposes an async generator as a server-sent-events response.
 *
 * Every yielded event is written as a `data: <json>` frame. When the generator
 * finishes normally a final `data: [DONE]` frame is written; when it throws,
 * a `{ type: "error", error }` frame is written instead. The stream is closed
 * in both cases.
 *
 * @param generator Source of events to serialise.
 * @returns A streaming `Response` carrying the SSE headers.
 */
function sseStream(generator: AsyncGenerator<StreamEvent>): Response {
  const stream = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder();
      try {
        for await (const event of generator) {
          controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`));
        }
        controller.enqueue(encoder.encode(`data: [DONE]\n\n`));
      } catch (err) {
        const errEvent = JSON.stringify({ type: "error", error: toErrorMessage(err) });
        controller.enqueue(encoder.encode(`data: ${errEvent}\n\n`));
      } finally {
        controller.close();
      }
    },
  });
  return new Response(stream, { headers: SSE_HEADERS });
}

/**
 * Exposes a subscription as a long-lived server-sent-events response.
 *
 * @param sub Subscription to drain; it is unsubscribed when the consumer cancels the stream.
 * @param transform Maps an item to the frame payload; returning `null` skips the item.
 * @param initialData Optional payload written as the first frame before any item arrives.
 * @returns A streaming `Response` carrying the SSE headers.
 */
function subscriptionStream<T>(
  sub: Subscription<T>,
  transform: (item: T) => string | null,
  initialData?: string,
): Response {
  const encoder = new TextEncoder();
  let cancelled = false;
  const stream = new ReadableStream({
    async start(controller) {
      if (initialData) controller.enqueue(encoder.encode(`data: ${initialData}\n\n`));
      // Periodic ping frame every 30 s; a failed enqueue is taken to mean the
      // consumer is gone and stops further writes.
      const keepalive = setInterval(() => {
        if (cancelled) return;
        try {
          controller.enqueue(encoder.encode(`data: {"type":"ping"}\n\n`));
        } catch {
          cancelled = true;
        }
      }, 30_000);
      try {
        for await (const item of sub) {
          if (cancelled) break;
          const data = transform(item);
          if (data === null) continue;
          try {
            controller.enqueue(encoder.encode(`data: ${data}\n\n`));
          } catch {
            cancelled = true;
            break;
          }
        }
      } finally {
        clearInterval(keepalive);
        controller.close();
      }
    },
    cancel() {
      cancelled = true;
      sub.unsubscribe();
    },
  });
  return new Response(stream, { headers: SSE_HEADERS });
}

/** Largest declared `Content-Length` (1 MiB) accepted on routes other than `/unseal`. */
const MAX_REQUEST_BODY = 1024 * 1024;

// --- Static file serving ---

// `undefined` = not looked up yet, `null` = looked up and not found.
let cachedUiDist: string | null | undefined;

/**
 * Locates the built UI directory (one that contains `index.html`), checking
 * `<PROJECT_ROOT>/ui/dist` first and then `ui/dist` relative to the working
 * directory. The result, including a miss, is cached for later calls.
 *
 * @returns The directory path, or `null` when no build was found.
 */
function findUiDist(): string | null {
  if (cachedUiDist !== undefined) return cachedUiDist;
  for (const p of [join(PROJECT_ROOT, "ui", "dist"), resolve("ui", "dist")]) {
    if (existsSync(join(p, "index.html"))) {
      cachedUiDist = p;
      return p;
    }
  }
  cachedUiDist = null;
  return null;
}

/** Content types by file extension; anything else is served as `application/octet-stream`. */
const MIME_TYPES: Record<string, string> = {
  ".html": "text/html",
  ".js": "text/javascript",
  ".css": "text/css",
  ".svg": "image/svg+xml",
  ".png": "image/png",
  ".ico": "image/x-icon",
  ".json": "application/json",
  ".woff": "font/woff",
  ".woff2": "font/woff2",
};

/**
 * Serves a file from the built UI, falling back to `index.html` for any path
 * that does not name an existing file (single-page-app routing).
 *
 * @param path URL pathname of the request.
 * @returns The file response, or a placeholder page when no UI build exists.
 */
function serveUi(path: string): Response {
  const dist = findUiDist();
  if (!dist) {
    // NOTE(review): only the text of this page ("Mozart" / "Chat UI not found.
    // Run bun run build:ui.") was recoverable; the surrounding markup is a
    // reconstruction — confirm against version control.
    return new Response(
      "<html><body><h1>Mozart</h1><p>Chat UI not found. Run <code>bun run build:ui</code>.</p></body></html>",
      { headers: { "Content-Type": "text/html" } },
    );
  }
  if (path !== "/") {
    const filePath = join(dist, path);
    if (existsSync(filePath)) {
      const ext = extname(filePath);
      return new Response(Bun.file(filePath), {
        headers: {
          "Content-Type": MIME_TYPES[ext] ?? "application/octet-stream",
          // Files under an `/assets/` directory are served as immutable for one year.
          ...(filePath.includes("/assets/") ? { "Cache-Control": "public, max-age=31536000, immutable" } : {}),
        },
      });
    }
  }
  return new Response(Bun.file(join(dist, "index.html")), {
    headers: { "Content-Type": "text/html" },
  });
}

// --- Hono app ---

/**
 * Builds the HTTP application: health check, agent collection routes,
 * per-agent routes under `/api/agents/:id`, and the UI fallback.
 *
 * @param sv Supervisor that owns the agents the routes operate on.
 * @param ready Promise awaited by the routes that create agents (`POST /api/agents`
 *   and `POST /api/agents/unseal`) before they touch the supervisor.
 * @returns The configured Hono application.
 */
export function createApp(sv: Supervisor, ready: Promise<void>): Hono<Env> {
  const app = new Hono<Env>();

  app.use("*", cors());

  // Reject requests whose declared Content-Length exceeds MAX_REQUEST_BODY,
  // except on paths ending in `/unseal`, which accept file uploads.
  app.use("*", async (c, next) => {
    const contentLength = c.req.header("content-length");
    if (contentLength && parseInt(contentLength, 10) > MAX_REQUEST_BODY) {
      if (!c.req.path.endsWith("/unseal")) {
        return c.json({ error: "Request body too large" }, 413);
      }
    }
    await next();
  });

  app.onError((err, c) => c.json({ error: toErrorMessage(err) }, 500));

  // --- Top-level routes ---

  app.get("/health", (c) => c.json({ status: "ok", agents: sv.listAgents().length, ready: daemonReady }));

  app.get("/api/agents", (c) => c.json(sv.listAgents()));

  // SSE feed of the agent list; the current list is sent as the first frame.
  app.get("/api/agents/stream", (_c) => {
    return subscriptionStream(
      sv.subscribeAgentList(),
      (agents) => JSON.stringify(agents),
      JSON.stringify(sv.listAgents()),
    );
  });

  // Registers an agent from a soulfile; responds 200 with the existing agent
  // when one with the derived id is already known, 201 otherwise.
  app.post("/api/agents", async (c) => {
    await ready;
    const body = await c.req.json<{ soulfilePath: string }>();
    if (!body.soulfilePath || typeof body.soulfilePath !== "string")
      return c.json({ error: "soulfilePath is required" }, 400);
    // Agent id = soulfile base name without its `.soul` extension.
    const agentId = basename(resolve(body.soulfilePath)).replace(/\.soul$/, "");
    if (sv.getAgent(agentId)) {
      return c.json(sv.getAgentInfo(agentId), 200);
    }
    const container = await sv.register(body.soulfilePath);
    return c.json(sv.getAgentInfo(container.id), 201);
  });

  app.delete("/api/agents", (c) => {
    sv.removeAll();
    return c.json({ ok: true });
  });

  // Accepts a multipart upload (`file`, optional `name`), spools it to a
  // temporary file in the Mozart directory and hands that to the supervisor.
  app.post("/api/agents/unseal", async (c) => {
    await ready;
    try {
      const formData = await c.req.formData();
      const file = formData.get("file") as File | null;
      const newName = formData.get("name") as string | null;
      if (!file) return c.json({ error: "file is required" }, 400);
      // The random suffix keeps two uploads arriving in the same millisecond
      // from sharing (and then deleting) each other's temporary file.
      const tmpPath = join(getMozartDir(), `tmp-unseal-${Date.now()}-${crypto.randomUUID()}.horcrux`);
      const bytes = await file.arrayBuffer();
      writeFileSync(tmpPath, Buffer.from(bytes));
      try {
        const container = await sv.unseal(tmpPath, newName ?? undefined);
        return c.json(
          sv.getAgentInfo(container.id) ?? { id: container.id, state: container.state, model: container.config.model },
          201,
        );
      } finally {
        // Best-effort cleanup of the spooled upload.
        try {
          unlinkSync(tmpPath);
        } catch {}
      }
    } catch (err) {
      sv.log("daemon", `Unseal failed: ${toErrorMessage(err)}`);
      return c.json({ error: toErrorMessage(err) }, 400);
    }
  });

  // --- Per-agent routes (`:id` param) ---

  const agents = new Hono<Env>();

  // Resolve the `:id` parameter once; `agent` is only set when the supervisor knows the id.
  agents.use("*", async (c, next) => {
    const id = decodeURIComponent(c.req.param("id")!);
    c.set("agentId", id);
    const agent = sv.getAgent(id);
    if (agent) c.set("agent", agent);
    await next();
  });

  // Returns a 404 response when the middleware found no agent, `null` otherwise.
  const requireAgent = (c: { var: Env["Variables"]; json: (data: unknown, status?: number) => Response }) => {
    if (!c.var.agent) return c.json({ error: "Agent not found" }, 404);
    return null;
  };

  agents.delete("/", async (c) => {
    const removed = await sv.remove(c.var.agentId);
    return removed ? c.json({ ok: true }) : c.json({ error: "Agent not found" }, 404);
  });

  agents.post("/stop", (c) => {
    if (!sv.stop(c.var.agentId)) return c.json({ error: "Agent not found" }, 404);
    return c.json(sv.getAgentInfo(c.var.agentId));
  });

  agents.post("/start", async (c) => {
    try {
      await sv.restart(c.var.agentId);
      return c.json(sv.getAgentInfo(c.var.agentId));
    } catch (err) {
      return c.json({ error: toErrorMessage(err) }, 400);
    }
  });

  // Sends a user message and streams the agent's reply as SSE.
  agents.post("/messages", async (c) => {
    const guard = requireAgent(c);
    if (guard) return guard;
    if (c.var.agent.state !== "running")
      return c.json({ error: `Agent is not ready (state: ${c.var.agent.state})` }, 503);
    try {
      const body = await c.req.json<{ content: string; conversationId?: string }>();
      if (!body.content || typeof body.content !== "string") return c.json({ error: "content is required" }, 400);
      const convId = body.conversationId ?? crypto.randomUUID();
      return sseStream(sv.streamChat(c.var.agentId, "user", body.content, convId));
    } catch (err) {
      sv.onAgentError(c.var.agentId, err instanceof Error ? err : new Error(String(err)));
      return c.json({ error: toErrorMessage(err) }, 500);
    }
  });

  agents.get("/history", async (c) => {
    if (!sv.hasAgent(c.var.agentId)) return c.json({ error: "Agent not found" }, 404);
    try {
      return c.json(await sv.queryAgent(c.var.agentId, "get_history"));
    } catch (err) {
      return c.json({ error: toErrorMessage(err) }, 503);
    }
  });

  agents.get("/schedules", async (c) => {
    if (!sv.hasAgent(c.var.agentId)) return c.json({ error: "Agent not found" }, 404);
    try {
      return c.json(await sv.queryAgent(c.var.agentId, "get_schedules"));
    } catch (err) {
      return c.json({ error: toErrorMessage(err) }, 503);
    }
  });

  agents.delete("/schedules/:scheduleId", async (c) => {
    if (!sv.hasAgent(c.var.agentId)) return c.json({ error: "Agent not found" }, 404);
    const scheduleId = c.req.param("scheduleId")!;
    try {
      const result = (await sv.queryAgent(c.var.agentId, "delete_schedule", { scheduleId })) as { ok: boolean };
      return result.ok ? c.json({ ok: true }) : c.json({ error: "Schedule not found" }, 404);
    } catch (err) {
      return c.json({ error: toErrorMessage(err) }, 503);
    }
  });

  agents.get("/config", (c) => {
    const config = sv.getAgentConfig(c.var.agentId);
    return config ? c.json(config) : c.json({ error: "Agent not found" }, 404);
  });

  agents.get("/skills", async (c) => {
    if (!sv.hasAgent(c.var.agentId)) return c.json({ error: "Agent not found" }, 404);
    try {
      return c.json(await sv.queryAgent(c.var.agentId, "get_skills"));
    } catch (err) {
      return c.json({ error: toErrorMessage(err) }, 503);
    }
  });

  // SSE feed of the agent's events; only `bg` and `bg_done` messages are forwarded.
  agents.get("/events", (c) => {
    const guard = requireAgent(c);
    if (guard) return guard;
    return subscriptionStream(c.var.agent.events.subscribe(), (msg) => {
      if (msg.kind === "bg") {
        return JSON.stringify({ source: msg.source, conversationId: msg.conversationId, event: msg.event });
      }
      if (msg.kind === "bg_done") {
        return JSON.stringify({ source: msg.source, conversationId: msg.conversationId, done: true });
      }
      return null;
    });
  });

  // Non-streaming chat: collects all `text` events into one response body and
  // answers 500 on the first `error` event.
  agents.post("/talk", async (c) => {
    const guard = requireAgent(c);
    if (guard) return guard;
    const body = await c.req.json<{ fromId: string; message: string; conversationId?: string }>();
    // Validate like `/messages` does so malformed requests get a 400 instead of
    // passing `undefined` through to the chat stream.
    if (!body.fromId || typeof body.fromId !== "string") return c.json({ error: "fromId is required" }, 400);
    if (!body.message || typeof body.message !== "string") return c.json({ error: "message is required" }, 400);
    const convId = body.conversationId ?? crypto.randomUUID();
    let fullResponse = "";
    for await (const event of sv.streamChat(c.var.agentId, body.fromId, body.message, convId)) {
      if (event.type === "text" && event.text) fullResponse += event.text;
      if (event.type === "error") return c.json({ error: event.error }, 500);
    }
    return c.json({ response: fullResponse });
  });

  // Asks a running agent with no messages yet to introduce itself; streamed as SSE.
  agents.post("/intro", async (c) => {
    const guard = requireAgent(c);
    if (guard) return guard;
    if (c.var.agent.state !== "running")
      return c.json({ error: `Agent is not ready (state: ${c.var.agent.state})` }, 503);
    try {
      const countResult = (await sv.queryAgent(c.var.agentId, "get_message_count")) as { count: number };
      if (countResult.count > 0) return c.json({ error: "Agent already has messages" }, 409);
    } catch (err) {
      return c.json({ error: toErrorMessage(err) }, 503);
    }
    // The request body is optional: a missing or unparsable body falls back to
    // the default conversation id.
    let convId: string;
    try {
      const body = await c.req.json<{ conversationId?: string }>();
      convId = body.conversationId ?? `user:${c.var.agentId}`;
    } catch {
      convId = `user:${c.var.agentId}`;
    }
    const prompt = `You are ${c.var.agentId}. Say hi and introduce yourself in a brief, conversational way. Explain who you are and what you can help with based on your identity. Do not mention or describe other agents. Do not output any code or file examples.\n\nAfter your introduction, suggest up to 3 of the most impactful things the user could ask you to do as a plain markdown bullet list (no bold text). Each item should be a complete, actionable question or request.`;
    return sseStream(sv.streamChat(c.var.agentId, "user", prompt, convId));
  });

  // Seals the agent and returns the resulting file as a download.
  agents.post("/seal", async (c) => {
    try {
      const result = await sv.seal(c.var.agentId);
      const file = Bun.file(result.path);
      return new Response(file.stream(), {
        headers: {
          "Content-Type": "application/octet-stream",
          "Content-Disposition": `attachment; filename="${c.var.agentId}.horcrux"`,
        },
      });
    } catch (err) {
      return c.json({ error: toErrorMessage(err) }, 400);
    }
  });

  app.route("/api/agents/:id", agents);

  // SPA fallback for non-API GET requests
  app.get("*", (c) => serveUi(new URL(c.req.url).pathname));

  return app;
}

// --- Daemon lifecycle (unchanged) ---

declare const globalThis: typeof global & { __mozartSupervisor?: Supervisor };

/**
 * Starts the daemon: ensures the Mozart directory exists, decommissions any
 * supervisor left on `globalThis`, creates a new one, writes the PID file,
 * starts the HTTP server on `localhost:PORT`, restores agents in the
 * background and installs SIGTERM/SIGINT handlers that shut everything down.
 */
export function startDaemon(): void {
  mkdirSync(getMozartDir(), { recursive: true });

  if (globalThis.__mozartSupervisor) {
    globalThis.__mozartSupervisor.decommission();
  }

  const apiKey = process.env.OPENROUTER_API_KEY ?? "";
  supervisor = new Supervisor(apiKey);
  globalThis.__mozartSupervisor = supervisor;

  writeFileSync(getPidPath(), String(process.pid));

  const app = createApp(supervisor, readyPromise);

  const server = Bun.serve({
    port: PORT,
    hostname: "localhost",
    fetch: app.fetch,
    error(err) {
      return new Response(JSON.stringify({ error: toErrorMessage(err) }), {
        status: 500,
        headers: { "Content-Type": "application/json" },
      });
    },
    // The server-level cap is 4 GiB; the 1 MiB limit for ordinary routes is
    // enforced by the Content-Length middleware in `createApp`.
    maxRequestBodySize: 4 * 1024 * 1024 * 1024,
    idleTimeout: 255,
  });

  supervisor.log("daemon", `Mozart daemon started on http://localhost:${PORT} (PID: ${process.pid})`);

  // Restore in the background; readiness is signalled even when restoring
  // fails so that routes awaiting `readyPromise` are not blocked forever.
  (async () => {
    try {
      await supervisor.restoreAll();
    } catch (err) {
      supervisor.log("daemon", `Failed to restore containers: ${toErrorMessage(err)}`);
    }
    daemonReady = true;
    readyResolve();
  })();

  const cleanup = () => {
    supervisor.log("daemon", "Shutting down...");
    supervisor.shutdownAll();
    try {
      unlinkSync(getPidPath());
    } catch {}
    server.stop();
    process.exit(0);
  };

  process.on("SIGTERM", cleanup);
  process.on("SIGINT", cleanup);
}

/**
 * Reports whether the process named in the PID file is alive, probing it with
 * signal 0. A PID file that is unreadable, does not hold a positive integer,
 * or names a process that cannot be signalled is treated as stale and removed.
 *
 * @returns `true` when the probe succeeds, `false` otherwise.
 */
export function isDaemonRunning(): boolean {
  if (!existsSync(getPidPath())) return false;
  try {
    const pid = parseInt(readFileSync(getPidPath(), "utf-8").trim(), 10);
    // A PID of 0 or below would address a process group rather than one
    // process, so it must never reach `process.kill`.
    if (!Number.isInteger(pid) || pid <= 0) throw new Error("invalid PID file");
    process.kill(pid, 0);
    return true;
  } catch {
    try {
      unlinkSync(getPidPath());
    } catch {}
    return false;
  }
}

/**
 * Reads the daemon's PID from the PID file without checking that the process is alive.
 *
 * @returns The PID, or `null` when the file is missing, unreadable, or does not
 *   hold a positive integer.
 */
export function getDaemonPid(): number | null {
  if (!existsSync(getPidPath())) return null;
  try {
    const pid = parseInt(readFileSync(getPidPath(), "utf-8").trim(), 10);
    return Number.isInteger(pid) && pid > 0 ? pid : null;
  } catch {
    return null;
  }
}