/** * VectorStore: SQLite-based vector storage using sqlite-vec extension. * * Manages two layers of vector-indexed data in a single SQLite database: * * **L1 (structured memories):** * 1. `l1_records` — relational metadata table (content, type, priority, scene, timestamps) * 2. `l1_vec` — vec0 virtual table for cosine similarity search * * **L0 (raw conversations):** * 3. `l0_conversations` — relational metadata table (session_key, role, message text, timestamps) * 4. `l0_vec` — vec0 virtual table for cosine similarity search on individual messages * * Dependencies: Node.js built-in `node:sqlite` (Node 22+) + `sqlite-vec` (from root workspace). * * Design: * - All operations are synchronous (DatabaseSync API). * - Writes use manual BEGIN/COMMIT transactions for atomicity (metadata + vector). * - vec0 virtual table does NOT support ON CONFLICT, so upsert = delete + insert. * - Thread-safe via WAL mode. */ import { createRequire } from "node:module"; import type { DatabaseSync, StatementSync } from "node:sqlite"; import type { MemoryRecord } from "../record/l1-writer.js"; import type { EmbeddingProviderInfo } from "./embedding.js"; import type { IMemoryStore, StoreCapabilities, L0Record, L1SearchResult, L1FtsResult, L0SearchResult, L0FtsResult, } from "./types.js"; // ============================ // Types // ============================ export interface VectorSearchResult { record_id: string; content: string; type: string; priority: number; scene_name: string; /** Cosine similarity score (1.0 - cosine_distance) */ score: number; timestamp_str: string; timestamp_start: string; timestamp_end: string; session_key: string; session_id: string; /** Raw metadata JSON string (e.g., contains activity_start_time / activity_end_time for episodic) */ metadata_json: string; } /** L0 single-message vector search result. */ export interface L0VectorSearchResult { record_id: string; session_key: string; session_id: string; role: string; message_text: string; /** Cosine similarity score (1.0 - cosine_distance) */ score: number; recorded_at: string; /** Original message timestamp (epoch ms) */ timestamp: number; } /** Raw row returned by L1 record queries (column names match SQLite schema). */ export interface L1RecordRow { record_id: string; content: string; type: string; priority: number; scene_name: string; session_key: string; session_id: string; timestamp_str: string; timestamp_start: string; timestamp_end: string; created_time: string; updated_time: string; metadata_json: string; } export interface L0RecordRow { record_id: string; session_key: string; session_id: string; role: string; message_text: string; recorded_at: string; timestamp: number; } /** Filter options for querying L1 records from SQLite. */ export interface L1QueryFilter { /** If provided, only return records for this session key (conversation channel). */ sessionKey?: string; /** If provided, only return records for this session ID (single conversation instance). */ sessionId?: string; /** If provided, only return records with updated_time strictly after this ISO 8601 UTC timestamp. */ updatedAfter?: string; } interface Logger { debug?: (message: string) => void; info: (message: string) => void; warn: (message: string) => void; error: (message: string) => void; } const TAG = "[memory-tdai][sqlite]"; /** Persisted metadata about the embedding provider used to generate stored vectors. */ interface EmbeddingMeta { provider: string; model: string; dimensions: number; } /** Result of VectorStore.init() — indicates whether a re-embed is needed. */ export interface VectorStoreInitResult { /** * `true` if the embedding provider/model/dimensions changed since * the vectors were last written. Callers should re-embed all texts * (via `reindexAll()`) after receiving this flag. */ needsReindex: boolean; /** Human-readable reason (for logging). */ reason?: string; } // Use createRequire to load the experimental node:sqlite module const require = createRequire(import.meta.url); function requireNodeSqlite(): typeof import("node:sqlite") { return require("node:sqlite") as typeof import("node:sqlite"); } // ============================ // FTS5 helpers (adapted from openclaw core hybrid.ts) // ============================ // ── Chinese word segmentation (jieba) ── // Lazy-loaded singleton: initialised on first call to `buildFtsQuery`. // If @node-rs/jieba is unavailable, falls back to Unicode-regex splitting. interface JiebaInstance { cutForSearch(text: string, hmm: boolean): string[]; } let _jieba: JiebaInstance | null | undefined; // undefined = not yet tried function getJieba(): JiebaInstance | null { if (_jieba !== undefined) return _jieba; try { // eslint-disable-next-line @typescript-eslint/no-require-imports const { Jieba } = require("@node-rs/jieba"); // eslint-disable-next-line @typescript-eslint/no-require-imports const { dict } = require("@node-rs/jieba/dict"); _jieba = Jieba.withDict(dict) as JiebaInstance; } catch { _jieba = null; // mark as unavailable — won't retry } return _jieba; } /** * Common Chinese stop-words that add noise to FTS5 queries. * Kept small on purpose — only high-frequency function words. */ const ZH_STOP_WORDS = new Set([ "的", "了", "在", "是", "我", "有", "和", "就", "不", "人", "都", "一", "一个", "上", "也", "很", "到", "说", "要", "去", "你", "会", "着", "没有", "看", "好", "自己", "这", "他", "她", "它", "们", "那", "吗", "吧", "呢", "啊", "呀", "哦", "嗯", ]); /** * Build an FTS5 MATCH query from raw text. * * When `@node-rs/jieba` is available, uses jieba's search-engine mode * (`cutForSearch`) for accurate Chinese word segmentation, producing * much better recall than the previous regex-only approach. * * Falls back to Unicode-regex splitting (`/[\p{L}\p{N}_]+/gu`) if * jieba is not installed. * * Tokens are OR-joined as quoted FTS5 phrase terms so that a document * matching *any* token is returned. BM25 naturally ranks documents that * match more tokens higher, so precision is preserved while recall is * significantly improved — especially for longer queries and when running * in FTS-only fallback mode (no embedding available). * * Example (with jieba): * "用户喜欢编程和TypeScript" → '"用户" OR "喜欢" OR "编程" OR "TypeScript"' * Example (fallback): * "旅行计划 API" → '"旅行计划" OR "API"' */ export function buildFtsQuery(raw: string): string | null { const jieba = getJieba(); let tokens: string[]; if (jieba) { // jieba cutForSearch: splits long words further for better recall // e.g. "北京烤鸭" → ["北京", "烤鸭", "北京烤鸭"] tokens = jieba .cutForSearch(raw, true) .map((t) => t.trim()) .filter((t) => { if (!t) return false; // Remove pure whitespace / punctuation tokens if (!/[\p{L}\p{N}]/u.test(t)) return false; // Remove common Chinese stop-words to reduce noise if (ZH_STOP_WORDS.has(t)) return false; return true; }); // Deduplicate (cutForSearch may produce duplicates for sub-words) tokens = [...new Set(tokens)]; } else { // Fallback: simple Unicode regex split tokens = raw .match(/[\p{L}\p{N}_]+/gu) ?.map((t) => t.trim()) .filter(Boolean) ?? []; } if (tokens.length === 0) return null; const quoted = tokens.map((t) => `"${t.replaceAll('"', "")}"`); return quoted.join(" OR "); } /** * Tokenize text for FTS5 indexing (write-side). * * Uses jieba `cutForSearch()` (search-engine mode) to segment Chinese text, * then joins tokens with spaces. The resulting string is stored in the FTS5 * `content` column so that `unicode61` tokenizer can split it into meaningful * words — including both full words and their sub-words. * * Using `cutForSearch` (instead of `cut`) ensures that the index contains * the same sub-word tokens that `buildFtsQuery()` produces on the query side. * For example, "人工智能" is indexed as "人工 智能 人工智能", so queries for * either the full term or sub-words will match. * * Falls back to the original text if jieba is unavailable. * * Example (with jieba): * "用户五月去日本旅行" → "用户 五月 去 日本 旅行" * "人工智能的分支" → "人工 智能 人工智能 的 分支" * Example (fallback): * "用户五月去日本旅行" → "用户五月去日本旅行" (unchanged) */ export function tokenizeForFts(raw: string): string { const jieba = getJieba(); if (!jieba) return raw; // Use `cutForSearch` (search-engine mode) for indexing — it produces both // full words AND their sub-word components. This ensures that query-side // tokens (also produced by `cutForSearch` in `buildFtsQuery`) will always // find a match in the index. const tokens = jieba.cutForSearch(raw, true); // Join with spaces so `unicode61` tokenizer can split them. // Punctuation tokens are kept — unicode61 treats them as separators anyway. return tokens.join(" "); } /** * Reset jieba state so next call to `buildFtsQuery` re-initialises. * Exported for testing only. * @internal */ export function _resetJiebaForTest(): void { _jieba = undefined; } /** * Override jieba instance (or set to `null` to force fallback). * Exported for testing only. * @internal */ export function _setJiebaForTest(instance: JiebaInstance | null): void { _jieba = instance; } /** * Convert a BM25 rank (negative = more relevant) to a 0–1 score. * Mirrors the formula in openclaw core `hybrid.ts`. */ export function bm25RankToScore(rank: number): number { if (!Number.isFinite(rank)) return 1 / (1 + 999); if (rank < 0) { const relevance = -rank; return relevance / (1 + relevance); } return 1 / (1 + rank); } /** FTS5 search result for L1 records. */ export interface FtsSearchResult { record_id: string; content: string; type: string; priority: number; scene_name: string; /** BM25-derived score (0–1, higher is better) */ score: number; timestamp_str: string; timestamp_start: string; timestamp_end: string; session_key: string; session_id: string; metadata_json: string; } /** FTS5 search result for L0 records. */ export interface L0FtsSearchResult { record_id: string; session_key: string; session_id: string; role: string; message_text: string; /** BM25-derived score (0–1, higher is better) */ score: number; recorded_at: string; timestamp: number; } // ============================ // VectorStore class // ============================ export class VectorStore implements IMemoryStore { private db: DatabaseSync; private readonly dimensions: number; private readonly logger?: Logger; /** @see IMemoryStore.supportsDeferredEmbedding */ readonly supportsDeferredEmbedding = true; /** * When `true`, the store is in a degraded state (e.g. sqlite-vec failed to * load, or init() encountered an unrecoverable error). All public methods * become safe no-ops so the plugin never blocks the main OpenClaw flow. */ private degraded = false; /** Tracks whether close() has been called to prevent double-close errors. */ private closed = false; /** * `true` when vec0 virtual tables (l1_vec / l0_vec) have been created and * their prepared statements are ready. When `dimensions === 0` (i.e. * provider="none"), vec0 tables are deferred and this stays `false`. */ private vecTablesReady = false; // Prepared statements — L1 (initialized in init()) private stmtUpsertMeta!: StatementSync; private stmtDeleteVec?: StatementSync; // optional — only set when vecTablesReady private stmtInsertVec?: StatementSync; // optional — only set when vecTablesReady private stmtDeleteMeta!: StatementSync; private stmtGetMeta!: StatementSync; private stmtSearchVec?: StatementSync; // optional — only set when vecTablesReady private stmtQueryBySessionId!: StatementSync; private stmtQueryBySessionIdSince!: StatementSync; private stmtQueryBySessionKey!: StatementSync; private stmtQueryBySessionKeySince!: StatementSync; private stmtQueryAll!: StatementSync; private stmtQueryAllSince!: StatementSync; // Prepared statements — L0 (initialized in init()) private stmtL0UpsertMeta!: StatementSync; private stmtL0DeleteVec?: StatementSync; // optional — only set when vecTablesReady private stmtL0InsertVec?: StatementSync; // optional — only set when vecTablesReady private stmtL0DeleteMeta!: StatementSync; private stmtL0GetMeta!: StatementSync; private stmtL0SearchVec?: StatementSync; // optional — only set when vecTablesReady /** L0 query for L1 runner: all messages for a session key */ private stmtL0QueryAll!: StatementSync; /** L0 query for L1 runner: messages after a timestamp cursor */ private stmtL0QueryAfter!: StatementSync; /** L1 cursor-based pagination for migration (by PK) */ private stmtL1QueryMigrationCursor!: StatementSync; /** L0 cursor-based pagination for migration (by PK) */ private stmtL0QueryMigrationCursor!: StatementSync; // FTS5 tables availability flag (created best-effort — may be false if fts5 is not compiled in) private ftsAvailable = false; // Prepared statements — FTS5 L1 (initialized in init()) private stmtL1FtsInsert!: StatementSync; private stmtL1FtsDelete!: StatementSync; private stmtL1FtsSearch!: StatementSync; // Prepared statements — FTS5 L0 (initialized in init()) private stmtL0FtsInsert!: StatementSync; private stmtL0FtsDelete!: StatementSync; private stmtL0FtsSearch!: StatementSync; /** * Create a VectorStore instance. * * Note: After construction, you MUST call `init()` to load the sqlite-vec * extension and create the schema. */ constructor(dbPath: string, dimensions: number, logger?: Logger) { this.dimensions = dimensions; this.logger = logger; // Open database with extension support enabled const { DatabaseSync: DbSync } = requireNodeSqlite(); this.db = new DbSync(dbPath, { allowExtension: true }); // Set busy timeout so concurrent processes retry instead of failing with SQLITE_BUSY this.db.exec("PRAGMA busy_timeout = 5000"); // Enable WAL mode for better concurrent read performance this.db.exec("PRAGMA journal_mode = WAL"); // Cap page cache at 64 MB this.db.exec("PRAGMA cache_size = -65536"); // Cap memory-mapped I/O at 128 MB to bound RSS growth this.db.exec("PRAGMA mmap_size = 134217728"); // Auto-checkpoint WAL every 1000 pages (~4 MB) to keep WAL file compact this.db.exec("PRAGMA wal_autocheckpoint = 1000"); } /** * Whether the store is in degraded mode (e.g. sqlite-vec failed to load). * When degraded, all write/search operations become safe no-ops. */ isDegraded(): boolean { return this.degraded; } /** * Load sqlite-vec extension and initialize database schema. * Must be called once after construction. * * @param providerInfo Current embedding provider info. When provided, * the store compares it against the persisted metadata. If the provider, * model, or dimensions changed, the vector tables are dropped and * re-created with the new dimensions, and `needsReindex: true` is returned * so the caller can schedule a full re-embed. */ init(providerInfo?: EmbeddingProviderInfo): VectorStoreInitResult { // Load sqlite-vec extension (same approach as root project's sqlite-vec.ts) try { // eslint-disable-next-line @typescript-eslint/no-require-imports const sqliteVec = require("sqlite-vec"); this.db.enableLoadExtension(true); sqliteVec.load(this.db); } catch (err) { const message = err instanceof Error ? err.message : String(err); this.logger?.error( `${TAG} Failed to load sqlite-vec extension: ${message}. ` + `VectorStore entering degraded mode — all operations will be no-ops.`, ); this.degraded = true; return { needsReindex: false, reason: `sqlite-vec load failed: ${message}` }; } // ── Schema creation & prepared statements ────────────────────────────── // Wrapped in try-catch: if anything fails during schema init (e.g. the DB // is corrupted, disk full, etc.), we degrade gracefully instead of crashing. try { return this.initSchema(providerInfo); } catch (err) { const message = err instanceof Error ? err.message : String(err); this.logger?.error( `${TAG} Schema initialization failed: ${message}. ` + `VectorStore entering degraded mode.`, ); this.degraded = true; return { needsReindex: false, reason: `schema init failed: ${message}` }; } } /** * Internal schema initialization — separated from init() so we can * catch errors at the top level and degrade gracefully. */ private initSchema(providerInfo?: EmbeddingProviderInfo): VectorStoreInitResult { // Tracks which provider/model/dimensions were used to generate vectors. this.db.exec(` CREATE TABLE IF NOT EXISTS embedding_meta ( key TEXT PRIMARY KEY, value TEXT NOT NULL ) `); // Detect whether re-index is needed let needsReindex = false; let reindexReason: string | undefined; const savedMeta = this.readEmbeddingMeta(); if (providerInfo) { if (savedMeta) { const providerChanged = savedMeta.provider !== providerInfo.provider; const modelChanged = savedMeta.model !== providerInfo.model; const dimsChanged = savedMeta.dimensions !== this.dimensions; if (providerChanged || modelChanged || dimsChanged) { const reasons: string[] = []; if (providerChanged) reasons.push(`provider: ${savedMeta.provider} → ${providerInfo.provider}`); if (modelChanged) reasons.push(`model: ${savedMeta.model} → ${providerInfo.model}`); if (dimsChanged) reasons.push(`dimensions: ${savedMeta.dimensions} → ${this.dimensions}`); reindexReason = reasons.join(", "); this.logger?.info( `${TAG} Embedding config changed (${reindexReason}). ` + `Dropping vector tables for rebuild...`, ); // Drop and re-create vector tables with new dimensions this.dropVectorTables(); needsReindex = true; } } else { // No saved meta — first run or legacy DB without meta table. // Two cases require dropping vector tables: // 1. Existing data created without meta tracking (legacy DB) — need re-embed // 2. vec0 tables exist with wrong dimensions (e.g. previously created with // provider="none" placeholder 768D, now switching to a real provider // with different dimensions) — must rebuild even if data tables are empty const l1Count = this.tableRowCount("l1_records"); const l0Count = this.tableRowCount("l0_conversations"); const existingVecDims = this.getVecTableDimensions(); if (l1Count > 0 || l0Count > 0) { this.logger?.info( `${TAG} No embedding_meta found but existing data exists ` + `(L1=${l1Count}, L0=${l0Count}). Dropping vector tables for safety...`, ); this.dropVectorTables(); needsReindex = true; reindexReason = "legacy DB without embedding_meta — cannot verify vector compatibility"; } else if (existingVecDims !== null && existingVecDims !== this.dimensions) { // vec0 tables exist (from a previous provider="none" placeholder or // different config) but with mismatched dimensions. Drop them so they // get re-created with the correct dimensions below. this.logger?.info( `${TAG} vec0 table dimension mismatch (existing=${existingVecDims}, ` + `required=${this.dimensions}). Dropping vector tables for rebuild...`, ); this.dropVectorTables(); // No needsReindex — there's no data to re-embed } } } // ── L1 schema ────────────────────────────────── // Metadata table this.db.exec(` CREATE TABLE IF NOT EXISTS l1_records ( record_id TEXT PRIMARY KEY, content TEXT NOT NULL, type TEXT DEFAULT '', priority INTEGER DEFAULT 50, scene_name TEXT DEFAULT '', session_key TEXT DEFAULT '', session_id TEXT DEFAULT '', timestamp_str TEXT DEFAULT '', timestamp_start TEXT DEFAULT '', timestamp_end TEXT DEFAULT '', created_time TEXT DEFAULT '', updated_time TEXT DEFAULT '', metadata_json TEXT DEFAULT '{}' ) `); // Indexes for common queries this.db.exec("CREATE INDEX IF NOT EXISTS idx_l1_type ON l1_records(type)"); this.db.exec("CREATE INDEX IF NOT EXISTS idx_l1_session_key ON l1_records(session_key)"); this.db.exec("CREATE INDEX IF NOT EXISTS idx_l1_session_id ON l1_records(session_id)"); this.db.exec("CREATE INDEX IF NOT EXISTS idx_l1_scene ON l1_records(scene_name)"); this.db.exec("CREATE INDEX IF NOT EXISTS idx_l1_ts_start ON l1_records(timestamp_start)"); this.db.exec("CREATE INDEX IF NOT EXISTS idx_l1_ts_end ON l1_records(timestamp_end)"); // Composite index: session_id exact match + updated_time range scan (for incremental L2 queries) this.db.exec("CREATE INDEX IF NOT EXISTS idx_l1_session_updated ON l1_records(session_id, updated_time)"); // Composite index: session_key exact match + updated_time range scan (for pipeline cursor queries) this.db.exec("CREATE INDEX IF NOT EXISTS idx_l1_sessionkey_updated ON l1_records(session_key, updated_time)"); // Vector virtual table (cosine distance) — only created when dimensions > 0. // When provider="none", dimensions=0 and vec0 tables are deferred until a // real embedding provider is configured. if (this.dimensions > 0) { this.db.exec(` CREATE VIRTUAL TABLE IF NOT EXISTS l1_vec USING vec0( record_id TEXT PRIMARY KEY, embedding float[${this.dimensions}] distance_metric=cosine, updated_time TEXT DEFAULT '' ) `); } // Prepare statements for reuse this.stmtUpsertMeta = this.db.prepare(` INSERT INTO l1_records ( record_id, content, type, priority, scene_name, session_key, session_id, timestamp_str, timestamp_start, timestamp_end, created_time, updated_time, metadata_json ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(record_id) DO UPDATE SET content=excluded.content, type=excluded.type, priority=excluded.priority, scene_name=excluded.scene_name, timestamp_str=excluded.timestamp_str, timestamp_start=excluded.timestamp_start, timestamp_end=excluded.timestamp_end, updated_time=excluded.updated_time, metadata_json=excluded.metadata_json `); if (this.dimensions > 0) { this.stmtDeleteVec = this.db.prepare("DELETE FROM l1_vec WHERE record_id = ?"); this.stmtInsertVec = this.db.prepare("INSERT INTO l1_vec (record_id, embedding, updated_time) VALUES (?, ?, ?)"); } this.stmtDeleteMeta = this.db.prepare("DELETE FROM l1_records WHERE record_id = ?"); this.stmtGetMeta = this.db.prepare(` SELECT content, type, priority, scene_name, session_key, session_id, timestamp_str, timestamp_start, timestamp_end, metadata_json FROM l1_records WHERE record_id = ? `); if (this.dimensions > 0) { this.stmtSearchVec = this.db.prepare(` SELECT record_id, distance FROM l1_vec WHERE embedding MATCH ? AND k = ? ORDER BY distance `); } // ── L0 schema ────────────────────────────────── // L0 metadata table: stores individual messages for vector search this.db.exec(` CREATE TABLE IF NOT EXISTS l0_conversations ( record_id TEXT PRIMARY KEY, session_key TEXT NOT NULL, session_id TEXT DEFAULT '', role TEXT NOT NULL DEFAULT '', message_text TEXT NOT NULL, recorded_at TEXT DEFAULT '', timestamp INTEGER DEFAULT 0 ) `); // Migration: add timestamp column if missing (existing DBs pre-v3.x) try { this.db.exec("ALTER TABLE l0_conversations ADD COLUMN timestamp INTEGER DEFAULT 0"); this.logger?.debug?.(`${TAG} Migrated l0_conversations: added timestamp column`); } catch { // Column already exists — expected on non-first run } // Indexes for L0 queries this.db.exec("CREATE INDEX IF NOT EXISTS idx_l0_session ON l0_conversations(session_key)"); this.db.exec("CREATE INDEX IF NOT EXISTS idx_l0_session_id ON l0_conversations(session_id)"); this.db.exec("CREATE INDEX IF NOT EXISTS idx_l0_recorded ON l0_conversations(recorded_at)"); this.db.exec("CREATE INDEX IF NOT EXISTS idx_l0_timestamp ON l0_conversations(timestamp)"); // L0 vector virtual table (cosine distance, same dimensions as L1) — deferred when dimensions=0 if (this.dimensions > 0) { this.db.exec(` CREATE VIRTUAL TABLE IF NOT EXISTS l0_vec USING vec0( record_id TEXT PRIMARY KEY, embedding float[${this.dimensions}] distance_metric=cosine, recorded_at TEXT DEFAULT '' ) `); } // L0 prepared statements this.stmtL0UpsertMeta = this.db.prepare(` INSERT INTO l0_conversations ( record_id, session_key, session_id, role, message_text, recorded_at, timestamp ) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(record_id) DO UPDATE SET message_text=excluded.message_text, recorded_at=excluded.recorded_at, timestamp=excluded.timestamp `); if (this.dimensions > 0) { this.stmtL0DeleteVec = this.db.prepare("DELETE FROM l0_vec WHERE record_id = ?"); this.stmtL0InsertVec = this.db.prepare("INSERT INTO l0_vec (record_id, embedding, recorded_at) VALUES (?, ?, ?)"); } this.stmtL0DeleteMeta = this.db.prepare("DELETE FROM l0_conversations WHERE record_id = ?"); this.stmtL0GetMeta = this.db.prepare(` SELECT session_key, session_id, role, message_text, recorded_at, timestamp FROM l0_conversations WHERE record_id = ? `); if (this.dimensions > 0) { this.stmtL0SearchVec = this.db.prepare(` SELECT record_id, distance FROM l0_vec WHERE embedding MATCH ? AND k = ? ORDER BY distance `); } // L0 query statements for L1 runner (newest-first + LIMIT to bound memory) // Sort/filter by recorded_at (write time) instead of timestamp (conversation time) // because L1 cursor uses recorded_at semantics. ISO 8601 string comparison preserves time order. this.stmtL0QueryAll = this.db.prepare(` SELECT record_id, session_key, session_id, role, message_text, recorded_at, timestamp FROM l0_conversations WHERE session_key = ? ORDER BY recorded_at DESC LIMIT ? `); this.stmtL0QueryAfter = this.db.prepare(` SELECT record_id, session_key, session_id, role, message_text, recorded_at, timestamp FROM l0_conversations WHERE session_key = ? AND recorded_at > ? ORDER BY recorded_at DESC LIMIT ? `); this.stmtL0QueryMigrationCursor = this.db.prepare(` SELECT record_id, session_key, session_id, role, message_text, recorded_at, timestamp FROM l0_conversations WHERE record_id > ? ORDER BY record_id ASC LIMIT ? `); // ── FTS5 tables (best-effort — gracefully degrade if fts5 is not compiled in) ── // Schema v2: `content` column stores jieba-segmented text (for indexing), // `content_original` (UNINDEXED) stores the raw text (for display). // If old v1 tables exist (no content_original column), drop + recreate. try { // ── Migrate old FTS5 tables (v1 → v2) ── // v1 tables stored raw text in the `content` column. v2 stores segmented // text in `content` and raw text in `content_original` / `message_text_original`. // FTS5 virtual tables don't support ALTER TABLE ADD COLUMN, so we must // drop and recreate. The data will be repopulated by `rebuildFtsIndex()`. const needsFtsRebuild = this.migrateFtsTablesIfNeeded(); // L1 FTS5 virtual table (v2 schema) this.db.exec(` CREATE VIRTUAL TABLE IF NOT EXISTS l1_fts USING fts5( content, content_original UNINDEXED, record_id UNINDEXED, type UNINDEXED, priority UNINDEXED, scene_name UNINDEXED, session_key UNINDEXED, session_id UNINDEXED, timestamp_str UNINDEXED, timestamp_start UNINDEXED, timestamp_end UNINDEXED, metadata_json UNINDEXED ) `); // L0 FTS5 virtual table (v2 schema) this.db.exec(` CREATE VIRTUAL TABLE IF NOT EXISTS l0_fts USING fts5( message_text, message_text_original UNINDEXED, record_id UNINDEXED, session_key UNINDEXED, session_id UNINDEXED, role UNINDEXED, recorded_at UNINDEXED, timestamp UNINDEXED ) `); // L1 FTS prepared statements this.stmtL1FtsInsert = this.db.prepare(` INSERT INTO l1_fts (content, content_original, record_id, type, priority, scene_name, session_key, session_id, timestamp_str, timestamp_start, timestamp_end, metadata_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `); this.stmtL1FtsDelete = this.db.prepare("DELETE FROM l1_fts WHERE record_id = ?"); this.stmtL1FtsSearch = this.db.prepare(` SELECT record_id, content_original AS content, type, priority, scene_name, session_key, session_id, timestamp_str, timestamp_start, timestamp_end, metadata_json, bm25(l1_fts) AS rank FROM l1_fts WHERE l1_fts MATCH ? ORDER BY rank ASC LIMIT ? `); // L0 FTS prepared statements this.stmtL0FtsInsert = this.db.prepare(` INSERT INTO l0_fts (message_text, message_text_original, record_id, session_key, session_id, role, recorded_at, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?) `); this.stmtL0FtsDelete = this.db.prepare("DELETE FROM l0_fts WHERE record_id = ?"); this.stmtL0FtsSearch = this.db.prepare(` SELECT record_id, message_text_original AS message_text, session_key, session_id, role, recorded_at, timestamp, bm25(l0_fts) AS rank FROM l0_fts WHERE l0_fts MATCH ? ORDER BY rank ASC LIMIT ? `); this.ftsAvailable = true; this.logger?.debug?.(`${TAG} FTS5 tables initialized (l1_fts, l0_fts) [schema v2 — jieba segmented]`); // Rebuild FTS index if migrated from v1 or tables were freshly created if (needsFtsRebuild) { this.rebuildFtsIndex(); } } catch (err) { const message = err instanceof Error ? err.message : String(err); this.ftsAvailable = false; this.logger?.warn( `${TAG} FTS5 tables NOT available (fts5 may not be compiled in): ${message}. ` + `FTS-based keyword search will be unavailable; recall will use in-memory scoring if needed.`, ); } // Save current embedding meta (write after schema is ready) if (providerInfo) { this.writeEmbeddingMeta({ provider: providerInfo.provider, model: providerInfo.model, dimensions: this.dimensions, }); } // Mark vec0 tables as ready only when they were actually created this.vecTablesReady = this.dimensions > 0; // L1 query statements (for l1-reader) const l1QueryCols = `record_id, content, type, priority, scene_name, session_key, session_id, timestamp_str, timestamp_start, timestamp_end, created_time, updated_time, metadata_json`; this.stmtQueryBySessionId = this.db.prepare(` SELECT ${l1QueryCols} FROM l1_records WHERE session_id = ? ORDER BY updated_time ASC `); this.stmtQueryBySessionIdSince = this.db.prepare(` SELECT ${l1QueryCols} FROM l1_records WHERE session_id = ? AND updated_time > ? ORDER BY updated_time ASC `); this.stmtQueryBySessionKey = this.db.prepare(` SELECT ${l1QueryCols} FROM l1_records WHERE session_key = ? ORDER BY updated_time ASC `); this.stmtQueryBySessionKeySince = this.db.prepare(` SELECT ${l1QueryCols} FROM l1_records WHERE session_key = ? AND updated_time > ? ORDER BY updated_time ASC `); this.stmtQueryAll = this.db.prepare(` SELECT ${l1QueryCols} FROM l1_records ORDER BY updated_time ASC `); this.stmtQueryAllSince = this.db.prepare(` SELECT ${l1QueryCols} FROM l1_records WHERE updated_time > ? ORDER BY updated_time ASC `); this.stmtL1QueryMigrationCursor = this.db.prepare(` SELECT ${l1QueryCols} FROM l1_records WHERE record_id > ? ORDER BY record_id ASC LIMIT ? `); this.logger?.debug?.(`${TAG} Initialized (dimensions=${this.dimensions})`); return { needsReindex, reason: reindexReason }; } // ── Embedding meta helpers ────────────────────────────── private readEmbeddingMeta(): EmbeddingMeta | null { try { const row = this.db .prepare("SELECT value FROM embedding_meta WHERE key = ?") .get("embedding_provider_info") as { value: string } | undefined; if (!row) return null; return JSON.parse(row.value) as EmbeddingMeta; } catch { return null; } } private writeEmbeddingMeta(meta: EmbeddingMeta): void { this.db.prepare( "INSERT INTO embedding_meta (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value", ).run("embedding_provider_info", JSON.stringify(meta)); } /** Allowed table names for row counting (whitelist to prevent SQL injection). */ private static readonly COUNTABLE_TABLES = new Set(["l1_records", "l0_conversations"]); /** * Extra rows to retrieve from vec0 KNN search to compensate for legacy * zero-vector placeholders that may still linger from older data. */ private static readonly ZERO_VEC_BUFFER = 10; /** Default result limit for FTS5 keyword searches. */ private static readonly FTS_DEFAULT_LIMIT = 20; private tableRowCount(table: string): number { if (!VectorStore.COUNTABLE_TABLES.has(table)) { this.logger?.warn(`${TAG} tableRowCount: rejected unknown table name "${table}"`); return 0; } try { const row = this.db .prepare(`SELECT COUNT(*) AS cnt FROM ${table}`) .get() as { cnt: number } | undefined; return row?.cnt ?? 0; } catch { return 0; } } /** * Detect the embedding dimension of an existing vec0 table by inspecting * the DDL stored in sqlite_master. Returns `null` if the table doesn't * exist or the dimension cannot be determined. * * The vec0 DDL looks like: * CREATE VIRTUAL TABLE l1_vec USING vec0(... embedding float[768] ...) * We parse the number inside `float[N]`. */ private getVecTableDimensions(): number | null { try { const row = this.db .prepare("SELECT sql FROM sqlite_master WHERE type='table' AND name=?") .get("l1_vec") as { sql: string } | undefined; if (!row?.sql) return null; const match = row.sql.match(/float\[(\d+)\]/); return match ? Number(match[1]) : null; } catch { return null; } } /** * Drop both L1 and L0 vector virtual tables. * Metadata tables (l1_records, l0_conversations) are preserved — only * the vec0 tables need to be rebuilt with the new dimensions. */ private dropVectorTables(): void { this.db.exec("DROP TABLE IF EXISTS l1_vec"); this.db.exec("DROP TABLE IF EXISTS l0_vec"); this.logger?.info(`${TAG} Dropped vector tables (l1_vec, l0_vec)`); } /** * Write or update a memory record (metadata + vector). * Uses a manual transaction for atomicity. * * If `embedding` is `undefined` or a zero vector (all elements are 0), only * the metadata row is written — the vec0 table is left untouched. This * allows callers without an EmbeddingService to still persist metadata + FTS * without constructing a throwaway zero-vector, and prevents placeholder * zero vectors (from embedding-service failures) from polluting KNN search * results with null / NaN distances. * * **Fault-tolerant**: catches all errors internally so that a vector store * failure never propagates to the caller / main OpenClaw flow. * Returns `true` on success, `false` on failure (logged as warning). */ upsertL1(record: MemoryRecord, embedding: Float32Array | undefined): boolean { if (this.degraded) { this.logger?.warn(`${TAG} [L1-upsert] SKIPPED (degraded mode) id=${record.id}`); return false; } try { const { id: recordId, timestamps } = record; const tsStr = timestamps[0] ?? ""; const tsStart = timestamps.length > 0 ? timestamps.reduce((a, b) => (a < b ? a : b)) : tsStr; const tsEnd = timestamps.length > 0 ? timestamps.reduce((a, b) => (a > b ? a : b)) : tsStr; const skipVec = !embedding || embedding.every(v => v === 0) || !this.vecTablesReady; this.logger?.debug?.( `${TAG} [L1-upsert] START id=${recordId}, type=${record.type}, ` + `content="${record.content.slice(0, 60)}..."` + (embedding ? `, embeddingDims=${embedding.length}, ` + `embeddingNorm=${Math.sqrt(Array.from(embedding).reduce((s, v) => s + v * v, 0)).toFixed(4)}` + `${skipVec ? " (ZERO VECTOR or vec tables not ready — vec write will be skipped)" : ""}` : " (no embedding — metadata-only write)"), ); this.db.exec("BEGIN"); try { // Upsert metadata (INSERT OR UPDATE) this.stmtUpsertMeta.run( recordId, record.content, record.type, record.priority, record.scene_name, record.sessionKey, record.sessionId, tsStr, tsStart, tsEnd, record.createdAt, record.updatedAt, JSON.stringify(record.metadata), ); if (!skipVec) { // vec0 does not support ON CONFLICT → delete then insert this.stmtDeleteVec!.run(recordId); this.stmtInsertVec!.run(recordId, Buffer.from(embedding!.buffer), record.updatedAt); } else { this.logger?.debug?.( `${TAG} [L1-upsert] Skipping vec write (${embedding ? "zero vector" : "no embedding"}) id=${recordId}`, ); } // Sync FTS5 (delete + re-insert to handle updates) if (this.ftsAvailable) { try { this.stmtL1FtsDelete.run(recordId); this.stmtL1FtsInsert.run( tokenizeForFts(record.content), // content — segmented for indexing record.content, // content_original — raw for display recordId, record.type, record.priority, record.scene_name, record.sessionKey, record.sessionId, tsStr, tsStart, tsEnd, JSON.stringify(record.metadata), ); } catch (ftsErr) { // FTS write failure is non-fatal — log and continue this.logger?.warn( `${TAG} [L1-upsert] FTS write failed (non-fatal) id=${recordId}: ${ftsErr instanceof Error ? ftsErr.message : String(ftsErr)}`, ); } } this.db.exec("COMMIT"); } catch (err) { try { this.db.exec("ROLLBACK"); } catch { /* ignore rollback errors */ } throw err; } this.logger?.debug?.(`${TAG} [L1-upsert] OK id=${recordId}${skipVec ? " (meta-only)" : ""}`); return true; } catch (err) { this.logger?.warn( `${TAG} [L1-upsert] FAILED (non-fatal) id=${record.id}: ${err instanceof Error ? err.message : String(err)}`, ); return false; } } /** * Vector similarity search (cosine distance). * Returns top-k results sorted by similarity (highest first). * * **Fault-tolerant**: returns an empty array on any error (e.g. dimension * mismatch, corrupted DB) so callers can fall back to keyword search. */ searchL1Vector(queryEmbedding: Float32Array, topK = 5): VectorSearchResult[] { if (this.degraded || !this.vecTablesReady) { if (this.degraded) this.logger?.warn(`${TAG} [L1-search] SKIPPED (degraded mode)`); return []; } try { // Over-retrieve to compensate for legacy zero-vector placeholders that // may still exist in the vec0 table. New zero vectors are no longer // inserted (upsert() skips vec write for zero vectors since v3.x), but // older data may still contain them — they surface as NULL/NaN distance // in KNN results. A small buffer of 10 is sufficient for remnants. // NOTE: "AND distance IS NOT NULL" is NOT usable because vec0 does not // support that constraint — it causes an empty result set. const ZERO_VEC_BUFFER = 10; const retrieveCount = topK + ZERO_VEC_BUFFER; this.logger?.debug?.( `${TAG} [L1-search] START topK=${topK}, retrieveCount=${retrieveCount}, ` + `queryEmbeddingDims=${queryEmbedding.length}, ` + `queryNorm=${Math.sqrt(Array.from(queryEmbedding).reduce((s, v) => s + v * v, 0)).toFixed(4)}`, ); const rows = this.stmtSearchVec!.all( Buffer.from(queryEmbedding.buffer), retrieveCount, ) as Array<{ record_id: string; distance: number }>; this.logger?.debug?.(`${TAG} [L1-search] vec0 returned ${rows.length} candidate(s)`); if (rows.length === 0) return []; const results: VectorSearchResult[] = []; for (const { record_id, distance } of rows) { // sqlite-vec returns null distance for zero vectors (cosine undefined when ‖v‖=0). // Skip these — they are placeholder vectors from embedding-service-unavailable fallback. if (distance == null || Number.isNaN(distance)) { this.logger?.warn( `${TAG} [L1-search] record_id=${record_id} has null/NaN distance (likely zero vector) — skipping`, ); continue; } const meta = this.stmtGetMeta.get(record_id) as | { content: string; type: string; priority: number; scene_name: string; session_key: string; session_id: string; timestamp_str: string; timestamp_start: string; timestamp_end: string; metadata_json: string; } | undefined; if (!meta) { this.logger?.warn(`${TAG} [L1-search] record_id=${record_id} has vector but NO metadata (orphan)`); continue; } const score = 1.0 - distance; this.logger?.debug?.( `${TAG} [L1-search] HIT id=${record_id}, distance=${distance.toFixed(4)}, score=${score.toFixed(4)}, ` + `type=${meta.type}, content="${meta.content.slice(0, 60)}..."`, ); results.push({ record_id, content: meta.content, type: meta.type, priority: meta.priority, scene_name: meta.scene_name, score, timestamp_str: meta.timestamp_str, timestamp_start: meta.timestamp_start, timestamp_end: meta.timestamp_end, session_key: meta.session_key, session_id: meta.session_id, metadata_json: meta.metadata_json, }); } // Trim back to the caller's requested topK (we over-fetched above). const trimmed = results.slice(0, topK); this.logger?.info( `${TAG} [L1-search] DONE returning ${trimmed.length} result(s) (from ${results.length} valid, ${rows.length} raw)`, ); return trimmed; } catch (err) { this.logger?.warn( `${TAG} [L1-search] FAILED (non-fatal, returning empty): ${err instanceof Error ? err.message : String(err)}`, ); return []; } } /** * Delete a single record (metadata + vector). * * **Fault-tolerant**: logs a warning on failure, never throws. */ deleteL1(recordId: string): boolean { if (this.degraded) return false; try { this.db.exec("BEGIN"); try { this.stmtDeleteMeta.run(recordId); if (this.vecTablesReady) this.stmtDeleteVec!.run(recordId); if (this.ftsAvailable) { try { this.stmtL1FtsDelete.run(recordId); } catch { /* non-fatal */ } } this.db.exec("COMMIT"); } catch (err) { try { this.db.exec("ROLLBACK"); } catch { /* ignore rollback errors */ } throw err; } return true; } catch (err) { this.logger?.warn( `${TAG} delete failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`, ); return false; } } /** * Delete multiple records (metadata + vector). * * **Fault-tolerant**: logs a warning on failure, never throws. */ deleteL1Batch(recordIds: string[]): boolean { if (this.degraded) return false; if (recordIds.length === 0) return true; try { this.db.exec("BEGIN"); try { for (const id of recordIds) { this.stmtDeleteMeta.run(id); if (this.vecTablesReady) this.stmtDeleteVec!.run(id); if (this.ftsAvailable) { try { this.stmtL1FtsDelete.run(id); } catch { /* non-fatal */ } } } this.db.exec("COMMIT"); } catch (err) { try { this.db.exec("ROLLBACK"); } catch { /* ignore rollback errors */ } throw err; } return true; } catch (err) { this.logger?.warn( `${TAG} deleteBatch failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`, ); return false; } } /** * Get the total number of L1 records in the store. * * **Fault-tolerant**: returns 0 on failure. * TTL cleanup by updated_time. * * Deletes expired rows from l1_records and matching vectors from l1_vec * in a single transaction to guarantee consistency. */ deleteL1Expired(cutoffIso: string): number { if (this.degraded) { this.logger?.warn(`${TAG} [deleteExpired] SKIPPED (degraded mode)`); return 0; } try { const row = this.db.prepare( "SELECT COUNT(*) AS cnt FROM l1_records WHERE updated_time != '' AND updated_time < ?", ).get(cutoffIso) as { cnt: number } | undefined; const expiredCount = row?.cnt ?? 0; if (expiredCount <= 0) return 0; // Ratio protection: refuse to delete > 80% in one pass const totalRow = this.db.prepare( "SELECT COUNT(*) AS cnt FROM l1_records", ).get() as { cnt: number }; const total = totalRow.cnt; const ratio = total > 0 ? expiredCount / total : 0; if (ratio > 0.8) { this.logger?.warn( `${TAG} [L1-deleteExpired] BLOCKED: would delete ${expiredCount}/${total} ` + `(${(ratio * 100).toFixed(1)}%) — exceeds 80% safety threshold, cutoff=${cutoffIso}`, ); return 0; } this.db.exec("BEGIN"); try { if (this.vecTablesReady) { this.db.prepare( "DELETE FROM l1_vec WHERE updated_time != '' AND updated_time < ?", ).run(cutoffIso); } this.db.prepare( "DELETE FROM l1_records WHERE updated_time != '' AND updated_time < ?", ).run(cutoffIso); this.db.exec("COMMIT"); this.logger?.info?.( `${TAG} [L1-deleteExpired] Deleted ${expiredCount}/${total} records (cutoff=${cutoffIso})`, ); return expiredCount; } catch (err) { try { this.db.exec("ROLLBACK"); } catch { /* ignore rollback errors */ } throw err; } } catch (err) { this.logger?.warn( `${TAG} deleteL1ExpiredByUpdatedTime failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`, ); return 0; } } /** * Get the total number of records in the store. */ countL1(): number { if (this.degraded) return 0; try { const row = this.db .prepare("SELECT COUNT(*) AS cnt FROM l1_records") .get() as { cnt: number }; this.logger?.debug?.(`${TAG} [L1-count] total=${row.cnt}`); return row.cnt; } catch (err) { this.logger?.warn( `${TAG} count failed (non-fatal, returning 0): ${err instanceof Error ? err.message : String(err)}`, ); return 0; } } /** * Query L1 records with optional session and time filters. * * Uses the composite index `idx_l1_session_updated(session_id, updated_time)` * for efficient filtering. All timestamps are compared as UTC ISO 8601 strings. * * **Fault-tolerant**: returns an empty array on any error (degraded mode, DB issues). */ queryL1Records(filter?: L1QueryFilter): L1RecordRow[] { if (this.degraded) { this.logger?.warn(`${TAG} [L1-query] SKIPPED (degraded mode)`); return []; } try { const { sessionKey, sessionId, updatedAfter } = filter ?? {}; let raw: Record[]; // Priority: sessionId > sessionKey (sessionId is more specific) if (sessionId && updatedAfter) { raw = this.stmtQueryBySessionIdSince.all(sessionId, updatedAfter) as Record[]; } else if (sessionId) { raw = this.stmtQueryBySessionId.all(sessionId) as Record[]; } else if (sessionKey && updatedAfter) { raw = this.stmtQueryBySessionKeySince.all(sessionKey, updatedAfter) as Record[]; } else if (sessionKey) { raw = this.stmtQueryBySessionKey.all(sessionKey) as Record[]; } else if (updatedAfter) { raw = this.stmtQueryAllSince.all(updatedAfter) as Record[]; } else { raw = this.stmtQueryAll.all() as Record[]; } // Runtime sanity check: verify first row has expected columns (guards against schema drift) if (raw.length > 0 && !("record_id" in raw[0] && "content" in raw[0])) { this.logger?.warn( `${TAG} [L1-query] Schema mismatch: first row missing expected columns. ` + `Got keys: [${Object.keys(raw[0]).join(", ")}]`, ); return []; } const rows = raw as unknown as L1RecordRow[]; this.logger?.info( `${TAG} [L1-query] filter={sessionKey=${sessionKey ?? "(all)"}, sessionId=${sessionId ?? "(all)"}, updatedAfter=${updatedAfter ?? "(none)"}}, ` + `returned ${rows.length} record(s)`, ); return rows; } catch (err) { this.logger?.warn( `${TAG} [L1-query] FAILED (non-fatal, returning empty): ${err instanceof Error ? err.message : String(err)}` ); return []; } } // ── L0 operations ────────────────────────────────── /** * Write or update an L0 single-message record (metadata + vector). * Uses a manual transaction for atomicity. * * If `embedding` is `undefined` or a zero vector (all elements are 0), only * the metadata row (`l0_conversations`) is written — the vec0 table * (`l0_vec`) is left untouched. This allows callers without an * EmbeddingService to still persist metadata + FTS without constructing a * throwaway zero-vector, and prevents placeholder zero vectors (from * embedding-service failures) from polluting KNN search results. * * **Fault-tolerant**: catches all errors internally, never throws. * Returns `true` on success, `false` on failure (logged as warning). */ upsertL0(record: L0Record, embedding: Float32Array | undefined): boolean { if (this.degraded) { this.logger?.warn(`${TAG} [L0-upsert] SKIPPED (degraded mode) id=${record.id}`); return false; } try { const skipVec = !embedding || embedding.every(v => v === 0) || !this.vecTablesReady; this.logger?.debug?.( `${TAG} [L0-upsert] START id=${record.id}, session=${record.sessionKey}, role=${record.role}, ` + `text="${record.messageText.slice(0, 60)}..."` + (embedding ? `, embeddingDims=${embedding.length}, ` + `embeddingNorm=${Math.sqrt(Array.from(embedding).reduce((s, v) => s + v * v, 0)).toFixed(4)}` + `${skipVec ? " (ZERO VECTOR or vec tables not ready — vec write will be skipped)" : ""}` : " (no embedding — metadata-only write)"), ); this.db.exec("BEGIN"); try { this.stmtL0UpsertMeta.run( record.id, record.sessionKey, record.sessionId, record.role, record.messageText, record.recordedAt, record.timestamp, ); if (!skipVec) { // vec0 does not support ON CONFLICT → delete then insert this.stmtL0DeleteVec!.run(record.id); this.stmtL0InsertVec!.run(record.id, Buffer.from(embedding!.buffer), record.recordedAt); } else { this.logger?.debug?.( `${TAG} [L0-upsert] Skipping vec write (${embedding ? "zero vector" : "no embedding"}) id=${record.id}`, ); } // Sync FTS5 (delete + re-insert to handle updates) if (this.ftsAvailable) { try { this.stmtL0FtsDelete.run(record.id); this.stmtL0FtsInsert.run( tokenizeForFts(record.messageText), // message_text — segmented for indexing record.messageText, // message_text_original — raw for display record.id, record.sessionKey, record.sessionId, record.role, record.recordedAt, record.timestamp, ); } catch (ftsErr) { // FTS write failure is non-fatal — log and continue this.logger?.warn( `${TAG} [L0-upsert] FTS write failed (non-fatal) id=${record.id}: ${ftsErr instanceof Error ? ftsErr.message : String(ftsErr)}`, ); } } this.db.exec("COMMIT"); } catch (err) { try { this.db.exec("ROLLBACK"); } catch { /* ignore rollback errors */ } throw err; } this.logger?.debug?.(`${TAG} [L0-upsert] OK id=${record.id}${skipVec ? " (meta-only)" : ""}`); return true; } catch (err) { this.logger?.warn( `${TAG} [L0-upsert] FAILED (non-fatal) id=${record.id}: ${err instanceof Error ? err.message : String(err)}`, ); return false; } } /** * Update ONLY the vector embedding for an existing L0 record. * The metadata row must already exist in l0_conversations (written by upsertL0). * * This is used by the background embedding task in auto-capture: * 1. upsertL0() writes metadata + FTS synchronously (no embedding) * 2. Background task calls embedBatch() then updateL0Embedding() for each record * * **Fault-tolerant**: catches all errors internally, never throws. * Returns `true` on success, `false` on failure. */ updateL0Embedding(recordId: string, embedding: Float32Array): boolean { if (this.degraded || !this.vecTablesReady) { return false; } if (!embedding || embedding.every(v => v === 0)) { this.logger?.debug?.(`${TAG} [L0-update-embedding] Skipping zero vector for ${recordId}`); return false; } try { // Look up recorded_at from metadata for the vec0 row const meta = this.stmtL0GetMeta.get(recordId) as { recorded_at: string } | undefined; if (!meta) { this.logger?.warn(`${TAG} [L0-update-embedding] No metadata found for ${recordId}, skipping`); return false; } this.db.exec("BEGIN"); try { this.stmtL0DeleteVec!.run(recordId); this.stmtL0InsertVec!.run(recordId, Buffer.from(embedding.buffer), meta.recorded_at); this.db.exec("COMMIT"); } catch (err) { try { this.db.exec("ROLLBACK"); } catch { /* ignore */ } throw err; } return true; } catch (err) { this.logger?.warn( `${TAG} [L0-update-embedding] FAILED (non-fatal) id=${recordId}: ${err instanceof Error ? err.message : String(err)}`, ); return false; } } /** * Vector similarity search on L0 individual messages (cosine distance). * Returns top-k results sorted by similarity (highest first). * * **Fault-tolerant**: returns an empty array on any error. */ searchL0Vector(queryEmbedding: Float32Array, topK = 5): L0VectorSearchResult[] { if (this.degraded || !this.vecTablesReady) { if (this.degraded) this.logger?.warn(`${TAG} [L0-search] SKIPPED (degraded mode)`); return []; } try { // Over-retrieve to compensate for legacy zero-vector placeholders that // may still exist in the vec0 table. New zero vectors are no longer // inserted (upsertL0() skips vec write for zero vectors since v3.x), but // older data may still contain them — they surface as NULL/NaN distance // in KNN results. // NOTE: "AND distance IS NOT NULL" is NOT usable because vec0 does not // support that constraint — it causes an empty result set. const retrieveCount = topK + VectorStore.ZERO_VEC_BUFFER; this.logger?.debug?.( `${TAG} [L0-search] START topK=${topK}, retrieveCount=${retrieveCount}, ` + `queryEmbeddingDims=${queryEmbedding.length}, ` + `queryNorm=${Math.sqrt(Array.from(queryEmbedding).reduce((s, v) => s + v * v, 0)).toFixed(4)}`, ); const rows = this.stmtL0SearchVec!.all( Buffer.from(queryEmbedding.buffer), retrieveCount, ) as Array<{ record_id: string; distance: number }>; this.logger?.debug?.(`${TAG} [L0-search] vec0 returned ${rows.length} candidate(s)`); if (rows.length === 0) return []; const results: L0VectorSearchResult[] = []; for (const { record_id, distance } of rows) { // sqlite-vec returns null distance for zero vectors (cosine undefined when ‖v‖=0). // Skip these — they are placeholder vectors from embedding-service-unavailable fallback. if (distance == null || Number.isNaN(distance)) { this.logger?.warn( `${TAG} [L0-search] record_id=${record_id} has null/NaN distance (likely zero vector) — skipping`, ); continue; } const meta = this.stmtL0GetMeta.get(record_id) as | { session_key: string; session_id: string; role: string; message_text: string; recorded_at: string; timestamp: number; } | undefined; if (!meta) { this.logger?.warn(`${TAG} [L0-search] record_id=${record_id} has vector but NO metadata (orphan)`); continue; } const score = 1.0 - distance; this.logger?.debug?.( `${TAG} [L0-search] HIT id=${record_id}, distance=${distance.toFixed(4)}, score=${score.toFixed(4)}, ` + `role=${meta.role}, session=${meta.session_key}, text="${meta.message_text.slice(0, 60)}..."`, ); results.push({ record_id, session_key: meta.session_key, session_id: meta.session_id, role: meta.role, message_text: meta.message_text, score, recorded_at: meta.recorded_at, timestamp: meta.timestamp ?? 0, }); } // Trim back to the caller's requested topK (we over-fetched above). const trimmed = results.slice(0, topK); this.logger?.info( `${TAG} [L0-search] DONE returning ${trimmed.length} result(s) (from ${results.length} valid, ${rows.length} raw)`, ); return trimmed; } catch (err) { this.logger?.warn( `${TAG} [L0-search] FAILED (non-fatal, returning empty): ${err instanceof Error ? err.message : String(err)}`, ); return []; } } /** * Delete a single L0 record (metadata + vector). * * **Fault-tolerant**: logs a warning on failure, never throws. */ deleteL0(recordId: string): boolean { if (this.degraded) return false; try { this.db.exec("BEGIN"); try { this.stmtL0DeleteMeta.run(recordId); if (this.vecTablesReady) this.stmtL0DeleteVec!.run(recordId); if (this.ftsAvailable) { try { this.stmtL0FtsDelete.run(recordId); } catch { /* non-fatal */ } } this.db.exec("COMMIT"); } catch (err) { try { this.db.exec("ROLLBACK"); } catch { /* ignore rollback errors */ } throw err; } return true; } catch (err) { this.logger?.warn( `${TAG} deleteL0 failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`, ); return false; } } /** * TTL cleanup by recorded_at (ISO string) for L0 records. * * Deletes expired rows from l0_conversations and matching vectors from l0_vec * in a single transaction to guarantee consistency. */ deleteL0Expired(cutoffIso: string): number { if (this.degraded) { this.logger?.warn(`${TAG} [deleteExpiredL0] SKIPPED (degraded mode)`); return 0; } try { const row = this.db.prepare( "SELECT COUNT(*) AS cnt FROM l0_conversations WHERE recorded_at != '' AND recorded_at < ?", ).get(cutoffIso) as { cnt: number } | undefined; const expiredCount = row?.cnt ?? 0; if (expiredCount <= 0) return 0; // Ratio protection: refuse to delete > 80% in one pass const totalRow = this.db.prepare( "SELECT COUNT(*) AS cnt FROM l0_conversations", ).get() as { cnt: number }; const total = totalRow.cnt; const ratio = total > 0 ? expiredCount / total : 0; if (ratio > 0.8) { this.logger?.warn( `${TAG} [L0-deleteExpired] BLOCKED: would delete ${expiredCount}/${total} ` + `(${(ratio * 100).toFixed(1)}%) — exceeds 80% safety threshold, cutoff=${cutoffIso}`, ); return 0; } this.db.exec("BEGIN"); try { if (this.vecTablesReady) { this.db.prepare( "DELETE FROM l0_vec WHERE recorded_at != '' AND recorded_at < ?", ).run(cutoffIso); } this.db.prepare( "DELETE FROM l0_conversations WHERE recorded_at != '' AND recorded_at < ?", ).run(cutoffIso); this.db.exec("COMMIT"); this.logger?.info?.( `${TAG} [L0-deleteExpired] Deleted ${expiredCount}/${total} records (cutoff=${cutoffIso})`, ); return expiredCount; } catch (err) { try { this.db.exec("ROLLBACK"); } catch { /* ignore rollback errors */ } throw err; } } catch (err) { this.logger?.warn( `${TAG} deleteL0ExpiredByRecordedAt failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`, ); return 0; } } /** * Get the total number of L0 message records in the store. * * **Fault-tolerant**: returns 0 on failure. */ countL0(): number { if (this.degraded) return 0; try { const row = this.db .prepare("SELECT COUNT(*) AS cnt FROM l0_conversations") .get() as { cnt: number }; this.logger?.debug?.(`${TAG} [L0-count] total=${row.cnt}`); return row.cnt; } catch (err) { this.logger?.warn( `${TAG} countL0 failed (non-fatal, returning 0): ${err instanceof Error ? err.message : String(err)}`, ); return 0; } } // ── Re-index operations ────────────────────────────────── /** * Get all L1 record texts for re-embedding. * Returns record_id → content pairs. */ getAllL1Texts(): Array<{ record_id: string; content: string; updated_time: string }> { if (this.degraded) return []; try { return this.db .prepare("SELECT record_id, content, updated_time FROM l1_records") .all() as Array<{ record_id: string; content: string; updated_time: string }>; } catch (err) { this.logger?.warn( `${TAG} getAllL1Texts failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`, ); return []; } } /** * Get all L0 message texts for re-embedding. * Returns record_id → message_text/recorded_at tuples. */ getAllL0Texts(): Array<{ record_id: string; message_text: string; recorded_at: string }> { if (this.degraded) return []; try { return this.db .prepare("SELECT record_id, message_text, recorded_at FROM l0_conversations") .all() as Array<{ record_id: string; message_text: string; recorded_at: string }>; } catch (err) { this.logger?.warn( `${TAG} getAllL0Texts failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`, ); return []; } } /** * Re-embed all existing L1 and L0 texts with a new embedding function. * * This is called after `init()` returns `needsReindex: true` — the vector * tables have already been dropped and re-created with the correct dimensions. * This method reads every text from the metadata tables and writes fresh * embeddings into the new vector tables. * * @param embedFn A function that converts text → Float32Array embedding. * @param onProgress Optional callback for progress reporting. */ async reindexAll( embedFn: (text: string) => Promise, onProgress?: (done: number, total: number, layer: "L1" | "L0") => void, ): Promise<{ l1Count: number; l0Count: number }> { if (this.degraded || !this.vecTablesReady) { if (this.degraded) this.logger?.warn(`${TAG} reindexAll skipped: VectorStore is in degraded mode`); return { l1Count: 0, l0Count: 0 }; } try { // ── Re-embed L1 ── const l1Rows = this.getAllL1Texts(); let l1Done = 0; for (const { record_id, content, updated_time } of l1Rows) { try { const embedding = await embedFn(content); // Wrap delete+insert in a transaction to prevent orphan vectors this.db.exec("BEGIN"); try { this.stmtDeleteVec!.run(record_id); this.stmtInsertVec!.run(record_id, Buffer.from(embedding.buffer), updated_time); this.db.exec("COMMIT"); } catch (txErr) { try { this.db.exec("ROLLBACK"); } catch { /* ignore */ } throw txErr; } } catch (err) { this.logger?.warn?.( `${TAG} reindex L1 skip ${record_id}: ${err instanceof Error ? err.message : String(err)}`, ); } l1Done++; onProgress?.(l1Done, l1Rows.length, "L1"); } // ── Re-embed L0 ── const l0Rows = this.getAllL0Texts(); let l0Done = 0; for (const { record_id, message_text, recorded_at } of l0Rows) { try { const embedding = await embedFn(message_text); // Wrap delete+insert in a transaction to prevent orphan vectors this.db.exec("BEGIN"); try { this.stmtL0DeleteVec!.run(record_id); this.stmtL0InsertVec!.run(record_id, Buffer.from(embedding.buffer), recorded_at); this.db.exec("COMMIT"); } catch (txErr) { try { this.db.exec("ROLLBACK"); } catch { /* ignore */ } throw txErr; } } catch (err) { this.logger?.warn?.( `${TAG} reindex L0 skip ${record_id}: ${err instanceof Error ? err.message : String(err)}`, ); } l0Done++; onProgress?.(l0Done, l0Rows.length, "L0"); } this.logger?.info( `${TAG} Reindex complete: L1=${l1Done}/${l1Rows.length}, L0=${l0Done}/${l0Rows.length}`, ); return { l1Count: l1Done, l0Count: l0Done }; } catch (err) { this.logger?.error( `${TAG} reindexAll failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`, ); return { l1Count: 0, l0Count: 0 }; } } // ── L0 query operations (for L1 runner) ────────────────────────────────── /** * Query L0 messages for a given session key, optionally filtered by recorded_at cursor. * Returns messages ordered by recorded_at ASC (chronological write order). * * Used by L1 runner to read L0 data from DB instead of JSONL files. */ queryL0ForL1( sessionKey: string, afterRecordedAtMs?: number, limit = 50, ): Array<{ record_id: string; session_key: string; session_id: string; role: string; message_text: string; recorded_at: string; timestamp: number; }> { if (this.degraded) { this.logger?.warn(`${TAG} [L0-query] SKIPPED (degraded mode)`); return []; } try { // Query newest-first (DESC) with LIMIT, then reverse to chronological order let rows: Array>; if (afterRecordedAtMs && afterRecordedAtMs > 0) { // Convert epoch ms to ISO string for recorded_at comparison const afterRecordedAtIso = new Date(afterRecordedAtMs).toISOString(); rows = this.stmtL0QueryAfter.all(sessionKey, afterRecordedAtIso, limit) as Array>; } else { rows = this.stmtL0QueryAll.all(sessionKey, limit) as Array>; } this.logger?.info( `${TAG} [L0-query] session=${sessionKey}, afterRecordedAtMs=${afterRecordedAtMs ?? "(all)"}, ` + `limit=${limit}, returned ${rows.length} row(s)`, ); // Reverse: SQL returns newest-first (DESC), callers expect chronological order return rows.map((r) => ({ record_id: r.record_id as string, session_key: r.session_key as string, session_id: (r.session_id as string) || "", role: r.role as string, message_text: r.message_text as string, recorded_at: (r.recorded_at as string) || "", timestamp: (r.timestamp as number) || 0, })).reverse(); } catch (err) { this.logger?.warn( `${TAG} [L0-query] FAILED (non-fatal, returning empty): ${err instanceof Error ? err.message : String(err)}`, ); return []; } } /** * Query L0 messages for a given session key, grouped by session_id. * Each group's messages are in chronological order (recorded_at ASC). * Groups are sorted by earliest message timestamp. * * Used by L1 runner to replace readConversationMessagesGroupedBySessionId(). */ queryL0GroupedBySessionId( sessionKey: string, afterRecordedAtMs?: number, limit = 50, ): Array<{ sessionId: string; messages: Array<{ id: string; role: string; content: string; timestamp: number; recordedAtMs: number }> }> { if (this.degraded) { this.logger?.warn(`${TAG} [L0-query-grouped] SKIPPED (degraded mode)`); return []; } try { const rows = this.queryL0ForL1(sessionKey, afterRecordedAtMs, limit); // Group by session_id const groupMap = new Map>(); for (const row of rows) { const sid = row.session_id || ""; let group = groupMap.get(sid); if (!group) { group = []; groupMap.set(sid, group); } group.push({ id: row.record_id, role: row.role, content: row.message_text, timestamp: row.timestamp, recordedAtMs: row.recorded_at ? Date.parse(row.recorded_at) || 0 : 0, }); } // Convert to array, sorted by earliest message timestamp const groups: Array<{ sessionId: string; messages: Array<{ id: string; role: string; content: string; timestamp: number; recordedAtMs: number }> }> = []; for (const [sessionId, messages] of groupMap) { if (messages.length > 0) { groups.push({ sessionId, messages }); } } groups.sort((a, b) => a.messages[0].timestamp - b.messages[0].timestamp); this.logger?.info( `${TAG} [L0-query-grouped] session=${sessionKey}, afterRecordedAtMs=${afterRecordedAtMs ?? "(all)"}, ` + `${rows.length} messages across ${groups.length} group(s)`, ); return groups; } catch (err) { this.logger?.warn( `${TAG} [L0-query-grouped] FAILED (non-fatal, returning empty): ${err instanceof Error ? err.message : String(err)}`, ); return []; } } // ── Cursor-based pagination for migration ────────────────── /** * Read a page of L1 records using primary key cursor. * Returns rows with `record_id > afterId`, ordered by PK, limited to `pageSize`. * Pass `""` as `afterId` for the first page. */ queryL1RecordsCursor(afterId: string, pageSize: number): L1RecordRow[] { if (this.degraded) return []; try { return this.stmtL1QueryMigrationCursor.all(afterId, pageSize) as unknown as L1RecordRow[]; } catch (err) { this.logger?.warn( `${TAG} [L1-query-cursor] FAILED (non-fatal): ${err instanceof Error ? err.message : String(err)}`, ); return []; } } /** * Read a page of L0 records using primary key cursor. * Returns rows with `record_id > afterId`, ordered by PK, limited to `pageSize`. * Pass `""` as `afterId` for the first page. */ queryL0RecordsCursor(afterId: string, pageSize: number): L0RecordRow[] { if (this.degraded) return []; try { return this.stmtL0QueryMigrationCursor.all(afterId, pageSize) as unknown as L0RecordRow[]; } catch (err) { this.logger?.warn( `${TAG} [L0-query-cursor] FAILED (non-fatal): ${err instanceof Error ? err.message : String(err)}`, ); return []; } } // ── FTS5 search operations ────────────────────────────────── /** * Whether FTS5 full-text search is available. * When `false`, callers should skip keyword-based recall entirely. */ isFtsAvailable(): boolean { return this.ftsAvailable; } /** * FTS5 keyword search on L1 records. * Returns top-`limit` results sorted by BM25 relevance (highest first). * * @param ftsQuery A pre-built FTS5 MATCH expression (from `buildFtsQuery()`). * @param limit Maximum number of results to return. * * **Fault-tolerant**: returns an empty array on any error. */ searchL1Fts(ftsQuery: string, limit = 20): FtsSearchResult[] { if (this.degraded || !this.ftsAvailable) return []; try { const rows = this.stmtL1FtsSearch.all(ftsQuery, limit) as Array<{ record_id: string; content: string; type: string; priority: number; scene_name: string; session_key: string; session_id: string; timestamp_str: string; timestamp_start: string; timestamp_end: string; metadata_json: string; rank: number; }>; return rows.map((r) => ({ record_id: r.record_id, content: r.content, type: r.type, priority: r.priority, scene_name: r.scene_name, score: bm25RankToScore(r.rank), timestamp_str: r.timestamp_str, timestamp_start: r.timestamp_start, timestamp_end: r.timestamp_end, session_key: r.session_key, session_id: r.session_id, metadata_json: r.metadata_json, })); } catch (err) { this.logger?.warn( `${TAG} [L1-fts-search] FAILED (non-fatal, returning empty): ${err instanceof Error ? err.message : String(err)}`, ); return []; } } /** * FTS5 keyword search on L0 conversation messages. * Returns top-`limit` results sorted by BM25 relevance (highest first). * * @param ftsQuery A pre-built FTS5 MATCH expression (from `buildFtsQuery()`). * @param limit Maximum number of results to return. * * **Fault-tolerant**: returns an empty array on any error. */ searchL0Fts(ftsQuery: string, limit = VectorStore.FTS_DEFAULT_LIMIT): L0FtsSearchResult[] { if (this.degraded || !this.ftsAvailable) return []; try { const rows = this.stmtL0FtsSearch.all(ftsQuery, limit) as Array<{ record_id: string; message_text: string; session_key: string; session_id: string; role: string; recorded_at: string; timestamp: number; rank: number; }>; return rows.map((r) => ({ record_id: r.record_id, session_key: r.session_key, session_id: r.session_id, role: r.role, message_text: r.message_text, score: bm25RankToScore(r.rank), recorded_at: r.recorded_at, timestamp: r.timestamp ?? 0, })); } catch (err) { this.logger?.warn( `${TAG} [L0-fts-search] FAILED (non-fatal, returning empty): ${err instanceof Error ? err.message : String(err)}`, ); return []; } } // ── FTS5 migration & rebuild ────────────────────────────────────────────── /** * Detect old FTS5 v1 schema (no `content_original` column) and drop the * tables so they can be recreated with the v2 schema. * * FTS5 virtual tables do NOT support `ALTER TABLE ADD COLUMN`, so the only * migration path is DROP + recreate + repopulate. * * @returns `true` if migration was performed (= FTS index needs rebuilding). * @internal */ private migrateFtsTablesIfNeeded(): boolean { try { // Check if l1_fts exists at all const l1Exists = this.db .prepare("SELECT 1 FROM sqlite_master WHERE type='table' AND name='l1_fts'") .get(); if (!l1Exists) { // Fresh install — tables will be created with v2 schema. // Still need rebuild if there's existing data in l1_records. const hasData = this.db.prepare("SELECT 1 FROM l1_records LIMIT 1").get(); return !!hasData; } // Check if the v2 column `content_original` exists. // FTS5 tables appear in pragma_table_info with their column names. const cols = this.db .prepare("SELECT name FROM pragma_table_info('l1_fts')") .all() as Array<{ name: string }>; const hasV2Col = cols.some((c) => c.name === "content_original"); if (hasV2Col) { return false; // Already v2 — no migration needed } // v1 → v2: drop both FTS tables (data will be repopulated by rebuildFtsIndex) this.logger?.info(`${TAG} Migrating FTS5 tables from v1 to v2 (jieba segmented)`); this.db.exec("DROP TABLE IF EXISTS l1_fts"); this.db.exec("DROP TABLE IF EXISTS l0_fts"); return true; } catch (err) { this.logger?.warn( `${TAG} FTS migration check failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`, ); return false; } } /** * Rebuild the FTS5 index from scratch by reading all records from the * metadata tables and re-inserting them with jieba-segmented text. * * Called automatically after: * - Schema migration from v1 to v2 * - Fresh table creation when existing data exists * * Safe to call multiple times (idempotent — clears FTS tables first). */ rebuildFtsIndex(): void { if (!this.ftsAvailable) return; try { this.logger?.info(`${TAG} Rebuilding FTS5 index with jieba segmentation…`); // ── Rebuild L1 FTS ── // Clear existing FTS data this.db.exec("DELETE FROM l1_fts"); // Read all L1 records from metadata table const l1Rows = this.db .prepare(` SELECT record_id, content, type, priority, scene_name, session_key, session_id, timestamp_str, timestamp_start, timestamp_end, metadata_json FROM l1_records `) .all() as Array<{ record_id: string; content: string; type: string; priority: number; scene_name: string; session_key: string; session_id: string; timestamp_str: string; timestamp_start: string; timestamp_end: string; metadata_json: string; }>; let l1Count = 0; for (const r of l1Rows) { try { this.stmtL1FtsInsert.run( tokenizeForFts(r.content), // content — segmented r.content, // content_original — raw r.record_id, r.type, r.priority, r.scene_name, r.session_key, r.session_id, r.timestamp_str, r.timestamp_start, r.timestamp_end, r.metadata_json, ); l1Count++; } catch (err) { this.logger?.warn?.( `${TAG} FTS rebuild skip L1 ${r.record_id}: ${err instanceof Error ? err.message : String(err)}`, ); } } // ── Rebuild L0 FTS ── this.db.exec("DELETE FROM l0_fts"); const l0Rows = this.db .prepare(` SELECT record_id, message_text, session_key, session_id, role, recorded_at, timestamp FROM l0_conversations `) .all() as Array<{ record_id: string; message_text: string; session_key: string; session_id: string; role: string; recorded_at: string; timestamp: number; }>; let l0Count = 0; for (const r of l0Rows) { try { this.stmtL0FtsInsert.run( tokenizeForFts(r.message_text), // message_text — segmented r.message_text, // message_text_original — raw r.record_id, r.session_key, r.session_id, r.role, r.recorded_at, r.timestamp, ); l0Count++; } catch (err) { this.logger?.warn?.( `${TAG} FTS rebuild skip L0 ${r.record_id}: ${err instanceof Error ? err.message : String(err)}`, ); } } this.logger?.info( `${TAG} FTS5 rebuild complete: L1=${l1Count}/${l1Rows.length}, L0=${l0Count}/${l0Rows.length}`, ); } catch (err) { this.logger?.warn( `${TAG} FTS5 rebuild failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`, ); } } // ============================ // IMemoryStore interface implementation // ============================ /** Query the store's search capabilities. */ getCapabilities(): StoreCapabilities { return { vectorSearch: this.vecTablesReady, ftsSearch: this.ftsAvailable, nativeHybridSearch: false, sparseVectors: false, }; } /** * Close the database connection. * Should be called on shutdown. Idempotent — safe to call multiple times. */ close(): void { if (this.closed) return; this.closed = true; try { this.db.close(); } catch (err) { this.logger?.warn?.( `${TAG} Error closing database: ${err instanceof Error ? err.message : String(err)}`, ); } } }