import { Database } from "bun:sqlite";
import { mkdirSync } from "node:fs";
import { dirname, join } from "node:path";
import { getMozartDir } from "./agent/paths.ts";

/**
 * DDL executed on every journal open. All statements use IF NOT EXISTS,
 * so re-running it against an existing database is a no-op.
 */
const SCHEMA = `
  CREATE TABLE IF NOT EXISTS journal (
    id TEXT PRIMARY KEY,
    target_id TEXT NOT NULL,
    type TEXT NOT NULL CHECK(type IN ('chat', 'route')),
    payload TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending' CHECK(status IN ('pending', 'acknowledged', 'completed', 'failed')),
    response TEXT,
    response_delivered INTEGER NOT NULL DEFAULT 0,
    source_id TEXT,
    source_request_id TEXT,
    created_at INTEGER NOT NULL,
    updated_at INTEGER NOT NULL,
    attempts INTEGER NOT NULL DEFAULT 0
  );
  CREATE INDEX IF NOT EXISTS idx_journal_target_status ON journal(target_id, status);
  CREATE INDEX IF NOT EXISTS idx_journal_source_response ON journal(source_id, status, response_delivered);
`;

/**
 * Value exposed through `MessageJournal.maxAttempts`. The journal only
 * reports this limit; nothing in this file enforces it.
 */
const MAX_ATTEMPTS = 3;

/** A journal row in camelCase form, as handed to callers. */
export interface JournalEntry {
  id: string;
  targetId: string;
  type: "chat" | "route";
  payload: string;
  status: "pending" | "acknowledged" | "completed" | "failed";
  response: string | null;
  responseDelivered: boolean;
  sourceId: string | null;
  sourceRequestId: string | null;
  createdAt: number;
  updatedAt: number;
  attempts: number;
}

/** A journal row exactly as stored: snake_case columns, 0/1 integer flag. */
interface JournalRow {
  id: string;
  target_id: string;
  type: "chat" | "route";
  payload: string;
  status: "pending" | "acknowledged" | "completed" | "failed";
  response: string | null;
  response_delivered: number;
  source_id: string | null;
  source_request_id: string | null;
  created_at: number;
  updated_at: number;
  attempts: number;
}

/** Settle callbacks of an in-memory Promise associated with a message id. */
interface RouteResolver {
  resolve: (response: string) => void;
  reject: (error: Error) => void;
}

/** Converts a stored row into the public `JournalEntry` shape. */
function rowToEntry(row: JournalRow): JournalEntry {
  return {
    id: row.id,
    targetId: row.target_id,
    type: row.type,
    payload: row.payload,
    status: row.status,
    response: row.response,
    // The column is an INTEGER flag; only the value 1 counts as delivered.
    responseDelivered: row.response_delivered === 1,
    sourceId: row.source_id,
    sourceRequestId: row.source_request_id,
    createdAt: row.created_at,
    updatedAt: row.updated_at,
    attempts: row.attempts,
  };
}

/**
 * SQLite-backed journal of messages and their delivery status, plus an
 * in-memory registry of Promise resolvers keyed by message id.
 *
 * Persisted state lives in the `journal` table; resolvers exist only for the
 * lifetime of this instance.
 */
export class MessageJournal {
  private readonly db: Database;
  // Typed explicitly: a bare `new Map()` is Map<any, any> and would leave
  // every resolver call unchecked.
  private readonly resolvers = new Map<string, RouteResolver>();

  /**
   * Opens the journal database, creating its parent directory and the schema
   * when needed.
   *
   * @param dbPath Database file path. Defaults to `journal.db` inside the
   *   directory returned by `getMozartDir()`.
   */
  constructor(dbPath?: string) {
    const path = dbPath ?? join(getMozartDir(), "journal.db");
    mkdirSync(dirname(path), { recursive: true });
    this.db = new Database(path);
    this.db.exec("PRAGMA journal_mode=DELETE;");
    this.db.exec(SCHEMA);
  }

  /**
   * Inserts a new entry with status 'pending', zero attempts, and
   * `created_at` / `updated_at` both set to the current time.
   *
   * `id` is the table's primary key, so it must not already exist.
   * Omitted `sourceId` / `sourceRequestId` are stored as NULL.
   */
  record(entry: {
    id: string;
    targetId: string;
    type: "chat" | "route";
    payload: string;
    sourceId?: string;
    sourceRequestId?: string;
  }): void {
    const now = Date.now();
    this.db
      .prepare(
        "INSERT INTO journal (id, target_id, type, payload, status, source_id, source_request_id, created_at, updated_at, attempts) VALUES (?, ?, ?, ?, 'pending', ?, ?, ?, ?, 0)",
      )
      .run(
        entry.id,
        entry.targetId,
        entry.type,
        entry.payload,
        entry.sourceId ?? null,
        entry.sourceRequestId ?? null,
        now,
        now,
      );
  }

  /** Moves an entry from 'pending' to 'acknowledged'; any other status is left untouched. */
  acknowledge(messageId: string): void {
    this.db
      .prepare("UPDATE journal SET status = 'acknowledged', updated_at = ? WHERE id = ? AND status = 'pending'")
      .run(Date.now(), messageId);
  }

  /**
   * Marks a 'pending' or 'acknowledged' entry as 'completed' and stores its
   * response (NULL when omitted). Entries in any other status are not modified.
   */
  complete(messageId: string, response?: string): void {
    this.db
      .prepare(
        "UPDATE journal SET status = 'completed', response = ?, updated_at = ? WHERE id = ? AND status IN ('pending', 'acknowledged')",
      )
      .run(response ?? null, Date.now(), messageId);
  }

  /**
   * Marks an entry as 'failed' regardless of its current status, then rejects
   * and removes the live resolver registered for it, if any.
   */
  fail(messageId: string): void {
    this.db
      .prepare("UPDATE journal SET status = 'failed', updated_at = ? WHERE id = ?")
      .run(Date.now(), messageId);

    const resolver = this.resolvers.get(messageId);
    if (resolver) {
      resolver.reject(new Error("Message delivery failed after max retries"));
      this.resolvers.delete(messageId);
    }
  }

  /** Returns the target's entries that are 'pending' or 'acknowledged', oldest first. */
  getPending(targetId: string): JournalEntry[] {
    const rows = this.db
      .prepare(
        "SELECT * FROM journal WHERE target_id = ? AND status IN ('pending', 'acknowledged') ORDER BY created_at ASC",
      )
      .all(targetId) as JournalRow[];
    return rows.map(rowToEntry);
  }

  /**
   * Returns completed 'route' entries from the given source that have a
   * stored response not yet marked as delivered, oldest first.
   */
  getUndeliveredResponses(sourceId: string): JournalEntry[] {
    const rows = this.db
      .prepare(
        "SELECT * FROM journal WHERE source_id = ? AND type = 'route' AND status = 'completed' AND response IS NOT NULL AND response_delivered = 0 ORDER BY created_at ASC",
      )
      .all(sourceId) as JournalRow[];
    return rows.map(rowToEntry);
  }

  /** Sets the entry's `response_delivered` flag to 1. */
  markResponseDelivered(messageId: string): void {
    this.db
      .prepare("UPDATE journal SET response_delivered = 1, updated_at = ? WHERE id = ?")
      .run(Date.now(), messageId);
  }

  /**
   * Increments the entry's attempt counter.
   *
   * @returns The counter value after the increment, or 0 when no entry with
   *   this id exists.
   */
  incrementAttempts(messageId: string): number {
    this.db
      .prepare("UPDATE journal SET attempts = attempts + 1, updated_at = ? WHERE id = ?")
      .run(Date.now(), messageId);

    const row = this.db.prepare("SELECT attempts FROM journal WHERE id = ?").get(messageId) as {
      attempts: number;
    } | null;
    return row?.attempts ?? 0;
  }

  /** The module-level attempt limit (`MAX_ATTEMPTS`). */
  get maxAttempts(): number {
    return MAX_ATTEMPTS;
  }

  /** Registers a live resolver for a message id, replacing any existing one. */
  attachResolver(messageId: string, resolver: RouteResolver): void {
    this.resolvers.set(messageId, resolver);
  }

  /**
   * Complete a route entry and resolve its live Promise if present.
   * Returns true if a live resolver was called, false if the response was only persisted.
   */
  resolveRoute(messageId: string, response: string): boolean {
    this.complete(messageId, response);

    const resolver = this.resolvers.get(messageId);
    if (resolver) {
      // Unregister before resolving so the resolver is no longer tracked
      // by the time its callback runs.
      this.resolvers.delete(messageId);
      resolver.resolve(response);
      return true;
    }
    return false;
  }

  /**
   * Remove a route's live resolver (e.g. on timeout) without changing persisted status.
   * The resolver is only dropped from the registry; it is neither resolved nor
   * rejected here, so the caller is responsible for settling its Promise.
   */
  detachResolver(messageId: string): void {
    this.resolvers.delete(messageId);
  }

  /**
   * Deletes finished entries that have not been updated within the last
   * `olderThanMs` milliseconds.
   *
   * Only 'completed' or 'failed' rows are eligible, and only when they are
   * either marked delivered, have no stored response, or are of type 'chat'.
   *
   * @returns The number of rows deleted.
   */
  cleanup(olderThanMs: number): number {
    const cutoff = Date.now() - olderThanMs;
    const result = this.db
      .prepare(
        "DELETE FROM journal WHERE status IN ('completed', 'failed') AND (response_delivered = 1 OR response IS NULL OR type = 'chat') AND updated_at <= ?",
      )
      .run(cutoff);
    return result.changes;
  }

  /** Rejects every live resolver with "Journal closed", clears the registry, and closes the database. */
  close(): void {
    for (const resolver of this.resolvers.values()) {
      resolver.reject(new Error("Journal closed"));
    }
    this.resolvers.clear();
    this.db.close();
  }

  /** Looks up a single entry by id; returns null when it does not exist. */
  get(messageId: string): JournalEntry | null {
    const row = this.db.prepare("SELECT * FROM journal WHERE id = ?").get(messageId) as JournalRow | null;
    return row ? rowToEntry(row) : null;
  }
}