import { setImmediate } from 'node:timers/promises' import * as varint from 'varint' import * as cbor from '@atproto/lex-cbor' import { Cid, decodeCid, isCidForBytes } from '@atproto/lex-data' import { BlockMap } from './block-map' import { CarBlock, schema } from './types' import { concatBytesAsync } from './util' export async function* writeCarStream( root: Cid | null, blocks: AsyncIterable, ): AsyncIterable { const header = new Uint8Array( cbor.encode({ version: 1, roots: root ? [root] : [], }), ) yield new Uint8Array(varint.encode(header.byteLength)) yield header for await (const block of blocks) { yield new Uint8Array( varint.encode(block.cid.bytes.byteLength + block.bytes.byteLength), ) yield block.cid.bytes yield block.bytes } } export async function blocksToCarFile( root: Cid | null, blocks: BlockMap, ): Promise { return concatBytesAsync(blocksToCarStream(root, blocks)) } export const blocksToCarStream = ( root: Cid | null, blocks: BlockMap, ): AsyncIterable => { return writeCarStream(root, iterateBlocks(blocks)) } async function* iterateBlocks(blocks: BlockMap) { for (const entry of blocks.entries()) { yield { cid: entry.cid, bytes: entry.bytes } } } export type ReadCarOptions = { /** * When true, does not verify CID-to-content mapping within CAR. */ skipCidVerification?: boolean } export const readCar = async ( bytes: Uint8Array, opts?: ReadCarOptions, ): Promise<{ roots: Cid[]; blocks: BlockMap }> => { const { roots, blocks } = await readCarReader(new Ui8Reader(bytes), opts) const blockMap = new BlockMap() for await (const block of blocks) { blockMap.set(block.cid, block.bytes) } return { roots, blocks: blockMap } } export const readCarWithRoot = async ( bytes: Uint8Array, opts?: ReadCarOptions, ): Promise<{ root: Cid; blocks: BlockMap }> => { const { roots, blocks } = await readCar(bytes, opts) if (roots.length !== 1) { throw new Error(`Expected one root, got ${roots.length}`) } const root = roots[0] return { root, blocks, } } export type CarBlockIterable = AsyncGenerator & { dump: () => Promise } export const readCarStream = async ( car: Iterable | AsyncIterable, opts?: ReadCarOptions, ): Promise<{ roots: Cid[] blocks: CarBlockIterable }> => { return readCarReader(new BufferedReader(car), opts) } export const readCarReader = async ( reader: BytesReader, opts?: ReadCarOptions, ): Promise<{ roots: Cid[] blocks: CarBlockIterable }> => { try { const headerSize = await readVarint(reader) if (headerSize === null) { throw new Error('Could not parse CAR header') } const headerBytes = await reader.read(headerSize) const header = cbor.decode(headerBytes) const result = schema.carHeader.safeParse(header) if (!result.success) { throw new Error('Could not parse CAR header', { cause: result.error }) } return { roots: result.data.roots, blocks: readCarBlocksIter(reader, opts), } } catch (err) { await reader.close() throw err } } const readCarBlocksIter = ( reader: BytesReader, opts?: ReadCarOptions, ): CarBlockIterable => { let generator = readCarBlocksIterGenerator(reader) if (!opts?.skipCidVerification) { generator = verifyIncomingCarBlocks(generator) } return Object.assign(generator, { async dump() { // try/finally to ensure that reader.close is called even if blocks.return throws. try { // Prevent the iterator from being started after this method is called. await generator.return() } finally { // @NOTE the "finally" block of the async generator won't be called // if the iteration was never started so we need to manually close here. await reader.close() } }, }) } async function* readCarBlocksIterGenerator( reader: BytesReader, ): AsyncGenerator { let blocks = 0 try { while (!reader.isDone) { const blockSize = await readVarint(reader) if (blockSize === null) { break } const blockBytes = await reader.read(blockSize) const cid = decodeCid(blockBytes.subarray(0, 36)) const bytes = blockBytes.subarray(36) yield { cid, bytes } // yield to the event loop every 25 blocks // in the case the incoming CAR is synchronous, this can end up jamming up the thread blocks++ if (blocks % 25 === 0) { await setImmediate() } } } finally { await reader.close() } } export async function* verifyIncomingCarBlocks( car: AsyncIterable, ): AsyncGenerator { for await (const block of car) { if (!(await isCidForBytes(block.cid, block.bytes))) { throw new Error(`Not a valid CID for bytes (${block.cid.toString()})`) } yield block } } const readVarint = async (reader: BytesReader): Promise => { let done = false const bytes: Uint8Array[] = [] while (!done) { const byte = await reader.read(1) if (byte.byteLength === 0) { if (bytes.length > 0) { throw new Error('could not parse varint') } else { return null } } bytes.push(byte) if (byte[0] < 128) { done = true } } const concatted = Buffer.concat(bytes) return varint.decode(concatted) } interface BytesReader { isDone: boolean read(bytesToRead: number): Promise close(): Promise } class Ui8Reader implements BytesReader { idx = 0 isDone = false constructor(public bytes: Uint8Array) {} async read(bytesToRead: number): Promise { const value = this.bytes.subarray(this.idx, this.idx + bytesToRead) this.idx += bytesToRead if (this.idx >= this.bytes.length) { this.isDone = true } return value } async close(): Promise {} } /** * This code was optimized for performance. See * {@link https://github.com/bluesky-social/atproto/pull/4729 #4729} for more details * and benchmarks. */ class BufferedReader implements BytesReader { iterator: Iterator | AsyncIterator isDone = false /** fifo list of chunks to consume */ private chunks: Uint8Array[] = [] constructor(stream: Iterable | AsyncIterable) { this.iterator = Symbol.asyncIterator in stream ? stream[Symbol.asyncIterator]() : stream[Symbol.iterator]() } /** Number of bytes currently buffered and available for reading */ get bufferedByteLength() { let total = 0 for (let i = 0; i < this.chunks.length; i++) { total += this.chunks[i].byteLength } return total } /** * @note concurrent reads are **NOT** supported by the current implementation * and would require call to readUntilBuffered to be using a fifo lock for * read()s to be processed in fifo order. */ async read(bytesToRead: number): Promise { const bytesNeeded = bytesToRead - this.bufferedByteLength if (bytesNeeded > 0 && !this.isDone) { await this.readUntilBuffered(bytesNeeded) } const resultLength = Math.min(bytesToRead, this.bufferedByteLength) if (resultLength <= 0) return new Uint8Array() const firstChunk = this.consumeChunk(resultLength) if (firstChunk.byteLength === resultLength) { // If the data consumed from the first chunk contains all we need, return // it as-is. This allows to avoid any copy operation. return firstChunk } // The first chunk does not have all the data we need. We have to copy // multiple chunks into a larger buffer const result = new Uint8Array(resultLength) let resultWriteIndex = 0 // Copy the first chunk into the result buffer result.set(firstChunk, resultWriteIndex) resultWriteIndex += firstChunk.byteLength // Copy more chunks as needed (we use do-while because we *know* we need // more than one chunk) do { const missingLength = resultLength - resultWriteIndex const currentChunk = this.consumeChunk(missingLength) result.set(currentChunk, resultWriteIndex) resultWriteIndex += currentChunk.byteLength } while (resultWriteIndex < resultLength) return result } private async readUntilBuffered(bytesNeeded: number) { let bytesRead = 0 while (bytesRead < bytesNeeded) { const next = await this.iterator.next() if (next.done) { this.isDone = true break } else { this.chunks.push(next.value) bytesRead += next.value.byteLength } } return bytesRead } private consumeChunk(bytesToConsume: number) { const firstChunk = this.chunks[0]! if (bytesToConsume < firstChunk.byteLength) { // return a sub-view of the data being read and replace the first chunk // with a sub-view that does not contain that data. // @NOTE for some reason, subarray() revealed to be 7-8% slower in NodeJS // benchmarks. // this.chunks[0] = firstChunk.subarray(bytesToConsume) // return firstChunk.subarray(0, bytesToConsume) this.chunks[0] = new Uint8Array( firstChunk.buffer, firstChunk.byteOffset + bytesToConsume, firstChunk.byteLength - bytesToConsume, ) return new Uint8Array( firstChunk.buffer, firstChunk.byteOffset, bytesToConsume, ) } else { // First chunk is being read in full, discard it this.chunks.shift() return firstChunk } } async close(): Promise { try { if (!this.isDone && this.iterator.return) { await this.iterator.return() } } finally { this.isDone = true this.chunks.length = 0 } } }