import sqlite3 from "sqlite3"; import { HistoryManager } from "./base"; export class SQLiteManager implements HistoryManager { private db: sqlite3.Database; private initPromise: Promise; constructor(dbPath: string) { this.db = new sqlite3.Database(dbPath); this.initPromise = this.init(); } private async init() { await this.run(` CREATE TABLE IF NOT EXISTS memory_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, memory_id TEXT NOT NULL, previous_value TEXT, new_value TEXT, action TEXT NOT NULL, created_at TEXT, updated_at TEXT, is_deleted INTEGER DEFAULT 0, user_id TEXT, run_id TEXT, message_id TEXT, happened_at TEXT ) `); // New table for storing ALL raw messages await this.run(` CREATE TABLE IF NOT EXISTS raw_messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, message_id TEXT NOT NULL, content TEXT NOT NULL, role TEXT NOT NULL, user_id TEXT, run_id TEXT, batch_info TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, happened_at TEXT DEFAULT CURRENT_TIMESTAMP, message_index INTEGER DEFAULT 0 ) `); // Add indexes for better search performance await this.run(`CREATE INDEX IF NOT EXISTS idx_raw_messages_message_id ON raw_messages(message_id)`); await this.run(`CREATE INDEX IF NOT EXISTS idx_raw_messages_user_id ON raw_messages(user_id)`); await this.run(`CREATE INDEX IF NOT EXISTS idx_raw_messages_run_id ON raw_messages(run_id)`); await this.run(`CREATE INDEX IF NOT EXISTS idx_raw_messages_happened_at ON raw_messages(happened_at)`); await this.run(` CREATE TABLE IF NOT EXISTS entities ( entity_id TEXT PRIMARY KEY, label TEXT NOT NULL, type TEXT NOT NULL, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP, user_id TEXT, run_id TEXT ) `); await this.run(` CREATE TABLE IF NOT EXISTS memory_entities ( memory_id TEXT NOT NULL, entity_id TEXT NOT NULL, created_at TEXT DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (memory_id, entity_id), FOREIGN KEY (entity_id) REFERENCES entities(entity_id) ON DELETE CASCADE ) `); } private async ensureInitialized(): Promise { await this.initPromise; } private async run(sql: string, params: any[] = []): Promise { return new Promise((resolve, reject) => { this.db.run(sql, params, (err) => { if (err) reject(err); else resolve(); }); }); } private async all(sql: string, params: any[] = []): Promise { return new Promise((resolve, reject) => { this.db.all(sql, params, (err, rows) => { if (err) reject(err); else resolve(rows); }); }); } async addHistory( memoryId: string, previousValue: string | null, newValue: string | null, action: string, createdAt?: string, updatedAt?: string, isDeleted: number = 0, userId?: string, runId?: string, messageId?: string, happenedAt?: string, ): Promise { await this.ensureInitialized(); // If happenedAt is not provided, use current timestamp const actualHappenedAt = happenedAt || new Date().toISOString(); await this.run( `INSERT INTO memory_history (memory_id, previous_value, new_value, action, created_at, updated_at, is_deleted, user_id, run_id, message_id, happened_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, [ memoryId, previousValue, newValue, action, createdAt, updatedAt, isDeleted, userId, runId, messageId, actualHappenedAt, ], ); } async getHistory(memoryId: string, options?: { userId?: string, runId?: string }): Promise { await this.ensureInitialized(); let query = "SELECT * FROM memory_history WHERE memory_id = ?"; const params: any[] = [memoryId]; if (options?.userId) { query += " AND user_id = ?"; params.push(options.userId); } if (options?.runId) { query += " AND run_id = ?"; params.push(options.runId); } query += " ORDER BY id DESC"; return this.all(query, params); } async getAllMessages(options?: { userId?: string, runId?: string, limit?: number }): Promise { await this.ensureInitialized(); let query = "SELECT * FROM raw_messages"; const params: any[] = []; const conditions: string[] = []; if (options?.userId) { conditions.push("user_id = ?"); params.push(options.userId); } if (options?.runId) { conditions.push("run_id = ?"); params.push(options.runId); } if (conditions.length > 0) { query += " WHERE " + conditions.join(" AND "); } query += " ORDER BY happened_at DESC, message_index ASC"; if (options?.limit) { query += " LIMIT ?"; params.push(options.limit); } return this.all(query, params); } async getMessageContext( messageId: string, options: { beforeCount?: number; afterCount?: number; userId?: string; runId?: string; } ): Promise { await this.ensureInitialized(); // Default values for context window const beforeCount = options.beforeCount || 10; const afterCount = options.afterCount || 10; // First, get the reference message to find its timestamp let refQuery = "SELECT * FROM memory_history WHERE message_id = ?"; const refParams: any[] = [messageId]; const conditions: string[] = []; if (options.userId) { conditions.push("user_id = ?"); refParams.push(options.userId); } if (options.runId) { conditions.push("run_id = ?"); refParams.push(options.runId); } if (conditions.length > 0) { refQuery += " AND " + conditions.join(" AND "); } const refMessages = await this.all(refQuery, refParams); if (!refMessages || refMessages.length === 0) { return []; // Reference message not found } const refMessage = refMessages[0]; const refTime = refMessage.happened_at; // Get messages before the reference message let beforeQuery = "SELECT * FROM memory_history WHERE happened_at < ?"; const beforeParams: any[] = [refTime]; if (options.userId) { beforeQuery += " AND user_id = ?"; beforeParams.push(options.userId); } if (options.runId) { beforeQuery += " AND run_id = ?"; beforeParams.push(options.runId); } beforeQuery += " ORDER BY happened_at DESC LIMIT ?"; beforeParams.push(beforeCount); const beforeMessages = await this.all(beforeQuery, beforeParams); // Get messages after the reference message let afterQuery = "SELECT * FROM memory_history WHERE happened_at > ?"; const afterParams: any[] = [refTime]; if (options.userId) { afterQuery += " AND user_id = ?"; afterParams.push(options.userId); } if (options.runId) { afterQuery += " AND run_id = ?"; afterParams.push(options.runId); } afterQuery += " ORDER BY happened_at ASC LIMIT ?"; afterParams.push(afterCount); const afterMessages = await this.all(afterQuery, afterParams); // Mark each message with its context type const before = beforeMessages.map(msg => ({ ...msg, type: 'before' })); const match = { ...refMessage, type: 'match' }; const after = afterMessages.map(msg => ({ ...msg, type: 'after' })); // Combine and return all context messages return [...before, match, ...after]; } async reset(): Promise { await this.ensureInitialized(); await this.run("DROP TABLE IF EXISTS memory_history"); await this.run("DROP TABLE IF EXISTS memory_entities"); await this.run("DROP TABLE IF EXISTS entities"); await this.run("DROP TABLE IF EXISTS raw_messages"); // Reinitialize the tables this.initPromise = this.init(); await this.initPromise; } // Entity management methods async addEntity( entityId: string, label: string, type: string, userId?: string, runId?: string, ): Promise { await this.ensureInitialized(); const currentTime = new Date().toISOString(); await this.run( `INSERT OR REPLACE INTO entities (entity_id, label, type, created_at, updated_at, user_id, run_id) VALUES (?, ?, ?, ?, ?, ?, ?)`, [entityId, label, type, currentTime, currentTime, userId, runId], ); } async updateEntity( entityId: string, label: string, type: string, userId?: string, runId?: string, ): Promise { await this.ensureInitialized(); const currentTime = new Date().toISOString(); await this.run( `UPDATE entities SET label = ?, type = ?, updated_at = ?, user_id = ?, run_id = ? WHERE entity_id = ?`, [label, type, currentTime, userId, runId, entityId], ); } async deleteEntity(entityId: string): Promise { await this.ensureInitialized(); await this.run(`DELETE FROM entities WHERE entity_id = ?`, [entityId]); } async getEntity(entityId: string): Promise { await this.ensureInitialized(); const result = await this.all(`SELECT * FROM entities WHERE entity_id = ?`, [entityId]); return result.length > 0 ? result[0] : null; } async getAllEntities(options?: { userId?: string, runId?: string }): Promise { await this.ensureInitialized(); let query = "SELECT * FROM entities"; const params: any[] = []; const conditions: string[] = []; if (options?.userId) { conditions.push("user_id = ?"); params.push(options.userId); } if (options?.runId) { conditions.push("run_id = ?"); params.push(options.runId); } if (conditions.length > 0) { query += " WHERE " + conditions.join(" AND "); } query += " ORDER BY created_at ASC"; return this.all(query, params); } async associateMemoryWithEntities(memoryId: string, entityIds: string[]): Promise { await this.ensureInitialized(); // First, remove existing associations for this memory await this.run(`DELETE FROM memory_entities WHERE memory_id = ?`, [memoryId]); // Add new associations const currentTime = new Date().toISOString(); for (const entityId of entityIds) { await this.run( `INSERT INTO memory_entities (memory_id, entity_id, created_at) VALUES (?, ?, ?)`, [memoryId, entityId, currentTime], ); } } async getMemoryEntities(memoryId: string): Promise { await this.ensureInitialized(); const result = await this.all( `SELECT entity_id FROM memory_entities WHERE memory_id = ?`, [memoryId], ); return result.map(row => row.entity_id); } async getMessageDetails(memoryId: string): Promise { await this.ensureInitialized(); // Get the specific memory's message details const result = await this.all( `SELECT message_id, new_value, previous_value, action, created_at, updated_at, user_id, run_id FROM memory_history WHERE memory_id = ? ORDER BY happened_at DESC`, [memoryId] ); return result.map(row => ({ type: 'message_details', memoryId, messageId: row.message_id, content: row.new_value || row.previous_value, action: row.action, createdAt: row.created_at, updatedAt: row.updated_at, userId: row.user_id, runId: row.run_id })); } async getMessageConversationContext(memoryId: string): Promise { await this.ensureInitialized(); // First get the message_id for this memory const memoryResult = await this.all( `SELECT message_id, user_id, run_id FROM memory_history WHERE memory_id = ? LIMIT 1`, [memoryId] ); if (!memoryResult || memoryResult.length === 0) { return []; } const { message_id, user_id, run_id } = memoryResult[0]; if (!message_id) { return []; } // Get conversation context around this message const contextResult = await this.getMessageContext(message_id, { beforeCount: 5, afterCount: 5, userId: user_id, runId: run_id }); return contextResult.map(msg => ({ type: 'conversation_context', memoryId, messageId: msg.message_id, content: msg.new_value || msg.previous_value, contextType: msg.type, happenedAt: msg.happened_at, userId: msg.user_id, runId: msg.run_id })); } close(): void { this.db.close(); } async storeRawMessages( messages: any[], metadata: { userId?: string; runId?: string; messageId: string; batchInfo?: any; } ): Promise { await this.ensureInitialized(); const timestamp = new Date().toISOString(); const batchInfoStr = metadata.batchInfo ? JSON.stringify(metadata.batchInfo) : null; for (let i = 0; i < messages.length; i++) { const message = messages[i]; const content = typeof message.content === 'string' ? message.content : JSON.stringify(message.content); await this.run( `INSERT INTO raw_messages (message_id, content, role, user_id, run_id, batch_info, created_at, happened_at, message_index) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, [ metadata.messageId, content, message.role || 'user', metadata.userId, metadata.runId, batchInfoStr, timestamp, timestamp, i ] ); } } async searchMessages( query: string, options?: { userId?: string; runId?: string; limit?: number; timeRange?: { start?: string; end?: string }; } ): Promise { await this.ensureInitialized(); let sql: string; const params: any[] = []; // Handle broad queries that should match all content const isBroadQuery = !query || query === "" || query === ".*" || query === "%"; if (isBroadQuery) { // For broad queries, get all messages (just filter by userId/runId) sql = "SELECT * FROM raw_messages WHERE 1=1"; } else { // For specific queries, use LIKE search sql = "SELECT * FROM raw_messages WHERE content LIKE ?"; params.push(`%${query}%`); } if (options?.userId) { sql += " AND user_id = ?"; params.push(options.userId); } if (options?.runId) { sql += " AND run_id = ?"; params.push(options.runId); } if (options?.timeRange?.start) { sql += " AND happened_at >= ?"; params.push(options.timeRange.start); } if (options?.timeRange?.end) { sql += " AND happened_at <= ?"; params.push(options.timeRange.end); } sql += " ORDER BY happened_at DESC"; if (options?.limit) { sql += " LIMIT ?"; params.push(options.limit); } return this.all(sql, params); } async getMessagesByPattern( messageIdPattern: string, options?: { userId?: string; runId?: string; exact?: boolean; } ): Promise { await this.ensureInitialized(); let sql: string; const params: any[] = []; if (options?.exact) { sql = "SELECT * FROM raw_messages WHERE message_id = ?"; params.push(messageIdPattern); } else { sql = "SELECT * FROM raw_messages WHERE message_id LIKE ?"; params.push(`%${messageIdPattern}%`); } if (options?.userId) { sql += " AND user_id = ?"; params.push(options.userId); } if (options?.runId) { sql += " AND run_id = ?"; params.push(options.runId); } sql += " ORDER BY happened_at ASC, message_index ASC"; return this.all(sql, params); } }