/**
* Subagent runner — spawns an isolated `pi --mode json -p` process for a single
* task and collects its structured output and usage. Adapted from the pi
* subagent extension's runSingleAgent.
*/
import { spawn } from "node:child_process";
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";
import type { Message } from "@earendil-works/pi-ai";
import { withFileMutationQueue } from "@earendil-works/pi-coding-agent";
import type { AgentConfig } from "./agents.ts";
import { emptyUsage, type UsageStats } from "./usage.ts";
/** Structured outcome of one subagent run. */
export interface RunResult {
/** Agent name that was requested for this run. */
agent: string;
/** Task text handed to the subagent. */
task: string;
/** Exit code of the child process; any non-zero value makes `isFailed` report failure. */
exitCode: number;
/** Final assistant text, or `TRANSPORT_ERROR_PLACEHOLDER` when a failed run produced none. */
output: string;
/** Raw stderr collected from the child process (or the spawn error message). */
stderr: string;
/** Usage totals accumulated over the run. */
usage: UsageStats;
/** Model used for the run, when known. */
model?: string;
/** Stop reason of the run; `"error"` and `"aborted"` count as failures in `isFailed`. */
stopReason?: string;
/** Error description; `runAgentTask` passes it through `sanitizeErrorMessage` on failure. */
errorMessage?: string;
/** Total subagent attempts incl. retries (set by the runtime's retry wrapper). */
attempts?: number;
}
/** Progress snapshot handed to `RunOptions.onLive` while a subagent is running. */
export interface LiveUpdate {
/** Latest assistant text or tool activity (single-line, truncated upstream). */
text: string;
/** Copy of the usage totals accumulated so far. */
usage: UsageStats;
/** Model name, once known. */
model?: string;
}
/** Per-call overrides and hooks accepted by `runAgentTask`. */
export interface RunOptions {
/** Model override; when omitted the agent's configured model is used. */
model?: string;
/** Thinking override; falls back to the agent's setting, then to the global default. */
thinking?: string;
/** Tool list override; passed to the child as a comma-joined `--tools` value when non-empty. */
tools?: string[];
/** Working directory for the child process; defaults to the caller-supplied default cwd. */
cwd?: string;
/** Abort signal; when it fires the child is sent SIGTERM, followed by SIGKILL after 5 s. */
signal?: AbortSignal;
/** Fires on each assistant turn with the latest activity + accumulated usage. */
onLive?: (live: LiveUpdate) => void;
}
/**
 * Report whether a run should be treated as a failure.
 *
 * A run fails when the child exited with a non-zero code, or when its stop
 * reason is `"error"` or `"aborted"` regardless of the exit code.
 */
export function isFailed(r: RunResult): boolean {
  if (r.exitCode !== 0) return true;
  switch (r.stopReason) {
    case "error":
    case "aborted":
      return true;
    default:
      return false;
  }
}
/**
* Placeholder written to a failed phase's `output` so downstream interpolation
* can detect "upstream failed" without being polluted by raw HTML/JSON.
* `runAgentTask` substitutes it only when a failed run produced no output.
*/
export const TRANSPORT_ERROR_PLACEHOLDER = "(upstream error: subagent failed; see error)";
/** Hard cap on the errorMessage field stored in PhaseState (≈ 4 KB). */
export const ERROR_MESSAGE_MAX_LEN = 4096;

/**
 * Cheap markup detector used to decide whether an error payload should be
 * summarized instead of persisted verbatim.
 *
 * Only text whose first non-blank characters open a document-like HTML/SVG
 * tag is flagged. JSON-looking payloads are deliberately NOT flagged: a short
 * `{error: "..."}` envelope is worth keeping, and an oversized one is handled
 * by the length cap in `sanitizeErrorMessage`.
 *
 * @param s Candidate error text.
 * @returns `true` when `s` looks like an HTML page or fragment.
 */
export function looksLikeHtmlOrJson(s: string): boolean {
  const t = s.trimStart();
  if (!t.startsWith("<")) return false;
  // HTML/XML/Cloudflare challenge pages
  return /^<(?:!doctype\s+html|html|head|body|script|svg|div|iframe|span|p)\b/i.test(t);
}

/**
 * Truncate and (when obviously HTML) summarize an errorMessage before it is
 * persisted.
 *
 * @param raw Error text as reported upstream; may be `undefined`.
 * @returns The cleaned message. Empty or whitespace-only input yields `""`.
 *   Oversized input is reduced to its first and last 200 characters; HTML is
 *   replaced by a one-line summary; anything else is returned with its
 *   whitespace collapsed.
 */
export function sanitizeErrorMessage(raw: string | undefined): string {
  if (!raw) return "";
  const cleaned = raw.replace(/\s+/g, " ").trim();
  if (!cleaned) return "";

  // Number of characters kept from each end of an oversized message.
  const edge = 200;
  // The size check uses the RAW length so whitespace padding cannot hide an
  // oversized payload.
  const rawLen = raw.length;

  // Head/tail truncation is only meaningful when the collapsed text is longer
  // than the two slices together; otherwise the slices overlap and the same
  // text would be emitted twice. Such short text falls through to the
  // HTML/plain handling below instead.
  if (rawLen > ERROR_MESSAGE_MAX_LEN && cleaned.length > 2 * edge) {
    const head = cleaned.slice(0, edge);
    const tail = cleaned.slice(-edge);
    return `${head} ... [truncated ${rawLen - 2 * edge} chars] ... ${tail}`;
  }

  if (looksLikeHtmlOrJson(cleaned)) {
    // Any document-like HTML (Cloudflare challenge pages, proxy error pages,
    // gateway error pages) is a strong signal the upstream returned a page
    // instead of JSON. Summarize it instead of letting HTML pollute the
    // phase's error and downstream interpolation contexts.
    const title = cleaned.match(/<title[^>]*>([^<]*)<\/title>/i)?.[1]?.trim();
    const stripped = cleaned.replace(/<[^>]+>/g, " ").replace(/\s+/g, " ").trim();
    const m = stripped.match(/(?:Unable to load site|Ray ID[: ]+([A-Za-z0-9]+)|[A-Z][a-z]+Error[: ]+(.{0,200}))/i);
    // Preference order: page title, then a recognised marker (the Ray ID value
    // alone when that alternative matched), then the first 200 chars of the
    // tag-stripped text.
    const hint = title || (m ? (m[1] || m[0]).trim() : stripped.slice(0, edge));
    return `Upstream returned non-JSON response (${rawLen} chars). Hint: ${hint}`;
  }

  return cleaned;
}
/**
 * Pick the text that represents the subagent's final answer.
 *
 * Walks the messages newest-first and, within each assistant message, returns
 * the first text part that is not blank (returned untrimmed). Assistant
 * messages without usable text are skipped, so an earlier message may win.
 *
 * @returns The chosen text, or `""` when no assistant message carries any.
 */
function getFinalOutput(messages: Message[]): string {
  let idx = messages.length;
  while (idx-- > 0) {
    const candidate = messages[idx];
    if (candidate.role !== "assistant") continue;
    for (const piece of candidate.content) {
      if (piece.type !== "text") continue;
      if (piece.text.trim()) return piece.text;
    }
  }
  return "";
}
/** Accumulated state folded from a subagent's NDJSON event stream. */
export interface EventAccumulator {
/** Every message delivered by a `message_end` event, in arrival order. */
messages: Message[];
/** Running usage totals over the assistant messages seen so far. */
usage: UsageStats;
/** Model name: the seed value if one was given, else the first one an assistant message reports. */
model?: string;
/** Stop reason from the most recent assistant message that carried one. */
stopReason?: string;
/** Error message from the most recent assistant message that carried one. */
errorMessage?: string;
/** One-line summary of the latest assistant text or tool call; empty until one is seen. */
lastActivity: string;
}
/**
 * Create an empty accumulator for a fresh subagent run.
 *
 * @param model Optional model name to seed the accumulator with; when set it
 *   is kept rather than replaced by the model reported in the event stream.
 */
export function newAccumulator(model?: string): EventAccumulator {
  const fresh: EventAccumulator = {
    messages: [],
    usage: emptyUsage(),
    model,
    lastActivity: "",
  };
  return fresh;
}
/**
 * Fold one NDJSON line into the accumulator. Returns a LiveUpdate when an
 * assistant message ended (for streaming), else null. Empty, malformed, and
 * non-`message_end` lines are ignored — making the parser robust to partial
 * buffers/noise and unit-testable without spawning a process.
 *
 * @param acc Accumulator to mutate in place.
 * @param line One line of the subagent's stdout (without the newline).
 * @returns A snapshot (latest activity, copy of usage, model) after an
 *   assistant `message_end`; `null` for every other line.
 */
export function foldEventLine(acc: EventAccumulator, line: string): LiveUpdate | null {
  if (!line.trim()) return null;

  let event: any;
  try {
    event = JSON.parse(line);
  } catch {
    return null;
  }

  // Valid JSON is not necessarily an object: `null`, numbers and strings all
  // parse successfully, and reading `.type` off `null` would throw. Treat any
  // non-object payload as noise.
  if (event === null || typeof event !== "object") return null;
  if (event.type !== "message_end") return null;
  if (!event.message || typeof event.message !== "object") return null;

  const msg = event.message as Message;
  // Every ended message is recorded; only assistant ones affect usage/state.
  acc.messages.push(msg);
  if (msg.role !== "assistant") return null;

  acc.usage.turns++;
  const u = (msg as any).usage;
  if (u) {
    acc.usage.input += u.input || 0;
    acc.usage.output += u.output || 0;
    acc.usage.cacheRead += u.cacheRead || 0;
    acc.usage.cacheWrite += u.cacheWrite || 0;
    acc.usage.cost += u.cost?.total || 0;
    // Overwritten rather than summed: holds the latest message's value only.
    acc.usage.contextTokens = u.totalTokens || 0;
  }

  // The first known model wins; stop reason and error track the latest message.
  if (!acc.model && (msg as any).model) acc.model = (msg as any).model;
  if ((msg as any).stopReason) acc.stopReason = (msg as any).stopReason;
  if ((msg as any).errorMessage) acc.errorMessage = (msg as any).errorMessage;

  // Keep the previous activity line when this message has nothing to show.
  const activity = describeActivity(msg);
  if (activity) acc.lastActivity = activity;

  return { text: acc.lastActivity, usage: { ...acc.usage }, model: acc.model };
}
/**
 * Summarize what an assistant message did, as a single line.
 *
 * The last non-blank text part is preferred; if the message has none, the
 * summary of its last tool call is used. Whitespace runs are collapsed to one
 * space. Non-assistant messages and messages with nothing to show yield `""`.
 */
function describeActivity(msg: Message): string {
  if (msg.role !== "assistant") return "";

  let textSeen = "";
  let toolSeen = "";
  for (const piece of (msg as any).content ?? []) {
    if (piece.type === "toolCall") {
      toolSeen = summarizeToolCall(piece.name, piece.arguments ?? {});
      continue;
    }
    if (piece.type !== "text") continue;
    const trimmed = piece.text?.trim();
    if (trimmed) textSeen = trimmed;
  }

  const picked = textSeen || toolSeen;
  return picked.replace(/\s+/g, " ").trim();
}
/**
 * Render a tool call as a short, shell-like label for live progress display.
 *
 * @param name Tool name as reported in the tool-call part.
 * @param args Tool arguments; only the field relevant to each known tool is read.
 * @returns `"<verb> <argument>"` for known tools (`bash` is shown as
 *   `"$ <command>"`), with the argument cut to 48 characters plus an ellipsis;
 *   unknown tools are shown by name only.
 */
function summarizeToolCall(name: string, args: Record<string, unknown>): string {
  // Stringify an argument (missing values become "") and cap its length.
  const short = (p: unknown) => {
    const s = String(p ?? "");
    return s.length > 48 ? `${s.slice(0, 48)}…` : s;
  };
  switch (name) {
    case "bash":
      return `$ ${short(args.command)}`;
    case "read":
    case "write":
    case "edit":
      // File tools accept either `path` or `file_path`; `path` wins when both exist.
      return `${name} ${short(args.path ?? args.file_path)}`;
    case "grep":
    case "find":
      return `${name} ${short(args.pattern)}`;
    case "ls":
      return `ls ${short(args.path)}`;
    default:
      return `${name}`;
  }
}
/**
 * Write the agent's system prompt to `filePath` as UTF-8 with owner-only
 * permissions (0o600). The write is routed through `withFileMutationQueue`,
 * keyed on the same path.
 *
 * @param filePath Destination file; its directory must already exist.
 * @param prompt Prompt text to write.
 * @returns A promise that settles once the queued write has finished.
 */
async function writePromptToTempFile(filePath: string, prompt: string): Promise<void> {
  await withFileMutationQueue(filePath, async () => {
    await fs.promises.writeFile(filePath, prompt, { encoding: "utf-8", mode: 0o600 });
  });
}
/**
 * Work out how to launch a child `pi` process with the given CLI arguments.
 *
 * Resolution order:
 *  1. `PI_TASKFLOW_PI_BIN`, when set, is used verbatim as the command.
 *  2. If the current script (`process.argv[1]`) is an existing file named
 *     `cli` or `pi` with a `.js`/`.mjs`/`.cjs` extension, and is not a Bun
 *     virtual-filesystem path, it is re-executed with the current runtime.
 *  3. If the running executable is not plain `node`/`bun`, that executable is
 *     invoked directly.
 *  4. Otherwise `pi` is resolved from `PATH`.
 */
function getPiInvocation(args: string[]): { command: string; args: string[] } {
  // Explicit override (used by tests and unusual launch setups).
  const forcedBin = process.env.PI_TASKFLOW_PI_BIN;
  if (forcedBin) return { command: forcedBin, args };

  const entry = process.argv[1];
  if (entry) {
    const insideBunBundle = entry.startsWith("/$bunfs/root/");
    // Only re-exec the current script if it actually looks like the pi CLI entry.
    const namedLikePi = /(?:^|[\\/])(?:cli|pi)\.(?:js|mjs|cjs)$/.test(entry);
    if (!insideBunBundle && namedLikePi && fs.existsSync(entry)) {
      return { command: process.execPath, args: [entry, ...args] };
    }
  }

  const runtimeName = path.basename(process.execPath).toLowerCase();
  return /^(node|bun)(\.exe)?$/.test(runtimeName)
    ? { command: "pi", args }
    : { command: process.execPath, args };
}
/**
 * Run a single subagent task. Resolves the agent from `agents` by name and
 * spawns an isolated pi process, returning structured output + usage.
 *
 * The function reports failures through the returned `RunResult` rather than
 * by throwing for subagent-level problems: an unknown agent, a spawn error, a
 * non-zero exit, termination by signal, and an abort all produce a result for
 * which `isFailed` is true.
 *
 * @param defaultCwd Working directory used when `opts.cwd` is not set.
 * @param agents Known agent configurations.
 * @param agentName Name of the agent to run.
 * @param task Task text; passed to the child as `Task: <task>`.
 * @param opts Per-call overrides (model, thinking, tools, cwd), abort signal
 *   and live-update callback.
 * @param globalThinking Thinking level used when neither `opts` nor the agent
 *   specifies one.
 * @returns The run's output, usage, exit code and (sanitized) error details.
 */
export async function runAgentTask(
  defaultCwd: string,
  agents: AgentConfig[],
  agentName: string,
  task: string,
  opts: RunOptions,
  globalThinking?: string,
): Promise<RunResult> {
  const agent = agents.find((a) => a.name === agentName);
  if (!agent) {
    const available = agents.map((a) => `"${a.name}"`).join(", ") || "none";
    return {
      agent: agentName,
      task,
      exitCode: 1,
      output: "",
      stderr: `Unknown agent: "${agentName}". Available: ${available}.`,
      usage: emptyUsage(),
      errorMessage: `Unknown agent: ${agentName}`,
      stopReason: "error",
    };
  }

  // Per-call options win over the agent's own config; thinking additionally
  // falls back to the global default.
  const model = opts.model ?? agent.model;
  const thinking = opts.thinking ?? agent.thinking ?? globalThinking;
  const tools = opts.tools ?? agent.tools;

  const args: string[] = ["--mode", "json", "-p", "--no-session"];
  if (model) args.push("--model", model);
  if (thinking) args.push("--thinking", thinking);
  if (tools && tools.length > 0) args.push("--tools", tools.join(","));

  let tmpPromptDir: string | null = null;
  let tmpPromptPath: string | null = null;
  const acc = newAccumulator(model);
  const result: RunResult = {
    agent: agentName,
    task,
    exitCode: 0,
    output: "",
    stderr: "",
    usage: emptyUsage(),
    model,
  };

  try {
    if (agent.systemPrompt.trim()) {
      // Allocate the temp dir + path BEFORE any fallible I/O so that if
      // writeFile throws, tmpPromptDir/tmpPromptPath are already set and
      // the finally block can clean up the directory (F-004).
      tmpPromptDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "pi-taskflow-"));
      const safeName = agent.name.replace(/[^\w.-]+/g, "_");
      tmpPromptPath = path.join(tmpPromptDir, `prompt-${safeName}.md`);
      await writePromptToTempFile(tmpPromptPath, agent.systemPrompt);
      args.push("--append-system-prompt", tmpPromptPath);
    }
    args.push(`Task: ${task}`);

    let wasAborted = false;
    const exitCode = await new Promise<number>((resolve) => {
      const invocation = getPiInvocation(args);
      const proc = spawn(invocation.command, invocation.args, {
        cwd: opts.cwd ?? defaultCwd,
        shell: false,
        stdio: ["ignore", "pipe", "pipe"],
      });

      // Decode at the stream level so a multi-byte UTF-8 character that is
      // split across two chunks is reassembled instead of being turned into
      // replacement characters by a per-chunk toString().
      proc.stdout.setEncoding("utf8");
      proc.stderr.setEncoding("utf8");

      let buffer = "";
      let forceKill: ReturnType<typeof setTimeout> | undefined;

      const processLine = (line: string) => {
        const live = foldEventLine(acc, line);
        if (live && opts.onLive) opts.onLive(live);
      };

      const kill = () => {
        wasAborted = true;
        proc.kill("SIGTERM");
        // Force-kill fallback. proc.kill("SIGKILL") is idempotent if
        // the process already exited, and `proc.killed` is set true
        // synchronously by the SIGTERM above — so an `if (!proc.killed)`
        // guard would skip SIGKILL entirely, hanging forever on a child
        // that ignores SIGTERM. .unref() keeps the timer from holding the
        // event loop open after the process is gone.
        forceKill = setTimeout(() => proc.kill("SIGKILL"), 5000);
        forceKill.unref();
      };
      // Drop the abort listener once the child is finished so a long-lived,
      // shared signal does not accumulate one stale listener per run.
      const detachAbort = () => opts.signal?.removeEventListener("abort", kill);

      proc.stdout.on("data", (chunk: string) => {
        buffer += chunk;
        const lines = buffer.split("\n");
        // The last element is an incomplete line (or ""); keep it for later.
        buffer = lines.pop() || "";
        for (const line of lines) processLine(line);
      });
      proc.stderr.on("data", (chunk: string) => {
        result.stderr += chunk;
      });
      proc.on("close", (code) => {
        detachAbort();
        if (forceKill) clearTimeout(forceKill);
        if (buffer.trim()) processLine(buffer);
        // `code` is null when the child was terminated by a signal. That is
        // never a successful run, so it must not be reported as exit code 0.
        resolve(code ?? 1);
      });
      proc.on("error", (err) => {
        detachAbort();
        if (!result.stderr) result.stderr = err.message;
        if (!result.errorMessage) result.errorMessage = err.message;
        resolve(1);
      });

      if (opts.signal) {
        if (opts.signal.aborted) kill();
        else opts.signal.addEventListener("abort", kill, { once: true });
      }
    });

    result.exitCode = exitCode;
    result.usage = acc.usage;
    result.model = acc.model;
    result.stopReason = acc.stopReason;
    // Prefer the error reported in the event stream, but keep a spawn-level
    // error recorded by the "error" handler when the stream carried none.
    result.errorMessage = acc.errorMessage ?? result.errorMessage;
    result.output = getFinalOutput(acc.messages);
    if (wasAborted) {
      result.stopReason = "aborted";
      result.errorMessage = "Subagent was aborted";
    }

    // On failure, build a short, structured errorMessage + a placeholder
    // output. We deliberately do NOT copy the raw errorMessage into
    // `output`: upstream providers (e.g. a Cloudflare challenge page) can
    // surface huge HTML/JSON in errorMessage, and that garbage would
    // otherwise flow into downstream phase interpolations.
    // Sanitization must run whenever the run failed, even if some output
    // was already emitted (e.g. crash mid-stream with a partial result):
    // an unsanitized errorMessage would still leak into PhaseState and
    // downstream interpolation contexts. (F-013)
    if (isFailed(result)) {
      if (!result.output) {
        result.output = TRANSPORT_ERROR_PLACEHOLDER;
        if (!result.errorMessage) {
          result.errorMessage =
            result.stderr ||
            `Subagent exited with code ${result.exitCode} (stopReason: ${result.stopReason ?? "unknown"})`;
        }
      }
      if (result.errorMessage) {
        result.errorMessage = sanitizeErrorMessage(result.errorMessage);
      }
    }
    return result;
  } finally {
    // Best-effort cleanup of the temporary prompt file and its directory.
    if (tmpPromptPath) {
      try {
        fs.unlinkSync(tmpPromptPath);
      } catch {
        /* ignore */
      }
    }
    if (tmpPromptDir) {
      try {
        fs.rmSync(tmpPromptDir, { recursive: true, force: true });
      } catch {
        /* ignore */
      }
    }
  }
}
/**
 * Run an array of items through `fn` with a bounded concurrency pool.
 *
 * Results are returned in input order regardless of completion order. If any
 * call rejects, the returned promise rejects with that error.
 *
 * @param items Inputs to process.
 * @param concurrency Maximum number of `fn` calls in flight. The value is
 *   floored and clamped to `[1, items.length]`; a fractional, zero, negative
 *   or NaN value therefore degrades to a valid pool size instead of throwing.
 * @param fn Async mapper receiving the item and its index.
 * @returns The mapped values, index-aligned with `items`.
 */
export async function mapWithConcurrencyLimit<TIn, TOut>(
  items: TIn[],
  concurrency: number,
  fn: (item: TIn, index: number) => Promise<TOut>,
): Promise<TOut[]> {
  if (items.length === 0) return [];

  // `new Array(n)` / `Array.from({ length: n })` need an integer length, so
  // normalise first: floor fractions, and map NaN/0 to a single worker.
  const requested = Math.floor(concurrency) || 1;
  const limit = Math.max(1, Math.min(requested, items.length));

  const results = new Array<TOut>(items.length);
  let nextIndex = 0;

  // Each worker repeatedly claims the next unprocessed index. The claim is a
  // synchronous increment, so no two workers ever take the same item.
  const worker = async (): Promise<void> => {
    while (true) {
      const current = nextIndex++;
      if (current >= items.length) return;
      results[current] = await fn(items[current], current);
    }
  };

  await Promise.all(Array.from({ length: limit }, () => worker()));
  return results;
}