/**
 * Persistence Store
 *
 * JSONL-based persistence for runs and state.
 *
 * File structure:
 *   {baseDir}/
 *     runs.jsonl             - Run metadata
 *     state.jsonl            - State snapshots
 *     logs/
 *       {executionId}.jsonl  - Action log per run
 */

import { join } from "node:path";
import { mkdir } from "node:fs/promises";
import { existsSync } from "node:fs";
import type {
  PersistenceStore,
  PersistenceConfig,
  RunRecord,
  ActionRecord,
  StateRecord,
  RunQueryOptions,
} from "./types.ts";
import {
  appendJsonl,
  readJsonl,
  queryJsonl,
  updateJsonl,
  findJsonl,
  ensureDir,
} from "./jsonl.ts";

/**
 * Check a numeric value against optional inclusive lower/upper bounds.
 *
 * A bound that is `undefined` or `null` is treated as "not set". A bound of
 * `0` is a real bound (a plain truthiness check would silently ignore it).
 *
 * @param value - The value to test (e.g. a start time or timestamp).
 * @param after - Optional lower bound; values strictly below it are rejected.
 * @param before - Optional upper bound; values strictly above it are rejected.
 * @returns `true` when the value satisfies every bound that is set.
 */
function isWithinWindow(value: number, after?: number, before?: number): boolean {
  if (after != null && value < after) return false;
  if (before != null && value > before) return false;
  return true;
}

/**
 * Keep at most `limit` leading items.
 *
 * @param items - Already-ordered items.
 * @param limit - Maximum number of items to keep; ignored unless it is a
 *   positive number.
 * @returns A truncated copy when a positive limit is given, otherwise `items`.
 */
function applyLimit<T>(items: T[], limit?: number): T[] {
  return limit != null && limit > 0 ? items.slice(0, limit) : items;
}

/**
 * Create a file-based persistence store.
 *
 * Runs are stored in `{baseDir}/runs.jsonl`, state snapshots in
 * `{baseDir}/state.jsonl`, and each run's actions in
 * `{baseDir}/logs/{executionId}.jsonl`.
 *
 * @param config - Store configuration. `baseDir` is the root directory for
 *   all files; `enabled` defaults to `true`. When `enabled` is `false` a no-op
 *   store is returned: writes do nothing, list queries resolve to `[]`, and
 *   single-record lookups resolve to `null`.
 * @returns A `PersistenceStore` backed by JSONL files (or the no-op store).
 */
export function createPersistenceStore(config: PersistenceConfig): PersistenceStore {
  const { baseDir, enabled = true } = config;

  const runsFile = join(baseDir, "runs.jsonl");
  const stateFile = join(baseDir, "state.jsonl");
  const logsDir = join(baseDir, "logs");

  // NOTE(review): executionId is interpolated into the path unchanged. If it
  // can ever come from untrusted input, validate it (no path separators or
  // "..") before it reaches this store.
  const getLogFile = (executionId: string) => join(logsDir, `${executionId}.jsonl`);

  // No-op store when disabled
  if (!enabled) {
    return {
      appendRun: async () => {},
      updateRun: async () => {},
      queryRuns: async () => [],
      getRun: async () => null,
      appendAction: async () => {},
      getActions: async () => [],
      appendState: async () => {},
      getLatestState: async () => null,
      queryState: async () => [],
      init: async () => {},
      close: async () => {},
    };
  }

  return {
    /** Prepare the on-disk layout: the run/state files' location and the logs directory. */
    async init() {
      // Presumably ensures the parent directory of each file exists — confirm
      // against ensureDir in ./jsonl.ts.
      await ensureDir(runsFile);
      await ensureDir(stateFile);
      if (!existsSync(logsDir)) {
        await mkdir(logsDir, { recursive: true });
      }
    },

    /** Release resources held by the store. */
    async close() {
      // No cleanup needed for file-based store
    },

    // Runs

    /** Append a run record to the runs file. */
    async appendRun(record: RunRecord) {
      await appendJsonl(runsFile, record);
    },

    /**
     * Shallow-merge `update` into every stored run whose `executionId` matches.
     *
     * @param executionId - Identifier of the run to update.
     * @param update - Fields to overwrite on the matching record(s).
     */
    async updateRun(executionId: string, update: Partial<RunRecord>) {
      await updateJsonl(
        runsFile,
        (r: RunRecord) => r.executionId === executionId,
        (r: RunRecord) => ({ ...r, ...update })
      );
    },

    /**
     * List runs, newest first.
     *
     * @param options - Optional filters: `after` / `before` are inclusive
     *   bounds on `startTime`, `status` must match exactly when provided, and
     *   a positive `limit` caps the number of results.
     * @returns Matching runs sorted by `startTime` descending.
     */
    async queryRuns(options?: RunQueryOptions): Promise<RunRecord[]> {
      const records: RunRecord[] = await readJsonl(runsFile);

      const filtered = records.filter((r) => {
        if (!isWithinWindow(r.startTime, options?.after, options?.before)) return false;
        // Truthiness on purpose: an empty/absent status means "no status filter".
        if (options?.status && r.status !== options.status) return false;
        return true;
      });

      // Sort by startTime descending (newest first). `filtered` is a fresh
      // array from filter(), so sorting in place does not touch `records`.
      filtered.sort((a, b) => b.startTime - a.startTime);

      return applyLimit(filtered, options?.limit);
    },

    /**
     * Look up a single run by its execution id.
     *
     * @returns Whatever `findJsonl` yields for the first match; the disabled
     *   store returns `null` for a missing run.
     */
    async getRun(executionId: string): Promise<RunRecord | null> {
      return findJsonl(runsFile, (r: RunRecord) => r.executionId === executionId);
    },

    // Actions

    /** Append an action to the per-run log file for `executionId`. */
    async appendAction(executionId: string, action: ActionRecord) {
      const logFile = getLogFile(executionId);
      await appendJsonl(logFile, action);
    },

    /** Read every action recorded in the log file for `executionId`. */
    async getActions(executionId: string): Promise<ActionRecord[]> {
      const logFile = getLogFile(executionId);
      const actions: ActionRecord[] = await readJsonl(logFile);
      return actions;
    },

    // State

    /** Append a state snapshot to the state file. */
    async appendState(record: StateRecord) {
      await appendJsonl(stateFile, record);
    },

    /**
     * Return the snapshot with the greatest `timestamp`, or `null` when no
     * snapshots are stored. On equal timestamps the earlier entry in the file
     * wins (strict `>` comparison).
     */
    async getLatestState(): Promise<StateRecord | null> {
      const records: StateRecord[] = await readJsonl(stateFile);
      if (records.length === 0) return null;

      // Return most recent by timestamp. Safe without an initial value because
      // the empty case returned above.
      return records.reduce((latest, r) =>
        r.timestamp > latest.timestamp ? r : latest
      );
    },

    /**
     * List state snapshots, newest first.
     *
     * @param options - Optional filters: `after` / `before` are inclusive
     *   bounds on `timestamp`; a positive `limit` caps the number of results.
     * @returns Matching snapshots sorted by `timestamp` descending.
     */
    async queryState(options?: {
      after?: number;
      before?: number;
      limit?: number;
    }): Promise<StateRecord[]> {
      const all: StateRecord[] = await readJsonl(stateFile);

      const records = all.filter((r) =>
        isWithinWindow(r.timestamp, options?.after, options?.before)
      );

      // Sort by timestamp descending
      records.sort((a, b) => b.timestamp - a.timestamp);

      return applyLimit(records, options?.limit);
    },
  };
}