import type {
  DocumentByName,
  GenericDatabaseReader,
  GenericDatabaseWriter,
  GenericDataModel,
  GenericMutationCtx,
  NamedTableInfo,
  QueryInitializer,
  TableNamesInDataModel,
  WithOptionalSystemFields,
  WithoutSystemFields,
} from "convex/server";
import type { GenericId } from "convex/values";

/**
 * This function will be called when a document in the table changes.
 *
 * The `ctx` passed to a trigger is the caller's context with two database
 * handles:
 * - `ctx.db` is a trigger-aware writer: writes made through it queue further
 *   triggers, which run after the current one.
 * - `ctx.innerDb` is the unwrapped writer: writes made through it do not fire
 *   any triggers.
 */
export type Trigger<
  Ctx,
  DataModel extends GenericDataModel,
  TableName extends TableNamesInDataModel<DataModel>,
> = (
  ctx: Ctx & { innerDb: GenericDatabaseWriter<DataModel> },
  change: Change<DataModel, TableName>,
) => Promise<void>;

/**
 * Description of a single write to a table, as passed to a {@link Trigger}.
 *
 * - `"insert"`: `oldDoc` is `null`, `newDoc` is the inserted document.
 * - `"update"`: produced by both `patch` and `replace`; `oldDoc` and `newDoc`
 *   are the document as read before and after the write.
 * - `"delete"`: `oldDoc` is the document as read before deletion, `newDoc` is
 *   `null`.
 */
export type Change<
  DataModel extends GenericDataModel,
  TableName extends TableNamesInDataModel<DataModel>,
> = {
  id: GenericId<TableName>;
} & (
  | {
      operation: "insert";
      oldDoc: null;
      newDoc: DocumentByName<DataModel, TableName>;
    }
  | {
      operation: "update";
      oldDoc: DocumentByName<DataModel, TableName>;
      newDoc: DocumentByName<DataModel, TableName>;
    }
  | {
      operation: "delete";
      oldDoc: DocumentByName<DataModel, TableName>;
      newDoc: null;
    }
);

/**
 * Construct Triggers to register functions that run whenever a table changes.
 * Sample usage:
 *
 * ```
 * import { mutation as rawMutation } from "./_generated/server";
 * import { DataModel } from "./_generated/dataModel";
 * import { Triggers } from "convex-helpers/server/triggers";
 * import { customCtx, customMutation } from "convex-helpers/server/customFunctions";
 *
 * const triggers = new Triggers<DataModel>();
 * triggers.register("myTableName", async (ctx, change) => {
 *   console.log("Table changed", change);
 * });
 *
 * // Use `mutation` to define all mutations, and the triggers will get called.
 * export const mutation = customMutation(rawMutation, customCtx(triggers.wrapDB));
 * ```
 */
export class Triggers<
  DataModel extends GenericDataModel,
  Ctx extends {
    db: GenericDatabaseWriter<DataModel>;
  } = GenericMutationCtx<DataModel>,
> {
  /** Registered triggers, keyed by table name, in registration order. */
  registered: {
    [TableName in TableNamesInDataModel<DataModel>]?: Trigger<
      Ctx,
      DataModel,
      TableName
    >[];
  } = {};

  /**
   * Register `trigger` to run after every write to `tableName` that goes
   * through a trigger-aware writer. Multiple triggers may be registered for
   * the same table; they are queued in registration order.
   */
  register<TableName extends TableNamesInDataModel<DataModel>>(
    tableName: TableName,
    trigger: Trigger<Ctx, DataModel, TableName>,
  ) {
    if (!this.registered[tableName]) {
      this.registered[tableName] = [];
    }
    this.registered[tableName]!.push(trigger);
  }

  /**
   * Return a shallow copy of `ctx` whose `db` is replaced by a trigger-aware
   * writer around the original `ctx.db`. Defined as an arrow function so it
   * can be passed around unbound (e.g. `customCtx(triggers.wrapDB)`).
   */
  wrapDB = <C extends Ctx>(ctx: C): C => {
    return { ...ctx, db: writerWithTriggers(ctx, ctx.db, this) };
  };
}

/**
 * Minimal async mutex: at most one `withLock` callback runs at a time per
 * Lock instance.
 */
class Lock {
  /** Pending while the lock is held; `null` while the lock is free. */
  promise: Promise<void> | null = null;
  /** Resolver for `promise`, used to wake waiters when the lock is released. */
  resolve: (() => void) | null = null;

  /**
   * Run `f` while holding the lock and return its result. The lock is
   * released whether `f` resolves or rejects.
   */
  async withLock<R>(f: () => Promise<R>): Promise<R> {
    const unlock = await this._lock();
    try {
      return await f();
    } finally {
      unlock();
    }
  }

  /** Wait until the lock is free, take it, and return the release function. */
  async _lock(): Promise<() => void> {
    // Re-check after every wake-up: another waiter may have taken the lock
    // between the release and this continuation running.
    while (this.promise !== null) {
      await this.promise;
    }
    [this.promise, this.resolve] = this._newLock();
    return () => {
      this.promise = null;
      this.resolve?.();
    };
  }

  /** Create a pending promise together with the function that resolves it. */
  _newLock(): [Promise<void>, () => void] {
    let resolve: () => void;
    const promise = new Promise<void>((r) => {
      resolve = r;
    });
    return [promise, () => resolve()];
  }
}

/**
 * Locking semantics:
 * - Database writes to tables with triggers are serialized with
 *   `innerWriteLock` so we can calculate the `change` object without
 *   interference from parallel writes.
 * - When the application (not a trigger) calls `insert`, `patch`, `replace`,
 *   or `delete`, it will acquire the outer write lock and hold it while doing
 *   the write operation and all subsequent triggers, including recursive
 *   triggers.
 *   - This ensures atomicity in the simple case where a trigger doesn't call
 *     other triggers recursively.
 * - Recursive triggers are queued up, so they are executed in the same order
 *   as the database writes were. At a high level, this is a BFS traversal of
 *   the trigger graph.
 * - Note when there are multiple triggers, they can't be executed atomically
 *   with the writes that caused them, from the perspective of the other
 *   triggers. So if one trigger is making sure denormalized data is
 *   consistent, another trigger could see the data in an inconsistent state.
 *   To avoid such problems, triggers should be resilient to such
 *   inconsistencies or the trigger graph should be kept simple.
 */
const innerWriteLock = new Lock();
const outerWriteLock = new Lock();
/** Triggers waiting to run, in the order their writes happened (FIFO). */
const triggerQueue: (() => Promise<void>)[] = [];

/** @deprecated use writerWithTriggers instead */
export class DatabaseWriterWithTriggers<
  DataModel extends GenericDataModel,
  Ctx extends {
    db: GenericDatabaseWriter<DataModel>;
  } = GenericMutationCtx<DataModel>,
> implements GenericDatabaseWriter<DataModel>
{
  /** Trigger-aware writer that every method of this class delegates to. */
  writer: GenericDatabaseWriter<DataModel>;

  constructor(
    ctx: Ctx,
    innerDb: GenericDatabaseWriter<DataModel>,
    triggers: Triggers<DataModel, Ctx>,
    isWithinTrigger: boolean = false,
  ) {
    this.system = innerDb.system;
    this.writer = writerWithTriggers(ctx, innerDb, triggers, isWithinTrigger);
  }

  delete<TableName extends TableNamesInDataModel<DataModel>>(
    table: TableName,
    id: GenericId<NonUnion<TableName>>,
  ): Promise<void>;
  delete(id: GenericId<TableNamesInDataModel<DataModel>>): Promise<void>;
  delete(arg0: any, arg1?: any): Promise<void> {
    return this.writer.delete(arg0, arg1);
  }

  get<TableName extends TableNamesInDataModel<DataModel>>(
    table: TableName,
    id: GenericId<NonUnion<TableName>>,
  ): Promise<DocumentByName<DataModel, TableName> | null>;
  get<TableName extends TableNamesInDataModel<DataModel>>(
    id: GenericId<TableName>,
  ): Promise<DocumentByName<DataModel, TableName> | null>;
  get(arg0: any, arg1?: any) {
    return this.writer.get(arg0, arg1);
  }

  insert<TableName extends TableNamesInDataModel<DataModel>>(
    table: TableName,
    value: WithoutSystemFields<DocumentByName<DataModel, TableName>>,
  ): Promise<GenericId<TableName>> {
    return this.writer.insert(table, value);
  }

  patch<TableName extends TableNamesInDataModel<DataModel>>(
    table: TableName,
    id: GenericId<NonUnion<TableName>>,
    value: PatchValue<DocumentByName<DataModel, TableName>>,
  ): Promise<void>;
  patch<TableName extends TableNamesInDataModel<DataModel>>(
    id: GenericId<TableName>,
    value: PatchValue<DocumentByName<DataModel, TableName>>,
  ): Promise<void>;
  patch(arg0: any, arg1: any, arg2?: any): Promise<void> {
    return this.writer.patch(arg0, arg1, arg2);
  }

  query<TableName extends TableNamesInDataModel<DataModel>>(
    tableName: TableName,
  ): QueryInitializer<NamedTableInfo<DataModel, TableName>> {
    return this.writer.query(tableName);
  }

  normalizeId<TableName extends TableNamesInDataModel<DataModel>>(
    tableName: TableName,
    id: string,
  ): GenericId<TableName> | null {
    return this.writer.normalizeId(tableName, id);
  }

  replace<TableName extends TableNamesInDataModel<DataModel>>(
    table: TableName,
    id: GenericId<NonUnion<TableName>>,
    value: WithOptionalSystemFields<DocumentByName<DataModel, TableName>>,
  ): Promise<void>;
  replace<TableName extends TableNamesInDataModel<DataModel>>(
    id: GenericId<TableName>,
    value: WithOptionalSystemFields<DocumentByName<DataModel, TableName>>,
  ): Promise<void>;
  replace(arg0: any, arg1: any, arg2?: any): Promise<void> {
    return this.writer.replace(arg0, arg1, arg2);
  }

  system: GenericDatabaseWriter<DataModel>["system"];
}

/**
 * Wrap `innerDb` in a writer that runs the registered triggers after each
 * `insert`, `patch`, `replace` and `delete` on a table that has triggers.
 *
 * Writes to tables without registered triggers are forwarded to `innerDb`
 * directly, without taking any lock or reading the document. `get`, `query`,
 * `normalizeId` and `system` are always passed through unchanged.
 *
 * @param ctx Context handed to triggers (with `db` and `innerDb` replaced).
 * @param innerDb The underlying writer that performs the actual reads/writes.
 * @param triggers Registry consulted on every write.
 * @param isWithinTrigger `true` for the writer given to a running trigger:
 *   its writes only queue further triggers instead of draining the queue.
 */
export function writerWithTriggers<
  DataModel extends GenericDataModel,
  Ctx extends {
    db: GenericDatabaseWriter<DataModel>;
  } = GenericMutationCtx<DataModel>,
>(
  ctx: Ctx,
  innerDb: GenericDatabaseWriter<DataModel>,
  triggers: Triggers<DataModel, Ctx>,
  isWithinTrigger: boolean = false,
): GenericDatabaseWriter<DataModel> {
  const patch: {
    <TableName extends TableNamesInDataModel<DataModel>>(
      table: TableName,
      id: GenericId<NonUnion<TableName>>,
      value: PatchValue<DocumentByName<DataModel, TableName>>,
    ): Promise<void>;
    <TableName extends TableNamesInDataModel<DataModel>>(
      id: GenericId<TableName>,
      value: PatchValue<DocumentByName<DataModel, TableName>>,
    ): Promise<void>;
  } = async (arg0: any, arg1: any, arg2?: any) => {
    // Three arguments: (table, id, value). Two arguments: (id, value), in
    // which case the table is looked up among the tables that have triggers.
    const [tableName, id, value] =
      arg2 !== undefined
        ? [arg0, arg1, arg2]
        : [_tableNameFromId(innerDb, triggers.registered, arg0), arg0, arg1];
    return await _patch(tableName, id, value);
  };

  /**
   * Patch a document. `tableName` is `null` when the id does not belong to
   * any table with registered triggers.
   */
  async function _patch<TableName extends TableNamesInDataModel<DataModel>>(
    tableName: TableName | null,
    id: GenericId<TableName>,
    value: Partial<DocumentByName<DataModel, TableName>>,
  ): Promise<void> {
    if (!tableName) {
      // eslint-disable-next-line @convex-dev/explicit-table-ids -- tableName not available here
      return await innerDb.patch(id, value);
    }
    if (!triggers.registered[tableName]) {
      // Explicit table name but no triggers: same fast path as `insert`.
      return await innerDb.patch(tableName, id, value);
    }
    return await _execThenTrigger(
      ctx,
      innerDb,
      triggers,
      tableName,
      isWithinTrigger,
      async () => {
        const oldDoc = (await innerDb.get(tableName, id))!;
        await innerDb.patch(tableName, id, value);
        const newDoc = (await innerDb.get(tableName, id))!;
        return [undefined, { operation: "update", id, oldDoc, newDoc }];
      },
    );
  }

  const replace: {
    <TableName extends TableNamesInDataModel<DataModel>>(
      table: TableName,
      id: GenericId<NonUnion<TableName>>,
      value: WithOptionalSystemFields<DocumentByName<DataModel, TableName>>,
    ): Promise<void>;
    <TableName extends TableNamesInDataModel<DataModel>>(
      id: GenericId<TableName>,
      value: WithOptionalSystemFields<DocumentByName<DataModel, TableName>>,
    ): Promise<void>;
  } = async (arg0: any, arg1: any, arg2?: any) => {
    // Same argument dispatch as `patch`.
    const [tableName, id, value] =
      arg2 !== undefined
        ? [arg0, arg1, arg2]
        : [_tableNameFromId(innerDb, triggers.registered, arg0), arg0, arg1];
    return await _replace(tableName, id, value);
  };

  /**
   * Replace a document. `tableName` is `null` when the id does not belong to
   * any table with registered triggers.
   */
  async function _replace<TableName extends TableNamesInDataModel<DataModel>>(
    tableName: TableName | null,
    id: GenericId<TableName>,
    value: WithOptionalSystemFields<DocumentByName<DataModel, TableName>>,
  ): Promise<void> {
    if (!tableName) {
      // eslint-disable-next-line @convex-dev/explicit-table-ids -- tableName not available here
      return await innerDb.replace(id, value);
    }
    if (!triggers.registered[tableName]) {
      // Explicit table name but no triggers: same fast path as `insert`.
      return await innerDb.replace(tableName, id, value);
    }
    return await _execThenTrigger(
      ctx,
      innerDb,
      triggers,
      tableName,
      isWithinTrigger,
      async () => {
        const oldDoc = (await innerDb.get(tableName, id))!;
        await innerDb.replace(tableName, id, value);
        const newDoc = (await innerDb.get(tableName, id))!;
        return [undefined, { operation: "update", id, oldDoc, newDoc }];
      },
    );
  }

  const delete_: {
    <TableName extends TableNamesInDataModel<DataModel>>(
      table: TableName,
      id: GenericId<NonUnion<TableName>>,
    ): Promise<void>;
    (id: GenericId<TableNamesInDataModel<DataModel>>): Promise<void>;
  } = async (arg0: any, arg1?: any) => {
    // Two arguments: (table, id). One argument: (id), in which case the table
    // is looked up among the tables that have triggers.
    const [tableName, id] =
      arg1 !== undefined
        ? [arg0, arg1]
        : [_tableNameFromId(innerDb, triggers.registered, arg0), arg0];
    return await _delete(tableName, id);
  };

  /**
   * Delete a document. `tableName` is `null` when the id does not belong to
   * any table with registered triggers.
   */
  async function _delete<TableName extends TableNamesInDataModel<DataModel>>(
    tableName: TableName | null,
    id: GenericId<NonUnion<TableNamesInDataModel<DataModel>>>,
  ): Promise<void> {
    if (!tableName) {
      // eslint-disable-next-line @convex-dev/explicit-table-ids -- tableName not available here
      return await innerDb.delete(id);
    }
    if (!triggers.registered[tableName]) {
      // Explicit table name but no triggers: same fast path as `insert`.
      return await innerDb.delete(tableName, id);
    }
    return await _execThenTrigger(
      ctx,
      innerDb,
      triggers,
      tableName,
      isWithinTrigger,
      async () => {
        // Read before deleting so triggers can see what was removed.
        const oldDoc = (await innerDb.get(tableName, id))!;
        await innerDb.delete(tableName, id);
        return [undefined, { operation: "delete", id, oldDoc, newDoc: null }];
      },
    );
  }

  return {
    insert: async <TableName extends TableNamesInDataModel<DataModel>>(
      table: TableName,
      value: WithoutSystemFields<DocumentByName<DataModel, TableName>>,
    ): Promise<GenericId<TableName>> => {
      if (!triggers.registered[table]) {
        return await innerDb.insert(table, value);
      }
      return await _execThenTrigger(
        ctx,
        innerDb,
        triggers,
        table,
        isWithinTrigger,
        async () => {
          const id = await innerDb.insert(table, value);
          const newDoc = (await innerDb.get(table, id))!;
          return [id, { operation: "insert", id, oldDoc: null, newDoc }];
        },
      );
    },
    patch,
    replace,
    delete: delete_,
    system: innerDb.system,
    get: innerDb.get.bind(innerDb),
    query: innerDb.query.bind(innerDb),
    normalizeId: innerDb.normalizeId.bind(innerDb),
  };
}

// Helper methods.

/**
 * Find which table with registered triggers `id` belongs to, by asking
 * `db.normalizeId` for each such table in turn.
 *
 * @returns The first matching table name, or `null` if `id` does not belong
 *   to any table that has triggers.
 */
function _tableNameFromId<
  DataModel extends GenericDataModel,
  TableName extends TableNamesInDataModel<DataModel>,
  Ctx extends {
    db: GenericDatabaseWriter<DataModel>;
  } = GenericMutationCtx<DataModel>,
>(
  db: GenericDatabaseReader<DataModel>,
  registered: Triggers<DataModel, Ctx>["registered"],
  id: GenericId<TableName>,
): TableName | null {
  for (const tableName of Object.keys(registered)) {
    if (db.normalizeId(tableName as TableNamesInDataModel<DataModel>, id)) {
      return tableName as TableName;
    }
  }
  return null;
}

/**
 * Perform the write `f` under `innerWriteLock`, then append one queue entry
 * per trigger registered for `tableName`. The triggers are NOT run here.
 *
 * @param f Performs the write and returns `[result, change]`.
 * @returns The `result` produced by `f`.
 */
async function _queueTriggers<
  DataModel extends GenericDataModel,
  R,
  TableName extends TableNamesInDataModel<DataModel>,
  Ctx extends {
    db: GenericDatabaseWriter<DataModel>;
  } = GenericMutationCtx<DataModel>,
>(
  ctx: Ctx,
  innerDb: GenericDatabaseWriter<DataModel>,
  triggers: Triggers<DataModel, Ctx>,
  tableName: TableName,
  f: () => Promise<[R, Change<DataModel, TableName>]>,
): Promise<R> {
  return await innerWriteLock.withLock(async () => {
    const [result, change] = await f();
    // Context handed to triggers: `db` only queues further triggers
    // (isWithinTrigger = true); `innerDb` bypasses triggers entirely.
    const recursiveCtx = {
      ...ctx,
      db: writerWithTriggers(ctx, innerDb, triggers, true),
      innerDb: innerDb,
    };
    for (const trigger of triggers.registered[tableName] ?? []) {
      triggerQueue.push(async () => {
        await trigger(recursiveCtx, change);
      });
    }
    return result;
  });
}

/**
 * Perform the write `f` and make sure the triggers for `tableName` run.
 *
 * - Inside a trigger (`isWithinTrigger`), the write is performed and its
 *   triggers are only queued; the outermost call runs them later.
 * - Otherwise the outer write lock is held while the write is performed and
 *   the trigger queue is drained in FIFO order, including triggers queued by
 *   other triggers along the way.
 *
 * A throwing trigger does not stop the remaining queued triggers from
 * running. Once the queue is empty the first thrown value is rethrown; any
 * later ones are reported with `console.error`.
 *
 * @returns The `result` produced by `f`.
 */
async function _execThenTrigger<
  DataModel extends GenericDataModel,
  R,
  TableName extends TableNamesInDataModel<DataModel>,
  Ctx extends {
    db: GenericDatabaseWriter<DataModel>;
  } = GenericMutationCtx<DataModel>,
>(
  ctx: Ctx,
  innerDb: GenericDatabaseWriter<DataModel>,
  triggers: Triggers<DataModel, Ctx>,
  tableName: TableName,
  isWithinTrigger: boolean,
  f: () => Promise<[R, Change<DataModel, TableName>]>,
): Promise<R> {
  if (isWithinTrigger) {
    return await _queueTriggers(ctx, innerDb, triggers, tableName, f);
  }
  return await outerWriteLock.withLock(async () => {
    const result = await _queueTriggers(ctx, innerDb, triggers, tableName, f);
    // Wrapped in an object so that a thrown `null`, `undefined`, `0` or ""
    // is still recognised as "an error happened".
    let firstFailure: { error: unknown } | null = null;
    while (triggerQueue.length > 0) {
      const trigger = triggerQueue.shift()!;
      try {
        await trigger();
      } catch (err) {
        if (firstFailure !== null) {
          console.error(err);
        } else {
          firstFailure = { error: err };
        }
      }
    }
    if (firstFailure !== null) {
      throw firstFailure.error;
    }
    return result;
  });
}

/**
 * This prevents TypeScript from inferring that the generic `TableName` type is
 * a union type when `table` and `id` disagree.
 */
type NonUnion<T> = T extends never // `never` is the bottom type for TypeScript unions
  ? never
  : T;

/**
 * This is like Partial, but it also allows undefined to be passed to optional
 * fields when `exactOptionalPropertyTypes` is enabled in the tsconfig.
 */
type PatchValue<T> = {
  [P in keyof T]?: undefined extends T[P] ? T[P] | undefined : T[P];
};