import { Logger } from 'besonders-logger'
import { encodeApplogAndGetCid } from '../ipfs/ipfs-utils.ts'
import { lastWriteWins } from '../query/basic.ts'
import { MappedThread, rollingFilter, Thread, ThreadOnlyCurrent, type ThreadDerivation } from '../thread.ts'
import { PartialBy } from '../types/typescript-utils.ts'
import { dateNowIso, objEqualByKeys, removeDuplicateAppLogs, sortApplogsByTs } from './applog-utils.ts'
import {
  Applog,
  ApplogForInsert,
  ApplogForInsertOptionalAgent,
  ApplogNoCid,
  AgentHash,
  CidString,
  getApplogTypeErrors,
  isValidApplog,
  Timestamp,
} from './datom-types.ts'

const { WARN, LOG, DEBUG, VERBOSE, ERROR } = Logger.setup(Logger.INFO) // eslint-disable-line no-unused-vars

/**
 * Finalizes a batch of applogs for insertion.
 *
 * Every log in the batch is passed through {@link finalizeApplogForInsert} with one shared
 * fallback timestamp (taken once, before the loop) and the same `threadForPv`.
 * Logs that already carry a `ts` keep it (see {@link withTs}).
 *
 * @param appLogsToInsert - logs to finalize; not mutated.
 * @param threadForPv - thread searched for each log's previous version (`pv`).
 * @returns the finalized (frozen) applogs, in input order.
 * @throws whatever {@link finalizeApplogForInsert} throws for any single log.
 */
export function ensureTsPvAndFinalizeApplogs(appLogsToInsert: ApplogForInsert[], threadForPv: Thread) {
  DEBUG(`[ensureTsPvAndFinalizeApplogs] ENTER - ${appLogsToInsert.length} applogs, thread size=${threadForPv.size}`)
  const ts = dateNowIso()
  // const currentThread = lastWriteWins(threadForPv, { tolerateAlreadyFiltered: true }) // HACK to get `pv` from last write
  const currentThread = threadForPv // HACK to not do un-cached lastWriteWins

  // Within-batch (en, at) duplicates produce same-ts logs whose `pv` all point to the
  // pre-batch tail (not to each other) — chain order is then ambiguous and the second
  // insert is at the mercy of cid lex tie-break. Caller bug; warn so it's findable.
  const seenInBatch = new Set<string>()
  for (const log of appLogsToInsert) {
    const key = log.en + '|' + log.at
    if (seenInBatch.has(key)) {
      WARN(
        `[ensureTsPvAndFinalizeApplogs] within-batch duplicate (en, at)=(${log.en}, ${log.at}) — chain-pv won't link these; consider separate insert calls`,
        { log },
      )
      break // one warning per batch is plenty
    }
    seenInBatch.add(key)
  }

  DEBUG(`[ensureTsPvAndFinalizeApplogs] About to map over applogs`)
  const mapped = appLogsToInsert.map((log, idx) => {
    DEBUG(`[ensureTsPvAndFinalizeApplogs] Processing applog ${idx + 1}/${appLogsToInsert.length}`)
    const result = finalizeApplogForInsert(log, { ts, threadForPv: currentThread })
    DEBUG(`[ensureTsPvAndFinalizeApplogs] Finalized applog ${idx + 1}/${appLogsToInsert.length}`)
    return result
  })
  DEBUG(`[ensureTsPvAndFinalizeApplogs] EXIT - mapped ${mapped.length} applogs`)
  return mapped
}

/**
 * Single-log convenience wrapper around {@link ensureTsPvAndFinalizeApplogs}.
 *
 * @returns the one finalized applog.
 */
export function ensureTsPvAndFinalizeApplog(applogToInsert: ApplogForInsert, threadForPv: Thread) {
  return ensureTsPvAndFinalizeApplogs([applogToInsert], threadForPv)[0]
}

/**
 * Turns an insertable applog into a complete, frozen applog:
 * fills in `ts` and `pv` when missing, computes the CID from the result, validates it.
 *
 * @param log - the applog to finalize; not mutated.
 * @param options.ts - timestamp used when `log` has none; defaults to `dateNowIso()`.
 * @param options.threadForPv - thread used to look up `pv` when `log` has none.
 * @returns the frozen applog including its `cid`.
 * @throws if `pv` cannot be determined (see {@link withPvFrom}) or if the result fails `isValidApplog`.
 */
export function finalizeApplogForInsert(
  log: ApplogForInsert,
  { ts, threadForPv }: { ts?: string; threadForPv?: Thread /*OnlyCurrent*/ } = {},
) {
  DEBUG(`[finalizeApplogForInsert] ENTER - en=${log.en}, at=${log.at}`)

  DEBUG(`[finalizeApplogForInsert] About to call withTs`)
  const logWithTs = withTs(log, ts ?? dateNowIso())

  DEBUG(`[finalizeApplogForInsert] About to call withPvFrom (thread size=${threadForPv?.size ?? 'null'})`)
  // `withPvFrom` is typed `Thread | null`; normalize the optional option to match.
  const logWithPv = withPvFrom(logWithTs, threadForPv ?? null)

  DEBUG(`[finalizeApplogForInsert] About to call encodeApplogAndGetCid`)
  const cid = encodeApplogAndGetCid(logWithPv).toString() as CidString
  DEBUG(`[finalizeApplogForInsert] CID created: ${cid}`)

  // A pre-existing cid that disagrees with the computed one is replaced, but loudly.
  if ((log as Applog).cid && (log as Applog).cid !== cid) {
    WARN(`[finalizeApplogForInsert] overwriting wrong CID`, { log, cid, logWithPv })
  }
  const logWithCid = { ...logWithPv, cid } satisfies Applog

  DEBUG(`[finalizeApplogForInsert] About to validate applog`)
  if (!isValidApplog(logWithCid)) {
    throw ERROR(`Bogus Applog ${JSON.stringify(logWithCid)}`, getApplogTypeErrors(logWithCid))
  }

  DEBUG(`[finalizeApplogForInsert] EXIT - CID=${cid}`)
  return Object.freeze(logWithCid)
}

/** Type guard: true when `log.ag` is truthy. */
export function hasAg(log: ApplogForInsertOptionalAgent): log is ApplogForInsert {
  return !!log.ag
}

/**
 * Type guard: true when `log.ts` is truthy.
 * NOTE(review): the `Omit` type arguments were missing in the reviewed text and are reconstructed — confirm.
 */
export function hasTs(log: ApplogForInsert): log is Omit<ApplogForInsert, 'ts'> & { ts: Timestamp } {
  return !!log.ts
}

/** Type guard: true when `log.pv` is truthy (so `null` / `undefined` / `''` all count as "no pv"). */
export function hasPv(log: ApplogForInsert): log is ApplogForInsert & { pv: string } {
  return !!log.pv
}

/** Returns `log` itself when it already has a `ts`, otherwise a copy with `ts` set. */
export function withTs(log: ApplogForInsert, ts: Timestamp) {
  return hasTs(log) ? log : { ...log, ts }
}

/** Returns `log` itself when it already has an `ag`, otherwise a copy with `ag` set. */
export function withAg(log: ApplogForInsertOptionalAgent, ag: AgentHash) {
  return hasAg(log) ? log : { ...log, ag }
}

/**
 * Ensures `log` has a `pv` (previous-version reference).
 *
 * - If `log.pv` is anything other than `undefined` (including `null`), `log` is returned unchanged.
 * - Otherwise the last log in `thread` with the same `en` and `at` is looked up and its `cid`
 *   becomes `pv`; when there is no such log, `pv` is `null`.
 *
 * NOTE(review): the `PartialBy` type arguments were missing in the reviewed text and are reconstructed — confirm.
 *
 * @param log - the applog, possibly without `pv`; not mutated.
 * @param thread - thread to search, or `null` when none is available.
 * @returns `log` itself (pv pre-set) or a copy with `pv` filled in.
 * @throws if `pv` is unset and no thread is given, or if `objEqualByKeys` reports `log` equal to
 *   the found previous log on `en`, `at`, `vl`, `ts`, `ag`.
 */
export function withPvFrom(log: PartialBy<ApplogNoCid, 'pv'>, thread: Thread /*OnlyCurrent*/ | null) {
  DEBUG(`[withPvFrom] ENTER - en=${log.en}, at=${log.at}, hasPv=${log.pv !== undefined}`)
  if (log.pv !== undefined) {
    DEBUG(`[withPvFrom] EXIT early - pv already set`)
    return log as ApplogNoCid // TODO: ? devMode WARN if it's different for catching bugs)
  }

  // From here on `log.pv` is `undefined`, so without a thread there is nothing to derive it from.
  if (!thread) {
    throw ERROR(`[withPvFrom] no thread and no pv:`, log)
  }

  const { en, at } = log
  DEBUG(`[withPvFrom] About to call thread.findLast for en=${en}, at=${at}, thread.size=${thread.size}`)
  const prevLog = thread.findLast(l => l.en == en && l.at == at) // HACK to not do lastWriteWins calc
  DEBUG(`[withPvFrom] findLast completed, found=${!!prevLog}`)
  // const prevLogs = rollingFilter(thread, { en, at }) // ? use some non-reactive filter here?
  // if (prevLogs.size > 1) throw ERROR(`[withPvFrom] unexpected previous count:`, prevLogs.size, { log, prevLogs, thread }) // `thread` arg must be only current
  // let prevLog = prevLogs.isEmpty ? null : prevLogs.applogs[0]
  // const isMatchingPv = prevLog?.cid === log.pv

  DEBUG(`[withPvFrom] About to check equality`)
  if (objEqualByKeys(['en', 'at', 'vl', 'ts', 'ag'], log, prevLog)) {
    throw ERROR(`[withPvFrom] Same as previous:`, { log, pv: prevLog, thread }) // bug catcher
  }
  // if (log.pv && !isMatchingPv) { // ineffective bc. shortcut in the beginning of this func
  //   WARN(`[withPvFrom] different than pre-set pv:`, { queriedPv: prevLog, logPv: log.pv })
  // }

  const prevLogCid = prevLog?.cid ?? null
  DEBUG(`[withPvFrom] EXIT - prevLogCid=${prevLogCid}`)
  return { ...log, pv: prevLogCid }
}

/**
 * Creates a `MappedThread` that combines the applogs of all given threads.
 *
 * `compute` flattens the parents' applogs, passes them through `removeDuplicateAppLogs(…, 'cleanup')`
 * and sorts the result with `sortApplogsByTs`.
 * `mapDelta` forwards an added log only if it is not yet in the joined state, and forwards a removed
 * log only if it is in the joined state and no parent other than the delta's source reports it via
 * `hasApplog(log, true)`.
 *
 * NOTE(review): the `ReadonlyArray` element type was missing in the reviewed text and is reconstructed — confirm.
 *
 * @param threads - the parent threads to join.
 * @throws (inside `compute`) if a parent's `applogs` is falsy.
 */
export function joinThreads(threads: ReadonlyArray<Thread>) {
  const derivation: ThreadDerivation = {
    compute(parents) {
      if (parents.length < 2) DEBUG(`joinThreads with count=${parents.length}`) // ? EmptyThread
      const allLogs = parents.flatMap(s => {
        const logs = s.applogs
        if (!logs) {
          ERROR(`falsy applogs of thread`, s)
          throw new Error(`falsy applogs of thread`)
        }
        return logs
      })
      return sortApplogsByTs(removeDuplicateAppLogs(allLogs, 'cleanup'))
    },
    mapDelta: (delta, { source, parents, state }) => ({
      added: delta.added.filter(log => !state.includes(log)),
      removed: delta.removed?.filter(log =>
        state.includes(log) && !parents.some(p => p !== source && p.hasApplog(log, true))
      ),
    }),
  }

  return new MappedThread(
    `join(~ ${threads.map(s => s.name).join(', ')})`,
    threads,
    ['?'], // HACK this basically says "we're not sure what filters are applied"
    derivation,
  )
}