import { CarReader } from '@ipld/car'
import * as dagJson from '@ipld/dag-json'
import { Logger } from 'besonders-logger'
import { CID } from 'multiformats/cid'
import type { Applog } from '../applog/datom-types.ts'
import { removeDuplicateAppLogs } from '../applog/applog-utils.ts'
import type { SnapRootBlock, SnapBlockLogsOrChunks } from '../pubsub/pubsub-types.ts'
import { unchunkApplogsBlock } from '../pubsub/snap-push.ts'
import { areCidsEqual } from '../ipfs/ipfs-utils.ts'
import type { WriteableThread } from '../thread/writeable.ts'
import type { BlockStore } from '../blockstore/index.ts'

const { WARN, LOG, DEBUG, VERBOSE, ERROR } = Logger.setup(Logger.INFO) // eslint-disable-line no-unused-vars

/**
 * Block retrieval abstraction - fetch or get blocks.
 * Implemented by gateway retriever or local blockstore.
 */
export interface BlockRetriever {
  /** Get single block by CID */
  get(cid: CID): Promise<Uint8Array>
  /** Get all blocks in DAG rooted at CID (for applogs/info sub-DAGs) */
  getDag(cid: CID): AsyncIterable<{ cid: CID; bytes: Uint8Array }>
}

/**
 * Wrap a BlockRetriever so fetched blocks flow through a BlockStore.
 * - get: delegates to store.get() (which handles local-first / remote fallback)
 * - getDag: streams from source, puts each block into store
 *
 * @param source - Retriever whose `getDag` stream is mirrored into `store`
 * @param store - BlockStore that serves `get` and receives every streamed block
 * @returns A BlockRetriever backed by `store` for single blocks and `source` for DAGs
 */
export function withBlockCache(
  source: BlockRetriever,
  store: BlockStore,
): BlockRetriever {
  return {
    get: (cid) => store.get(cid),
    async *getDag(cid) {
      for await (const block of source.getDag(cid)) {
        // Persist before yielding so the consumer only sees blocks already in the store
        await store.put(block.cid, block.bytes)
        yield block
      }
    },
  }
}

/**
 * Options for updateThreadFromSnapshot
 */
export interface UpdateOptions {
  /** CID of last included snapshot - exclude this and older snapshots */
  excludeSnapshotCID?: CID
  /** Stop when we reach this counter (walking backwards) */
  stopAtCounter?: number
  /** Maximum number of snapshots to traverse (default: 100) */
  maxDepth?: number
}

/**
 * Result from updateThreadFromSnapshot
 */
export interface UpdateResult {
  /** Root CID that was fetched */
  cid: CID
  /** All applogs decoded from the chain */
  applogs: Applog[]
  /** Count of applogs actually inserted (not duplicates) */
  insertedCount: number
  /** Number of snapshots traversed */
  snapshotCount: number
  /** Counter range encountered (min/max) */
  counterRange?: { minCounter: number; maxCounter: number }
}

/**
 * Simple in-memory block store used during snapshot chain fetch.
 */
interface MemoryBlockStore {
  get(cid: CID): Uint8Array | undefined
  put(cid: CID, bytes: Uint8Array): void
}

/**
 * Create an empty MemoryBlockStore.
 * Blocks are keyed by the string form of the CID converted to v1, so lookups
 * do not depend on which CID version the caller holds.
 */
function createMemoryBlockStore(): MemoryBlockStore {
  const blocks = new Map<string, Uint8Array>()
  return {
    get(cid: CID) {
      return blocks.get(cid.toV1().toString())
    },
    put(cid: CID, bytes: Uint8Array) {
      blocks.set(cid.toV1().toString(), bytes)
    },
  }
}

/**
 * Look up a block in the memory store and dag-json decode it.
 *
 * @returns The decoded value cast to `T` (no runtime validation), or `null`
 *   when the store holds no bytes for `cid`
 */
async function getDecodedBlock<T>(blockStore: MemoryBlockStore, cid: CID): Promise<T | null> {
  const bytes = blockStore.get(cid)
  if (!bytes) return null
  return dagJson.decode(bytes) as T
}

/** Copy every block of the DAG rooted at `root` from `retriever` into `blockStore`. */
async function loadDagIntoStore(
  retriever: BlockRetriever,
  root: CID,
  blockStore: MemoryBlockStore,
): Promise<void> {
  for await (const { cid: blockCid, bytes } of retriever.getDag(root)) {
    blockStore.put(blockCid, bytes)
  }
}

/**
 * Fetch snapshot chain from CID using a BlockRetriever, decode applogs, insert into thread.
 * Stops before excludeSnapshotCID if provided (incremental update).
 *
 * The chain is walked backwards via each root block's `prev` link until one of:
 * the chain ends, `excludeSnapshotCID` is reached, a root's `prevCounter` is
 * `<= stopAtCounter`, or `maxDepth` snapshots have been processed (a warning is
 * logged in that last case if older snapshots remain).
 *
 * @param thread - WriteableThread to insert applogs into
 * @param cid - Root CID of the snapshot to start from
 * @param retriever - BlockRetriever for fetching blocks
 * @param options - Optional configuration
 * @returns UpdateResult with applogs and counts
 * @throws If the chain revisits a CID, if a snapshot's applogs block is missing
 *   after fetching its DAG, or if a chunk block requested while unchunking is missing
 */
export async function updateThreadFromSnapshot(
  thread: WriteableThread,
  cid: CID,
  retriever: BlockRetriever,
  options?: UpdateOptions
): Promise<UpdateResult> {
  const { excludeSnapshotCID, stopAtCounter, maxDepth = 100 } = options ?? {}

  DEBUG('[updateThreadFromSnapshot] starting from', cid.toString(), {
    excludeSnapshotCID: excludeSnapshotCID?.toString(),
    stopAtCounter,
    maxDepth,
  })

  const blockStore = createMemoryBlockStore()
  const visited = new Set<string>()
  let currentCID: CID | undefined = cid
  let snapshotCount = 0
  const allApplogs: Applog[] = []
  let minCounter = Infinity
  let maxCounter = -Infinity
  let lastCounter: number | undefined

  while (currentCID && snapshotCount < maxDepth) {
    const cidStr = currentCID.toString()

    // Loop detection
    if (visited.has(cidStr)) {
      throw ERROR('[updateThreadFromSnapshot] snapshot chain has a loop', {
        currentCID: cidStr,
        visited: [...visited],
      })
    }
    visited.add(cidStr)

    // Check stop condition BEFORE fetching content
    if (excludeSnapshotCID && areCidsEqual(currentCID, excludeSnapshotCID)) {
      DEBUG('[updateThreadFromSnapshot] reached excludeSnapshotCID, stopping', excludeSnapshotCID.toString())
      break
    }

    // 1. Fetch root block
    DEBUG('[updateThreadFromSnapshot] fetching root block', cidStr)
    const rootBytes = await retriever.get(currentCID)
    blockStore.put(currentCID, rootBytes)

    // Parse root to get applogs, info, prev CIDs
    const root = dagJson.decode(rootBytes) as SnapRootBlock

    // Track counter range and validate sequentiality
    if (typeof root.prevCounter === 'number') {
      minCounter = Math.min(minCounter, root.prevCounter)
      maxCounter = Math.max(maxCounter, root.prevCounter)

      // Validate sequentiality (walking backwards, counter should decrease)
      if (lastCounter !== undefined && root.prevCounter !== lastCounter - 1) {
        WARN('[updateThreadFromSnapshot] counter gap detected', {
          expected: lastCounter - 1,
          got: root.prevCounter,
        })
      }
      lastCounter = root.prevCounter
    }

    // Stop condition based on counter
    if (stopAtCounter !== undefined && typeof root.prevCounter === 'number' && root.prevCounter <= stopAtCounter) {
      DEBUG('[updateThreadFromSnapshot] reached stopAtCounter', { stopAtCounter, prevCounter: root.prevCounter })
      break
    }

    // 2. Fetch applogs DAG
    DEBUG('[updateThreadFromSnapshot] fetching applogs', root.applogs.toString())
    await loadDagIntoStore(retriever, root.applogs, blockStore)

    // 3. Fetch info DAG
    // The info blocks are pulled through the retriever but not decoded in this function.
    // NOTE(review): presumably fetched so a caching retriever persists them - confirm.
    DEBUG('[updateThreadFromSnapshot] fetching info', root.info.toString())
    await loadDagIntoStore(retriever, root.info, blockStore)

    // Decode applogs from this snapshot
    const applogsBlock = await getDecodedBlock<SnapBlockLogsOrChunks>(blockStore, root.applogs)
    if (!applogsBlock) {
      throw ERROR('[updateThreadFromSnapshot] applogs block not found', { cid: root.applogs.toString() })
    }

    // Use the unchunk helper which handles both chunked and non-chunked formats
    const applogCIDs = await unchunkApplogsBlock(applogsBlock, {
      get: async (chunkCid: CID) => {
        // Fail loudly instead of handing `undefined` to the unchunk helper
        const chunkBytes = blockStore.get(chunkCid)
        if (!chunkBytes) {
          throw ERROR('[updateThreadFromSnapshot] chunk block not found', { cid: chunkCid.toString() })
        }
        return chunkBytes
      },
    })

    // Resolve each applog CID to actual applog data
    for (const applogCID of applogCIDs) {
      const applog = await getDecodedBlock<Applog>(blockStore, applogCID)
      if (!applog) {
        WARN('[updateThreadFromSnapshot] applog not found:', applogCID.toString())
        continue
      }

      // Normalize pv field if it's a CID instance
      const pv: unknown = applog.pv
      if (pv instanceof CID) {
        applog.pv = pv.toV1().toString()
      }

      allApplogs.push({
        ...applog,
        cid: applogCID.toV1().toString(),
      })
    }

    snapshotCount++
    currentCID = root.prev // Move to previous snapshot
  }

  // The loop can only end with a pending CID when maxDepth was hit (every `break` above
  // leaves snapshotCount < maxDepth). Don't warn if the pending CID is the exclusion boundary.
  if (
    currentCID
    && snapshotCount >= maxDepth
    && !(excludeSnapshotCID && areCidsEqual(currentCID, excludeSnapshotCID))
  ) {
    WARN('[updateThreadFromSnapshot] maxDepth reached, older snapshots were not traversed', {
      maxDepth,
      nextCID: currentCID.toString(),
    })
  }

  DEBUG('[updateThreadFromSnapshot] fetched', {
    snapshotCount,
    applogCount: allApplogs.length,
    rootCID: cid.toString(),
  })

  // Deduplicate applogs (in case of overlapping snapshots)
  const deduplicated = removeDuplicateAppLogs(allApplogs, 'cleanup')

  // Insert into thread
  const inserted = thread.insertMissing(deduplicated, false)

  DEBUG('[updateThreadFromSnapshot] inserted', {
    insertedCount: inserted.length,
    duplicateCount: deduplicated.length - inserted.length,
  })

  return {
    cid,
    applogs: deduplicated,
    insertedCount: inserted.length,
    snapshotCount,
    // minCounter stays Infinity when no root carried a numeric prevCounter
    counterRange: minCounter !== Infinity ? { minCounter, maxCounter } : undefined,
  }
}