import { spawn } from "node:child_process"; import { existsSync, mkdirSync, rmSync, writeFileSync } from "node:fs"; import { tmpdir } from "node:os"; import { basename, join } from "node:path"; import { Type, type Message } from "@earendil-works/pi-ai"; import { defineTool, type ExtensionAPI, type ExtensionContext } from "@earendil-works/pi-coding-agent"; import { buildChainedDelegationRequest, buildDelegationCommandPrompt, buildDelegationPrompt, CHILD_AGENT_USER_TASK, DEFAULT_REVIEW_TASK, delegationBlockReason, formatDelegationPreview, isDelegationRole, KD_DELEGATE_USAGE, KD_DELEGATE_COMMAND_DESCRIPTION, KD_REVIEW_COMMAND_DESCRIPTION, KD_SUBAGENT_INVALID_PARAMS, KD_SUBAGENT_PARALLEL_ROLE_ERROR, KD_SUBAGENT_SCHEMA_DESCRIPTIONS, KD_SUBAGENT_TOOL_DESCRIPTION, isReadOnlyDelegationRole, isSubagentChild, parallelDelegationBlockReason, parseDelegationArgs, subagentAllowedTools, type DelegationRequest, type DelegationRole, } from "../src/harness/delegation.ts"; import { readActiveRun } from "../src/harness/state.ts"; interface ChildAgentResult { role: DelegationRole; task: string; exitCode: number; output: string; stderr: string; model?: string; turns: number; step?: number; } type RawChildAgentResult = Omit; const DEFAULT_OUTPUT_LIMIT = 30_000; const MAX_PARALLEL_TASKS = 8; const MAX_CONCURRENCY = 4; const MAX_CHAIN_TASKS = 8; const kdSubagentTool = defineTool({ name: "kd_subagent", label: "KD 子Agent", description: KD_SUBAGENT_TOOL_DESCRIPTION, parameters: Type.Object({ role: Type.Optional(Type.String({ description: KD_SUBAGENT_SCHEMA_DESCRIPTIONS.role })), task: Type.Optional(Type.String({ description: KD_SUBAGENT_SCHEMA_DESCRIPTIONS.task })), tasks: Type.Optional( Type.Array( Type.Object({ role: Type.String({ description: KD_SUBAGENT_SCHEMA_DESCRIPTIONS.taskItemRole }), task: Type.String({ description: KD_SUBAGENT_SCHEMA_DESCRIPTIONS.taskItemTask }), }), { description: KD_SUBAGENT_SCHEMA_DESCRIPTIONS.tasks }, ), ), chain: Type.Optional( Type.Array( Type.Object({ role: Type.String({ description: KD_SUBAGENT_SCHEMA_DESCRIPTIONS.taskItemRole }), task: Type.String({ description: KD_SUBAGENT_SCHEMA_DESCRIPTIONS.taskItemTask }), }), { description: KD_SUBAGENT_SCHEMA_DESCRIPTIONS.chain }, ), ), dryRun: Type.Optional(Type.Boolean({ description: KD_SUBAGENT_SCHEMA_DESCRIPTIONS.dryRun })), maxOutputChars: Type.Optional(Type.Number({ description: KD_SUBAGENT_SCHEMA_DESCRIPTIONS.maxOutputChars })), }), async execute(_toolCallId, params, signal, _onUpdate, ctx) { const mode = resolveMode(params); if (!mode) { return { content: [{ type: "text", text: KD_SUBAGENT_INVALID_PARAMS }], details: { error: "invalid-params" }, isError: true, }; } const run = readActiveRun(ctx.cwd); if (params.dryRun) { return { content: [{ type: "text", text: formatModePreview(ctx.cwd, run, mode.requests) }], details: { mode: mode.kind, dryRun: true, requests: mode.requests }, }; } const modeBlockReason = mode.kind === "parallel" ? parallelDelegationBlockReason(mode.requests) : undefined; const blockReason = modeBlockReason ?? mode.requests.map((request) => delegationBlockReason(request.role, run)).find(Boolean); if (blockReason) { return { content: [{ type: "text", text: blockReason }], details: { error: "blocked", mode: mode.kind, run }, isError: true, }; } const limit = normalizeOutputLimit(params.maxOutputChars); const results = mode.kind === "parallel" ? await runParallelAgents(ctx, run, mode.requests, signal) : mode.kind === "chain" ? await runChainedAgents(ctx, run, mode.requests, signal) : [await runDelegation(ctx, run, mode.requests[0], signal)]; const text = formatModeResult(mode.kind, results, limit); return { content: [{ type: "text", text }], details: { mode: mode.kind, results }, isError: results.some((result) => result.exitCode !== 0), }; }, }); export default function (pi: ExtensionAPI) { if (isSubagentChild()) return; pi.registerTool(kdSubagentTool); pi.registerCommand("kd-delegate", { description: KD_DELEGATE_COMMAND_DESCRIPTION, handler: async (args, ctx) => { const parsed = parseDelegationArgs(args); if (!parsed) { ctx.ui.notify(KD_DELEGATE_USAGE, "error"); return; } sendDelegationPrompt(pi, ctx, parsed.role, parsed.task, parsed.dryRun); }, }); pi.registerCommand("kd-review", { description: KD_REVIEW_COMMAND_DESCRIPTION, handler: async (args, ctx) => { const task = args.trim() || DEFAULT_REVIEW_TASK; sendDelegationPrompt(pi, ctx, "review", task, false); }, }); } function sendDelegationPrompt(pi: ExtensionAPI, ctx: ExtensionContext, role: DelegationRole, task: string, dryRun: boolean): void { const message = buildDelegationCommandPrompt({ role, task }, dryRun); if (ctx.isIdle()) pi.sendUserMessage(message); else pi.sendUserMessage(message, { deliverAs: "followUp" }); } function resolveMode(params: { role?: unknown; task?: unknown; tasks?: unknown; chain?: unknown; }): { kind: "single" | "parallel" | "chain"; requests: DelegationRequest[] } | undefined { const single = normalizeRequest(params.role, params.task); const tasks = Array.isArray(params.tasks) ? normalizeTaskList(params.tasks) : []; const chain = Array.isArray(params.chain) ? normalizeTaskList(params.chain) : []; if (tasks === undefined || chain === undefined) return undefined; const modeCount = Number(Boolean(single)) + Number(tasks.length > 0) + Number(chain.length > 0); if (modeCount !== 1) return undefined; if (single) return { kind: "single", requests: [single] }; if (tasks.length > 0 && tasks.length <= MAX_PARALLEL_TASKS) return { kind: "parallel", requests: tasks }; if (chain.length > 0 && chain.length <= MAX_CHAIN_TASKS) return { kind: "chain", requests: chain }; return undefined; } function normalizeTaskList(values: unknown[]): DelegationRequest[] | undefined { const requests = values.map(normalizeTaskItem); if (requests.some((request) => !request)) return undefined; return requests as DelegationRequest[]; } function normalizeTaskItem(value: unknown): DelegationRequest | undefined { if (!value || typeof value !== "object") return undefined; const item = value as { role?: unknown; task?: unknown }; return normalizeRequest(item.role, item.task); } function normalizeRequest(roleValue: unknown, taskValue: unknown): DelegationRequest | undefined { const role = typeof roleValue === "string" ? roleValue.toLowerCase() : ""; const task = typeof taskValue === "string" ? taskValue.trim() : ""; if (!isDelegationRole(role) || !task) return undefined; return { role, task }; } function formatModePreview(cwd: string, run: ReturnType, requests: DelegationRequest[]): string { return requests.map((request, index) => [`## Task ${index + 1}`, formatDelegationPreview(cwd, run, request)].join("\n\n")).join("\n\n---\n\n"); } async function runDelegation( ctx: ExtensionContext, run: ReturnType, request: DelegationRequest, signal: AbortSignal | undefined, step?: number, ): Promise { const prompt = buildDelegationPrompt(ctx.cwd, run, request); return await runChildAgent(ctx, request, prompt, signal, step); } async function runParallelAgents( ctx: ExtensionContext, run: ReturnType, requests: DelegationRequest[], signal: AbortSignal | undefined, ): Promise { const results: ChildAgentResult[] = new Array(requests.length); let next = 0; const workerCount = Math.min(MAX_CONCURRENCY, requests.length); await Promise.all( Array.from({ length: workerCount }, async () => { while (true) { const index = next++; if (index >= requests.length) return; results[index] = await runDelegation(ctx, run, requests[index], signal, index + 1); } }), ); return results; } async function runChainedAgents( ctx: ExtensionContext, run: ReturnType, requests: DelegationRequest[], signal: AbortSignal | undefined, ): Promise { const results: ChildAgentResult[] = []; let previousOutput = ""; for (let i = 0; i < requests.length; i++) { const request = buildChainedDelegationRequest(requests[i], previousOutput); const result = await runDelegation(ctx, run, request, signal, i + 1); results.push(result); previousOutput = result.output; if (result.exitCode !== 0) break; } return results; } function formatModeResult(mode: "single" | "parallel" | "chain", results: ChildAgentResult[], limit: number): string { const succeeded = results.filter((result) => result.exitCode === 0).length; const header = `子 agent ${mode} 完成:${succeeded}/${results.length} 成功`; const sections = results.map((result, index) => { const output = truncateOutput(result.output || result.stderr || "(子 agent 无输出)", limit); return [ `## ${result.step ?? index + 1}. ${result.role}`, `Exit:${result.exitCode}`, result.model ? `Model:${result.model}` : undefined, `Turns:${result.turns}`, "", output, result.stderr.trim() ? `\nSTDERR:\n${truncateOutput(result.stderr.trim(), 4000)}` : undefined, ] .filter(Boolean) .join("\n"); }); return [header, "", ...sections].join("\n"); } function normalizeOutputLimit(value: unknown): number { if (typeof value !== "number" || !Number.isFinite(value)) return DEFAULT_OUTPUT_LIMIT; return Math.max(1000, Math.min(100_000, Math.trunc(value))); } async function runChildAgent( ctx: ExtensionContext, request: DelegationRequest, prompt: string, signal: AbortSignal | undefined, step?: number, ): Promise { const temp = writePromptFile(request.role, prompt); const args = [ "--mode", "json", "-p", "--no-session", "--tools", subagentAllowedTools(request.role).join(","), "--append-system-prompt", temp.file, CHILD_AGENT_USER_TASK, ]; const invocation = getPiInvocation(args); try { const result = await spawnJsonAgent(invocation.command, invocation.args, ctx.cwd, request.role, signal); return { ...result, role: request.role, task: request.task, step }; } finally { rmSync(temp.file, { force: true }); rmSync(temp.dir, { recursive: true, force: true }); } } function writePromptFile(role: DelegationRole, prompt: string): { dir: string; file: string } { const dir = join(tmpdir(), `kcode-subagent-${process.pid}-${Date.now()}-${Math.random().toString(16).slice(2)}`); mkdirSync(dir, { recursive: true }); const file = join(dir, `${role}.md`); writeFileSync(file, prompt, { encoding: "utf8", mode: 0o600 }); return { dir, file }; } function getPiInvocation(args: string[]): { command: string; args: string[] } { const currentScript = process.argv[1]; if (currentScript && existsSync(currentScript)) { return { command: process.execPath, args: [currentScript, ...args] }; } const execName = basename(process.execPath).toLowerCase(); const isGenericRuntime = /^(node|bun)(\.exe)?$/.test(execName); if (!isGenericRuntime) return { command: process.execPath, args }; return { command: "pi", args }; } async function spawnJsonAgent( command: string, args: string[], cwd: string, role: DelegationRole, signal: AbortSignal | undefined, ): Promise { return await new Promise((resolve) => { const messages: Message[] = []; let stderr = ""; let buffer = ""; let turns = 0; let model: string | undefined; let completed = false; const proc = spawn(command, args, { cwd, shell: false, stdio: ["ignore", "pipe", "pipe"], env: { ...process.env, KCODE_SUBAGENT_CHILD: "1", KCODE_SUBAGENT_ROLE: role, }, }); const finish = (exitCode: number) => { if (completed) return; completed = true; resolve({ exitCode, output: finalAssistantText(messages), stderr, model, turns, }); }; const processLine = (line: string) => { if (!line.trim()) return; let event: unknown; try { event = JSON.parse(line); } catch { return; } if (!isJsonEvent(event)) return; if (event.type === "message_end" && event.message) { messages.push(event.message); if (event.message.role === "assistant") { turns++; if (event.message.model) model = event.message.model; } } if (event.type === "tool_result_end" && event.message) messages.push(event.message); }; proc.stdout.on("data", (chunk) => { buffer += chunk.toString(); const lines = buffer.split(/\r?\n/); buffer = lines.pop() ?? ""; for (const line of lines) processLine(line); }); proc.stderr.on("data", (chunk) => { stderr += chunk.toString(); }); proc.on("close", (code) => { if (buffer.trim()) processLine(buffer); finish(code ?? 0); }); proc.on("error", (error) => { stderr += error.message; finish(1); }); if (signal) { const abort = () => { stderr += "\n子 agent 已被中断。"; proc.kill("SIGTERM"); setTimeout(() => { if (!proc.killed) proc.kill("SIGKILL"); }, 5000); }; if (signal.aborted) abort(); else signal.addEventListener("abort", abort, { once: true }); } }); } function isJsonEvent(value: unknown): value is { type: string; message?: Message } { return Boolean(value) && typeof value === "object" && typeof (value as { type?: unknown }).type === "string"; } function finalAssistantText(messages: Message[]): string { for (let i = messages.length - 1; i >= 0; i--) { const message = messages[i]; if (message.role !== "assistant") continue; for (const part of message.content) { if (part.type === "text" && part.text.trim()) return part.text.trim(); } } return ""; } function truncateOutput(output: string, maxChars: number): string { if (output.length <= maxChars) return output; return `${output.slice(0, maxChars)}\n\n[子 agent 输出已截断:剩余 ${output.length - maxChars} 字符]`; }