import { Logger } from 'besonders-logger' import { curry, debounce, partial, pull, uniq, uniqBy, uniqWith } from 'lodash-es' import { CID } from 'multiformats' import { ensureTsPvAndFinalizeApplogs, joinThreads } from '../applog/applog-helpers.ts' import { compareApplogsByEnAt, compareApplogsByTs, objEqualByKeys } from '../applog/applog-utils.ts' import { Applog, ApplogForInsert, CidString, EntityID } from '../applog/datom-types.ts' import { Thread } from './basic.ts' import { MappedThread } from './mapped.ts' import { ThreadInMemory, WriteableThread } from './writeable.ts' const { WARN, LOG, DEBUG, VERBOSE, ERROR } = Logger.setup(Logger.INFO) // eslint-disable-line no-unused-vars export function entityCount(thread: Thread) { return allEntityIDs(thread).size // at some point: index size? } export function allEntityIDs(thread: Thread) { return accumulateLogsToSet(thread, log => log.en) } function accumulateLogsToSet( threadOrLogs: Thread | Applog[], callback: (log: Applog, acc: Set) => ACC | undefined, ) { const logs = threadOrLogs instanceof Thread ? threadOrLogs.applogs : threadOrLogs const set = new Set() for (const log of logs) { set.add(callback(log, set)) } return set } export function debounceWrites( thread: Thread, wait = 700, removeDuplicatesWith = compareApplogsByEnAt, ) { if (thread.readOnly) throw ERROR(`[debounceWrites] but readOnly thread`, thread.name) let insertQueue: Array = [] const debouncedCommit = debounce(() => { WARN(`Debounce tail`, { thread, mappedThread, insertQueue }) const toInsert = ensureTsPvAndFinalizeApplogs( // ? uniq, sure - but which one is used? (update: seems the first one, so reverse) uniqWith(insertQueue.reverse(), removeDuplicatesWith), thread, ) thread.insertRaw(toInsert) insertQueue.splice(0, insertQueue.length) // clear queue }, wait) const handleInsert = (applogs: Applog[] | ApplogForInsert[]) => { DEBUG(`Debounce input:`, applogs, { thread, mappedThread, insertQueue }) insertQueue.push(...applogs) debouncedCommit() return null // don't insert anything } const mappedThread = MappedThread.mapWrites( thread, `Debounce(${wait})`, handleInsert, ) return mappedThread } /** * @param deduplicateHoldItemsWith called with (heldLog, realLog), if it returns true, the held log is skipped * @param onFirstWrite called with held logs about to be inserted, if it returns an array, those logs will be inserted instead */ export function holdTillFirstWrite( thread: Thread, applogsToHold: ApplogForInsert[], opts: { deduplicateHoldItemsWith?: typeof compareApplogsByEnAt onFirstWrite: (heldLogsToInsert: Applog[]) => Applog[] | undefined }, ) { DEBUG(`[holdTillFirstWrite] holding logs:`, { applogsToHold }) if (thread.readOnly) throw ERROR(`[holdTillFirstWrite] but readOnly thread`, thread.nameAndSizeUntracked) const heldLogs = ensureTsPvAndFinalizeApplogs([...applogsToHold], thread) const heldThread = ThreadInMemory.fromArray(heldLogs) let hasInserted = false const handleInsert = (realApplogs: Applog[] | ApplogForInsert[]) => { if (hasInserted) return realApplogs hasInserted = true let toInsert = [...heldLogs] // heldLogs.splice(0, heldLogs.length) // ? joinThread could take care of this if (opts.deduplicateHoldItemsWith) { toInsert = toInsert.filter(heldLog => // some duplicate? so don't insert !realApplogs.some(realLog => opts.deduplicateHoldItemsWith(heldLog, realLog)) ) } if (opts.onFirstWrite) { const callbackResult = opts.onFirstWrite(toInsert) if (callbackResult !== undefined) { toInsert = callbackResult } } DEBUG(`[holdTillFirstWrite] adding hold logs:`, { applogsToHold, heldLogs, toInsert, realApplogs }) return [...toInsert, ...realApplogs] } const joinedThread = joinThreads([thread, heldThread]) return new MappedThread( `HoldTillFirstWrite[${applogsToHold.length}]<${thread.nameAndSizeUntracked}>`, joinedThread, thread.filters, null, // no derivation — shares joinedThread's array (applogs) => { const logsToInsert = handleInsert(applogs) thread.insert(logsToInsert) return null }, ) } export function getAgents(thread: Thread) { return uniq(thread.map(l => l.ag)) } /** * Returns applogs NOT contained in the exclusion set. * Uses CID-based comparison (consistent with removeDuplicateAppLogs). */ export const excludeApplogsContainedIn = ( applogs: readonly Applog[], exclude: Thread | readonly Applog[] | Set, ): Applog[] => { const excludeCids: Set = exclude instanceof Set ? new Set([...exclude].map(c => c.toString())) : exclude instanceof Thread ? new Set(exclude.applogsCids) : new Set(exclude.map(a => a.cid)) return applogs.filter(applog => { if (!applog.cid) { ERROR(`applog with missing CID`, applog) throw new Error(`applog with missing CID`) } return !excludeCids.has(applog.cid) }) }