/*
Phaneron - Clustered, accelerated and cloud-fit video server, pre-assembled and in kit form.
Copyright (C) 2020 Streampunk Media Ltd.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see .
https://www.streampunk.media/ mailto:furnace@streampunk.media
14 Ormiscaig, Aultbea, Achnasheen, IV22 2JJ U.K.
*/
import { clContext as nodenCLContext, OpenCLBuffer } from 'nodencl'
import { RedioPipe, RedioEnd, nil, isValue, Valve, Spout } from 'redioactive'
import { Frame, Filterer, filterer } from 'beamcoder'
import { AudioIO, IoStreamWrite, SampleFormatFloat32 } from 'naudiodon'
import Koa from 'koa'
import cors from '@koa/cors'
import { ConsumerFactory, Consumer } from './consumer'
import { FromRGBA } from '../process/io'
import { Writer } from '../process/rgba8'
import { ConfigParams, VideoFormat, DeviceConfig } from '../config'
import { ClJobs } from '../clJobQueue'
interface AudioBuffer {
buffer: Buffer
timestamp: number
}
export class ScreenConsumer implements Consumer {
private readonly clContext: nodenCLContext
private readonly chanID: string
private readonly params: ConfigParams
private readonly format: VideoFormat
private readonly clJobs: ClJobs
private fromRGBA: FromRGBA | undefined
private readonly audioOutChannels: number
private readonly audioTimebase: number[]
private readonly videoTimebase: number[]
private audioOut: IoStreamWrite
private audFilterer: Filterer | undefined
private readonly kapp: Koa
private readonly lastWeb: Buffer
constructor(
context: nodenCLContext,
chanID: string,
params: ConfigParams,
format: VideoFormat,
clJobs: ClJobs
) {
this.clContext = context
this.chanID = `${chanID} screen`
this.params = params
this.format = format
this.clJobs = clJobs
this.audioOutChannels = 2
this.audioTimebase = [1, this.format.audioSampleRate]
this.videoTimebase = [this.format.duration, this.format.timescale]
this.audioOut = AudioIO({
outOptions: {
channelCount: this.audioOutChannels,
sampleFormat: SampleFormatFloat32,
sampleRate: this.format.audioSampleRate,
closeOnError: false
}
})
if (Object.keys(this.params).length > 1)
console.log('Screen consumer - unused params', this.params)
this.lastWeb = Buffer.alloc(this.format.width * this.format.height * 4)
this.kapp = new Koa()
this.kapp.use(cors())
this.kapp.use(async (ctx) => (ctx.body = this.lastWeb))
const server = this.kapp.listen(3001)
process.on('SIGHUP', server.close)
}
async initialise(): Promise {
const sampleRate = this.audioTimebase[1]
const audInLayout = `${this.format.audioChannels}c`
const audOutLayout = `${this.audioOutChannels}c`
// !!! Needs more work to handle 59.94 frame rates !!!
const samplesPerFrame =
(this.format.audioSampleRate * this.format.duration) / this.format.timescale
const outSampleFormat = 'flt'
this.audFilterer = await filterer({
filterType: 'audio',
inputParams: [
{
name: 'in0:a',
timeBase: this.audioTimebase,
sampleRate: sampleRate,
sampleFormat: 'fltp',
channelLayout: audInLayout
}
],
outputParams: [
{
name: 'out0:a',
sampleRate: this.format.audioSampleRate,
sampleFormat: outSampleFormat,
channelLayout: audOutLayout
}
],
filterSpec: `[in0:a] aformat=sample_fmts=${outSampleFormat}:sample_rates=${this.format.audioSampleRate}:channel_layouts=${audOutLayout}, asetnsamples=n=${samplesPerFrame}:p=1 [out0:a]`
})
// console.log('\nScreen consumer audio:\n', this.audFilterer.graph.dump())
const width = this.format.width
const height = this.format.height
this.fromRGBA = new FromRGBA(
this.clContext,
'sRGB',
new Writer(width, height, false),
this.clJobs
)
await this.fromRGBA.init()
console.log('Created Screen consumer')
return Promise.resolve()
}
connect(
combineAudio: RedioPipe,
combineVideo: RedioPipe
): void {
const audFilter: Valve = async (frame) => {
if (isValue(frame)) {
const audFilt = this.audFilterer as Filterer
const ff = await audFilt.filter([{ name: 'in0:a', frames: [frame] }])
const result: AudioBuffer[] = ff[0].frames.map((f) => ({
buffer: f.data[0],
timestamp: f.pts
}))
return result.length > 0 ? result : nil
} else {
return frame
}
}
const vidProcess: Valve = async (frame) => {
if (isValue(frame)) {
const fromRGBA = this.fromRGBA as FromRGBA
const clDests = await fromRGBA.createDests()
clDests.forEach((d) => (d.timestamp = frame.timestamp))
fromRGBA.processFrame(this.chanID, frame, clDests, 0)
await this.clJobs.runQueue({ source: this.chanID, timestamp: frame.timestamp })
return clDests[0]
} else {
this.clJobs.clearQueue(this.chanID)
return frame
}
}
const vidSaver: Valve = async (frame) => {
if (isValue(frame)) {
const fromRGBA = this.fromRGBA as FromRGBA
await fromRGBA.saveFrame(frame, this.clContext.queue.unload)
await this.clContext.waitFinish(this.clContext.queue.unload)
return frame
} else {
return frame
}
}
const screenSpout: Spout<
[(OpenCLBuffer | RedioEnd | undefined)?, (AudioBuffer | RedioEnd | undefined)?] | RedioEnd
> = async (frame) => {
if (isValue(frame)) {
const vidBuf = frame[0]
const audBuf = frame[1]
if (!(audBuf && isValue(audBuf) && vidBuf && isValue(vidBuf))) {
console.log('One-legged zipper:', audBuf, vidBuf)
if (vidBuf && isValue(vidBuf)) vidBuf.release()
return Promise.resolve()
}
const atb = this.audioTimebase
const ats = (audBuf.timestamp * atb[0]) / atb[1]
const vtb = this.videoTimebase
const vts = (vidBuf.timestamp * vtb[0]) / vtb[1]
if (Math.abs(ats - vts) > 0.1)
console.log('Screen audio and video timestamp mismatch - aud:', ats, ' vid:', vts)
const write = (data: Buffer, cb: () => void) => {
if (
!this.audioOut.write(data, (err: Error | null | undefined) => {
if (err) console.log('Write Error:', err)
})
) {
this.audioOut.once('drain', cb)
} else {
process.nextTick(cb)
}
}
return new Promise((resolve) => {
vidBuf.copy(this.lastWeb)
write(audBuf.buffer, () => {
vidBuf.release()
resolve()
})
})
} else {
// this.clContext.logBuffers()
return Promise.resolve()
}
}
this.audioOut.start()
combineVideo
.valve(vidProcess)
.valve(vidSaver)
.zip(combineAudio.valve(audFilter, { oneToMany: true }))
.spout(screenSpout)
}
}
export class ScreenConsumerFactory implements ConsumerFactory {
private readonly clContext: nodenCLContext
constructor(clContext: nodenCLContext) {
this.clContext = clContext
}
createConsumer(
chanID: string,
params: ConfigParams,
format: VideoFormat,
_device: DeviceConfig,
clJobs: ClJobs
): ScreenConsumer {
const consumer = new ScreenConsumer(this.clContext, chanID, params, format, clJobs)
return consumer
}
}