/*!
 * Copyright (c) 2026-present, Vanilagy and contributors
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/.
 */

import type { FileHandle } from 'node:fs/promises';
import {
	assert,
	binarySearchLessOrEqual,
	clamp,
	closedIntervalsOverlap,
	FilePath,
	isNumber,
	isWebKit,
	MaybePromise,
	mergeRequestInit,
	normalizeHeaders,
	polyfillSymbolDispose,
	promiseWithResolvers,
	retriedFetch,
	toDataView,
	toUint8Array,
	wait,
	EventEmitter,
} from './misc';
import * as nodeAlias from './node';
import { InputDisposedError } from './input';

polyfillSymbolDispose();

const node = typeof nodeAlias !== 'undefined'
	? nodeAlias // Aliasing it prevents some bundler warnings
	: undefined!;

/** The result of a read operation: the retrieved bytes, a view onto them, and where they sit in the file. */
export type ReadResult = {
	bytes: Uint8Array;
	view: DataView;
	/** The offset of the bytes in the file. */
	offset: number;
};

export const DEFAULT_MIN_READ_POSITION = 0;
export const DEFAULT_MAX_READ_POSITION = Infinity;

/**
 * The events emitted by a {@link Source}, with each key being an event name and its value being the event data.
 * @group Input sources
 * @public
 */
export type SourceEvents = {
	/** Emitted each time data is retrieved from the source. */
	read: {
		/** The start of the retrieved range, inclusive. */
		start: number;
		/** The end of the retrieved range, exclusive. */
		end: number;
	};
};

// Module-level registry whose held value is a cleanup callback; the callback is invoked once the registered target
// has been garbage-collected. Stays null in environments without FinalizationRegistry.
let sourceFinalizationRegistry: FinalizationRegistry<() => unknown> | null = null;
if (typeof FinalizationRegistry !== 'undefined') {
	sourceFinalizationRegistry = new FinalizationRegistry((cleanup) => {
		cleanup();
	});
}

/**
 * The source base class, representing a resource from which bytes can be read.
* @group Input sources * @public */ export abstract class Source extends EventEmitter { /** @internal */ abstract _getFileSize(): number | null | undefined; /** @internal */ abstract _read( start: number, end: number, minReadPosition: number, maxReadPosition: number, ): MaybePromise; /** @internal */ abstract _dispose(): void; /** @internal */ _disposed = false; /** @internal */ _refCount = 0; /** * Used internally to mark if a source stems from an HLS reading operation. Used to suppress certain warnings. * @internal */ _usedForHls = false; /** * FinalizationRegistry for rogue refs to this source that didn't get freed. It lives on the Source itself so that * in case the Source transitively points back to itself and forms a cycle (for example through a custom * CustomSource callback) that we're not leaking memory. * @internal */ _refFinalizationRegistry: FinalizationRegistry | null = null; /** @internal */ private _sizePromise: Promise | null = null; constructor() { super(); if (typeof FinalizationRegistry !== 'undefined') { this._refFinalizationRegistry = new FinalizationRegistry((source) => { source._decrementRefCount(); }); } } /** * Resolves with the total size of the file in bytes. This function is memoized, meaning only the first call * will retrieve the size. * * Returns null if the source is unsized. */ async getSizeOrNull() { if (this._disposed) { throw new InputDisposedError(); } return this._sizePromise ??= (async () => { let size = this._getFileSize(); if (size !== undefined) { return size; } await this._read(0, 1, DEFAULT_MIN_READ_POSITION, DEFAULT_MAX_READ_POSITION); size = this._getFileSize(); assert(size !== undefined); return size; })(); } /** * Resolves with the total size of the file in bytes. This function is memoized, meaning only the first call * will retrieve the size. * * Throws an error if the source is unsized. 
*/ async getSize() { if (this._disposed) { throw new InputDisposedError(); } const result = await this.getSizeOrNull(); if (result === null) { throw new Error('Cannot determine the size of an unsized source.'); } return result; } /** * Returns a new {@link RangedSource} that maps data onto this source using the given offset and length. If a length * is not provided, the ranged source spans until the end of this source's data. * * Useful for reading files that are embedded within larger files. */ slice(offset: number, length?: number) { if (!Number.isInteger(offset) || offset < 0) { throw new TypeError('offset must be a non-negative integer.'); } if (length !== undefined && (!Number.isInteger(length) || length < 0)) { throw new TypeError('length, when provided, must be a non-negative integer.'); } return new RangedSource(this, offset, length); } /** * Called each time data is retrieved from the source. Will be called with the retrieved range (end exclusive). * * @deprecated Use `source.on('read', ({ start, end }) => ...)` instead. */ onread: ((start: number, end: number) => unknown) | null = null; /** @internal */ _dispatchRead(start: number, end: number) { // eslint-disable-next-line @typescript-eslint/no-deprecated this.onread?.(start, end); this._emit('read', { start, end }); } /** * Creates a new `SourceRef` pointing to this source. You are expected to call `.free()` on said `SourceRef` when * you're done with it. */ ref() { return new SourceRef(this); } /** @internal */ _incrementRefCount() { this._refCount++; } /** @internal */ _decrementRefCount() { this._refCount--; if (this._refCount === 0) { this._dispose(); this._disposed = true; } } } /** * A reference to a {@link Source}, used to manage a source's lifecycle. Creating a `SourceRef` via {@link Source.ref} * increases that source's internal reference count. As long as a source has a non-zero reference count, it is assumed * to still be in use. 
Once all references are freed via {@link SourceRef.free}, the source gets disposed.
 *
 * @group Input sources
 * @public
 */
export class SourceRef<S extends Source = Source> implements Disposable {
	/** @internal */
	private _source: S | null;
	/** @internal */
	private _freed = false;

	/** @internal */
	constructor(source: S) {
		if (source._disposed) {
			throw new Error('Cannot ref a disposed source.');
		}

		source._incrementRefCount();
		// Should this ref get garbage-collected without being freed, the source's registry releases its count.
		// The ref doubles as the unregister token, see free().
		source._refFinalizationRegistry?.register(this, source, this);

		this._source = source;
	}

	/** The {@link Source} this ref references. Accessing this field throws an error after having freed the ref. */
	get source() {
		if (!this._source) {
			throw new Error('Can\'t get source; ref has already been freed.');
		}

		return this._source;
	}

	/** Whether or not this reference has been freed via {@link SourceRef.free}. */
	get freed() {
		return this._freed;
	}

	/**
	 * Frees the ref, decrementing the source's internal reference count. If the source's internal reference count
	 * reaches zero, it gets disposed. To catch bugs, this method throws if the ref is already freed.
	 */
	free() {
		if (this._freed) {
			throw new Error('Illegal operation: double free on SourceRef.');
		}

		const source = this.source;
		assert(source._refCount > 0);

		source._decrementRefCount();
		// Explicitly freed, so the finalization callback must no longer fire for this ref
		source._refFinalizationRegistry?.unregister(this);

		this._freed = true;
		this._source = null;
	}

	/**
	 * Calls {@link SourceRef.free}.
	 */
	[Symbol.dispose]() {
		if (!this.freed) {
			this.free();
		}
	}
}

/**
 * A source which can create new sources from file paths. Required for multi-file inputs such as HLS playlists.
 * @public
 * @group Input sources
 */
export abstract class PathedSource extends Source {
	constructor(
		/** The path that points to the root file; the entry file of the media. */
		public rootPath: FilePath,
		/** The callback that is called for each requested file; must return a {@link Source} or {@link SourceRef}.
*/ public requestHandler: (request: SourceRequest) => MaybePromise, ) { if (typeof rootPath !== 'string') { throw new TypeError('rootPath must be a string.'); } if (typeof requestHandler !== 'function') { throw new TypeError('requestHandler must be a function.'); } super(); } /** @internal */ _resolveRequest(request: SourceRequest): MaybePromise { const result = this.requestHandler(request); const handle = (result: Source | SourceRef) => { if (!(result instanceof Source || result instanceof SourceRef)) { throw new TypeError('requestHandler must return or resolve to a Source or SourceRef.'); } const ref = result instanceof Source ? result.ref() : result; ref.source._usedForHls ||= this._usedForHls; return ref; }; if (result instanceof Promise) { return result.then(handle); } else { return handle(result); } } } /** * A request for a {@link Source} at the given path. * @group Input sources * @public */ export type SourceRequest = { /** The requested file path. */ path: FilePath; /** Whether the requested file is the root file. */ isRoot: boolean; }; export const sourceRequestsAreEqual = (a: SourceRequest, b: SourceRequest) => { return a.path === b.path; }; /** * A custom multi-file source where each file is uniquely identified by a {@link FilePath} and can be resolved to * an arbitrary {@link Source}. * * @public * @group Input sources */ export class CustomPathedSource extends PathedSource { /** @internal */ _root: SourceRef | null = null; /** @internal */ _rootRequest: Promise | null = null; /** @internal */ override _read( start: number, end: number, minReadPosition: number, maxReadPosition: number, ): MaybePromise { if (!this._root) { if (!this._rootRequest) { const result = this._resolveRequest({ path: this.rootPath, isRoot: true }); const handle = (result: Source | SourceRef) => { const ref = result instanceof Source ? 
result.ref() : result; this._root = ref; this._rootRequest = null; return ref; }; if (result instanceof Promise) { this._rootRequest = result.then(handle); } else { handle(result); assert(this._root); } } if (this._rootRequest) { return this._rootRequest.then(ref => ref.source._read(start, end, minReadPosition, maxReadPosition)); } } return this._root!.source._read(start, end, minReadPosition, maxReadPosition); } /** @internal */ override _getFileSize(): number | null | undefined { if (this._root) { return this._root.source._getFileSize(); } return undefined; } /** @internal */ override _dispose(): void { if (this._root) { this._root.free(); } else if (this._rootRequest) { void this._rootRequest .then(ref => ref.free()); } } } /** * A source backed by an ArrayBuffer or ArrayBufferView, with the entire file held in memory. * @group Input sources * @public */ export class BufferSource extends Source { /** @internal */ _bytes: Uint8Array; /** @internal */ _view: DataView; /** @internal */ _onreadCalled = false; /** * Creates a new {@link BufferSource} backed by the specified `ArrayBuffer`, `SharedArrayBuffer`, * or `ArrayBufferView`. */ constructor(buffer: AllowSharedBufferSource) { if ( !(buffer instanceof ArrayBuffer) && !(typeof SharedArrayBuffer !== 'undefined' && buffer instanceof SharedArrayBuffer) && !ArrayBuffer.isView(buffer) ) { throw new TypeError('buffer must be an ArrayBuffer, SharedArrayBuffer, or ArrayBufferView.'); } super(); this._bytes = toUint8Array(buffer); this._view = toDataView(buffer); } /** @internal */ _getFileSize(): number { return this._bytes.byteLength; } /** @internal */ _read(): ReadResult { if (!this._onreadCalled) { // We just say the first read retrieves all bytes from the source (which, I mean, it does) this._dispatchRead(0, this._bytes.byteLength); this._onreadCalled = true; } return { bytes: this._bytes, view: this._view, offset: 0, }; } /** @internal */ _dispose() {} } /** * Options for {@link BlobSource}. 
* @group Input sources * @public */ export type BlobSourceOptions = { /** The maximum number of bytes the cache is allowed to hold in memory. Defaults to 8 MiB. */ maxCacheSize?: number; /** * Defaults to `true`. When `true`, Mediabunny will acquire a `ReadableStream` reader internally to efficiently read * data from the blob. Since this can lead to errors in some (very) rare cases due to browser bugs, you can set this * field to `false` to try a slower but more stable reading method. */ useStreamReader?: boolean; }; /** * A source backed by a [`Blob`](https://developer.mozilla.org/en-US/docs/Web/API/Blob). Since a * [`File`](https://developer.mozilla.org/en-US/docs/Web/API/File) is also a `Blob`, this is the source to use when * reading files off the disk. * @group Input sources * @public */ export class BlobSource extends Source { /** @internal */ _blob: Blob; /** @internal */ _options: BlobSourceOptions; /** @internal */ _orchestrator: ReadOrchestrator; /** * Creates a new {@link BlobSource} backed by the specified * [`Blob`](https://developer.mozilla.org/en-US/docs/Web/API/Blob). */ constructor(blob: Blob, options: BlobSourceOptions = {}) { if (!(blob instanceof Blob)) { throw new TypeError('blob must be a Blob.'); } if (!options || typeof options !== 'object') { throw new TypeError('options must be an object.'); } if ( options.maxCacheSize !== undefined && (!isNumber(options.maxCacheSize) || options.maxCacheSize < 0) ) { throw new TypeError('options.maxCacheSize, when provided, must be a non-negative number.'); } if (options.useStreamReader !== undefined && typeof options.useStreamReader !== 'boolean') { throw new TypeError('options.useStreamReader, when provided, must be a boolean.'); } super(); this._blob = blob; this._options = options; this._orchestrator = new ReadOrchestrator({ maxCacheSize: options.maxCacheSize ?? 
(8 * 2 ** 20 /* 8 MiB */), maxWorkerCount: 4, runWorker: this._runWorker.bind(this), prefetchProfile: PREFETCH_PROFILES.fileSystem, }); this._orchestrator.fileSize = blob.size; } /** @internal */ _getFileSize(): number { return this._orchestrator.fileSize!; // Faster than blob.size } /** @internal */ _read( start: number, end: number, minReadPosition: number, maxReadPosition: number, ): MaybePromise { return this._orchestrator.read(start, end, minReadPosition, maxReadPosition); } /** @internal */ _readers = new WeakMap | null>(); /** @internal */ private async _runWorker(worker: ReadWorker) { assert(worker.strictTarget); let reader = this._readers.get(worker); if (reader === undefined) { // https://github.com/Vanilagy/mediabunny/issues/184 // WebKit has critical bugs with blob.stream(): // - WebKitBlobResource error 1 when streaming large files // - Memory buildup and reload loops on iOS (network process crashes) // - ReadableStream stalls under backpressure (especially video) // Affects Safari and all iOS browsers (Chrome, Firefox, etc.). // Use arrayBuffer() fallback for WebKit browsers. 
if ('stream' in this._blob && !isWebKit() && this._options.useStreamReader !== false) { // Get a reader of the blob starting at the required offset, and then keep it around const slice = this._blob.slice(worker.currentPos); reader = slice.stream().getReader(); } else { // We'll need to use more primitive ways reader = null; } this._readers.set(worker, reader); } while (worker.currentPos < worker.targetPos && !worker.aborted) { if (reader) { const { done, value } = await reader.read(); if (done) { this._orchestrator.onWorkerFinished(worker); throw new Error('Blob reader stopped unexpectedly before all requested data was read.'); } if (worker.aborted) { break; } this._dispatchRead(worker.currentPos, worker.currentPos + value.length); this._orchestrator.supplyWorkerData(worker, value); } else { const data = await this._blob.slice(worker.currentPos, worker.targetPos).arrayBuffer(); if (worker.aborted) { break; } this._dispatchRead(worker.currentPos, worker.currentPos + data.byteLength); this._orchestrator.supplyWorkerData(worker, new Uint8Array(data)); } } this._orchestrator.signalWorkerStoppedRunning(worker); if (worker.aborted) { // MDN: "Calling this method signals a loss of interest in the stream by a consumer." await reader?.cancel(); } } /** @internal */ _dispose() { this._orchestrator.dispose(); } } const URL_SOURCE_MIN_LOAD_AMOUNT = 0.5 * 2 ** 20; // 0.5 MiB const DEFAULT_RETRY_DELAY = ((previousAttempts, error, src) => { // Check if this could be a CORS error. If so, we cannot recover from it and // should not attempt to retry. // CORS errors are intentionally not opaque, so we need to rely on heuristics. 
const couldBeCorsError = error instanceof Error && (
		error.message.includes('Failed to fetch') // Chrome
		|| error.message.includes('Load failed') // Safari
		|| error.message.includes('NetworkError when attempting to fetch resource') // Firefox
	) && typeof window !== 'undefined'; // CORS only happens in browser environments

	if (couldBeCorsError) {
		let originOfSrc: string | null = null;
		// Checking if the origin is different, because only then a CORS error could originate
		try {
			if (typeof window !== 'undefined' && typeof window.location !== 'undefined') {
				originOfSrc = new URL(src instanceof Request ? src.url : src, window.location.href).origin;
			}
		} catch {
			// URL parse failed
		}

		// If user is offline, it is probably not a CORS error.
		const isOnline
			= typeof navigator !== 'undefined' && typeof navigator.onLine === 'boolean' ? navigator.onLine : true;

		if (isOnline && originOfSrc !== null && originOfSrc !== window.location.origin) {
			console.warn(
				`Request will not be retried because a CORS error was suspected due to different origins. You can`
				+ ` modify this behavior by providing your own function for the 'getRetryDelay' option.`,
			);
			return null;
		}
	}

	// Exponential backoff in seconds, starting at 0.5 s for the first retry and capped at 16 s
	return Math.min(2 ** (previousAttempts - 2), 16);
}) satisfies UrlSourceOptions['getRetryDelay'];

// Origins for which the "no range request support" warning has already been printed
const warnedOrigins = new Set<string>();

/**
 * Options for {@link UrlSource}.
 * @group Input sources
 * @public
 */
export type UrlSourceOptions = {
	/**
	 * The [`RequestInit`](https://developer.mozilla.org/en-US/docs/Web/API/RequestInit) used by the Fetch API. Can be
	 * used to further control the requests, such as setting custom headers.
	 *
	 * The `signal` field is not available, as Mediabunny controls request cancellation internally. If you want to
	 * cancel ongoing requests, use {@link Input.dispose}.
	 */
	requestInit?: Omit<RequestInit, 'signal'>;
	/**
	 * A function that returns the delay (in seconds) before retrying a failed request.
The function is called
	 * with the number of previous, unsuccessful attempts, as well as with the error with which the previous request
	 * failed. If the function returns `null`, no more retries will be made.
	 *
	 * By default, it uses an exponential backoff algorithm that never gives up unless
	 * a CORS error is suspected (`fetch()` did reject, `navigator.onLine` is true and origin is different).
	 */
	getRetryDelay?: (previousAttempts: number, error: unknown, url: string | URL | Request) => number | null;
	/** The maximum number of bytes the cache is allowed to hold in memory. Defaults to 64 MiB. */
	maxCacheSize?: number;
	/** The maximum number of parallel requests to use for fetching. Defaults to 2. */
	parallelism?: number;
	/**
	 * A WHATWG-compatible fetch function. You can use this field to polyfill the `fetch` function, add missing
	 * features, or use a custom implementation.
	 */
	fetchFn?: typeof fetch;
};

/**
 * A source backed by a URL. This is useful for reading data from the network. Requests will be made using an optimized
 * reading and prefetching pattern to minimize request count and latency.
 * @group Input sources
 * @public
 */
export class UrlSource extends PathedSource {
	/** @internal */
	_url: string | URL | Request;
	/** @internal */
	_getRetryDelay: (previousAttempts: number, error: unknown, url: string | URL | Request) => number | null;
	/** @internal */
	_options: UrlSourceOptions;
	/** @internal */
	_requestInit: RequestInit;
	/** @internal */
	_offset = 0;
	/** @internal */
	_length: number | null = null;
	/** @internal */
	_orchestrator: ReadOrchestrator;
	/**
	 * Note that this value being true does NOT mean the file size can't change anymore; it just signals that we have at
	 * least checked if we know the file size or not.
	 * @internal
	 */
	_fileSizeDetermined = false;

	/**
	 * Creates a new {@link UrlSource} backed by the resource at the specified URL.
	 *
	 * When passing a `Request` instance, note that its `signal` will be overridden by Mediabunny; if you want to cancel
	 * ongoing requests, use {@link Input.dispose}.
	 */
	constructor(
		url: string | URL | Request,
		options: UrlSourceOptions = {},
	) {
		if (
			typeof url !== 'string'
			&& !(url instanceof URL)
			&& !(typeof Request !== 'undefined' && url instanceof Request)
		) {
			throw new TypeError('url must be a string, URL or Request.');
		}
		if (!options || typeof options !== 'object') {
			throw new TypeError('options must be an object.');
		}
		if (options.requestInit !== undefined && (!options.requestInit || typeof options.requestInit !== 'object')) {
			throw new TypeError('options.requestInit, when provided, must be an object.');
		}
		if (options.getRetryDelay !== undefined && typeof options.getRetryDelay !== 'function') {
			throw new TypeError('options.getRetryDelay, when provided, must be a function.');
		}
		if (
			options.maxCacheSize !== undefined
			&& (!isNumber(options.maxCacheSize) || options.maxCacheSize < 0)
		) {
			throw new TypeError('options.maxCacheSize, when provided, must be a non-negative number.');
		}
		if (options.parallelism !== undefined && (!Number.isInteger(options.parallelism) || options.parallelism < 1)) {
			// The check requires an integer, so the message says so
			throw new TypeError('options.parallelism, when provided, must be a positive integer.');
		}
		if (options.fetchFn !== undefined && typeof options.fetchFn !== 'function') {
			throw new TypeError('options.fetchFn, when provided, must be a function.');
			// Won't bother validating this function beyond this
		}

		const urlString = url instanceof Request
			? url.url
			: url instanceof URL
				? url.href
				: url;

		// Sources requested relative to this one are UrlSources as well, sharing this source's options
		super(urlString, request => new UrlSource(request.path, this._options));

		this._url = url;
		this._options = options;
		this._getRetryDelay = options.getRetryDelay ?? DEFAULT_RETRY_DELAY;

		// A user-supplied Range header is interpreted as a byte offset (and optional length) into the resource. We
		// pull it out of the request and remember it for subsequent requests.
this._requestInit = { ...options.requestInit };
		let rangeHeaderValue: string | null = null;

		if (options.requestInit?.headers) {
			const headers = { ...normalizeHeaders(options.requestInit.headers) };
			const rangeKey = Object.keys(headers).find(key => key.toLowerCase() === 'range');

			if (rangeKey !== undefined) {
				rangeHeaderValue = headers[rangeKey]!;
				delete headers[rangeKey];
				this._requestInit.headers = headers;
			}
		}

		if (url instanceof Request) {
			const requestRange = url.headers.get('Range');
			if (requestRange !== null) {
				// A Range header given via requestInit takes precedence over the one on the Request
				rangeHeaderValue ??= requestRange;

				// Clone the request so we don't mutate the user's object, then strip the Range header
				const strippedRequest = new Request(url);
				strippedRequest.headers.delete('Range');
				this._url = strippedRequest;
			}
		}

		if (rangeHeaderValue !== null) {
			const parsed = parseByteRangeHeader(rangeHeaderValue);
			if (parsed) {
				this._offset = parsed.offset;
				this._length = parsed.length;
			}
		}

		// Most files in the real-world have a single sequential access pattern, but having two in parallel can
		// also happen
		const DEFAULT_PARALLELISM = 2;

		this._orchestrator = new ReadOrchestrator({
			maxCacheSize: options.maxCacheSize ?? (64 * 2 ** 20 /* 64 MiB */),
			maxWorkerCount: options.parallelism ?? DEFAULT_PARALLELISM,
			runWorker: this._runWorker.bind(this),
			prefetchProfile: PREFETCH_PROFILES.network,
		});
	}

	/**
	 * Returns the size of the visible range, i.e. the resource size reduced by the user-supplied byte offset and
	 * capped by the user-supplied length.
	 * @internal
	 */
	_getFileSize(): number | null | undefined {
		if (!this._fileSizeDetermined) {
			return this._length !== null ? this._length : undefined;
		}

		const baseSize = this._orchestrator.fileSize;
		if (baseSize === null) {
			return this._length !== null ? this._length : null;
		}

		return clamp(baseSize - this._offset, 0, this._length ?? Infinity);
	}

	/**
	 * Reads from the underlying resource with all positions shifted by the user-supplied byte offset; the offset of
	 * the result is shifted back so callers only see positions relative to the visible range.
	 * @internal
	 */
	_read(
		start: number,
		end: number,
		minReadPosition: number,
		maxReadPosition: number,
	): MaybePromise<ReadResult | null> {
		if (this._length !== null && end > this._length) {
			return null;
		}

		const offset = this._offset;
		const result = this._orchestrator.read(
			offset + start,
			offset + end,
			Math.max(offset + minReadPosition, offset),
			offset + Math.min(maxReadPosition, this._length ?? Infinity),
		);

		const processResult = (result: ReadResult | null) => {
			if (!result) {
				return null;
			}

			result.offset -= this._offset;
			return result;
		};

		if (result instanceof Promise) {
			return result.then(processResult);
		} else {
			return processResult(result);
		}
	}

	/**
	 * Feeds the worker with data fetched via an open-ended range request starting at the worker's current position,
	 * issuing a new request whenever the response ends or fails before the worker's target is reached.
	 * @internal
	 */
	private async _runWorker(worker: ReadWorker) {
		// The outer loop is for resuming a request if it dies mid-response
		while (true) {
			const abortController = new AbortController();
			const response = await retriedFetch(
				this._options.fetchFn ?? fetch,
				this._url,
				mergeRequestInit(this._requestInit, {
					headers: {
						// Always sending a range request is a good way to probe if the server supports them
						Range: `bytes=${worker.currentPos}-`,
					},
					signal: abortController.signal,
				}),
				this._getRetryDelay,
				() => this._disposed,
			);

			if (!response.ok) {
				// eslint-disable-next-line @typescript-eslint/no-base-to-string
				throw new Error(`Error fetching ${String(this._url)}: ${response.status} ${response.statusText}`);
			}

			outer:
			if (this._orchestrator.fileSize === null) {
				// See if we can deduce the file size from the response
				const contentRange = response.headers.get('Content-Range');
				if (contentRange) {
					const match = /\/(\d+)/.exec(contentRange);
					if (match) {
						this._orchestrator.supplyFileSize(Number(match[1]));
						break outer;
					}
				}

				const contentLength = response.headers.get('Content-Length');
				if (contentLength) {
					// Note: For range requests, this is _technically_ not correct, as the range response could contain
					// less data than was requested. In practice, it seems most servers don't do this though, and the
					// Content-Length header actually contains the length until the end of the file.
					this._orchestrator.supplyFileSize(worker.currentPos + Number(contentLength));
				}
			}

			this._fileSizeDetermined = true; // Yes, this is correct even if file size is still null

			if (response.status !== 206) {
				if (!this._usedForHls) {
					const url = new URL(
						this._url instanceof Request ? this._url.url : this._url,
						typeof window !== 'undefined' ? window.location.href : undefined,
					);

					if (
						url.origin !== 'null'
						// Don't show the warning for M3U8 playlist files, it's irrelevant for those
						&& !(url.pathname.endsWith('.m3u8') || url.pathname.endsWith('.m3u'))
					) {
						// Warn at most once per origin
						if (!warnedOrigins.has(url.origin)) {
							console.warn(
								`HTTP server (origin ${url.origin}) did not respond to a range request with 206 Partial`
								+ ' Content, meaning the entire resource will now be downloaded. To enable efficient'
								+ ' media file streaming across a network, please make sure your server supports'
								+ ' range requests.',
							);
							warnedOrigins.add(url.origin);
						}
					}
				}

				// The response contains the resource from its very start, so this worker now loads everything
				worker.currentPos = 0;
				this._orchestrator.options.maxCacheSize = Infinity; // 🤷

				if (this._orchestrator.fileSize !== null) {
					worker.targetPos = this._orchestrator.fileSize;
				} else {
					// The server is dumb, doesn't even surface the content length, but we'll work with it.
					worker.targetPos = Infinity;
					worker.strictTarget = false;
				}

				this._orchestrator.consolidateEverythingIntoOneWorker(worker);
			}

			if (!response.body) {
				throw new Error(
					'Missing HTTP response body stream. The used fetch function must provide the response body as a'
					+ ' ReadableStream.',
				);
			}

			const reader = response.body.getReader();

			while (true) {
				if (worker.currentPos >= worker.targetPos || worker.aborted) {
					abortController.abort();
					this._orchestrator.signalWorkerStoppedRunning(worker);
					return;
				}

				let readResult: ReadableStreamReadResult<Uint8Array>;

				try {
					readResult = await reader.read();
				} catch (error) {
					if (this._disposed) {
						// No need to try to retry
						throw error;
					}

					const retryDelayInSeconds = this._getRetryDelay(1, error, this._url);
					if (retryDelayInSeconds !== null) {
						console.error('Error while reading response stream. Attempting to resume.', error);
						await wait(1000 * retryDelayInSeconds);
						break;
					} else {
						throw error;
					}
				}

				if (worker.aborted) {
					continue; // Cleanup happens in next iteration
				}

				const { done, value } = readResult;

				if (done) {
					if (worker.currentPos >= worker.targetPos) {
						// All data was delivered, we're good
						this._orchestrator.onWorkerFinished(worker);
						return;
					}

					if (worker.strictTarget) {
						// The response stopped early, before the target. This can happen if server decides to cap range
						// requests arbitrarily, even if the request had an uncapped end. In this case, let's fetch the
						// rest of the data using a new request.
						break;
					} else {
						// Assume we have simply reached the end of the resource
						this._orchestrator.onWorkerFinished(worker);
						return;
					}
				}

				this._dispatchRead(worker.currentPos, worker.currentPos + value.length);
				this._orchestrator.supplyWorkerData(worker, value);
			}
		}

		// The previous UrlSource had logic for circumventing https://issues.chromium.org/issues/436025873; I haven't
		// been able to observe this bug with the new UrlSource (maybe because we're using response streaming), so the
		// logic for that has vanished for now. Leaving a comment here if this becomes relevant again.
}

	/** @internal */
	_dispose() {
		this._orchestrator.dispose();
	}
}

// Matches a single byte range with a mandatory start and an optional end, e.g. `bytes=100-` or `bytes=100-199`
const BYTE_RANGE_REGEX = /^bytes=(\d+)-(\d*)$/;

/**
 * Parses a `Range` header value into a byte offset and a length. The length is `null` for open-ended ranges. Returns
 * `null` if the value doesn't match the supported form or if the range's end lies before its start.
 */
const parseByteRangeHeader = (value: string) => {
	const groups = BYTE_RANGE_REGEX.exec(value.trim());
	if (groups === null) {
		return null;
	}

	const [, startDigits, endDigits] = groups;
	const offset = Number(startDigits);

	let length: number | null = null;
	if (endDigits !== '') {
		const lastByte = Number(endDigits);
		if (lastByte < offset) {
			return null;
		}

		// The end position of a byte range is inclusive
		length = lastByte - offset + 1;
	}

	return { offset, length };
};

/**
 * Options for {@link FilePathSource}.
 * @group Input sources
 * @public
 */
export type FilePathSourceOptions = {
	/** The maximum number of bytes the cache is allowed to hold in memory. Defaults to 8 MiB. */
	maxCacheSize?: number;
};

/**
 * A source backed by a path to a file. Intended for server-side usage in Node, Bun, or Deno.
 *
 * Make sure to call `.dispose()` on the corresponding {@link Input} when done to explicitly free the internal file
 * handle acquired by this source.
 * @group Input sources
 * @public
 */
export class FilePathSource extends PathedSource {
	/** @internal */
	_customSource: CustomSource;
	/** @internal */
	_fileHandle: FileHandle | null = null;

	/** Creates a new {@link FilePathSource} backed by the file at the specified file path.
*/ constructor(filePath: string, options: FilePathSourceOptions = {}) { if (typeof filePath !== 'string') { throw new TypeError('filePath must be a string.'); } if (!options || typeof options !== 'object') { throw new TypeError('options must be an object.'); } if ( options.maxCacheSize !== undefined && (!isNumber(options.maxCacheSize) || options.maxCacheSize < 0) ) { throw new TypeError('options.maxCacheSize, when provided, must be a non-negative number.'); } if (!node.fs) { throw new Error( 'FilePathSource is only available in server-side environments (Node.js, Bun, Deno).', ); } super(filePath, request => new FilePathSource(request.path, options)); // Let's back this source with a CustomSource, makes the implementation very simple this._customSource = new CustomSource({ getSize: async () => { const fileHandle = await node.fs.open(filePath, 'r'); this._fileHandle = fileHandle; sourceFinalizationRegistry?.register(this, () => { // If it's not closed, Node prints annoying warnings void fileHandle.close(); }, this); const stats = await fileHandle.stat(); return stats.size; }, read: async (start, end) => { assert(this._fileHandle); const buffer = new Uint8Array(end - start); await this._fileHandle.read(buffer, 0, end - start, start); return buffer; }, maxCacheSize: options.maxCacheSize, prefetchProfile: 'fileSystem', }); } /** @internal */ _read( start: number, end: number, minReadPosition: number, maxReadPosition: number, ): MaybePromise { return this._customSource._read(start, end, minReadPosition, maxReadPosition); } /** @internal */ _getFileSize(): number | null | undefined { return this._customSource._getFileSize(); } /** @internal */ _dispose() { this._customSource._dispose(); if (this._fileHandle) { void this._fileHandle.close(); this._fileHandle = null; sourceFinalizationRegistry?.unregister(this); } } } /** * Options for defining a {@link CustomSource}. 
* @group Input sources * @public */ export type CustomSourceOptions = { /** * Called when the size of the entire file is requested. Must return or resolve to the size in bytes. This function * is guaranteed to be called before `read`. */ getSize: () => MaybePromise; /** * Called when data is requested. Must return or resolve to the bytes from the specified byte range, or a stream * that yields these bytes. * * You are guaranteed that `0 <= start < end < fileSize`. */ read: (start: number, end: number) => MaybePromise>; /** * Called when the {@link Input} driven by this source is disposed. */ dispose?: () => unknown; /** The maximum number of bytes the cache is allowed to hold in memory. Defaults to 8 MiB. */ maxCacheSize?: number; /** * Specifies the prefetch profile that the reader should use with this source. A prefetch profile specifies the * pattern with which bytes outside of the requested range are preloaded to reduce latency for future reads. * * - `'none'` (default): No prefetching; only the data needed in the moment is requested. * - `'fileSystem'`: File system-optimized prefetching: a small amount of data is prefetched bidirectionally, * aligned with page boundaries. * - `'network'`: Network-optimized prefetching, or more generally, prefetching optimized for any high-latency * environment: tries to minimize the amount of read calls and aggressively prefetches data when sequential access * patterns are detected. */ prefetchProfile?: 'none' | 'fileSystem' | 'network'; }; /** * A general-purpose, callback-driven source that can get its data from anywhere. Use this source to implement your own * custom source if the other sources don't cover your case. * @group Input sources * @public */ export class CustomSource extends Source { /** @internal */ _options: CustomSourceOptions; /** @internal */ _orchestrator: ReadOrchestrator; /** Creates a new {@link CustomSource} whose behavior is specified by `options`. 
*/ constructor(options: CustomSourceOptions) { if (!options || typeof options !== 'object') { throw new TypeError('options must be an object.'); } if (typeof options.getSize !== 'function') { throw new TypeError('options.getSize must be a function.'); } if (typeof options.read !== 'function') { throw new TypeError('options.read must be a function.'); } if (options.dispose !== undefined && typeof options.dispose !== 'function') { throw new TypeError('options.dispose, when provided, must be a function.'); } if ( options.maxCacheSize !== undefined && (!isNumber(options.maxCacheSize) || options.maxCacheSize < 0) ) { throw new TypeError('options.maxCacheSize, when provided, must be a non-negative number.'); } if (options.prefetchProfile && !['none', 'fileSystem', 'network'].includes(options.prefetchProfile)) { throw new TypeError( 'options.prefetchProfile, when provided, must be one of \'none\', \'fileSystem\' or \'network\'.', ); } super(); this._options = options; this._orchestrator = new ReadOrchestrator({ maxCacheSize: options.maxCacheSize ?? (8 * 2 ** 20 /* 8 MiB */), maxWorkerCount: 2, // Fixed for now, *should* be fine prefetchProfile: PREFETCH_PROFILES[options.prefetchProfile ?? 'none'], runWorker: this._runWorker.bind(this), }); } /** @internal */ _getFileSize(): number | null | undefined { return this._orchestrator.fileSize ?? 
undefined; } /** @internal */ _read( start: number, end: number, minReadPosition: number, maxReadPosition: number, ): MaybePromise { if (this._orchestrator.fileSize !== null) { return this._orchestrator.read(start, end, minReadPosition, maxReadPosition); } const result = this._options.getSize(); if (result instanceof Promise) { return result.then((size) => { if (!Number.isInteger(size) || size < 0) { throw new TypeError('options.getSize must return or resolve to a non-negative integer.'); } this._orchestrator.fileSize = size; return this._orchestrator.read(start, end, minReadPosition, maxReadPosition); }); } else { if (!Number.isInteger(result) || result < 0) { throw new TypeError('options.getSize must return or resolve to a non-negative integer.'); } this._orchestrator.fileSize = result; return this._orchestrator.read(start, end, minReadPosition, maxReadPosition); } } /** @internal */ private async _runWorker(worker: ReadWorker) { while (worker.currentPos < worker.targetPos && !worker.aborted) { const originalCurrentPos = worker.currentPos; const originalTargetPos = worker.targetPos; let data = this._options.read(worker.currentPos, originalTargetPos); if (data instanceof Promise) data = await data; if (worker.aborted) { break; } if (data instanceof Uint8Array) { data = toUint8Array(data); // Normalize things like Node.js Buffer to Uint8Array if (data.length !== originalTargetPos - worker.currentPos) { // Yes, we're that strict throw new Error( `options.read returned a Uint8Array with unexpected length: Requested ${ originalTargetPos - worker.currentPos } bytes, but got ${data.length}.`, ); } this._dispatchRead(worker.currentPos, worker.currentPos + data.length); this._orchestrator.supplyWorkerData(worker, data); } else if (data instanceof ReadableStream) { const reader = data.getReader(); while (worker.currentPos < originalTargetPos && !worker.aborted) { const { done, value } = await reader.read(); if (done) { if (worker.currentPos < originalTargetPos) { // Yes, 
we're *that* strict throw new Error( `ReadableStream returned by options.read ended before supplying enough data.` + ` Requested ${originalTargetPos - originalCurrentPos} bytes, but got ${ worker.currentPos - originalCurrentPos }`, ); } break; } if (!(value instanceof Uint8Array)) { throw new TypeError('ReadableStream returned by options.read must yield Uint8Array chunks.'); } if (worker.aborted) { break; } const data = toUint8Array(value); // Normalize things like Node.js Buffer to Uint8Array this._dispatchRead(worker.currentPos, worker.currentPos + data.length); this._orchestrator.supplyWorkerData(worker, data); } } else { throw new TypeError('options.read must return or resolve to a Uint8Array or a ReadableStream.'); } } this._orchestrator.signalWorkerStoppedRunning(worker); } /** @internal */ _dispose() { this._orchestrator.dispose(); this._options.dispose?.(); } } /** * An alias for {@link CustomSource}. * @deprecated This name is misleading and will be removed in a future release. Please use {@link CustomSource} instead. * * @group Input sources * @public */ export const StreamSource = CustomSource; /** * An alias for {@link CustomSourceOptions}. * @deprecated This name is misleading and will be removed in a future release. Please use * {@link CustomSourceOptions} instead. * * @group Input sources * @public */ export type StreamSourceOptions = CustomSourceOptions; type ReadableStreamSourcePendingSlice = { start: number; end: number; bytes: Uint8Array; resolve: (bytes: ReadResult | null) => void; reject: (error: unknown) => void; }; /** * Options for {@link ReadableStreamSource}. * @group Input sources * @public */ export type ReadableStreamSourceOptions = { /** The maximum number of bytes the cache is allowed to hold in memory. Defaults to 32 MiB. 
*/
	maxCacheSize?: number;
};

/**
 * A source backed by a [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) of
 * `Uint8Array`, representing an append-only byte stream of unknown length. This is the source to use for incrementally
 * streaming in input files that are still being constructed and whose size we don't yet know, like for example the
 * output chunks of [MediaRecorder](https://developer.mozilla.org/en-US/docs/Web/API/MediaRecorder).
 *
 * This source is *unsized*, meaning calls to `.getSize()` will throw and readers are more limited due to the
 * lack of random file access. You should only use this source with sequential access patterns, such as reading all
 * packets from start to end. This source does not work well with random access patterns unless you increase its
 * max cache size.
 *
 * @group Input sources
 * @public
 */
export class ReadableStreamSource extends Source {
	/** @internal */
	_stream: ReadableStream<Uint8Array>;
	/** @internal */
	_reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
	/** @internal */
	_cache: CacheEntry[] = [];
	/** @internal */
	_maxCacheSize: number;
	/** @internal */
	_pendingSlices: ReadableStreamSourcePendingSlice[] = [];
	/** @internal */
	_currentIndex = 0;
	/** @internal */
	_targetIndex = 0;
	/** @internal */
	_maxRequestedIndex = 0;
	/** @internal */
	_endIndex: number | null = null;
	/** @internal */
	_pulling = false;

	/** Creates a new {@link ReadableStreamSource} backed by the specified `ReadableStream`. */
	constructor(stream: ReadableStream<Uint8Array>, options: ReadableStreamSourceOptions = {}) {
		if (!(stream instanceof ReadableStream)) {
			throw new TypeError('stream must be a ReadableStream.');
		}
		if (!options || typeof options !== 'object') {
			throw new TypeError('options must be an object.');
		}
		if (
			options.maxCacheSize !== undefined
			&& (!isNumber(options.maxCacheSize) || options.maxCacheSize < 0)
		) {
			throw new TypeError('options.maxCacheSize, when provided, must be a non-negative number.');
		}

		super();

		this._stream = stream;
		this._maxCacheSize = options.maxCacheSize ?? (32 * 2 ** 20 /* 32 MiB */);
	}

	/** @internal */
	_getFileSize(): number | null {
		return this._endIndex; // Starts out as null, meaning this source is unsized
	}

	/**
	 * Serves the range from the cache if possible; otherwise registers a pending slice and pulls from the stream
	 * until it is filled. Returns (or resolves to) null if the range extends past the end of the stream.
	 * @internal
	 */
	_read(start: number, end: number): MaybePromise<ReadResult | null> {
		if (this._endIndex !== null && end > this._endIndex) {
			return null;
		}

		this._maxRequestedIndex = Math.max(this._maxRequestedIndex, end);

		const cacheStartIndex = binarySearchLessOrEqual(this._cache, start, x => x.start);
		const cacheStartEntry = cacheStartIndex !== -1 ? this._cache[cacheStartIndex]! : null;

		if (cacheStartEntry && cacheStartEntry.start <= start && end <= cacheStartEntry.end) {
			// The request can be satisfied with a single cache entry
			return {
				bytes: cacheStartEntry.bytes,
				view: cacheStartEntry.view,
				offset: cacheStartEntry.start,
			};
		}

		let lastEnd = start;
		const bytes = new Uint8Array(end - start);

		if (cacheStartIndex !== -1) {
			// Walk over the cache to see if we can satisfy the request using multiple cache entries
			for (let i = cacheStartIndex; i < this._cache.length; i++) {
				const cacheEntry = this._cache[i]!;
				if (cacheEntry.start >= end) {
					break;
				}

				const cappedStart = Math.max(start, cacheEntry.start);
				if (cappedStart > lastEnd) {
					// We're too far behind
					this._throwDueToCacheMiss();
				}

				const cappedEnd = Math.min(end, cacheEntry.end);

				if (cappedStart < cappedEnd) {
					bytes.set(
						cacheEntry.bytes.subarray(cappedStart - cacheEntry.start, cappedEnd - cacheEntry.start),
						cappedStart - start,
					);
					lastEnd = cappedEnd;
				}
			}
		}

		if (lastEnd === end) {
			return {
				bytes,
				view: toDataView(bytes),
				offset: start,
			};
		}

		// We need to pull more data

		if (this._currentIndex > lastEnd) {
			// We're too far behind
			this._throwDueToCacheMiss();
		}

		const { promise, resolve, reject } = promiseWithResolvers<ReadResult | null>();

		this._pendingSlices.push({
			start,
			end,
			bytes,
			resolve,
			reject,
		});

		this._targetIndex = Math.max(this._targetIndex, end);

		// Start pulling from the stream if we're not already doing it
		if (!this._pulling) {
			this._pulling = true;
			void this._pull()
				.catch((error) => {
					this._pulling = false;

					if (this._pendingSlices.length > 0) {
						this._pendingSlices.forEach(x => x.reject(error)); // Make sure to propagate any errors
						this._pendingSlices.length = 0;
					} else {
						throw error; // So it doesn't get swallowed
					}
				});
		}

		return promise;
	}

	/** @internal */
	_throwDueToCacheMiss() {
		throw new Error(
			'Read is before the cached region. With ReadableStreamSource, you must access the data more'
			+ ' sequentially or increase the size of its cache.',
		);
	}

	/**
	 * Pulls chunks from the stream until the target index is reached, the stream ends or the source is disposed,
	 * filling pending slices and the cache along the way.
	 * @internal
	 */
	async _pull() {
		this._reader ??= this._stream.getReader();

		// This is the loop that keeps pulling data from the stream until a target index is reached, filling requests
		// in the process
		while (this._currentIndex < this._targetIndex && !this._disposed) {
			const { done, value } = await this._reader.read();
			if (done) {
				for (const pendingSlice of this._pendingSlices) {
					pendingSlice.resolve(null);
				}
				this._pendingSlices.length = 0;
				this._endIndex = this._currentIndex; // We know how long the file is now!

				break;
			}

			if (!(value instanceof Uint8Array)) {
				// Without this check, a chunk lacking byteLength would turn the indices into NaN and leave all
				// pending reads hanging forever
				throw new TypeError('The ReadableStream passed to ReadableStreamSource must yield Uint8Array chunks.');
			}

			const startIndex = this._currentIndex;
			const endIndex = this._currentIndex + value.byteLength;

			this._dispatchRead(startIndex, endIndex);

			// Fill the pending slices with the data
			for (let i = 0; i < this._pendingSlices.length; i++) {
				const pendingSlice = this._pendingSlices[i]!;

				const cappedStart = Math.max(startIndex, pendingSlice.start);
				const cappedEnd = Math.min(endIndex, pendingSlice.end);

				if (cappedStart < cappedEnd) {
					pendingSlice.bytes.set(
						value.subarray(cappedStart - startIndex, cappedEnd - startIndex),
						cappedStart - pendingSlice.start,
					);

					if (cappedEnd === pendingSlice.end) {
						// Pending slice fully filled
						pendingSlice.resolve({
							bytes: pendingSlice.bytes,
							view: toDataView(pendingSlice.bytes),
							offset: pendingSlice.start,
						});
						this._pendingSlices.splice(i, 1);
						i--;
					}
				}
			}

			this._cache.push({
				start: startIndex,
				end: endIndex,
				bytes: value,
				view: toDataView(value),
				age: 0, // Unused
			});

			// Do cache eviction, based on the distance from the last-requested index. It's important that we do it like
			// this and not based on where the reader is at, because if the reader is fast, we'll unnecessarily evict
			// data that we still might need.
			while (this._cache.length > 0) {
				const firstEntry = this._cache[0]!;
				const distance = this._maxRequestedIndex - firstEntry.end;

				if (distance <= this._maxCacheSize) {
					break;
				}

				this._cache.shift();
			}

			this._currentIndex += value.byteLength;
		}

		this._pulling = false;
	}

	/** @internal */
	_dispose() {
		this._pendingSlices.length = 0;
		this._cache.length = 0;

		// Cancelling an errored stream rejects with the stored error; that must not surface as an unhandled
		// rejection during disposal
		void this._reader?.cancel().catch(() => {});
	}
}

type PrefetchProfile = (start: number, end: number, workers: ReadWorker[]) => {
	start: number;
	end: number;
};

const PREFETCH_PROFILES = {
	none: (start, end) => ({ start, end }),
	fileSystem: (start, end) => {
		const padding = 2 ** 16;

		// Pad in both directions and align to multiples of the padding size
		start = Math.floor((start - padding) / padding) * padding;
		end = Math.ceil((end + padding) / padding) * padding;

		return { start, end };
	},
	network: (start, end, workers) => {
		// Add a slight bit of start padding because backwards reading is painful
		const paddingStart = 2 ** 16;
		start = Math.max(0, Math.floor((start - paddingStart) / paddingStart) * paddingStart);

		// Remote resources have extreme latency (relatively speaking), so the benefit from intelligent
		// prefetching is great. The network prefetch strategy is as follows: When we notice
		// successive reads to a worker's read region, we prefetch more data at the end of that region,
		// growing exponentially (up to a cap). This performs well for real-world use cases: Either we read a
		// small part of the file once and then never need it again, in which case the requested amount of data
		// is small. Or, we're repeatedly doing a sequential access pattern (common in media files), in which
		// case we can become more and more confident to prefetch more and more data.
		for (const worker of workers) {
			const maxExtensionAmount = 8 * 2 ** 20; // 8 MiB

			// When the read region crosses the threshold point, we trigger a prefetch. This point is typically
			// in the middle of the worker's read region, or a fixed offset from the end if the region has grown
			// really large.
			const thresholdPoint = Math.max(
				(worker.startPos + worker.targetPos) / 2,
				worker.targetPos - maxExtensionAmount,
			);

			if (closedIntervalsOverlap(
				start, end,
				thresholdPoint, worker.targetPos,
			)) {
				const size = worker.targetPos - worker.startPos;

				// If we extend by maxExtensionAmount
				const a = Math.ceil((size + 1) / maxExtensionAmount) * maxExtensionAmount;
				// If we extend to the next power of 2
				const b = 2 ** Math.ceil(Math.log2(size + 1));

				const extent = Math.min(b, a);
				end = Math.max(end, worker.startPos + extent);
			}
		}

		end = Math.max(end, start + URL_SOURCE_MIN_LOAD_AMOUNT);

		return {
			start,
			end,
		};
	},
} satisfies Record<string, PrefetchProfile>;

type PendingSlice = {
	start: number;
	bytes: Uint8Array;
	holes: Hole[];
	resolve: (bytes: Uint8Array | null) => void;
	reject: (error: unknown) => void;
};

type Hole = {
	start: number;
	end: number;
};

type CacheEntry = {
	start: number;
	end: number;
	bytes: Uint8Array;
	view: DataView;
	age: number;
};

type ReadWorker = {
	startPos: number;
	currentPos: number;
	targetPos: number;
	/** The target is considered _strict_ when it is an error for the worker to terminate before reaching the target. */
	strictTarget: boolean;
	running: boolean;
	aborted: boolean;
	pendingSlices: PendingSlice[];
	age: number;
};

/**
 * Godclass for orchestrating complex, cached read operations. The reading model is as follows: Any reading task is
 * delegated to a *worker*, which is a sequential reader positioned somewhere along the file. All workers run in
 * parallel and can be stopped and resumed in their forward movement. When read requests come in, this orchestrator will
 * first try to satisfy the request with only the cached data. If this isn't possible, workers are spun up for all
 * missing parts (or existing workers are repurposed), and these workers will then fill the holes in the data as they
 * march along the file.
*/ class ReadOrchestrator { fileSize: number | null = null; nextAge = 0; // Used for multiple things workers: ReadWorker[] = []; cache: CacheEntry[] = []; currentCacheSize = 0; disposed = false; queuedReads: { hole: Hole; strictTarget: boolean; pendingSlices: PendingSlice[]; age: number; }[] = []; constructor(public options: { maxCacheSize: number; runWorker: (worker: ReadWorker) => Promise; prefetchProfile: PrefetchProfile; maxWorkerCount: number; }) {} read( innerStart: number, innerEnd: number, minReadPosition: number, maxReadPosition: number, ): MaybePromise { assert(!this.disposed); const prefetchRange = this.options.prefetchProfile(innerStart, innerEnd, this.workers); const outerStart = Math.max(prefetchRange.start, minReadPosition); const outerEnd = Math.min(prefetchRange.end, this.fileSize ?? Infinity, maxReadPosition); assert(outerStart <= innerStart && innerEnd <= outerEnd); let result: MaybePromise | null = null; const innerCacheStartIndex = binarySearchLessOrEqual(this.cache, innerStart, x => x.start); const innerStartEntry = innerCacheStartIndex !== -1 ? this.cache[innerCacheStartIndex] : null; // See if the read request can be satisfied by a single cache entry if (innerStartEntry && innerStartEntry.start <= innerStart && innerEnd <= innerStartEntry.end) { innerStartEntry.age = this.nextAge++; result = { bytes: innerStartEntry.bytes, view: innerStartEntry.view, offset: innerStartEntry.start, }; // Can't return yet though, still need to check if the prefetch range might lie outside the cached area } const outerCacheStartIndex = binarySearchLessOrEqual(this.cache, outerStart, x => x.start); const bytes = result ? 
null : new Uint8Array(innerEnd - innerStart); let contiguousBytesWriteEnd = 0; // Used to track if the cache is able to completely cover the bytes let lastEnd = outerStart; // The "holes" in the cache (the parts we need to load) const outerHoles: Hole[] = []; // Loop over the cache and build up the list of holes if (outerCacheStartIndex !== -1) { for (let i = outerCacheStartIndex; i < this.cache.length; i++) { const entry = this.cache[i]!; if (entry.start >= outerEnd) { break; } if (entry.end <= outerStart) { continue; } const cappedOuterStart = Math.max(outerStart, entry.start); const cappedOuterEnd = Math.min(outerEnd, entry.end); assert(cappedOuterStart <= cappedOuterEnd); if (lastEnd < cappedOuterStart) { outerHoles.push({ start: lastEnd, end: cappedOuterStart }); } lastEnd = cappedOuterEnd; if (bytes) { const cappedInnerStart = Math.max(innerStart, entry.start); const cappedInnerEnd = Math.min(innerEnd, entry.end); if (cappedInnerStart < cappedInnerEnd) { const relativeOffset = cappedInnerStart - innerStart; // Fill the relevant section of the bytes with the cached data bytes.set( entry.bytes.subarray(cappedInnerStart - entry.start, cappedInnerEnd - entry.start), relativeOffset, ); if (relativeOffset === contiguousBytesWriteEnd) { contiguousBytesWriteEnd = cappedInnerEnd - innerStart; } } } entry.age = this.nextAge++; } if (lastEnd < outerEnd) { outerHoles.push({ start: lastEnd, end: outerEnd }); } } else { outerHoles.push({ start: outerStart, end: outerEnd }); } if (bytes && contiguousBytesWriteEnd >= bytes.length) { // Multiple cache entries were able to completely cover the requested bytes! 
result = { bytes, view: toDataView(bytes), offset: innerStart, }; } if (outerHoles.length === 0) { assert(result); return result; } // We need to read more data, so now we're in async land const { promise, resolve, reject } = promiseWithResolvers(); const innerHoles: typeof outerHoles = []; for (const outerHole of outerHoles) { const cappedStart = Math.max(innerStart, outerHole.start); const cappedEnd = Math.min(innerEnd, outerHole.end); if (cappedStart === outerHole.start && cappedEnd === outerHole.end) { innerHoles.push(outerHole); // Can reuse without allocating a new object } else if (cappedStart < cappedEnd) { innerHoles.push({ start: cappedStart, end: cappedEnd }); } } const pendingSlice: PendingSlice | null = bytes && { start: innerStart, bytes, holes: innerHoles, resolve, reject, }; // Fire off workers to take care of patching the holes outer: for (const outerHole of outerHoles) { for (const worker of this.workers) { const addedToWorker = this.checkHoleAgainstWorker( worker, outerHole, pendingSlice ? [pendingSlice] : [], ); if (addedToWorker) { this.checkQueuedReadsAgainstWorker(worker); continue outer; } } // We need to spawn a new worker const strictTarget = outerHole.end < outerEnd || this.fileSize !== null; const newWorker = this.createWorker(outerHole.start, outerHole.end, strictTarget); if (newWorker) { if (pendingSlice) { newWorker.pendingSlices = [pendingSlice]; } this.runWorker(newWorker); } else { // Max worker count has been reached, let's queue a read for later let index = binarySearchLessOrEqual(this.queuedReads, outerHole.start, x => x.hole.start); let entry = index !== -1 ? this.queuedReads[index]! 
: null; if (entry && outerHole.start <= entry.hole.end) { entry.hole.end = Math.max(entry.hole.end, outerHole.end); entry.strictTarget &&= strictTarget; if (pendingSlice) { entry.pendingSlices.push(pendingSlice); } } else { index++; entry = { hole: { // Clone the hole because it might be mutated later start: outerHole.start, end: outerHole.end, }, strictTarget, pendingSlices: pendingSlice ? [pendingSlice] : [], age: this.nextAge++, }; this.queuedReads.splice(index, 0, entry); } // Merge with any subsequent entries that overlap while (index + 1 < this.queuedReads.length) { const nextEntry = this.queuedReads[index + 1]!; if (nextEntry.hole.start > entry.hole.end) { break; } entry.hole.end = Math.max(entry.hole.end, nextEntry.hole.end); entry.pendingSlices.push(...nextEntry.pendingSlices); entry.strictTarget &&= nextEntry.strictTarget; entry.age = Math.min(entry.age, nextEntry.age); this.queuedReads.splice(index + 1, 1); } } } if (!result) { assert(bytes); result = promise.then(bytes => bytes && ({ bytes, view: toDataView(bytes), offset: innerStart, } satisfies ReadResult)); } else { // The requested region was satisfied by the cache, but the entire prefetch region was not } return result; } checkHoleAgainstWorker(worker: ReadWorker, hole: Hole, pendingSlices: PendingSlice[]) { // A small tolerance in the case that the requested region is *just* after the target position of an // existing worker. 
In that case, it's probably more efficient to repurpose that worker than to spawn // another one so close to it const gapTolerance = 2 ** 17; // This check also implies worker.currentPos <= hole.start, a critical condition if (closedIntervalsOverlap( hole.start - gapTolerance, hole.start, worker.currentPos, worker.targetPos, )) { worker.targetPos = Math.max(worker.targetPos, hole.end); // Update the worker's target position for (let i = 0; i < pendingSlices.length; i++) { const pendingSlice = pendingSlices[i]!; if (!worker.pendingSlices.includes(pendingSlice)) { worker.pendingSlices.push(pendingSlice); } } if (!worker.running) { // Kick it off if it's idle this.runWorker(worker); } return true; } return false; } checkQueuedReadsAgainstWorker(worker: ReadWorker) { let wasTrueOnce = false; for (let i = 0; i < this.queuedReads.length; i++) { const queuedRead = this.queuedReads[i]!; const result = this.checkHoleAgainstWorker(worker, queuedRead.hole, queuedRead.pendingSlices); if (result) { this.queuedReads.splice(i, 1); i--; wasTrueOnce = true; } else if (wasTrueOnce) { // We can stop since the holes are sorted break; } } } createWorker(startPos: number, targetPos: number, strictTarget: boolean) { if (this.workers.length >= this.options.maxWorkerCount) { let oldestWorker: ReadWorker | null = null; let oldestIndex: number | null = null; for (let i = 0; i < this.workers.length; i++) { const worker = this.workers[i]!; if ( !worker.running && worker.pendingSlices.length === 0 && (!oldestWorker || worker.age < oldestWorker.age) ) { oldestIndex = i; oldestWorker = worker; } } if (oldestWorker) { // LRU eviction assert(oldestIndex !== null); assert(oldestWorker.pendingSlices.length === 0); this.workers.splice(oldestIndex, 1); } else { return null; // All workers are still running, we can't create a new one } } const worker: ReadWorker = { startPos, currentPos: startPos, targetPos, strictTarget, running: false, // Due to async shenanigans, it can happen that workers are 
started after disposal. In this case, instead of
			// simply not creating the worker, we allow it to run but immediately label it as aborted, so it can then
			// shut itself down.
			aborted: this.disposed,
			pendingSlices: [],
			age: this.nextAge++,
		};

		this.workers.push(worker);

		return worker;
	}

	/**
	 * Starts the given worker: marks it as running, refreshes its age and kicks off `options.runWorker` for it
	 * without awaiting the result.
	 *
	 * If `options.runWorker` rejects, the error is forwarded to every pending slice of the worker; if there are no
	 * pending slices, the error is rethrown. Once the call has settled and the worker is not running (anymore), the
	 * queued read with the lowest age - if there is one - is turned into a new worker which is then started.
	 *
	 * The worker must not already be running and must have `currentPos < targetPos` (both are asserted).
	 */
	runWorker(worker: ReadWorker) {
		assert(!worker.running);
		assert(worker.currentPos < worker.targetPos);

		worker.running = true;
		worker.age = this.nextAge++;

		// Fire-and-forget: the result is intentionally not awaited, outcomes are handled in the handlers below
		void this.options.runWorker(worker)
			.catch((error) => {
				worker.running = false;

				if (worker.pendingSlices.length > 0) {
					worker.pendingSlices.forEach(x => x.reject(error)); // Make sure to propagate any errors
					worker.pendingSlices.length = 0;
				} else {
					throw error; // So it doesn't get swallowed
				}
			})
			.finally(() => {
				if (worker.running) {
					// Rare, but can happen with multiple concurrent reads. In this case, don't do anything.
					return;
				}

				if (this.queuedReads.length > 0) {
					// Find the queued read with the lowest age; ties go to the earliest position in the queue
					let oldestIndex = 0;
					for (let i = 1; i < this.queuedReads.length; i++) {
						const queuedRead = this.queuedReads[i]!;
						if (queuedRead.age < this.queuedReads[oldestIndex]!.age) {
							oldestIndex = i;
						}
					}

					// Take it out of the queue and promote it to a worker covering the queued hole
					const queuedRead = this.queuedReads[oldestIndex]!;
					this.queuedReads.splice(oldestIndex, 1);

					const newWorker = this.createWorker(
						queuedRead.hole.start,
						queuedRead.hole.end,
						queuedRead.strictTarget,
					);
					assert(newWorker); // We just freed up a worker, so this should never fail
					// The slices that were waiting on the queued read are now waiting on the new worker
					newWorker.pendingSlices = queuedRead.pendingSlices;

					this.runWorker(newWorker);
				}
			});
	}

	/**
	 * Makes the given worker the only remaining worker. The pending slices of the given worker, of every other
	 * worker and of every queued read are gathered (deduplicated through a `Set`) and assigned to the given worker.
	 * All other workers are marked as aborted, stripped of their pending slices and removed from the worker list,
	 * and the read queue is emptied.
	 */
	consolidateEverythingIntoOneWorker(worker: ReadWorker) {
		// Here we merge everything into one "megaworker" that spans the entire file. We assume the passed-in worker
		// is already configured to be a megaworker.

		// A Set is used since the same slice may be pending on more than one worker or queued read
		const uniqueSlices = new Set(worker.pendingSlices);

		for (let i = 0; i < this.workers.length; i++) {
			const otherWorker = this.workers[i]!;
			if (otherWorker === worker) {
				continue;
			}

			for (const slice of otherWorker.pendingSlices) {
				uniqueSlices.add(slice);
			}

			otherWorker.aborted = true;
			otherWorker.pendingSlices.length = 0;

			// Remove the worker in place; step the index back so the element that moved into slot i isn't skipped
			this.workers.splice(i, 1);
			i--;
		}

		for (let i = 0; i < this.queuedReads.length; i++) {
			const queuedRead = this.queuedReads[i]!;
			for (const slice of queuedRead.pendingSlices) {
				uniqueSlices.add(slice);
			}
		}

		worker.pendingSlices = [...uniqueSlices];
		this.queuedReads.length = 0;
	}

	/**
	 * Called by a worker when it has read some data.
	 *
	 * The bytes are treated as covering the range starting at the worker's `currentPos`. They are inserted into the
	 * cache, the worker's position is advanced, the bytes are copied into every pending slice of the worker that
	 * they intersect, and slices with no holes left are resolved. Finally, other non-running workers whose range
	 * overlaps the range just read are removed.
	 *
	 * The worker must not be aborted (asserted).
	 */
	supplyWorkerData(worker: ReadWorker, bytes: Uint8Array) {
		assert(!worker.aborted);

		// The range [start, end) covered by the supplied bytes
		const start = worker.currentPos;
		const end = start + bytes.length;

		this.insertIntoCache({
			start,
			end,
			bytes,
			view: toDataView(bytes),
			age: this.nextAge++,
		});
		worker.currentPos += bytes.length;

		if (worker.currentPos > worker.targetPos) {
			// In case it overshoots
			worker.targetPos = worker.currentPos;
			this.checkQueuedReadsAgainstWorker(worker);
		}

		// Now, let's see if we can use the read bytes to fill any pending slice
		for (let i = 0; i < worker.pendingSlices.length; i++) {
			const pendingSlice = worker.pendingSlices[i]!;

			// Intersection of the read range with the range covered by the slice
			const clampedStart = Math.max(start, pendingSlice.start);
			const clampedEnd = Math.min(end, pendingSlice.start + pendingSlice.bytes.length);

			if (clampedStart < clampedEnd) {
				// Copy the intersecting part into the slice's buffer, at its offset relative to the slice start
				pendingSlice.bytes.set(
					bytes.subarray(clampedStart - start, clampedEnd - start),
					clampedStart - pendingSlice.start,
				);
			}

			for (let j = 0; j < pendingSlice.holes.length; j++) {
				// The hole is intentionally not modified here if the read section starts somewhere in the middle of
				// the hole. We don't need to do "hole splitting", since the workers are spawned *by* the holes,
				// meaning there's always a worker which will consume the hole left to right.
				const hole = pendingSlice.holes[j]!;
				if (start <= hole.start && end > hole.start) {
					// The read range covers the beginning of the hole, so the hole now begins where the read ended
					hole.start = end;
				}

				if (hole.end <= hole.start) {
					// Nothing is left of this hole, drop it
					pendingSlice.holes.splice(j, 1);
					j--;
				}
			}

			if (pendingSlice.holes.length === 0) {
				// The slice has been fulfilled, everything has been read. Let's resolve the promise
				pendingSlice.resolve(pendingSlice.bytes);
				worker.pendingSlices.splice(i, 1);
				i--;
			}
		}

		// Remove other idle workers if we "ate" into their territory
		for (let i = 0; i < this.workers.length; i++) {
			const otherWorker = this.workers[i]!;
			if (worker === otherWorker || otherWorker.running) {
				continue;
			}

			if (closedIntervalsOverlap(
				start, end,
				otherWorker.currentPos, otherWorker.targetPos, // These should typically be equal when the worker's idle
			)) {
				this.workers.splice(i, 1);
				i--;
			}
		}
	}

	/**
	 * Records the size of the file and trims all workers and queued reads to it. Pending slices that are found to
	 * reach past the new size are resolved with `null`.
	 *
	 * The file size must not have been set before (asserted).
	 */
	supplyFileSize(size: number) {
		assert(this.fileSize === null);
		this.fileSize = size;

		// Trim the workers with this new information
		for (const worker of this.workers) {
			worker.targetPos = Math.min(worker.targetPos, size);
			worker.strictTarget = true;

			for (let i = 0; i < worker.pendingSlices.length; i++) {
				const pendingSlice = worker.pendingSlices[i]!;

				for (const hole of pendingSlice.holes) {
					if (hole.end > size) {
						// Can't satisfy this slice anymore
						pendingSlice.resolve(null);
						worker.pendingSlices.splice(i, 1);
						i--;
						// The slice is gone, no need to look at its remaining holes
						break;
					}
				}
			}
		}

		// Trim the queued reads as well
		for (let i = 0; i < this.queuedReads.length; i++) {
			const queuedRead = this.queuedReads[i]!;

			if (queuedRead.hole.start >= size) {
				// Entirely out of bounds
				for (const slice of queuedRead.pendingSlices) slice.resolve(null);
				this.queuedReads.splice(i, 1);
				i--;
			} else if (queuedRead.hole.end > size) {
				// Partially out of bounds
				queuedRead.hole.end = size;
				queuedRead.strictTarget = true;

				for (let j = 0; j < queuedRead.pendingSlices.length; j++) {
					const slice = queuedRead.pendingSlices[j]!;
					// If the slice itself is out of bounds, resolve it
					if (slice.start >= size) {
						slice.resolve(null);
						queuedRead.pendingSlices.splice(j, 1);
						j--;
					}
				}
			}
		}
	}
signalWorkerStoppedRunning(worker: ReadWorker) {
		worker.running = false;

		// A worker only stops running once it has arrived at its targetPos. Any pending slices still attached to it
		// at that point are there because they cover data that other workers are assigned to fill, so this worker
		// has done its share of the work on them and lets go of them.
		worker.pendingSlices.splice(0);
	}

	/** Cleans up a worker that has reached the end of the underlying data. */
	onWorkerFinished(worker: ReadWorker) {
		const position = this.workers.indexOf(worker);
		assert(position !== -1);

		worker.running = false;
		this.workers.splice(position, 1);

		if (this.fileSize === null) {
			// The position the worker ended at tells us the file size
			this.supplyFileSize(worker.currentPos);
		}

		// This worker won't deliver anything anymore, so whatever is still pending on it resolves with null
		worker.pendingSlices.forEach(slice => slice.resolve(null));
	}

	/**
	 * Adds a freshly read range to the cache, which is kept sorted by start position. The new entry is merged with
	 * neighboring entries it overlaps, and afterwards the entries with the lowest age are evicted while the cache
	 * exceeds `options.maxCacheSize`. Does nothing when `options.maxCacheSize` is 0.
	 */
	insertIntoCache(entry: CacheEntry) {
		const { maxCacheSize } = this.options;
		if (maxCacheSize === 0) {
			// Caching is turned off
			return;
		}

		let insertionIndex = binarySearchLessOrEqual(this.cache, entry.start, x => x.start) + 1;
		const predecessor = insertionIndex > 0 ? this.cache[insertionIndex - 1]! : null;

		if (predecessor !== null && predecessor.end >= entry.end) {
			// The entry before the insertion point already contains the whole new range
			return;
		}

		// The entry that the remaining steps operate on: either the new entry or the predecessor it was merged into
		let anchor = entry;

		if (predecessor !== null && predecessor.end > entry.start) {
			// The new entry overlaps the tail of its predecessor: grow the predecessor instead of inserting
			const merged = new Uint8Array(entry.end - predecessor.start);
			merged.set(predecessor.bytes, 0);
			merged.set(entry.bytes, entry.start - predecessor.start);

			this.currentCacheSize += entry.end - predecessor.end;

			predecessor.bytes = merged;
			predecessor.view = toDataView(merged);
			predecessor.end = entry.end;

			insertionIndex -= 1;
			anchor = predecessor;
		} else {
			// No overlap with what comes before, so the new entry gets its own slot
			this.cache.splice(insertionIndex, 0, entry);
			this.currentCacheSize += entry.bytes.length;
		}

		// Absorb the entries that follow, for as long as they overlap. Every absorbed entry is removed from the
		// array, so the entry to inspect is always the one right after the anchor.
		const followingIndex = insertionIndex + 1;
		while (followingIndex < this.cache.length) {
			const following = this.cache[followingIndex]!;
			if (anchor.end <= following.start) {
				// Merely touching entries are left alone
				break;
			}

			// Fully covered or partially overlapping - either way, the following entry stops existing on its own
			this.cache.splice(followingIndex, 1);

			if (anchor.end >= following.end) {
				// Fully covered by the anchor: its bytes are simply dropped
				this.currentCacheSize -= following.bytes.length;
				continue;
			}

			// Partial overlap: extend the anchor up to the end of the following entry
			const merged = new Uint8Array(following.end - anchor.start);
			merged.set(anchor.bytes, 0);
			merged.set(following.bytes, following.start - anchor.start);

			this.currentCacheSize -= anchor.end - following.start; // The overlap is only stored once now

			anchor.bytes = merged;
			anchor.view = toDataView(merged);
			anchor.end = following.end;

			// Entries beyond this one cannot overlap with the anchor anymore
			break;
		}

		// Evict the entries with the lowest age while the cache is too large
		while (this.currentCacheSize > maxCacheSize) {
			let victimIndex = 0;
			this.cache.forEach((candidate, index) => {
				if (candidate.age < this.cache[victimIndex]!.age) {
					victimIndex = index;
				}
			});
			const victim = this.cache[victimIndex]!;

			const sizeWithoutVictim = this.currentCacheSize - victim.bytes.length;
			if (sizeWithoutVictim <= maxCacheSize) {
				// Evicting this entry would take the cache down to or below the limit, so it stays
				break;
			}

			this.cache.splice(victimIndex, 1);
			this.currentCacheSize = sizeWithoutVictim;
		}
	}

	dispose() {
		// Flag every worker as aborted before letting go of them
		this.workers.forEach((worker) => {
			worker.aborted = true;
		});

		this.workers.splice(0);
		this.cache.splice(0);
		this.disposed = true;
	}
}

/**
 * A placeholder source that never yields any data. Handy together with input formats whose data comes from a
 * different source.
 */
export class NullSource extends Source {
	override _getFileSize(): number | null {
		return null;
	}

	override _read(): MaybePromise {
		return null;
	}

	override _dispose(): void {
		// Nothing to clean up
	}
}

/**
 * A source that covers a range (offset + length) of another source. Useful for reading files that are embedded within
 * larger files.
*
 * @group Input sources
 * @public
 */
export class RangedSource extends Source {
	/** @internal */
	_baseSource: Source;
	/** @internal */
	_ref: SourceRef | null = null;
	/** @internal */
	_offset: number;
	/** @internal */
	_length: number | null;

	/** @internal */
	constructor(baseSource: Source, offset: number, length?: number) {
		super();

		if (baseSource._disposed) {
			throw new Error('Cannot create a slice of a disposed source.');
		}

		this._length = length ?? null;
		this._offset = offset;
		this._baseSource = baseSource;
	}

	/** @internal */
	override _getFileSize(): number | null | undefined {
		const baseSize = this._baseSource._getFileSize();

		if (baseSize === undefined || baseSize === null) {
			// The base source cannot provide a size here: an explicitly configured length is used if there is one,
			// otherwise the base source's answer (undefined or null) is passed on unchanged
			return this._length ?? baseSize;
		}

		// What remains of the base source after the offset, capped by the configured length (if any)
		const upperBound = this._length ?? Infinity;
		return clamp(baseSize - this._offset, 0, upperBound);
	}

	/** @internal */
	override _read(
		start: number,
		end: number,
		minReadPosition: number,
		maxReadPosition: number,
	): MaybePromise {
		const outOfRange = this._length !== null && end > this._length;
		if (outOfRange) {
			return null;
		}

		// Translates a result of the base source back into the coordinates of this source
		const rebase = (baseResult: ReadResult | null) => {
			if (baseResult) {
				baseResult.offset -= this._offset;
				return baseResult;
			}

			return null;
		};

		// All positions are shifted by the offset to address the base source
		const baseResult = this._baseSource._read(
			this._offset + start,
			this._offset + end,
			this._offset + minReadPosition,
			this._offset + maxReadPosition,
		);

		return baseResult instanceof Promise
			? baseResult.then(rebase)
			: rebase(baseResult);
	}

	/** @internal */
	override _dispose(): void {
		if (this._ref) {
			this._ref.free();
		}
	}

	/**
	 * Obtains a reference to the base source the first time this is called (and keeps it in `_ref`), then defers to
	 * the `ref` implementation of the superclass.
	 */
	override ref() {
		if (this._ref === null) {
			this._ref = this._baseSource.ref();
		}

		return super.ref();
	}
}