import * as CBOR from '@atcute/cbor';
import type { Cid, CidLink } from '@atcute/cid';
import * as CID from '@atcute/cid';

import { type CarEntry, type CarHeader, isCarV1Header } from './types.ts';

/**
 * Incremental reader over a CAR v1 archive delivered as a byte stream.
 *
 * Iterating the reader yields one entry per block; the header is read lazily on first use.
 */
export interface StreamedCarReader {
	/** Reads the archive header on first call and returns the cached value afterwards. */
	header(): Promise<CarHeader>;
	/** Returns the `roots` field of the decoded header, reading the header if necessary. */
	roots(): Promise<CidLink[]>;
	/** Cancels the reader held on the underlying stream. */
	dispose(): Promise<void>;
	/** Same as {@link StreamedCarReader.dispose}; enables `await using`. */
	[Symbol.asyncDispose](): Promise<void>;
	/** Yields the archive's entries in stream order, reading the header first if needed. */
	[Symbol.asyncIterator](): AsyncIterator<CarEntry>;
}

/**
 * Creates a readable/writable pair that accepts the raw bytes of a CAR archive on its writable
 * side and emits the parsed entries on its readable side.
 *
 * Parsing is driven from the readable's `start` hook, so entries are enqueued as soon as they
 * are decoded rather than on consumer demand.
 *
 * @returns a pair suitable for `ReadableStream.prototype.pipeThrough`
 */
export const carEntryTransform = (): ReadableWritablePair<CarEntry, Uint8Array> => {
	const transform = new TransformStream<Uint8Array, Uint8Array>();
	let car: StreamedCarReader | undefined;

	return {
		readable: new ReadableStream<CarEntry>({
			async start(controller) {
				const source = fromStream(transform.readable);
				car = source;

				try {
					for await (const entry of source) {
						controller.enqueue(entry);
					}

					await source.dispose();
					controller.close();
				} catch (err) {
					// surface the original failure to the consumer first
					controller.error(err);

					// then release the upstream reader; without this a malformed archive leaves
					// `transform.readable` locked and unread, so the writable side stalls under
					// backpressure and whatever is piped into it is never torn down
					try {
						await source.dispose();
					} catch {
						// best-effort: cancelling a stream that has itself errored rejects with
						// that same error, which has already been reported above
					}
				}
			},
			async cancel() {
				if (car !== undefined) {
					await car.dispose();
				}
			},
		}),
		writable: transform.writable,
	};
};

/**
 * Wraps a byte stream containing a CAR v1 archive in a {@link StreamedCarReader}.
 *
 * The stream is locked immediately (a reader is acquired on call); bytes are only pulled once
 * the header or the entries are requested. Returned byte arrays may be views into the chunks
 * produced by the stream rather than copies.
 *
 * @param stream source of the archive's bytes
 * @returns a reader exposing the header, the roots and an async iterator over the entries
 */
export const fromStream = (stream: ReadableStream<Uint8Array>): StreamedCarReader => {
	// size of the only cid layout accepted here: 4 prefix bytes (version, codec, digest type,
	// digest size) followed by a 32-byte digest
	const CID_BYTE_LENGTH = 36;

	// current chunk and the position of the next unread byte within it
	let chunk: Uint8Array = new Uint8Array(0);
	let chunkPos = 0;

	// number of bytes consumed from the stream so far; used to report positions
	let offset = 0;

	let _header: CarHeader | undefined;

	const reader = stream.getReader();

	// advances to the next chunk holding an unread byte; returns false once the stream ends.
	// a readable stream may legally emit zero-length chunks, so this must loop past them
	// rather than treat an empty chunk as data
	const refill = async (): Promise<boolean> => {
		while (chunkPos >= chunk.length) {
			const { value, done } = await reader.read();
			if (done) {
				return false;
			}

			chunk = value;
			chunkPos = 0;
		}

		return true;
	};

	// decodes an unsigned varint: 7 payload bits per byte, least-significant group first, with
	// the high bit signalling that another byte follows. at most 8 bytes are accepted.
	// NOTE(review): 8 bytes carry up to 56 bits, so the largest encodable values exceed
	// Number.MAX_SAFE_INTEGER and would lose precision
	const readVarint = async (): Promise<number> => {
		let value = 0;
		let shift = 0;
		let bytes = 0;

		const MSB = 0x80;
		const REST = 0x7f;

		while (true) {
			if (++bytes > 8) {
				throw new RangeError(`varint too long`);
			}

			if (chunkPos >= chunk.length && !(await refill())) {
				throw new Error(`unexpected eof while decoding varint`);
			}

			const byte = chunk[chunkPos++];
			offset++;

			// `<<` works on 32-bit integers, so from the fifth group onwards the payload is
			// scaled with a multiplication instead
			value += shift < 28 ? (byte & REST) << shift : (byte & REST) * 2 ** shift;
			shift += 7;

			if ((byte & MSB) === 0) {
				return value;
			}
		}
	};

	// reads exactly `n` bytes, throwing if the stream ends before they are all available
	const readExact = async (n: number): Promise<Uint8Array> => {
		// fast path: the bytes are contiguous in the current chunk, so hand back a view into
		// it instead of copying (the sync reader returns views into its source buffer too)
		if (chunkPos + n <= chunk.length) {
			const buffer = chunk.subarray(chunkPos, chunkPos + n);

			chunkPos += n;
			offset += n;

			return buffer;
		}

		// slow path: the read spans chunk boundaries, so assemble it into a fresh buffer
		const buffer = new Uint8Array(n);
		let written = 0;

		while (written < n) {
			if (chunkPos >= chunk.length && !(await refill())) {
				throw new Error('unexpected eof while reading data');
			}

			const taken = Math.min(n - written, chunk.length - chunkPos);
			buffer.set(chunk.subarray(chunkPos, chunkPos + taken), written);

			written += taken;
			chunkPos += taken;
		}

		offset += n;
		return buffer;
	};

	// reads a fixed-size cid and validates its four prefix bytes
	const readCid = async (): Promise<Cid> => {
		const bytes = await readExact(CID_BYTE_LENGTH);

		const version = bytes[0];
		const codec = bytes[1];
		const digestType = bytes[2];
		const digestSize = bytes[3];

		if (version !== CID.CID_VERSION) {
			throw new RangeError(`incorrect cid version (got v${version})`);
		}

		if (codec !== CID.CODEC_DCBOR && codec !== CID.CODEC_RAW) {
			throw new RangeError(`incorrect cid codec (got 0x${codec.toString(16)})`);
		}

		if (digestType !== CID.HASH_SHA256) {
			throw new RangeError(`incorrect cid digest type (got 0x${digestType.toString(16)})`);
		}

		if (digestSize !== 32) {
			throw new RangeError(`incorrect cid digest size (got ${digestSize})`);
		}

		return {
			version: version,
			codec: codec,
			digest: {
				codec: digestType,
				contents: bytes.subarray(4, CID_BYTE_LENGTH),
			},
			bytes: bytes,
		};
	};

	return {
		[Symbol.asyncDispose]() {
			return this.dispose();
		},

		async dispose() {
			await reader.cancel();
		},

		async header(): Promise<CarHeader> {
			if (_header !== undefined) {
				return _header;
			}

			// layout: varint length prefix, then that many bytes of cbor
			const headerStart = offset;
			const headerSize = await readVarint();
			if (headerSize === 0) {
				throw new RangeError(`invalid car header; length=0`);
			}

			const dataStart = offset;
			const raw = await readExact(headerSize);
			const data = CBOR.decode(raw);
			if (!isCarV1Header(data)) {
				throw new TypeError(`expected a car v1 archive`);
			}

			const dataEnd = offset;
			const headerEnd = offset;

			return (_header = { data, headerStart, headerEnd, dataStart, dataEnd });
		},

		async roots(): Promise<CidLink[]> {
			const header = await this.header();
			return header.data.roots;
		},

		async *[Symbol.asyncIterator](): AsyncGenerator<CarEntry> {
			// ensure the header is read first
			if (_header === undefined) {
				await this.header();
			}

			while (true) {
				// a stream that ends on an entry boundary is a normal end of archive
				if (!(await refill())) {
					return;
				}

				// layout: varint length prefix covering the cid and the block bytes together
				const entryStart = offset;
				const entrySize = await readVarint();
				if (entrySize < CID_BYTE_LENGTH) {
					throw new RangeError(`invalid car block; length=${entrySize}`);
				}

				const cidStart = offset;
				const cid = await readCid();

				const bytesStart = offset;
				const bytesSize = entrySize - CID_BYTE_LENGTH;
				const bytes = await readExact(bytesSize);

				const cidEnd = bytesStart;
				const bytesEnd = offset;
				const entryEnd = bytesEnd;

				yield {
					cid,
					bytes,
					entryStart,
					entryEnd,
					cidStart,
					cidEnd,
					bytesStart,
					bytesEnd,
				};
			}
		},
	};
};