/** * GateRunner — KCode 控制面的唯一门禁执行器。 * * 职责(仅限): * 1. 在固定检查点收集上下文。 * 2. 调用所有已注册的策略(policy)。 * 3. 产生结构化的 GateDecision 并记录到 ledger。 * * GateRunner 不负责: * - 嵌入业务规则(规则在独立的 policy 模块中)。 * - 读取任意文档。 * - 生成 prompt。 * - 执行工具或命令。 * * 设计决策: * - 策略通过 registerPolicy() 注册,GateRunner 不感知具体策略实现。 * - findings 去重基于 policy+message 键,保留 severity 最高的条目。 * - ledger 写入失败不会阻塞门禁评估(容错设计)。 */ import type { ActiveRun } from "../types.ts"; import { appendLedgerEvent } from "../ledger.ts"; import { generateTraceId } from "../kernel/trace.ts"; import type { GateContext, GateDecision, GateFinding } from "./findings.ts"; import type { GateCheckpoint, GateSeverity } from "./taxonomy.ts"; import { SEVERITY_RANK } from "./taxonomy.ts"; // ── 策略契约 ────────────────────────────────────────────────────────────── /** * 门禁策略函数类型。 * * 策略必须是纯函数:读取上下文 → 返回 findings,无副作用。 * 同一个策略可能注册到多个 checkpoint。 */ export type GatePolicyFunction = (ctx: GateContext) => GateFinding[]; /** 已注册的策略条目。 */ interface RegisteredPolicy { name: string; /** 策略生效的 checkpoint 列表。空数组表示在所有 checkpoint 生效。 */ checkpoints: GateCheckpoint[]; evaluate: GatePolicyFunction; } // ── Ledger 集成 ─────────────────────────────────────────────────────────── /** * 门禁事件类型常量,用于 ledger 记录。 * 与 kernel/events.ts 中 GATE_EVENT_TYPES 的 "gate.decision" 值一致。 */ const GATE_DECISION_EVENT_TYPE = "gate.decision"; // ── GateRunner ─────────────────────────────────────────────────────────────── export class GateRunner { /** 已注册的策略列表。 */ private readonly policies: RegisteredPolicy[] = []; /** finding 序列号生成器,用于产生唯一 ID(F-001, F-002, ...)。 */ private findingSeq = 0; // ── Policy registration ────────────────────────────────────────────────── /** * Register a policy to be evaluated at the specified checkpoints. * * @param name Unique policy name (used in finding attribution). * @param evaluate The policy function. * @param checkpoints Checkpoints where this policy runs. If empty, runs at ALL checkpoints. */ registerPolicy( name: string, evaluate: GatePolicyFunction, checkpoints: GateCheckpoint[] = [], ): void { if (this.policies.some((p) => p.name === name)) { throw new Error(`GateRunner: policy "${name}" is already registered`); } this.policies.push({ name, checkpoints, evaluate }); } /** * Remove a previously registered policy by name. * * @param name The policy name to deregister. * @returns true if the policy was found and removed. */ deregisterPolicy(name: string): boolean { const idx = this.policies.findIndex((p) => p.name === name); if (idx < 0) return false; this.policies.splice(idx, 1); return true; } // ── Gate evaluation ────────────────────────────────────────────────────── /** * 执行门禁评估的完整流程: * * 1. 生成 traceId,发出 gate.requested 事件。 * 2. 遍历所有已注册策略,过滤出适用于当前 checkpoint 的策略, * 收集原始 findings。 * 3. 为 findings 分配序列 ID 并去重(同 policy+message 保留 severity 最高的)。 * 4. 根据 severity 计算 allowed 结果。 * 5. 构建 GateDecision 并记录到 ledger。 * * @param ctx - 门禁上下文(包含 checkpoint、cwd、run、工具信息等) * @returns 结构化的门禁决策 */ async runGate(ctx: GateContext): Promise { // 步骤 0:生成 traceId 并发出 gate.requested 事件(用于审计追踪) const traceId = generateTraceId("gate"); await this.recordGateRequested(ctx, traceId); // 步骤 1:收集所有适用策略的 findings const rawFindings: GateFinding[] = []; for (const policy of this.policies) { // 跳过不适用于当前 checkpoint 的策略 if (policy.checkpoints.length > 0 && !policy.checkpoints.includes(ctx.checkpoint)) { continue; } const findings = policy.evaluate(ctx); if (findings.length > 0) { rawFindings.push(...findings); } } // 步骤 2:分配序列 ID 并去重 const numbered = rawFindings.map((f) => ({ ...f, id: this.nextFindingId(), })); const deduplicated = this.deduplicateFindings(numbered); // 步骤 3:根据 severity 计算是否允许 const allowed = this.computeAllowed(ctx.checkpoint, deduplicated); // 步骤 4:构建决策对象 const decision: GateDecision = { allowed, checkpoint: ctx.checkpoint, findings: deduplicated, traceId, }; // 步骤 5:记录到 ledger(gate.decision + gate.blocked/gate.warning) await this.recordDecision(ctx.cwd, ctx.run, decision); return decision; } // ── Decision recording ─────────────────────────────────────────────────── /** * 将门禁决策追加到运行 ledger。 * * 此方法对外暴露,允许外部来源(如重放、导入)记录决策而无需重新执行策略。 * 发出的事件: * - gate.decision(始终) * - gate.blocked(当门禁阻塞时) * - gate.warning(当仅有 warning 级别 findings 时) */ async recordDecision( cwd: string, run: ActiveRun | undefined, decision: GateDecision, ): Promise { if (!run) return; const findingsSummary = decision.findings.map((f) => ({ id: f.id, severity: f.severity, policy: f.policy, message: f.message, nextAction: f.nextAction, })); try { // Always emit gate.decision. appendLedgerEvent(cwd, run, { type: GATE_DECISION_EVENT_TYPE, summary: `Gate ${decision.checkpoint}: ${decision.allowed ? "ALLOWED" : "BLOCKED"} (${decision.findings.length} findings)`, data: { traceId: decision.traceId, allowed: decision.allowed, checkpoint: decision.checkpoint, findings: findingsSummary, }, }); // Emit gate.blocked when the gate blocks the action. if (!decision.allowed) { appendLedgerEvent(cwd, run, { type: "gate.blocked", summary: `Gate ${decision.checkpoint}: BLOCKED`, data: { traceId: decision.traceId, checkpoint: decision.checkpoint, findings: findingsSummary, }, }); } // Emit gate.warning when there are only warning-level findings. if (decision.allowed && decision.findings.some((f) => f.severity === "warning")) { appendLedgerEvent(cwd, run, { type: "gate.warning", summary: `Gate ${decision.checkpoint}: ALLOWED with warnings`, data: { traceId: decision.traceId, checkpoint: decision.checkpoint, findings: findingsSummary.filter((f) => f.severity === "warning"), }, }); } } catch { // Ledger write failure must not block gate evaluation. } } // ── Private helpers ────────────────────────────────────────────────────── /** * 在策略评估前发出 gate.requested 事件到 ledger。 * 用于审计追踪:记录门禁检查的发起时间和上下文。 */ private async recordGateRequested(ctx: GateContext, traceId: string): Promise { if (!ctx.run) return; try { appendLedgerEvent(ctx.cwd, ctx.run, { type: "gate.requested", summary: `Gate ${ctx.checkpoint}: requested`, data: { traceId, checkpoint: ctx.checkpoint, toolName: ctx.toolName, path: ctx.path, }, }); } catch { // Ledger write failure must not block gate evaluation. } } /** * 根据 findings 的 severity 计算门禁是否允许。 * * 规则(按优先级): * - hard-deny → 阻塞(不可覆盖) * - soft-deny → 阻塞(授权模型 TBD,未来可通过授权令牌覆盖) * - evidence-required → 阻塞(阻止阶段推进;调用方可对非阶段 checkpoint 放宽) * - warning only → 允许(不阻塞) * - 无 findings → 允许 */ private computeAllowed(checkpoint: GateCheckpoint, findings: GateFinding[]): boolean { if (findings.length === 0) return true; for (const f of findings) { switch (f.severity) { case "hard-deny": return false; case "soft-deny": // Blocks by default. When the authorization model is implemented, // this will check for a structured authorization token. return false; case "evidence-required": // Blocks phase advance. For non-phase checkpoints the caller // may choose to override; GateRunner itself treats it as blocking. return false; case "warning": // Never blocks. break; } } return true; } /** * 去重 findings:相同 policy + 相同 message 的 finding 只保留 severity 最高的。 * * 去重键格式:`${policy}::${message}` * 使用 Map 存储,遍历时比较 severity rank,保留更大的。 */ private deduplicateFindings(findings: GateFinding[]): GateFinding[] { const map = new Map(); for (const f of findings) { const key = `${f.policy}::${f.message}`; const existing = map.get(key); if (!existing || this.severityRank(f.severity) > this.severityRank(existing.severity)) { map.set(key, f); } } return Array.from(map.values()); } /** 获取 severity 的数值排名,用于排序和去重比较。 */ private severityRank(severity: GateSeverity): number { return SEVERITY_RANK[severity] ?? 0; } /** 生成唯一 finding ID,格式:"F-001", "F-002", ... */ private nextFindingId(): string { this.findingSeq += 1; return `F-${String(this.findingSeq).padStart(3, "0")}`; } }