import { CarReader, CarWriter } from '@ipld/car'
import * as dagJson from '@ipld/dag-json'
import { Logger } from 'besonders-logger'
import { BlockView, CID } from 'multiformats'
import { sortApplogsByTs } from '../applog/applog-utils.ts'
import { Applog, ApplogArrayMaybeEncrypted, CidString } from '../applog/datom-types.ts'
import { unchunkApplogsBlock } from '../pubsub/snap-push.ts'
import { SnapBlockLogs, SnapBlockLogsOrChunks, SnapRootBlock } from '../pubsub/pubsub-types.ts'
import { areCidsEqual, containsCid } from './ipfs-utils.ts'

const { WARN, LOG, DEBUG, VERBOSE, ERROR } = Logger.setup(Logger.INFO) // eslint-disable-line no-unused-vars

export type CIDForCar = CID // Exclude<Parameters<typeof CarWriter.create>[0], void>
export type BlockForCar = Parameters<CarWriter['put']>[0]

export interface BlockStoreish {
	get(cid: CID): PromiseLike<Uint8Array> // (i) not using decoded version to be similar to blockstore-idb
}
export interface DecodedCar {
	rootCID: CID
	// blocks: Map
	blockStore: BlockStoreish
}

/**
 * Load all blocks of `car` and decode the pub rooted at its root.
 * Warning: unsorted & maybe encrypted
 */
export async function decodePubFromCar(car: CarReader) {
	const decoded = await getBlocksOfCar(car)
	return await decodePubFromBlocks(decoded)
}

/**
 * Walk a pub's snapshot chain from `rootCID` backwards along `prev` links and collect the applogs of every
 * visited snapshot.
 *
 * - Snapshots with an `info` link ("new(er) format") reference their applog CID list through the `applogs`
 *   block, which is resolved via `unchunkApplogsBlock`; older snapshots carry the applog CID list inline in
 *   `applogs`.
 * - `info` and `applogsCID` are taken from the first (most recent) snapshot only.
 * - Applogs are concatenated in walk order (root snapshot first) — unsorted & maybe encrypted.
 *
 * @param _recursionTrace DEPRECATED: kept for API compat, unused in the iterative version
 * @param stopAtCID if a snapshot's `prev` equals this CID, the walk stops before descending into it
 *   (i.e. an already-pulled snapshot)
 * @throws if the chain loops, or if a snapshot, prev snapshot, or applog block is missing from `blockStore`
 */
export async function decodePubFromBlocks(
	{ rootCID, blockStore }: DecodedCar,
	_recursionTrace: CID[] = [], // DEPRECATED: kept for API compat, unused in iterative version
	stopAtCID?: CID, // stop iteration when we hit this CID
) {
	if (!rootCID || !blockStore) {
		throw ERROR('Empty roots/blocks', { rootCID, blockStore })
	}

	let allApplogs: ApplogArrayMaybeEncrypted = []
	let firstInfo: { logs: CID[] } | null = null
	let applogsCID: CID | null = null // From first snapshot only
	const visited = new Set<string>() // Loop detection (replaces recursionTrace)
	let currentCID: CID | undefined = rootCID

	while (currentCID) {
		const cidStr = currentCID.toString()

		// Loop detection
		if (visited.has(cidStr)) {
			throw ERROR('[decodePubFromBlocks] pub chain has a loop', { currentCID: cidStr, visited: [...visited] })
		}
		visited.add(cidStr)

		// Decode current snapshot
		const root = (await getDecodedBlock(blockStore, currentCID)) as SnapRootBlock
		VERBOSE(`[decodePubFromBlocks] root:`, cidStr, root, { blockStore })
		if (!root) {
			throw ERROR('[decodePubFromBlocks] root not found in blockStore', { blockStore, currentCID: cidStr })
		}

		// Collect the applog CIDs of this snapshot
		let pubLogsArray: CID[]
		if (root.info) {
			// New(er) format
			if (!applogsCID) applogsCID = root.applogs // Save from first snapshot
			const applogsBlock = (await getDecodedBlock(blockStore, root.applogs)) as SnapBlockLogsOrChunks
			pubLogsArray = await unchunkApplogsBlock(applogsBlock, blockStore)

			// Info only from first (most recent) snapshot
			if (!firstInfo) {
				const decoded = (await getDecodedBlock(blockStore, root.info)) as SnapBlockLogs
				if (decoded) {
					firstInfo = decoded
					DEBUG(`new format - infoLogs`, firstInfo.logs.map(l => ({ [l.toString()]: l })))
				} else {
					WARN(`[decodePubFromBlocks] info block not found for ${root.info}, using empty info`)
					firstInfo = { logs: [] }
				}
			}
			// TODO: verify signatures
		} else {
			// Old format: the applog CID list is stored inline in the root block
			pubLogsArray = root.applogs as any as CID[]
		}

		const snapshotApplogs = await Promise.all(
			pubLogsArray.map(cid => resolveApplogFromCid(blockStore, cid, 'applog', { cid, root, blockStore })),
		)
		allApplogs = allApplogs.concat(snapshotApplogs)

		// Check if we should stop
		if (!root.prev) {
			break // End of chain
		}
		if (stopAtCID && areCidsEqual(root.prev, stopAtCID)) {
			DEBUG('[decodePubFromBlocks] stopping at stopAtCID:', stopAtCID.toString())
			break // Reached already-pulled snapshot
		}

		// Verify prev exists before continuing — a 'Not Found' rejection from the store also counts as missing,
		// so callers get the descriptive error below instead of a bare store error.
		if (!(await hasBlock(blockStore, root.prev))) {
			throw ERROR('[decodePubFromBlocks] prev snapshot missing from blockStore', {
				currentCID: cidStr,
				prev: root.prev.toString(),
				stopAtCID: stopAtCID?.toString(),
				visited: [...visited],
			})
		}

		currentCID = root.prev // Move to previous snapshot
	}

	const result = {
		cid: rootCID,
		info: firstInfo
			? {
				...firstInfo,
				logs: await Promise.all(
					firstInfo.logs.map(cid => resolveApplogFromCid(blockStore, cid, 'info log', { cid, blockStore })),
				),
			}
			: null,
		applogsCID,
		applogs: allApplogs,
	}
	DEBUG('[decodePubFromBlocks] result:', result, { rootCID: rootCID.toString(), blockStore, applogs: allApplogs })
	return result
}

/**
 * Fetch and decode one applog block. A CID-valued `pv` is normalised to a v1 CID string, and the block's own
 * CID is attached as a v1 string in `cid`.
 * @param kind label used in the error messages ('applog' / 'info log')
 * @param errContext extra context passed to the ERROR log when the block is missing
 * @throws Error if the block is not in `blockStore`
 */
async function resolveApplogFromCid(
	blockStore: BlockStoreish,
	cid: CID,
	kind: string,
	errContext: Record<string, unknown>,
) {
	const applog = (await getDecodedBlock(blockStore, cid)) as Applog
	if (!applog) {
		ERROR(`Could not find ${kind} CID in pub blocks:`, cid.toString(), errContext)
		throw new Error(`Could not find ${kind} CID in pub blocks: ${cid.toString()}`)
	}
	// CID.asCID also recognises CIDs created by a different copy/version of multiformats (instanceof would not)
	const pvCid = CID.asCID(applog.pv)
	if (pvCid) applog.pv = pvCid.toV1().toString()
	return {
		...applog,
		cid: cid.toV1().toString(),
	}
}

/** Whether `err` carries the `message === 'Not Found'` that this module treats as "block absent". */
function isNotFoundError(err: unknown): boolean {
	return (err as { message?: unknown } | null | undefined)?.message === 'Not Found'
}

/** Whether `blockStore` yields bytes for `cid`; a 'Not Found' rejection counts as absent, other errors propagate. */
async function hasBlock(blockStore: BlockStoreish, cid: CID): Promise<boolean> {
	try {
		return Boolean(await blockStore.get(cid))
	} catch (err) {
		if (isNotFoundError(err)) return false
		throw err
	}
}

/**
 * Read every block of `car` into an in-memory map keyed by v1 CID string.
 * @returns the CAR's first root and a BlockStoreish view over the loaded (still encoded) blocks
 * @throws if the CAR declares no root (more than one root only logs a warning and uses the first)
 */
export async function getBlocksOfCar(car: CarReader) {
	const rootsFromCar = await car.getRoots()
	const roots = rootsFromCar.map(c => ((typeof c.toV1 === 'function') ? c : CID.decode(c.bytes)).toV1().toString() as CidString) // HACK
	if (roots.length !== 1) {
		WARN('Unexpected roots count:', roots)
	}
	const [firstRoot] = roots
	if (!firstRoot) {
		// Fail clearly here instead of inside CID.parse(undefined)
		throw ERROR('[getBlocksOfCar] CAR has no root', { roots })
	}

	const blocks = new Map() // v1 CID string -> raw block bytes
	for await (const { cid: cidFromCarblocks, bytes } of car.blocks()) {
		const cid = (typeof cidFromCarblocks.toV1 === 'function') ? cidFromCarblocks : CID.decode(cidFromCarblocks.bytes)
		VERBOSE({ cidFromCarblocks, cid })
		// blocks.set(cid.toV1().toString(), dagJson.decode(bytes))
		blocks.set(cid.toV1().toString(), bytes) // HACK: tried using CID as map key, but because it's based on referential equality it's not working
	}

	return {
		rootCID: CID.parse(firstRoot),
		blockStore: {
			// Returns the bytes directly (undefined when absent); the consumers in this module `await` the result
			get: (cid: CID) => blocks.get(cid.toV1().toString()),
		},
	} satisfies DecodedCar
}

/**
 * Fetch `cid` from `blockStore` and decode it as dag-json.
 * @returns the decoded value, or null if the store has no such block (missing bytes or a 'Not Found' rejection)
 * @throws any other store error, or a dag-json decode error
 */
export async function getDecodedBlock(blockStore: BlockStoreish, cid: CID) {
	let blob: Uint8Array | undefined
	try {
		blob = await blockStore.get(cid)
	} catch (err) {
		if (isNotFoundError(err)) return null
		throw err
	}
	if (!blob) {
		WARN('returning null')
		return null // I don't think this ever happens actually
	}
	return dagJson.decode(blob)
}

// make out in the car... been a while but also sounds nice
/**
 * Start writing a CAR with the given root(s) and blocks, and return the writer's `out` byte stream.
 *
 * NOTE(review): put()/close() are intentionally not awaited — the caller consumes `out` afterwards, and
 * awaiting before that could stall. Their rejections (e.g. a malformed block) are logged instead of surfacing
 * as unhandled rejections; a consumer of `out` may still stall in that case — TODO confirm.
 */
export async function makeCarOut(roots: CIDForCar | CIDForCar[], blocks: BlockForCar[]) {
	const { writer, out } = CarWriter.create(Array.isArray(roots) ? roots : [roots])

	// add the blocks to the CAR and close it
	VERBOSE(`Writing ${blocks.length} blocks to CAR`, { roots, blocks })
	for (const block of blocks) {
		writer.put(block).catch(err => ERROR('[makeCarOut] failed to put block into CAR', { cid: String(block.cid), err }))
	}
	writer.close().catch(err => ERROR('[makeCarOut] failed to close CAR writer', { err }))
	// VERBOSE(`Wrote ${blocks.length} blocks to CAR`, writer)
	return out
}

/** create a new CarWriter, with the encoded block as the root */
// export async function makeCarReader(roots: CIDForCar, blocks: BlockForCar[]) {
// 	const out = await makeCarOut(roots, blocks)
// 	// create a new CarReader we can hand to web3.storage.putCar
// 	const reader = await CarReader.fromIterable(out)
// 	VERBOSE(`CAR reader`, reader)
// 	return reader
// }

/** Serialise `blocks` into a complete CAR file with the given root(s) and return it as a Blob. */
export async function makeCarBlob(roots: CIDForCar | CIDForCar[], blocks: BlockForCar[]) {
	const carOut = await makeCarOut(roots, blocks)
	const chunks = []
	for await (const chunk of carOut) {
		chunks.push(chunk)
	}
	return new Blob(chunks)
}

/** Parse a CAR file held in a Blob/File into a CarReader. */
export async function carFromBlob(blob: Blob | File): Promise<CarReader> {
	return CarReader.fromBytes(new Uint8Array(await blob.arrayBuffer()))
}

/** Recursively collect every CID link contained in a decoded (dag-json) value. */
function extractCids(value: unknown): CID[] {
	// CID.asCID also recognises CIDs created by a different copy/version of multiformats (instanceof would not)
	const cid = CID.asCID(value)
	if (cid) return [cid]
	if (Array.isArray(value)) return value.flatMap(extractCids)
	// Raw bytes cannot contain links — don't walk them element by element
	if (ArrayBuffer.isView(value)) return []
	if (value && typeof value === 'object') return Object.values(value).flatMap(extractCids)
	return []
}

const MAX_COLLECT_BLOCKS = 1_000_000

/**
 * Breadth-first walk of the DAG below `startCID`, collecting each reachable block's raw bytes.
 *
 * Best-effort: a block missing from (or failing in) `blockStore` only ends that branch, and blocks that are not
 * dag-json are kept as leaves.
 * @param maxBlocks stop (with a warning) once this many blocks are collected; defaults to MAX_COLLECT_BLOCKS
 * @returns the collected blocks in BFS order (partial if the limit was hit)
 */
export async function collectDagBlocks(
	startCID: CID,
	blockStore: BlockStoreish,
	maxBlocks: number = MAX_COLLECT_BLOCKS,
): Promise<BlockForCar[]> {
	const visited = new Set<string>()
	const blocks: BlockForCar[] = []
	const queue: CID[] = [startCID]
	let head = 0 // index-based dequeue: Array#shift is O(n), which made large walks O(n²)

	while (head < queue.length) {
		if (blocks.length >= maxBlocks) {
			WARN(`[collectDagBlocks] hit ${maxBlocks} block limit, returning partial result`)
			break
		}
		const cid = queue[head++]! // head < queue.length was checked by the loop condition
		// Drop the consumed prefix once it dominates, so processed CIDs can be garbage-collected (amortised O(1))
		if (head >= 4096 && head * 2 >= queue.length) {
			queue.splice(0, head)
			head = 0
		}

		const cidStr = cid.toString()
		if (visited.has(cidStr)) continue
		visited.add(cidStr)

		let bytes: Uint8Array | undefined
		try {
			bytes = await blockStore.get(cid)
		} catch (err) {
			WARN(`[collectDagBlocks] block not found: ${cidStr}, stopping this branch`, err)
			continue
		}
		if (!bytes) {
			WARN(`[collectDagBlocks] block not found: ${cidStr}, stopping this branch`)
			continue
		}

		blocks.push({ cid, bytes })
		if (blocks.length % 1000 === 0) {
			LOG(`[collectDagBlocks] collected ${blocks.length} blocks...`)
		}

		try {
			const decoded = dagJson.decode(bytes)
			for (const child of extractCids(decoded)) {
				if (!visited.has(child.toString())) {
					queue.push(child)
				}
			}
		} catch {
			// Not dag-json — leaf block, no children to walk
		}
	}

	DEBUG(`[collectDagBlocks] collected ${blocks.length} blocks from ${startCID.toString()}`)
	return blocks
}

/** Adapt a stream reader into an async iterable of its chunks; iteration ends when the reader reports `done`. */
export function streamReaderToIterable(bodyReader: ReadableStreamDefaultReader<Uint8Array>): AsyncIterable<Uint8Array> {
	return (async function*() {
		while (true) {
			const { done, value } = await bodyReader.read()
			VERBOSE(`[car] chunk`, { done, value })
			if (done) {
				break
			}
			yield value
		}
	})()
}