/** * L1 Memory Conflict Detection (Batch Mode): decides how to handle multiple new * memories against existing records in a single LLM call. * * v4: Removed JSONL-based Jaccard fallback. Candidate recall now relies exclusively * on vector search (primary) and FTS5 BM25 (degraded). If neither is available, * conflict detection is skipped entirely — all memories go straight to store. * * Two-phase approach: * 1. Candidate search per new memory — vector recall or FTS5 keyword recall (fast, no LLM) * 2. Batch LLM judgment on all new memories + their candidate pools (single call) */ import type { ExtractedMemory, MemoryRecord, DedupDecision, MemoryType } from "./l1-writer.js"; import { CONFLICT_DETECTION_SYSTEM_PROMPT, formatBatchConflictPrompt } from "../prompts/l1-dedup.js"; import type { CandidateMatch } from "../prompts/l1-dedup.js"; import { CleanContextRunner } from "../../utils/clean-context-runner.js"; import { sanitizeJsonForParse } from "../../utils/sanitize.js"; import type { IMemoryStore } from "../store/types.js"; import { buildFtsQuery } from "../store/sqlite.js"; import type { EmbeddingService } from "../store/embedding.js"; import type { LLMRunner } from "../types.js"; interface Logger { debug?: (message: string) => void; info: (message: string) => void; warn: (message: string) => void; error: (message: string) => void; } const TAG = "[memory-tdai][l1-dedup]"; // ============================ // Core function (batch mode) // ============================ /** * Batch conflict detection: compare all new memories against existing records * in a single LLM call. * * Candidate recall strategy (3-tier degradation): * 1. Vector recall (vectorStore + embeddingService) — cosine similarity (best) * 2. FTS5 keyword recall (vectorStore with FTS available) — BM25 ranking (degraded) * 3. Skip conflict detection entirely — all memories go straight to "store" * * The old JSONL-based Jaccard fallback has been removed. If neither vector search * nor FTS is available, we skip dedup rather than paying the O(N) full-file-scan cost. * * @param memories - Newly extracted memories (with record_id) * @param config - OpenClaw config (for LLM access) * @param logger - Optional logger * @param model - Optional model override * @param vectorStore - Optional vector store for cosine similarity search * @param embeddingService - Optional embedding service for computing query vectors * @param conflictRecallTopK - Top-K candidates to recall per new memory (default: 5) * @returns Array of dedup decisions, one per new memory */ export async function batchDedup(params: { memories: Array; config: unknown; logger?: Logger; model?: string; /** Vector store for cosine similarity candidate recall */ vectorStore?: IMemoryStore; /** Embedding service for computing query vectors */ embeddingService?: EmbeddingService; /** Top-K candidates per new memory (default: 5) */ conflictRecallTopK?: number; /** Override embedding timeout for capture-path calls (milliseconds) */ embeddingTimeoutMs?: number; /** Host-neutral LLM runner — when provided, used instead of CleanContextRunner. */ llmRunner?: LLMRunner; }): Promise { const { memories, config, logger, model, vectorStore, embeddingService, llmRunner } = params; const topK = params.conflictRecallTopK ?? 5; if (memories.length === 0) { return []; } const storeAll = () => memories.map((m) => ({ record_id: m.record_id, action: "store" as const, target_ids: [], })); // Determine what recall capabilities are available const hasVectorData = vectorStore && (await vectorStore.countL1()) > 0; const hasFts = vectorStore?.isFtsAvailable() ?? false; // Fast path: no recall capability at all → skip dedup if (!hasVectorData && !hasFts) { logger?.debug?.(`${TAG} No vector data and no FTS available, skipping conflict detection for ${memories.length} memories`); return storeAll(); } // Phase 1: Find candidates // // Decision tree (after the fast-path guard above, vectorStore is guaranteed non-null): // hasVectorData + embeddingService → Tier 1 vector recall (FTS fallback on error) // otherwise hasFts → Tier 2 FTS keyword recall // otherwise → skip dedup (defensive; shouldn't reach here) let matches: CandidateMatch[]; if (hasVectorData && embeddingService) { // === Tier 1: Vector recall mode === logger?.debug?.(`${TAG} Using vector recall mode (topK=${topK})`); try { matches = await findCandidatesByVector(memories, vectorStore!, embeddingService, topK, logger, params.embeddingTimeoutMs); } catch (err) { logger?.warn?.( `${TAG} Vector recall failed, falling back to FTS keyword: ${err instanceof Error ? err.message : String(err)}`, ); // Degrade to FTS keyword recall if (hasFts) { matches = await findCandidatesByFts(memories, vectorStore!, logger); } else { logger?.debug?.(`${TAG} FTS not available either, skipping conflict detection`); return storeAll(); } } } else if (hasFts) { // === Tier 2: FTS keyword recall === logger?.debug?.(`${TAG} Using FTS keyword recall mode (no embedding service or no vector data)`); matches = await findCandidatesByFts(memories, vectorStore!, logger); } else { // Shouldn't reach here given the fast-path check above, but be defensive logger?.debug?.(`${TAG} No usable recall path, skipping conflict detection`); return storeAll(); } // Check if any memory has candidates const hasAnyCandidates = matches.some((m) => m.candidates.length > 0); if (!hasAnyCandidates) { logger?.debug?.(`${TAG} No similar records found for any memory, all will be stored`); return storeAll(); } // Phase 2: Batch LLM judgment return runLlmJudgment(matches, memories, config, logger, model, llmRunner); } /** * Phase 2: Run batch LLM judgment on candidate matches. */ async function runLlmJudgment( matches: CandidateMatch[], memories: Array, config: unknown, logger: Logger | undefined, model: string | undefined, llmRunner?: LLMRunner, ): Promise { logger?.debug?.(`${TAG} Running batch conflict detection for ${memories.length} memories`); try { const userPrompt = formatBatchConflictPrompt(matches); let result: string; if (llmRunner) { // Use the host-neutral LLMRunner interface result = await llmRunner.run({ prompt: userPrompt, systemPrompt: CONFLICT_DETECTION_SYSTEM_PROMPT, taskId: "l1-conflict-detection", timeoutMs: 180_000, }); } else { // Fallback: create CleanContextRunner (OpenClaw path) const runner = new CleanContextRunner({ config, modelRef: model, enableTools: false, logger, }); result = await runner.run({ prompt: userPrompt, systemPrompt: CONFLICT_DETECTION_SYSTEM_PROMPT, taskId: "l1-conflict-detection", timeoutMs: 180_000, }); } const decisions = parseBatchResult(result, memories, logger); return decisions; } catch (err) { logger?.warn?.( `${TAG} Batch conflict detection failed, defaulting all to store: ${err instanceof Error ? err.message : String(err)}`, ); return memories.map((m) => ({ record_id: m.record_id, action: "store" as const, target_ids: [], })); } } // ============================ // Candidate recall strategies // ============================ /** * Vector-based candidate recall (aligned with prototype): * batch-embed new memories → cosine search in VectorStore → exclude self-batch → return candidates. */ async function findCandidatesByVector( memories: Array, vectorStore: IMemoryStore, embeddingService: EmbeddingService, topK: number, logger?: Logger, embeddingTimeoutMs?: number, ): Promise { const newRecordIds = new Set(memories.map((m) => m.record_id)); // Batch-compute embeddings for all new memories const texts = memories.map((m) => m.content); const embeddings = await embeddingService.embedBatch(texts, embeddingTimeoutMs ? { timeoutMs: embeddingTimeoutMs } : undefined); const matches: CandidateMatch[] = []; for (let i = 0; i < memories.length; i++) { const mem = memories[i]; const queryVec = embeddings[i]; // Vector search top-K (request extra to account for self-batch filtering) const searchResults = await vectorStore.searchL1Vector(queryVec, topK + memories.length, mem.content); // Exclude records from current batch, convert to MemoryRecord format const candidates: MemoryRecord[] = searchResults .filter((r) => !newRecordIds.has(r.record_id)) .slice(0, topK) .map((r) => ({ id: r.record_id, content: r.content, type: r.type as MemoryRecord["type"], priority: r.priority, scene_name: r.scene_name, source_message_ids: [], metadata: {}, timestamps: [r.timestamp_str].filter(Boolean), createdAt: "", updatedAt: "", sessionKey: r.session_key, sessionId: r.session_id, })); matches.push({ newMemory: mem, candidates }); } logger?.debug?.( `${TAG} Vector recall: ${matches.map((m) => `${m.newMemory.record_id}→${m.candidates.length}`).join(", ")}`, ); return matches; } /** * FTS5-based candidate recall: * Uses the FTS index for efficient BM25-ranked keyword matching. * This replaces the old Jaccard word-overlap fallback entirely. */ async function findCandidatesByFts( memories: Array, vectorStore: IMemoryStore, _logger?: Logger, ): Promise { const newRecordIds = new Set(memories.map((m) => m.record_id)); const matches: CandidateMatch[] = []; for (const mem of memories) { const ftsQuery = buildFtsQuery(mem.content); if (ftsQuery) { const ftsResults = await vectorStore.searchL1Fts(ftsQuery, 10); // Filter out records from the current batch const candidates: MemoryRecord[] = ftsResults .filter((r) => !newRecordIds.has(r.record_id)) .slice(0, 5) .map((r) => ({ id: r.record_id, content: r.content, type: r.type as MemoryRecord["type"], priority: r.priority, scene_name: r.scene_name, source_message_ids: [], metadata: r.metadata_json ? (() => { try { return JSON.parse(r.metadata_json); } catch { return {}; } })() : {}, timestamps: [r.timestamp_str].filter(Boolean), createdAt: "", updatedAt: "", sessionKey: r.session_key, sessionId: r.session_id, })); matches.push({ newMemory: mem, candidates }); } else { matches.push({ newMemory: mem, candidates: [] }); } } _logger?.debug?.(`${TAG} FTS keyword recall: ${matches.map((m) => `${m.newMemory.record_id}→${m.candidates.length}`).join(", ")}`); return matches; } // ============================ // Result parsing // ============================ const VALID_TYPES: MemoryType[] = ["persona", "episodic", "instruction"]; /** * Parse the LLM's batch conflict detection JSON response. * * Expected format: [{record_id, action, target_ids, merged_content, merged_type, merged_priority, merged_timestamps}] */ function parseBatchResult( raw: string, memories: Array, logger?: Logger, ): DedupDecision[] { try { // Strip markdown code block wrappers let cleaned = raw.trim(); if (cleaned.startsWith("```")) { cleaned = cleaned.replace(/^```(?:json)?\s*\n?/, "").replace(/\n?```\s*$/, ""); } // Extract JSON array const arrayMatch = cleaned.match(/\[[\s\S]*\]/); if (!arrayMatch) { logger?.warn?.(`${TAG} No JSON array found in conflict detection response`); return fallbackStoreAll(memories); } // Sanitize control characters inside JSON string literals that LLM may produce const sanitized = sanitizeJsonForParse(arrayMatch[0]); const parsed = JSON.parse(sanitized) as unknown[]; if (!Array.isArray(parsed)) { logger?.warn?.(`${TAG} Conflict detection response is not an array`); return fallbackStoreAll(memories); } // Build decisions from LLM output const decisions: DedupDecision[] = []; const validActions = ["store", "update", "merge", "skip"]; for (const item of parsed) { if (!item || typeof item !== "object") continue; const d = item as Record; const recordId = String(d.record_id ?? ""); // Skip entries with empty/missing record_id — they are LLM hallucinations if (!recordId) { logger?.debug?.(`${TAG} Skipping decision with empty record_id`); continue; } const action = String(d.action ?? "store"); if (!validActions.includes(action)) { logger?.warn?.(`${TAG} Invalid action "${action}" for record ${recordId}, defaulting to store`); } decisions.push({ record_id: recordId, action: validActions.includes(action) ? (action as DedupDecision["action"]) : "store", target_ids: Array.isArray(d.target_ids) ? d.target_ids.map(String) : [], merged_content: typeof d.merged_content === "string" ? d.merged_content : undefined, merged_type: VALID_TYPES.includes(d.merged_type as MemoryType) ? (d.merged_type as MemoryType) : undefined, merged_priority: typeof d.merged_priority === "number" ? d.merged_priority : undefined, merged_timestamps: Array.isArray(d.merged_timestamps) ? d.merged_timestamps.map(String) : undefined, }); } // Ensure all memories have a decision (fill missing with "store") const decidedIds = new Set(decisions.map((d) => d.record_id)); for (const mem of memories) { if (!decidedIds.has(mem.record_id)) { logger?.debug?.(`${TAG} No decision for record ${mem.record_id}, defaulting to store`); decisions.push({ record_id: mem.record_id, action: "store", target_ids: [], }); } } return decisions; } catch (err) { logger?.warn?.(`${TAG} Failed to parse conflict detection result: ${err instanceof Error ? err.message : String(err)}`); return fallbackStoreAll(memories); } } /** * Fallback: store all memories when parsing fails. */ function fallbackStoreAll(memories: Array): DedupDecision[] { return memories.map((m) => ({ record_id: m.record_id, action: "store" as const, target_ids: [], })); }