/** * L1 Memory Writer: writes extracted memories to JSONL files. * * File naming: records/YYYY-MM-DD.jsonl (daily shards, all sessions merged). * Each record includes sessionKey for traceability. * * Write strategy: * - JSONL is the append-only persistent store (source of truth for backup/recovery). * - VectorStore (SQLite) is the primary retrieval engine. * - On update/merge, old records are deleted from VectorStore in real-time; * JSONL is append-only and cleaned up periodically by memory-cleaner. * * Supports store (append), update, merge, and skip operations. * * v3: Aligned with Kenty's prompt output format — 3 memory types (persona/episodic/instruction), * numeric priority, scene_name, source_message_ids, metadata, timestamps. */ import fs from "node:fs/promises"; import path from "node:path"; import crypto from "node:crypto"; import type { IMemoryStore } from "../store/types.js"; import type { EmbeddingService } from "../store/embedding.js"; // ============================ // Types // ============================ /** v3: 3 memory types aligned with Kenty's extraction prompt */ export type MemoryType = "persona" | "episodic" | "instruction"; /** Metadata for episodic memories (activity time range) */ export interface EpisodicMetadata { activity_start_time?: string; // ISO 8601 activity_end_time?: string; // ISO 8601 } /** * A persisted memory record in L1 JSONL files. * * v3 changes from v2: * - `importance: "high"|"medium"|"low"` → `priority: number` (0-100, -1 for strict global instructions) * - Added `scene_name`, `source_message_ids`, `metadata`, `timestamps` * - Removed `keywords` (will be rebuilt from content for search) * - MemoryType reduced from 4 to 3 (removed "preference", folded into "persona") */ export interface MemoryRecord { /** Unique ID for dedup updates */ id: string; /** Memory content */ content: string; /** Memory type: persona / episodic / instruction */ type: MemoryType; /** Priority score: 0-100 (higher = more important), -1 = strict global instruction */ priority: number; /** Scene name this memory belongs to */ scene_name: string; /** Source message IDs that contributed to this memory */ source_message_ids: string[]; /** Type-specific metadata (e.g., activity_start_time for episodic) */ metadata: EpisodicMetadata | Record; /** Timestamp trail: all timestamps related to this memory (for merge history tracking) */ timestamps: string[]; /** Creation timestamp (ISO) */ createdAt: string; /** Last update timestamp (ISO) */ updatedAt: string; /** Source session key (conversation channel identifier) */ sessionKey: string; /** Source session ID (single conversation instance identifier) */ sessionId: string; } /** * A memory as extracted by LLM (before dedup / persistence). * Matches the output format of Kenty's extraction prompt. */ export interface ExtractedMemory { content: string; type: MemoryType; priority: number; source_message_ids: string[]; metadata: EpisodicMetadata | Record; /** Scene name this memory was extracted in */ scene_name: string; } export type DedupAction = "store" | "update" | "merge" | "skip"; /** * v3 batch dedup decision — one per new memory, aligned with Kenty's conflict detection prompt. * * Key changes: * - `targetId` → `target_ids` (array, supports multi-target merge/update) * - Added `merged_type`, `merged_priority`, `merged_timestamps` for cross-type merge */ export interface DedupDecision { /** Which new memory this decision is about */ record_id: string; action: DedupAction; /** IDs of existing records to replace/remove (for update/merge) */ target_ids: string[]; /** Merged/updated content text (for update/merge) */ merged_content?: string; /** Best type after merge (for update/merge, may differ from original) */ merged_type?: MemoryType; /** Priority after merge (for update/merge) */ merged_priority?: number; /** Union of all related timestamps (for update/merge) */ merged_timestamps?: string[]; } interface Logger { debug?: (message: string) => void; info: (message: string) => void; warn: (message: string) => void; error: (message: string) => void; } const TAG = "[memory-tdai][l1-writer]"; // ============================ // Core functions // ============================ /** * Generate a unique memory ID. */ export function generateMemoryId(): string { return `m_${Date.now()}_${crypto.randomBytes(4).toString("hex")}`; } /** * Write a memory record according to the dedup decision. * * - store: append new record * - update: remove target records + append updated record * - merge: remove target records + append merged record * - skip: do nothing * * v3: supports multi-target removal for update/merge. * v3.1: optional VectorStore + EmbeddingService for dual-write (JSONL + vector). */ export async function writeMemory(params: { memory: ExtractedMemory; decision: DedupDecision; baseDir: string; sessionKey: string; sessionId?: string; logger?: Logger; /** Optional vector store for dual-write (JSONL + vector DB) */ vectorStore?: IMemoryStore; /** Optional embedding service (required when vectorStore is provided) */ embeddingService?: EmbeddingService; }): Promise { const { memory, decision, baseDir, sessionKey, sessionId, logger, vectorStore, embeddingService } = params; if (decision.action === "skip") { logger?.debug?.(`${TAG} Skipping memory: ${memory.content.slice(0, 50)}...`); return null; } const now = new Date().toISOString(); // Determine final content, type, priority based on action let finalContent: string; let finalType: MemoryType; let finalPriority: number; let finalTimestamps: string[]; if (decision.action === "merge" || decision.action === "update") { finalContent = decision.merged_content ?? memory.content; finalType = decision.merged_type ?? memory.type; finalPriority = decision.merged_priority ?? memory.priority; finalTimestamps = decision.merged_timestamps ?? [now]; } else { // store finalContent = memory.content; finalType = memory.type; finalPriority = memory.priority; finalTimestamps = [now]; } const record: MemoryRecord = { id: decision.record_id || generateMemoryId(), content: finalContent, type: finalType, priority: finalPriority, scene_name: memory.scene_name, source_message_ids: memory.source_message_ids, metadata: memory.metadata, timestamps: finalTimestamps, createdAt: now, updatedAt: now, sessionKey, sessionId: sessionId || "", }; const recordsDir = path.join(baseDir, "records"); await fs.mkdir(recordsDir, { recursive: true }); const shardDate = formatLocalDate(new Date()); const filePath = path.join(recordsDir, `${shardDate}.jsonl`); if ((decision.action === "update" || decision.action === "merge") && decision.target_ids.length > 0) { // Remove target records from VectorStore (real-time deletion for retrieval accuracy). // JSONL is append-only — old records remain in files and are cleaned up periodically // by memory-cleaner (which reconciles against VectorStore as source of truth). if (vectorStore) { try { await vectorStore.deleteL1Batch(decision.target_ids); logger?.debug?.(`${TAG} VectorStore: deleted ${decision.target_ids.length} target record(s) for ${decision.action}`); } catch (err) { logger?.warn?.( `${TAG} VectorStore delete failed for ${decision.action}: ${err instanceof Error ? err.message : String(err)}`, ); } } await fs.appendFile(filePath, JSON.stringify(record) + "\n", "utf-8"); logger?.debug?.(`${TAG} ${decision.action} memory: removed [${decision.target_ids.join(",")}] from VectorStore → ${record.id}: ${finalContent.slice(0, 80)}...`); } else { // store: append a new line await fs.appendFile(filePath, JSON.stringify(record) + "\n", "utf-8"); logger?.debug?.(`${TAG} Stored memory ${record.id}: ${finalContent.slice(0, 80)}...`); } // === Vector Store dual-write === if (vectorStore) { try { logger?.debug?.( `${TAG} [vec-dual-write] START id=${record.id}, contentLen=${record.content.length}, ` + `content="${record.content.slice(0, 80)}..."`, ); let embedding: Float32Array | undefined; if (embeddingService) { try { embedding = await embeddingService.embed(record.content); logger?.debug?.( `${TAG} [vec-dual-write] Embedding OK: dims=${embedding.length}, ` + `norm=${Math.sqrt(Array.from(embedding).reduce((s, v) => s + v * v, 0)).toFixed(4)}`, ); } catch (embedErr) { // Embedding failed — pass undefined to upsert() which writes // metadata + FTS only, skipping the vec0 table. logger?.warn( `${TAG} [vec-dual-write] Embedding FAILED for id=${record.id}, ` + `will write metadata only: ${embedErr instanceof Error ? embedErr.message : String(embedErr)}`, ); } } const upsertOk = await vectorStore.upsertL1(record, embedding); logger?.debug?.(`${TAG} [vec-dual-write] upsert result=${upsertOk} id=${record.id}`); } catch (err) { // Vector write failure should NOT block the main JSONL write logger?.warn?.( `${TAG} [vec-dual-write] FAILED (JSONL already written) id=${record.id}: ${err instanceof Error ? err.message : String(err)}`, ); } } else { logger?.debug?.( `${TAG} [vec-dual-write] SKIPPED id=${record.id}: vectorStore=${!!vectorStore}`, ); } return record; } // ============================ // Helpers // ============================ function formatLocalDate(d: Date): string { const y = d.getFullYear(); const m = String(d.getMonth() + 1).padStart(2, "0"); const day = String(d.getDate()).padStart(2, "0"); return `${y}-${m}-${day}`; }