{"version":3,"sources":["../src/audio_stream.ts"],"sourcesContent":["// SPDX-FileCopyrightText: 2024 LiveKit, Inc.\n//\n// SPDX-License-Identifier: Apache-2.0\nimport type { NewAudioStreamResponse } from '@livekit/rtc-ffi-bindings';\nimport { AudioStreamType, NewAudioStreamRequest } from '@livekit/rtc-ffi-bindings';\nimport type { UnderlyingSource } from 'node:stream/web';\nimport { AudioFrame } from './audio_frame.js';\nimport type { FfiEvent } from './ffi_client.js';\nimport { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js';\nimport { type FrameProcessor, isFrameProcessor } from './frame_processor.js';\nimport { log } from './log.js';\nimport type { Track } from './track.js';\n\nexport interface AudioStreamOptions {\n  noiseCancellation?: NoiseCancellationOptions | FrameProcessor<AudioFrame>;\n  sampleRate?: number;\n  numChannels?: number;\n  frameSizeMs?: number;\n}\n\nexport interface NoiseCancellationOptions {\n  moduleId: string;\n  // eslint-disable-next-line @typescript-eslint/no-explicit-any\n  options: Record<string, any>;\n}\n\nclass AudioStreamSource implements UnderlyingSource<AudioFrame> {\n  private controller?: ReadableStreamDefaultController<AudioFrame>;\n  private ffiHandle: FfiHandle;\n  private disposed = false;\n  private sampleRate: number;\n  private numChannels: number;\n  private legacyNcOptions?: NoiseCancellationOptions;\n  private frameProcessor?: FrameProcessor<AudioFrame>;\n  private frameSizeMs?: number;\n\n  constructor(\n    track: Track,\n    sampleRateOrOptions?: number | AudioStreamOptions,\n    numChannels?: number,\n  ) {\n    if (sampleRateOrOptions !== undefined && typeof sampleRateOrOptions !== 'number') {\n      this.sampleRate = sampleRateOrOptions.sampleRate ?? 48000;\n      this.numChannels = sampleRateOrOptions.numChannels ?? 1;\n      if (isFrameProcessor(sampleRateOrOptions.noiseCancellation)) {\n        this.frameProcessor = sampleRateOrOptions.noiseCancellation;\n      } else {\n        this.legacyNcOptions = sampleRateOrOptions.noiseCancellation;\n      }\n      this.frameSizeMs = sampleRateOrOptions.frameSizeMs;\n    } else {\n      this.sampleRate = (sampleRateOrOptions as number) ?? 48000;\n      this.numChannels = numChannels ?? 1;\n    }\n\n    const req = new NewAudioStreamRequest({\n      type: AudioStreamType.AUDIO_STREAM_NATIVE,\n      trackHandle: track.ffi_handle.handle,\n      sampleRate: this.sampleRate,\n      numChannels: this.numChannels,\n      frameSizeMs: this.frameSizeMs,\n      ...(this.legacyNcOptions\n        ? {\n            audioFilterModuleId: this.legacyNcOptions.moduleId,\n            audioFilterOptions: JSON.stringify(this.legacyNcOptions.options),\n          }\n        : {}),\n    });\n\n    const res = FfiClient.instance.request<NewAudioStreamResponse>({\n      message: {\n        case: 'newAudioStream',\n        value: req,\n      },\n    });\n\n    this.ffiHandle = new FfiHandle(res.stream!.handle!.id!);\n\n    FfiClient.instance.on(FfiClientEvent.FfiEvent, this.onEvent);\n  }\n\n  private onEvent = (ev: FfiEvent) => {\n    if (!this.controller) {\n      throw new Error('Stream controller not initialized');\n    }\n\n    if (\n      ev.message.case != 'audioStreamEvent' ||\n      ev.message.value.streamHandle != this.ffiHandle.handle\n    ) {\n      return;\n    }\n\n    const streamEvent = ev.message.value.message;\n    switch (streamEvent.case) {\n      case 'frameReceived':\n        let frame = AudioFrame.fromOwnedInfo(streamEvent.value.frame!);\n        if (this.frameProcessor && this.frameProcessor.isEnabled()) {\n          try {\n            frame = this.frameProcessor.process(frame);\n          } catch (err: unknown) {\n            log.warn(`Frame processing failed, passing through original frame: ${err}`);\n          }\n        }\n        this.controller.enqueue(frame);\n        break;\n      case 'eos':\n        FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent);\n        this.controller.close();\n        // Dispose the native handle so the FD is released on stream end,\n        // not just when cancel() is called explicitly by the consumer.\n        // Guard against double-dispose if cancel() is called after EOS\n        // while buffered frames are still in the ReadableStream queue.\n        if (!this.disposed) {\n          this.disposed = true;\n          this.ffiHandle.dispose();\n          this.frameProcessor?.close();\n        }\n        break;\n    }\n  };\n\n  start(controller: ReadableStreamDefaultController<AudioFrame>) {\n    this.controller = controller;\n  }\n\n  cancel() {\n    FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent);\n    if (!this.disposed) {\n      this.disposed = true;\n      this.ffiHandle.dispose();\n      // Also close the frame processor on cancel for symmetry with the EOS path,\n      // so resources are released regardless of how the stream ends.\n      this.frameProcessor?.close();\n    }\n  }\n}\n\nexport class AudioStream extends ReadableStream<AudioFrame> {\n  constructor(track: Track);\n  constructor(track: Track, sampleRate: number);\n  constructor(track: Track, sampleRate: number, numChannels: number);\n  constructor(track: Track, options: AudioStreamOptions);\n  constructor(\n    track: Track,\n    sampleRateOrOptions?: number | AudioStreamOptions,\n    numChannels?: number,\n  ) {\n    super(new AudioStreamSource(track, sampleRateOrOptions, numChannels));\n  }\n}\n"],"mappings":";;;;;;;;;;;;;;;;;;AAAA;AAAA;AAAA;AAAA;AAAA;AAIA,8BAAuD;AAEvD,yBAA2B;AAE3B,wBAAqD;AACrD,6BAAsD;AACtD,iBAAoB;AAgBpB,MAAM,kBAA0D;AAAA,EAU9D,YACE,OACA,qBACA,aACA;AAXF,SAAQ,WAAW;AAoDnB,SAAQ,UAAU,CAAC,OAAiB;AAjFtC;AAkFI,UAAI,CAAC,KAAK,YAAY;AACpB,cAAM,IAAI,MAAM,mCAAmC;AAAA,MACrD;AAEA,UACE,GAAG,QAAQ,QAAQ,sBACnB,GAAG,QAAQ,MAAM,gBAAgB,KAAK,UAAU,QAChD;AACA;AAAA,MACF;AAEA,YAAM,cAAc,GAAG,QAAQ,MAAM;AACrC,cAAQ,YAAY,MAAM;AAAA,QACxB,KAAK;AACH,cAAI,QAAQ,8BAAW,cAAc,YAAY,MAAM,KAAM;AAC7D,cAAI,KAAK,kBAAkB,KAAK,eAAe,UAAU,GAAG;AAC1D,gBAAI;AACF,sBAAQ,KAAK,eAAe,QAAQ,KAAK;AAAA,YAC3C,SAAS,KAAc;AACrB,6BAAI,KAAK,4DAA4D,GAAG,EAAE;AAAA,YAC5E;AAAA,UACF;AACA,eAAK,WAAW,QAAQ,KAAK;AAC7B;AAAA,QACF,KAAK;AACH,sCAAU,SAAS,IAAI,iCAAe,UAAU,KAAK,OAAO;AAC5D,eAAK,WAAW,MAAM;AAKtB,cAAI,CAAC,KAAK,UAAU;AAClB,iBAAK,WAAW;AAChB,iBAAK,UAAU,QAAQ;AACvB,uBAAK,mBAAL,mBAAqB;AAAA,UACvB;AACA;AAAA,MACJ;AAAA,IACF;AA/EE,QAAI,wBAAwB,UAAa,OAAO,wBAAwB,UAAU;AAChF,WAAK,aAAa,oBAAoB,cAAc;AACpD,WAAK,cAAc,oBAAoB,eAAe;AACtD,cAAI,yCAAiB,oBAAoB,iBAAiB,GAAG;AAC3D,aAAK,iBAAiB,oBAAoB;AAAA,MAC5C,OAAO;AACL,aAAK,kBAAkB,oBAAoB;AAAA,MAC7C;AACA,WAAK,cAAc,oBAAoB;AAAA,IACzC,OAAO;AACL,WAAK,aAAc,uBAAkC;AACrD,WAAK,cAAc,eAAe;AAAA,IACpC;AAEA,UAAM,MAAM,IAAI,8CAAsB;AAAA,MACpC,MAAM,wCAAgB;AAAA,MACtB,aAAa,MAAM,WAAW;AAAA,MAC9B,YAAY,KAAK;AAAA,MACjB,aAAa,KAAK;AAAA,MAClB,aAAa,KAAK;AAAA,MAClB,GAAI,KAAK,kBACL;AAAA,QACE,qBAAqB,KAAK,gBAAgB;AAAA,QAC1C,oBAAoB,KAAK,UAAU,KAAK,gBAAgB,OAAO;AAAA,MACjE,IACA,CAAC;AAAA,IACP,CAAC;AAED,UAAM,MAAM,4BAAU,SAAS,QAAgC;AAAA,MAC7D,SAAS;AAAA,QACP,MAAM;AAAA,QACN,OAAO;AAAA,MACT;AAAA,IACF,CAAC;AAED,SAAK,YAAY,IAAI,4BAAU,IAAI,OAAQ,OAAQ,EAAG;AAEtD,gCAAU,SAAS,GAAG,iCAAe,UAAU,KAAK,OAAO;AAAA,EAC7D;AAAA,EA2CA,MAAM,YAAyD;AAC7D,SAAK,aAAa;AAAA,EACpB;AAAA,EAEA,SAAS;AA9HX;AA+HI,gCAAU,SAAS,IAAI,iCAAe,UAAU,KAAK,OAAO;AAC5D,QAAI,CAAC,KAAK,UAAU;AAClB,WAAK,WAAW;AAChB,WAAK,UAAU,QAAQ;AAGvB,iBAAK,mBAAL,mBAAqB;AAAA,IACvB;AAAA,EACF;AACF;AAEO,MAAM,oBAAoB,eAA2B;AAAA,EAK1D,YACE,OACA,qBACA,aACA;AACA,UAAM,IAAI,kBAAkB,OAAO,qBAAqB,WAAW,CAAC;AAAA,EACtE;AACF;","names":[]}