import { HttpRequestExecutor } from "./http-loader.js";
import {
  CoreEventMap,
  EngineCallbacks,
  SegmentWithStream,
  StreamConfig,
  StreamWithSegments,
} from "./types.js";
import {
  Playback,
  BandwidthCalculators,
  StreamDetails,
} from "./internal-types.js";
import { P2PLoadersContainer } from "./p2p/loaders-container.js";
import { RequestsContainer } from "./requests/request-container.js";
import { EngineRequest } from "./requests/engine-request.js";
import * as QueueUtils from "./utils/queue.js";
import * as LoggerUtils from "./utils/logger.js";
import * as StreamUtils from "./utils/stream.js";
import * as Utils from "./utils/utils.js";
import debug from "debug";
import { QueueItem } from "./utils/queue.js";
import { EventTarget } from "./utils/event-target.js";
import { SegmentStorage } from "./segment-storage/index.js";

/**
 * Age (ms, measured with `performance.now()`) after which the failed-attempt
 * history of a request is cleared in `processRequests`.
 */
const FAILED_ATTEMPTS_CLEAR_INTERVAL = 60000;

/**
 * Base delay (ms) of the random-HTTP-download timer; the actual delay is
 * additionally scaled by the number of connected peers.
 */
const PEER_UPDATE_LATENCY = 1000;

/**
 * Coordinates segment downloads between HTTP and P2P for a single stream.
 *
 * The loader keeps track of the segment most recently requested by the engine,
 * builds a download queue around it, and on every queue-processing pass decides
 * which segments to start, abort or hand back to the engine.
 */
export class HybridLoader {
  private readonly requests: RequestsContainer;
  /** The single in-flight request coming from the engine, if any. */
  private engineRequest?: EngineRequest;
  private readonly p2pLoaders: P2PLoadersContainer;
  private readonly playback: Playback;
  private readonly segmentAvgDuration: number;
  private readonly logger: debug.Debugger;
  // NOTE(review): never assigned anywhere in this class; only cleared in destroy().
  private storageCleanUpIntervalId?: number;
  private levelChangedTimestamp?: number;
  private lastQueueProcessingTimeStamp?: number;
  /** Handle of the self re-arming `setTimeout` created by `setIntervalLoading`. */
  private randomHttpDownloadInterval?: number;
  /** Guards against scheduling more than one queue-processing microtask at a time. */
  private isProcessQueueMicrotaskCreated = false;

  /**
   * @param streamManifestUrl Manifest URL; also used as the swarm id when
   *   `config.swarmId` is not set.
   * @param lastRequestedSegment Segment the loader starts from; its stream
   *   becomes the active stream and its start time the initial playback position.
   * @param streamDetails Stream-level details (`isLive`, `activeLevelBitrate`).
   * @param config Stream configuration.
   * @param bandwidthCalculators Bandwidth calculators used by `getBandwidth`.
   * @param segmentStorage Storage consulted before downloading and filled after.
   * @param eventTarget Event target passed on to the request containers/executors.
   */
  constructor(
    private streamManifestUrl: string,
    private lastRequestedSegment: Readonly<SegmentWithStream>,
    private readonly streamDetails: Required<Readonly<StreamDetails>>,
    private readonly config: StreamConfig,
    private readonly bandwidthCalculators: BandwidthCalculators,
    private readonly segmentStorage: SegmentStorage,
    private readonly eventTarget: EventTarget<CoreEventMap>,
  ) {
    const activeStream = this.lastRequestedSegment.stream;
    this.playback = { position: this.lastRequestedSegment.startTime, rate: 1 };
    this.segmentAvgDuration = StreamUtils.getSegmentAvgDuration(activeStream);
    this.requests = new RequestsContainer(
      this.requestProcessQueueMicrotask,
      this.bandwidthCalculators,
      this.playback,
      this.config,
      this.eventTarget,
    );
    this.p2pLoaders = new P2PLoadersContainer(
      this.streamManifestUrl,
      this.lastRequestedSegment.stream,
      this.requests,
      this.segmentStorage,
      this.config,
      this.eventTarget,
      this.requestProcessQueueMicrotask,
    );

    this.logger = debug(`p2pml-core:hybrid-loader-${activeStream.type}`);
    this.logger.color = "coral";

    this.setIntervalLoading();
  }

  /**
   * Arms a one-shot timer that triggers `loadRandomThroughHttp` and then
   * re-arms itself. The delay is random in
   * `[PEER_UPDATE_LATENCY, PEER_UPDATE_LATENCY * (peersCount + 1))`.
   */
  private setIntervalLoading() {
    const peersCount = this.p2pLoaders.currentLoader.connectedPeerCount;
    const randomTimeout =
      Math.random() * PEER_UPDATE_LATENCY * peersCount + PEER_UPDATE_LATENCY;
    this.randomHttpDownloadInterval = window.setTimeout(() => {
      this.loadRandomThroughHttp();
      this.setIntervalLoading();
    }, randomTimeout);
  }

  // api method for engines
  /**
   * Handles a segment request from the engine.
   *
   * If the segment data is already in storage the request is resolved
   * immediately; otherwise it replaces (and aborts) the previous engine request
   * and is left for the queue processing to fulfil. Queue processing is
   * requested in every case.
   *
   * @param segment Segment requested by the engine.
   * @param callbacks Engine callbacks wrapped into an `EngineRequest`.
   */
  async loadSegment(
    segment: Readonly<SegmentWithStream>,
    callbacks: EngineCallbacks,
  ) {
    this.logger(`requests: ${LoggerUtils.getSegmentString(segment)}`);
    const { stream } = segment;
    if (stream !== this.lastRequestedSegment.stream) {
      this.logger(`stream changed to ${LoggerUtils.getStreamString(stream)}`);
      this.p2pLoaders.changeCurrentLoader(stream);
    }
    this.lastRequestedSegment = segment;

    const swarmId = this.config.swarmId ?? this.streamManifestUrl;
    const streamSwarmId = StreamUtils.getStreamSwarmId(swarmId, stream);

    this.segmentStorage.onSegmentRequested(
      swarmId,
      streamSwarmId,
      segment.externalId,
      segment.startTime,
      segment.endTime,
      stream.type,
      this.streamDetails.isLive,
    );

    const engineRequest = new EngineRequest(segment, callbacks);

    try {
      const hasSegment = this.segmentStorage.hasSegment(
        swarmId,
        streamSwarmId,
        segment.externalId,
      );

      if (hasSegment) {
        const data = await this.segmentStorage.getSegmentData(
          swarmId,
          streamSwarmId,
          segment.externalId,
        );

        if (data) {
          const { queueDownloadRatio } = this.generateQueue();
          engineRequest.resolve(data, this.getBandwidth(queueDownloadRatio));
          return;
        }
      }

      // Only one engine request is tracked at a time: the newest one wins.
      this.engineRequest?.abort();
      this.engineRequest = engineRequest;
    } catch {
      engineRequest.reject();
    } finally {
      this.requestProcessQueueMicrotask();
    }
  }

  /**
   * Schedules `processQueue` to run in a microtask.
   *
   * Nothing is scheduled when a microtask is already pending, or when `force`
   * is `false` and the queue was processed at most 1000 ms ago.
   *
   * Declared as an arrow-function property so it can be passed around as a
   * callback (see the constructor) without losing `this`.
   */
  private requestProcessQueueMicrotask = (force = true) => {
    const now = performance.now();
    if (
      (!force &&
        this.lastQueueProcessingTimeStamp !== undefined &&
        now - this.lastQueueProcessingTimeStamp <= 1000) ||
      this.isProcessQueueMicrotaskCreated
    ) {
      return;
    }

    this.isProcessQueueMicrotaskCreated = true;
    queueMicrotask(() => {
      try {
        this.processQueue();
        // The timestamp recorded is the scheduling time, not the completion time.
        this.lastQueueProcessingTimeStamp = now;
      } finally {
        this.isProcessQueueMicrotaskCreated = false;
      }
    });
  };

  /**
   * Walks over all tracked requests and reacts to their current status:
   * aborts loading requests that are no longer needed, stores and hands over
   * succeeded ones, rejects the engine request once HTTP retries are exhausted,
   * and drops requests that are not started or aborted.
   *
   * @param queueSegmentIds Runtime ids of the segments currently in the queue.
   * @param queueDownloadRatio Ratio forwarded to `getBandwidth` when resolving
   *   the engine request.
   */
  private processRequests(
    queueSegmentIds: Set<string>,
    queueDownloadRatio: number,
  ) {
    const { stream } = this.lastRequestedSegment;
    const { httpErrorRetries } = this.config;
    const now = performance.now();

    for (const request of this.requests.items()) {
      const {
        downloadSource: type,
        status,
        segment,
        isHandledByProcessQueue,
      } = request;

      const engineRequest =
        this.engineRequest?.segment === segment
          ? this.engineRequest
          : undefined;

      switch (status) {
        case "loading":
          if (!queueSegmentIds.has(segment.runtimeId) && !engineRequest) {
            request.abortFromProcessQueue();
            this.requests.remove(request);
          }
          break;

        case "succeed": {
          if (!type) break;
          if (type === "http") {
            this.p2pLoaders.currentLoader.broadcastAnnouncement();
          }
          if (engineRequest) {
            engineRequest.resolve(
              request.data,
              this.getBandwidth(queueDownloadRatio),
            );
            this.engineRequest = undefined;
          }
          this.requests.remove(request);

          const swarmId = this.config.swarmId ?? this.streamManifestUrl;
          // Key the stored data by the segment's own stream (not by the stream
          // of the last requested segment) so that a request finishing after a
          // stream switch is not stored under the wrong stream id. This matches
          // the lookups done in generateQueue / loadRandomThroughHttp.
          const streamSwarmId = StreamUtils.getStreamSwarmId(
            swarmId,
            segment.stream,
          );

          void this.segmentStorage.storeSegment(
            swarmId,
            streamSwarmId,
            segment.externalId,
            request.data,
            segment.startTime,
            segment.endTime,
            segment.stream.type,
            this.streamDetails.isLive,
          );
          break;
        }

        case "failed":
          if (type === "http" && !isHandledByProcessQueue) {
            this.p2pLoaders.currentLoader.broadcastAnnouncement();
          }
          if (
            !engineRequest &&
            !stream.segments.has(request.segment.runtimeId)
          ) {
            this.requests.remove(request);
          }
          if (
            request.failedAttempts.httpAttemptsCount >= httpErrorRetries &&
            engineRequest
          ) {
            this.engineRequest = undefined;
            engineRequest.reject();
          }
          break;

        case "not-started":
          this.requests.remove(request);
          break;

        case "aborted":
          this.requests.remove(request);
          break;
      }

      request.markHandledByProcessQueue();

      // Forget old failures so the segment becomes eligible for retries again.
      const { lastAttempt } = request.failedAttempts;
      if (
        lastAttempt &&
        now - lastAttempt.error.timestamp > FAILED_ATTEMPTS_CLEAR_INTERVAL
      ) {
        request.failedAttempts.clear();
      }
    }
  }

  /**
   * One queue-processing pass: rebuilds the queue, settles existing requests
   * via `processRequests`, starts the engine request over HTTP when it must be
   * started immediately, and then assigns queue items to HTTP or P2P within
   * the configured simultaneous-download limits.
   */
  private processQueue() {
    const { queue, queueSegmentIds, queueDownloadRatio } = this.generateQueue();
    this.processRequests(queueSegmentIds, queueDownloadRatio);

    const {
      simultaneousHttpDownloads,
      simultaneousP2PDownloads,
      httpErrorRetries,
    } = this.config;

    if (
      this.engineRequest?.shouldBeStartedImmediately &&
      this.engineRequest.status === "pending" &&
      this.requests.executingHttpCount < simultaneousHttpDownloads
    ) {
      const { segment } = this.engineRequest;
      const request = this.requests.get(segment);
      if (
        !request ||
        request.status === "not-started" ||
        (request.status === "failed" &&
          request.failedAttempts.httpAttemptsCount < httpErrorRetries)
      ) {
        this.loadThroughHttp(segment);
      }
    }

    for (const item of queue) {
      const { statuses, segment } = item;
      const request = this.requests.get(segment);

      if (statuses.isHighDemand) {
        // Already being loaded over HTTP: nothing to do.
        if (
          request?.downloadSource === "http" &&
          request.status === "loading"
        ) {
          continue;
        }

        // HTTP retries exhausted: do not try this segment again for now.
        if (
          request?.downloadSource === "http" &&
          request.status === "failed" &&
          request.failedAttempts.httpAttemptsCount >= httpErrorRetries
        ) {
          continue;
        }

        const isP2PLoadingRequest =
          request?.status === "loading" && request.downloadSource === "p2p";

        // Prefer HTTP for high-demand segments when a slot is free...
        if (this.requests.executingHttpCount < simultaneousHttpDownloads) {
          if (isP2PLoadingRequest) request.abortFromProcessQueue();
          this.loadThroughHttp(segment);
          continue;
        }

        // ...or when a slot can be freed by aborting an HTTP load of a segment
        // located later in the queue.
        if (
          this.abortLastHttpLoadingInQueueAfterItem(queue, segment) &&
          this.requests.executingHttpCount < simultaneousHttpDownloads
        ) {
          if (isP2PLoadingRequest) request.abortFromProcessQueue();
          this.loadThroughHttp(segment);
          continue;
        }

        if (isP2PLoadingRequest) continue;

        // No HTTP slot available: fall back to P2P.
        if (this.requests.executingP2PCount < simultaneousP2PDownloads) {
          this.loadThroughP2P(segment);
          continue;
        }

        if (
          this.abortLastP2PLoadingInQueueAfterItem(queue, segment) &&
          this.requests.executingP2PCount < simultaneousP2PDownloads
        ) {
          this.loadThroughP2P(segment);
          continue;
        }
      } else if (statuses.isP2PDownloadable) {
        if (request?.status === "loading") continue;

        if (this.requests.executingP2PCount < simultaneousP2PDownloads) {
          this.loadThroughP2P(segment);
        } else if (
          this.p2pLoaders.currentLoader.isSegmentLoadedBySomeone(segment)
        ) {
          if (
            this.abortLastP2PLoadingInQueueAfterItem(queue, segment) &&
            this.requests.executingP2PCount < simultaneousP2PDownloads
          ) {
            this.loadThroughP2P(segment);
          }
        }
      }
    }
  }

  // api method for engines
  /**
   * Aborts the current engine request if it targets the given segment, then
   * requests queue processing. Does nothing for any other segment id.
   *
   * @param segmentRuntimeId Runtime id of the segment whose request to abort.
   */
  abortSegmentRequest(segmentRuntimeId: string) {
    if (this.engineRequest?.segment.runtimeId !== segmentRuntimeId) return;
    this.engineRequest.abort();
    this.logger(
      "abort: ",
      LoggerUtils.getSegmentString(this.engineRequest.segment),
    );
    this.engineRequest = undefined;
    this.requestProcessQueueMicrotask();
  }

  /** Starts an HTTP download for the segment and broadcasts an announcement to peers. */
  private loadThroughHttp(segment: SegmentWithStream) {
    const request = this.requests.getOrCreateRequest(segment);
    // The executor attaches itself to the request; the instance is not kept.
    new HttpRequestExecutor(request, this.config, this.eventTarget);
    this.p2pLoaders.currentLoader.broadcastAnnouncement();
  }

  /** Asks the current P2P loader to download the segment. */
  private loadThroughP2P(segment: SegmentWithStream) {
    this.p2pLoaders.currentLoader.downloadSegment(segment);
  }

  /**
   * Picks queue segments that are HTTP-downloadable, not P2P-downloadable and
   * not yet stored or requested, and starts HTTP downloads for a random subset
   * of them.
   *
   * Skipped entirely when available storage is at or below 10 %, when no HTTP
   * slot is free, or when there are no connected peers.
   */
  private loadRandomThroughHttp() {
    const availableStorageCapacityPercent =
      this.getAvailableStorageCapacityPercent();
    if (availableStorageCapacityPercent <= 10) return;

    const { simultaneousHttpDownloads, httpErrorRetries } = this.config;
    const p2pLoader = this.p2pLoaders.currentLoader;

    if (
      this.requests.executingHttpCount >= simultaneousHttpDownloads ||
      !p2pLoader.connectedPeerCount
    ) {
      return;
    }

    const swarmId = this.config.swarmId ?? this.streamManifestUrl;
    const segmentsToLoad: SegmentWithStream[] = [];
    for (const { segment, statuses } of QueueUtils.generateQueue(
      this.lastRequestedSegment,
      this.playback,
      this.config,
      this.p2pLoaders.currentLoader,
      availableStorageCapacityPercent,
    )) {
      const streamSwarmId = StreamUtils.getStreamSwarmId(
        swarmId,
        segment.stream,
      );

      if (
        !statuses.isHttpDownloadable ||
        statuses.isP2PDownloadable ||
        this.segmentStorage.hasSegment(
          swarmId,
          streamSwarmId,
          segment.externalId,
        )
      ) {
        continue;
      }
      const request = this.requests.get(segment);
      if (
        request &&
        (request.status === "loading" ||
          request.status === "succeed" ||
          request.failedAttempts.httpAttemptsCount >= httpErrorRetries)
      ) {
        continue;
      }
      segmentsToLoad.push(segment);
    }

    if (!segmentsToLoad.length) return;

    const availableHttpDownloads =
      simultaneousHttpDownloads - this.requests.executingHttpCount;
    if (availableHttpDownloads === 0) return;

    // +1 accounts for this client in addition to the connected peers.
    const peersCount = p2pLoader.connectedPeerCount + 1;
    const safeRandomSegmentsCount = Math.min(
      segmentsToLoad.length,
      simultaneousHttpDownloads * peersCount,
    );
    const randomIndices = Utils.shuffleArray(
      Array.from({ length: safeRandomSegmentsCount }, (_, i) => i),
    );

    // Each iteration consumes one unit of probability: a value >= 1 loads for
    // sure, a fractional remainder loads with that probability.
    let probability = safeRandomSegmentsCount / peersCount;
    for (const randomIndex of randomIndices) {
      if (this.requests.executingHttpCount >= simultaneousHttpDownloads) {
        break;
      }
      if (probability >= 1 || Math.random() <= probability) {
        const segment = segmentsToLoad[randomIndex];
        this.loadThroughHttp(segment);
      }
      probability--;
      if (probability <= 0) break;
    }
  }

  /**
   * Scans the queue from its end towards `segment` and aborts the first HTTP
   * request found in the `loading` state.
   *
   * @returns `true` if a request was aborted, `false` otherwise.
   */
  private abortLastHttpLoadingInQueueAfterItem(
    queue: QueueUtils.QueueItem[],
    segment: SegmentWithStream,
  ): boolean {
    for (const { segment: itemSegment } of Utils.arrayBackwards(queue)) {
      if (itemSegment === segment) break;
      const request = this.requests.get(itemSegment);
      if (request?.downloadSource === "http" && request.status === "loading") {
        request.abortFromProcessQueue();
        return true;
      }
    }
    return false;
  }

  /**
   * Scans the queue from its end towards `segment` and aborts the first P2P
   * request found in the `loading` state.
   *
   * @returns `true` if a request was aborted, `false` otherwise.
   */
  private abortLastP2PLoadingInQueueAfterItem(
    queue: QueueUtils.QueueItem[],
    segment: SegmentWithStream,
  ): boolean {
    for (const { segment: itemSegment } of Utils.arrayBackwards(queue)) {
      if (itemSegment === segment) break;
      const request = this.requests.get(itemSegment);
      if (request?.downloadSource === "p2p" && request.status === "loading") {
        request.abortFromProcessQueue();
        return true;
      }
    }
    return false;
  }

  /**
   * @returns Percentage (0-100) of the segment storage that is still free,
   *   computed from `segmentStorage.getUsage()`.
   */
  private getAvailableStorageCapacityPercent(): number {
    const { totalCapacity, usedCapacity } = this.segmentStorage.getUsage();
    // NOTE(review): yields NaN if totalCapacity is 0 — confirm the storage
    // never reports a zero capacity.
    return 100 - (usedCapacity / totalCapacity) * 100;
  }

  /**
   * Builds the download queue, leaving out segments that are already stored or
   * whose request has already succeeded.
   *
   * @returns The filtered queue, the runtime ids of its segments, the length
   *   of the unfiltered queue, the number of already-loaded segments, and
   *   `queueDownloadRatio` = loaded / unfiltered length (0 for an empty queue).
   */
  private generateQueue() {
    const queue: QueueItem[] = [];
    const queueSegmentIds = new Set<string>();
    let maxPossibleLength = 0;
    let alreadyLoadedCount = 0;

    const availableStorageCapacityPercent =
      this.getAvailableStorageCapacityPercent();
    const swarmId = this.config.swarmId ?? this.streamManifestUrl;

    for (const item of QueueUtils.generateQueue(
      this.lastRequestedSegment,
      this.playback,
      this.config,
      this.p2pLoaders.currentLoader,
      availableStorageCapacityPercent,
    )) {
      maxPossibleLength++;
      const { segment } = item;

      const streamSwarmId = StreamUtils.getStreamSwarmId(
        swarmId,
        segment.stream,
      );

      if (
        this.segmentStorage.hasSegment(
          swarmId,
          streamSwarmId,
          segment.externalId,
        ) ||
        this.requests.get(segment)?.status === "succeed"
      ) {
        alreadyLoadedCount++;
        continue;
      }

      queue.push(item);
      queueSegmentIds.add(segment.runtimeId);
    }

    return {
      queue,
      queueSegmentIds,
      maxPossibleLength,
      alreadyLoadedCount,
      queueDownloadRatio:
        maxPossibleLength !== 0 ? alreadyLoadedCount / maxPossibleLength : 0,
    };
  }

  /**
   * Computes the bandwidth value reported to the engine when a segment is
   * resolved.
   *
   * - With an active level bitrate of 0, a single loading-only estimate is used.
   * - When the queue is at least 80 % loaded, or the overall estimate reaches
   *   90 % of the active level bitrate, the best loading-only estimate of all
   *   sources is returned.
   * - Otherwise the larger of the overall estimate and the best HTTP
   *   loading-only estimate is returned.
   *
   * @param queueDownloadRatio Loaded share of the queue, as produced by `generateQueue`.
   */
  private getBandwidth(queueDownloadRatio: number) {
    const { http, all } = this.bandwidthCalculators;
    const { activeLevelBitrate } = this.streamDetails;

    if (activeLevelBitrate === 0) {
      return all.getBandwidthLoadingOnly(3);
    }

    const bandwidth = Math.max(
      all.getBandwidth(30, this.levelChangedTimestamp),
      all.getBandwidth(60, this.levelChangedTimestamp),
      all.getBandwidth(90, this.levelChangedTimestamp),
    );

    if (queueDownloadRatio >= 0.8 || bandwidth >= activeLevelBitrate * 0.9) {
      return Math.max(
        all.getBandwidthLoadingOnly(1),
        all.getBandwidthLoadingOnly(3),
        all.getBandwidthLoadingOnly(5),
      );
    }

    const httpRealBandwidth = Math.max(
      http.getBandwidthLoadingOnly(1),
      http.getBandwidthLoadingOnly(3),
      http.getBandwidthLoadingOnly(5),
    );

    return Math.max(bandwidth, httpRealBandwidth);
  }

  /** Records the time of the latest level change; it is passed to the bandwidth calculators. */
  notifyLevelChanged() {
    this.levelChangedTimestamp = performance.now();
  }

  /**
   * Broadcasts an announcement through the current P2P loader.
   *
   * @param sendEmptySegmentsAnnouncement Forwarded unchanged to the loader.
   */
  sendBroadcastAnnouncement(sendEmptySegmentsAnnouncement = false) {
    this.p2pLoaders.currentLoader.broadcastAnnouncement(
      sendEmptySegmentsAnnouncement,
    );
  }

  /**
   * Updates the tracked playback position and rate.
   *
   * A rate of 0 is never stored. When the position moves by more than half of
   * the average segment duration, the engine request is marked to be started
   * immediately and queue processing is forced; otherwise queue processing is
   * requested without forcing.
   *
   * @param position New playback position.
   * @param rate New playback rate.
   */
  updatePlayback(position: number, rate: number) {
    const isRateChanged = this.playback.rate !== rate;
    const isPositionChanged = this.playback.position !== position;

    if (!isRateChanged && !isPositionChanged) return;

    // Must be computed before the stored position is overwritten below.
    const isPositionSignificantlyChanged =
      Math.abs(position - this.playback.position) / this.segmentAvgDuration >
      0.5;

    if (isPositionChanged) this.playback.position = position;
    if (isRateChanged && rate !== 0) this.playback.rate = rate;

    if (isPositionSignificantlyChanged) {
      this.logger("position significantly changed");
      this.engineRequest?.markAsShouldBeStartedImmediately();
    }

    this.segmentStorage.onPlaybackUpdated(position, rate);
    this.requestProcessQueueMicrotask(isPositionSignificantlyChanged);
  }

  /**
   * Requests queue processing when the updated stream is the one of the last
   * requested segment; updates of any other stream are ignored.
   */
  updateStream(stream: StreamWithSegments) {
    if (stream !== this.lastRequestedSegment.stream) return;

    this.logger(`update stream: ${LoggerUtils.getStreamString(stream)}`);
    this.requestProcessQueueMicrotask();
  }

  /**
   * Stops the timers, aborts the engine request and destroys the request and
   * P2P loader containers.
   */
  destroy() {
    clearInterval(this.storageCleanUpIntervalId);
    // The random-download timer is created with setTimeout, so cancel it with
    // the matching clearTimeout and drop the stale handle.
    clearTimeout(this.randomHttpDownloadInterval);
    this.storageCleanUpIntervalId = undefined;
    this.randomHttpDownloadInterval = undefined;
    this.engineRequest?.abort();
    this.requests.destroy();
    this.p2pLoaders.destroy();
  }
}