/** * WritePipeline — 通过门禁控制面管理完整的写入生命周期。 * * 两阶段流程: * * beginWrite(写入前): * pre-write gate → 源码锚点检查 → PLAN.md 路径批准检查 → * TDD 红信号检查 → 允许或阻塞 * * completeWrite(写入后): * post-write gate → 文件存在性验证 → 模板占位符检查 * * 内联检查(beginWrite 阶段): * 除 GateRunner 的 pre-write 策略外,WritePipeline 自身执行三项内联检查: * 1. 源码锚点检查:确保写入前已读取最新锚点(防止基于过期内容的写入)。 * 2. PLAN 路径检查:确保 execute 阶段写入的文件已在 PLAN.md 中批准。 * 3. TDD 红信号检查:确保 normal 模式下有对应的失败测试(安全网)。 * * WritePipeline 不负责: * - 执行实际文件写入(调用方执行写入)。 * - 读取文件系统内容(仅使用上下文数据)。 * - 嵌入业务规则(委托给策略模块和 GateRunner)。 */ import type { ActiveRun, KdSourceAnchorSnapshot, KdWriteTransaction } from "../types.ts"; import type { GateDecision, GateFinding } from "../gates/findings.ts"; import type { GateSeverity } from "../gates/taxonomy.ts"; import { GateRunner } from "../gates/gate-runner.ts"; import { latestSourceAnchorForPath, validateSourceAnchorSnapshot, } from "../source-anchors.ts"; // ── WritePipeline 输入类型 ───────────────────────────────────────────────── /** 开始写入事务的上下文。 */ export interface BeginWriteInput { /** 当前运行的工作目录。 */ cwd: string; /** 活跃运行实例(若有)。 */ run?: ActiveRun; /** 写入的目标文件路径。 */ path: string; /** 执行写入的工具名称(默认 "write")。 */ toolName?: string; /** 待写入内容(仅用于预验证,不会被持久化)。 */ content?: string; } /** 完成写入事务的上下文。 */ export interface CompleteWriteInput { /** 当前运行的工作目录。 */ cwd: string; /** 活跃运行实例(若有)。 */ run?: ActiveRun; /** 已写入的目标文件路径。 */ path: string; /** beginWrite 返回的事务 ID。 */ transactionId: string; /** 执行写入的工具名称(默认 "write")。 */ toolName?: string; } // ── WritePipeline result ────────────────────────────────────────────────────── /** * 写入管道操作状态。 * * - "planned": pre-write 检查通过,调用方可执行实际写入。 * - "blocked": 一项或多项检查拒绝了写入。 * - "written": 写入已执行,post-write 验证待完成。 * - "verified": post-write 检查通过,写入完全确认。 */ export type WritePipelineStatus = "planned" | "blocked" | "written" | "verified"; /** * Structured result returned by every WritePipeline operation. */ export interface WritePipelineResult { /** Whether the write is permitted to proceed (or has been verified). */ allowed: boolean; /** Lifecycle status of the write. */ status: WritePipelineStatus; /** The gate decision from the pre-write or post-write checkpoint. */ gateDecision: GateDecision; /** All findings aggregated from inline checks and gate evaluation. */ findings: GateFinding[]; /** Transaction id for correlating begin → complete. */ transactionId: string; /** Trace id for ledger correlation. */ traceId: string; /** Write transaction record (for run state). */ transaction: KdWriteTransaction; } // ── Inline check result ─────────────────────────────────────────────────────── /** * Internal result from an inline check (not from GateRunner). */ interface InlineCheckResult { passed: boolean; finding?: GateFinding; } // ── Source anchor validation interface ───────────────────────────────────────── /** * Minimal interface for source anchor validation. * Decouples WritePipeline from the full source-anchors module. */ export interface SourceAnchorValidator { latestSourceAnchorForPath( cwd: string, snapshots: ActiveRun["sourceAnchors"], path: string, ): KdSourceAnchorSnapshot | undefined; validateSourceAnchorSnapshot( cwd: string, snapshot: KdSourceAnchorSnapshot, ): { passed: boolean; reason?: string }; } // ── WritePipeline ───────────────────────────────────────────────────────────── export class WritePipeline { /** 门禁执行器,用于 pre-write 和 post-write 门禁评估。 */ private readonly gateRunner: GateRunner; /** 源码锚点验证器,解耦于具体的 source-anchors 模块。 */ private readonly anchorValidator: SourceAnchorValidator; /** finding 序列号生成器。 */ private findingSeq = 0; constructor(gateRunner: GateRunner, anchorValidator: SourceAnchorValidator) { this.gateRunner = gateRunner; this.anchorValidator = anchorValidator; } /** * 开始写入事务,执行完整的 pre-write 检查流程: * 1. 运行 pre-write 门禁(GateRunner + 所有 pre-write 策略)。 * 2. 若为源码类文件:检查源码锚点是否存在且新鲜。 * 3. 若为源码类文件:检查路径是否在 PLAN.md 中批准。 * 4. 检查 TDD 红信号是否存在(仅 normal 模式)。 * 5. 返回 allowed(planned)或 blocked 结果。 * * 仅对源码类文件(.java, .kt, .xml, .cs 等)执行步骤 2-4 的内联检查。 * 非源码文件(如 .md, .json)跳过这些检查。 */ async beginWrite(input: BeginWriteInput): Promise { const { cwd, run, path, toolName = "write" } = input; const transactionId = this.createTransactionId(); // ── Step 1: Pre-write gate ────────────────────────────────────── const gateDecision = await this.gateRunner.runGate({ cwd, checkpoint: "pre-write", path, toolName, run, payload: { content: input.content }, }); // If pre-write gate blocks, short-circuit. if (!gateDecision.allowed) { return this.buildBlockedResult(transactionId, gateDecision, gateDecision.findings); } // 步骤 2-4:内联检查(仅对源码类文件执行) // isSourceLikePath 检查文件扩展名是否属于源码类型 const inlineFindings: GateFinding[] = []; const sourceLike = isSourceLikePath(path); if (sourceLike) { // Step 2: Source anchor check. const anchorCheck = this.checkSourceAnchor(cwd, run, path); if (anchorCheck.finding) { inlineFindings.push(anchorCheck.finding); } // Step 3: Plan path check. const planCheck = this.checkPlanPath(cwd, run, path); if (planCheck.finding) { inlineFindings.push(planCheck.finding); } // Step 4: TDD red signal check (normal mode only). const tddCheck = this.checkTddRedSignal(run); if (tddCheck.finding) { inlineFindings.push(tddCheck.finding); } } // If any inline check produced a hard-deny, block. if (inlineFindings.some((f) => f.severity === "hard-deny" || f.severity === "soft-deny")) { return this.buildBlockedResult(transactionId, gateDecision, [ ...gateDecision.findings, ...inlineFindings, ]); } // ── Step 5: Return planned result ────────────────────────────── return this.buildPlannedResult(transactionId, gateDecision, [ ...gateDecision.findings, ...inlineFindings, ]); } /** * 完成写入事务,执行 post-write 验证: * 1. 运行 post-write 门禁。 * 2. 验证文件存在且内容符合预期(由 post-write 策略处理)。 * 3. 检查是否残留模板占位符。 * * 注意:WritePipeline 不直接读取文件系统, * 文件存在性检查由 post-write gate 策略或调用方负责。 */ async completeWrite(input: CompleteWriteInput): Promise { const { cwd, run, path, transactionId, toolName = "write" } = input; // ── Step 1: Post-write gate ───────────────────────────────────── const gateDecision = await this.gateRunner.runGate({ cwd, checkpoint: "post-write", path, toolName, run, payload: { transactionId }, }); // If post-write gate blocks, return blocked. if (!gateDecision.allowed) { return this.buildBlockedResult(transactionId, gateDecision, gateDecision.findings); } // ── Steps 2–3: Post-write inline checks ──────────────────────── // NOTE: These checks require filesystem access (existsSync, readFileSync). // WritePipeline is designed to NOT read the filesystem directly. // The caller is responsible for passing validation data via the gate payload, // or the post-write gate policies handle it. // // For the "verified" status, we trust the post-write gate's decision. // If the gate allowed, we consider the write verified. return this.buildVerifiedResult(transactionId, gateDecision, gateDecision.findings); } // ── Inline checks ────────────────────────────────────────────────────── /** * 检查源码锚点是否存在且新鲜。 * * 源码锚点机制确保写入前已读取文件的最新状态, * 防止基于过期内容的盲目写入。 * 若未读取锚点或锚点校验失败,返回 soft-deny finding。 */ private checkSourceAnchor(cwd: string, run: ActiveRun | undefined, path: string): InlineCheckResult { if (!run?.sourceAnchors || run.sourceAnchors.length === 0) { return { passed: false, finding: this.createFinding( "soft-deny", "source-anchor", `源码类文件写入前必须先使用 kd_anchor_read 读取最新锚点:${path}`, `执行 kd_anchor_read path="${path}" 获取最新锚点后再写入。`, ), }; } const anchor = this.anchorValidator.latestSourceAnchorForPath(cwd, run.sourceAnchors, path); if (!anchor) { return { passed: false, finding: this.createFinding( "soft-deny", "source-anchor", `源码类文件写入前必须先使用 kd_anchor_read 读取最新锚点:${path}`, `执行 kd_anchor_read path="${path}" 获取最新锚点后再写入。`, ), }; } const validation = this.anchorValidator.validateSourceAnchorSnapshot(cwd, anchor); if (!validation.passed) { return { passed: false, finding: this.createFinding( "soft-deny", "source-anchor", validation.reason ?? `锚点校验失败:${path}`, `重新执行 kd_anchor_read path="${path}" 获取最新锚点后再写入。`, ), }; } return { passed: true }; } /** * 检查路径是否在 PLAN.md 中被批准写入。 * * 仅在 execute 阶段生效。若 PLAN.md 中未提及该路径, * 返回 soft-deny finding,提示用户回到 plan 阶段补充。 * .pi/ 目录下的文件豁免此检查(运行时状态文件)。 */ private checkPlanPath(cwd: string, run: ActiveRun | undefined, path: string): InlineCheckResult { if (!run || run.phase !== "execute") return { passed: true }; const normalized = normalizeRelativePath(cwd, path); if (normalized.startsWith(".pi/")) return { passed: true }; // Read PLAN.md artifact if available. const planContent = run.artifacts?.plan; if (!planContent) return { passed: true }; if (planMentionsPath(planContent, normalized)) return { passed: true }; return { passed: false, finding: this.createFinding( "soft-deny", "plan-path", `PLAN.md 未批准写入 ${normalized}。`, `回到 plan 阶段,把 ${normalized} 加入 PLAN.md 的 ## 允许修改的文件 和执行步骤;重新通过门禁后再写。`, ), }; } /** * 检查 TDD 红信号是否存在。 * * 仅在 normal 模式 + execute 阶段生效。 * TDD 红信号的实际检查由 pre-write gate 的 tdd-policy 负责, * 此处作为安全网:若 gate 策略遗漏,此处兜底。 * 当前实现信任 gate 的决策,直接返回 passed。 */ private checkTddRedSignal(run: ActiveRun | undefined): InlineCheckResult { if (!run || run.mode !== "normal") return { passed: true }; if (run.phase !== "execute") return { passed: true }; // TDD red signal is checked via the gate policies (tdd-production-write-blockReason). // The pre-write gate already runs tdd-policy at "pre-write" checkpoint. // This inline check is a safety net: if gate policies missed it, we check here. // Since we don't read the filesystem, we rely on the gate's decision. // Return passed — the gate already handled TDD enforcement. return { passed: true }; } // ── Result builders ───────────────────────────────────────────────────── private buildPlannedResult( transactionId: string, gateDecision: GateDecision, allFindings: GateFinding[], ): WritePipelineResult { const transaction = this.createTransaction(transactionId, "planned", allFindings); return { allowed: true, status: "planned", gateDecision, findings: allFindings, transactionId, traceId: gateDecision.traceId, transaction, }; } private buildBlockedResult( transactionId: string, gateDecision: GateDecision, allFindings: GateFinding[], ): WritePipelineResult { const transaction = this.createTransaction(transactionId, "blocked", allFindings, this.blockReason(allFindings)); return { allowed: false, status: "blocked", gateDecision, findings: allFindings, transactionId, traceId: gateDecision.traceId, transaction, }; } private buildVerifiedResult( transactionId: string, gateDecision: GateDecision, allFindings: GateFinding[], ): WritePipelineResult { const transaction = this.createTransaction(transactionId, "verified", allFindings); return { allowed: true, status: "verified", gateDecision, findings: allFindings, transactionId, traceId: gateDecision.traceId, transaction, }; } // ── Private helpers ───────────────────────────────────────────────────── private createFinding( severity: GateSeverity, policy: string, message: string, nextAction?: string, ): GateFinding { return { id: this.nextFindingId(), severity, policy, message, nextAction, }; } private nextFindingId(): string { this.findingSeq += 1; return `F-${String(this.findingSeq).padStart(3, "0")}`; } private createTransactionId(): string { const timestamp = Date.now(); const randomHex = Math.floor(Math.random() * 0xffffff).toString(16).padStart(6, "0"); return `W-${timestamp}-${randomHex}`; } private createTransaction( id: string, status: KdWriteTransaction["status"], findings: GateFinding[], reason?: string, ): KdWriteTransaction { const now = new Date().toISOString(); return { id, path: "", // Caller sets path on the result. status, checks: findings.map((f) => `${f.policy}:${f.severity}`), reason, createdAt: now, updatedAt: now, }; } private blockReason(findings: GateFinding[]): string | undefined { const blocking = findings.find( (f) => f.severity === "hard-deny" || f.severity === "soft-deny", ); return blocking?.message; } } // ── 路径工具函数(从 path-policy.ts 委托) ───────────────────────────────── // 判断文件路径是否属于"源码类",决定是否执行内联检查。 /** 源码文件扩展名集合。 */ const SOURCE_EXTENSIONS = new Set([ ".java", ".kt", ".kts", ".xml", ".properties", ".yml", ".yaml", ".sql", ".ksql", ".cs", ".py", ]); /** 判断路径是否为源码类文件(基于扩展名)。 */ function isSourceLikePath(path: string): boolean { const normalized = normalizeRelativePath("", path); const segments = normalized.split("/"); const lastSegment = segments[segments.length - 1] ?? normalized; const dotIndex = lastSegment.lastIndexOf("."); if (dotIndex < 0) return false; return SOURCE_EXTENSIONS.has(lastSegment.slice(dotIndex).toLowerCase()); } /** 将路径标准化为相对路径(去除 cwd 前缀、统一斜杠)。 */ function normalizeRelativePath(cwd: string, path: string): string { const normalized = path.replace(/\\/g, "/").replace(/^\.\//, "").replace(/^\/+/, ""); // If absolute and cwd provided, make relative. if (/^[a-z]:/i.test(normalized) && cwd) { const cwdNorm = cwd.replace(/\\/g, "/").replace(/\/+$/, ""); if (normalized.toLowerCase().startsWith(cwdNorm.toLowerCase() + "/")) { return normalized.slice(cwdNorm.length + 1); } } return normalized; } /** 检查 PLAN.md 内容中是否提及指定路径(使用正则匹配)。 */ function planMentionsPath(plan: string, normalizedPath: string): boolean { const normalizedPlan = plan.replace(/\\/g, "/"); const escaped = escapeRegExp(normalizedPath); return new RegExp(`(?:^|[\\s\`"'(])${escaped}(?:[\\s\`"')]|$)`, "i").test(normalizedPlan); } function escapeRegExp(value: string): string { return value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); } // ── Convenience factory ─────────────────────────────────────────────────────── /** * Create a WritePipeline with the given gate runner and anchor validator. */ export function createWritePipeline( gateRunner: GateRunner, anchorValidator: SourceAnchorValidator, ): WritePipeline { return new WritePipeline(gateRunner, anchorValidator); } /** * Create a WritePipeline with a default anchor validator that delegates to * the source-anchors module. * * Usage: * import { createDefaultWritePipeline } from "./write-pipeline.ts"; * const pipeline = createDefaultWritePipeline(gateRunner); */ export function createDefaultWritePipeline(gateRunner: GateRunner): WritePipeline { const defaultValidator: SourceAnchorValidator = { latestSourceAnchorForPath(cwd, snapshots, path) { return latestSourceAnchorForPath(cwd, snapshots, path); }, validateSourceAnchorSnapshot(cwd, snapshot) { return validateSourceAnchorSnapshot(cwd, snapshot); }, }; return new WritePipeline(gateRunner, defaultValidator); }