/* Phaneron - Clustered, accelerated and cloud-fit video server, pre-assembled and in kit form. Copyright (C) 2020 Streampunk Media Ltd. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . https://www.streampunk.media/ mailto:furnace@streampunk.media 14 Ormiscaig, Aultbea, Achnasheen, IV22 2JJ U.K. */ import { EventEmitter } from 'events' import { clContext as nodenCLContext } from 'nodencl' import { Layer } from './layer' import { RedioPipe, RedioEnd, isValue, isEnd, Valve, nil, end } from 'redioactive' import { OpenCLBuffer } from 'nodencl' import { AudioInputParam, filterer, Filterer, Frame } from 'beamcoder' import { VideoFormat } from './config' import { ClJobs } from './clJobQueue' import ImageProcess from './process/imageProcess' import Combine from './process/combine' import { Silence, Black } from './blackSilence' import { SourcePipes, RouteSource } from './routeSource' export class CombineLayer { private readonly audioPipe: RedioPipe private readonly videoPipe: RedioPipe private readonly endEvent: EventEmitter private audioState: 'start' | 'run' | 'end' = 'start' private videoState: 'start' | 'run' | 'end' = 'start' constructor( layer: Layer, audPipe: RedioPipe, vidPipe: RedioPipe ) { this.audioPipe = audPipe this.videoPipe = vidPipe this.endEvent = layer.getEndEvent() } getAudioPipe(): RedioPipe { return this.audioPipe } getVideoPipe(): RedioPipe { return this.videoPipe } checkAudio(frame: Frame | RedioEnd): boolean { let result = true if (isValue(frame)) { if (this.audioState === 'start') this.audioState = 'run' } else { if (this.audioState === 'run') { this.audioState = 'end' if (this.audioState === 'end' && this.videoState === 'end') this.endEvent.emit('end') } result = false } return result } checkVideo(frame: OpenCLBuffer | RedioEnd): boolean { let result = true if (isValue(frame)) { if (this.videoState === 'start') this.videoState = 'run' } else { if (this.videoState === 'run') { this.videoState = 'end' if (this.audioState === 'end' && this.videoState === 'end') this.endEvent.emit('end') } result = false } return result } } export class Combiner implements RouteSource { private readonly clContext: nodenCLContext private readonly chanID: string private readonly consumerFormat: VideoFormat private readonly clJobs: ClJobs private lastNumAudLayers = 0 private lastNumVidLayers = 0 private numConsumers = 0 private audCombiner: Filterer | undefined private vidCombiner: ImageProcess | undefined private audioPipe: RedioPipe | undefined private videoPipe: RedioPipe | undefined private combineLayers: CombineLayer[] = [] private audLayerPipes: RedioPipe[] = [] private vidLayerPipes: RedioPipe[] = [] private audRoutePipe: RedioPipe | undefined private vidTimestamp = 0 private numForks = 0 constructor( clContext: nodenCLContext, chanID: string, consumerFormat: VideoFormat, clJobs: ClJobs ) { this.clContext = clContext this.chanID = `${chanID} combine` this.consumerFormat = consumerFormat this.clJobs = clJobs } async initialise(): Promise { const silence = new Silence(this.consumerFormat) const silencePipe = await silence.initialise() const black = new Black(this.clContext, this.consumerFormat, this.chanID) const blackPipe = await black.initialise() const audEndValve: Valve< [Frame | RedioEnd, ...(Frame | RedioEnd)[]], [Frame | RedioEnd, ...(Frame | RedioEnd)[]] > = async (frames) => { if (isValue(frames)) { return frames.filter((f, i) => i > 0 ? this.combineLayers.length > i - 1 ? this.combineLayers[i - 1].checkAudio(f) : false : true ) as [Frame | RedioEnd, ...(Frame | RedioEnd)[]] } else { return frames } } const combineAudValve: Valve<[Frame | RedioEnd, ...(Frame | RedioEnd)[]], Frame | RedioEnd> = async (frames) => { if (isValue(frames)) { const numLayers = frames.length - 1 const layerFrames = frames.slice(1) as Frame[] const doFilter = layerFrames.reduce((acc, f) => acc && isValue(f), true) const numCombineLayers = numLayers < 2 ? 0 : numLayers if (numCombineLayers && this.lastNumAudLayers !== numCombineLayers) { await this.makeAudCombiner(numCombineLayers) this.lastNumAudLayers = numCombineLayers } const srcFrames = frames as Frame[] if (!isValue(frames[0])) return end const refFrame = srcFrames[0] if (numLayers === 0) { return srcFrames[0] } else if (numLayers === 1) { if (isValue(srcFrames[1])) srcFrames[1].pts = refFrame.pts return srcFrames[1] } else if (doFilter && this.audCombiner) { const filterFrames = layerFrames.map((f, i) => { f.pts = refFrame.pts return { name: `in${i}:a`, frames: [f] } }) const ff = await this.audCombiner.filter(filterFrames) return ff[0].frames.length > 0 ? ff[0].frames : nil } else { return end } } else { this.audCombiner = undefined silence.release() return frames } } const vidEndValve: Valve< [OpenCLBuffer | RedioEnd, ...(OpenCLBuffer | RedioEnd)[]], [OpenCLBuffer | RedioEnd, ...(OpenCLBuffer | RedioEnd)[]] > = async (frames) => { if (isValue(frames)) { return frames.filter((f, i) => i > 0 ? this.combineLayers.length > i - 1 ? this.combineLayers[i - 1].checkVideo(f) : false : true ) as [OpenCLBuffer | RedioEnd, ...(OpenCLBuffer | RedioEnd)[]] } else { return frames } } const combineVidValve: Valve< [OpenCLBuffer | RedioEnd, ...(OpenCLBuffer | RedioEnd)[]], OpenCLBuffer | RedioEnd > = async (frames) => { if (isValue(frames)) { const numLayers = frames.length - 1 const layerFrames = frames.slice(1) as OpenCLBuffer[] if (!isValue(frames[0])) return end const timestamp = this.vidTimestamp++ const numCombineLayers = numLayers < 2 ? 0 : numLayers if (numCombineLayers && this.lastNumVidLayers !== numCombineLayers) { await this.makeVidCombiner(numCombineLayers) this.lastNumVidLayers = numCombineLayers } if (numLayers === 0) { for (let d = 0; d < this.numConsumers + this.numForks; ++d) frames[0].addRef() frames[0].timestamp = timestamp return frames[0] } else if (numLayers === 1) { if (!isEnd(frames[1])) { frames[1].timestamp = timestamp for (let d = 1; d < this.numConsumers + this.numForks; ++d) frames[1].addRef() } return frames[1] } if (frames.reduce((acc, f) => acc && isValue(f), true)) { const combineDest = await this.clContext.createBuffer( this.consumerFormat.width * this.consumerFormat.height * 4 * 4, 'readwrite', 'coarse', { width: this.consumerFormat.width, height: this.consumerFormat.height }, 'combine' ) // combineDest.loadstamp = Math.min(...layerFrames.map((f) => f.loadstamp)) combineDest.timestamp = timestamp for (let d = 1; d < this.numConsumers + this.numForks; ++d) combineDest.addRef() await this.vidCombiner?.run( { inputs: layerFrames, output: combineDest }, { source: this.chanID, timestamp: timestamp }, () => layerFrames.forEach((f) => f.release()) ) await this.clJobs.runQueue({ source: this.chanID, timestamp: timestamp }) return combineDest } else { return end } } else { if (this.vidCombiner) { this.clJobs.clearQueue(this.chanID) black.release() this.vidCombiner = undefined } return frames } } this.audioPipe = silencePipe .zipEach(this.audLayerPipes) .valve(audEndValve) .valve(combineAudValve, { oneToMany: true }) // eslint-disable-next-line prettier/prettier this.videoPipe = blackPipe .zipEach(this.vidLayerPipes) .valve(vidEndValve) .valve(combineVidValve) } async makeAudCombiner(numLayers: number): Promise { const sampleRate = this.consumerFormat.audioSampleRate const numAudChannels = this.consumerFormat.audioChannels const audLayout = `${numAudChannels}c` const inParams: Array = [] let inStr = '' const filtLayers = numLayers > 0 ? numLayers : 1 for (let i = 0; i < filtLayers; i++) { inStr += `[in${i}:a]` inParams.push({ name: `in${i}:a`, timeBase: [1, sampleRate], sampleRate: sampleRate, sampleFormat: 'fltp', channelLayout: `${numAudChannels}c` }) } this.audCombiner = await filterer({ filterType: 'audio', inputParams: inParams, outputParams: [ { name: 'out0:a', sampleRate: sampleRate, sampleFormat: 'fltp', channelLayout: audLayout } ], filterSpec: `${inStr}amix=inputs=${filtLayers}:duration=shortest[out0:a]` }) // console.log('\nCombine audio:\n', this.audCombiner.graph.dump()) } async makeVidCombiner(numLayers: number): Promise { this.vidCombiner = new ImageProcess( this.clContext, new Combine(numLayers, this.consumerFormat.width, this.consumerFormat.height), this.clJobs ) await this.vidCombiner.init() } getLayers(): CombineLayer[] { return this.combineLayers } updateLayers(layers: CombineLayer[]): void { this.combineLayers = layers.slice(0) this.audLayerPipes.splice(0) this.vidLayerPipes.splice(0) layers.forEach((l) => { this.audLayerPipes.push(l.getAudioPipe()) this.vidLayerPipes.push(l.getVideoPipe()) }) } addConsumer(): void { this.numConsumers++ } removeConsumer(): void { this.numConsumers-- } async getSourcePipes(): Promise { if (!(this.audioPipe && this.videoPipe && this.consumerFormat)) throw new Error(`Combiner failed to find source pipes for route`) if (this.numForks === 0) { let audFilterer: Filterer | null = null let filtStr = '' const numAudChannels = this.consumerFormat.audioChannels const sampleRate = this.consumerFormat.audioSampleRate filtStr += `[in${0}:a]channelsplit=channel_layout=${numAudChannels}c` for (let s = 0; s < numAudChannels; ++s) filtStr += `[out${s}:a]` // console.log(filtStr) const outParams = [] for (let s = 0; s < numAudChannels; ++s) { outParams.push({ name: `out${s}:a`, sampleRate: this.consumerFormat.audioSampleRate, sampleFormat: 'fltp', channelLayout: '1c' }) } audFilterer = await filterer({ filterType: 'audio', inputParams: [ { name: 'in0:a', timeBase: [1, sampleRate], sampleRate: sampleRate, sampleFormat: 'fltp', channelLayout: `${numAudChannels}c` } ], outputParams: outParams, filterSpec: filtStr }) // console.log('\nCombiner route source audio:\n', audFilterer.graph.dump()) const audFilter: Valve = async (frame) => { if (isValue(frame)) { if (!audFilterer) return nil const ff = await audFilterer.filter([{ name: 'in0:a', frames: [frame] }]) if (ff.reduce((acc, f) => acc && f.frames && f.frames.length > 0, true)) { return ff.map((f) => f.frames[0]) } else return nil } else { return frame } } this.audRoutePipe = this.audioPipe.fork().valve(audFilter) } this.numForks++ if (!this.audRoutePipe) throw new Error(`Combiner failed to create audio filter for route`) return { audio: this.audRoutePipe.fork(), video: this.videoPipe.fork(), format: this.consumerFormat } } getAudioPipe(): RedioPipe | undefined { return this.audioPipe } getVideoPipe(): RedioPipe | undefined { return this.videoPipe } }