/**
 * @license
 * Copyright 2025 Steven Roussey
 * SPDX-License-Identifier: Apache-2.0
 */
import { EventParameters } from "@workglow/util";
import { DataPortSchemaObject, FromSchema, TypedArraySchemaOptions } from "@workglow/util/schema";
import type { PageCursor } from "./Cursor";
export type { PageCursor } from "./Cursor";
// NOTE(review): several declarations below reference type names (`Entity`,
// `PrimaryKey`, `Event`, `T`, `K`, `Schema`) without a visible type-parameter
// list, and some generic types appear without arguments (e.g. `Partial`,
// `ReadonlySet`, `Promise`, `ReadonlyArray`). The angle-bracketed generic
// lists look to have been lost from this copy of the declarations — confirm
// against a freshly generated .d.ts before relying on these signatures.
/**
 * Union of scalar value kinds: string, number, bigint, boolean, null and
 * `Uint8Array`. Where it is consumed is not visible in this file.
 */
export type ValueOptionType = string | number | bigint | boolean | null | Uint8Array;
/**
 * Map from event name to listener signature for the events exposed through
 * the `on` / `off` / `once` / `emit` / `waitOn` members of
 * {@link ITabularStorage}.
 */
export type TabularEventListeners = {
    /** Listener for `put`; receives the entity. */
    put: (entity: Entity) => void;
    /** Listener for `get`; receives the key and the entity, or `undefined`. */
    get: (key: PrimaryKey, entity: Entity | undefined) => void;
    /** Listener for `getBulk`; receives the requested keys and the entities. */
    getBulk: (keys: readonly PrimaryKey[], entities: readonly Entity[]) => void;
    /** Listener for `query`; receives the criteria-like key and the entities, or `undefined`. */
    query: (key: Partial, entities: Entity[] | undefined) => void;
    /**
     * `key` identifies what was deleted, as a {@link Partial} of the entity: the
     * primary key for a single delete, or the (owner-bearing) criteria / matched
     * row for a bulk `deleteSearch`. It carries the scope columns so subscribers
     * can filter and caches can invalidate without the backend reading the row.
     */
    delete: (key: Partial) => void;
    /** Listener for `clearall`; takes no arguments. */
    clearall: () => void;
    /**
     * Emitted when an atomic batch op (e.g. vector `putBulk`) detects a mid-batch
     * failure and restores prior state. Subscribers should treat any uncommitted
     * `put` events from the failed batch as superseded and reconcile against the
     * post-rollback state.
     *
     * `ids` carries the primary keys of rows that were observably committed (via
     * a per-row `put` event or backend-level write) before the failure, in the
     * order they were written. Subscribers can use the list to surgically
     * invalidate caches without re-reading the whole table. The list is empty
     * when no row reached the backend / emitted a `put` event before the throw
     * (e.g. an IndexedDB transaction that aborted before any `tx.oncomplete`).
     */
    rollback: (reason: {
        readonly op: string;
        readonly error: unknown;
        readonly ids: readonly PrimaryKey[];
    }) => void;
};
/** Union of the event names, i.e. the keys of {@link TabularEventListeners}. */
export type TabularEventName = keyof TabularEventListeners;
/** Listener signature for one event: an indexed lookup into {@link TabularEventListeners}. */
export type TabularEventListener = TabularEventListeners[Event];
/**
 * Argument list for one event, derived through `EventParameters` from
 * `@workglow/util`; used as the rest-argument type of `emit`.
 */
export type TabularEventParameters = EventParameters, Event>;
/** Kind of row change carried by a {@link TabularChangePayload}. */
export type TabularChangeType = "INSERT" | "UPDATE" | "DELETE";
/**
 * Change notification passed to the `subscribeToChanges` callback.
 * `old` and `new` are both optional; presumably `old` is absent for inserts
 * and `new` is absent for deletes — confirm per backend.
 */
export interface TabularChangePayload {
    readonly type: TabularChangeType;
    readonly old?: Entity;
    readonly new?: Entity;
}
/** Options accepted by `subscribeToChanges`. */
export interface TabularSubscribeOptions {
    /** Polling interval in milliseconds (used by implementations that rely on polling) */
    readonly pollingIntervalMs?: number;
}
/** Recursive JSON-compatible value: primitives, arrays, and string-keyed objects. */
export type JSONValue = string | number | boolean | null | JSONValue[] | {
    [key: string]: JSONValue;
};
/** Comparison operators that a {@link SearchCondition} may carry. */
export type SearchOperator = "=" | "<" | "<=" | ">" | ">=";
/**
 * Closed allow-list of SQL comparison operators that can be interpolated
 * into a WHERE clause. This is the single source of truth: if
 * {@link SearchOperator} changes, this constant and {@link SEARCH_OPERATOR_SET}
 * must update in lockstep so SQL builders cannot accidentally accept a value
 * outside the union (defense in depth at the JSON trust boundary used by
 * HTTP-proxied storage backends).
 */
export declare const ALLOWED_SEARCH_OPERATORS: readonly ["=", "<", "<=", ">", ">="];
/** Set form of {@link ALLOWED_SEARCH_OPERATORS} for O(1) membership checks. */
export declare const SEARCH_OPERATOR_SET: ReadonlySet;
/** A comparison against `value` using `operator`, as opposed to a bare equality value. */
export interface SearchCondition {
    readonly value: T;
    readonly operator: SearchOperator;
}
/**
 * Criteria for deleteSearch operations supporting multiple columns.
 * Each column can have either a direct value (equality) or a SearchCondition with an operator.
 *
 * @example
 * // Equality match
 * { category: "electronics" }
 *
 * // With operator
 * { createdAt: { value: date, operator: "<" } }
 *
 * // Multiple columns
 * { category: "electronics", createdAt: { value: date, operator: "<" } }
 */
export type DeleteSearchCriteria = {
    readonly [K in keyof Entity]?: Entity[K] | SearchCondition;
};
/** Alias of {@link DeleteSearchCriteria}, used by the read-side methods (`query`, `count`, ...). */
export type SearchCriteria = DeleteSearchCriteria;
/** Sort direction for an {@link OrderBy} clause. */
export type SortDirection = "ASC" | "DESC";
/** One ordering term: a column of the entity plus a direction. */
export interface OrderBy {
    readonly column: keyof Entity;
    readonly direction: SortDirection;
}
/** Optional ordering, limit and offset for `getAll` and `query`. */
export interface QueryOptions {
    readonly orderBy?: ReadonlyArray>;
    readonly limit?: number;
    readonly offset?: number;
}
/**
 * Options for `queryIndex`. Same shape as {@link QueryOptions} plus a
 * required `select` list naming the columns to project.
 */
export interface CoveringIndexQueryOptions {
    readonly select: readonly K[];
    readonly orderBy?: ReadonlyArray>;
    readonly limit?: number;
    readonly offset?: number;
}
/**
 * Request for a cursor-paginated read.
 *
 * Pagination is keyset-based: the next page resumes after the row encoded
 * in `cursor`, with the primary key acting as the stable tiebreaker.
 * This is stable under concurrent inserts and deletes — unlike offset-based
 * paging, which can skip or duplicate rows when the underlying data
 * shifts between calls.
 *
 * If `orderBy` is omitted, rows are returned in primary-key order ascending.
 * If `orderBy` is provided, the effective ordering is `[...orderBy, ...primaryKey]`
 * so iteration remains deterministic when sort columns contain duplicates.
 */
export interface PageRequest {
    readonly orderBy?: ReadonlyArray>;
    /** Maximum number of rows to return. Defaults to 100. */
    readonly limit?: number;
    /** Opaque cursor returned by a previous call; omit to start from the beginning. */
    readonly cursor?: PageCursor;
}
/**
 * A page of results from a cursor-paginated read.
 *
 * `nextCursor` is `undefined` when there are no more rows to fetch.
 * When `nextCursor` is present, callers should pass it back via
 * {@link PageRequest.cursor} to fetch the next page.
 *
 * **Termination contract.** A defined `nextCursor` does NOT guarantee
 * additional rows exist — concurrent deletes can produce an empty page
 * mid-iteration even though `nextCursor` was set. Loops MUST therefore
 * terminate on either condition, not just on `nextCursor`:
 *
 * ```ts
 * // CORRECT — terminates on both `nextCursor` and empty `items`:
 * let cursor: PageCursor | undefined;
 * do {
 *   const page = await storage.getPage({ limit: 100, cursor });
 *   for (const row of page.items) handle(row);
 *   if (page.items.length === 0) break;
 *   cursor = page.nextCursor;
 * } while (cursor);
 *
 * // WRONG — can spin forever if a concurrent delete empties the next page
 * // while leaving rows further along the cursor that get deleted in turn:
 * while (page.nextCursor) { page = await storage.getPage({ cursor: page.nextCursor }); }
 * ```
 *
 * The bundled async generators ({@link ITabularStorage.records},
 * {@link ITabularStorage.pages}) honour this contract; reach for them
 * instead of writing the loop manually.
 */
export interface Page {
    readonly items: ReadonlyArray;
    readonly nextCursor: PageCursor | undefined;
}
/**
 * Type guard to check if a value is a SearchCondition.
 *
 * Verifies the operator is a member of {@link ALLOWED_SEARCH_OPERATORS} so a
 * forged criterion (e.g. one decoded from JSON at an HTTP boundary) cannot
 * smuggle an arbitrary string into SQL via {@link buildSearchWhere}.
 */
export declare function isSearchCondition(value: unknown): value is SearchCondition;
/**
 * Helper type to compute PrimaryKey while deferring Entity resolution. Uses a
 * conditional type to avoid forcing full Entity resolution at class definition.
 */
export type SimplifyPrimaryKey> = Entity extends any ? Pick> : never;
/**
 * Extracts property names marked as auto-generated from the schema.
 * Properties with `x-auto-generated: true` are considered auto-generated.
 *
 * Implemented as a mapped type that keeps a key only when its property schema
 * extends `{ "x-auto-generated": true }`, then indexes by all keys so the
 * `never` entries drop out of the resulting union.
 */
export type AutoGeneratedKeys = {
    [K in keyof Schema["properties"]]: Schema["properties"][K] extends {
        "x-auto-generated": true;
    } ? K : never;
}[keyof Schema["properties"]];
/** Entity type for insertion — properties marked auto-generated become optional. */
export type InsertEntity = Omit & Partial>;
/**
 * Interface defining the contract for tabular storage repositories.
 * Provides a flexible interface for storing and retrieving data with typed
 * primary keys and values, and supports compound keys and partial key lookup.
 *
 * @typeParam Schema - The schema definition for the entity using JSON Schema
 * @typeParam PrimaryKeyNames - Array of property names that form the primary key
 */
export interface ITabularStorage, Entity = FromSchema, PrimaryKey = SimplifyPrimaryKey, InsertType = InsertEntity>> {
    /**
     * Stores a single entity. Takes the insert form of the entity, so
     * auto-generated properties may be omitted (see {@link InsertEntity}).
     */
    put(value: InsertType): Promise;
    /**
     * Stores multiple entities in a single bulk operation.
     *
     * **Ordering guarantee:** the returned array is in the same order as the
     * input — `result[i]` always corresponds to `values[i]`. Callers may rely on
     * this to align bulk inserts with parallel arrays (e.g. chunks paired with
     * embeddings). Backends are responsible for preserving the order even when
     * the underlying engine does not formally guarantee it (see each backend's
     * implementation).
     *
     * **Caveat for integer auto-generated keys on remote backends.** Supplying
     * inputs that omit a backend-assigned integer-autoincrement primary key
     * leaves the wrapper with no key to match a returned row to a request row
     * (UUIDs are filled in client-side, so they don't have this problem). Such
     * inputs fall back to the server's response order, which Postgres does not
     * formally contract for `INSERT ... RETURNING`. The fallback is reliable in
     * practice but if `result[i] === values[i]` matters for correctness, supply
     * the primary key on every input — for example by minting it client-side
     * — or split the call into per-row `put`s.
     */
    putBulk(values: InsertType[]): Promise;
    /** Looks up a single entity by its primary key. */
    get(key: PrimaryKey): Promise;
    /**
     * Fetches multiple entities by their primary keys in a single call.
     *
     * Returns only the entities that were found — the result is a filtered
     * array, not aligned with the input. Each returned entity carries its own
     * primary-key fields, so callers can re-align by key without a parallel
     * array. Result ordering is unspecified.
     *
     * Empty input returns an empty array without issuing a backend call.
     *
     * @param keys - Array of primary keys to look up
     * @returns Array of matching entities (possibly empty)
     */
    getBulk(keys: readonly PrimaryKey[]): Promise;
    /** Deletes one row, identified either by its primary key or by a full entity. */
    delete(key: PrimaryKey | Entity): Promise;
    /** Reads all rows, with optional ordering, limit and offset. */
    getAll(options?: QueryOptions): Promise;
    /** Removes every row — presumably the operation behind the `clearall` event; confirm per backend. */
    deleteAll(): Promise;
    /** Reports the size of the storage — presumably the total row count; confirm per backend. */
    size(): Promise;
    /** Counts rows matching `criteria` without loading the matching entities. */
    count(criteria?: SearchCriteria): Promise;
    /**
     * Deletes all entries matching the specified search criteria.
     * Supports multiple columns with optional comparison operators.
     *
     * @param criteria - Object with column names as keys and values or SearchConditions
     * @example
     * // Delete by equality
     * await repo.deleteSearch({ category: "electronics" });
     *
     * // Delete with operator
     * await repo.deleteSearch({ createdAt: { value: date, operator: "<" } });
     *
     * // Delete with multiple criteria (AND)
     * await repo.deleteSearch({ category: "electronics", value: { value: 100, operator: "<" } });
     */
    deleteSearch(criteria: DeleteSearchCriteria): Promise;
    /** Offset-based paging; prefer {@link getPage} for stable iteration. */
    getOffsetPage(offset: number, limit: number): Promise;
    /**
     * Fetches a page of records using cursor-based (keyset) pagination.
     *
     * Stable under concurrent inserts and deletes: the cursor encodes the
     * last seen primary key so the next page resumes from a precise position
     * rather than a numeric offset that shifts as rows are added or removed.
     *
     * @param request - Optional ordering, limit, and cursor.
     * @returns A {@link Page} with the rows for this page and a `nextCursor`
     *   to use for the next call (or `undefined` when iteration is complete).
     */
    getPage(request?: PageRequest): Promise>;
    /**
     * Cursor-paginated form of {@link query}.
     *
     * @param criteria - Object with column names as keys and values or SearchConditions
     * @param request - Optional ordering, limit, and cursor.
     */
    queryPage(criteria: SearchCriteria, request?: PageRequest): Promise>;
    /** Async generator yielding records one at a time. */
    records(pageSize?: number): AsyncGenerator;
    /** Async generator yielding pages of records. */
    pages(pageSize?: number): AsyncGenerator;
    /** Registers `fn` as a listener for the named event (see {@link TabularEventListeners}). */
    on(name: Event, fn: TabularEventListener): void;
    /** Removes a listener previously registered for the named event. */
    off(name: Event, fn: TabularEventListener): void;
    /** Emits the named event with the arguments its listener signature expects. */
    emit(name: Event, ...args: TabularEventParameters): void;
    /** Registers a listener for the named event — presumably invoked at most once; confirm in the emitter. */
    once(name: Event, fn: TabularEventListener): void;
    /** Returns a promise tied to the named event — presumably resolving on its next emission; confirm in the emitter. */
    waitOn(name: Event): Promise>;
    /**
     * Queries entries matching the specified search criteria with optional ordering, limit, and offset.
     * Uses optimized index paths when possible, falls back to full scan otherwise.
     *
     * Implementation contract for third-party backends: when binding a
     * `SearchCondition` value into the underlying datastore, run it
     * through the same conversion path as a row value going *into* the
     * store (e.g. `jsToSqlValue` for SQL backends — Date → ISO string,
     * etc.). The cursor pagination machinery in {@link getPage} relies
     * on this round-trip to compare a row's stored representation
     * against a cursor's decoded value; any backend that skips the
     * conversion would silently mis-page on Date or other rich types.
     *
     * @param criteria - Object with column names as keys and values or SearchConditions
     * @param options - Optional ordering, limit, and offset options
     * @returns Array of matching entities or undefined if no matches found
     */
    query(criteria: SearchCriteria, options?: QueryOptions): Promise;
    /**
     * Strict, projected query served entirely by a covering compound index.
     * Throws CoveringIndexMissingError when no registered index can serve
     * (criteria + orderBy + select). Returns projected rows containing only
     * the columns named in `options.select` — never the heavy fields.
     *
     * @param criteria - equality (and optionally non-equality) filters
     * @param options - select (required), orderBy, limit, offset
     * @returns array of projected rows (empty array, not undefined, when no matches)
     */
    queryIndex(criteria: SearchCriteria, options: CoveringIndexQueryOptions): Promise[]>;
    /**
     * Subscribes to changes in the repository (including remote changes).
     *
     * @param callback - Invoked with a {@link TabularChangePayload} for each change
     * @param options - Optional settings such as the polling interval
     * @returns Unsubscribe function.
     */
    subscribeToChanges(callback: (change: TabularChangePayload) => void, options?: TabularSubscribeOptions): () => void;
    /**
     * Runs `fn` inside a single transaction. If `fn` throws, all writes performed
     * inside it are rolled back; otherwise they commit atomically. Mutation
     * events (e.g. `put`) emitted inside `fn` are buffered and delivered after
     * the transaction commits, so listeners never observe rows that are about
     * to roll back.
     *
     * Backends differ in how strong the guarantee is:
     * - **SQLite**: real `BEGIN` / `COMMIT` / `ROLLBACK`.
     * - **PostgreSQL**: real `BEGIN` / `COMMIT` / `ROLLBACK`. On a real
     *   `pg.Pool` (anything exposing `connect()`) the implementation
     *   dedicates a client via `pool.connect()` and runs the transaction on
     *   that client, leaving the parent's pool free for external traffic
     *   in parallel. On single-connection wrappers (PGLitePool, raw PGlite)
     *   the transaction runs on the shared session and concurrent calls on
     *   the same instance are serialized behind a per-instance mutex so
     *   they cannot slip into the open transaction.
     * - **Supabase, in-memory, file system, IndexedDB**: best-effort. The
     *   callback runs to completion and rejection propagates, but partial
     *   writes are not rolled back because the backend does not expose a
     *   transaction surface usable by this API.
     *
     * **Concurrency contract:**
     * - On backends with native transaction support (SQLite, PostgreSQL),
     *   concurrent calls on the same storage instance are isolated from the
     *   open transaction: SQLite and the single-connection Postgres path
     *   serialize them through a per-instance mutex; the real-pool Postgres
     *   path runs them on independent pool clients in parallel. Either way,
     *   unrelated writes never accidentally commit or roll back along with
     *   `fn`.
     * - On best-effort backends concurrent writes have no atomicity barrier
     *   to begin with — the contract on those backends is "runs `fn`", not
     *   "isolates `fn`".
     *
     * The `tx` handle passed to `fn` is **not** the same object as `this` for
     * backends with native transaction support — it is a Proxy that routes
     * writes through the transaction-bound resources (the dedicated client on
     * real `pg.Pool`, the bypass-mutex internal methods on SQLite/PGlite) and
     * routes events through the transaction's deferred-emit queue. Callers
     * MUST use `tx` for everything inside `fn`. Capturing the outer `this` and
     * calling methods on it from inside `fn` will deadlock against the held
     * mutex (single-connection backends) or run on the wrong connection
     * (`pg.Pool`), and is unsupported.
     *
     * **Nested calls.** Calls made through the `tx` handle always throw —
     * `tx.withTransaction(...)` is a hard error on every backend. Calls made
     * through the *original* (captured `this`) handle behave per backend:
     * - **SQLite, single-connection Postgres** (PGlite, PGLitePool): throw,
     *   because the backend has no autonomous `BEGIN` and reusing the open
     *   transaction implicitly would be ambiguous.
     * - **Real `pg.Pool` Postgres**: acquire an *independent* client and run
     *   as an *independent* transaction with its own commit/rollback boundary.
     *   This is the natural Postgres concurrency model on a pool — nothing
     *   ties the two transactions together. If you want the inner work to
     *   roll back when the outer throws, do not use a captured `this`; use
     *   `tx` (which throws) and a SAVEPOINT instead.
     *
     * Use SAVEPOINT directly if you need nested rollback boundaries within a
     * single logical transaction.
     *
     * @param fn - Callback that receives the transaction-bound `tx` handle
     */
    withTransaction(fn: (tx: this) => Promise): Promise;
    /**
     * Creates the underlying table/object store. Idempotent: a second call on
     * an already-set-up storage adapts the schema to any new indexes if the
     * backend supports it (SQL `CREATE INDEX IF NOT EXISTS`, IndexedDB
     * version bump for new indexes), and is a no-op otherwise.
     *
     * When the storage was constructed with `tabularMigrations`, this method
     * also applies any pending migrations through the unified tabular
     * migration runner (see `TabularMigrationOrchestrator`). Otherwise it is
     * a pure DDL setup primitive — tabular schemas are derived from the JSON
     * Schema passed at construction rather than from versioned migrations.
     *
     * @returns Promise that resolves when setup is complete
     */
    setupDatabase(): Promise;
    /**
     * Whether rows written to this storage survive a process restart. Optional:
     * backends that omit it are assumed durable. In-memory backends return
     * `false` so callers (e.g. the run-private cache durability check) can warn
     * when restart-survival won't actually hold.
     */
    isDurable?(): boolean;
    /** Synchronous teardown of the storage instance; what is released is backend-specific and not visible here. */
    destroy(): void;
    /** Synchronous disposal hook keyed by `Symbol.dispose`. */
    [Symbol.dispose](): void;
    /** Asynchronous disposal hook keyed by `Symbol.asyncDispose`. */
    [Symbol.asyncDispose](): Promise;
}
/**
 * Loosened form of {@link ITabularStorage}: `queryIndex` and
 * `withTransaction` are omitted and re-declared with `any`-typed parameters.
 * NOTE(review): presumably this exists so storages with different type
 * arguments can be handled uniformly — confirm against call sites.
 */
export type AnyTabularStorage = Omit, "queryIndex" | "withTransaction"> & {
    queryIndex(criteria: any, options: any): Promise;
    withTransaction(fn: (tx: any) => Promise): Promise;
};
//# sourceMappingURL=ITabularStorage.d.ts.map