// SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 import type { NewVideoStreamResponse, VideoRotation } from '@livekit/rtc-ffi-bindings'; import { NewVideoStreamRequest, VideoStreamType } from '@livekit/rtc-ffi-bindings'; import type { UnderlyingSource } from 'node:stream/web'; import type { FfiEvent } from './ffi_client.js'; import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js'; import type { Track } from './track.js'; import { VideoFrame } from './video_frame.js'; export type VideoFrameEvent = { frame: VideoFrame; timestampUs: bigint; rotation: VideoRotation; }; class VideoStreamSource implements UnderlyingSource { private controller?: ReadableStreamDefaultController; private ffiHandle: FfiHandle; private disposed = false; constructor(track: Track) { const req = new NewVideoStreamRequest({ type: VideoStreamType.VIDEO_STREAM_NATIVE, trackHandle: track.ffi_handle.handle, }); const res = FfiClient.instance.request({ message: { case: 'newVideoStream', value: req, }, }); this.ffiHandle = new FfiHandle(res.stream!.handle!.id!); FfiClient.instance.on(FfiClientEvent.FfiEvent, this.onEvent); } private onEvent = (ev: FfiEvent) => { if (!this.controller) { throw new Error('Stream controller not initialized'); } if ( ev.message.case != 'videoStreamEvent' || ev.message.value.streamHandle != this.ffiHandle.handle ) { return; } const streamEvent = ev.message.value.message; switch (streamEvent.case) { case 'frameReceived': const rotation = streamEvent.value.rotation; const timestampUs = streamEvent.value.timestampUs; const frame = VideoFrame.fromOwnedInfo(streamEvent.value.buffer!); const value = { rotation, timestampUs, frame }; const videoFrameEvent = { frame: value.frame, timestampUs: value.timestampUs!, rotation: value.rotation!, }; this.controller.enqueue(videoFrameEvent); break; case 'eos': FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent); this.controller.close(); // Dispose the native handle so the FD is released on stream end, // not just when cancel() is called explicitly by the consumer. // Guard against double-dispose if cancel() is called after EOS // while buffered frames are still in the ReadableStream queue. if (!this.disposed) { this.disposed = true; this.ffiHandle.dispose(); } break; } }; start(controller: ReadableStreamDefaultController) { this.controller = controller; } cancel() { FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent); if (!this.disposed) { this.disposed = true; this.ffiHandle.dispose(); } } } export class VideoStream extends ReadableStream { constructor(track: Track) { super(new VideoStreamSource(track)); } }