import { join } from "node:path";
import type { LanguageModel, ModelMessage, TextStreamPart, ToolSet } from "ai";
import { stepCountIs, streamText } from "ai";
import { translateToCron } from "./cron.ts";
import { Mailbox } from "./mailbox.ts";
import { type ContextMessage, MemoryStore, type StoredSchedule } from "./memory.ts";
import type { AgentConfig } from "./parser.ts";
import { agentDir } from "./paths.ts";
import { CronScheduler } from "./scheduler.ts";
import { type Skill, SkillsManager } from "./skills.ts";
import { createTools } from "./tools.ts";
import {
  type AgentInfo,
  type AgentManager,
  type CronJob,
  type MessageRouter,
  type PeerInfo,
  type StreamEvent,
  classifyError,
  toErrorMessage,
} from "./types.ts";

/** Lifecycle state of an {@link Agent}. */
export type AgentState = "starting" | "running" | "error" | "stopped";

/** Callback returning the peers to list in the agent's system prompt. */
export type PeersProvider = () => PeerInfo[];

/**
 * Receives events produced by runs that were not started by a caller
 * (scheduled tasks). A `null` event marks the end of the run.
 */
export type BackgroundEventSink = (source: string, conversationId: string, event: StreamEvent | null) => void;

export type { AgentInfo, AgentManager, MessageRouter, PeerInfo };

/** Step limit for one `streamText` run when the config sets no `maxIterations`. */
const DEFAULT_MAX_ITERATIONS = 25;

/** Number of stored messages loaded as context when the config sets no `maxHistory`. */
const DEFAULT_MAX_HISTORY = 100;

/** Tools whose argument deltas are forwarded as `tool_delta` events. */
const STREAM_ARGS_TOOLS = new Set(["write_file", "create_skill"]);

/**
 * Renders a tool output as text.
 *
 * Strings pass through unchanged; everything else is JSON-encoded.
 * `JSON.stringify(undefined)` yields `undefined`, so that case is mapped to
 * an empty string to keep the result a real string.
 */
function stringifyToolOutput(output: unknown): string {
  if (typeof output === "string") return output;
  return JSON.stringify(output) ?? "";
}

/**
 * Converts stored conversation history into AI SDK model messages.
 *
 * Tool calls and tool results are only emitted as matched pairs:
 * - a stored tool call with no stored result is dropped (the assistant text,
 *   if any, is kept);
 * - a stored tool result whose call was not emitted (the assistant turn is
 *   outside the history window, or its `toolCallsJson` / arguments failed to
 *   parse) is dropped as well, so no tool result is emitted without its call.
 *
 * @param history Stored messages, expected in chronological order.
 * @returns Messages suitable for `streamText({ messages })`.
 */
export function contextToMessages(history: ContextMessage[]): ModelMessage[] {
  // Ids of every tool call that has a stored result.
  const availableResults = new Set<string>();
  for (const ctx of history) {
    if (ctx.role === "tool" && ctx.toolCallId) {
      availableResults.add(ctx.toolCallId);
    }
  }

  // Ids of tool calls that actually made it into `messages`.
  const emittedCallIds = new Set<string>();
  const messages: ModelMessage[] = [];

  for (const ctx of history) {
    if (ctx.role === "user") {
      messages.push({ role: "user", content: ctx.content });
    } else if (ctx.role === "assistant") {
      if (!ctx.toolCallsJson) {
        messages.push({ role: "assistant", content: ctx.content });
        continue;
      }

      try {
        const toolCalls = JSON.parse(ctx.toolCallsJson) as Array<{
          id: string;
          type: string;
          function: { name: string; arguments: string };
        }>;
        const matched = toolCalls.filter((tc) => availableResults.has(tc.id));

        if (matched.length > 0) {
          // Build every part before pushing so that a malformed `arguments`
          // payload leaves `messages` untouched and falls through to the
          // text-only path below.
          const callParts = matched.map((tc) => ({
            type: "tool-call" as const,
            toolCallId: tc.id,
            toolName: tc.function.name,
            input: JSON.parse(tc.function.arguments || "{}"),
          }));
          messages.push({
            role: "assistant",
            content: [...(ctx.content ? [{ type: "text" as const, text: ctx.content }] : []), ...callParts],
          });
          for (const tc of matched) emittedCallIds.add(tc.id);
          continue;
        }
      } catch {
        // Unparseable tool-call payload: degrade to the plain-text turn below.
      }

      // No usable tool calls: keep the assistant text, but never emit an
      // empty assistant message.
      if (ctx.content) messages.push({ role: "assistant", content: ctx.content });
    } else if (ctx.role === "tool" && ctx.toolCallId) {
      // Skip results whose call was not emitted.
      if (!emittedCallIds.has(ctx.toolCallId)) continue;
      messages.push({
        role: "tool",
        content: [
          {
            type: "tool-result",
            toolCallId: ctx.toolCallId,
            toolName: ctx.name ?? "",
            output: { type: "text", value: ctx.content },
          },
        ],
      });
    }
  }

  return messages;
}

/**
 * A single agent: a language model plus its persisted memory, skills and
 * cron schedules.
 *
 * Collaborators (router, peers provider, agent manager, background event
 * sink) are optional and injected through the `set*` methods.
 */
export class Agent {
  readonly id: string;
  readonly config: AgentConfig;
  readonly memory: MemoryStore;
  state: AgentState = "starting";
  restartCount = 0;
  startedAt = new Date();

  private model: LanguageModel;
  private scheduler = new CronScheduler();
  private router: MessageRouter | null = null;
  private peersProvider: PeersProvider | null = null;
  private agentManager: AgentManager | null = null;
  private backgroundEventSink: BackgroundEventSink | null = null;
  private skillsManager: SkillsManager;
  private activeSkills: Skill[] = [];

  /**
   * @param config Parsed agent definition; `config.name` becomes the agent id.
   * @param model Language model used for conversations and cron translation.
   * @param dataDir Directory holding `memory.db` and `skills/`; defaults to
   *   `agentDir(config.name)`.
   */
  constructor(config: AgentConfig, model: LanguageModel, dataDir?: string) {
    this.id = config.name;
    this.config = config;
    const dir = dataDir ?? agentDir(config.name);
    this.memory = new MemoryStore(join(dir, "memory.db"));
    this.model = model;
    this.skillsManager = new SkillsManager(join(dir, "skills"));
  }

  setRouter(router: MessageRouter): void {
    this.router = router;
  }

  setPeersProvider(provider: PeersProvider): void {
    this.peersProvider = provider;
  }

  setAgentManager(manager: AgentManager): void {
    this.agentManager = manager;
  }

  setBackgroundEventSink(sink: BackgroundEventSink): void {
    this.backgroundEventSink = sink;
  }

  getModel(): LanguageModel {
    return this.model;
  }

  getSkillsManager(): SkillsManager {
    return this.skillsManager;
  }

  /**
   * Assembles the system prompt: identity, current time, then (each only when
   * non-empty) the team roster, the IF/THEN routing rules and active skills.
   */
  private buildSystemPrompt(): string {
    let prompt = this.config.identity;
    prompt += `\n\nCurrent time: ${new Date().toISOString()}`;

    const peers = this.peersProvider?.() ?? [];
    if (peers.length > 0) {
      const roster = peers.map((p) => `- **${p.id}** (${p.model}): ${p.identity}`).join("\n");
      prompt += `\n\n## Your Team\n\nYou are part of a multi-agent team. Use the \`send\` tool to communicate with teammates.\n\n${roster}`;
    }

    if (this.config.ifThen.length > 0) {
      const rules = this.config.ifThen.map((r, i) => `${i + 1}. IF "${r.condition}" THEN "${r.action}"`).join("\n");
      prompt += `\n\n## Routing Rules (MANDATORY — evaluate in order, first match wins)\n\nYou MUST evaluate these rules against every incoming message, in the order listed.\nWhen a rule matches, execute the action IMMEDIATELY before doing anything else.\nIf no rule matches, respond directly using your own knowledge and tools.\n\n${rules}`;
    }

    if (this.activeSkills.length > 0) {
      prompt += "\n\n## Skills\n";
      for (const skill of this.activeSkills) {
        prompt += `\n### ${skill.name}\n${skill.content}\n`;
      }
    }

    return prompt;
  }

  /**
   * Loads installed skills, marks the agent as running and activates its
   * schedules (persisted ones plus those declared in the config).
   */
  async initialize(): Promise<void> {
    this.state = "starting";
    this.activeSkills.push(...this.skillsManager.listInstalled());
    this.state = "running";
    this.startedAt = new Date();
    await this.activateSchedules();
  }

  getSchedules(): StoredSchedule[] {
    return this.memory.getSchedules();
  }

  /**
   * Builds the message list for one model run.
   *
   * Unless `opts.saveInput` is `false`, the incoming message is persisted
   * first (skipped when `hasRecentUserMessage` reports it is already stored)
   * and therefore reaches the model through the loaded history. With
   * `saveInput: false` it is not stored and is appended to the loaded history
   * instead.
   */
  private buildContext(
    fromId: string,
    content: string,
    conversationId: string,
    opts?: { saveInput?: boolean },
  ): ModelMessage[] {
    const persistInput = opts?.saveInput !== false;
    if (persistInput && !this.memory.hasRecentUserMessage(conversationId, content)) {
      this.memory.saveMessage(conversationId, fromId, this.id, content, { role: "user" });
    }

    const maxHistory = this.config.maxHistory ?? DEFAULT_MAX_HISTORY;
    const history = this.memory.getConversationContext(conversationId, maxHistory);

    // Diagnostic breakdown of the loaded context by role.
    const roleCounts: Record<string, number> = {};
    for (const msg of history) {
      roleCounts[msg.role] = (roleCounts[msg.role] ?? 0) + 1;
    }
    const breakdown = Object.entries(roleCounts)
      .map(([r, c]) => `${c} ${r}`)
      .join(", ");
    console.error(`Context: ${history.length} messages for conv ${conversationId} (${breakdown || "empty"})`);

    const messages = contextToMessages(history);
    if (!persistInput) {
      messages.push({ role: "user", content });
    }
    return messages;
  }

  /**
   * Runs the model on an incoming message and streams the resulting events.
   *
   * Events from the model stream and from tools are funnelled through one
   * mailbox and yielded in arrival order. The stream ends with either a
   * `done` or an `error` event. Intermediate tool calls and results are
   * persisted per step; the final response text is persisted after the
   * stream has completed.
   *
   * @param fromId Sender id, stored as the message origin.
   * @param content Message text.
   * @param conversationId Conversation the message belongs to.
   * @param opts `saveInput: false` skips persisting the incoming message.
   */
  async *handleMessage(
    fromId: string,
    content: string,
    conversationId: string,
    opts?: { saveInput?: boolean },
  ): AsyncGenerator<StreamEvent> {
    const messages = this.buildContext(fromId, content, conversationId, opts);
    const maxSteps = this.config.maxIterations ?? DEFAULT_MAX_ITERATIONS;
    const mailbox = new Mailbox<StreamEvent>();

    const tools = createTools({
      agentId: this.id,
      memoryStore: this.memory,
      // Bound to this call rather than to shared instance state, so an
      // overlapping run (e.g. a schedule firing mid-conversation) cannot
      // redirect this run's tools to another conversation.
      getConversationId: () => conversationId,
      getRouter: () => this.router,
      getScheduler: () => this.scheduler,
      registerScheduleJob: (scheduleId, cronExpr, task) => this.registerScheduleJob(scheduleId, cronExpr, task),
      getAgentManager: () => this.agentManager,
      translateToCron: (timing) => translateToCron(this.model, timing),
      getSkillsManager: () => this.skillsManager,
      getActiveSkills: () => this.activeSkills,
      addActiveSkill: (skill: Skill) => {
        this.activeSkills.push(skill);
      },
      emitToolEvent: (event) => mailbox.send(event),
    });

    let fullResponse = "";

    // Producer: pumps the model stream into the mailbox while the generator
    // below drains it. Always closes the mailbox so the consumer terminates.
    const streamDone = (async () => {
      try {
        const result = streamText({
          model: this.model,
          system: this.buildSystemPrompt(),
          messages,
          tools,
          stopWhen: stepCountIs(maxSteps),
          onStepFinish: ({ toolCalls, toolResults, text }) => {
            if (toolCalls && toolCalls.length > 0) {
              const serializedCalls = toolCalls.map((tc) => ({
                id: tc.toolCallId,
                type: "function" as const,
                function: { name: tc.toolName, arguments: JSON.stringify(tc.input) },
              }));
              this.memory.saveMessage(conversationId, this.id, "tool", text || "", {
                role: "assistant",
                toolCallsJson: JSON.stringify(serializedCalls),
              });

              // Record a failure result for every call that produced none,
              // so each stored call has a stored result.
              const resultIds = new Set(toolResults?.map((tr) => tr.toolCallId));
              for (const tc of toolCalls) {
                if (!resultIds.has(tc.toolCallId)) {
                  this.memory.saveMessage(conversationId, tc.toolName, this.id, "Error: tool execution failed", {
                    role: "tool",
                    toolCallId: tc.toolCallId,
                    name: tc.toolName,
                  });
                }
              }
            }

            if (toolResults) {
              for (const tr of toolResults) {
                this.memory.saveMessage(conversationId, tr.toolName, this.id, stringifyToolOutput(tr.output), {
                  role: "tool",
                  toolCallId: tr.toolCallId,
                  name: tr.toolName,
                });
              }
            }
          },
        });

        const trackedToolNames = new Map<string, string>();
        for await (const part of result.fullStream) {
          const event = mapStreamPart(part, trackedToolNames);
          if (event) mailbox.send(event);
        }

        fullResponse = await result.text;
        mailbox.send({ type: "done", text: fullResponse });
      } catch (err) {
        const classified = classifyError(err);
        mailbox.send({ type: "error", error: classified.message, errorCode: classified.code });
      } finally {
        mailbox.close();
      }
    })();

    for await (const event of mailbox) {
      yield event;
    }
    await streamDone;

    if (fullResponse) {
      this.memory.saveMessage(conversationId, this.id, fromId, fullResponse, { role: "assistant" });
    }
  }

  /**
   * Snapshot of the agent's status. `identity` is the trimmed first line of
   * the configured identity; `uptime` is milliseconds since `startedAt`.
   */
  getInfo(): AgentInfo {
    return {
      id: this.id,
      state: this.state,
      model: this.config.model,
      identity: this.config.identity.split("\n")[0]?.trim() ?? "",
      uptime: Date.now() - this.startedAt.getTime(),
      messageCount: this.memory.getMessageCount(),
      restartCount: this.restartCount,
      scheduleCount: this.memory.getScheduleCount(),
    };
  }

  /**
   * Registers a cron job that runs `task` through {@link handleMessage} in a
   * fresh conversation each time it fires.
   *
   * A firing is skipped when the agent is not running or when the schedule
   * is no longer present in memory. Events are forwarded to the background
   * event sink, followed by a final `null`.
   *
   * @returns The result of `CronScheduler.schedule`.
   */
  registerScheduleJob(scheduleId: string, cronExpr: string, task: string): boolean {
    const job: CronJob = {
      id: scheduleId,
      name: `${this.id}/${scheduleId}`,
      schedule: cronExpr,
      task,
      enabled: true,
    };

    return this.scheduler.schedule(job, async () => {
      if (this.state !== "running") return;
      if (!this.memory.getSchedule(scheduleId)) return;

      this.memory.updateScheduleLastRun(scheduleId);
      const convId = crypto.randomUUID();
      const source = `schedule:${scheduleId}|${task}`;
      console.log(`Schedule '${scheduleId}' fired: ${task}`);

      try {
        for await (const event of this.handleMessage("scheduler", task, convId)) {
          this.backgroundEventSink?.(source, convId, event);
        }
      } catch (err) {
        console.error(`Schedule '${scheduleId}' execution failed: ${toErrorMessage(err)}`);
      } finally {
        // Signal end-of-run to the sink on success and failure alike.
        this.backgroundEventSink?.(source, convId, null);
      }
    });
  }

  /**
   * Registers all persisted schedules, then creates the schedules declared
   * in the config that are not persisted yet. Config schedules get a stable
   * id derived from their timing text; a failure on one is logged and does
   * not prevent the others from being activated.
   */
  private async activateSchedules(): Promise<void> {
    const persisted = this.memory.getSchedules();
    for (const s of persisted) {
      this.registerScheduleJob(s.id, s.cron, s.task);
    }

    for (const rule of this.config.schedule) {
      const stableId = `soul-${rule.timing.replace(/\W+/g, "-").toLowerCase()}`;
      if (persisted.some((s) => s.id === stableId)) continue;

      try {
        const cronExpr = await translateToCron(this.model, rule.timing);
        this.memory.saveSchedule(stableId, cronExpr, rule.timing, rule.task);
        this.registerScheduleJob(stableId, cronExpr, rule.task);
      } catch (err) {
        console.error(`Failed to activate SCHEDULE "${rule.timing}": ${toErrorMessage(err)}`);
      }
    }
  }

  /** Stops all cron jobs and marks the agent as stopped. */
  shutdown(): void {
    this.scheduler.stopAll();
    this.state = "stopped";
  }

  /**
   * Marks the agent as stopped and deletes its memory store. Cron jobs are
   * stopped first so none stay registered against the deleted store.
   */
  destroy(): void {
    this.scheduler.stopAll();
    this.state = "stopped";
    this.memory.delete();
  }
}

/**
 * Maps an AI SDK TextStreamPart to a Mozart StreamEvent.
 *
 * trackedToolNames accumulates toolCallId -> toolName for delta filtering:
 * argument deltas are only forwarded for tools in `STREAM_ARGS_TOOLS`.
 *
 * @param part Stream part emitted by `streamText(...).fullStream`.
 * @param trackedToolNames Mutable map shared across the parts of one stream.
 * @returns The corresponding event, or `null` for parts that are not surfaced.
 */
export function mapStreamPart(part: TextStreamPart<ToolSet>, trackedToolNames: Map<string, string>): StreamEvent | null {
  switch (part.type) {
    case "text-delta":
      return { type: "text", text: part.text };

    case "reasoning-delta":
      return { type: "thinking", text: part.text };

    case "tool-input-start":
      trackedToolNames.set(part.id, part.toolName);
      return { type: "tool_start", text: part.toolName, toolCallId: part.id };

    case "tool-input-delta": {
      const toolName = trackedToolNames.get(part.id) ?? "";
      if (STREAM_ARGS_TOOLS.has(toolName)) {
        return { type: "tool_delta", text: part.delta, toolCallId: part.id };
      }
      return null;
    }

    case "tool-call":
      return {
        type: "tool_use",
        toolUse: { id: part.toolCallId, name: part.toolName, input: part.input },
      };

    case "tool-result":
      return {
        type: "tool_result",
        text: stringifyToolOutput(part.output),
        toolCallId: part.toolCallId,
        toolName: part.toolName,
      };

    case "tool-error":
      return {
        type: "tool_result",
        text: `Error: ${toErrorMessage(part.error)}`,
        toolCallId: part.toolCallId,
        toolName: part.toolName,
      };

    case "error": {
      const classified = classifyError(part.error);
      return { type: "error", error: classified.message, errorCode: classified.code };
    }

    default:
      return null;
  }
}