/*
Phaneron - Clustered, accelerated and cloud-fit video server, pre-assembled and in kit form.
Copyright (C) 2020 Streampunk Media Ltd.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
https://www.streampunk.media/ mailto:furnace@streampunk.media
14 Ormiscaig, Aultbea, Achnasheen, IV22 2JJ U.K.
*/
import { ProducerFactory, Producer, InvalidProducerError } from './producer'
import { clContext as nodenCLContext, OpenCLBuffer } from 'nodencl'
import {
Demuxer,
demuxer,
Decoder,
decoder,
Filterer,
filterer,
Stream,
Packet,
Frame,
frame
} from 'beamcoder'
import redio, { RedioPipe, nil, end, isValue, RedioEnd, Generator, Valve } from 'redioactive'
import { ClJobs } from '../clJobQueue'
import { LoadParams } from '../chanLayer'
import { VideoFormat } from '../config'
import { ToRGBA } from '../process/io'
import { Reader as yuv422p10Reader } from '../process/yuv422p10'
import { Reader as yuv422p8Reader } from '../process/yuv422p8'
import { Reader as yuv420pReader } from '../process/yuv420p'
import { Reader as v210Reader } from '../process/v210'
import { Reader as rgba8Reader } from '../process/rgba8'
import { Reader as bgra8Reader } from '../process/bgra8'
import Yadif from '../process/yadif'
import { PackImpl } from '../process/packer'
import { Mixer } from './mixer'
import { SourcePipes } from '../routeSource'
/**
 * A named group of decoded audio frames destined for one audio filtergraph
 * input — `name` matches a filter input pad such as 'in0:a' (see audDecode
 * and the silence generator below).
 */
interface AudioChannel {
	name: string
	frames: Frame[]
}
/**
 * Producer that plays a media file/URL through FFmpeg (via beamcoder).
 *
 * On initialise() it demuxes the source, decodes the selected audio and
 * video streams, filters audio into per-channel mono 'fltp' planes and
 * video into a supported pixel format, uploads video frames into OpenCL
 * buffers (converted to RGBA via a format-specific reader), deinterlaces
 * with Yadif where needed, and wires the resulting redioactive pipes into
 * a Mixer. Sources with no audio get generated silence; sources with no
 * video get generated black frames.
 *
 * NOTE(review): generic type parameters appear to have been stripped from
 * this file (e.g. `Promise`, `RedioPipe`, `Valve`, `Generator` carry no
 * type arguments) — restore from upstream before compiling.
 */
export class FFmpegProducer implements Producer {
	private readonly sourceID: string
	private readonly loadParams: LoadParams
	private readonly clContext: nodenCLContext
	private readonly clJobs: ClJobs
	// Format of the downstream consumer — supplies the audio sample rate,
	// frame-rate conversion target and black-frame geometry fallbacks.
	private readonly consumerFormat: VideoFormat
	private readonly mixer: Mixer
	private demuxer: Demuxer | null = null
	private audSource: RedioPipe | undefined
	private vidSource: RedioPipe | undefined
	// Format describing what this producer actually emits (set in initialise).
	private srcFormat: VideoFormat | undefined
	// Number of forks handed out via getSourcePipes(); vidDeint adds this
	// many extra references to each video buffer so every fork can release it.
	private numForks = 0
	// Cleared by release() (or on pipeline end) to drain the valves.
	private running = true

	constructor(
		id: number,
		loadParams: LoadParams,
		context: nodenCLContext,
		clJobs: ClJobs,
		consumerFormat: VideoFormat
	) {
		// Used to tag OpenCL jobs and log messages for this source/layer.
		this.sourceID = `P${id} FFmpeg ${loadParams.url} L${loadParams.layer}`
		this.loadParams = loadParams
		this.clContext = context
		this.clJobs = clJobs
		this.consumerFormat = consumerFormat
		this.mixer = new Mixer(this.clContext, this.consumerFormat, this.clJobs)
	}

	/**
	 * Opens the source and builds the full demux → decode → filter → load →
	 * process → deinterlace pipeline, then initialises the mixer with forks
	 * of the audio and video pipes.
	 * @throws InvalidProducerError if FFmpeg cannot demux the URL.
	 */
	async initialise(): Promise {
		try {
			this.demuxer = await demuxer(this.loadParams.url)
		} catch (err) {
			console.log(err)
			// Signal to the caller that this URL is not playable by this producer.
			throw new InvalidProducerError(err)
		}
		const audioStreams: Stream[] = []
		const videoStreams: Stream[] = []
		const audioIndexes: number[] = []
		const videoIndexes: number[] = []
		// Only a single video stream is consumed.
		const numVidChannels = 1
		// Accumulators used by the packet-filter valves to batch one packet
		// per selected stream before passing them on together.
		const audioPackets: Packet[] = []
		const videoPackets: Packet[] = []
		let audioDecoder: Decoder | undefined
		let videoDecoder: Decoder | undefined

		// Select audio streams: all of them by default, or only the indexes
		// requested in loadParams.streams.audio.
		const demuxAudioStreams = this.demuxer.streams.filter((s) => s.codecpar.codec_type === 'audio')
		let astreams = this.loadParams.streams === undefined ? demuxAudioStreams : []
		if (this.loadParams.streams && this.loadParams.streams.audio)
			astreams = demuxAudioStreams.filter(
				(_s, i) =>
					this.loadParams.streams?.audio?.find((loadIndex) => loadIndex === i) !== undefined
			)
		// Same selection logic for video streams.
		const demuxVideoStreams = this.demuxer.streams.filter((s) => s.codecpar.codec_type === 'video')
		let vstreams = this.loadParams.streams === undefined ? demuxVideoStreams : []
		if (this.loadParams.streams && this.loadParams.streams.video)
			vstreams = demuxVideoStreams.filter(
				(_s, i) =>
					this.loadParams.streams?.video?.find((loadIndex) => loadIndex === i) !== undefined
			)

		let monoStreams = true
		astreams.forEach((s) => {
			if (monoStreams) {
				// allow mxf-style mono channel per stream or a single stream of multiple channels
				s.discard = 'default'
				audioStreams.push(s)
				audioIndexes.push(s.index)
				// One shared decoder is created for the first accepted stream.
				if (!audioDecoder)
					audioDecoder = decoder({ demuxer: this.demuxer as Demuxer, stream_index: s.index })
				// Once a non-mono stream is accepted, stop accepting further streams.
				monoStreams &&= s.codecpar.channel_layout === 'mono'
			} else {
				// Tell the demuxer to drop packets for unused streams.
				s.discard = 'all'
			}
		})
		vstreams.forEach((s) => {
			if (videoStreams.length < numVidChannels) {
				s.discard = 'default'
				videoStreams.push(s)
				videoIndexes.push(s.index)
				if (!videoDecoder)
					videoDecoder = decoder({ demuxer: this.demuxer as Demuxer, stream_index: s.index })
			} else {
				s.discard = 'all'
			}
		})

		// Seek on the video stream if present, otherwise the first audio stream.
		const primaryIndex = videoIndexes.length ? videoIndexes[0] : audioIndexes[0]
		if (this.loadParams.seek)
			await this.demuxer.seek({ stream_index: primaryIndex, frame: this.loadParams.seek })
		// 0 means unbounded; otherwise the producer self-releases after this
		// many output frames (checked in vidDeint / blackPipe below).
		const maxFrame = this.loadParams.length ? this.loadParams.length : 0

		let silentFrame: Frame | null = null
		let audFilterer: Filterer | null = null
		const audStream = audioStreams[0]
		let numAudChannels = 0
		if (audStream) {
			// One filter input per selected stream. All inputs reuse the first
			// stream's timing/format parameters — assumes homogeneous streams
			// (the mxf mono-per-stream case); TODO confirm for mixed sources.
			const inParams = audioStreams.map((_s, i) => {
				return {
					name: `in${i}:a`,
					timeBase: audStream.time_base,
					sampleRate: audStream.codecpar.sample_rate,
					sampleFormat: audStream.codecpar.format,
					channelLayout: monoStreams ? '1c' : audStream.codecpar.channel_layout
				}
			})
			let filtStr = ''
			if (monoStreams) {
				// Mono-per-stream: normalise each input to fixed 1024-sample frames.
				numAudChannels = audioStreams.length
				for (let c = 0; c < numAudChannels; ++c)
					filtStr += (c === 0 ? '' : ';\n') + `[in${c}:a]asetnsamples=n=1024:p=1[out${c}:a]`
			} else {
				// Single multi-channel stream: fix the frame size, split into
				// per-channel pads, then force each to a mono layout.
				numAudChannels = audStream.codecpar.channels
				filtStr += `[in${0}:a]asetnsamples=n=1024:p=1, channelsplit=channel_layout=${numAudChannels}c`
				for (let s = 0; s < numAudChannels; ++s) filtStr += `[c${s}:a]`
				for (let s = 0; s < numAudChannels; ++s)
					filtStr += `;\n[c${s}:a]aformat=channel_layouts=1c[out${s}:a]`
			}
			// console.log(filtStr)
			// Every output is mono planar float at the consumer's sample rate.
			const outParams = []
			for (let s = 0; s < numAudChannels; ++s)
				outParams.push({
					name: `out${s}:a`,
					sampleRate: this.consumerFormat.audioSampleRate,
					sampleFormat: 'fltp',
					channelLayout: '1c'
				})
			audFilterer = await filterer({
				filterType: 'audio',
				inputParams: inParams,
				outputParams: outParams,
				filterSpec: filtStr
			})
		} else {
			// No audio in the source — synthesise a single channel of silence
			// (one reusable 1024-sample s32 frame fed repeatedly, see `silence`).
			numAudChannels = 1
			silentFrame = frame({
				nb_samples: 1024,
				format: 's32',
				pts: 0,
				sample_rate: this.consumerFormat.audioSampleRate,
				channels: 1,
				channel_layout: '1c',
				data: [Buffer.alloc(1024 * 4)]
			})
			audFilterer = await filterer({
				filterType: 'audio',
				inputParams: [
					{
						name: 'in0:a',
						timeBase: [1, this.consumerFormat.audioSampleRate],
						sampleRate: this.consumerFormat.audioSampleRate,
						sampleFormat: 's32',
						channelLayout: '1c'
					}
				],
				outputParams: [
					{
						name: 'out0:a',
						sampleRate: this.consumerFormat.audioSampleRate,
						sampleFormat: 'fltp',
						channelLayout: '1c'
					}
				],
				// asetpts regenerates pts from the sample counter since the
				// same silent frame (pts 0) is reused for every read.
				filterSpec: '[in0:a] asetpts=N/SR/TB [out0:a]'
			})
		}
		// console.log('\nFFmpeg producer audio:\n', audFilterer.graph.dump())

		// Video geometry/timing defaults to the consumer format; overridden
		// below if the source has a video stream.
		let width = this.consumerFormat.width
		let height = this.consumerFormat.height
		let squareWidth = width
		let squareHeight = height
		let vidTimescale = this.consumerFormat.timescale
		let vidDuration = this.consumerFormat.duration
		let toRGBA: ToRGBA | null = null
		let vidFilterer: Filterer | null = null
		let progressive = true
		let yadif: Yadif | null = null
		let black: OpenCLBuffer | null = null
		const vidStream = videoStreams[0]
		if (vidStream) {
			width = vidStream.codecpar.width
			height = vidStream.codecpar.height
			// Apply the sample aspect ratio to get square-pixel display width.
			if (vidStream.codecpar.sample_aspect_ratio[0] && vidStream.codecpar.sample_aspect_ratio[1]) {
				squareWidth =
					(width * vidStream.codecpar.sample_aspect_ratio[0]) /
					vidStream.codecpar.sample_aspect_ratio[1]
			}
			squareHeight = height
			const fieldOrder = vidStream.codecpar.field_order
			if (vidStream.avg_frame_rate[0] / vidStream.avg_frame_rate[1] > 30) {
				console.log(
					'Stream field_order flag not set for framerate greater than 30fps - setting to progressive'
				)
				progressive = true
			} else progressive = fieldOrder === 'progressive'
			// Interlaced sources emit two fields per frame, hence double rate.
			vidTimescale = vidStream.avg_frame_rate[0] * (progressive ? 1 : 2)
			vidDuration = vidStream.avg_frame_rate[1]

			// Pick a native OpenCL reader for the decoder's pixel format, or
			// fall back to an FFmpeg pixel-format conversion to the nearest
			// supported format (yuv422p10le for yuv-family, rgba for rgb-family).
			let filterOutputFormat = vidStream.codecpar.format
			let readImpl: PackImpl
			switch (vidStream.codecpar.format) {
				case 'yuv420p':
					console.log('Using native yuv420p loader')
					readImpl = new yuv420pReader(width, height)
					break
				case 'yuv422p':
					console.log('Using native yuv422p8 loader')
					readImpl = new yuv422p8Reader(width, height)
					break
				case 'yuv422p10le':
					console.log('Using native yuv422p10 loader')
					readImpl = new yuv422p10Reader(width, height)
					break
				case 'v210':
					console.log('Using native v210 loader')
					readImpl = new v210Reader(width, height)
					break
				case 'rgba':
					console.log('Using native rgba8 loader')
					readImpl = new rgba8Reader(width, height)
					break
				case 'bgra':
					console.log('Using native bgra8 loader')
					readImpl = new bgra8Reader(width, height)
					break
				default:
					if (vidStream.codecpar.format.includes('yuv')) {
						console.log(`Non-native loader for ${vidStream.codecpar.format} - using yuv422p10`)
						filterOutputFormat = 'yuv422p10le'
						readImpl = new yuv422p10Reader(width, height)
					} else if (vidStream.codecpar.format.includes('rgb')) {
						console.log(`Non-native loader for ${vidStream.codecpar.format} - using rgba8`)
						filterOutputFormat = 'rgba'
						readImpl = new rgba8Reader(width, height)
					} else
						throw new Error(
							`Unsupported video format '${vidStream.codecpar.format}' from FFmpeg decoder`
						)
			}
			// GPU converter from the reader's format to RGBA (Rec.709 both ways).
			toRGBA = new ToRGBA(this.clContext, '709', '709', readImpl, this.clJobs)
			await toRGBA.init()

			// Rate-convert the source to the consumer frame rate (half rate if
			// interlaced — Yadif doubles it back by emitting one frame per field).
			const chanTb = [this.consumerFormat.duration, this.consumerFormat.timescale]
			vidFilterer = await filterer({
				filterType: 'video',
				inputParams: [
					{
						timeBase: vidStream.time_base,
						width: width,
						height: height,
						pixelFormat: vidStream.codecpar.format,
						pixelAspect: vidStream.codecpar.sample_aspect_ratio
					}
				],
				outputParams: [
					{
						pixelFormat: filterOutputFormat
					}
				],
				filterSpec: `fps=fps=${chanTb[1] / (progressive ? 1 : 2)}/${chanTb[0]}`
			})
			// console.log('\nFFmpeg producer video:\n', vidFilterer.graph.dump())

			// Assume top-field-first when unknown; otherwise parse the beamcoder
			// field_order string — presumably e.g. '…, top displayed first';
			// TODO confirm the exact string values against beamcoder docs.
			const tff = fieldOrder === 'unknown' || fieldOrder.split(', ', 2)[1] === 'top displayed first'
			const yadifMode = progressive ? 'send_frame' : 'send_field'
			yadif = new Yadif(
				this.clContext,
				this.clJobs,
				width,
				height,
				{ mode: yadifMode, tff: tff },
				!progressive
			)
			await yadif.init()
		} else {
			// No video stream: build one reusable black RGBA float frame at the
			// consumer's geometry (4 channels x 4 bytes per float sample).
			const numBytesRGBA = width * height * 4 * 4
			black = await this.clContext.createBuffer(
				numBytesRGBA,
				'readwrite',
				'coarse',
				{
					width: width,
					height: height
				},
				'combinerBlack'
			)
			let off = 0
			// NOTE(review): Float32Array is already zero-initialised, so this
			// fill loop writes values that are already 0.0.
			const blackFloat = new Float32Array(numBytesRGBA / 4)
			for (let y = 0; y < height; ++y) {
				for (let x = 0; x < width * 4; x += 4) {
					blackFloat[off + x + 0] = 0.0
					blackFloat[off + x + 1] = 0.0
					blackFloat[off + x + 2] = 0.0
					blackFloat[off + x + 3] = 0.0
				}
				off += width * 4
			}
			await black.hostAccess('writeonly')
			Buffer.from(blackFloat.buffer).copy(black)
		}

		// Source generator: pull packets from the demuxer until stopped.
		const demux: Generator = async () => {
			let packet: Packet | RedioEnd = end
			if (this.demuxer && this.running) {
				packet = await this.demuxer.read()
			} else {
				this.demuxer = null
			}
			// demuxer.read() yields a falsy value at EOF — translate to end.
			return packet ? packet : end
		}

		// Collect one packet per selected audio stream, then emit the batch.
		const audPacketFilter: Valve = async (packet) => {
			if (isValue(packet)) {
				if (!this.running) return nil
				if (audioIndexes.includes(packet.stream_index)) {
					audioPackets.push(packet)
					if (audioPackets.length === audioStreams.length) {
						const result = audioPackets.slice(0)
						audioPackets.splice(0)
						return result
					} else return nil
				} else return nil
			} else {
				return packet
			}
		}

		// Decode a packet batch; map each decoded frame to its filter input pad.
		const audDecode: Valve = async (packets) => {
			if (isValue(packets)) {
				if (!(this.running && audioDecoder)) return nil
				const decodedFrames = await audioDecoder.decode(packets)
				return decodedFrames.frames.map((f, i) => ({ name: `in${i}:a`, frames: [f] }))
			} else {
				return packets
			}
		}

		// Run the audio filter graph; transpose its per-channel output into
		// per-frame arrays of channels (result[frame][channel]).
		const audFilter: Valve = async (frames) => {
			if (isValue(frames)) {
				if (!(this.running && audFilterer)) return nil
				const ff = await audFilterer.filter(frames)
				// Only emit when every channel produced at least one frame.
				if (ff.reduce((acc, f) => acc && f.frames && f.frames.length > 0, true)) {
					const l = ff[0].frames.length
					const result: Frame[][] = Array.from(Array(l), () => new Array(ff.length))
					ff.forEach((chan, c) => chan.frames.forEach((f, i) => (result[i][c] = f)))
					return result
				} else return nil
			} else {
				return frames
			}
		}

		// Generator used when the source has no audio: repeat the silent frame.
		const silence: Generator = async () =>
			this.running ? [{ name: 'in0:a', frames: [silentFrame] }] : end

		// Collect one packet per selected video stream, then emit the batch.
		const vidPacketFilter: Valve = async (packet) => {
			if (isValue(packet)) {
				if (!this.running) return nil
				if (videoIndexes.includes(packet.stream_index)) {
					videoPackets.push(packet)
					if (videoPackets.length === videoStreams.length) {
						const result = videoPackets.slice(0)
						videoPackets.splice(0)
						return result
					} else return nil
				} else return nil
			} else {
				return packet
			}
		}

		const vidDecode: Valve = async (packets) => {
			if (isValue(packets)) {
				if (!(this.running && videoDecoder)) return nil
				const frm = await videoDecoder.decode(packets)
				return frm.frames.length > 0 ? frm.frames : nil
			} else {
				return packets
			}
		}

		// Frame-rate / pixel-format conversion via the FFmpeg filter graph.
		const vidFilter: Valve = async (decFrame) => {
			if (isValue(decFrame)) {
				if (!(this.running && vidFilterer)) return nil
				const ff = await vidFilterer.filter([decFrame])
				if (!ff[0]) return nil
				return ff[0].frames.length > 0 ? ff[0].frames : nil
			} else {
				return decFrame
			}
		}

		// Upload the CPU-side frame planes into OpenCL source buffers.
		const vidLoader: Valve = async (frame) => {
			if (isValue(frame)) {
				if (!this.running) return nil
				const convert = toRGBA as ToRGBA
				const clSources = await convert.createSources()
				// const now = process.hrtime()
				// const nowms = now[0] * 1000.0 + now[1] / 1000000.0
				clSources.forEach((s) => {
					// s.loadstamp = nowms
					// Interlaced timestamps count fields, so double the frame pts.
					s.timestamp = progressive ? frame.pts : frame.pts * 2
				})
				await convert.loadFrame(frame.data, clSources, this.clContext.queue.load)
				await this.clContext.waitFinish(this.clContext.queue.load)
				return clSources
			} else {
				return frame
			}
		}

		// Convert the loaded source planes to a single RGBA destination buffer.
		const vidProcess: Valve = async (
			clSources
		) => {
			if (isValue(clSources)) {
				if (!this.running) {
					// Drop references so buffers return to the pool when stopped.
					clSources.forEach((s) => s.release())
					return nil
				}
				const convert = toRGBA as ToRGBA
				const clDest = await convert.createDest({ width: width, height: height })
				// clDest.loadstamp = clSources[0].loadstamp
				clDest.timestamp = clSources[0].timestamp
				convert.processFrame(this.sourceID, clSources, clDest)
				return clDest
			} else {
				// End of stream: tear down the converter.
				toRGBA?.finish()
				toRGBA = null
				return clSources
			}
		}

		let curFrame = 0
		// Deinterlace (or pass through when progressive); also enforces the
		// optional maxFrame limit and adds per-fork references to each output.
		const vidDeint: Valve = async (frame) => {
			if (isValue(frame)) {
				if (!this.running) {
					frame.release()
					return nil
				}
				const yadifDests: OpenCLBuffer[] = []
				await yadif?.processFrame(frame, yadifDests, this.sourceID)
				if (maxFrame && maxFrame === curFrame) this.release()
				curFrame += yadifDests.length
				yadifDests.forEach((d) => {
					for (let f = 0; f < this.numForks; ++f) d.addRef()
				})
				return yadifDests.length > 0 ? yadifDests : nil
			} else {
				yadif?.release()
				yadif = null
				this.running = false
				return frame
			}
		}

		// Fallback video pipe: repeat the black frame with incrementing timestamps.
		let blackCurFrame = 0
		const blackPipe: RedioPipe = redio(
			async () => {
				if (this.running) {
					if (maxFrame && maxFrame === blackCurFrame) this.release()
					if (black) black.timestamp = blackCurFrame
					blackCurFrame++
					black?.addRef()
					return black
				} else return end
			},
			{ bufferSizeMax: 1 }
		)

		// NOTE(review): audioSampleRate is hard-coded to 48000 here while the
		// audio filters above use consumerFormat.audioSampleRate — confirm
		// these can never diverge.
		this.srcFormat = {
			name: 'ffmpeg',
			fields: 1,
			width: width,
			height: height,
			squareWidth: squareWidth,
			squareHeight: squareHeight,
			timescale: vidTimescale,
			duration: vidDuration,
			audioSampleRate: 48000,
			audioChannels: numAudChannels
		}

		// Wire the pipelines: a single demux pipe forked to audio and video.
		const ffPackets = redio(demux, { bufferSizeMax: this.demuxer.streams.length * 2 })
		if (audioStreams.length) {
			this.audSource = ffPackets
				.fork({ bufferSizeMax: 10 })
				.valve(audPacketFilter)
				.valve(audDecode, { bufferSizeMax: 2 })
				.valve(audFilter, { oneToMany: true })
		} else {
			// eslint-disable-next-line prettier/prettier
			this.audSource = redio(silence, { bufferSizeMax: 2 })
				.valve(audFilter, { oneToMany: true })
		}
		if (videoStreams.length) {
			this.vidSource = ffPackets
				.fork({ bufferSizeMax: 10 })
				.valve(vidPacketFilter)
				.valve(vidDecode, { oneToMany: true, bufferSizeMax: 2 })
				.valve(vidFilter, { oneToMany: true })
				.valve(vidLoader, { bufferSizeMax: 1 })
				.valve(vidProcess)
				.valve(vidDeint, { oneToMany: true })
		} else {
			this.vidSource = blackPipe
		}

		await this.mixer.init(
			this.sourceID,
			this.audSource.fork(),
			this.vidSource.fork(),
			this.srcFormat
		)
		console.log(`Created FFmpeg producer for path ${this.loadParams.url}`)
	}

	/**
	 * Returns forks of the audio/video pipes plus the source format for a
	 * route consumer, and bumps numForks so vidDeint adds a matching buffer
	 * reference for the new consumer.
	 * @throws Error if called before initialise() has completed.
	 */
	async getSourcePipes(): Promise {
		if (!(this.audSource && this.vidSource && this.srcFormat))
			throw new Error(`Route producer failed to find source pipes for route`)
		this.numForks++
		return Promise.resolve({
			audio: this.audSource.fork(),
			video: this.vidSource.fork(),
			format: this.srcFormat
		})
	}

	getMixer(): Mixer {
		return this.mixer
	}

	/** Pause/resume playback at the mixer. */
	setPaused(pause: boolean): void {
		this.mixer.setPaused(pause)
	}

	/** Stop the pipelines (valves drain via this.running) and free the mixer. */
	release(): void {
		this.running = false
		this.mixer.release()
	}
}
/**
 * Factory producing FFmpegProducer instances, all sharing one OpenCL
 * context supplied at construction time.
 */
export class FFmpegProducerFactory implements ProducerFactory {
	constructor(private clContext: nodenCLContext) {}

	/**
	 * Creates a producer for the given media source.
	 * @param id Numeric producer identifier (used in log/job tags).
	 * @param loadParams What to load and how (url, layer, seek, length…).
	 * @param clJobs Shared OpenCL job queue.
	 * @param consumerFormat Video format of the downstream consumer.
	 */
	createProducer(
		id: number,
		loadParams: LoadParams,
		clJobs: ClJobs,
		consumerFormat: VideoFormat
	): FFmpegProducer {
		return new FFmpegProducer(id, loadParams, this.clContext, clJobs, consumerFormat)
	}
}