{"version":3,"sources":["../../src/data_streams/stream_reader.ts"],"sourcesContent":["// SPDX-FileCopyrightText: 2024 LiveKit, Inc.\n//\n// SPDX-License-Identifier: Apache-2.0\nimport type { DataStream_Chunk } from '@livekit/rtc-ffi-bindings';\nimport { log } from '../log.js';\nimport { bigIntToNumber } from '../utils.js';\nimport type { BaseStreamInfo, ByteStreamInfo, TextStreamInfo } from './types.js';\n\nabstract class BaseStreamReader<T extends BaseStreamInfo> {\n  protected reader: ReadableStream<DataStream_Chunk>;\n\n  protected totalByteSize?: number;\n\n  protected _info: T;\n\n  protected bytesReceived: number;\n\n  get info() {\n    return this._info;\n  }\n\n  constructor(info: T, stream: ReadableStream<DataStream_Chunk>, totalByteSize?: number) {\n    this.reader = stream;\n    this.totalByteSize = totalByteSize;\n    this._info = info;\n    this.bytesReceived = 0;\n  }\n\n  protected abstract handleChunkReceived(chunk: DataStream_Chunk): void;\n\n  onProgress?: (progress: number | undefined) => void;\n\n  abstract readAll(): Promise<string | Array<Uint8Array>>;\n}\n\n/**\n * A class to read chunks from a ReadableStream and provide them in a structured format.\n */\nexport class ByteStreamReader extends BaseStreamReader<ByteStreamInfo> {\n  protected handleChunkReceived(chunk: DataStream_Chunk) {\n    this.bytesReceived += chunk.content!.byteLength;\n    const currentProgress = this.totalByteSize\n      ? this.bytesReceived / this.totalByteSize\n      : undefined;\n    this.onProgress?.(currentProgress);\n  }\n\n  [Symbol.asyncIterator]() {\n    const reader = this.reader.getReader();\n\n    return {\n      next: async (): Promise<IteratorResult<Uint8Array>> => {\n        try {\n          const { done, value } = await reader.read();\n          if (done) {\n            // Release the lock when the stream is exhausted so the\n            // underlying ReadableStream can be garbage-collected.\n            reader.releaseLock();\n            return { done: true, value: undefined as unknown };\n          } else {\n            this.handleChunkReceived(value);\n            return { done: false, value: value.content! };\n          }\n        } catch (error: unknown) {\n          // Release the lock on error so it doesn't stay held when the\n          // consumer never calls return() (e.g. breaking out of for-await).\n          reader.releaseLock();\n          log.error('error processing stream update: %s', error);\n          return { done: true, value: undefined as unknown };\n        }\n      },\n\n      return(): IteratorResult<Uint8Array> {\n        reader.releaseLock();\n        return { done: true, value: undefined };\n      },\n    };\n  }\n\n  async readAll(): Promise<Array<Uint8Array>> {\n    const chunks: Set<Uint8Array> = new Set();\n    for await (const chunk of this) {\n      chunks.add(chunk);\n    }\n    return Array.from(chunks);\n  }\n}\n\n/**\n * A class to read chunks from a ReadableStream and provide them in a structured format.\n */\nexport class TextStreamReader extends BaseStreamReader<TextStreamInfo> {\n  private receivedChunks: Map<number, DataStream_Chunk>;\n\n  /**\n   * A TextStreamReader instance can be used as an AsyncIterator that returns the entire string\n   * that has been received up to the current point in time.\n   */\n  constructor(\n    info: TextStreamInfo,\n    stream: ReadableStream<DataStream_Chunk>,\n    totalChunkCount?: number,\n  ) {\n    super(info, stream, totalChunkCount);\n    this.receivedChunks = new Map();\n  }\n\n  protected handleChunkReceived(chunk: DataStream_Chunk) {\n    const index = bigIntToNumber(chunk.chunkIndex!);\n    const previousChunkAtIndex = this.receivedChunks.get(index!);\n    if (previousChunkAtIndex && previousChunkAtIndex.version! > chunk.version!) {\n      // we have a newer version already, dropping the old one\n      return;\n    }\n    this.receivedChunks.set(index, chunk);\n    const currentProgress = this.totalByteSize\n      ? this.receivedChunks.size / this.totalByteSize\n      : undefined;\n    this.onProgress?.(currentProgress);\n  }\n\n  /**\n   * Async iterator implementation to allow usage of `for await...of` syntax.\n   * Yields structured chunks from the stream.\n   *\n   */\n  [Symbol.asyncIterator]() {\n    const reader = this.reader.getReader();\n    const decoder = new TextDecoder();\n    const receivedChunks = this.receivedChunks;\n\n    return {\n      next: async (): Promise<IteratorResult<string>> => {\n        try {\n          const { done, value } = await reader.read();\n          if (done) {\n            // Release the lock when the stream is exhausted so the\n            // underlying ReadableStream can be garbage-collected.\n            reader.releaseLock();\n            // Clear received chunks so the buffered data can be GC'd.\n            receivedChunks.clear();\n            return { done: true, value: undefined };\n          } else {\n            this.handleChunkReceived(value);\n            return {\n              done: false,\n              value: decoder.decode(value.content!),\n            };\n          }\n        } catch (error: unknown) {\n          // Release the lock on error so it doesn't stay held when the\n          // consumer never calls return() (e.g. breaking out of for-await).\n          reader.releaseLock();\n          receivedChunks.clear();\n          log.error('error processing stream update: %s', error);\n          return { done: true, value: undefined };\n        }\n      },\n\n      return(): IteratorResult<string> {\n        reader.releaseLock();\n        // Clear received chunks so the buffered data can be GC'd.\n        receivedChunks.clear();\n        return { done: true, value: undefined };\n      },\n    };\n  }\n\n  async readAll(): Promise<string> {\n    let finalString: string = '';\n    for await (const chunk of this) {\n      finalString += chunk;\n    }\n    return finalString;\n  }\n}\n"],"mappings":";;;;;;;;;;;;;;;;;;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAIA,iBAAoB;AACpB,mBAA+B;AAG/B,MAAe,iBAA2C;AAAA,EASxD,IAAI,OAAO;AACT,WAAO,KAAK;AAAA,EACd;AAAA,EAEA,YAAY,MAAS,QAA0C,eAAwB;AACrF,SAAK,SAAS;AACd,SAAK,gBAAgB;AACrB,SAAK,QAAQ;AACb,SAAK,gBAAgB;AAAA,EACvB;AAOF;AAKO,MAAM,yBAAyB,iBAAiC;AAAA,EAC3D,oBAAoB,OAAyB;AAvCzD;AAwCI,SAAK,iBAAiB,MAAM,QAAS;AACrC,UAAM,kBAAkB,KAAK,gBACzB,KAAK,gBAAgB,KAAK,gBAC1B;AACJ,eAAK,eAAL,8BAAkB;AAAA,EACpB;AAAA,EAEA,CAAC,OAAO,aAAa,IAAI;AACvB,UAAM,SAAS,KAAK,OAAO,UAAU;AAErC,WAAO;AAAA,MACL,MAAM,YAAiD;AACrD,YAAI;AACF,gBAAM,EAAE,MAAM,MAAM,IAAI,MAAM,OAAO,KAAK;AAC1C,cAAI,MAAM;AAGR,mBAAO,YAAY;AACnB,mBAAO,EAAE,MAAM,MAAM,OAAO,OAAqB;AAAA,UACnD,OAAO;AACL,iBAAK,oBAAoB,KAAK;AAC9B,mBAAO,EAAE,MAAM,OAAO,OAAO,MAAM,QAAS;AAAA,UAC9C;AAAA,QACF,SAAS,OAAgB;AAGvB,iBAAO,YAAY;AACnB,yBAAI,MAAM,sCAAsC,KAAK;AACrD,iBAAO,EAAE,MAAM,MAAM,OAAO,OAAqB;AAAA,QACnD;AAAA,MACF;AAAA,MAEA,SAAqC;AACnC,eAAO,YAAY;AACnB,eAAO,EAAE,MAAM,MAAM,OAAO,OAAU;AAAA,MACxC;AAAA,IACF;AAAA,EACF;AAAA,EAEA,MAAM,UAAsC;AAC1C,UAAM,SAA0B,oBAAI,IAAI;AACxC,qBAAiB,SAAS,MAAM;AAC9B,aAAO,IAAI,KAAK;AAAA,IAClB;AACA,WAAO,MAAM,KAAK,MAAM;AAAA,EAC1B;AACF;AAKO,MAAM,yBAAyB,iBAAiC;AAAA;AAAA;AAAA;AAAA;AAAA,EAOrE,YACE,MACA,QACA,iBACA;AACA,UAAM,MAAM,QAAQ,eAAe;AACnC,SAAK,iBAAiB,oBAAI,IAAI;AAAA,EAChC;AAAA,EAEU,oBAAoB,OAAyB;AA3GzD;AA4GI,UAAM,YAAQ,6BAAe,MAAM,UAAW;AAC9C,UAAM,uBAAuB,KAAK,eAAe,IAAI,KAAM;AAC3D,QAAI,wBAAwB,qBAAqB,UAAW,MAAM,SAAU;AAE1E;AAAA,IACF;AACA,SAAK,eAAe,IAAI,OAAO,KAAK;AACpC,UAAM,kBAAkB,KAAK,gBACzB,KAAK,eAAe,OAAO,KAAK,gBAChC;AACJ,eAAK,eAAL,8BAAkB;AAAA,EACpB;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAOA,CAAC,OAAO,aAAa,IAAI;AACvB,UAAM,SAAS,KAAK,OAAO,UAAU;AACrC,UAAM,UAAU,IAAI,YAAY;AAChC,UAAM,iBAAiB,KAAK;AAE5B,WAAO;AAAA,MACL,MAAM,YAA6C;AACjD,YAAI;AACF,gBAAM,EAAE,MAAM,MAAM,IAAI,MAAM,OAAO,KAAK;AAC1C,cAAI,MAAM;AAGR,mBAAO,YAAY;AAEnB,2BAAe,MAAM;AACrB,mBAAO,EAAE,MAAM,MAAM,OAAO,OAAU;AAAA,UACxC,OAAO;AACL,iBAAK,oBAAoB,KAAK;AAC9B,mBAAO;AAAA,cACL,MAAM;AAAA,cACN,OAAO,QAAQ,OAAO,MAAM,OAAQ;AAAA,YACtC;AAAA,UACF;AAAA,QACF,SAAS,OAAgB;AAGvB,iBAAO,YAAY;AACnB,yBAAe,MAAM;AACrB,yBAAI,MAAM,sCAAsC,KAAK;AACrD,iBAAO,EAAE,MAAM,MAAM,OAAO,OAAU;AAAA,QACxC;AAAA,MACF;AAAA,MAEA,SAAiC;AAC/B,eAAO,YAAY;AAEnB,uBAAe,MAAM;AACrB,eAAO,EAAE,MAAM,MAAM,OAAO,OAAU;AAAA,MACxC;AAAA,IACF;AAAA,EACF;AAAA,EAEA,MAAM,UAA2B;AAC/B,QAAI,cAAsB;AAC1B,qBAAiB,SAAS,MAAM;AAC9B,qBAAe;AAAA,IACjB;AACA,WAAO;AAAA,EACT;AACF;","names":[]}