// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { DataStream_Chunk } from '@livekit/rtc-ffi-bindings';
import { log } from '../log.js';
import { bigIntToNumber } from '../utils.js';
import type { BaseStreamInfo, ByteStreamInfo, TextStreamInfo } from './types.js';

/**
 * Shared state and contract for the stream readers in this file.
 *
 * Holds the source `ReadableStream` of chunks, the stream's info object, an
 * optional expected total used for progress reporting, and a running byte
 * counter. Subclasses decide how a received chunk is accounted for and how the
 * whole stream is collected.
 */
abstract class BaseStreamReader<T extends BaseStreamInfo> {
  /** Source stream the chunks are read from. */
  protected reader: ReadableStream<DataStream_Chunk>;

  /**
   * Expected total used as the denominator for progress reporting. When it is
   * `undefined` (or `0`), progress is reported as `undefined`.
   */
  protected totalByteSize?: number;

  protected _info: T;

  /** Running byte total; starts at 0 and is updated by subclasses that track bytes. */
  protected bytesReceived: number;

  /** The info object this reader was constructed with. */
  get info() {
    return this._info;
  }

  /**
   * @param info - Info object exposed through {@link BaseStreamReader.info}.
   * @param stream - Stream of chunks to read from.
   * @param totalByteSize - Optional expected total used for progress reporting.
   */
  constructor(info: T, stream: ReadableStream<DataStream_Chunk>, totalByteSize?: number) {
    this.reader = stream;
    this.totalByteSize = totalByteSize;
    this._info = info;
    this.bytesReceived = 0;
  }

  /** Called once for every chunk read from the stream, before it is yielded. */
  protected abstract handleChunkReceived(chunk: DataStream_Chunk): void;

  /**
   * Optional progress callback. Receives a fraction of the expected total, or
   * `undefined` when no total is known.
   */
  onProgress?: (progress: number | undefined) => void;

  /** Reads the stream until it ends and resolves with everything that was received. */
  abstract readAll(): Promise<string | Array<Uint8Array>>;
}

/**
 * A class to read chunks from a ReadableStream and provide them in a structured format.
 *
 * Iterating yields the raw `content` bytes of each chunk.
 */
export class ByteStreamReader extends BaseStreamReader<ByteStreamInfo> {
  /**
   * Adds the chunk's byte length to the running total and reports progress as
   * `bytesReceived / totalByteSize` (or `undefined` when no total is known).
   */
  protected handleChunkReceived(chunk: DataStream_Chunk) {
    this.bytesReceived += chunk.content!.byteLength;
    const currentProgress = this.totalByteSize
      ? this.bytesReceived / this.totalByteSize
      : undefined;
    this.onProgress?.(currentProgress);
  }

  /**
   * Async iterator implementation to allow usage of `for await...of` syntax.
   *
   * Acquires a reader (and therefore a lock) on the underlying stream. Read
   * errors are logged and end the iteration instead of being rethrown.
   */
  [Symbol.asyncIterator]() {
    const reader = this.reader.getReader();

    return {
      next: async (): Promise<IteratorResult<Uint8Array>> => {
        try {
          const { done, value } = await reader.read();
          if (done) {
            // Release the lock when the stream is exhausted so the
            // underlying ReadableStream can be garbage-collected.
            reader.releaseLock();
            return { done: true, value: undefined };
          } else {
            this.handleChunkReceived(value);
            return { done: false, value: value.content! };
          }
        } catch (error: unknown) {
          // Release the lock on error so it doesn't stay held when the
          // consumer never calls return() (e.g. breaking out of for-await).
          reader.releaseLock();
          log.error('error processing stream update: %s', error);
          return { done: true, value: undefined };
        }
      },

      // Called when the consumer stops iterating early (e.g. `break`).
      return(): IteratorResult<Uint8Array> {
        reader.releaseLock();
        return { done: true, value: undefined };
      },
    };
  }

  /**
   * Reads the stream to its end and resolves with the chunk payloads in
   * arrival order.
   *
   * Because read errors end the iteration rather than throwing, the result may
   * be partial if the stream failed midway.
   */
  async readAll(): Promise<Array<Uint8Array>> {
    // A plain array keeps every chunk; a Set would silently drop a payload if
    // the same Uint8Array reference were ever delivered twice.
    const chunks: Array<Uint8Array> = [];
    for await (const chunk of this) {
      chunks.push(chunk);
    }
    return chunks;
  }
}

/**
 * A class to read chunks from a ReadableStream and provide them in a structured format.
 *
 * Iterating yields the decoded text of each chunk.
 */
export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
  /** Latest chunk seen for each chunk index; cleared when iteration ends. */
  private receivedChunks: Map<number, DataStream_Chunk>;

  /**
   * A TextStreamReader instance can be used as an AsyncIterator that yields the
   * decoded text of each chunk as it is received.
   *
   * @param info - Info object exposed through `info`.
   * @param stream - Stream of chunks to read from.
   * @param totalChunkCount - Optional expected number of chunks. It is stored as
   *   the base class's progress total, so progress is a fraction of chunks.
   */
  constructor(
    info: TextStreamInfo,
    stream: ReadableStream<DataStream_Chunk>,
    totalChunkCount?: number,
  ) {
    super(info, stream, totalChunkCount);
    this.receivedChunks = new Map();
  }

  /**
   * Records the chunk under its index and reports progress as the number of
   * distinct indices received divided by the expected total.
   *
   * A chunk is ignored here (no bookkeeping, no progress callback) when a chunk
   * with a higher version is already stored for the same index.
   */
  protected handleChunkReceived(chunk: DataStream_Chunk) {
    const index = bigIntToNumber(chunk.chunkIndex!);
    const previousChunkAtIndex = this.receivedChunks.get(index);
    if (previousChunkAtIndex && previousChunkAtIndex.version! > chunk.version!) {
      // we have a newer version already, dropping the old one
      return;
    }
    this.receivedChunks.set(index, chunk);
    const currentProgress = this.totalByteSize
      ? this.receivedChunks.size / this.totalByteSize
      : undefined;
    this.onProgress?.(currentProgress);
  }

  /**
   * Async iterator implementation to allow usage of `for await...of` syntax.
   * Yields the decoded text of each chunk from the stream.
   *
   * Acquires a reader (and therefore a lock) on the underlying stream. Read
   * errors are logged and end the iteration instead of being rethrown.
   */
  [Symbol.asyncIterator]() {
    const reader = this.reader.getReader();
    const decoder = new TextDecoder();
    const receivedChunks = this.receivedChunks;

    return {
      next: async (): Promise<IteratorResult<string>> => {
        try {
          const { done, value } = await reader.read();
          if (done) {
            // Release the lock when the stream is exhausted so the
            // underlying ReadableStream can be garbage-collected.
            reader.releaseLock();
            // Clear received chunks so the buffered data can be GC'd.
            receivedChunks.clear();
            return { done: true, value: undefined };
          } else {
            this.handleChunkReceived(value);
            // NOTE(review): each chunk is decoded on its own, which presumably
            // relies on senders splitting text on UTF-8 character boundaries —
            // confirm against the sender.
            return {
              done: false,
              value: decoder.decode(value.content!),
            };
          }
        } catch (error: unknown) {
          // Release the lock on error so it doesn't stay held when the
          // consumer never calls return() (e.g. breaking out of for-await).
          reader.releaseLock();
          receivedChunks.clear();
          log.error('error processing stream update: %s', error);
          return { done: true, value: undefined };
        }
      },

      // Called when the consumer stops iterating early (e.g. `break`).
      return(): IteratorResult<string> {
        reader.releaseLock();
        // Clear received chunks so the buffered data can be GC'd.
        receivedChunks.clear();
        return { done: true, value: undefined };
      },
    };
  }

  /**
   * Reads the stream to its end and resolves with the concatenation of every
   * yielded chunk, in arrival order.
   *
   * Because read errors end the iteration rather than throwing, the result may
   * be partial if the stream failed midway.
   */
  async readAll(): Promise<string> {
    let finalString: string = '';
    for await (const chunk of this) {
      finalString += chunk;
    }
    return finalString;
  }
}