import { Database } from "bun:sqlite";
import { mkdirSync, unlinkSync } from "node:fs";
import { dirname } from "node:path";

/** Base tables and indexes. Every statement is idempotent (`IF NOT EXISTS`). */
const SCHEMA = `
  CREATE TABLE IF NOT EXISTS messages (
    id TEXT PRIMARY KEY,
    conversation_id TEXT NOT NULL,
    from_id TEXT NOT NULL,
    to_id TEXT NOT NULL,
    content TEXT NOT NULL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
  );
  CREATE INDEX IF NOT EXISTS idx_messages_conv ON messages(conversation_id, created_at);
  CREATE INDEX IF NOT EXISTS idx_messages_recent ON messages(created_at DESC);

  CREATE TABLE IF NOT EXISTS memories (
    id TEXT PRIMARY KEY,
    content TEXT NOT NULL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
  );

  CREATE TABLE IF NOT EXISTS schedules (
    id TEXT PRIMARY KEY,
    cron TEXT NOT NULL,
    description TEXT NOT NULL,
    task TEXT NOT NULL,
    conversation_id TEXT NOT NULL,
    last_run DATETIME,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
  );
`;

/**
 * Additive column migrations for `messages`. They are re-run on every start;
 * a column that already exists makes the statement fail, which
 * `runMigrations` treats as "already applied".
 */
const MIGRATIONS = [
  `ALTER TABLE messages ADD COLUMN role TEXT NOT NULL DEFAULT 'user'`,
  `ALTER TABLE messages ADD COLUMN tool_call_id TEXT`,
  `ALTER TABLE messages ADD COLUMN tool_calls_json TEXT`,
  `ALTER TABLE messages ADD COLUMN name TEXT`,
];

/**
 * External-content FTS5 index over `memories.content`, kept in sync with the
 * base table by insert/delete/update triggers.
 */
const FTS_SCHEMA = `
  CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(content, content=memories, content_rowid=rowid);

  CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN
    INSERT INTO memories_fts(rowid, content) VALUES (new.rowid, new.content);
  END;

  CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN
    INSERT INTO memories_fts(memories_fts, rowid, content) VALUES('delete', old.rowid, old.content);
  END;

  CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN
    INSERT INTO memories_fts(memories_fts, rowid, content) VALUES('delete', old.rowid, old.content);
    INSERT INTO memories_fts(rowid, content) VALUES (new.rowid, new.content);
  END;
`;

/**
 * How many rows before the requested window are inspected when looking for
 * the assistant message that issued a leading run of tool results.
 */
const TOOL_CALL_LOOKBACK = 10;

/** A row of the `messages` table as exposed to callers. */
export interface StoredMessage {
  id: string;
  conversationId: string;
  fromId: string;
  toId: string;
  content: string;
  createdAt: string;
}

/** The chat-context projection of a message (role plus tool-call metadata). */
export interface ContextMessage {
  role: "user" | "assistant" | "tool";
  content: string;
  name?: string;
  toolCallId?: string;
  toolCallsJson?: string;
}

/** A row of the `memories` table. */
export interface StoredMemory {
  id: string;
  content: string;
  createdAt: string;
}

/** A row of the `schedules` table. */
export interface StoredSchedule {
  id: string;
  cron: string;
  description: string;
  task: string;
  conversationId: string;
  lastRun: string | null;
  createdAt: string;
}

/**
 * SQLite-backed store for chat messages, free-text memories (with FTS5
 * search) and cron schedules.
 */
export class MemoryStore {
  private readonly db: Database;
  private readonly dbPath: string;

  /**
   * Opens (creating if needed) the database at `dbPath`, creating parent
   * directories first, then applies the schema, the FTS schema and the
   * column migrations.
   */
  constructor(dbPath: string) {
    this.dbPath = dbPath;
    mkdirSync(dirname(dbPath), { recursive: true });
    this.db = new Database(dbPath);
    this.db.exec("PRAGMA journal_mode=DELETE;");
    this.db.exec(SCHEMA);
    this.db.exec(FTS_SCHEMA);
    this.runMigrations();
  }

  /**
   * Applies every statement in `MIGRATIONS`.
   *
   * Only the "duplicate column name" failure is swallowed, because that is
   * what an already-applied `ADD COLUMN` produces. Any other failure is
   * rethrown rather than leaving the schema silently half-migrated.
   */
  private runMigrations(): void {
    for (const sql of MIGRATIONS) {
      try {
        this.db.exec(sql);
      } catch (error: unknown) {
        const message = error instanceof Error ? error.message : String(error);
        if (!/duplicate column name/i.test(message)) {
          throw error;
        }
        // Column already exists — expected for idempotent migrations.
      }
    }
  }

  /**
   * Inserts a message and returns its stored identity.
   *
   * When `opts.role` is omitted, the role is `"user"` if `fromId` is `"user"`
   * or `"scheduler"`, otherwise `"assistant"`.
   *
   * The returned `createdAt` is taken from the local clock in ISO-8601 form;
   * it is not read back from the row's `CURRENT_TIMESTAMP` default.
   */
  saveMessage(
    conversationId: string,
    fromId: string,
    toId: string,
    content: string,
    opts?: { role?: string; toolCallId?: string; toolCallsJson?: string; name?: string },
  ): StoredMessage {
    const id = crypto.randomUUID();
    const role = opts?.role ?? (fromId === "user" || fromId === "scheduler" ? "user" : "assistant");

    this.db
      .prepare(
        "INSERT INTO messages (id, conversation_id, from_id, to_id, content, role, tool_call_id, tool_calls_json, name) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
      )
      .run(
        id,
        conversationId,
        fromId,
        toId,
        content,
        role,
        opts?.toolCallId ?? null,
        opts?.toolCallsJson ?? null,
        opts?.name ?? null,
      );

    return {
      id,
      conversationId,
      fromId,
      toId,
      content,
      createdAt: new Date().toISOString(),
    };
  }

  /**
   * Returns the messages of a conversation, oldest first.
   *
   * With a truthy `limit`, only the newest `limit` messages are returned
   * (still oldest first). A `limit` of `0` or `undefined` returns everything.
   *
   * `_rowid_` breaks ties between rows sharing the same second-resolution
   * `created_at`, so ordering and the chosen window are deterministic.
   */
  getConversation(conversationId: string, limit?: number): StoredMessage[] {
    if (limit) {
      const newestFirst = this.db
        .prepare(
          "SELECT id, conversation_id as conversationId, from_id as fromId, to_id as toId, content, created_at as createdAt FROM messages WHERE conversation_id = ? ORDER BY created_at DESC, _rowid_ DESC LIMIT ?",
        )
        .all(conversationId, limit) as StoredMessage[];
      return newestFirst.reverse();
    }

    return this.db
      .prepare(
        "SELECT id, conversation_id as conversationId, from_id as fromId, to_id as toId, content, created_at as createdAt FROM messages WHERE conversation_id = ? ORDER BY created_at ASC, _rowid_ ASC",
      )
      .all(conversationId) as StoredMessage[];
  }

  /**
   * Returns the conversation as context messages, oldest first.
   *
   * With a truthy `limit`, the newest `limit` messages are returned. If that
   * window begins with a `tool` message, up to `TOOL_CALL_LOOKBACK` earlier
   * rows are prepended, stopping at the first assistant message that carries
   * `toolCallsJson`. The result can therefore exceed `limit`.
   *
   * NOTE: if no such assistant message is found within the lookback, all
   * fetched lookback rows are still prepended.
   */
  getConversationContext(conversationId: string, limit?: number): ContextMessage[] {
    if (!limit) {
      return this.db
        .prepare(
          "SELECT role, content, name, tool_call_id as toolCallId, tool_calls_json as toolCallsJson FROM messages WHERE conversation_id = ? ORDER BY created_at ASC, _rowid_ ASC",
        )
        .all(conversationId) as ContextMessage[];
    }

    const rows = this.db
      .prepare(
        "SELECT role, content, name, tool_call_id as toolCallId, tool_calls_json as toolCallsJson FROM messages WHERE conversation_id = ? ORDER BY created_at DESC, _rowid_ DESC LIMIT ?",
      )
      .all(conversationId, limit)
      .reverse() as ContextMessage[];

    // If the window starts with orphaned tool messages, expand backward
    // to include the preceding assistant message (with tool_calls).
    if (rows[0]?.role !== "tool") {
      return rows;
    }
    if (this.getConversationMessageCount(conversationId) <= rows.length) {
      return rows;
    }

    // Newest-first slice of the rows immediately preceding the window.
    const earlier = this.db
      .prepare(
        "SELECT role, content, name, tool_call_id as toolCallId, tool_calls_json as toolCallsJson FROM messages WHERE conversation_id = ? ORDER BY created_at DESC, _rowid_ DESC LIMIT ? OFFSET ?",
      )
      .all(conversationId, TOOL_CALL_LOOKBACK, limit) as ContextMessage[];

    const prefix: ContextMessage[] = [];
    for (const msg of earlier) {
      prefix.unshift(msg);
      if (msg.role === "assistant" && msg.toolCallsJson) break;
    }
    if (prefix.length > 0) {
      rows.unshift(...prefix);
    }
    return rows;
  }

  /** Counts the messages stored for one conversation. */
  private getConversationMessageCount(conversationId: string): number {
    const row = this.db
      .prepare("SELECT COUNT(*) as count FROM messages WHERE conversation_id = ?")
      .get(conversationId) as { count: number };
    return row.count;
  }

  /**
   * Returns the conversation id of the newest message whose `from_id` is
   * `'user'`, or `null` when there is none.
   */
  getLastUserConversationId(): string | null {
    const row = this.db
      .prepare(
        "SELECT conversation_id as conversationId FROM messages WHERE from_id = 'user' ORDER BY created_at DESC, _rowid_ DESC LIMIT 1",
      )
      .get() as { conversationId: string } | null;
    return row?.conversationId ?? null;
  }

  /**
   * Returns the newest non-empty user/assistant messages across all
   * conversations, newest first. `_rowid_` breaks same-second ties so the
   * `LIMIT` window is deterministic.
   */
  getRecentConversations(limit = 20): StoredMessage[] {
    return this.db
      .prepare(
        "SELECT id, conversation_id as conversationId, from_id as fromId, to_id as toId, content, created_at as createdAt FROM messages WHERE role IN ('user', 'assistant') AND content != '' ORDER BY created_at DESC, _rowid_ DESC LIMIT ?",
      )
      .all(limit) as StoredMessage[];
  }

  /**
   * Inserts a memory. The returned `createdAt` comes from the local clock,
   * not from the row's `CURRENT_TIMESTAMP` default.
   */
  saveMemory(content: string): StoredMemory {
    const id = crypto.randomUUID();
    this.db.prepare("INSERT INTO memories (id, content) VALUES (?, ?)").run(id, content);
    return { id, content, createdAt: new Date().toISOString() };
  }

  /**
   * Full-text searches memories. The query is wrapped in double quotes (with
   * embedded quotes doubled) so it is matched as a single FTS5 phrase rather
   * than parsed as FTS5 query syntax.
   *
   * An empty or whitespace-only query returns `[]` without touching the
   * index, since it contains no searchable terms.
   */
  searchMemory(query: string, limit = 10): StoredMemory[] {
    if (query.trim() === "") {
      return [];
    }
    const sanitized = `"${query.replace(/"/g, '""')}"`;
    return this.db
      .prepare(
        "SELECT m.id, m.content, m.created_at as createdAt FROM memories m JOIN memories_fts f ON m.rowid = f.rowid WHERE memories_fts MATCH ? LIMIT ?",
      )
      .all(sanitized, limit) as StoredMemory[];
  }

  /** Counts all stored messages. */
  getMessageCount(): number {
    const row = this.db.prepare("SELECT COUNT(*) as count FROM messages").get() as { count: number };
    return row.count;
  }

  /**
   * Reports whether the conversation contains a `user`-role message with
   * exactly this content.
   *
   * NOTE: no time window is applied — any matching message counts,
   * regardless of age.
   */
  hasRecentUserMessage(conversationId: string, content: string): boolean {
    const row = this.db
      .prepare(
        "SELECT 1 FROM messages WHERE conversation_id = ? AND role = 'user' AND content = ? ORDER BY created_at DESC, _rowid_ DESC LIMIT 1",
      )
      .get(conversationId, content);
    return row != null;
  }

  /**
   * Creates or replaces a schedule. An existing schedule keeps its
   * `conversationId`; a new one gets a fresh UUID.
   *
   * Because the row is written with `INSERT OR REPLACE` and only five columns
   * are supplied, `last_run` and `created_at` fall back to their defaults;
   * the returned object mirrors that with `lastRun: null` and a local-clock
   * `createdAt`.
   */
  saveSchedule(id: string, cron: string, description: string, task: string): StoredSchedule {
    const existing = this.getSchedule(id);
    const conversationId = existing?.conversationId ?? crypto.randomUUID();
    this.db
      .prepare("INSERT OR REPLACE INTO schedules (id, cron, description, task, conversation_id) VALUES (?, ?, ?, ?, ?)")
      .run(id, cron, description, task, conversationId);
    return { id, cron, description, task, conversationId, lastRun: null, createdAt: new Date().toISOString() };
  }

  /** Counts all stored schedules. */
  getScheduleCount(): number {
    const row = this.db.prepare("SELECT COUNT(*) as count FROM schedules").get() as { count: number };
    return row.count;
  }

  /** Returns all schedules, oldest first (ties broken by `_rowid_`). */
  getSchedules(): StoredSchedule[] {
    return this.db
      .prepare(
        "SELECT id, cron, description, task, conversation_id as conversationId, last_run as lastRun, created_at as createdAt FROM schedules ORDER BY created_at ASC, _rowid_ ASC",
      )
      .all() as StoredSchedule[];
  }

  /** Returns the schedule with this id, or `null` when it does not exist. */
  getSchedule(id: string): StoredSchedule | null {
    const row = this.db
      .prepare(
        "SELECT id, cron, description, task, conversation_id as conversationId, last_run as lastRun, created_at as createdAt FROM schedules WHERE id = ?",
      )
      .get(id) as StoredSchedule | null | undefined;
    return row ?? null;
  }

  /** Deletes a schedule; returns `true` when a row was actually removed. */
  deleteSchedule(id: string): boolean {
    const result = this.db.prepare("DELETE FROM schedules WHERE id = ?").run(id);
    return result.changes > 0;
  }

  /** Stamps the schedule's `last_run` with the database's current time. */
  updateScheduleLastRun(id: string): void {
    this.db.prepare("UPDATE schedules SET last_run = CURRENT_TIMESTAMP WHERE id = ?").run(id);
  }

  /** Closes the underlying database handle. */
  close(): void {
    this.db.close();
  }

  /**
   * Closes the database and removes its file along with any journal sidecar
   * files. The database runs in `journal_mode=DELETE`, whose sidecar is
   * `-journal`; `-wal` and `-shm` are also removed in case the file was ever
   * opened in WAL mode.
   */
  delete(): void {
    this.db.close();
    for (const suffix of ["", "-journal", "-wal", "-shm"]) {
      try {
        unlinkSync(`${this.dbPath}${suffix}`);
      } catch {
        // Best-effort cleanup: the file usually just does not exist.
      }
    }
  }
}