/* Phaneron - Clustered, accelerated and cloud-fit video server, pre-assembled and in kit form. Copyright (C) 2020 Streampunk Media Ltd. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . https://www.streampunk.media/ mailto:furnace@streampunk.media 14 Ormiscaig, Aultbea, Achnasheen, IV22 2JJ U.K. */ import { clContext as nodenCLContext } from 'nodencl' import { LoadParams } from './chanLayer' import { Producer, ProducerRegistry } from './producer/producer' import { ConsumerConfig } from './config' import { DefaultTransitionSpec, Layer } from './layer' import { ConsumerRegistry, Consumer } from './consumer/consumer' import { CombineLayer, Combiner } from './combiner' import { ClJobs } from './clJobQueue' import { TransitionSpec } from './transitioner' import { SourcePipes } from './routeSource' export class Channel { private readonly clContext: nodenCLContext private readonly chanID: string private readonly consumerConfig: ConsumerConfig private readonly consumerRegistry: ConsumerRegistry private readonly producerRegistry: ProducerRegistry private readonly clJobs: ClJobs private readonly combiner: Combiner private readonly consumers: Consumer[] private readonly layers: Map constructor( clContext: nodenCLContext, chanID: string, chanNum: number, consumerConfig: ConsumerConfig, consumerRegistry: ConsumerRegistry, producerRegistry: ProducerRegistry, clJobs: ClJobs ) { this.clContext = clContext this.chanID = chanID this.consumerConfig = consumerConfig this.consumerRegistry = consumerRegistry this.producerRegistry = producerRegistry this.clJobs = clJobs this.combiner = new Combiner( this.clContext, this.chanID, this.consumerConfig.format, this.clJobs ) this.consumers = this.consumerRegistry.createConsumers( chanNum, this.chanID, this.consumerConfig, this.clJobs ) this.layers = new Map() } async initialise(): Promise { await this.combiner.initialise() await Promise.all(this.consumers.map((c) => c.initialise())) this.consumers.forEach((c) => this.addConsumer(c)) return Promise.resolve() } getConfig(): ConsumerConfig { return this.consumerConfig } addConsumer(consumer: Consumer): void { if (!this.consumers.find((c) => c === consumer)) this.consumers.push(consumer) const combinerAudio = this.combiner.getAudioPipe() const combinerVideo = this.combiner.getVideoPipe() if (!(combinerAudio !== undefined && combinerVideo !== undefined)) { throw new Error('Failed to get combiner connection pipes') } consumer.connect(combinerAudio.fork(), combinerVideo.fork()) this.combiner.addConsumer() } removeConsumer(consumer: Consumer): void { if (!this.consumers.some((c) => c === consumer)) throw new Error('remove consumer - consumer not found') const i = this.consumers.indexOf(consumer) this.consumers.splice(i, i + 1) // should be consumer.destroy() this.combiner.removeConsumer() } updateLayers(): void { const layerNums: number[] = [] const layerIter = this.layers.keys() let next = layerIter.next() while (!next.done) { layerNums.push(next.value) next = layerIter.next() } // sort layers from low to high for combining bottom to top layerNums.sort((a, b) => a - b) const curLayers = this.combiner.getLayers() const newLayers: CombineLayer[] = [] layerNums.forEach((l) => { const layer = this.layers.get(l) if (layer) { const audPipe = layer.getAudioPipe() const vidPipe = layer.getVideoPipe() if (audPipe && vidPipe) { const aid = audPipe.fittingId const curLayer = curLayers.find((cl) => aid === cl.getAudioPipe().fittingId) if (curLayer) newLayers.push(curLayer) else newLayers.push(new CombineLayer(layer, audPipe, vidPipe)) } } }) this.combiner.updateLayers(newLayers) } async loadSource(params: LoadParams): Promise { let producer: Producer | undefined const transitionSpec: TransitionSpec = JSON.parse(DefaultTransitionSpec) let error = '' try { producer = await this.producerRegistry.createSource( params, this.consumerConfig.format, this.clJobs ) if (params.transition) { transitionSpec.type = params.transition.type if (params.transition.type === 'wipe' && params.transition.url) transitionSpec.mask = await this.producerRegistry.createSource( { url: params.transition.url, streams: params.transition.streams, layer: params.layer, length: params.transition.length }, this.consumerConfig.format, this.clJobs ) transitionSpec.len = params.transition.length } } catch (err) { error = err } if (!producer || error.length > 0) { console.log('Failed to create source for params', params, error) return false } let layer = this.layers.get(params.layer) if (!layer) { layer = new Layer( this.clContext, `${this.chanID}-l${params.layer}`, this.consumerConfig, this.clJobs ) await layer.initialise() this.layers.set(params.layer, layer) } return layer.load( producer, transitionSpec, params.preview ? true : false, params.autoPlay ? true : false, this.updateLayers.bind(this) ) } async play(layerNum: number, ticker?: () => void): Promise { const layer = this.layers.get(layerNum) // eslint-disable-next-line @typescript-eslint/no-unused-vars await layer?.play(async (t: string) => { if (t === 'end') { await layer?.release() this.layers.delete(layerNum) this.updateLayers() } else if (ticker) ticker() }) return layer !== undefined } pause(layerNum: number): boolean { const layer = this.layers.get(layerNum) layer?.pause() return layer !== undefined } resume(layerNum: number): boolean { const layer = this.layers.get(layerNum) layer?.resume() return layer !== undefined } async stop(layerNum: number): Promise { const layer = this.layers.get(layerNum) await layer?.stop() return layer !== undefined } async clear(layerNum: number): Promise { let result = true if (layerNum === 0) { const layerNums: number[] = [] const layerIter = this.layers.keys() let next = layerIter.next() while (!next.done) { layerNums.push(next.value) next = layerIter.next() } await Promise.all( layerNums.map((n) => this.stop(n).then(() => this.layers.get(n)?.release())) ) for (const n of layerNums) this.layers.delete(n) } else { result = await this.stop(layerNum) await this.layers.get(layerNum)?.release() this.layers.delete(layerNum) } this.updateLayers() return result } anchor(layerNum: number, params: string[]): boolean { const layer = this.layers.get(layerNum) layer?.anchor(params) return layer !== undefined } rotation(layerNum: number, params: string[]): boolean { const layer = this.layers.get(layerNum) layer?.rotation(params) return layer !== undefined } fill(layerNum: number, params: string[]): boolean { const layer = this.layers.get(layerNum) layer?.fill(params) return layer !== undefined } volume(layerNum: number, params: string[]): boolean { const layer = this.layers.get(layerNum) layer?.volume(params) return layer !== undefined } async getRoutePipes(layerNum: number): Promise { let sourcePipes: SourcePipes | undefined if (layerNum === 0) { sourcePipes = await this.combiner.getSourcePipes() } else { const layer = this.layers.get(layerNum) sourcePipes = await layer?.getSourcePipes() } if (!sourcePipes) throw new Error(`Failed to find source pipes for layer ${layerNum}`) return sourcePipes } }