/** * ToolPipeline — 通过门禁控制面编排工具执行。 * * 执行流程: * tool.requested → runGate("pre-tool") → 执行或阻塞 → runGate("post-tool") → 生成 contract → 写入 ledger * * 关键设计: * - 工具执行委托给 ToolExecutor 回调,ToolPipeline 不感知具体工具实现。 * - 授权集成:若配置了 AuthorizationModel,soft-deny 可通过授权令牌覆盖。 * - contract 生成:优先使用工具自带回退到自动生成(success/failed/blocked)。 * - 不可信内容包装:对 kd_search 等工具的输出添加不可信前缀。 * * ToolPipeline 不负责: * - 感知具体工具(kd_* 等)。 * - 实现实际工具执行(委托给 ToolExecutor)。 * - 实现写入管道(那是 WritePipeline 的职责)。 */ import type { ActiveRun } from "../types.ts"; import type { GateContext, GateDecision } from "../gates/findings.ts"; import { GateRunner } from "../gates/gate-runner.ts"; import { AuthorizationModel, evaluateGateWithAuth } from "../gates/authorization.ts"; import { createDefaultPolicyRegistry } from "../gates/policy-registry.ts"; import { appendLedgerEvent } from "../ledger.ts"; import { generateTraceId } from "../kernel/trace.ts"; import { createToolResultContract, isUntrustedContentTool, validateToolResultContract, wrapUntrustedOutput, } from "./tool-contract.ts"; import type { CreateToolResultContractInput, ToolResultContract, ToolResultStatus, } from "./tool-contract.ts"; // ── 工具执行器接口 ──────────────────────────────────────────────────────── /** * 工具执行器返回的结果。 * 执行器负责实际工作;ToolPipeline 将其包装为 contract。 */ export interface ToolExecutionResult { /** 工具的原始输出文本。 */ output: string; /** 工具是否执行成功。 */ success: boolean; /** 失败时的错误消息。 */ error?: string; /** 工具预先构建的 contract(若有)。 */ contract?: ToolResultContract; /** 工具执行期间发现的源码引用。 */ sourceRefs?: string[]; /** 工具产生的证据文件路径。 */ evidencePath?: string; } /** * 工具执行函数签名。 * ToolPipeline 将实际执行委托给此函数,保持与具体工具实现的解耦。 */ export type ToolExecutor = (toolName: string, args: unknown) => Promise; // ── Tool call request ──────────────────────────────────────────────────────── /** * 工具调用请求,进入管道的入口数据。 */ export interface ToolCallRequest { /** 要调用的工具名称。 */ toolName: string; /** 传递给工具的参数。 */ args: unknown; /** 当前运行的工作目录。 */ cwd: string; /** 活跃运行实例(用于 ledger 关联)。 */ run?: ActiveRun; /** 可选的文件路径上下文(传递给门禁评估)。 */ path?: string; } // ── Pipeline result ────────────────────────────────────────────────────────── /** * 管道执行的完整结果。 */ export interface ToolPipelineResult { /** 最终的 contract。 */ contract: ToolResultContract; /** 包装后的输出文本(不可信内容会添加前缀)。 */ output: string; /** pre-tool 门禁决策。 */ preGateDecision: GateDecision; /** post-tool 门禁决策(仅在执行发生后存在)。 */ postGateDecision?: GateDecision; /** 本次管道执行的关联 trace ID。 */ traceId: string; } // ── Ledger event helper type ───────────────────────────────────────────────── /** * Function to append an event to the run ledger. * Matches the signature of appendLedgerEvent from ledger.ts. */ export type LedgerAppender = ( cwd: string, run: ActiveRun, input: Parameters[2], ) => void; // ── ToolPipeline ───────────────────────────────────────────────────────────── export class ToolPipeline { /** 门禁执行器,用于 pre-tool 和 post-tool 门禁评估。 */ private readonly gateRunner: GateRunner; /** 工具执行回调,实际执行工具逻辑。 */ private readonly executor: ToolExecutor; /** 可选的授权模型,用于 soft-deny 覆盖。 */ private readonly authModel?: AuthorizationModel; /** ledger 写入函数,默认使用 appendLedgerEvent。 */ private readonly ledger: LedgerAppender; constructor( gateRunner: GateRunner, executor: ToolExecutor, authModel?: AuthorizationModel, ledger: LedgerAppender = appendLedgerEvent, ) { this.gateRunner = gateRunner; this.executor = executor; this.authModel = authModel; this.ledger = ledger; } /** * 通过完整管道执行工具调用: * 1. 发出 tool.requested 到 ledger * 2. 执行 pre-tool 门禁(可选授权覆盖) * 3. 若门禁允许,执行工具(委托给 ToolExecutor) * 4. 执行 post-tool 门禁 * 5. 生成 contract(优先使用工具自带 contract,否则自动生成) * 6. 发出 tool.result.recorded 到 ledger * * @param request - 工具调用请求 * @returns 完整的管道执行结果 */ async execute(request: ToolCallRequest): Promise { const { toolName, args, cwd, run, path } = request; const traceId = generateTraceId("tool"); // ── Step 0: Emit tool.requested ─────────────────────────────────── this.emitLedger(cwd, run, { type: "tool.result.recorded", summary: `tool.requested: ${toolName}`, data: { traceId, toolName, path }, }); // ── Step 1: Pre-tool gate ───────────────────────────────────────── const preGateDecision = await this.runGateWithAuth({ cwd, checkpoint: "pre-tool", toolName, path, run, payload: args, }); // If pre-tool gate blocks, return a blocked contract immediately. if (!preGateDecision.allowed) { return this.buildBlockedResult(toolName, cwd, run, preGateDecision, traceId); } // ── Step 2: Execute the tool ───────────────────────────────────── let executionResult: ToolExecutionResult; try { executionResult = await this.executor(toolName, args); } catch (error) { // Tool execution threw an unexpected error. executionResult = { output: "", success: false, error: error instanceof Error ? error.message : String(error), }; } // ── Step 3: Post-tool gate ─────────────────────────────────────── const postGateDecision = await this.runGateWithAuth({ cwd, checkpoint: "post-tool", toolName, path, run, payload: { args, output: executionResult.output, success: executionResult.success, }, }); // ── Step 4: Generate contract ──────────────────────────────────── const contract = this.buildContract( toolName, executionResult, preGateDecision, postGateDecision, ); // Wrap output if untrusted. const output = wrapUntrustedOutput(contract, executionResult.output); // ── Step 5: Emit tool.result.recorded ──────────────────────────── this.emitLedger(cwd, run, { type: "tool.result.recorded", summary: `tool.${contract.status}: ${toolName} — ${contract.summary.slice(0, 120)}`, data: { traceId, toolName, status: contract.status, preGateAllowed: preGateDecision.allowed, postGateAllowed: postGateDecision.allowed, untrusted: contract.untrusted, }, }); return { contract, output, preGateDecision, postGateDecision, traceId, }; } // ── Private helpers ────────────────────────────────────────────────────── /** * 执行门禁评估,若配置了授权模型则应用授权覆盖。 * * 授权模型允许 soft-deny 被结构化授权令牌覆盖(如用户手动确认)。 * 未配置授权模型时,直接委托给 GateRunner.runGate()。 */ private async runGateWithAuth(ctx: GateContext): Promise { if (this.authModel) { return evaluateGateWithAuth(this.gateRunner, ctx, this.authModel); } return this.gateRunner.runGate(ctx); } /** * 发出 ledger 事件。吞没错误以避免阻塞管道执行。 * 这是容错设计:ledger 写入失败不应影响工具执行。 */ private emitLedger( cwd: string, run: ActiveRun | undefined, input: Parameters[2], ): void { if (!run) return; try { this.ledger(cwd, run, input); } catch { // Ledger write failure must not block tool execution. } } /** * 当 pre-tool 门禁阻塞时,构建阻塞结果。 * 提取 blocking findings 的 nextAction 作为修复建议。 */ private buildBlockedResult( toolName: string, cwd: string, run: ActiveRun | undefined, decision: GateDecision, traceId: string, ): ToolPipelineResult { const blockingFindings = decision.findings.filter( (f) => f.severity === "hard-deny" || f.severity === "soft-deny", ); const nextAction = blockingFindings[0]?.nextAction ?? "Review gate findings and resolve blocking issues before retrying."; const contract = createToolResultContract({ toolName, status: "blocked", summary: `Blocked by pre-tool gate: ${blockingFindings.map((f) => f.message).join("; ")}`, nextAction, gateTraceId: decision.traceId, }); // Emit tool.blocked to ledger. this.emitLedger(cwd, run, { type: "tool.blocked", summary: `tool.blocked: ${toolName} — ${contract.summary.slice(0, 120)}`, data: { traceId, toolName, gateTraceId: decision.traceId, findings: blockingFindings.map((f) => ({ id: f.id, severity: f.severity, policy: f.policy, message: f.message, })), }, }); return { contract, output: "", preGateDecision: decision, traceId, }; } /** * 根据执行结果和门禁决策构建最终 contract。 * * 优先级: * 1. 工具自带的 contract(验证后使用,无效则回退)。 * 2. 自动生成:根据 success + postGateDecision 确定 status。 */ private buildContract( toolName: string, execution: ToolExecutionResult, preDecision: GateDecision, postDecision: GateDecision, ): ToolResultContract { // If the tool provided its own contract, validate and use it. if (execution.contract) { const augmented: ToolResultContract = { ...execution.contract, gateTraceId: execution.contract.gateTraceId ?? preDecision.traceId, untrusted: execution.contract.untrusted ?? isUntrustedContentTool(toolName), }; const validation = validateToolResultContract(augmented); if (validation.valid) { return augmented; } // If the tool-provided contract is invalid, fall through to generate one. console.warn( `[ToolPipeline] Tool "${toolName}" provided invalid contract: ${validation.issues.join("; ")}. Generating fallback.`, ); } // Warn when tool succeeded but provided no contract. if (execution.success && !execution.contract) { console.warn( `[ToolPipeline] Tool "${toolName}" succeeded but provided no ToolResultContract. Generating fallback.`, ); } // Determine status. const status: ToolResultStatus = execution.success ? (postDecision.allowed ? "success" : "blocked") : "failed"; // For failures, ensure nextAction is always present. const nextAction = execution.success ? undefined : (execution.error ? `Fix the error and retry: ${execution.error.slice(0, 120)}` : "Tool execution failed. Review the error and retry."); // Build summary. const summary = execution.success ? `Tool "${toolName}" completed successfully.` : `Tool "${toolName}" failed: ${execution.error ?? "unknown error"}`; const input: CreateToolResultContractInput = { toolName, status, summary, sourceRefs: execution.sourceRefs, evidencePath: execution.evidencePath, nextAction: status === "failed" ? nextAction : undefined, gateTraceId: preDecision.traceId, }; return createToolResultContract(input); } } // ── Factory ────────────────────────────────────────────────────────────────── /** * Create a ToolPipeline with the given gate runner, executor, and optional auth model. */ export function createToolPipeline( gateRunner: GateRunner, executor: ToolExecutor, authModel?: AuthorizationModel, ): ToolPipeline { return new ToolPipeline(gateRunner, executor, authModel); } /** * 创建使用默认策略注册表的 ToolPipeline。 * * 这是生产环境的主要工厂函数。内部创建 GateRunner 并注册一个 * 桥接 meta-policy,将策略评估委托给 PolicyRegistry。 * PolicyRegistry.evaluate() 已处理 checkpoint 过滤、排序和去重, * 因此 GateRunner 收到的是预处理后的 findings。 * * @param executor - 工具执行函数 * @param authModel - 可选的授权模型(用于 soft-deny 覆盖) * @param ledger - 可选的自定义 ledger 写入函数 */ export function createDefaultToolPipeline( executor: ToolExecutor, authModel?: AuthorizationModel, ledger?: LedgerAppender, ): ToolPipeline { const registry = createDefaultPolicyRegistry(); const gateRunner = new GateRunner(); // 桥接模式:注册单一 meta-policy,委托给 PolicyRegistry。 // PolicyRegistry.evaluate() 内部已完成 checkpoint 过滤、排序和去重, // 因此 GateRunner 收到的是预处理后的 findings。 gateRunner.registerPolicy( "__policy_registry_bridge__", (ctx) => registry.evaluate(ctx.checkpoint, ctx), [], // Empty = runs at all checkpoints ); return new ToolPipeline(gateRunner, executor, authModel, ledger); }