/*
Phaneron - Clustered, accelerated and cloud-fit video server, pre-assembled and in kit form.
Copyright (C) 2020 Streampunk Media Ltd.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see .
https://www.streampunk.media/ mailto:furnace@streampunk.media
14 Ormiscaig, Aultbea, Achnasheen, IV22 2JJ U.K.
*/
import { ProducerFactory, Producer, InvalidProducerError } from './producer'
import { clContext as nodenCLContext, OpenCLBuffer } from 'nodencl'
import redio, { RedioPipe, nil, end, isValue, RedioEnd, isEnd, Generator, Valve } from 'redioactive'
import { Frame, frame, Filterer, filterer } from 'beamcoder'
import { ClJobs } from '../clJobQueue'
import { LoadParams } from '../chanLayer'
import { VideoFormat } from '../config'
import * as Macadam from 'macadam'
import { ToRGBA } from '../process/io'
import { Reader as v210Reader } from '../process/v210'
import Yadif from '../process/yadif'
import { Mixer } from './mixer'
import { SourcePipes } from '../routeSource'
export class MacadamProducer implements Producer {
private readonly sourceID: string
private readonly params: LoadParams
private readonly clContext: nodenCLContext
private readonly clJobs: ClJobs
private readonly consumerFormat: VideoFormat
private readonly mixer: Mixer
private capture: Macadam.CaptureChannel | null = null
private audFilterer: Filterer | null = null
private audSource: RedioPipe | undefined
private vidSource: RedioPipe | undefined
private srcFormat: VideoFormat | undefined
private numForks = 0
private running = true
constructor(
id: number,
params: LoadParams,
context: nodenCLContext,
clJobs: ClJobs,
consumerFormat: VideoFormat
) {
this.sourceID = `P${id} Macadam ${params.url} L${params.layer}`
this.params = params
this.clContext = context
this.clJobs = clJobs
this.consumerFormat = consumerFormat
this.mixer = new Mixer(this.clContext, this.consumerFormat, this.clJobs)
if (this.params.url !== 'DECKLINK')
throw new InvalidProducerError('Macadam producer supports decklink devices')
}
async initialise(): Promise {
let width = 0
let height = 0
let progressive = false
const displayMode = Macadam.bmdModeHD1080i50
const tff = true
let toRGBA: ToRGBA | null = null
let yadif: Yadif | null = null
const sampleRate = this.consumerFormat.audioSampleRate
const numAudChannels = this.consumerFormat.audioChannels
const audLayout = `${numAudChannels}c`
try {
progressive = !Macadam.modeInterlace(displayMode)
this.capture = await Macadam.capture({
deviceIndex: (this.params.channel as number) - 1,
channels: numAudChannels,
sampleRate: Macadam.bmdAudioSampleRate48kHz,
sampleType: Macadam.bmdAudioSampleType32bitInteger,
displayMode: displayMode,
pixelFormat: Macadam.bmdFormat10BitYUV
})
let filtStr = ''
filtStr += `[in${0}:a]asetnsamples=n=1024:p=1, channelsplit=channel_layout=${numAudChannels}c`
for (let s = 0; s < numAudChannels; ++s) filtStr += `[c${s}:a]`
for (let s = 0; s < numAudChannels; ++s)
filtStr += `;\n[c${s}:a]aformat=channel_layouts=1c[out${s}:a]`
// console.log(filtStr)
const outParams = []
for (let s = 0; s < numAudChannels; ++s)
outParams.push({
name: `out${s}:a`,
sampleRate: this.consumerFormat.audioSampleRate,
sampleFormat: 'fltp',
channelLayout: '1c'
})
this.audFilterer = await filterer({
filterType: 'audio',
inputParams: [
{
name: 'in0:a',
timeBase: [1, sampleRate],
sampleRate: sampleRate,
sampleFormat: 's32',
channelLayout: audLayout
}
],
outputParams: outParams,
filterSpec: filtStr
})
// console.log('\nMacadam producer audio:\n', this.audFilterer.graph.dump())
width = this.capture.width
height = this.capture.height
toRGBA = new ToRGBA(this.clContext, '709', '709', new v210Reader(width, height), this.clJobs)
await toRGBA.init()
const yadifMode = progressive ? 'send_frame' : 'send_field'
yadif = new Yadif(
this.clContext,
this.clJobs,
width,
height,
{ mode: yadifMode, tff: tff },
!progressive
)
await yadif.init()
} catch (err) {
throw new Error(err)
}
const frameSource: Generator = async () => {
let result: Promise = Promise.resolve(end)
if (this.capture && this.running) result = this.capture.frame()
else if (this.capture) {
this.capture.stop()
this.capture = null
}
return result
}
const audFilter: Valve = async (
captureFrame
) => {
if (isValue(captureFrame) && this.audFilterer) {
const ffFrame = frame({
nb_samples: captureFrame.audio.sampleFrameCount,
format: 's32',
pts: captureFrame.audio.packetTime,
sample_rate: sampleRate,
channels: numAudChannels,
channel_layout: audLayout,
data: [captureFrame.audio.data]
})
const ff = await this.audFilterer.filter([{ name: 'in0:a', frames: [ffFrame] }])
if (ff.reduce((acc, f) => acc && f.frames && f.frames.length > 0, true)) {
const l = ff[0].frames.length
const result: Frame[][] = Array.from(Array(l), () => new Array(ff.length))
ff.forEach((chan, c) => chan.frames.forEach((f, i) => (result[i][c] = f)))
return result
} else return nil
} else {
return captureFrame as RedioEnd
}
}
const vidLoader: Valve = async (
frame
) => {
if (isValue(frame)) {
const convert = toRGBA as ToRGBA
const clSources = await convert.createSources()
// const now = process.hrtime()
// const nowms = now[0] * 1000.0 + now[1] / 1000000.0
const timestamp =
(frame.video.frameTime / frame.video.frameDuration) * (progressive ? 1 : 2)
clSources.forEach((s) => {
// s.loadstamp = nowms
s.timestamp = timestamp
})
await convert.loadFrame(frame.video.data, clSources, this.clContext.queue.load)
await this.clContext.waitFinish(this.clContext.queue.load)
return clSources
} else {
return frame
}
}
const vidProcess: Valve = async (
clSources
) => {
if (isValue(clSources)) {
const convert = toRGBA as ToRGBA
const clDest = await convert.createDest({ width: width, height: height })
// clDest.loadstamp = clSources[0].loadstamp
clDest.timestamp = clSources[0].timestamp
convert.processFrame(this.sourceID, clSources, clDest)
return clDest
} else {
if (isEnd(clSources)) toRGBA = null
return clSources
}
}
const vidDeint: Valve = async (frame) => {
if (isValue(frame)) {
const yadifDests: OpenCLBuffer[] = []
await yadif?.processFrame(frame, yadifDests, this.sourceID)
yadifDests.forEach((d) => {
for (let f = 0; f < this.numForks; ++f) d.addRef()
})
return yadifDests.length > 0 ? yadifDests : nil
} else {
if (isEnd(frame)) {
yadif?.release()
yadif = null
}
return frame
}
}
const srcFormat = {
name: 'macadam',
fields: 1,
width: width,
height: height,
squareWidth: width,
squareHeight: height,
timescale: 50,
duration: 1,
audioSampleRate: 48000,
audioChannels: 8
}
const macadamFrames = redio(frameSource, { bufferSizeMax: 2 })
this.audSource = macadamFrames
.fork({ bufferSizeMax: 1 })
.valve(audFilter, { bufferSizeMax: 2, oneToMany: true })
this.vidSource = macadamFrames
.fork({ bufferSizeMax: 1 })
.valve(vidLoader, { bufferSizeMax: 1 })
.valve(vidProcess, { bufferSizeMax: 1 })
.valve(vidDeint, { bufferSizeMax: 1, oneToMany: true })
await this.mixer.init(this.sourceID, this.audSource.fork(), this.vidSource.fork(), srcFormat)
console.log(`Created Macadam producer for channel ${this.params.channel}`)
}
async getSourcePipes(): Promise {
if (!(this.audSource && this.vidSource && this.srcFormat))
throw new Error(`Route producer failed to find source pipes for route`)
this.numForks++
return Promise.resolve({
audio: this.audSource.fork(),
video: this.vidSource.fork(),
format: this.srcFormat
})
}
getMixer(): Mixer {
return this.mixer
}
setPaused(pause: boolean): void {
this.mixer.setPaused(pause)
}
release(): void {
this.running = false
this.mixer.release()
}
}
export class MacadamProducerFactory implements ProducerFactory {
private clContext: nodenCLContext
constructor(clContext: nodenCLContext) {
this.clContext = clContext
}
createProducer(
id: number,
params: LoadParams,
clJobs: ClJobs,
consumerFormat: VideoFormat
): MacadamProducer {
return new MacadamProducer(id, params, this.clContext, clJobs, consumerFormat)
}
}