/**
 * @license
 * Copyright 2022-2026 Matter.js Authors
 * SPDX-License-Identifier: Apache-2.0
 */

import { Logger } from "#log/Logger.js";
import type { Duration } from "#time/Duration.js";
import { Hours } from "#time/TimeUnit.js";
import { Abort } from "#util/Abort.js";
import { Gzip } from "#util/Gzip.js";
import { BasicMultiplex } from "#util/Multiplex.js";
import type { Directory } from "../../fs/Directory.js";
import type { DataNamespace } from "../DataNamespace.js";
import { type CloneableStorage, FilesystemStorageDriver, StorageDriver, StorageError } from "../StorageDriver.js";
import type { SupportedStorageTypes } from "../StringifyTools.js";
import { WalCleaner } from "./WalCleaner.js";
import {
    type StoreData,
    type WalCommit,
    type WalCommitId,
    compareCommitIds,
    compressedSegmentFilename,
    parseSegmentFilename,
    segmentFilename,
} from "./WalCommit.js";
import { WalReader } from "./WalReader.js";
import { WalSnapshot } from "./WalSnapshot.js";
import { WalTransaction, applyCommit } from "./WalTransaction.js";
import { WalWriter } from "./WalWriter.js";

const logger = Logger.get("WalStorageDriver");

/**
 * Transactional storage backend using a write-ahead log (WAL).
 *
 * Data is loaded from the snapshot + WAL on first read and cached. Writes update the cache incrementally so
 * subsequent reads avoid a full reload.
 */
export class WalStorageDriver extends FilesystemStorageDriver implements CloneableStorage {
    static readonly id = "wal";

    /**
     * Construct and initialize a driver from a descriptor.
     *
     * If initialization fails the partially-initialized driver is closed (best effort) and the original error is
     * rethrown.
     */
    static async create(namespace: DataNamespace, descriptor: WalStorageDriver.Descriptor) {
        const storage = new WalStorageDriver(namespace, {
            maxSegmentSize: descriptor.maxSegmentSize,
            fsync: descriptor.fsync,
            compressSnapshot: descriptor.compressSnapshot,
            compressLog: descriptor.compressLog,
            headSnapshot: descriptor.headSnapshot,
        });
        try {
            await storage.initialize();
        } catch (error) {
            // Best-effort cleanup; the initialization error is the one worth reporting
            await storage.close().catch(() => {});
            throw error;
        }
        return storage;
    }

    readonly #storageDir: Directory;
    readonly #options: WalStorageDriver.Options;

    /** Fully materialized store; undefined until the first load completes. */
    #cache?: StoreData;

    /** In-flight cache load shared by concurrent readers. */
    #cacheLoading?: Promise<StoreData>;

    /** Commits that landed while a cache load was in flight; reconciled at the end of the load. */
    #pendingOps?: WalCommit[];

    #abort = new Abort();
    #workers = new BasicMultiplex();
    #initialized = false;
    #lastCommitId?: WalCommitId;
    #lastCommitTs?: number;
    #lastSnapshotCommitId?: WalCommitId;
    #writer?: WalWriter;
    #reader?: WalReader;
    #cleaner?: WalCleaner;
    #compressSnapshot = true;

    constructor(namespace?: DataNamespace, options?: WalStorageDriver.Options) {
        super(namespace);
        // An explicit storageDir (used by clone()) takes precedence over the root's directory
        this.#storageDir = options?.storageDir ?? this.root!.directory;
        this.#options = options ?? {};
    }

    get initialized() {
        return this.#initialized;
    }

    /**
     * Create the storage and WAL directories, set up reader/writer/cleaner and start the background workers.
     *
     * @throws StorageError if the driver is already initialized
     */
    override async initialize(): Promise<void> {
        if (this.#initialized) {
            throw new StorageError("WalStorageDriver already initialized");
        }
        await super.initialize();

        // Reset per-session state so a closed driver can be initialized again
        this.#abort = new Abort();
        this.#workers = new BasicMultiplex();
        this.#cache = undefined;
        this.#pendingOps = undefined;
        this.#lastCommitId = undefined;
        this.#lastCommitTs = undefined;
        this.#lastSnapshotCommitId = undefined;

        await this.#storageDir.mkdir();
        const walDir = this.#storageDir.directory("wal");
        await walDir.mkdir();

        const compressLog = this.#options.compressLog ?? WalStorageDriver.defaults.compressLog;

        this.#reader = new WalReader(walDir);
        this.#writer = new WalWriter(walDir, {
            name: this.root?.namespace,
            maxSegmentSize: this.#options.maxSegmentSize,
            fsync: this.#options.fsync,
            onRotate: compressLog
                ? () => {
                      this.#workers.add(this.#compressRotatedSegments(), "wal-compress");
                  }
                : undefined,
        });

        this.#compressSnapshot = this.#options.compressSnapshot ?? WalStorageDriver.defaults.compressSnapshot;
        const headSnapshot = this.#options.headSnapshot ?? WalStorageDriver.defaults.headSnapshot;

        this.#cleaner = new WalCleaner(
            walDir,
            headSnapshot
                ? {
                      dir: this.#storageDir,
                      compress: this.#compressSnapshot,
                      reader: this.#reader,
                  }
                : undefined,
        );

        // Start background workers
        const snapshotInterval = this.#options.snapshotInterval ?? WalStorageDriver.defaults.snapshotInterval;
        this.#workers.add(
            this.#runWorker("wal-snapshot", snapshotInterval, () => this.#runSnapshot()),
            "wal-snapshot",
        );

        // Cleanup is optional; an undefined interval disables the worker
        const cleanInterval = this.#options.cleanInterval ?? WalStorageDriver.defaults.cleanInterval;
        if (cleanInterval !== undefined) {
            this.#workers.add(
                this.#runWorker("wal-clean", cleanInterval, () => this.#runClean()),
                "wal-clean",
            );
        }

        this.#initialized = true;
    }

    /**
     * Stop the background workers, write a final snapshot (if anything was committed or loaded), close the writer
     * and release the base driver.
     */
    override async close(): Promise<void> {
        this.#abort();
        await this.#workers;

        // Final snapshot
        if (this.#lastCommitId) {
            await this.#runSnapshot();
            await this.#runClean();
        }

        await this.#writer?.close();
        this.#cache = undefined;
        this.#initialized = false;
        await super.close();
    }

    /**
     * Create an independent driver backed by a copy of the storage directory in a temporary location.
     *
     * If the copy fails to initialize it is closed (best effort) before the error is rethrown, mirroring
     * {@link WalStorageDriver.create}.
     */
    async clone(): Promise<WalStorageDriver> {
        this.#assertInitialized();

        // Flush current state to snapshot so the copy is consistent
        await this.#runSnapshot();

        // Copy the entire storage directory to a temp location
        const tempDir = this.#storageDir.fs.tempDirectory();
        await this.#storageDir.fs.copy(this.#storageDir, tempDir);

        // Return a new WalStorageDriver backed by the copy (no root lock for the clone)
        const clone = new WalStorageDriver(undefined, { ...this.#options, storageDir: tempDir });
        try {
            await clone.initialize();
        } catch (error) {
            // Do not leave a half-initialized driver (workers, writer) behind
            await clone.close().catch(() => {});
            throw error;
        }
        return clone;
    }

    // --- Read operations (loaded from cache) ---

    /** Return the value stored under `key` in the given context, or undefined if absent. */
    async get(contexts: string[], key: string): Promise<SupportedStorageTypes | undefined> {
        this.#assertInitialized();
        const store = await this.#loadCache();
        const contextKey = this.#contextKey(contexts);
        return store[contextKey]?.[key];
    }

    /** Return the keys stored directly in the given context. */
    async keys(contexts: string[]): Promise<string[]> {
        this.#assertInitialized();
        const store = await this.#loadCache();
        const contextKey = this.#contextKey(contexts);
        return Object.keys(store[contextKey] ?? {});
    }

    /** Return a shallow copy of all key/value pairs stored directly in the given context. */
    async values(contexts: string[]): Promise<Record<string, SupportedStorageTypes>> {
        this.#assertInitialized();
        const store = await this.#loadCache();
        const contextKey = this.#contextKey(contexts);
        return { ...(store[contextKey] ?? {}) };
    }

    /**
     * Return the names of the immediate sub-contexts below the given context.
     *
     * An empty `contexts` array addresses the root. Context segments are validated the same way as in the other
     * read operations.
     *
     * @throws StorageError if a context segment is empty or contains a dot
     */
    async contexts(contexts: string[]): Promise<string[]> {
        this.#assertInitialized();
        const store = await this.#loadCache();

        // #contextKey yields "" for the root and rejects malformed segments
        const contextKey = this.#contextKey(contexts);
        const prefix = contextKey.length ? `${contextKey}.` : "";

        const found = new Set<string>();
        for (const key of Object.keys(store)) {
            if (!key.startsWith(prefix)) {
                continue;
            }
            // Only the first segment below the prefix names a direct child
            const [child] = key.substring(prefix.length).split(".");
            if (child.length > 0) {
                found.add(child);
            }
        }
        return [...found];
    }

    // --- Write operations (delegate through transactions) ---

    async set(contexts: string[], values: Record<string, SupportedStorageTypes>): Promise<void>;
    async set(contexts: string[], key: string, value: SupportedStorageTypes): Promise<void>;
    async set(
        contexts: string[],
        keyOrValues: string | Record<string, SupportedStorageTypes>,
        value?: SupportedStorageTypes,
    ): Promise<void> {
        this.#assertInitialized();
        await using tx = await this.begin();
        if (typeof keyOrValues === "string") {
            await tx.set(contexts, keyOrValues, value!);
        } else {
            await tx.set(contexts, keyOrValues);
        }
        await tx.commit();
    }

    async delete(contexts: string[], key: string): Promise<void> {
        this.#assertInitialized();
        await using tx = await this.begin();
        await tx.delete(contexts, key);
        await tx.commit();
    }

    async clearAll(contexts: string[]): Promise<void> {
        this.#assertInitialized();
        await using tx = await this.begin();
        await tx.clearAll(contexts);
        await tx.commit();
    }

    // --- Transactions ---

    /**
     * Open a transaction against the WAL writer.
     *
     * The callback passed to the transaction records the commit id/timestamp and keeps the cache coherent: it is
     * applied directly when the cache exists, or buffered when a load is in flight.
     */
    override async begin(): Promise<WalTransaction> {
        this.#assertInitialized();
        return new WalTransaction(this, this.#writer!, (id, ts, ops) => {
            this.#lastCommitId = id;
            this.#lastCommitTs = ts;

            if (this.#cache !== undefined) {
                applyCommit(this.#cache, { ts, ops });
            } else if (this.#cacheLoading !== undefined || this.#pendingOps !== undefined) {
                // Load in-flight — buffer so #doLoadCache can reconcile after replay
                (this.#pendingOps ??= []).push({ ts, ops });
            }

            // The in-flight load stays registered in #cacheLoading so concurrent readers keep sharing it; it is
            // cleared by #loadCache once the load settles. Unregistering it here would let the next reader start
            // a second load that could later replace the cache with a store missing commits applied meanwhile.
        });
    }

    // --- Snapshot reconstruction ---

    /**
     * Reconstruct a snapshot as of the given timestamp.
     *
     * If `asOf` is omitted, returns the full current state.
     *
     * @throws StorageError if `asOf` is in the future or predates the base snapshot
     */
    async snapshotAtTime(asOf?: number): Promise<WalSnapshot> {
        this.#assertInitialized();

        if (asOf === undefined) {
            return this.#reconstruct();
        }

        if (asOf > Date.now()) {
            throw new StorageError("Timestamp is in the future");
        }

        return this.#reconstruct(
            (_id, commitTs) => commitTs > asOf,
            baseTs => {
                if (baseTs > 0 && asOf < baseTs) {
                    throw new StorageError("Timestamp predates available logs");
                }
            },
        );
    }

    /**
     * Reconstruct a snapshot at the given commit ID.
     *
     * If `commitId` is omitted, returns the full current state.
     */
    async snapshotAtCommit(commitId?: WalCommitId): Promise<WalSnapshot> {
        this.#assertInitialized();
        return this.#reconstruct(
            commitId !== undefined ? (id, _commitTs) => compareCommitIds(id, commitId) > 0 : undefined,
        );
    }

    // --- Internal ---

    /**
     * Return the cached store, loading it on first use. Concurrent callers share a single in-flight load.
     */
    async #loadCache(): Promise<StoreData> {
        if (this.#cache) {
            return this.#cache;
        }
        if (this.#cacheLoading) {
            return this.#cacheLoading;
        }
        this.#cacheLoading = this.#doLoadCache();
        try {
            return await this.#cacheLoading;
        } finally {
            this.#cacheLoading = undefined;
        }
    }

    /**
     * Build the store from the latest snapshot plus WAL replay, then apply commits buffered during the load.
     */
    async #doLoadCache(): Promise<StoreData> {
        const store: StoreData = {};
        let afterCommitId: WalCommitId | undefined;

        // Load snapshot
        const snap = await WalSnapshot.load(this.#storageDir);
        if (snap) {
            Object.assign(store, snap.data);
            afterCommitId = snap.commitId;
            this.#lastSnapshotCommitId = snap.commitId;
            this.#lastCommitTs = snap.ts || undefined;
            logger.debug("Loaded snapshot at commit", snap.commitId);
        }

        // Replay WAL from after the snapshot (or from the beginning)
        let replayCount = 0;
        for await (const { id, commit } of this.#reader!.read(afterCommitId)) {
            applyCommit(store, commit);
            this.#lastCommitId = id;
            this.#lastCommitTs = commit.ts || this.#lastCommitTs;
            replayCount++;
        }

        if (replayCount > 0) {
            logger.debug(`Replayed ${replayCount} WAL commits`);
        } else if (afterCommitId) {
            // Nothing newer than the snapshot; the snapshot's commit is the latest known one
            this.#lastCommitId = afterCommitId;
        }

        // Reconcile commits that arrived during the load (idempotent if already replayed)
        const pending = this.#pendingOps;
        if (pending !== undefined) {
            this.#pendingOps = undefined;
            for (const commit of pending) {
                applyCommit(store, commit);
            }
        }

        this.#cache = store;
        return store;
    }

    /**
     * Shared reconstruction logic for snapshotAtTime/snapshotAtCommit.
     *
     * @param shouldStop - if provided, called for each commit; return true to stop before applying that commit
     * @param validateBase - if provided, called with the base snapshot ts for pre-condition checks
     * @throws StorageError if neither a snapshot nor any applicable commit exists
     */
    async #reconstruct(
        shouldStop?: (id: WalCommitId, commitTs: number) => boolean,
        validateBase?: (baseTs: number) => void,
    ): Promise<WalSnapshot> {
        const store: StoreData = {};
        let afterCommitId: WalCommitId | undefined;
        let lastId: WalCommitId | undefined;
        let lastTs = 0;

        // Load base snapshot
        const snap = await WalSnapshot.load(this.#storageDir);
        if (snap) {
            Object.assign(store, snap.data);
            afterCommitId = snap.commitId;
            lastId = snap.commitId;
            lastTs = snap.ts;
        }

        validateBase?.(lastTs);

        // Replay WAL commits
        for await (const { id, commit } of this.#reader!.read(afterCommitId)) {
            if (shouldStop?.(id, commit.ts)) {
                break;
            }
            applyCommit(store, commit);
            lastId = id;
            lastTs = commit.ts || lastTs;
        }

        if (!lastId) {
            throw new StorageError("No data available");
        }

        return new WalSnapshot(lastId, lastTs, store);
    }

    /**
     * Run `task` repeatedly, sleeping `interval` before each run, until the driver is aborted. Task errors are
     * logged and do not stop the worker.
     */
    async #runWorker(name: string, interval: Duration, task: () => Promise<void>): Promise<void> {
        while (!this.#abort.aborted) {
            await this.#abort.sleep(name, interval);
            if (this.#abort.aborted) return;
            try {
                await task();
            } catch (e) {
                logger.warn(`${name} worker error:`, e);
            }
        }
    }

    /**
     * Persist the current cache as a snapshot. Failures are logged, not thrown.
     *
     * The commit id is read once, right after the cache is available, and that same id is recorded as the last
     * snapshotted commit. A commit that lands while the snapshot is being saved advances `#lastCommitId`; using
     * the captured id keeps the cleaner from treating WAL entries as snapshotted when they may not be.
     */
    async #runSnapshot(): Promise<void> {
        if (!this.#lastCommitId) return;

        try {
            const store = await this.#loadCache();

            const commitId = this.#lastCommitId;
            if (!commitId) return;

            const snapshot = new WalSnapshot(commitId, this.#lastCommitTs ?? 0, store);
            await snapshot.save(this.#storageDir, { compress: this.#compressSnapshot });
            this.#lastSnapshotCommitId = commitId;
        } catch (e) {
            logger.warn("Snapshot failed:", e);
        }
    }

    /** Run the WAL cleaner up to the last snapshotted commit. Failures are logged, not thrown. */
    async #runClean(): Promise<void> {
        if (!this.#lastSnapshotCommitId) return;

        try {
            await this.#cleaner!.run(this.#lastSnapshotCommitId);
        } catch (e) {
            logger.warn("WAL cleanup failed:", e);
        }
    }

    /**
     * Gzip every rotated (non-active) `.jsonl` segment in the WAL directory and remove the uncompressed original.
     * A failure on one segment is logged and does not prevent processing of the others.
     */
    async #compressRotatedSegments(): Promise<void> {
        const walDir = this.#storageDir.directory("wal");
        if (!(await walDir.exists())) return;

        const activeSegment = this.#writer!.currentSegment;

        for await (const entry of walDir.entries()) {
            if (entry.kind !== "file") continue;
            if (!entry.name.endsWith(".jsonl")) continue;

            const seg = parseSegmentFilename(entry.name);
            if (seg === undefined || seg === activeSegment) continue;

            try {
                const gzName = compressedSegmentFilename(seg);
                const gzFile = walDir.file(gzName);

                if (await gzFile.exists()) {
                    // Compressed file already exists — just remove the original
                    await entry.delete();
                    continue;
                }

                const srcFile = walDir.file(segmentFilename(seg));

                // Write to a temporary name first, then rename into place
                const tmpName = gzName + ".tmp";
                const tmpFile = walDir.file(tmpName);
                await tmpFile.write(Gzip.compress(srcFile.readBytes()));
                await tmpFile.rename(gzName);
                await srcFile.delete();
            } catch (e) {
                logger.warn(`Failed to compress WAL segment ${seg}:`, e);
            }
        }
    }

    /**
     * Join context segments into the dotted key used by the store.
     *
     * @throws StorageError if any segment is empty or contains a dot
     */
    #contextKey(contexts: string[]): string {
        for (const ctx of contexts) {
            if (!ctx.length || ctx.includes(".")) {
                throw new StorageError("Context must not contain empty segments or leading or trailing dots.");
            }
        }
        return contexts.join(".");
    }

    #assertInitialized(): void {
        if (!this.#initialized) {
            throw new StorageError("WalStorageDriver is not initialized");
        }
    }
}

export namespace WalStorageDriver {
    export interface Descriptor extends StorageDriver.Descriptor {
        /**
         * Maximum WAL segment size in bytes.
         */
        maxSegmentSize?: number;

        /**
         * Whether to fsync after each WAL write.
         */
        fsync?: boolean;

        /**
         * Whether to gzip-compress snapshots. Defaults to true if the runtime supports gzip.
         */
        compressSnapshot?: boolean;

        /**
         * Whether to gzip-compress rotated WAL segments. Defaults to true if the runtime supports gzip.
         */
        compressLog?: boolean;

        /**
         * Whether to capture a head snapshot at the truncation boundary when cleaning old WAL segments.
         */
        headSnapshot?: boolean;
    }

    export interface Options {
        /**
         * Override the storage directory. When omitted the directory is derived from the root passed to the
         * constructor. Used by {@link WalStorageDriver.clone} to point at a temporary copy.
         */
        storageDir?: Directory;

        /**
         * Maximum WAL segment size in bytes before the writer rotates to a new file.
         */
        maxSegmentSize?: number;

        /**
         * Whether to fsync after each WAL write for durability against unexpected process termination.
         */
        fsync?: boolean;

        /**
         * Interval between periodic snapshots that consolidate WAL state into a single file.
         */
        snapshotInterval?: Duration;

        /**
         * Interval between periodic WAL cleanup that removes segments already captured in a snapshot. Undefined
         * disables cleanup.
         */
        cleanInterval?: Duration;

        /**
         * Whether to gzip-compress snapshots. Defaults to true if the runtime supports gzip.
         */
        compressSnapshot?: boolean;

        /**
         * Whether to gzip-compress rotated WAL segments. Defaults to true if the runtime supports gzip.
         */
        compressLog?: boolean;

        /**
         * Whether to capture a head snapshot at the truncation boundary when cleaning old WAL segments.
         */
        headSnapshot?: boolean;
    }

    /**
     * Default values for every option except `storageDir`.
     */
    export const defaults = {
        maxSegmentSize: 16 * 1024 * 1024,
        fsync: true,
        snapshotInterval: Hours(6),
        cleanInterval: undefined as Duration | undefined,
        compressSnapshot: Gzip.isAvailable,
        compressLog: Gzip.isAvailable,
        headSnapshot: true,
    } as const satisfies { [K in keyof Required<Omit<Options, "storageDir">>]: Omit<Options, "storageDir">[K] };
}