import { CarReader } from '@ipld/car'
import * as dagJson from '@ipld/dag-json'
import { Logger } from 'besonders-logger'
import { CID } from 'multiformats/cid'
import type { SnapRootBlock } from '../pubsub/pubsub-types.ts'
import { areCidsEqual } from './ipfs-utils.ts'

const { WARN, LOG, DEBUG, VERBOSE, ERROR } = Logger.setup(Logger.INFO) // eslint-disable-line no-unused-vars

/**
 * Minimal async block store used while walking a snapshot chain.
 *
 * NOTE(review): the generic type arguments were reconstructed from how the
 * values are used in this file - confirm against other implementers.
 */
export interface BlockStoreForFetch {
  /** Resolves to the stored bytes, or `undefined` when the block is unknown. */
  get(cid: CID): Promise<Uint8Array | undefined>
  /** Stores `bytes` under `cid`. */
  put(cid: CID, bytes: Uint8Array): Promise<void>
}

/** Options for {@link fetchSnapshotChainUntil}. */
export interface FetchChainOptions {
  /** Snapshot to start from; the walk follows `prev` links from here. */
  rootCID: CID
  /** Stop when we hit this CID (lastCID from subscription) */
  stopAtCID?: CID
  /** Stop when we reach this counter (walking backwards) */
  stopAtCounter?: number
  /** Fetches a CAR for a single block (dag-scope=block) */
  fetchBlock: (cid: CID) => Promise<CarReader>
  /** Fetches a CAR for a block plus everything it links to (dag-scope=all) */
  fetchAll: (cid: CID) => Promise<CarReader>
  /** Maximum number of snapshots to fetch; defaults to 100. */
  maxDepth?: number
}

/** Result of {@link fetchSnapshotChainUntil}. */
export interface FetchChainResult {
  /** The `rootCID` that was passed in. */
  rootCID: CID
  /** In-memory store holding every block received during the walk. */
  blockStore: BlockStoreForFetch
  /** Serializable blocks array for worker boundary crossing */
  blocks: [string, Uint8Array][]
  /** Number of snapshots whose applogs and info were fetched. */
  snapshotCount: number
  /**
   * Min/max of the numeric `prevCounter` values seen on decoded root blocks;
   * `undefined` when no decoded root block carried a numeric `prevCounter`.
   */
  counterRange?: { minCounter: number; maxCounter: number }
}

/**
 * Fetches a snapshot chain iteratively, stopping at stopAtCID.
 * Uses 3 requests per snapshot: root(block), applogs(all), info(all).
 * This avoids the gateway's dag-scope=all following prev links recursively.
 *
 * The walk ends at the first of:
 * - the current CID equals `stopAtCID` (checked before anything is fetched for it),
 * - the current root block's `prevCounter` is `<= stopAtCounter` (its root block
 *   has already been stored and counted in `counterRange`, but its applogs/info
 *   are not fetched and it does not count towards `snapshotCount`),
 * - a root block has no `prev`,
 * - `maxDepth` snapshots have been fetched (a warning is logged if the chain
 *   still continues at that point).
 *
 * @param options - see {@link FetchChainOptions}
 * @returns the collected blocks plus bookkeeping about the walk
 * @throws if the chain revisits a CID, or if a root block is missing from the
 *   store after its CAR was loaded; errors from `fetchBlock`/`fetchAll` propagate
 */
export async function fetchSnapshotChainUntil(options: FetchChainOptions): Promise<FetchChainResult> {
  const { rootCID, stopAtCID, stopAtCounter, fetchBlock, fetchAll, maxDepth = 100 } = options
  const blockStore = createMemoryBlockStore()
  const visited = new Set<string>() // Loop detection for fetch
  let currentCID: CID | undefined = rootCID
  let snapshotCount = 0
  let minCounter = Infinity
  let maxCounter = -Infinity

  while (currentCID && snapshotCount < maxDepth) {
    const cidStr = currentCID.toString()

    // Loop detection
    if (visited.has(cidStr)) {
      throw ERROR('[fetchSnapshotChain] snapshot chain has a loop', { currentCID: cidStr, visited: [...visited] })
    }
    visited.add(cidStr)

    // Check stop condition BEFORE fetching content
    if (stopAtCID && areCidsEqual(currentCID, stopAtCID)) {
      DEBUG('[fetchSnapshotChain] reached stopAtCID, stopping', stopAtCID.toString())
      break // We've reached the last pulled snapshot - don't fetch it again
    }

    // 1. Fetch root block only (dag-scope=block)
    DEBUG('[fetchSnapshotChain] fetching root block', cidStr)
    const rootCar = await fetchBlock(currentCID)
    await addCarBlocksToStore(rootCar, blockStore)

    // Parse root to get applogs, info, prev CIDs
    const rootBytes = await blockStore.get(currentCID)
    if (!rootBytes) {
      throw ERROR('[fetchSnapshotChain] root block not in store after fetch', { currentCID: cidStr })
    }
    const root = dagJson.decode(rootBytes) as SnapRootBlock

    // Track counter range
    if (typeof root.prevCounter === 'number') {
      minCounter = Math.min(minCounter, root.prevCounter)
      maxCounter = Math.max(maxCounter, root.prevCounter)
    }

    // Stop condition based on counter
    if (stopAtCounter !== undefined && typeof root.prevCounter === 'number' && root.prevCounter <= stopAtCounter) {
      DEBUG('[fetchSnapshotChain] reached stopAtCounter', { stopAtCounter, prevCounter: root.prevCounter })
      break
    }

    // 2. Fetch applogs with dag-scope=all (gets applogs block + all linked applog blocks)
    DEBUG('[fetchSnapshotChain] fetching applogs', root.applogs.toString())
    const applogsCar = await fetchAll(root.applogs)
    await addCarBlocksToStore(applogsCar, blockStore)

    // 3. Fetch info with dag-scope=all (gets info block + all linked info log blocks)
    DEBUG('[fetchSnapshotChain] fetching info', root.info.toString())
    const infoCar = await fetchAll(root.info)
    await addCarBlocksToStore(infoCar, blockStore)

    snapshotCount++
    currentCID = root.prev // Move to previous snapshot
  }

  // Both `break`s above leave snapshotCount < maxDepth, so this only fires when
  // the depth limit itself ended the walk. Skip the warning when the next CID is
  // exactly the stop marker, because nothing further would have been fetched.
  if (currentCID && snapshotCount >= maxDepth && !(stopAtCID && areCidsEqual(currentCID, stopAtCID))) {
    WARN('[fetchSnapshotChain] maxDepth reached before end of chain, result is truncated', {
      maxDepth,
      snapshotCount,
      nextCID: currentCID.toString(),
    })
  }

  DEBUG('[fetchSnapshotChain] done', { snapshotCount, rootCID: rootCID.toString() })
  return {
    rootCID,
    blockStore,
    blocks: blockStore.getBlocksArray(),
    snapshotCount,
    counterRange: minCounter !== Infinity ? { minCounter, maxCounter } : undefined,
  }
}

/** Copies every block yielded by `car.blocks()` into `store`. */
async function addCarBlocksToStore(car: CarReader, store: BlockStoreForFetch): Promise<void> {
  for await (const { cid, bytes } of car.blocks()) {
    // A cid without a callable `toV1` is rebuilt from its raw bytes so the store
    // can normalise it. NOTE(review): presumably guards against CID objects that
    // are not instances of this module's CID class - confirm.
    const validCid = typeof cid.toV1 === 'function' ? cid : CID.decode(cid.bytes)
    await store.put(validCid, bytes)
  }
}

interface MemoryBlockStoreWithBlocks extends BlockStoreForFetch {
  /** Get all blocks as serializable array */
  getBlocksArray(): [string, Uint8Array][]
}

/** Creates a Map-backed block store keyed by the string form of the CIDv1. */
function createMemoryBlockStore(): MemoryBlockStoreWithBlocks {
  const blocks = new Map<string, Uint8Array>()
  return {
    async get(cid: CID) {
      return blocks.get(cid.toV1().toString())
    },
    async put(cid: CID, bytes: Uint8Array) {
      blocks.set(cid.toV1().toString(), bytes)
    },
    getBlocksArray() {
      return Array.from(blocks.entries())
    },
  }
}