// npm import { v4 as uuid } from 'uuid'; import * as fs from 'fs'; import * as path from 'path'; import * as _ from 'lodash'; // @ownzones import { s3 as ownS3 } from '@ownzones/lib'; import { S3ByteProvider, MxfFileReader } from '@ownzones/imf'; import { generateChannelMappingFilters, createFFmpegCommand, IInput, unknownChannelLayoutValue, } from '@ownzones/meh'; // app import { Honeycomb } from '../tracing'; import { log } from '../../config'; import { SegmentBuilderHelper } from './segment-builder-helper'; import { IAudioSegment, ICustomAudioSegment, isCustomAudioSegment } from '../playlist-builder'; import { TrackedFFmpegCommand } from '../../utils/types'; import { ISegmentTranscoder, ISegmentTranscodeResult } from './segment-transcoder'; export interface IAudioSegmentResult extends ISegmentTranscodeResult { extension: string; } export class AudioSegment implements ISegmentTranscoder { public static mxfGCClipWrappedBroadcastWaveAudioData = '060e2b34040101010d01030102060200'; public static async isMxfGCClipWrappedBroadcastWaveAudioData(mxfFileReader: MxfFileReader): Promise { const partitionPack = await mxfFileReader.getPartitionPack(0); return partitionPack.essenceContainers.includes(AudioSegment.mxfGCClipWrappedBroadcastWaveAudioData); } private readonly segment: IAudioSegment | ICustomAudioSegment; public constructor(segment: IAudioSegment | ICustomAudioSegment) { this.segment = segment; } public async transcode(): Promise { const { totalStartTime } = this.segment; const segmentPath = isCustomAudioSegment(this.segment) ? await this.customMxfTrackToTs() : await this.samplesToTs(); const parts = segmentPath.split('.'); const extension = parts.length > 0 ? `.${parts[parts.length - 1]}` : ''; const segmentWithOffsetPath = await Honeycomb.startAsyncSpan(path.basename(__filename), 'audioSetOffset', async (span) => { const ffmpegOffsetPath = await SegmentBuilderHelper.ffmpegSetOffset( segmentPath, Number.parseFloat(totalStartTime as any) + 1, false, extension, ); Honeycomb.endSpan(span); return ffmpegOffsetPath; }); const readOffsetPathFromDisk = Honeycomb.startSpan(path.basename(__filename), 'readOffsetPathFromDisk'); const content = fs.readFileSync(segmentWithOffsetPath); Honeycomb.endSpan(readOffsetPathFromDisk); fs.unlink(segmentPath, () => null); fs.unlink(segmentWithOffsetPath, () => null); return { segmentData: content, extension }; } private async customMxfTrackToTs(): Promise { const segmentPath = `${__dirname}/../../media/${uuid()}.ts`; const intermediate = `${__dirname}/../../media/${uuid()}.ts`; const inputs = (this.segment as ICustomAudioSegment).inputs; const { ffmpegCommand, ffmpegPromise } = createFFmpegCommand({ bypassFFmpegErrors: true }) as TrackedFFmpegCommand; // ToDo There is actually only one stream per input. Resolve in playlist-builder. const firstStream = inputs[0].streams[Object.keys(inputs[0].streams).map((st) => Number.parseInt(st, 10))[0]]; // The following values are equal between streams in different inputs: startTime, sampleRate, duration. When // ICustomAudioSegment will properly extend IAudioSegment, this values should be referred directly from the segment. let underscanFrames = 0; const secondsPerFrame = 1024 / firstStream.sampleRate; let usedDuration = firstStream.duration; if (Math.abs(firstStream.duration / secondsPerFrame - Math.round(firstStream.duration / secondsPerFrame)) > 1e-4) { usedDuration = Math.floor(firstStream.duration / secondsPerFrame) * secondsPerFrame; usedDuration = Math.max(usedDuration, secondsPerFrame); } let adjustedStartTime = firstStream.startTime; let adjustedDuration = usedDuration; if (firstStream.startTime > 0) { // We don't add underscan to the first segment - there's nothing // to underscan since we're starting at time index 0. underscanFrames = 2; adjustedStartTime -= secondsPerFrame * underscanFrames; adjustedDuration += secondsPerFrame * underscanFrames; } // Inputs mapped by `input.url` const addedInputs: Record = {}; // First extract all unknown channel layouts so we can pass the options all at once let ffmpegInputIndex = 0; for (const input of inputs) { if (!addedInputs[input.url]) { addedInputs[input.url] = { inputIndex: ffmpegInputIndex, channelLayouts: [], used: false }; ffmpegInputIndex += 1; } const channelLayouts = addedInputs[input.url].channelLayouts; for (const [streamIdx, stream] of Object.entries(input.streams)) { if (stream.channelsCount > 8) { channelLayouts.push({ streamIndex: Number.parseInt(streamIdx, 10), value: unknownChannelLayoutValue(stream.channelsCount), }); } } } const transcodeInputs: IInput[] = []; /* eslint-disable no-await-in-loop */ for (const input of inputs) { const transcodeInput: IInput = { streams: {}, inputIndex: addedInputs[input.url].inputIndex }; for (const [streamIdx, stream] of Object.entries(input.streams)) { transcodeInput.streams[streamIdx] = { channelsCount: input.streams[streamIdx].channelsCount, channels: _.map(_.zip(stream.channelLabels, stream.channelIndices), (x) => ({ label: x[0] as string, index: x[1] as number, })), }; } if (!addedInputs[input.url].used) { addedInputs[input.url].used = true; const signedUrl = (await ownS3.signUrl(input.url)); ffmpegCommand.input(`cache:${signedUrl}`); const mxfFileReader = input.url.includes('.mxf') ? new MxfFileReader(S3ByteProvider.fromUrl(input.url)) : null; const stream = input.streams[Object.keys(input.streams).map((st) => Number.parseInt(st, 10))[0]]; // We pass the channel layout as a mask in hexadecimal notation which selects // the first channel labels from FFmpeg. // For example for 12 channels the channel layout will be '0xFFF' which selects the channels // 'FL', 'FR', 'FC' ..., the first 12 from the list printed by command 'ffmpeg -layouts'. // This option needs to be passed for each stream with an unknown layout for an input for (const channelLayout of addedInputs[input.url].channelLayouts) { ffmpegCommand.inputOptions(`-channel_layout:${channelLayout.streamIndex} ${channelLayout.value}`); } if (mxfFileReader && await mxfFileReader.isSupportedFile()) { const { startOffset } = await mxfFileReader.getWavSamplesOffsetAndLength(); ffmpegCommand .inputOptions('-f s24le') .inputOptions(`-skip_initial_bytes ${startOffset}`) .inputOptions(`-ar ${stream.sampleRate}`) .inputOptions(`-ac ${stream.channelsCount}`); } else if (input.mxfIndexUrl) { const signedMxfIndexUrl = await ownS3.signUrl(input.mxfIndexUrl); ffmpegCommand .inputOptions(`-use_mxf_cache ${signedMxfIndexUrl}`); } ffmpegCommand .inputOptions(`-ss ${adjustedStartTime}`) .inputOptions(`-t ${adjustedDuration}`); } transcodeInputs.push(transcodeInput); } /* eslint-enable no-await-in-loop */ const filter = generateChannelMappingFilters({ inputs: transcodeInputs }); ffmpegCommand.outputOptions([ `-filter_complex ${filter}`, '-map [out]', '-muxdelay 0', '-copyts', '-c:a aac', '-ar 48000', '-f mpegts', ]); ffmpegCommand .on('start', (cmdline: string) => { Honeycomb.addCustomContext({ ffmpegCommand: cmdline, }); }) .on('progress', (info) => { log.info(info); }) .on('stderr', (a) => { log.debug(a); }) .on('stdout', (a) => { log.info(a); }) .save(intermediate); await ignorePcmErrors(ffmpegPromise); const { ffmpegCommand: alignFFmpegCommand, ffmpegPromise: alignFFmpegPromise, } = createFFmpegCommand({ bypassFFmpegErrors: true }) as TrackedFFmpegCommand; const remuxInputOptions = [ `-ss ${secondsPerFrame * underscanFrames}`, `-t ${usedDuration}`, ]; const remuxOutputOptions = [ '-map 0:a', '-muxdelay 0', '-copyts', '-c:a copy', '-f mpegts', ]; alignFFmpegCommand .input(intermediate) .inputOptions(remuxInputOptions) .outputOptions(remuxOutputOptions) .on('start', (cmdline: string) => { Honeycomb.addCustomContext({ ffmpegCommand: cmdline, }); }) .save(segmentPath); await ignorePcmErrors(alignFFmpegPromise); fs.unlinkSync(intermediate); return segmentPath; } private async samplesToTs(): Promise { const { url, sampleRate, channels, startTime, duration, mxfIndexUrl, streamIndex, } = this.segment as IAudioSegment; const extension = [3, 5, 7].includes(channels) ? '.mp4' : '.ts'; const format = [3, 5, 7].includes(channels) ? 'mp4' : 'mpegts'; const signedUrl = await ownS3.signUrl(url); const id = uuid(); const segmentPath = `${__dirname}/../../media/${id}${extension}`; const intermediate = `${__dirname}/../../media/${uuid()}${extension}`; // This code calculates the amount of "underscan" frames required // for this segment. The idea is to compress 2 extra audio frames // (1024 samples) at the beginning of each segment (except segment // 0), and then mux them off before delivery to the end user. This // preserves the waveform generated in the prior segment by using // the last 2 frames of the prior segment to "prime" the current // segment. Research indicates the reason for the popping we used // to get was the AAC compressor's MDCT sliding window function which // straddles each AAC frame. Unless there is underscan, we get a pop // on segment boundaries. let usedDuration = duration; let underscanFrames = 0; const secondsPerFrame = 1024 / sampleRate; // If the duration is not a multiple of the frame size, we truncate it if (Math.abs(duration / secondsPerFrame - Math.round(duration / secondsPerFrame)) > 1e-4) { usedDuration = Math.floor(duration / secondsPerFrame) * secondsPerFrame; usedDuration = Math.max(usedDuration, secondsPerFrame); } let adjustedStartTime = startTime; let adjustedDuration = usedDuration; if (startTime > 0) { // We don't add underscan to the first segment - there's nothing // to underscan since we're starting at time index 0. underscanFrames = 2; adjustedStartTime -= secondsPerFrame * underscanFrames; adjustedDuration += secondsPerFrame * underscanFrames; } // END Added by KDF - add underscan frames to the input options let inputOptions = [ `-ss ${adjustedStartTime}`, `-t ${adjustedDuration}`, ]; const mxfFileReader = url.includes('.mxf') ? new MxfFileReader(S3ByteProvider.fromUrl(url)) : null; if (mxfFileReader && await mxfFileReader.isSupportedFile()) { const { startOffset } = await mxfFileReader.getWavSamplesOffsetAndLength(); inputOptions = inputOptions.concat([ '-f s24le', `-ar ${sampleRate}`, `-ac ${channels}`, `-skip_initial_bytes ${startOffset}`, ]); } else if (mxfIndexUrl) { const signedMxfIndexUrl = await ownS3.signUrl(mxfIndexUrl); inputOptions.push('-use_mxf_cache'); inputOptions.push(signedMxfIndexUrl); } // aframes = Math.floor(Math.ceil(adjustedDuration * 48000) / 1024); const bitrate = channels * 64; const outputOptions = [ `-map 0:${streamIndex}`, '-muxdelay 0', '-copyts', '-c:a aac', `-b:a ${bitrate}k`, '-ar 48000', // '-aframes', aframes, `-f ${format}`, ]; if (channels <= 2) { outputOptions.push('-channel_layout stereo'); } else if (channels <= 8) { outputOptions.push(`-channel_layout ${channels - 1}.1`); } const remuxInputOptions = [ `-ss ${secondsPerFrame * underscanFrames}`, `-t ${usedDuration}`, ]; const remuxOutputOptions = [ '-map 0:a', '-muxdelay 0', '-copyts', '-c:a copy', `-f ${format}`, ]; const { ffmpegCommand, ffmpegPromise } = createFFmpegCommand({ bypassFFmpegErrors: true }) as TrackedFFmpegCommand; ffmpegCommand .input(`cache:${signedUrl}`) .inputOptions(inputOptions) .outputOptions(outputOptions) .on('start', (cmdline: string) => { Honeycomb.addCustomContext({ ffmpegCommand: cmdline, }); }) .on('progress', (info) => { // ToDo This can be called with an object as 1st argument. Parameter type is wrong. log.debug(info); }) .save(intermediate); await ignorePcmErrors(ffmpegPromise); const { ffmpegCommand: alignFFmpegCommand, ffmpegPromise: alignFFmpegPromise, } = createFFmpegCommand({ bypassFFmpegErrors: true }) as TrackedFFmpegCommand; alignFFmpegCommand .input(intermediate) .inputOptions(remuxInputOptions) .outputOptions(remuxOutputOptions) .on('start', (cmdline: string) => { Honeycomb.addCustomContext({ ffmpegCommand: cmdline, }); }) .on('progress', (info) => { log.debug(info); }) .save(segmentPath); await ignorePcmErrors(alignFFmpegPromise); fs.unlinkSync(intermediate); return segmentPath; } } /** * A wrapper that takes a ffmpeg Promise and ignores some errors * which happen on the last segment of a PCM audio stream. Should be treated as a tech debt * and the problem should be investigated more carefully. */ async function ignorePcmErrors(ffmpegPromise: Promise): Promise { try { await ffmpegPromise; } catch (err) { if ((err as Error).message.includes('Invalid PCM packet, data has size')) { return; } throw err; } }