/*
Phaneron - Clustered, accelerated and cloud-fit video server, pre-assembled and in kit form.
Copyright (C) 2020 Streampunk Media Ltd.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see .
https://www.streampunk.media/ mailto:furnace@streampunk.media
14 Ormiscaig, Aultbea, Achnasheen, IV22 2JJ U.K.
*/
import { EventEmitter } from 'events'
import { clContext as nodenCLContext, KernelParams, OpenCLProgram, RunTimings } from 'nodencl'
export type JobCB = () => void
interface ClJob {
name: string
program: OpenCLProgram
params: KernelParams
cb: JobCB
}
export type JobID = {
source: string
timestamp: number
}
type JobsRequest = { id: string; jobs: ClJob[]; start: [number, number]; done: () => void }
export class ClJobs {
private readonly processJobs: ClProcessJobs
private jobs: Map
constructor(processJobs: ClProcessJobs) {
this.processJobs = processJobs
this.jobs = new Map()
}
makeKey(id: JobID): string {
return `${id.source} ts ${id.timestamp}`
}
add(id: JobID, name: string, program: OpenCLProgram, params: KernelParams, cb: JobCB): void {
const key = this.makeKey(id)
let tsJobs = this.jobs.get(key)
if (!tsJobs) {
this.jobs.set(key, [])
tsJobs = this.jobs.get(key) as ClJob[]
}
tsJobs.push({ name: name, program: program, params: params, cb: cb })
}
get(id: JobID): ClJob[] | undefined {
return this.jobs.get(this.makeKey(id))
}
delete(id: JobID): void {
this.jobs.delete(this.makeKey(id))
}
clear(): void {
this.jobs.clear()
}
async runQueue(id: JobID): Promise {
const key = this.makeKey(id)
const tsJobs = this.jobs.get(key)
if (!tsJobs) throw new Error(`Failed to run queue for id ${key}`)
return new Promise((resolve) => {
const jobsRequest = { id: key, jobs: tsJobs, start: process.hrtime(), done: resolve }
this.processJobs.requestRun(key, jobsRequest)
this.delete(id)
})
}
clearQueue(src: string): void {
this.jobs.forEach((jobs, key) => {
if (key.startsWith(src)) {
// run the callbacks so sources are released
jobs.forEach((j) => j.cb())
}
})
}
}
export class ClProcessJobs {
private readonly clContext: nodenCLContext
private readonly requests: Map
private readonly runEvents: EventEmitter
private readonly clJobs: ClJobs
private readonly showTimings = 0
constructor(clContext: nodenCLContext) {
this.clContext = clContext
this.requests = new Map()
this.runEvents = new EventEmitter()
this.clJobs = new ClJobs(this)
this.runEvents.once('run', async () => this.processQueue())
// setInterval(() => this.logRequests(), 2000)
}
async processQueue(): Promise {
const reqIt = this.requests.entries()
let curReq = reqIt.next()
while (!curReq.done) {
const chan = curReq.value[0]
const req = curReq.value[1]
const timings = new Map()
const jobQueued = process.hrtime(req.start)
for (let i = 0; i < req.jobs.length; ++i) {
const job = req.jobs[i]
timings.set(
job.name,
await this.clContext.runProgram(job.program, job.params, this.clContext.queue.process)
)
}
const submit = process.hrtime(req.start)
await this.clContext.waitFinish(this.clContext.queue.process)
req.jobs.forEach((j) => j.cb())
const end = process.hrtime(req.start)
this.logTimings(req.id, jobQueued, submit, end, timings)
req.done()
this.requests.delete(chan)
curReq = reqIt.next()
}
this.runEvents.once('run', async () => this.processQueue())
}
getJobs(): ClJobs {
return this.clJobs
}
requestRun(id: string, request: JobsRequest): void {
this.requests.set(id, request)
this.runEvents.emit('run')
}
logRequests(): void {
let i = 0
this.requests.forEach((r) => {
console.log(`${i++}: ${r.id} ${r.jobs.map((j) => j.name)}`)
})
}
logTimings(
id: string,
jobQueued: number[],
submit: number[],
end: number[],
timings: Map
): void {
const submitms = submit.reduce((sec, nano) => sec * 1e3 + nano / 1e6)
const endms = end.reduce((sec, nano) => sec * 1e3 + nano / 1e6)
const executems = endms - submitms
const idLim = id.slice(-20).concat(':').padEnd(21, ' ')
const idSp = new Array(25 - idLim.length).fill(' ').join('')
if (this.showTimings > 1) {
const tsIt = timings.entries()
let curTs = tsIt.next()
console.log(`\n${idLim}${idSp} | toGPU | process | total (microseconds)`)
console.log(new Array(56).fill('—').join(''))
let d2kTotal = 0
let keTotal = 0
let ttTotal = 0
while (!curTs.done) {
const process = curTs.value[0]
const t = curTs.value[1]
const pSp = new Array(26 - process.length).fill(' ').join('')
const d2kSp = new Array(7 - t.dataToKernel.toString().length).fill(' ').join('')
const keSp = new Array(7 - t.kernelExec.toString().length).fill(' ').join('')
const ttSp = new Array(7 - t.totalTime.toString().length).fill(' ').join('')
// eslint-disable-next-line prettier/prettier
console.log(`${process}${pSp}| ${d2kSp}${t.dataToKernel} | ${keSp}${t.kernelExec} | ${ttSp}${t.totalTime}`)
d2kTotal += t.dataToKernel
keTotal += t.kernelExec
ttTotal += t.totalTime
curTs = tsIt.next()
}
const executeus = (executems * 1000) >>> 0
ttTotal += executeus
const pSp = new Array(26 + 20 - 'execute'.length).fill(' ').join('')
const exSp = new Array(7 - executeus.toString().length).fill(' ').join('')
console.log(`execute${pSp}| ${exSp}${executeus}`)
const tSp = new Array(26 - 'TOTALS'.length).fill(' ').join('')
const d2kSp = new Array(7 - d2kTotal.toString().length).fill(' ').join('')
const keSp = new Array(7 - keTotal.toString().length).fill(' ').join('')
const ttSp = new Array(7 - ttTotal.toString().length).fill(' ').join('')
console.log(new Array(56).fill('—').join(''))
console.log(`TOTALS${tSp}| ${d2kSp}${d2kTotal} | ${keSp}${keTotal} | ${ttSp}${ttTotal}`)
}
if (this.showTimings > 0) {
const jobqueuedms = jobQueued.reduce((sec, nano) => sec * 1e3 + nano / 1e6)
console.log(
// eslint-disable-next-line prettier/prettier
`${idLim}${endms < 10.0 ? idSp : idSp.slice(1)} ${endms.toFixed(2)}ms elapsed (${jobqueuedms.toFixed(2)}ms job queued, ${(submitms - jobqueuedms).toFixed(2)}ms submit, ${executems.toFixed(2)}ms execute)`
)
}
}
}