/**
 * State Store — SQLite persistence via sql.js (pure WASM)
 *
 * Persists channel state and message dedup records.
 * sql.js operates in-memory; explicit persist() writes to disk.
 * No native compilation needed (unlike better-sqlite3).
 *
 * v3: every table is scoped by account_id. The primary key is
 * (account_id, channel_id), except seen_messages which uses
 * (account_id, message_id).
 * See §0.4 of refactoring-plan.md for field semantics.
 */

import fs from 'node:fs';
import path from 'node:path';
import os from 'node:os';
import { logger } from '../util/logger.js';

const DB_PATH = path.join(os.homedir(), '.openclaw', 'clawlink.db');

/**
 * Tables that existed in the v2 schema, with the columns carried over
 * verbatim into v3. The v3 table additionally gets a leading account_id.
 */
const V2_TABLES: ReadonlyArray<{ name: string; columns: string }> = [
  {
    name: 'channel_state',
    columns:
      'channel_id, channel_name, joined, last_visit, last_reply, last_msg_seq, created_at, updated_at',
  },
  { name: 'seen_messages', columns: 'message_id, channel_id, seen_at' },
  { name: 'channel_skills', columns: 'channel_id, skill_content, updated_at' },
  { name: 'channel_purpose', columns: 'channel_id, purpose, updated_at' },
];

// ── Types ──

/** One row of the channel_state table. */
export interface ChannelState {
  account_id: string;
  channel_id: string;
  channel_name: string;
  /** 1 while the account is joined to the channel, 0 after leaving. */
  joined: number;
  /** Date.now() milliseconds; 0 when never set. */
  last_visit: number;
  /** Date.now() milliseconds; 0 when never set. */
  last_reply: number;
  last_msg_seq: number;
  created_at: number;
  updated_at: number;
}

export interface ChannelStateDisplay {
  id: string;
  name: string;
  lastVisit: string;
  lastReply: string;
  lastMsgSeq: number;
}

/** Minimal structural view of the sql.js Database API used by this file. */
interface SqlJsDatabase {
  run(sql: string, params?: unknown[]): void;
  prepare(sql: string): SqlJsStatement;
  export(): Uint8Array;
  close(): void;
}

/** Minimal structural view of the sql.js Statement API used by this file. */
interface SqlJsStatement {
  bind(params?: unknown[]): boolean;
  step(): boolean;
  getAsObject(): Record<string, unknown>;
  free(): void;
}

// ── StateStore ──

export class StateStore {
  private db: SqlJsDatabase | null = null;
  private accountId = 'default';
  private _seenCount = 0;

  /**
   * Initialize the SQLite database.
   * Must be called before any other methods.
   *
   * Loads the database file from disk when it exists, otherwise starts with an
   * empty in-memory database, then brings the schema up to v3.
   *
   * @param currentAccountId - The account ID of the currently starting account.
   *   Used for schema migration: old data without account_id will be assigned this ID.
   *   An empty value falls back to 'default'.
   * @returns This store, ready for use.
   */
  async init(currentAccountId: string): Promise<this> {
    this.accountId = currentAccountId || 'default';

    const initSqlJs = (
      (await import('sql.js')) as {
        default: (config?: unknown) => Promise<{
          Database: new (data?: ArrayLike<number>) => SqlJsDatabase;
        }>;
      }
    ).default;
    const SQL = await initSqlJs();

    // Load existing DB file if present
    if (fs.existsSync(DB_PATH)) {
      const buffer = fs.readFileSync(DB_PATH);
      this.db = new SQL.Database(buffer);
    } else {
      this.db = new SQL.Database();
    }

    this._migrate();
    return this;
  }

  /**
   * Bring the schema up to v3.
   *
   * - channel_state already has account_id: v3, just ensure all tables exist.
   * - no v2 table exists at all: fresh database, create the v3 tables.
   * - otherwise: migrate the v2 tables.
   */
  private _migrate(): void {
    if (!this.db) return;

    if (this._tableHasColumn('channel_state', 'account_id')) {
      // Already v3 schema — ensure all tables exist
      this._createV3Tables();
      return;
    }

    const hasLegacyTables = V2_TABLES.some((t) => this._tableExists(t.name));
    if (!hasLegacyTables) {
      // Fresh database: nothing to migrate, so do not log a migration.
      this._createV3Tables();
      this.persist();
      return;
    }

    // Old v2 schema detected — migrate
    this._migrateFromV2();
  }

  /**
   * Run a query and return every row as a plain object.
   * The statement is freed even when binding or stepping throws.
   */
  private _selectAll(sql: string, params: unknown[] = []): Array<Record<string, unknown>> {
    if (!this.db) return [];
    const stmt = this.db.prepare(sql);
    try {
      if (params.length > 0) stmt.bind(params);
      const rows: Array<Record<string, unknown>> = [];
      while (stmt.step()) rows.push(stmt.getAsObject());
      return rows;
    } finally {
      stmt.free();
    }
  }

  /**
   * Run a query and return its first row, or null when there is none.
   * The statement is freed even when binding or stepping throws.
   */
  private _selectFirst(sql: string, params: unknown[] = []): Record<string, unknown> | null {
    if (!this.db) return null;
    const stmt = this.db.prepare(sql);
    try {
      if (params.length > 0) stmt.bind(params);
      return stmt.step() ? stmt.getAsObject() : null;
    } finally {
      stmt.free();
    }
  }

  /**
   * Whether `table` has a column named `column`.
   * Returns false when the lookup fails for any reason.
   *
   * `table` is interpolated into the SQL text, so it must only ever be a
   * hard-coded identifier, never external input.
   */
  private _tableHasColumn(table: string, column: string): boolean {
    if (!this.db) return false;
    try {
      return this._selectAll(`PRAGMA table_info(${table})`).some((row) => row['name'] === column);
    } catch {
      // Table doesn't exist
      return false;
    }
  }

  /** Whether a table with this exact name exists. Returns false on any error. */
  private _tableExists(table: string): boolean {
    if (!this.db) return false;
    try {
      return (
        this._selectFirst(`SELECT name FROM sqlite_master WHERE type='table' AND name=?`, [
          table,
        ]) !== null
      );
    } catch {
      return false;
    }
  }

  /**
   * Migrate v2 tables (no account_id column) to v3, assigning every old row to
   * the current account.
   *
   * The whole migration runs in one transaction, so a failure half-way rolls
   * the in-memory database back to its v2 state before the error is rethrown.
   * Nothing is written to disk until the migration has committed.
   *
   * @throws Whatever the underlying SQL statement threw, after rolling back.
   */
  private _migrateFromV2(): void {
    const db = this.db;
    if (!db) return;

    logger.info(`[state] Migrating v2 schema to v3 (account_id='${this.accountId}')`);

    db.run('BEGIN');
    try {
      // 1. Rename old tables
      for (const { name } of V2_TABLES) {
        if (this._tableExists(name)) {
          db.run(`ALTER TABLE ${name} RENAME TO _v2_${name}`);
        }
      }

      // 2. Create v3 tables
      this._createV3Tables();

      // 3. Copy old data with current account_id
      for (const { name, columns } of V2_TABLES) {
        if (this._tableExists(`_v2_${name}`)) {
          db.run(
            `INSERT INTO ${name} (account_id, ${columns})
             SELECT ?, ${columns} FROM _v2_${name}`,
            [this.accountId]
          );
        }
      }

      // 4. Drop old tables
      for (const { name } of V2_TABLES) {
        db.run(`DROP TABLE IF EXISTS _v2_${name}`);
      }

      db.run('COMMIT');
    } catch (err) {
      try {
        db.run('ROLLBACK');
      } catch {
        // The transaction may already be gone; the original error matters more.
      }
      throw err;
    }

    // Persist only after COMMIT so the exported image never contains a
    // half-migrated schema.
    this.persist();
    logger.info('[state] Migration complete');
  }

  /** Create every v3 table that does not exist yet. Safe to call repeatedly. */
  private _createV3Tables(): void {
    if (!this.db) return;

    this.db.run(`
      CREATE TABLE IF NOT EXISTS channel_state (
        account_id TEXT NOT NULL,
        channel_id TEXT NOT NULL,
        channel_name TEXT NOT NULL DEFAULT '',
        joined INTEGER NOT NULL DEFAULT 0,
        last_visit INTEGER NOT NULL DEFAULT 0,
        last_reply INTEGER NOT NULL DEFAULT 0,
        last_msg_seq INTEGER NOT NULL DEFAULT 0,
        created_at INTEGER NOT NULL DEFAULT 0,
        updated_at INTEGER NOT NULL DEFAULT 0,
        PRIMARY KEY (account_id, channel_id)
      )
    `);

    this.db.run(`
      CREATE TABLE IF NOT EXISTS seen_messages (
        account_id TEXT NOT NULL,
        message_id TEXT NOT NULL,
        channel_id TEXT NOT NULL,
        seen_at INTEGER NOT NULL DEFAULT 0,
        PRIMARY KEY (account_id, message_id)
      )
    `);

    this.db.run(`
      CREATE TABLE IF NOT EXISTS channel_skills (
        account_id TEXT NOT NULL,
        channel_id TEXT NOT NULL,
        skill_content TEXT NOT NULL DEFAULT '',
        updated_at INTEGER NOT NULL DEFAULT 0,
        PRIMARY KEY (account_id, channel_id)
      )
    `);

    this.db.run(`
      CREATE TABLE IF NOT EXISTS channel_purpose (
        account_id TEXT NOT NULL,
        channel_id TEXT NOT NULL,
        purpose TEXT NOT NULL DEFAULT '',
        updated_at INTEGER NOT NULL DEFAULT 0,
        PRIMARY KEY (account_id, channel_id)
      )
    `);
  }

  // ── Write to disk ──

  /**
   * Write the whole in-memory database to DB_PATH.
   *
   * The image is written to a temporary sibling file and then renamed over the
   * target, so an interrupted write cannot leave a truncated database behind.
   * Errors are logged, never thrown: persistence is best-effort.
   *
   * NOTE(review): every call writes the complete database image. If several
   * StateStore instances share DB_PATH, the last persist() wins — confirm that
   * only one instance is live per file.
   */
  persist(): void {
    if (!this.db) return;
    const tmpPath = `${DB_PATH}.${process.pid}.tmp`;
    try {
      const data = this.db.export();
      fs.mkdirSync(path.dirname(DB_PATH), { recursive: true });
      fs.writeFileSync(tmpPath, Buffer.from(data));
      fs.renameSync(tmpPath, DB_PATH);
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      logger.error(`[state] persist error: ${message}`);
      try {
        fs.rmSync(tmpPath, { force: true });
      } catch {
        // Best-effort cleanup of the temp file only.
      }
    }
  }

  // ── Channel State ──

  /**
   * Get all joined channels for the current account.
   */
  getJoinedChannels(): ChannelState[] {
    // Rows come straight from channel_state, whose columns mirror ChannelState.
    return this._selectAll('SELECT * FROM channel_state WHERE account_id = ? AND joined = 1', [
      this.accountId,
    ]) as unknown as ChannelState[];
  }

  /** IDs of all joined channels for the current account. */
  getJoinedChannelIds(): string[] {
    return this.getJoinedChannels().map((c) => c.channel_id);
  }

  /** Get the state row of one channel for the current account, or null if absent. */
  getChannel(channelId: string): ChannelState | null {
    const row = this._selectFirst(
      'SELECT * FROM channel_state WHERE account_id = ? AND channel_id = ?',
      [this.accountId, channelId]
    );
    return row as unknown as ChannelState | null;
  }

  /**
   * Mark a channel as joined.
   * Authoritative writer: clawlink_join_channel tool (on success).
   *
   * Inserts a new row (last_visit and created_at set to now), or on an existing
   * row sets joined=1 and refreshes channel_name and updated_at only.
   */
  markJoined(channelId: string, channelName: string): void {
    if (!this.db) return;
    const now = Date.now();
    const name = channelName || '';
    this.db.run(
      `INSERT INTO channel_state (account_id, channel_id, channel_name, joined, last_visit, created_at, updated_at)
       VALUES (?, ?, ?, 1, ?, ?, ?)
       ON CONFLICT(account_id, channel_id) DO UPDATE SET joined=1, channel_name=?, updated_at=?`,
      [this.accountId, channelId, name, now, now, now, name, now]
    );
    this.persist();
  }

  /**
   * Mark a channel as left.
   * Authoritative writer: clawlink_leave_channel tool (on success).
   */
  markLeft(channelId: string): void {
    if (!this.db) return;
    this.db.run(
      'UPDATE channel_state SET joined=0, updated_at=? WHERE account_id=? AND channel_id=?',
      [Date.now(), this.accountId, channelId]
    );
    this.persist();
  }

  /**
   * Update last_visit timestamp for a channel.
   * Authoritative writer: Collector — on ingest() accepting a message.
   */
  updateLastVisit(channelId: string): void {
    if (!this.db) return;
    const now = Date.now();
    this.db.run(
      'UPDATE channel_state SET last_visit=?, updated_at=? WHERE account_id=? AND channel_id=?',
      [now, now, this.accountId, channelId]
    );
    this.persist();
  }

  /**
   * Update last_reply timestamp for a channel.
   * Authoritative writer: clawlink_send_message tool — on sendMessage() success.
   */
  updateLastReply(channelId: string): void {
    if (!this.db) return;
    const now = Date.now();
    this.db.run(
      'UPDATE channel_state SET last_reply=?, updated_at=? WHERE account_id=? AND channel_id=?',
      [now, now, this.accountId, channelId]
    );
    this.persist();
  }

  /**
   * Update last_msg_seq for a channel.
   * Authoritative writer: Collector — on ingest() taking msg.sequence.
   */
  updateLastMsgSeq(channelId: string, seq: number): void {
    if (!this.db) return;
    this.db.run(
      'UPDATE channel_state SET last_msg_seq=?, updated_at=? WHERE account_id=? AND channel_id=?',
      [seq, Date.now(), this.accountId, channelId]
    );
    this.persist();
  }

  // ── Message Dedup (Collector use) ──

  /** Whether this message ID has already been recorded for the current account. */
  hasSeen(messageId: string): boolean {
    return (
      this._selectFirst('SELECT 1 FROM seen_messages WHERE account_id=? AND message_id=?', [
        this.accountId,
        messageId,
      ]) !== null
    );
  }

  /**
   * Record a message ID as seen. Re-recording an existing ID is a no-op.
   *
   * Writes to disk only on every 100th call; the records in between live in
   * memory until the next persist() (any other write, or close()).
   */
  markSeen(messageId: string, channelId: string): void {
    if (!this.db) return;
    this.db.run(
      'INSERT OR IGNORE INTO seen_messages (account_id, message_id, channel_id, seen_at) VALUES (?, ?, ?, ?)',
      [this.accountId, messageId, channelId, Date.now()]
    );
    // Batch persist: seen_messages are written frequently
    this._seenCount += 1;
    if (this._seenCount % 100 === 0) this.persist();
  }

  // ── Channel Skills ──

  /**
   * Get the stored skill content of a channel.
   * Returns null when there is no row or the stored content is empty.
   */
  getSkill(channelId: string): string | null {
    const row = this._selectFirst(
      'SELECT skill_content FROM channel_skills WHERE account_id = ? AND channel_id = ?',
      [this.accountId, channelId]
    );
    return (row?.['skill_content'] as string) || null;
  }

  /** Insert or replace the skill content of a channel. */
  setSkill(channelId: string, skillContent: string): void {
    if (!this.db) return;
    const now = Date.now();
    this.db.run(
      `INSERT INTO channel_skills (account_id, channel_id, skill_content, updated_at)
       VALUES (?, ?, ?, ?)
       ON CONFLICT(account_id, channel_id) DO UPDATE SET skill_content=?, updated_at=?`,
      [this.accountId, channelId, skillContent, now, skillContent, now]
    );
    this.persist();
  }

  /** Get the skill content of every channel for the current account. */
  getAllSkills(): Array<{ channel_id: string; skill_content: string }> {
    return this._selectAll(
      'SELECT channel_id, skill_content FROM channel_skills WHERE account_id = ?',
      [this.accountId]
    ).map((row) => ({
      channel_id: row['channel_id'] as string,
      skill_content: row['skill_content'] as string,
    }));
  }

  // ── Channel Purpose ──

  /**
   * Get the stored purpose of a channel.
   * Returns null when there is no row or the stored purpose is empty.
   */
  getPurpose(channelId: string): string | null {
    const row = this._selectFirst(
      'SELECT purpose FROM channel_purpose WHERE account_id = ? AND channel_id = ?',
      [this.accountId, channelId]
    );
    return (row?.['purpose'] as string) || null;
  }

  /** Insert or replace the purpose of a channel. */
  setPurpose(channelId: string, purpose: string): void {
    if (!this.db) return;
    const now = Date.now();
    this.db.run(
      `INSERT INTO channel_purpose (account_id, channel_id, purpose, updated_at)
       VALUES (?, ?, ?, ?)
       ON CONFLICT(account_id, channel_id) DO UPDATE SET purpose=?, updated_at=?`,
      [this.accountId, channelId, purpose, now, purpose, now]
    );
    this.persist();
  }

  /** Delete the stored purpose of a channel. */
  removePurpose(channelId: string): void {
    if (!this.db) return;
    this.db.run('DELETE FROM channel_purpose WHERE account_id = ? AND channel_id = ?', [
      this.accountId,
      channelId,
    ]);
    this.persist();
  }

  /**
   * Clean up old dedup records (older than 24h)
   */
  cleanupSeen(): void {
    if (!this.db) return;
    const cutoff = Date.now() - 86_400_000;
    this.db.run('DELETE FROM seen_messages WHERE account_id = ? AND seen_at < ?', [
      this.accountId,
      cutoff,
    ]);
    this.persist();
  }

  /**
   * Close the database connection
   *
   * Persists one last time first, which also flushes dedup records that
   * markSeen() has not written yet. Calling it again is a no-op.
   */
  close(): void {
    if (this.db) {
      this.persist(); // Final persist
      this.db.close();
      this.db = null;
    }
  }
}