import { and, desc, eq, sql } from "drizzle-orm"; import type { BunSQLiteDatabase } from "drizzle-orm/bun-sqlite"; import { Context, Effect, Layer, Metric } from "effect"; import { fromPromise } from "../effect/interop"; import { runPromise, runSync } from "../effect/runtime"; import { dbQueryDuration } from "../effect/metrics"; import { nowMs } from "../utils/time"; import type { SmithersError } from "../utils/errors"; import type { MemoryNamespace, MemoryFact, MemoryThread, MemoryMessage, } from "./types"; import { namespaceToString } from "./types"; import { smithersMemoryFacts, smithersMemoryThreads, smithersMemoryMessages, } from "./schema"; import { memoryFactReads, memoryFactWrites, memoryMessageSaves, } from "./metrics"; // --------------------------------------------------------------------------- // Types // --------------------------------------------------------------------------- export type MemoryStore = { // Working memory getFact: (ns: MemoryNamespace, key: string) => Promise; setFact: (ns: MemoryNamespace, key: string, value: unknown, ttlMs?: number) => Promise; deleteFact: (ns: MemoryNamespace, key: string) => Promise; listFacts: (ns: MemoryNamespace) => Promise; // Threads createThread: (ns: MemoryNamespace, title?: string) => Promise; getThread: (threadId: string) => Promise; deleteThread: (threadId: string) => Promise; // Messages saveMessage: (msg: Omit & { createdAtMs?: number }) => Promise; listMessages: (threadId: string, limit?: number) => Promise; countMessages: (threadId: string) => Promise; // Maintenance deleteExpiredFacts: () => Promise; // Effect variants getFactEffect: (ns: MemoryNamespace, key: string) => Effect.Effect; setFactEffect: (ns: MemoryNamespace, key: string, value: unknown, ttlMs?: number) => Effect.Effect; deleteFactEffect: (ns: MemoryNamespace, key: string) => Effect.Effect; listFactsEffect: (ns: MemoryNamespace) => Effect.Effect; createThreadEffect: (ns: MemoryNamespace, title?: string) => Effect.Effect; getThreadEffect: (threadId: string) => Effect.Effect; deleteThreadEffect: (threadId: string) => Effect.Effect; saveMessageEffect: (msg: Omit & { createdAtMs?: number }) => Effect.Effect; listMessagesEffect: (threadId: string, limit?: number) => Effect.Effect; countMessagesEffect: (threadId: string) => Effect.Effect; deleteExpiredFactsEffect: () => Effect.Effect; }; export const MemoryStoreDb = Context.GenericTag>( "MemoryStoreDb", ); export class MemoryStoreService extends Context.Tag("MemoryStoreService")< MemoryStoreService, MemoryStore >() {} // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- function readEffect( label: string, operation: () => PromiseLike, ): Effect.Effect { return Effect.gen(function* () { const start = performance.now(); const result = yield* fromPromise(label, operation, { code: "DB_QUERY_FAILED", details: { operation: label }, }); yield* Metric.update(dbQueryDuration, performance.now() - start); return result; }).pipe( Effect.annotateLogs({ dbOperation: label }), Effect.withLogSpan(`memory:${label}`), ); } function writeEffect( label: string, operation: () => PromiseLike, ): Effect.Effect { return Effect.gen(function* () { const start = performance.now(); const result = yield* fromPromise(label, operation, { code: "DB_WRITE_FAILED", details: { operation: label }, }); yield* Metric.update(dbQueryDuration, performance.now() - start); return result; }).pipe( Effect.annotateLogs({ dbOperation: label }), Effect.withLogSpan(`memory:${label}`), ); } // --------------------------------------------------------------------------- // Factory // --------------------------------------------------------------------------- function makeMemoryStore(db: BunSQLiteDatabase): MemoryStore { // --- Working Memory Effects --- function getFactEffect( ns: MemoryNamespace, key: string, ): Effect.Effect { const nsStr = namespaceToString(ns); return Effect.gen(function* () { yield* Metric.increment(memoryFactReads); const rows = yield* readEffect("memory getFact", () => db .select() .from(smithersMemoryFacts) .where( and( eq(smithersMemoryFacts.namespace, nsStr), eq(smithersMemoryFacts.key, key), ), ) .limit(1), ); const row = rows[0]; if (!row) return undefined; return { namespace: row.namespace, key: row.key, valueJson: row.valueJson, schemaSig: row.schemaSig, createdAtMs: row.createdAtMs, updatedAtMs: row.updatedAtMs, ttlMs: row.ttlMs, } as MemoryFact; }); } function setFactEffect( ns: MemoryNamespace, key: string, value: unknown, ttlMs?: number, ): Effect.Effect { const nsStr = namespaceToString(ns); const now = nowMs(); return Effect.gen(function* () { yield* Metric.increment(memoryFactWrites); yield* writeEffect("memory setFact", () => db .insert(smithersMemoryFacts) .values({ namespace: nsStr, key, valueJson: JSON.stringify(value), createdAtMs: now, updatedAtMs: now, ttlMs: ttlMs ?? null, }) .onConflictDoUpdate({ target: [smithersMemoryFacts.namespace, smithersMemoryFacts.key], set: { valueJson: JSON.stringify(value), updatedAtMs: now, ttlMs: ttlMs ?? null, }, }), ); }); } function deleteFactEffect( ns: MemoryNamespace, key: string, ): Effect.Effect { const nsStr = namespaceToString(ns); return writeEffect("memory deleteFact", () => db .delete(smithersMemoryFacts) .where( and( eq(smithersMemoryFacts.namespace, nsStr), eq(smithersMemoryFacts.key, key), ), ), ).pipe(Effect.asVoid); } function listFactsEffect( ns: MemoryNamespace, ): Effect.Effect { const nsStr = namespaceToString(ns); return readEffect("memory listFacts", () => db .select() .from(smithersMemoryFacts) .where(eq(smithersMemoryFacts.namespace, nsStr)) .orderBy(smithersMemoryFacts.key), ).pipe( Effect.map((rows) => rows.map((row) => ({ namespace: row.namespace, key: row.key, valueJson: row.valueJson, schemaSig: row.schemaSig, createdAtMs: row.createdAtMs, updatedAtMs: row.updatedAtMs, ttlMs: row.ttlMs, })), ), ); } // --- Thread Effects --- function createThreadEffect( ns: MemoryNamespace, title?: string, ): Effect.Effect { const nsStr = namespaceToString(ns); const now = nowMs(); const threadId = crypto.randomUUID(); const thread: MemoryThread = { threadId, namespace: nsStr, title: title ?? null, metadataJson: null, createdAtMs: now, updatedAtMs: now, }; return writeEffect("memory createThread", () => db.insert(smithersMemoryThreads).values(thread), ).pipe(Effect.map(() => thread)); } function getThreadEffect( threadId: string, ): Effect.Effect { return readEffect("memory getThread", () => db .select() .from(smithersMemoryThreads) .where(eq(smithersMemoryThreads.threadId, threadId)) .limit(1), ).pipe(Effect.map((rows) => rows[0] as MemoryThread | undefined)); } function deleteThreadEffect( threadId: string, ): Effect.Effect { return Effect.gen(function* () { // Delete messages first yield* writeEffect("memory deleteThreadMessages", () => db .delete(smithersMemoryMessages) .where(eq(smithersMemoryMessages.threadId, threadId)), ); // Delete the thread yield* writeEffect("memory deleteThread", () => db .delete(smithersMemoryThreads) .where(eq(smithersMemoryThreads.threadId, threadId)), ); }); } // --- Message Effects --- function saveMessageEffect( msg: Omit & { createdAtMs?: number }, ): Effect.Effect { return Effect.gen(function* () { yield* Metric.increment(memoryMessageSaves); yield* writeEffect("memory saveMessage", () => db.insert(smithersMemoryMessages).values({ id: msg.id, threadId: msg.threadId, role: msg.role, contentJson: msg.contentJson, runId: msg.runId ?? null, nodeId: msg.nodeId ?? null, createdAtMs: msg.createdAtMs ?? nowMs(), }), ); }); } function listMessagesEffect( threadId: string, limit?: number, ): Effect.Effect { return readEffect("memory listMessages", () => { let query = db .select() .from(smithersMemoryMessages) .where(eq(smithersMemoryMessages.threadId, threadId)) .orderBy(smithersMemoryMessages.createdAtMs); if (limit) { query = query.limit(limit) as any; } return query; }).pipe( Effect.map((rows) => rows.map((row) => ({ id: row.id, threadId: row.threadId, role: row.role, contentJson: row.contentJson, runId: row.runId, nodeId: row.nodeId, createdAtMs: row.createdAtMs, })), ), ); } function countMessagesEffect( threadId: string, ): Effect.Effect { return readEffect("memory countMessages", () => db .select({ count: sql`count(*)` }) .from(smithersMemoryMessages) .where(eq(smithersMemoryMessages.threadId, threadId)), ).pipe(Effect.map((rows) => rows[0]?.count ?? 0)); } // --- Maintenance --- function deleteExpiredFactsEffect(): Effect.Effect { const now = nowMs(); return writeEffect("memory deleteExpiredFacts", () => db .delete(smithersMemoryFacts) .where( and( sql`${smithersMemoryFacts.ttlMs} IS NOT NULL`, sql`${smithersMemoryFacts.updatedAtMs} + ${smithersMemoryFacts.ttlMs} < ${now}`, ), ), ).pipe(Effect.map((result: any) => result?.changes ?? result?.rowsAffected ?? 0)); } // --- Build the store --- return { // Promise variants (delegate to Effect) getFact: (ns, key) => runPromise(getFactEffect(ns, key)), setFact: (ns, key, value, ttlMs) => runPromise(setFactEffect(ns, key, value, ttlMs)), deleteFact: (ns, key) => runPromise(deleteFactEffect(ns, key)), listFacts: (ns) => runPromise(listFactsEffect(ns)), createThread: (ns, title) => runPromise(createThreadEffect(ns, title)), getThread: (threadId) => runPromise(getThreadEffect(threadId)), deleteThread: (threadId) => runPromise(deleteThreadEffect(threadId)), saveMessage: (msg) => runPromise(saveMessageEffect(msg)), listMessages: (threadId, limit) => runPromise(listMessagesEffect(threadId, limit)), countMessages: (threadId) => runPromise(countMessagesEffect(threadId)), deleteExpiredFacts: () => runPromise(deleteExpiredFactsEffect()), // Effect variants getFactEffect, setFactEffect, deleteFactEffect, listFactsEffect, createThreadEffect, getThreadEffect, deleteThreadEffect, saveMessageEffect, listMessagesEffect, countMessagesEffect, deleteExpiredFactsEffect, }; } export const MemoryStoreLive = Layer.effect( MemoryStoreService, Effect.map(MemoryStoreDb, (db) => makeMemoryStore(db)), ); export function createMemoryStoreLayer(db: BunSQLiteDatabase) { return MemoryStoreLive.pipe( Layer.provide(Layer.succeed(MemoryStoreDb, db)), ); } export function createMemoryStore(db: BunSQLiteDatabase): MemoryStore { return runSync( MemoryStoreService.pipe(Effect.provide(createMemoryStoreLayer(db))), ); }