import { type JoinResponse, type ParticipantUpdate } from '@livekit/protocol';
import { EventEmitter } from 'events';
import type { Throws } from '@livekit/throws-transformer/throws';
import type TypedEmitter from 'typed-emitter';
import type { BaseE2EEManager } from '../../../e2ee/E2eeManager';
import { LoggerNames, getLogger } from '../../../logger';
import { abortSignalAny, abortSignalTimeout } from '../../../utils/abort-signal-polyfill';
import type Participant from '../../participant/Participant';
import type RemoteParticipant from '../../participant/RemoteParticipant';
import { Future } from '../../utils';
import RemoteDataTrack from '../RemoteDataTrack';
import { DataTrackDepacketizerDropError } from '../depacketizer';
import { type DataTrackFrame, DataTrackFrameInternal } from '../frame';
import { DataTrackHandle } from '../handle';
import { DataTrackPacket } from '../packet';
import { type DataTrackInfo, type DataTrackSid } from '../types';
import { DataTrackSubscribeError } from './errors';
import IncomingDataTrackPipeline from './pipeline';
import {
  type EventSfuUpdateSubscription,
  type EventTrackAvailable,
  type EventTrackUnavailable,
} from './types';

const log = getLogger(LoggerNames.DataTracks);

export type DataTrackIncomingManagerCallbacks = {
  /** Request sent to the SFU to update the subscription for a data track. */
  sfuUpdateSubscription: (event: EventSfuUpdateSubscription) => void;

  /** A track has been published by a remote participant and is available to be
   * subscribed to. */
  trackPublished: (event: EventTrackAvailable) => void;

  /** A track has been unpublished by a remote participant and can no longer be subscribed to. */
  trackUnpublished: (event: EventTrackUnavailable) => void;
};

/** Track is not subscribed to. */
type SubscriptionStateNone = { type: 'none' };

/** Track is being subscribed to, waiting for subscriber handle. */
type SubscriptionStatePending = {
  type: 'pending';
  /** Settled when the subscription becomes active (resolve) or is cancelled / times out (reject). */
  completionFuture: Future<void, DataTrackSubscribeError>;
  /** The number of in flight requests waiting for this subscription state to go to "active". */
  pendingRequestCount: number;
  /** A function that when called, cancels the pending subscription and moves back to "none". */
  cancel: () => void;
};

/** Track has an active subscription. */
type SubscriptionStateActive = {
  type: 'active';
  /** Handle the SFU attaches to incoming packets for this subscription. */
  subcriptionHandle: DataTrackHandle;
  /** Pipeline that turns incoming packets into frames for this track. */
  pipeline: IncomingDataTrackPipeline;
  /** Controllers of every ReadableStream currently receiving frames for this track. */
  streamControllers: Set<ReadableStreamDefaultController<DataTrackFrame>>;
};

type SubscriptionState = SubscriptionStateNone | SubscriptionStatePending | SubscriptionStateActive;

/** Information and state for a remote data track. */
type Descriptor<S extends SubscriptionState> = {
  info: DataTrackInfo;
  publisherIdentity: Participant['identity'];
  subscription: S;
};

type IncomingDataTrackManagerOptions = {
  /** Provider to use for decrypting incoming frame payloads.
   * If none, remote tracks using end-to-end encryption will not be available
   * for subscription. */
  e2eeManager?: BaseE2EEManager;
};

/** How long to wait when attempting to subscribe before timing out. */
const SUBSCRIBE_TIMEOUT_MILLISECONDS = 10_000;

/** Maximum number of {@link DataTrackFrame}s that are cached for each ReadableStream subscription.
 * If data comes in too fast and saturates this threshold, backpressure will be applied. */
const READABLE_STREAM_DEFAULT_BUFFER_SIZE = 16;

/**
 * Tracks the data tracks published by remote participants and manages the local
 * subscription state machine (`none` -> `pending` -> `active`) for each of them.
 *
 * Requests that must reach the SFU are surfaced through the `sfuUpdateSubscription` event;
 * publication changes are surfaced through `trackPublished` / `trackUnpublished`.
 */
export default class IncomingDataTrackManager extends (EventEmitter as new () => TypedEmitter<DataTrackIncomingManagerCallbacks>) {
  private e2eeManager: BaseE2EEManager | null;

  /** Mapping between track SID and descriptor. */
  private descriptors = new Map<DataTrackSid, Descriptor<SubscriptionState>>();

  /** Mapping between subscriber handle and track SID.
   *
   * This is an index that allows track descriptors to be looked up
   * by subscriber handle in O(1) time, to make routing incoming packets
   * (a hot code path) faster.
   */
  private subscriptionHandles = new Map<DataTrackHandle, DataTrackSid>();

  constructor(options?: IncomingDataTrackManagerOptions) {
    super();
    this.e2eeManager = options?.e2eeManager ?? null;
  }

  /**
   * Swap the e2ee manager used for newly created pipelines and push the new value into the
   * pipelines of all currently active subscriptions.
   * @internal
   */
  updateE2eeManager(e2eeManager: BaseE2EEManager | null) {
    this.e2eeManager = e2eeManager;

    // Propagate downwards to all pre-existing pipelines
    for (const descriptor of this.descriptors.values()) {
      if (descriptor.subscription.type === 'active') {
        descriptor.subscription.pipeline.updateE2eeManager(e2eeManager);
      }
    }
  }

  /** Allocates a ReadableStream which emits when a new {@link DataTrackFrame} is received from the
   * SFU. The SFU subscription is initiated lazily when the stream is created.
   *
   * @param sid - SID of the remote data track to subscribe to.
   * @param signal - Optional signal forwarded to {@link subscribeRequest} to cancel the subscribe.
   * @param bufferSize - `highWaterMark` of the stream's count queuing strategy.
   * @returns A tuple of the ReadableStream and a Promise that resolves once the SFU subscription
   * is fully established / the stream is ready to receive frames.
   *
   * @internal **/
  openSubscriptionStream(
    sid: DataTrackSid,
    signal?: AbortSignal,
    bufferSize = READABLE_STREAM_DEFAULT_BUFFER_SIZE,
  ): [ReadableStream<DataTrackFrame>, Promise<Throws<void, DataTrackSubscribeError>>] {
    let streamController: ReadableStreamDefaultController<DataTrackFrame> | null = null;
    const sfuSubscriptionComplete = new Future<void, DataTrackSubscribeError>();

    const stream = new ReadableStream<DataTrackFrame>(
      {
        start: (controller) => {
          streamController = controller;

          const onAbort = () => {
            controller.error(DataTrackSubscribeError.cancelled());
            sfuSubscriptionComplete.reject?.(DataTrackSubscribeError.cancelled());
          };

          this.subscribeRequest(sid, signal)
            .then(async () => {
              // NOTE(review): the `.finally` below removes this listener as soon as this handler
              // settles, so it is only attached very briefly. Confirm whether an abort after the
              // subscription became active is meant to error the stream.
              signal?.addEventListener('abort', onAbort);

              const descriptor = this.descriptors.get(sid);
              if (!descriptor) {
                // NOTE(review): `sfuSubscriptionComplete` is left unsettled on this path.
                log.error(`Unknown track ${sid}`);
                return;
              }
              if (descriptor.subscription.type !== 'active') {
                // NOTE(review): `sfuSubscriptionComplete` is left unsettled on this path.
                log.error(`Subscription for track ${sid} is not active`);
                return;
              }

              // From now on `packetReceived` will enqueue frames into this stream.
              descriptor.subscription.streamControllers.add(controller);
              sfuSubscriptionComplete.resolve?.();
            })
            .catch((err) => {
              controller.error(err);
              sfuSubscriptionComplete.reject?.(err);
            })
            .finally(() => {
              signal?.removeEventListener('abort', onAbort);
            });
        },
        cancel: () => {
          if (!streamController) {
            log.warn(`ReadableStream subscribed to ${sid} was not started.`);
            return;
          }
          const descriptor = this.descriptors.get(sid);
          if (!descriptor) {
            log.warn(`Unknown track ${sid}, skipping cancel...`);
            return;
          }
          if (descriptor.subscription.type !== 'active') {
            log.warn(`Subscription for track ${sid} is not active, skipping cancel...`);
            return;
          }

          descriptor.subscription.streamControllers.delete(streamController);

          // If no active stream controllers are left, also unsubscribe on the SFU end.
          if (descriptor.subscription.streamControllers.size === 0) {
            this.unSubscribeRequest(descriptor.info.sid);
          }
        },
      },
      new CountQueuingStrategy({ highWaterMark: bufferSize }),
    );

    return [stream, sfuSubscriptionComplete.promise];
  }

  /** Client requested to subscribe to a data track.
   *
   * This is sent when the user calls {@link RemoteDataTrack.subscribe}.
   *
   * Only the first request to subscribe to a given track incurs meaningful overhead; subsequent
   * requests simply attach an additional receiver to the broadcast channel, allowing them to consume
   * frames from the existing subscription pipeline.
   *
   * @param sid - SID of the track to subscribe to. Must be a known track, otherwise this throws.
   * @param signal - Optional signal used to cancel this individual request.
   * @returns Resolves once the subscription is active. Rejects with a
   * {@link DataTrackSubscribeError} when the request is cancelled or times out.
   */
  async subscribeRequest(
    sid: DataTrackSid,
    signal?: AbortSignal,
  ): Promise<Throws<void, DataTrackSubscribeError>> {
    const descriptor = this.descriptors.get(sid);
    if (!descriptor) {
      // @throws-transformer ignore - this should be treated as a "panic" and not be caught
      throw new Error('Cannot subscribe to unknown track');
    }

    /** Waits until the pending subscription of `currentDescriptor` settles, or until either
     * signal aborts. */
    const waitForCompletionFuture = async (
      currentDescriptor: Descriptor<SubscriptionState>,
      userProvidedSignal?: AbortSignal,
      timeoutSignal?: AbortSignal,
    ) => {
      if (currentDescriptor.subscription.type === 'active') {
        // Subscription has already become active! So bail out early, there is nothing to wait for.
        return;
      }
      if (currentDescriptor.subscription.type !== 'pending') {
        // @throws-transformer ignore - this should be treated as a "panic" and not be caught
        throw new Error(
          `Descriptor for track ${sid} is not pending, found ${currentDescriptor.subscription.type}`,
        );
      }

      const combinedSignal = abortSignalAny(
        [userProvidedSignal, timeoutSignal].filter(
          (s): s is AbortSignal => typeof s !== 'undefined',
        ),
      );

      // Each caller waits on its own future so that a single caller can be rejected (cancelled)
      // without settling the shared completion future the other callers depend on.
      const proxiedCompletionFuture = new Future<void, DataTrackSubscribeError>();
      currentDescriptor.subscription.completionFuture.promise
        .then(() => proxiedCompletionFuture.resolve?.())
        .catch((err) => proxiedCompletionFuture.reject?.(err));

      const onAbort = () => {
        if (currentDescriptor.subscription.type !== 'pending') {
          return;
        }
        currentDescriptor.subscription.pendingRequestCount -= 1;

        if (timeoutSignal?.aborted) {
          // A timeout should apply to the underlying SFU subscription and cancel all user
          // subscriptions.
          currentDescriptor.subscription.cancel();
          return;
        }

        if (currentDescriptor.subscription.pendingRequestCount <= 0) {
          // No user subscriptions are still pending, so cancel the underlying pending `sfuUpdateSubscription`
          currentDescriptor.subscription.cancel();
          return;
        }

        // Other subscriptions are still pending for this data track, so just cancel this one
        // active user subscription, and leave the rest of the user subscriptions alone.
        proxiedCompletionFuture.reject?.(DataTrackSubscribeError.cancelled());
      };

      if (combinedSignal.aborted) {
        onAbort();
      }
      combinedSignal.addEventListener('abort', onAbort);
      try {
        await proxiedCompletionFuture.promise;
      } finally {
        // Detach on both success and failure so the listener does not outlive this request.
        combinedSignal.removeEventListener('abort', onAbort);
      }
    };

    switch (descriptor.subscription.type) {
      case 'none': {
        // Created before the pending state so the `cancel` closure below never observes it
        // uninitialised, even if it were invoked synchronously.
        const timeoutSignal = abortSignalTimeout(SUBSCRIBE_TIMEOUT_MILLISECONDS);

        descriptor.subscription = {
          type: 'pending',
          completionFuture: new Future<void, DataTrackSubscribeError>(),
          pendingRequestCount: 1,
          cancel: () => {
            const previousDescriptorSubscription = descriptor.subscription;
            descriptor.subscription = { type: 'none' };

            // Let the SFU know that the subscribe has been cancelled
            this.emit('sfuUpdateSubscription', { sid, subscribe: false });

            if (previousDescriptorSubscription.type === 'pending') {
              previousDescriptorSubscription.completionFuture.reject?.(
                timeoutSignal.aborted
                  ? DataTrackSubscribeError.timeout()
                  : // NOTE: the below cancelled case was introduced by web / there isn't a corresponding case in the rust version.
                    DataTrackSubscribeError.cancelled(),
              );
            }
          },
        };

        this.emit('sfuUpdateSubscription', { sid, subscribe: true });

        // Wait for the subscription to complete, or time out if it takes too long
        await waitForCompletionFuture(descriptor, signal, timeoutSignal);
        return;
      }
      case 'pending': {
        descriptor.subscription.pendingRequestCount += 1;

        // Wait for the subscription to complete
        await waitForCompletionFuture(descriptor, signal);
        return;
      }
      case 'active': {
        return;
      }
    }
  }

  /**
   * Get information about all currently subscribed tracks.
   * @returns One `[info, publisherIdentity]` tuple per track with an active subscription.
   * @internal
   */
  async querySubscribed() {
    const descriptorInfos = Array.from(this.descriptors.values())
      .filter(
        (descriptor): descriptor is Descriptor<SubscriptionStateActive> =>
          descriptor.subscription.type === 'active',
      )
      .map(
        (descriptor) =>
          [descriptor.info, descriptor.publisherIdentity] as [
            info: DataTrackInfo,
            identity: Participant['identity'],
          ],
      );

    return descriptorInfos;
  }

  /** Client requested to unsubscribe from a data track.
   *
   * Closes every stream attached to the subscription, moves the track back to "none" and
   * notifies the SFU. Logs a warning and does nothing if the subscription is not active.
   */
  unSubscribeRequest(sid: DataTrackSid) {
    const descriptor = this.descriptors.get(sid);
    if (!descriptor) {
      // FIXME: rust implementation returns here, not throws
      // @throws-transformer ignore - this should be treated as a "panic" and not be caught
      throw new Error('Cannot unsubscribe from unknown track');
    }

    if (descriptor.subscription.type !== 'active') {
      log.warn(
        `Unexpected descriptor state in unSubscribeRequest, expected active, found ${descriptor.subscription?.type}`,
      );
      return;
    }

    for (const controller of descriptor.subscription.streamControllers) {
      controller.close();
    }

    // FIXME: this might be wrong? Shouldn't this only occur if it is the last subscription to
    // terminate?
    const previousDescriptorSubscription = descriptor.subscription;
    descriptor.subscription = { type: 'none' };
    this.subscriptionHandles.delete(previousDescriptorSubscription.subcriptionHandle);

    this.emit('sfuUpdateSubscription', { sid, subscribe: false });
  }

  /** SFU notification that track publications have changed.
   *
   * This event is produced from both {@link JoinResponse} and {@link ParticipantUpdate}
   * to provide a complete view of remote participants' track publications:
   *
   * - From a `JoinResponse`, it captures the initial set of tracks published when a participant joins.
   * - From a `ParticipantUpdate`, it captures subsequent changes (i.e., new tracks being
   *   published and existing tracks unpublished).
   *
   * @param updates - For each publisher identity in the update, the full list of tracks that
   * publisher currently publishes. Known tracks of that publisher which are missing from the
   * list are treated as unpublished.
   */
  async receiveSfuPublicationUpdates(updates: Map<Participant['identity'], Array<DataTrackInfo>>) {
    if (updates.size === 0) {
      return;
    }

    // Detect published track
    const publisherParticipantToSidsInUpdate = new Map<
      Participant['identity'],
      Set<DataTrackSid>
    >();
    for (const [publisherIdentity, infos] of updates.entries()) {
      const sidsInUpdate = new Set<DataTrackSid>();
      for (const info of infos) {
        sidsInUpdate.add(info.sid);
        if (this.descriptors.has(info.sid)) {
          continue;
        }
        await this.handleTrackPublished(publisherIdentity, info);
      }
      publisherParticipantToSidsInUpdate.set(publisherIdentity, sidsInUpdate);
    }

    // Detect unpublished tracks
    for (const [publisherIdentity, sidsInUpdate] of publisherParticipantToSidsInUpdate.entries()) {
      const descriptorsForPublisher = Array.from(this.descriptors.entries())
        .filter(([_sid, descriptor]) => descriptor.publisherIdentity === publisherIdentity)
        .map(([sid]) => sid);

      const unpublishedSids = descriptorsForPublisher.filter((sid) => !sidsInUpdate.has(sid));
      for (const sid of unpublishedSids) {
        this.handleTrackUnpublished(sid);
      }
    }
  }

  /**
   * Get information about all currently remotely published tracks which could be subscribed to.
   * @internal
   */
  async queryPublications() {
    return Array.from(this.descriptors.values()).map((descriptor) => descriptor.info);
  }

  /** Registers a newly published remote track (initially unsubscribed) and emits
   * `trackPublished`. Logs an error and does nothing if the SID is already known. */
  async handleTrackPublished(publisherIdentity: Participant['identity'], info: DataTrackInfo) {
    if (this.descriptors.has(info.sid)) {
      log.error(`Existing descriptor for track ${info.sid}`);
      return;
    }

    const descriptor: Descriptor<SubscriptionStateNone> = {
      info,
      publisherIdentity,
      subscription: { type: 'none' },
    };
    this.descriptors.set(descriptor.info.sid, descriptor);

    const track = new RemoteDataTrack(descriptor.info, this, { publisherIdentity });
    this.emit('trackPublished', { track });
  }

  /** Forgets a remote track, closing any streams attached to an active subscription, and emits
   * `trackUnpublished`. Logs an error and does nothing if the SID is unknown. */
  handleTrackUnpublished(sid: DataTrackSid) {
    const descriptor = this.descriptors.get(sid);
    if (!descriptor) {
      log.error(`Unknown track ${sid}`);
      return;
    }
    this.descriptors.delete(sid);

    if (descriptor.subscription.type === 'active') {
      descriptor.subscription.streamControllers.forEach((controller) => {
        controller.close();
      });
      this.subscriptionHandles.delete(descriptor.subscription.subcriptionHandle);
    }

    this.emit('trackUnpublished', { sid, publisherIdentity: descriptor.publisherIdentity });
  }

  /** SFU notification that handles have been assigned for requested subscriptions. */
  receivedSfuSubscriberHandles(
    /** Mapping between track handles attached to incoming packets to the
     * track SIDs they belong to. */
    mapping: Map<DataTrackHandle, DataTrackSid>,
  ) {
    for (const [handle, sid] of mapping.entries()) {
      this.registerSubscriberHandle(handle, sid);
    }
  }

  /** Applies a single handle assignment: activates a pending subscription, or re-points an
   * already active one at its new handle. */
  private registerSubscriberHandle(assignedHandle: DataTrackHandle, sid: DataTrackSid) {
    const descriptor = this.descriptors.get(sid);
    if (!descriptor) {
      log.error(`Unknown track ${sid}`);
      return;
    }
    switch (descriptor.subscription.type) {
      case 'none': {
        // Handle assigned when there is no pending or active subscription is unexpected.
        log.warn(`No subscription for ${sid}`);
        return;
      }
      case 'active': {
        // Update handle for an active subscription. This can occur following a full reconnect.
        const previousHandle = descriptor.subscription.subcriptionHandle;
        // Drop the stale index entry so packets carrying the old handle are no longer routed to
        // this track. Only do so while the entry still points at this track: within the same
        // batch the old handle may already have been reassigned to a different track.
        if (this.subscriptionHandles.get(previousHandle) === sid) {
          this.subscriptionHandles.delete(previousHandle);
        }
        descriptor.subscription.subcriptionHandle = assignedHandle;
        this.subscriptionHandles.set(assignedHandle, sid);
        return;
      }
      case 'pending': {
        const pipeline = new IncomingDataTrackPipeline({
          info: descriptor.info,
          publisherIdentity: descriptor.publisherIdentity,
          e2eeManager: this.e2eeManager,
        });

        const previousDescriptorSubscription = descriptor.subscription;
        descriptor.subscription = {
          type: 'active',
          subcriptionHandle: assignedHandle,
          pipeline,
          streamControllers: new Set(),
        };
        this.subscriptionHandles.set(assignedHandle, sid);

        // Wake up every `subscribeRequest` call waiting on this subscription.
        previousDescriptorSubscription.completionFuture.resolve?.();
      }
    }
  }

  /** Packet has been received over the transport.
   *
   * Routes the packet to the pipeline of the subscription its handle belongs to and, once the
   * pipeline yields a frame, enqueues it into every stream attached to that subscription.
   * Packets that cannot be parsed or routed are logged and dropped.
   */
  async packetReceived(bytes: Uint8Array): Promise<Throws<void, DataTrackDepacketizerDropError>> {
    let packet: DataTrackPacket;
    try {
      [packet] = DataTrackPacket.fromBinary(bytes);
    } catch (err) {
      log.error(`Failed to deserialize packet: ${err}`);
      return;
    }

    const sid = this.subscriptionHandles.get(packet.header.trackHandle);
    if (!sid) {
      log.warn(`Unknown subscriber handle ${packet.header.trackHandle}`);
      return;
    }

    const descriptor = this.descriptors.get(sid);
    if (!descriptor) {
      log.error(`Missing descriptor for track ${sid}`);
      return;
    }

    if (descriptor.subscription.type !== 'active') {
      log.warn(`Received packet for track ${sid} without active subscription`);
      return;
    }

    const internalFrame = await descriptor.subscription.pipeline.processPacket(packet);
    if (!internalFrame) {
      // Not all packets have been received yet to form a complete frame
      return;
    }

    // The await above yields, so the track may have been unpublished or unsubscribed in the
    // meantime. Re-read the state instead of trusting the pre-await check; there is nobody
    // left to deliver the frame to in that case.
    const latestDescriptor = this.descriptors.get(sid);
    if (latestDescriptor !== descriptor) {
      return;
    }
    const latestSubscription = latestDescriptor.subscription;
    if (latestSubscription.type !== 'active') {
      return;
    }

    // Broadcast to all downstream subscribers
    for (const controller of latestSubscription.streamControllers) {
      if (controller.desiredSize !== null && controller.desiredSize <= 0) {
        log.warn(
          `Cannot send frame to subscribers: readable stream is full (desiredSize is ${controller.desiredSize}). To increase this threshold, set a higher 'options.highWaterMark' when calling .subscribe().`,
        );
        continue;
      }
      const frame = DataTrackFrameInternal.lossyIntoFrame(internalFrame);
      controller.enqueue(frame);
    }
  }

  /** Resend all subscription updates.
   *
   * This must be sent after a full reconnect to ensure the SFU knows which
   * tracks are subscribed to locally.
   */
  resendSubscriptionUpdates() {
    for (const [sid, descriptor] of this.descriptors) {
      if (descriptor.subscription.type === 'none') {
        continue;
      }
      this.emit('sfuUpdateSubscription', { sid, subscribe: true });
    }
  }

  /** Called when a remote participant is disconnected so that any pending data tracks can be
   * cancelled. Active subscriptions to that participant's tracks are unsubscribed. */
  handleRemoteParticipantDisconnected(remoteParticipantIdentity: RemoteParticipant['identity']) {
    for (const descriptor of this.descriptors.values()) {
      if (descriptor.publisherIdentity !== remoteParticipantIdentity) {
        continue;
      }
      switch (descriptor.subscription.type) {
        case 'none':
          break;
        case 'pending':
          descriptor.subscription.completionFuture.reject?.(DataTrackSubscribeError.disconnected());
          break;
        case 'active':
          this.unSubscribeRequest(descriptor.info.sid);
          break;
      }
    }
  }

  /** Shutdown the manager, ending any subscriptions. */
  shutdown() {
    for (const descriptor of this.descriptors.values()) {
      this.emit('trackUnpublished', {
        sid: descriptor.info.sid,
        publisherIdentity: descriptor.publisherIdentity,
      });

      if (descriptor.subscription.type === 'pending') {
        descriptor.subscription.completionFuture.reject?.(DataTrackSubscribeError.disconnected());
      }
      if (descriptor.subscription.type === 'active') {
        descriptor.subscription.streamControllers.forEach((controller) => controller.close());
      }
    }
    this.descriptors.clear();
    // Keep the handle index in sync with the descriptors: every entry would otherwise point at
    // a track that no longer exists.
    this.subscriptionHandles.clear();
  }
}