import { Logger } from 'besonders-logger'
import { ensureTsPvAndFinalizeApplogs } from '../applog/applog-helpers.ts'
import { isTsBefore, removeDuplicateAppLogs, sortApplogsByTs } from '../applog/applog-utils.ts'
import { type Applog, ApplogForInsert, CidString, isValidApplog } from '../applog/datom-types.ts'
import { Thread } from './basic.ts'

const { WARN, LOG, DEBUG, VERBOSE, ERROR } = Logger.setup(Logger.INFO) // eslint-disable-line no-unused-vars

/**
 * A thread whose applog list can be modified (insert / purge).
 *
 * Subclasses supply `persist`, which is invoked (without being awaited) after
 * every successful `insertRaw`.
 */
export abstract class WriteableThread extends Thread {
  /**
   * @param name Thread name, forwarded to the `Thread` constructor
   * @param applogs Initial applogs, forwarded to the `Thread` constructor
   * @param filters Forwarded to the `Thread` constructor
   */
  constructor(
    name: string,
    // parents: Thread | readonly Thread[] | null, // ? would this ever be a thing
    applogs: Applog[] = [],
    filters: readonly string[],
  ) {
    // The second `Thread` constructor argument is always `null` here (see the commented-out `parents` param).
    super(name, null, filters, applogs)
  }

  /**
   * Remove every applog whose `cid` is contained in `cidsToPurge`.
   *
   * NOTE(review): this does not call `notifySubscribers` - confirm that is intended.
   *
   * @returns The number of applogs that were removed
   */
  public purge(cidsToPurge: CidString[]) {
    const beforeCount = this.applogs.length
    // Set lookup keeps this O(n + m) instead of scanning `cidsToPurge` once per applog
    const purgeSet = new Set(cidsToPurge)
    // HACK setting readonly
    this._applogs = this.applogs.filter(log => !purgeSet.has(log.cid))
    return beforeCount - this.applogs.length
  }

  /**
   * Pass the given applogs through `ensureTsPvAndFinalizeApplogs` and insert the result via `insertRaw`.
   *
   * @returns Whatever `insertRaw` returns (the inserted applogs, or `undefined` for an empty insert)
   * @throws Error if `insertRaw` rejects the finalized applogs
   */
  public insert(appLogsToInsert: ApplogForInsert[]) {
    DEBUG(`[WriteableThread.insert] ENTER - ${appLogsToInsert.length} applogs for thread "${this.name}"`)

    DEBUG(`[WriteableThread.insert] About to call ensureTsPvAndFinalizeApplogs`)
    const mapped = ensureTsPvAndFinalizeApplogs(appLogsToInsert, this)
    DEBUG(`[WriteableThread.insert] ensureTsPvAndFinalizeApplogs completed, mapped=${mapped.length} applogs`)

    DEBUG(`[WriteableThread.insert] About to call insertRaw`)
    const result = this.insertRaw(mapped)
    DEBUG(`[WriteableThread.insert] insertRaw completed`)
    return result
  }

  /**
   * Insert only applogs not already in this thread.
   * @param byRef If true, compares by reference; if false, compares by CID (default)
   * @returns The applogs that were actually inserted
   */
  public insertMissing(appLogsToInsert: readonly Applog[], byRef = false): Applog[] {
    const missing = appLogsToInsert.filter(log => !this.hasApplog(log, byRef))
    if (missing.length === 0) {
      VERBOSE(`[insertMissing] no missing applogs`)
      return []
    }
    // `insertRaw` returns undefined only for an empty insert; normalise to an array regardless
    return this.insertRaw(missing) ?? []
  }

  /**
   * Insert raw applogs directly into the thread.
   *
   * STRICT VALIDATION: This method throws errors for:
   * - Duplicate applogs in input array (programming error)
   * - Invalid applogs (missing required fields)
   * - Applogs already in thread (programming error)
   *
   * For external imports where duplicates are expected, use removeDuplicateAppLogs(..., 'cleanup')
   * before calling this method.
   *
   * After a successful insert, subscribers are notified and `persist` is started without being
   * awaited; a rejected `persist` is logged, not thrown.
   *
   * @param appLogsToInsert Must be deduplicated and validated. Must be mutable because it is sorted
   *   in place - clone it before calling if you need to keep the original order.
   * @returns The (now sorted) input array, or `undefined` if it was empty
   * @throws Error if validation fails
   */
  public insertRaw(appLogsToInsert: Applog[]): Applog[] | undefined {
    DEBUG(`[WriteableThread.insertRaw] ENTER - ${appLogsToInsert.length} applogs for thread "${this.name}"`)

    DEBUG(`[WriteableThread.insertRaw] About to deduplicate`)
    const deduplicated = removeDuplicateAppLogs(appLogsToInsert, 'safety')
    if (deduplicated.length !== appLogsToInsert.length) {
      throw ERROR(`[insertRaw] duplicate applogs passed: ${appLogsToInsert.length - deduplicated.length}`, {
        appLogsToInsert,
        deduplicated,
      })
    }
    DEBUG(`[WriteableThread.insertRaw] Deduplication done`)

    DEBUG(`[WriteableThread.insertRaw] About to validate`)
    const bogus = appLogsToInsert.filter(log => !isValidApplog(log))
    if (bogus.length) {
      throw ERROR(`[insertRaw] bogus applogs passed: ${bogus.length}`, { bogus })
    }
    DEBUG(`[WriteableThread.insertRaw] Validation done`)

    DEBUG(`[WriteableThread.insertRaw] About to check for existing`)
    const existing = appLogsToInsert.filter(log => this.hasApplog(log, false))
    if (existing.length) {
      throw ERROR(`[insertRaw] already existing applogs passed: ${existing.length}`, { existing })
    }
    DEBUG(`[WriteableThread.insertRaw] Existing check done`)

    if (!appLogsToInsert.length) {
      WARN('[insertRaw] skipping empty insert empty logs array')
      return undefined
    }

    // Threads without parents that are not in-memory log at LOG level, everything else at DEBUG
    const logInsert = !this.hasParents && !(this instanceof ThreadInMemory) ? LOG : DEBUG
    logInsert(
      'Inserting:',
      appLogsToInsert.length === 1 ? appLogsToInsert[0] : appLogsToInsert,
      { ds: this },
    )

    DEBUG(`[WriteableThread.insertRaw] About to sort applogs`)
    sortApplogsByTs(appLogsToInsert)
    // A full re-sort is only needed when the first new log (after sorting) is before the current last log
    const sortNeeded = this._applogs.length && isTsBefore(appLogsToInsert[0], this._applogs[this._applogs.length - 1])

    DEBUG(`[WriteableThread.insertRaw] About to push to _applogs array`)
    // Chunked push: spread is fast but causes stack overflow with 100k+ items
    // Chunking keeps us under the limit while using optimized push()
    const CHUNK_SIZE = 50_000
    for (let i = 0; i < appLogsToInsert.length; i += CHUNK_SIZE) {
      this._applogs.push(...appLogsToInsert.slice(i, i + CHUNK_SIZE))
    }

    if (sortNeeded) {
      DEBUG(`[WriteableThread.insertRaw] About to sort _applogs (sortNeeded=true)`)
      sortApplogsByTs(this._applogs)
    }

    DEBUG(`[WriteableThread.insertRaw] About to notify subscribers`)
    this.notifySubscribers({ added: appLogsToInsert, removed: null })
    DEBUG(`[WriteableThread.insertRaw] Subscribers notified`)

    // ? persist sync
    DEBUG(`[WriteableThread.insertRaw] About to call persist (void - not awaited)`)
    // Fire-and-forget, but never leave the rejection unhandled: the logs are already inserted and
    // subscribers notified, so a failed persist is logged here instead of surfacing as an unhandled rejection.
    void this.persist(appLogsToInsert).catch((error: unknown) => {
      ERROR(`[insertRaw] persist failed for thread "${this.name}"`, { error })
    })

    DEBUG(`[WriteableThread.insertRaw] EXIT - returning ${appLogsToInsert.length} applogs`)
    return appLogsToInsert
  }

  /** Writeable threads report `false` unless a subclass overrides this. */
  get readOnly() {
    return false
  }

  /**
   * Persist newly inserted applogs. Called by `insertRaw` without awaiting; the resolved value is ignored.
   * @param logs The applogs that were just inserted (sorted)
   */
  protected abstract persist(logs: Applog[]): Promise<unknown>
}

/** A `WriteableThread` that keeps its applogs in memory only; `persist` stores nothing. */
export class ThreadInMemory extends WriteableThread {
  /** Create a writeable in-memory thread without applogs. */
  static empty(name?: string) {
    return ThreadInMemory.fromArray([], name ?? 'empty in-memory', false)
  }

  /**
   * Create an in-memory thread from the given array (passed on as-is, not copied here).
   * @param readOnly Value reported by the `readOnly` getter of the new thread
   */
  static fromArray(applogs: Applog[], name?: string, readOnly = false) {
    return new ThreadInMemory(name ?? 'in-memory', applogs, [], readOnly)
  }

  /** Create an in-memory thread flagged as read-only from a readonly array. */
  static fromReadOnlyArray(applogs: readonly Applog[], name?: string) {
    // @ts-expect-error readonly conditional drama
    return new ThreadInMemory(name ?? 'in-memory', applogs, [], true)
  }

  /**
   * @param _readOnly Stored as a readonly property and returned by the `readOnly` getter
   */
  constructor(
    name: string,
    applogs: Applog[],
    filters: readonly string[],
    readonly _readOnly: boolean,
  ) {
    super(name, applogs, filters)
  }

  get readOnly() {
    return this._readOnly
  }

  /**
   * Nothing is stored for in-memory threads.
   * @throws Error (as a rejected promise) if this thread is flagged read-only
   */
  protected async persist(logs: Applog[]): Promise<void> {
    VERBOSE(`[InMem.persist] no persist for`, logs)
    if (this.readOnly) {
      throw ERROR(`[persist] called for readOnly thread`)
    }
  }
}