import * as crypto from "crypto";
import { asName, pickKeys, tryCatchV2 } from "prostgles-types";
import type { PRGLIOSocket } from "../DboBuilder/DboBuilderTypes";
import type { TableHandler } from "../DboBuilder/TableHandler/TableHandler";
import type { ViewSubscriptionOptions } from "./PubSubManager";
import { type PubSubManager } from "./PubSubManager";
import { EXCLUDE_QUERY_FROM_SCHEMA_WATCH_ID } from "./PubSubManagerUtils";
import { udtNamesWithoutEqualityComparison } from "./init/getDataWatchFunctionQuery";

/**
 * Input describing the trigger row to register in `prostgles.app_triggers`.
 */
export type AddTriggerParams = {
  /** Table the trigger is registered for. Must be a key of `this.dbo`. */
  table_name: string;
  /** SQL condition stored with the trigger. Empty/blank values are replaced with "TRUE". */
  condition: string;
  /** Column names to track. When undefined no `columns_info` is stored (null). */
  tracked_columns: [string, ...string[]] | undefined;
};

/**
 * Upserts a row into `prostgles.app_triggers` for the given table and condition,
 * refreshes the triggers and logs the outcome through `this._log`.
 *
 * @param params table name, condition and optional tracked columns
 * @param viewOptions when provided, its `viewName` and `definition` are stored
 *   as `related_view_name` / `related_view_def`
 * @param socket when provided, its id (and sid) are included in the log entry
 * @returns the `tryCatchV2` result; on success `data` holds the inserted values
 * @throws the captured error (after logging it) when any step fails. Validation
 *   failures are thrown as strings.
 */
export async function addTrigger(
  this: PubSubManager,
  params: AddTriggerParams,
  viewOptions: ViewSubscriptionOptions | undefined,
  socket: PRGLIOSocket | undefined,
) {
  const addedTrigger = await tryCatchV2(async () => {
    const { table_name } = params;
    let { condition } = params;
    if (!table_name) throw "MISSING table_name";

    if (!condition || !condition.trim().length) {
      condition = "TRUE";
    }

    const tableHandler = this.dbo[table_name];
    if (!tableHandler) {
      throw `Cannot add trigger. Tablehandler for ${table_name} not found`;
    }

    if (tableHandler.tableOrViewInfo.isHyperTable) {
      throw "Triggers do not work on timescaledb hypertables due to bug:\nhttps://github.com/timescale/timescaledb/issues/1084";
    }

    const trgVals = {
      tableName: table_name,
      condition: condition,
      /* The hash (not the raw condition) is part of the ON CONFLICT key below */
      conditionHash: crypto.createHash("md5").update(condition).digest("hex"),
      relatedViewName: viewOptions?.viewName ?? null,
      relatedViewDef: viewOptions?.definition ?? null,
      columnsInfo: getColumnsInfo(params, tableHandler),
    };

    const TRACKED_COLUMNS = "tracked_columns" as const satisfies keyof AddTriggerParams;

    /*
     * NOTE: the line breaks inside this SQL text are significant.
     * The `--LOCK TABLE` line comment extends to the end of its line, so it must
     * stay on a line of its own or it will comment out the statements after it.
     */
    await this.db.any(
      `
        BEGIN WORK;
        /* ${EXCLUDE_QUERY_FROM_SCHEMA_WATCH_ID} */
        /* why is this lock level needed?
        */
        --LOCK TABLE prostgles.app_triggers IN ACCESS EXCLUSIVE MODE;

        /** app_triggers is not refreshed when tables are dropped */
        DELETE FROM prostgles.app_triggers at
        WHERE app_id = \${appId}
        AND NOT EXISTS (
          SELECT 1
          FROM pg_catalog.pg_trigger t
          WHERE tgname like format('prostgles_triggers_%s_', at.table_name) || '%'
          AND tgenabled = 'O'
        );

        INSERT INTO prostgles.app_triggers (
          table_name,
          condition,
          condition_hash,
          app_id,
          related_view_name,
          related_view_def,
          columns_info
        )
        VALUES (
          \${tableName},
          \${condition},
          \${conditionHash},
          \${appId} ,
          \${relatedViewName},
          \${relatedViewDef},
          \${columnsInfo}
        )
        ON CONFLICT (app_id, table_name, condition_hash)
        DO UPDATE /* upsert ${TRACKED_COLUMNS} where necessary */
        SET columns_info = CASE
          WHEN EXCLUDED.columns_info IS NOT NULL AND prostgles.app_triggers.columns_info IS NOT NULL
          THEN jsonb_set(
            prostgles.app_triggers.columns_info,
            '{${TRACKED_COLUMNS}}',
            /* THE PARENTHESES ARE CRUCIAL IN ENSURING THE MERGE WORKS */
            (prostgles.app_triggers.columns_info->'${TRACKED_COLUMNS}') || (EXCLUDED.columns_info->'${TRACKED_COLUMNS}')
          )
        END
        WHERE prostgles.app_triggers.columns_info IS NOT NULL
        ;

        COMMIT WORK;
      `,
      {
        appId: this.appId,
        ...trgVals,
      },
    );

    /** This might be redundant due to trigger on app_triggers */
    await this.refreshTriggers();

    return trgVals;
  });

  /* Log both outcomes; a missing tableName in the result means the attempt failed */
  await this._log({
    type: "syncOrSub",
    command: "addTrigger",
    condition: addedTrigger.data?.condition ?? params.condition,
    duration: addedTrigger.duration,
    socketId: socket?.id,
    state: !addedTrigger.data?.tableName ? "fail" : "ok",
    error: addedTrigger.error,
    sid: socket && this.dboBuilder.prostgles.authHandler.getSIDNoError({ socket }),
    tableName: params.table_name,
    connectedSocketIds: this.dboBuilder.prostgles.connectedSockets.map((s) => s.id),
    localParams: socket && { clientReq: { socket } },
    triggers: this._triggers,
  });

  if (addedTrigger.error) throw addedTrigger.error;

  return addedTrigger;
}

/**
 * Builds the `columns_info` value stored alongside a trigger.
 *
 * @returns `null` when the handler has no columns, none of them is a primary
 *   key, or `tracked_columns` is undefined. Otherwise an object with:
 *   - `join_condition`: `n.<col> = o.<col>` equality on every primary key
 *     column, joined with " AND "
 *   - `tracked_columns`: map of tracked column name to `2` when the column's
 *     udt_name is in `udtNamesWithoutEqualityComparison` (cast_to "::TEXT"),
 *     otherwise `1`
 * @throws a string when a tracked column is not among the handler's columns
 */
const getColumnsInfo = (
  { tracked_columns, table_name }: AddTriggerParams,
  tableHandler: Partial<TableHandler>,
) => {
  const cols = tableHandler.columns?.map((c) => ({
    ...pickKeys(c, ["name", "is_pkey"]),
    cast_to: udtNamesWithoutEqualityComparison.includes(c.udt_name) ? "::TEXT" : "",
  }));

  if (!cols || !tracked_columns || !cols.some((c) => c.is_pkey)) {
    return null;
  }

  const trackedColumnsWithInfo = tracked_columns.map((name) => {
    const colInfo = cols.find((c) => c.name === name);
    if (!colInfo) {
      throw `tracked_columns ${name} not found in table ${table_name}`;
    }
    return colInfo;
  });

  const columns_info = {
    join_condition: cols
      .filter((c) => c.is_pkey)
      .map((c) => `n.${asName(c.name)} = o.${asName(c.name)}`)
      .join(" AND "),
    tracked_columns: trackedColumnsWithInfo.reduce(
      (acc, { name, cast_to }) => ({
        ...acc,
        [name]: cast_to ? 2 : 1,
      }),
      {} as Record<string, 1 | 2>,
    ),
  };

  return columns_info;
};