{"version":3,"sources":["../../../../src/dbs/mongo/changes.ts","/home/runner/work/equipped/equipped/dist/cjs/dbs/mongo/changes.cjs"],"names":[],"mappings":"AAAA,q+BAAwC;AACxC,oCAAuB;AAEvB,kDAA8B;AAC9B,qDAAyB;AACzB,sDAAsB;AACtB,iDAAsC;AACtC,yFAAsB;AAIf,MAAM,cAAA,QAA6F,qBAAwB;AAAA,EACjI,CAAA,QAAA,EAAW,KAAA;AAAA,EAEX,WAAA,CACC,MAAA,EACA,MAAA,EACA,UAAA,EACA,SAAA,EACA,MAAA,EACC;AACD,IAAA,KAAA,CAAM,MAAA,EAAQ,SAAA,EAAW,MAAM,CAAA;AAE/B,IAAA,MAAM,QAAA,EAAU,CAAC,IAAA,EAAA,GAChB,IAAA,CAAK,IAAA,EACF;AAAA,MACA,GAAG,IAAA;AAAA,MACH,GAAA,mBAAK,IAAA,CAAK,GAAA,CAAI,MAAM,CAAA,UAAK,IAAA,CAAK;AAAA,IAC/B,EAAA,EACC,KAAA,CAAA;AAEJ,IAAA,MAAM,OAAA,EAAS,UAAA,CAAW,MAAA;AAC1B,IAAA,MAAM,QAAA,EAAU,UAAA,CAAW,cAAA;AAC3B,IAAA,MAAM,UAAA,EAAY,CAAA,EAAA;AACD,IAAA;AAEF,IAAA;AACG,IAAA;AAEF,IAAA;AACC,MAAA;AAEH,MAAA;AACI,MAAA;AAEL,MAAA;AACD,MAAA;AACC,MAAA;AAED,MAAA;AACC,QAAA;AACF,UAAA;AACI,UAAA;AACZ,QAAA;AACc,MAAA;AACJ,QAAA;AACG,UAAA;AACD,UAAA;AACH,UAAA;AACT,QAAA;AACc,MAAA;AACJ,QAAA;AACG,UAAA;AACN,UAAA;AACP,QAAA;AACF,IAAA;AAEQ,IAAA;AACR,MAAA;AACY,MAAA;AACF,QAAA;AACJ,QAAA;AAEC,QAAA;AACO,UAAA;AACL,YAAA;AACL,cAAA;AACA,cAAA;AACA,cAAA;AACA,cAAA;AACA,cAAA;AACA,cAAA;AACA,cAAA;AACA,cAAA;AACA,cAAA;AACA,YAAA;AAEG,YAAA;AACE,YAAA;AACA,YAAA;AACG,YAAA;AACA,YAAA;AACV,UAAA;AACA,UAAA;AACA,UAAA;AACQ,QAAA;AACV,MAAA;AACA,MAAA;AACD,IAAA;AACD,EAAA;AACD;ACvBuB;AACA;AACA","file":"/home/runner/work/equipped/equipped/dist/cjs/dbs/mongo/changes.cjs","sourcesContent":["import { Collection, type Filter } from 'mongodb'\nimport { differ } from 'valleyed'\n\nimport { EquippedError } from '../../errors'\nimport { Instance } from '../../instance'\nimport { retry } from '../../utilities'\nimport { DbChange, TopicPrefix } from '../base/changes'\nimport * as core from '../base/core'\nimport type { DbChangeConfig } from '../base/types'\nimport type { MongoDbConfig } from '../pipes'\n\nexport class MongoDbChange<Model extends core.Model<{ _id: string }>, Entity extends core.Entity> extends DbChange<Model, Entity> {\n\t#started = false\n\n\tconstructor(\n\t\tconfig: MongoDbConfig,\n\t\tchange: DbChangeConfig,\n\t\tcollection: Collection<Model>,\n\t\tcallbacks: core.DbChangeCallbacks<Model, Entity>,\n\t\tmapper: (model: Model) => Entity,\n\t) {\n\t\tsuper(change, callbacks, mapper)\n\n\t\tconst hydrate = (data: any) =>\n\t\t\tdata._id\n\t\t\t\t? {\n\t\t\t\t\t\t...data,\n\t\t\t\t\t\t_id: data._id['$oid'] ?? data._id,\n\t\t\t\t\t}\n\t\t\t\t: undefined\n\n\t\tconst dbName = collection.dbName\n\t\tconst colName = collection.collectionName\n\t\tconst dbColName = `${dbName}.${colName}`\n\t\tconst topic = `${TopicPrefix}.${dbColName}`\n\n\t\tconst TestId = '5f5f65717569707065645f5f' // __equipped__\n\t\tconst condition = { _id: TestId } as Filter<Model>\n\n\t\tchange.eventBus.createStream(topic as never, { skipScope: true }).subscribe(async (data: DbDocumentChange) => {\n\t\t\tconst op = data.op\n\n\t\t\tlet before = JSON.parse(data.before ?? 'null')\n\t\t\tlet after = JSON.parse(data.after ?? 'null')\n\n\t\t\tif (before) before = hydrate(before)\n\t\t\tif (after) after = hydrate(after)\n\t\t\tif (before?._id === TestId || after?._id === TestId) return\n\n\t\t\tif (op === 'c' && this.callbacks.created && after)\n\t\t\t\tawait this.callbacks.created({\n\t\t\t\t\tbefore: null,\n\t\t\t\t\tafter: this.mapper(after)!,\n\t\t\t\t})\n\t\t\telse if (op === 'u' && this.callbacks.updated && before && after)\n\t\t\t\tawait this.callbacks.updated({\n\t\t\t\t\tbefore: this.mapper(before)!,\n\t\t\t\t\tafter: this.mapper(after)!,\n\t\t\t\t\tchanges: differ.from(differ.diff(before, after)),\n\t\t\t\t})\n\t\t\telse if (op === 'd' && this.callbacks.deleted && before)\n\t\t\t\tawait this.callbacks.deleted({\n\t\t\t\t\tbefore: this.mapper(before)!,\n\t\t\t\t\tafter: null,\n\t\t\t\t})\n\t\t})\n\n\t\tInstance.on(\n\t\t\t'start',\n\t\t\tasync () => {\n\t\t\t\tif (this.#started) return\n\t\t\t\tthis.#started = true\n\n\t\t\t\tawait retry(\n\t\t\t\t\tasync () => {\n\t\t\t\t\t\tconst started = await this.configureConnector(topic, {\n\t\t\t\t\t\t\t'connector.class': 'io.debezium.connector.mongodb.MongoDbConnector',\n\t\t\t\t\t\t\t'capture.mode': 'change_streams_update_full_with_pre_image',\n\t\t\t\t\t\t\t'mongodb.connection.string': config.uri,\n\t\t\t\t\t\t\t'collection.include.list': dbColName,\n\t\t\t\t\t\t\t'snapshot.mode': 'when_needed',\n\t\t\t\t\t\t\t'capture.scope': 'collection',\n\t\t\t\t\t\t\t'capture.target': dbColName,\n\t\t\t\t\t\t\t'heartbeat.interval.ms': '60000',\n\t\t\t\t\t\t\t'errors.max.retries': '3',\n\t\t\t\t\t\t})\n\n\t\t\t\t\t\tif (started) return { done: true, value: true }\n\t\t\t\t\t\tawait collection.findOneAndUpdate(condition, { $set: { colName } as any }, { upsert: true })\n\t\t\t\t\t\tawait collection.findOneAndDelete(condition)\n\t\t\t\t\t\tInstance.get().log.warn(`Waiting for db changes for ${dbColName} to start...`)\n\t\t\t\t\t\treturn { done: false }\n\t\t\t\t\t},\n\t\t\t\t\t6,\n\t\t\t\t\t30_000,\n\t\t\t\t).catch((err) => Instance.crash(new EquippedError(`Failed to start db changes`, { dbColName }, err)))\n\t\t\t},\n\t\t\t10,\n\t\t)\n\t}\n}\n\ntype DbDocumentChange = {\n\tbefore: string | null\n\tafter: string | null\n\top: 'c' | 'u' | 'd'\n}\n",null]}