import { appendFileSync, copyFileSync, mkdirSync, rmSync, writeFileSync } from "node:fs";
import { dirname, join } from "node:path";
import { EventBus, type Subscription } from "./agent/mailbox.ts";
import { type AgentConfig, parseAgentContent, parseSoulfile } from "./agent/parser.ts";
import {
	agentDir,
	agentLogPath,
	agentSkillsDir,
	agentSoulPath,
	getDaemonLogPath,
	getMozartDir,
} from "./agent/paths.ts";
import { type PeerInfo, QUERY_TIMEOUT_MS, ROUTE_TIMEOUT_MS, type WorkerMessage } from "./agent/protocol.ts";
import { resolveLocalSkill, SkillsManager } from "./agent/skills.ts";
import { type AgentInfo, type StreamEvent, toErrorMessage } from "./agent/types.ts";
import { MessageJournal } from "./journal.ts";
import { cleanupHorcruxImage, seal as horcruxSeal, loadHorcrux, type SealResult, startFromHorcrux } from "./horcrux.ts";
import {
	DEV_MODE,
	getEnvFromInspect,
	imageExists,
	isPodmanAvailable,
	killContainer,
	MOZART_DEV_IMAGE,
	MOZART_IMAGE,
	podmanInspect,
	rejectPendingQueries,
	removeImage,
	sendToContainer,
	shutdownContainer,
	spawnContainer,
	startContainer,
	stopContainers,
} from "./podman.ts";
import type { ContainerAgent } from "./types.ts";
import { createContainerAgent } from "./types.ts";

/** Root directory for agent data (`<mozart dir>/agents`); created by the Supervisor constructor. */
function agentsDir(): string {
	return join(getMozartDir(), "agents");
}

const JOURNAL_CLEANUP_INTERVAL_MS = 60 * 60 * 1000; // 1 hour
const JOURNAL_CLEANUP_AGE_MS = 24 * 60 * 60 * 1000; // 24 hours
const CHAT_INACTIVITY_TIMEOUT_MS = 120_000; // 2 minutes with no events

/** A supervised agent: its current container plus the pending auto-restart timer, if any. */
interface AgentEntry {
	container: ContainerAgent;
	restartTimer: Timer | null;
}

/**
 * Owns every agent container managed by the daemon: spawning, restarting with
 * backoff, stopping, removing, sealing/unsealing, routing messages between agents
 * through a persistent {@link MessageJournal}, and broadcasting the agent list to
 * subscribers and to running agents.
 */
export class Supervisor {
	private agents = new Map<string, AgentEntry>();
	private apiKey: string;
	private podmanVerified = false;
	// Log directories already created, so log() only calls mkdir once per directory.
	private createdDirs = new Set<string>();
	private agentListBus = new EventBus<AgentInfo[]>();
	readonly journal: MessageJournal;
	private cleanupTimer: Timer | null = null;

	/**
	 * @param apiKey Passed to every spawned / unsealed container.
	 * @param journalPath Optional location of the message journal (forwarded to MessageJournal).
	 */
	constructor(apiKey: string, journalPath?: string) {
		this.apiKey = apiKey;
		mkdirSync(agentsDir(), { recursive: true });
		this.journal = new MessageJournal(journalPath);
		// Periodically prune journal entries older than JOURNAL_CLEANUP_AGE_MS; failures are logged, never thrown.
		this.cleanupTimer = setInterval(() => {
			try {
				const removed = this.journal.cleanup(JOURNAL_CLEANUP_AGE_MS);
				if (removed > 0) this.log("daemon", `Journal cleanup: pruned ${removed} old entries`);
			} catch (err) {
				this.log("daemon", `Journal cleanup failed: ${toErrorMessage(err)}`);
			}
		}, JOURNAL_CLEANUP_INTERVAL_MS);
	}

	/** Subscribe to snapshots of the full agent list, emitted on every peers update. */
	subscribeAgentList(): Subscription<AgentInfo[]> {
		return this.agentListBus.subscribe();
	}

	/**
	 * Verify (once per Supervisor) that podman is on PATH and the required image exists.
	 * @throws Error with a "mozart setup" hint when either check fails.
	 */
	async ensurePodman(): Promise<void> {
		if (this.podmanVerified) return;
		const available = await isPodmanAvailable();
		if (!available) {
			throw new Error('Podman is required but not found in PATH. Run "mozart setup" to install it.');
		}
		const hasImage = await imageExists(DEV_MODE ? MOZART_DEV_IMAGE : MOZART_IMAGE);
		if (!hasImage) {
			const name = DEV_MODE ? "dev base" : "agent";
			throw new Error(`Mozart ${name} image not found. Run "mozart setup" to build it.`);
		}
		this.podmanVerified = true;
	}

	/**
	 * Start a detached reader over the container's stdout that parses newline-delimited
	 * JSON {@link WorkerMessage}s. When the process exits: a container that is no longer
	 * the registered one has its bus closed and queries rejected; a registered container
	 * that exited without being stopped is marked "error" and a restart is scheduled.
	 * Returns as soon as the reader is started.
	 */
	private async startContainerStream(container: ContainerAgent): Promise<void> {
		const stdout = container.process.stdout;
		if (!stdout || typeof stdout === "number") return;
		const reader = stdout.getReader();
		const decoder = new TextDecoder();

		(async () => {
			try {
				while (true) {
					const { done, value } = await reader.read();
					if (done) break;
					container.buffer += decoder.decode(value, { stream: true });
					const lines = container.buffer.split("\n");
					// Keep the trailing (possibly incomplete) line for the next chunk.
					container.buffer = lines.pop()!;
					for (const line of lines) {
						if (!line.trim()) continue;
						try {
							const msg = JSON.parse(line) as WorkerMessage;
							this.handleWorkerMessage(container, msg).catch((err) =>
								this.log(container.id, `Error handling worker message: ${err}`),
							);
						} catch (err) {
							this.log(container.id, `Failed to parse worker message: ${err}`);
						}
					}
				}
			} catch (err) {
				this.log(container.id, `stdout stream error: ${err}`);
			}

			const exitCode = await container.process.exited;

			// This container was replaced or removed: just tear down its JS-side state.
			if (this.agents.get(container.id)?.container !== container) {
				container.events.close();
				rejectPendingQueries(container, "Container replaced");
				return;
			}

			if (container.state !== "stopped") {
				container.state = "error";
				rejectPendingQueries(container, `Agent crashed (exit code ${exitCode})`);
				container.events.emit({ kind: "error", error: `Agent crashed (exit code ${exitCode})` });
				container.events.close();
				this.broadcastPeersUpdate();
				this.log(container.id, `Container exited with code ${exitCode}`);
				this.scheduleRestart(container);
			}
		})().catch((err) => {
			// Nothing awaits this task; log instead of leaking an unhandled rejection.
			this.log(container.id, `stdout handler error: ${toErrorMessage(err)}`);
		});
	}

	/** Dispatch one message received from a worker container. */
	private async handleWorkerMessage(container: ContainerAgent, msg: WorkerMessage): Promise<void> {
		switch (msg.type) {
			case "ready":
				container.state = "running";
				container.startedAt = new Date();
				this.log(container.id, `Agent started (model: ${container.config.model})`);
				this.broadcastPeersUpdate();
				this.replayPendingMessages(container);
				break;
			case "ack":
				this.journal.acknowledge(msg.messageId);
				break;
			case "event":
				container.events.emit({ kind: "stream", conversationId: msg.conversationId, event: msg.event });
				break;
			case "chat_done":
				if (msg.messageId) this.journal.complete(msg.messageId);
				container.events.emit({ kind: "chat_done", conversationId: msg.conversationId });
				break;
			case "route_request": {
				const { requestId, toId, message } = msg;
				try {
					const response = await this.routeMessage(container.id, toId, message, requestId);
					sendToContainer(container, { type: "route_response", requestId, response });
				} catch (err) {
					const errMsg = toErrorMessage(err);
					this.log(container.id, `Route to '${toId}' failed: ${errMsg}`);
					sendToContainer(container, { type: "route_response", requestId, error: errMsg });
				}
				break;
			}
			case "route_done": {
				const messageId = msg.messageId ?? msg.requestId;
				this.journal.resolveRoute(messageId, msg.response);
				break;
			}
			case "mgmt_request": {
				const { requestId, action } = msg;
				try {
					let response: unknown;
					switch (action) {
						case "spawn": {
							const spawned = await this.registerFromContent(msg.name!, msg.content!);
							response = this.getAgentInfo(spawned.id);
							break;
						}
						case "stop":
							response = this.stop(msg.targetId!);
							break;
						case "start": {
							const restarted = await this.restart(msg.targetId!);
							response = this.getAgentInfo(restarted.id);
							break;
						}
						case "remove":
							response = await this.remove(msg.targetId!);
							break;
						case "list":
							response = this.listAgents();
							break;
						default:
							throw new Error(`Unknown management action: ${action}`);
					}
					sendToContainer(container, { type: "mgmt_response", requestId, response });
				} catch (err) {
					sendToContainer(container, { type: "mgmt_response", requestId, error: toErrorMessage(err) });
				}
				break;
			}
			case "query_response": {
				const pending = container.pendingQueries.get(msg.requestId);
				if (pending) {
					container.pendingQueries.delete(msg.requestId);
					if (msg.error) {
						pending.reject(new Error(msg.error));
					} else {
						pending.resolve(msg.data);
					}
				}
				break;
			}
			case "bg_event":
				container.events.emit({
					kind: "bg",
					source: msg.source,
					conversationId: msg.conversationId,
					event: msg.event,
				});
				// Mirror text from a routed conversation back onto the requesting agent's stream.
				if (msg.source.startsWith("route:") && msg.event.type === "text") {
					const sourceId = msg.source.slice(6); // strip "route:"
					const sourceEntry = this.agents.get(sourceId);
					if (sourceEntry) {
						sourceEntry.container.events.emit({
							kind: "stream",
							conversationId: msg.conversationId,
							event: msg.event,
						});
					}
				}
				break;
			case "bg_done":
				container.events.emit({ kind: "bg_done", source: msg.source, conversationId: msg.conversationId });
				break;
			case "error":
				this.log(container.id, `Worker error: ${msg.error}`);
				container.events.emit({ kind: "error", error: msg.error });
				break;
		}
	}

	/**
	 * Deliver `message` from agent `fromId` to agent `toId`. The message is journaled
	 * (so it can be replayed if the target restarts) before it is sent.
	 * Resolves with the target's response; rejects after ROUTE_TIMEOUT_MS unless the
	 * journal entry has reached "completed".
	 * @throws Error if the target is unknown or not running.
	 */
	private async routeMessage(
		fromId: string,
		toId: string,
		message: string,
		sourceRequestId: string,
	): Promise<string> {
		const target = this.agents.get(toId)?.container;
		if (!target) throw new Error(`Agent '${toId}' not found or not running.`);
		if (target.state !== "running") throw new Error(`Agent '${toId}' is not ready (state: ${target.state}).`);

		const messageId = crypto.randomUUID();
		const payload = JSON.stringify({ type: "route", messageId, fromId, message, requestId: messageId });
		this.journal.record({ id: messageId, targetId: toId, type: "route", payload, sourceId: fromId, sourceRequestId });
		sendToContainer(target, { type: "route", messageId, fromId, message, requestId: messageId });

		return new Promise<string>((resolve, reject) => {
			this.journal.attachResolver(messageId, { resolve, reject });
			setTimeout(() => {
				const entry = this.journal.get(messageId);
				if (entry && entry.status !== "completed") {
					this.journal.detachResolver(messageId);
					reject(new Error(`Route to '${toId}' timed out.`));
				}
			}, ROUTE_TIMEOUT_MS);
		});
	}

	/**
	 * Send a query to a running agent and wait for its `query_response`.
	 * Rejects after QUERY_TIMEOUT_MS, or when the worker reports an error.
	 * @throws Error if the agent is unknown or not running.
	 */
	async queryAgent(id: string, action: string, params?: Record<string, unknown>): Promise<unknown> {
		const container = this.agents.get(id)?.container;
		if (!container) throw new Error(`Agent '${id}' not found`);
		if (container.state !== "running") throw new Error(`Agent '${id}' is not running (state: ${container.state})`);

		const requestId = crypto.randomUUID();
		sendToContainer(container, { type: "query", requestId, action, params });
		return new Promise<unknown>((resolve, reject) => {
			const timer = setTimeout(() => {
				if (container.pendingQueries.has(requestId)) {
					container.pendingQueries.delete(requestId);
					reject(new Error(`Query '${action}' timed out after ${QUERY_TIMEOUT_MS / 1000}s`));
				}
			}, QUERY_TIMEOUT_MS);
			// Clear the timeout as soon as the query settles instead of keeping it alive.
			container.pendingQueries.set(requestId, {
				resolve: (data) => {
					clearTimeout(timer);
					resolve(data);
				},
				reject: (err) => {
					clearTimeout(timer);
					reject(err);
				},
			});
		});
	}

	/** Copy a soulfile into the agent's directory and start (or replace) the agent. */
	async register(soulfilePath: string): Promise<ContainerAgent> {
		const config = parseSoulfile(soulfilePath);
		const dest = agentSoulPath(config.name);
		mkdirSync(agentDir(config.name), { recursive: true });
		copyFileSync(soulfilePath, dest);
		return this.startAgent(config);
	}

	/** Write soulfile `content` for agent `name` to disk and start (or replace) the agent. */
	async registerFromContent(name: string, content: string): Promise<ContainerAgent> {
		const dir = agentDir(name);
		mkdirSync(dir, { recursive: true });
		const soulPath = agentSoulPath(name);
		writeFileSync(soulPath, content);
		const config = parseAgentContent(content, name, soulPath);
		return this.startAgent(config);
	}

	private async startAgent(config: AgentConfig): Promise<ContainerAgent> {
		await this.ensurePodman();
		await this.preResolveSkills(config);
		const container = this.replaceContainer(config.name, config);
		return container;
	}

	/**
	 * Resolve the agent's skills into its skills directory before the container starts.
	 * `local:` refs are resolved relative to the soulfile; failures are logged, not thrown.
	 */
	private async preResolveSkills(config: AgentConfig): Promise<void> {
		if (config.skills.length === 0) return;
		const mgr = new SkillsManager(agentSkillsDir(config.name));
		const destDir = agentSkillsDir(config.name);
		const results = await Promise.allSettled(
			config.skills.map((ref) => {
				if (ref.startsWith("local:")) {
					return resolveLocalSkill(ref.slice(6), config.sourcePath, destDir);
				}
				return mgr.resolve(ref);
			}),
		);
		for (let i = 0; i < results.length; i++) {
			if (results[i]!.status === "rejected") {
				this.log(
					config.name,
					`Failed to resolve skill "${config.skills[i]}": ${(results[i] as PromiseRejectedResult).reason}`,
				);
			}
		}
	}

	private clearRestartTimer(id: string): void {
		const entry = this.agents.get(id);
		if (entry?.restartTimer) {
			clearTimeout(entry.restartTimer);
			entry.restartTimer = null;
		}
	}

	/**
	 * Shut down any existing container for `id`, kill the podman container by id, spawn a
	 * fresh one and register it (which makes the old container's stream treat it as replaced).
	 */
	private replaceContainer(id: string, config: AgentConfig, restartCount = 0): ContainerAgent {
		this.clearRestartTimer(id);
		const existing = this.agents.get(id);
		if (existing) {
			shutdownContainer(existing.container);
		}
		killContainer(id);

		const container = spawnContainer(config, this.apiKey);
		container.restartCount = restartCount;
		this.agents.set(id, { container, restartTimer: null });
		this.startContainerStream(container);
		this.broadcastPeersUpdate();
		return container;
	}

	/**
	 * Stop an agent without removing it; it stays registered and can be restarted.
	 * @returns false if no such agent exists.
	 */
	stop(id: string): boolean {
		const entry = this.agents.get(id);
		if (!entry) return false;
		this.clearRestartTimer(id);
		shutdownContainer(entry.container);
		// The stream exit handler does nothing for a registered container that is
		// "stopped" (assumed to be set by shutdownContainer — TODO confirm), so fail
		// in-flight queries here instead of letting them wait for QUERY_TIMEOUT_MS.
		rejectPendingQueries(entry.container, "Agent stopped");
		entry.container.events.close();
		this.log("daemon", `Agent '${id}' stopped`);
		this.broadcastPeersUpdate();
		return true;
	}

	/**
	 * Start a fresh container for a stopped or errored agent, carrying over its config,
	 * restart count and message count.
	 * @throws Error if the agent is unknown or not in "stopped"/"error" state.
	 */
	async restart(id: string): Promise<ContainerAgent> {
		const entry = this.agents.get(id);
		if (!entry) throw new Error(`Agent '${id}' not found`);
		const existing = entry.container;
		if (existing.state !== "stopped" && existing.state !== "error")
			throw new Error(`Agent '${id}' is not stopped (state: ${existing.state})`);

		const container = this.replaceContainer(id, existing.config, existing.restartCount);
		container.messageCount = existing.messageCount;
		this.log(id, `Agent restarted`);
		return container;
	}

	/**
	 * Discover and reattach to persistent containers from a previous daemon session.
	 * Uses `podman ps -a` + `podman inspect` to find mozart containers with AGENT_CONFIG.
	 * Silently does nothing when podman is unavailable or listing fails.
	 */
	async restoreAll(): Promise<void> {
		try {
			await this.ensurePodman();
		} catch {
			return;
		}

		const listProc = Bun.spawn(["podman", "ps", "-a", "--filter", "name=mozart-", "--format", "{{.Names}}"], {
			stdout: "pipe",
			stderr: "ignore",
		});
		// Drain stdout while waiting for exit; awaiting `exited` first can deadlock
		// if the output fills the pipe buffer.
		const [listOutput, listCode] = await Promise.all([new Response(listProc.stdout).text(), listProc.exited]);
		if (listCode !== 0) return;
		const names = listOutput.trim().split("\n").filter(Boolean);

		await Promise.allSettled(
			names.map(async (containerName) => {
				const id = containerName.replace(/^mozart-/, "");
				if (this.agents.has(id)) return;
				try {
					const inspectData = await podmanInspect(containerName);
					if (!inspectData) return;
					const configJson = getEnvFromInspect(inspectData, "AGENT_CONFIG");
					if (!configJson) return;
					const config = JSON.parse(configJson) as AgentConfig;

					// Stop any still-running instance, then start it again attached to this daemon.
					const stopProc = Bun.spawn(["podman", "stop", "-t", "3", containerName], {
						stdout: "ignore",
						stderr: "ignore",
					});
					await stopProc.exited;

					const proc = startContainer(containerName);
					const container = createContainerAgent(config.name, config, proc);
					this.agents.set(config.name, { container, restartTimer: null });
					this.startContainerStream(container);
					this.log(config.name, `Restored container from previous session`);
				} catch (err) {
					this.log("daemon", `Failed to restore container '${containerName}': ${toErrorMessage(err)}`);
				}
			}),
		);

		if (names.length > 0) this.broadcastPeersUpdate();
	}

	/**
	 * Remove an agent entirely: shut down and kill its container, remove its image if it
	 * is not a base image, and delete its data directory.
	 * @returns false if no such agent exists.
	 */
	async remove(id: string): Promise<boolean> {
		if (!this.agents.has(id)) return false;
		const containerName = `mozart-${id}`;
		// Look up the image while the container still exists (killContainer presumably removes it).
		const imageRef = await this.getContainerImageRef(containerName);

		// Re-read after the await: the entry may have been replaced or removed meanwhile.
		const entry = this.agents.get(id);
		if (!entry) return false;

		this.clearRestartTimer(id);
		shutdownContainer(entry.container);
		killContainer(id);
		this.agents.delete(id);

		if (imageRef && !this.isBaseImage(imageRef)) {
			removeImage(imageRef);
		}

		this.deleteAgentData(id);
		this.log("daemon", `Agent '${id}' removed`);
		this.broadcastPeersUpdate();
		return true;
	}

	/** Image reference of a container from `podman inspect`, or null if unavailable. */
	private async getContainerImageRef(containerName: string): Promise<string | null> {
		const data = await podmanInspect(containerName);
		if (!data) return null;
		return (data as Record<string, unknown> & { Image?: string }).Image ?? null;
	}

	/** True for the shared mozart base images, which must never be removed with an agent. */
	private isBaseImage(imageRef: string): boolean {
		return imageRef === MOZART_IMAGE || imageRef === MOZART_DEV_IMAGE || imageRef.includes("mozart-agent:");
	}

	/**
	 * Seal an existing agent into a horcrux via `horcruxSeal`.
	 * @throws Error if the agent is unknown.
	 */
	async seal(id: string, outputPath?: string): Promise<SealResult> {
		if (!this.agents.has(id)) throw new Error(`Agent '${id}' not found`);
		return horcruxSeal(id, outputPath);
	}

	/**
	 * Load a horcrux (optionally under a new name) and start it as a new agent.
	 * @throws Error if an agent with the resulting name already exists (the loaded image is cleaned up).
	 */
	async unseal(horcruxPath: string, newName?: string): Promise<ContainerAgent> {
		await this.ensurePodman();
		const loaded = await loadHorcrux(horcruxPath, newName ?? undefined);
		if (this.agents.has(loaded.config.name)) {
			cleanupHorcruxImage(loaded.loadedImage);
			throw new Error(`Agent '${loaded.config.name}' already exists. Use --name to specify a different name.`);
		}
		const { config, container } = startFromHorcrux(loaded, this.apiKey);
		this.agents.set(config.name, { container, restartTimer: null });
		this.startContainerStream(container);
		this.broadcastPeersUpdate();
		this.log(config.name, `Unsealed from horcrux`);
		return container;
	}

	/** Kill every agent container and delete all agent data directories. */
	removeAll(): void {
		for (const [, entry] of this.agents) {
			if (entry.restartTimer) clearTimeout(entry.restartTimer);
		}
		const ids = Array.from(this.agents.keys());
		for (const [, entry] of this.agents) {
			try {
				sendToContainer(entry.container, { type: "shutdown" });
			} catch {}
			entry.container.state = "stopped";
			entry.container.events.close();
		}
		killContainer(ids);
		for (const id of ids) {
			this.deleteAgentData(id);
		}
		this.agents.clear();
		this.broadcastPeersUpdate();
		this.log("daemon", "All agents removed");
	}

	hasAgent(id: string): boolean {
		return this.agents.has(id);
	}

	getAgent(id: string): ContainerAgent | undefined {
		return this.agents.get(id)?.container;
	}

	getAgentConfig(id: string): AgentConfig | undefined {
		return this.agents.get(id)?.container.config;
	}

	/** Public summary of a container; `identity` is the first line of the configured identity. */
	private toAgentInfo(c: ContainerAgent): AgentInfo {
		return {
			id: c.id,
			state: c.state,
			model: c.config.model,
			identity: c.config.identity.split("\n")[0]?.trim() ?? "",
			uptime: Date.now() - c.startedAt.getTime(),
			messageCount: c.messageCount,
			restartCount: c.restartCount,
		};
	}

	getAgentInfo(id: string): AgentInfo | undefined {
		const entry = this.agents.get(id);
		if (!entry) return undefined;
		return this.toAgentInfo(entry.container);
	}

	listAgents(): AgentInfo[] {
		return Array.from(this.agents.values()).map((entry) => this.toAgentInfo(entry.container));
	}

	/**
	 * Emit the current agent list to subscribers and send each running agent its peer list
	 * (every other agent). A failed send to one agent is logged and does not stop the rest.
	 */
	private broadcastPeersUpdate(): void {
		const allAgents = this.listAgents();
		this.agentListBus.emit(allAgents);

		for (const { container } of this.agents.values()) {
			if (container.state !== "running") continue;
			const peers: PeerInfo[] = allAgents
				.filter((a) => a.id !== container.id)
				.map((a) => ({ id: a.id, model: a.model, identity: a.identity }));
			try {
				sendToContainer(container, { type: "peers_update", peers });
			} catch (err) {
				this.log(container.id, `Failed to send peers update: ${toErrorMessage(err)}`);
			}
		}
	}

	/**
	 * Called when a container reports ready: re-send journaled messages still pending for
	 * it (failing those that exceed `journal.maxAttempts`), then deliver stored route
	 * responses that never reached it.
	 */
	private replayPendingMessages(container: ContainerAgent): void {
		const pending = this.journal.getPending(container.id);
		if (pending.length > 0) {
			this.log(container.id, `Replaying ${pending.length} pending message(s)`);
		}

		for (const entry of pending) {
			const attempts = this.journal.incrementAttempts(entry.id);
			if (attempts > this.journal.maxAttempts) {
				this.journal.fail(entry.id);
				this.log(container.id, `Message ${entry.id} failed after ${attempts} attempts`);
				continue;
			}
			try {
				const msg = JSON.parse(entry.payload);
				sendToContainer(container, msg);
			} catch (err) {
				this.log(container.id, `Failed to replay message ${entry.id}: ${toErrorMessage(err)}`);
			}
		}

		const undelivered = this.journal.getUndeliveredResponses(container.id);
		for (const entry of undelivered) {
			if (!entry.response || !entry.sourceRequestId) continue;
			this.log(container.id, `Delivering stored route response for ${entry.id}`);
			sendToContainer(container, {
				type: "route_response",
				requestId: entry.sourceRequestId,
				response: entry.response,
			});
			this.journal.markResponseDelivered(entry.id);
		}
	}

	/**
	 * Send a journaled chat message to an agent and yield its stream events for
	 * `conversationId` until `chat_done` or an error. Yields a synthetic error event if no
	 * event of any kind arrives for CHAT_INACTIVITY_TIMEOUT_MS, or if the event stream ends
	 * while the agent is not running.
	 * @throws Error if the agent is unknown or not running.
	 */
	async *streamChat(
		id: string,
		fromId: string,
		content: string,
		conversationId: string,
	): AsyncGenerator<StreamEvent> {
		const container = this.agents.get(id)?.container;
		if (!container) throw new Error(`Agent '${id}' not found`);
		if (container.state !== "running") throw new Error(`Agent '${id}' is not ready (state: ${container.state})`);

		const messageId = crypto.randomUUID();
		const chatMsg = { type: "chat" as const, messageId, fromId, message: content, conversationId };
		this.journal.record({ id: messageId, targetId: id, type: "chat", payload: JSON.stringify(chatMsg) });

		const sub = container.events.subscribe();
		try {
			container.messageCount++;
			sendToContainer(container, chatMsg);

			let lastActivity = Date.now();
			let timedOut = false;
			// Any event on the container's bus (including other conversations) counts as activity.
			// On inactivity the subscription is ended, which is expected to end the loop below.
			const inactivityTimer = setInterval(() => {
				if (Date.now() - lastActivity > CHAT_INACTIVITY_TIMEOUT_MS) {
					timedOut = true;
					sub.unsubscribe();
				}
			}, 5_000);

			try {
				for await (const msg of sub) {
					lastActivity = Date.now();
					if ("conversationId" in msg && msg.conversationId !== conversationId) continue;
					switch (msg.kind) {
						case "stream":
							yield msg.event;
							break;
						case "chat_done":
							return;
						case "error":
							yield { type: "error", error: msg.error } as StreamEvent;
							return;
					}
				}

				if (timedOut) {
					yield {
						type: "error",
						error: "Response timed out. The AI provider may be unresponsive.",
					} as StreamEvent;
				} else if (container.state !== "running") {
					yield {
						type: "error",
						error: `Agent crashed (state: ${container.state}). It will restart automatically.`,
					} as StreamEvent;
				}
			} finally {
				clearInterval(inactivityTimer);
			}
		} finally {
			sub.unsubscribe();
		}
	}

	/**
	 * Daemon shutdown: stop timers, ask every agent to shut down, reject pending queries,
	 * stop the podman containers, then close the agent-list bus and the journal.
	 */
	shutdownAll(): void {
		if (this.cleanupTimer) {
			clearInterval(this.cleanupTimer);
			this.cleanupTimer = null;
		}
		for (const [, entry] of this.agents) {
			if (entry.restartTimer) clearTimeout(entry.restartTimer);
		}
		const ids = Array.from(this.agents.keys());
		for (const [, entry] of this.agents) {
			try {
				sendToContainer(entry.container, { type: "shutdown" });
			} catch {}
			entry.container.state = "stopped";
			entry.container.events.close();
			rejectPendingQueries(entry.container, "Daemon shutting down");
		}
		stopContainers(ids);
		this.agents.clear();
		this.agentListBus.close();
		this.journal.close();
	}

	/**
	 * Tear down all JS-side state (timers, EventBuses, pending queries) WITHOUT
	 * killing or stopping containers. Used when Bun --watch restarts the module
	 * so the old Supervisor's zombie callbacks don't interfere with the new one.
	 */
	decommission(): void {
		if (this.cleanupTimer) {
			clearInterval(this.cleanupTimer);
			this.cleanupTimer = null;
		}
		for (const [, entry] of this.agents) {
			if (entry.restartTimer) clearTimeout(entry.restartTimer);
			rejectPendingQueries(entry.container, "Supervisor decommissioned");
			entry.container.events.close();
		}
		this.agents.clear();
		this.agentListBus.close();
	}

	/** Mark a (non-stopped) agent as errored and schedule an automatic restart. */
	onAgentError(agentId: string, error: Error): void {
		const entry = this.agents.get(agentId);
		if (!entry || entry.container.state === "stopped") return;
		entry.container.state = "error";
		this.broadcastPeersUpdate();
		this.log(agentId, `Agent error: ${error.message}`);
		this.scheduleRestart(entry.container);
	}

	/**
	 * Restart `container` after an exponential backoff of 1s * 2^restartCount (capped at
	 * 60s). The restart is skipped if the container was stopped or replaced meanwhile;
	 * restartCount is reset to 0 if the restarted container is still running after 5
	 * minutes. A failed restart schedules another attempt.
	 */
	private scheduleRestart(container: ContainerAgent): void {
		if (container.state === "stopped") return;

		const delay = Math.min(1000 * 2 ** container.restartCount, 60_000);
		this.log(container.id, `Scheduling restart in ${delay}ms (attempt ${container.restartCount + 1})`);

		const timer = setTimeout(async () => {
			const entry = this.agents.get(container.id);
			if (!entry || entry.container !== container) return;
			entry.restartTimer = null;
			if (container.state === "stopped") return;

			container.restartCount++;
			this.log(container.id, `Restarting...`);
			try {
				const restarted = await this.restart(container.id);
				setTimeout(() => {
					const currentEntry = this.agents.get(restarted.id);
					if (currentEntry?.container === restarted && restarted.state === "running") {
						restarted.restartCount = 0;
					}
				}, 300_000);
			} catch (err) {
				const currentEntry = this.agents.get(container.id);
				if (!currentEntry || currentEntry.container !== container || currentEntry.container.state === "running")
					return;
				this.log(container.id, `Restart failed: ${toErrorMessage(err)}`);
				currentEntry.container.state = "error";
				this.broadcastPeersUpdate();
				this.scheduleRestart(currentEntry.container);
			}
		}, delay);

		const entry = this.agents.get(container.id);
		if (entry) {
			entry.restartTimer = timer;
		}
	}

	/** Recursively delete the agent's data directory (best-effort). */
	private deleteAgentData(agentId: string): void {
		const dir = agentDir(agentId);
		try {
			rmSync(dir, { recursive: true, force: true });
		} catch {
			// Best-effort cleanup
		}
	}

	/**
	 * Append a timestamped line to the agent's log file, or to the daemon log when
	 * `agentId` is "daemon". Best-effort: I/O errors are swallowed.
	 */
	log(agentId: string, message: string): void {
		const timestamp = new Date().toISOString();
		const line = `[${timestamp}] ${message}\n`;
		const logPath = agentId === "daemon" ? getDaemonLogPath() : agentLogPath(agentId);
		try {
			const dir = dirname(logPath);
			if (!this.createdDirs.has(dir)) {
				mkdirSync(dir, { recursive: true });
				this.createdDirs.add(dir);
			}
			appendFileSync(logPath, line);
		} catch {
			// Best-effort logging
		}
	}
}