/** * Agent dispatcher — spawn isolated pi subprocesses. * * Supports three modes: * - Single: one agent, one task * - Parallel: multiple agents in parallel with concurrency limit * - Chain: sequential agents, each receiving previous output via {previous} * * Based on pi's subagent example (MIT), adapted for superteam's * deterministic isolation and TDD guard loading. */ import { spawn } from "node:child_process"; import * as fs from "node:fs"; import * as os from "node:os"; import * as path from "node:path"; import type { AgentToolResult } from "@mariozechner/pi-agent-core"; import type { Message } from "@mariozechner/pi-ai"; import { parseFrontmatter } from "@mariozechner/pi-coding-agent"; import { getConfig, getPackageDir, VALID_THINKING_LEVELS, type ThinkingLevel, type SuperteamConfig } from "./config.js"; // --- Constants --- const MAX_PARALLEL_TASKS = 8; const MAX_CONCURRENCY = 4; // --- Types --- export type AgentSource = "package" | "user" | "project"; export interface AgentProfile { name: string; description: string; tools?: string[]; model?: string; thinking?: ThinkingLevel; systemPrompt: string; source: AgentSource; filePath: string; /** Extra subprocess flags (e.g., -e, --skill) */ extraFlags?: string[]; } export interface UsageStats { input: number; output: number; cacheRead: number; cacheWrite: number; cost: number; contextTokens: number; turns: number; } export interface DispatchResult { agent: string; agentSource: AgentSource; task: string; exitCode: number; messages: Message[]; stderr: string; usage: UsageStats; model?: string; stopReason?: string; errorMessage?: string; step?: number; } export interface DispatchDetails { mode: "single" | "parallel" | "chain"; results: DispatchResult[]; } export type OnUpdateCallback = (partial: AgentToolResult) => void; // --- Stream event types --- export type StreamEvent = { type: string; toolCallId?: string; toolName?: string; args?: Record; partialResult?: any; result?: any; isError?: boolean; }; export type OnStreamEvent = (event: StreamEvent) => void; // --- Cost tracking --- /** Session-level cumulative cost tracker */ let sessionCostUsd = 0; export function getSessionCost(): number { return sessionCostUsd; } export function resetSessionCost(): void { sessionCostUsd = 0; } function addCost(cost: number): void { sessionCostUsd += cost; } export interface CostCheckResult { allowed: boolean; warning?: string; currentCost: number; limit: number; } /** * Check if a dispatch is within cost budget. * Returns warning if past warn threshold, blocks if past hard limit. */ export function checkCostBudget(cwd: string): CostCheckResult { const config = getConfig(cwd); const current = sessionCostUsd; if (current >= config.costs.hardLimitUsd) { return { allowed: false, warning: `Hard cost limit reached: $${current.toFixed(2)} / $${config.costs.hardLimitUsd.toFixed(2)}. Use /team --override-cost to continue.`, currentCost: current, limit: config.costs.hardLimitUsd, }; } if (current >= config.costs.warnAtUsd) { return { allowed: true, warning: `Cost warning: $${current.toFixed(2)} / $${config.costs.hardLimitUsd.toFixed(2)} hard limit`, currentCost: current, limit: config.costs.hardLimitUsd, }; } return { allowed: true, currentCost: current, limit: config.costs.hardLimitUsd }; } // --- Agent discovery --- function loadAgentsFromDir(dir: string, source: AgentSource): AgentProfile[] { const agents: AgentProfile[] = []; if (!fs.existsSync(dir)) return agents; let entries: fs.Dirent[]; try { entries = fs.readdirSync(dir, { withFileTypes: true }); } catch { return agents; } for (const entry of entries) { if (!entry.name.endsWith(".md")) continue; if (!entry.isFile() && !entry.isSymbolicLink()) continue; const filePath = path.join(dir, entry.name); let content: string; try { content = fs.readFileSync(filePath, "utf-8"); } catch { continue; } const { frontmatter, body } = parseFrontmatter>(content); if (!frontmatter.name || !frontmatter.description) continue; const tools = frontmatter.tools ?.split(",") .map((t: string) => t.trim()) .filter(Boolean); const thinking = frontmatter.thinking && VALID_THINKING_LEVELS.includes(frontmatter.thinking as ThinkingLevel) ? (frontmatter.thinking as ThinkingLevel) : undefined; agents.push({ name: frontmatter.name, description: frontmatter.description, tools: tools && tools.length > 0 ? tools : undefined, model: frontmatter.model, thinking, systemPrompt: body, source, filePath, }); } return agents; } /** * Discover agents from package, user, and optionally project directories. */ export function discoverAgents( cwd: string, includeProject: boolean, ): { agents: AgentProfile[]; projectAgentsDir: string | null } { const packageDir = getPackageDir(); const packageAgentsDir = path.join(packageDir, "agents"); const userDir = path.join(os.homedir(), ".pi", "agent", "agents"); let projectAgentsDir: string | null = null; let searchDir = path.resolve(cwd); while (true) { const candidate = path.join(searchDir, ".pi", "agents"); try { if (fs.statSync(candidate).isDirectory()) { projectAgentsDir = candidate; break; } } catch { /* not found */ } const parent = path.dirname(searchDir); if (parent === searchDir) break; searchDir = parent; } const agentMap = new Map(); for (const a of loadAgentsFromDir(packageAgentsDir, "package")) agentMap.set(a.name, a); for (const a of loadAgentsFromDir(userDir, "user")) agentMap.set(a.name, a); if (includeProject && projectAgentsDir) { for (const a of loadAgentsFromDir(projectAgentsDir, "project")) agentMap.set(a.name, a); } return { agents: Array.from(agentMap.values()), projectAgentsDir }; } // --- Subprocess management --- function writePromptToTempFile(agentName: string, prompt: string): { dir: string; filePath: string } { const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "superteam-")); const safeName = agentName.replace(/[^\w.-]+/g, "_"); const filePath = path.join(tmpDir, `prompt-${safeName}.md`); fs.writeFileSync(filePath, prompt, { encoding: "utf-8", mode: 0o600 }); return { dir: tmpDir, filePath }; } function getFinalOutput(messages: Message[]): string { for (let i = messages.length - 1; i >= 0; i--) { const msg = messages[i]; if (msg.role === "assistant") { for (const part of msg.content) { if (part.type === "text") return part.text; } } } return ""; } /** * Resolve the effective model for an agent using priority chain: * config.agents.modelOverrides[name] > agent.model > scoutModel/defaultModel */ export function resolveAgentModel(agent: AgentProfile, config: SuperteamConfig): string { return ( config.agents.modelOverrides[agent.name] || agent.model || (agent.name === "scout" ? config.agents.scoutModel : config.agents.defaultModel) ); } /** * Resolve the effective thinking level for an agent using priority chain: * config.agents.thinkingOverrides[name] > agent.thinking > undefined */ export function resolveAgentThinking(agent: AgentProfile, config: SuperteamConfig): ThinkingLevel | undefined { return config.agents.thinkingOverrides[agent.name] ?? agent.thinking ?? undefined; } /** @internal — exported for testing */ export function buildSubprocessArgs(agent: AgentProfile, cwd: string): string[] { const config = getConfig(cwd); const packageDir = getPackageDir(); const model = resolveAgentModel(agent, config); const thinking = resolveAgentThinking(agent, config); const args: string[] = [ "--mode", "json", "-p", "--no-session", "--no-extensions", "--no-skills", "--no-prompt-templates", "--no-themes", ]; if (model) args.push("--model", model); if (thinking) args.push("--thinking", thinking); if (agent.tools && agent.tools.length > 0) args.push("--tools", agent.tools.join(",")); if (agent.extraFlags) { args.push(...agent.extraFlags); } // Implementer gets: guard extension + TDD skill if (agent.name === "implementer") { const extensionPath = path.join(packageDir, "src", "index.ts"); const skillPath = path.join(packageDir, "skills", "superteam-test-driven-development", "SKILL.md"); if (fs.existsSync(extensionPath)) args.push("-e", extensionPath); if (fs.existsSync(skillPath)) args.push("--skill", skillPath); } // Inject .pi/context.md if it exists in the project const contextMdPath = path.join(cwd, ".pi", "context.md"); if (fs.existsSync(contextMdPath)) { args.push("--append-system-prompt", path.resolve(cwd, ".pi", "context.md")); } return args; } // --- Core dispatch (internal) --- async function runAgent( agent: AgentProfile, task: string, cwd: string, step: number | undefined, signal: AbortSignal | undefined, onResultUpdate: (result: DispatchResult) => void, onStreamEvent?: OnStreamEvent, ): Promise { const args = buildSubprocessArgs(agent, cwd); let tmpPromptDir: string | null = null; let tmpPromptPath: string | null = null; const result: DispatchResult = { agent: agent.name, agentSource: agent.source, task, exitCode: 0, messages: [], stderr: "", usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, cost: 0, contextTokens: 0, turns: 0 }, model: agent.model, step, }; try { if (agent.systemPrompt.trim()) { const tmp = writePromptToTempFile(agent.name, agent.systemPrompt); tmpPromptDir = tmp.dir; tmpPromptPath = tmp.filePath; args.push("--append-system-prompt", tmpPromptPath); } args.push(`Task: ${task}`); let wasAborted = false; const exitCode = await new Promise((resolve) => { const proc = spawn("pi", args, { cwd, shell: false, stdio: ["ignore", "pipe", "pipe"], }); let buffer = ""; const processLine = (line: string) => { if (!line.trim()) return; let event: any; try { event = JSON.parse(line); } catch { return; } // Fire stream events for tool execution lifecycle if (onStreamEvent && ( event.type === "tool_execution_start" || event.type === "tool_execution_update" || event.type === "tool_execution_end" )) { onStreamEvent({ type: event.type, toolCallId: event.toolCallId, toolName: event.toolName, args: event.args, partialResult: event.partialResult, result: event.result, isError: event.isError, }); } if (event.type === "message_end" && event.message) { const msg = event.message as Message; result.messages.push(msg); if (msg.role === "assistant") { result.usage.turns++; const usage = msg.usage; if (usage) { result.usage.input += usage.input || 0; result.usage.output += usage.output || 0; result.usage.cacheRead += usage.cacheRead || 0; result.usage.cacheWrite += usage.cacheWrite || 0; result.usage.cost += usage.cost?.total || 0; result.usage.contextTokens = usage.totalTokens || 0; } if (!result.model && msg.model) result.model = msg.model; if (msg.stopReason) result.stopReason = msg.stopReason; if (msg.errorMessage) result.errorMessage = msg.errorMessage; // Track cost mid-stream for hard limit abort addCost(usage?.cost?.total || 0); const config = getConfig(cwd); if (sessionCostUsd >= config.costs.hardLimitUsd) { wasAborted = true; proc.kill("SIGTERM"); setTimeout(() => { if (!proc.killed) proc.kill("SIGKILL"); }, 5000); } } onResultUpdate(result); } if (event.type === "tool_result_end" && event.message) { result.messages.push(event.message as Message); onResultUpdate(result); } }; proc.stdout.on("data", (data: Buffer) => { buffer += data.toString(); const lines = buffer.split("\n"); buffer = lines.pop() || ""; for (const line of lines) processLine(line); }); proc.stderr.on("data", (data: Buffer) => { result.stderr += data.toString(); }); proc.on("close", (code: number | null) => { if (buffer.trim()) processLine(buffer); resolve(code ?? 0); }); proc.on("error", () => resolve(1)); if (signal) { const killProc = () => { wasAborted = true; proc.kill("SIGTERM"); setTimeout(() => { if (!proc.killed) proc.kill("SIGKILL"); }, 5000); }; if (signal.aborted) killProc(); else signal.addEventListener("abort", killProc, { once: true }); } }); result.exitCode = exitCode; if (wasAborted && !result.errorMessage) { result.exitCode = 1; result.errorMessage = sessionCostUsd >= getConfig(cwd).costs.hardLimitUsd ? `Aborted: hard cost limit ($${getConfig(cwd).costs.hardLimitUsd}) reached` : "Subagent was aborted"; } return result; } finally { if (tmpPromptPath) try { fs.unlinkSync(tmpPromptPath); } catch { /* ignore */ } if (tmpPromptDir) try { fs.rmdirSync(tmpPromptDir); } catch { /* ignore */ } } } // --- Public dispatch functions --- /** * Dispatch a single agent. */ export async function dispatchAgent( agent: AgentProfile, task: string, cwd: string, signal?: AbortSignal, onUpdate?: OnUpdateCallback, onStreamEvent?: OnStreamEvent, ): Promise { return runAgent(agent, task, cwd, undefined, signal, (r) => { if (onUpdate) { onUpdate({ content: [{ type: "text", text: getFinalOutput(r.messages) || "(running...)" }], details: { mode: "single", results: [r] }, }); } }, onStreamEvent); } /** * Dispatch multiple agents in parallel with concurrency limit. */ export async function dispatchParallel( agents: AgentProfile[], tasks: string[], cwd: string, signal?: AbortSignal, onUpdate?: OnUpdateCallback, ): Promise { if (agents.length !== tasks.length) { throw new Error(`Agent count (${agents.length}) must match task count (${tasks.length})`); } if (agents.length > MAX_PARALLEL_TASKS) { throw new Error(`Too many parallel tasks (${agents.length}). Max is ${MAX_PARALLEL_TASKS}.`); } // Initialize placeholder results const allResults: DispatchResult[] = agents.map((a, i) => ({ agent: a.name, agentSource: a.source, task: tasks[i], exitCode: -1, // -1 = still running messages: [], stderr: "", usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, cost: 0, contextTokens: 0, turns: 0 }, })); const emitUpdate = () => { if (onUpdate) { const running = allResults.filter((r) => r.exitCode === -1).length; const done = allResults.filter((r) => r.exitCode !== -1).length; onUpdate({ content: [{ type: "text", text: `Parallel: ${done}/${allResults.length} done, ${running} running...` }], details: { mode: "parallel", results: [...allResults] }, }); } }; const results = await mapWithConcurrencyLimit( agents.map((a, i) => ({ agent: a, task: tasks[i], index: i })), MAX_CONCURRENCY, async ({ agent, task, index }) => { const result = await runAgent(agent, task, cwd, undefined, signal, (r) => { allResults[index] = r; emitUpdate(); }); allResults[index] = result; emitUpdate(); return result; }, ); return results; } /** * Dispatch agents in sequence, passing each output to the next via {previous}. */ export async function dispatchChain( agents: AgentProfile[], tasks: string[], cwd: string, signal?: AbortSignal, onUpdate?: OnUpdateCallback, ): Promise { if (agents.length !== tasks.length) { throw new Error(`Agent count (${agents.length}) must match task count (${tasks.length})`); } const results: DispatchResult[] = []; let previousOutput = ""; for (let i = 0; i < agents.length; i++) { const taskWithContext = tasks[i].replace(/\{previous\}/g, previousOutput); const result = await runAgent(agents[i], taskWithContext, cwd, i + 1, signal, (r) => { if (onUpdate) { const allResults = [...results, r]; onUpdate({ content: [{ type: "text", text: getFinalOutput(r.messages) || "(running...)" }], details: { mode: "chain", results: allResults }, }); } }); results.push(result); const isError = result.exitCode !== 0 || result.stopReason === "error" || result.stopReason === "aborted"; if (isError) break; // Stop chain on error previousOutput = getFinalOutput(result.messages); } return results; } // --- Write-guard --- /** Bash command patterns that indicate write operations */ const WRITE_BASH_PATTERNS = [ /[^>]?>(?!>)/, // single > redirect (but not >>) />>/, // >> append /\btee\b/, /\bsed\s+-i\b/, /\bmv\b/, /\bcp\b/, /\brm\b/, /\bmkdir\b/, /\bchmod\b/, /\bchown\b/, /\btouch\b/, ]; /** Write tool names */ const WRITE_TOOL_NAMES = new Set(["write", "edit"]); /** * Scan agent messages for tool calls that write to the filesystem. * Returns true if any write tool call is found. */ export function hasWriteToolCalls(messages: Message[]): boolean { for (const msg of messages) { if (msg.role !== "assistant") continue; if (!Array.isArray(msg.content)) continue; for (const part of msg.content) { if (part.type !== "tool_use") continue; const toolName = (part as any).name as string; // Direct write tools if (WRITE_TOOL_NAMES.has(toolName)) return true; // Bash write heuristics if (toolName === "bash") { const command = (part as any).input?.command as string; if (command) { for (const pattern of WRITE_BASH_PATTERNS) { if (pattern.test(command)) return true; } } } } } return false; } // --- Utilities --- async function mapWithConcurrencyLimit( items: TIn[], concurrency: number, fn: (item: TIn, index: number) => Promise, ): Promise { if (items.length === 0) return []; const limit = Math.max(1, Math.min(concurrency, items.length)); const results: TOut[] = new Array(items.length); let nextIndex = 0; const workers = new Array(limit).fill(null).map(async () => { while (true) { const current = nextIndex++; if (current >= items.length) return; results[current] = await fn(items[current], current); } }); await Promise.all(workers); return results; } export { getFinalOutput }; export function formatTokens(count: number): string { if (count < 1000) return count.toString(); if (count < 10000) return `${(count / 1000).toFixed(1)}k`; if (count < 1000000) return `${Math.round(count / 1000)}k`; return `${(count / 1000000).toFixed(1)}M`; } export function formatUsage(usage: UsageStats, model?: string): string { const parts: string[] = []; if (usage.turns) parts.push(`${usage.turns} turn${usage.turns > 1 ? "s" : ""}`); if (usage.input) parts.push(`↑${formatTokens(usage.input)}`); if (usage.output) parts.push(`↓${formatTokens(usage.output)}`); if (usage.cost) parts.push(`$${usage.cost.toFixed(4)}`); if (model) parts.push(model); return parts.join(" "); } export function aggregateUsage(results: DispatchResult[]): UsageStats { const total: UsageStats = { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, cost: 0, contextTokens: 0, turns: 0 }; for (const r of results) { total.input += r.usage.input; total.output += r.usage.output; total.cacheRead += r.usage.cacheRead; total.cacheWrite += r.usage.cacheWrite; total.cost += r.usage.cost; total.turns += r.usage.turns; } return total; }