// npm
import { spawnSync } from 'child_process';
import * as fs from 'fs';
import * as _ from 'lodash';
import * as path from 'path';
import { v4 as uuid } from 'uuid';
import bluebird from 'bluebird';
// @ownzones
import {
  createFFmpegCommand,
  channelLayoutToNameMapping,
  generateChannelMappingFilters,
  IInput,
  unknownChannelLayoutValue,
} from '@ownzones/meh';
import { s3Lib as ownS3 } from '@ownzones/lib';
import { FsFileLocator, S3FileLocator } from '@ownzones/locators';
import { FfmpegCommand } from '@ownzones/fluent-ffmpeg';
// app
import { Body } from 'aws-sdk/clients/s3';
import { config, log } from '../../config';
import { IAudioSegment, ISegment, SegmentType } from '../playlist-builder';
import { IComposition, IResource, IUser } from '../connect-api';
import { Nullable, TrackedFFmpegCommand } from '../../utils/types';

/** S3 coordinates of one extracted (raw PCM) audio channel file. */
export interface IAudioChannelPath {
  fileName: string;
  bucket: string;
  key: string;
}

/** Describes one channel that takes part in an AAC mux. */
export interface IChannelMuxAacDefinition {
  channel: number;
  label: string;
  fileId: string;
  trackIndex: number;
  duration: number;
  totalStartTime: number;
  localPath?: string;
}

/** Input of an {@link AudioDemux} run. */
export interface IAudioDemuxTask {
  fileId: string;
  user?: IUser;
  compositionDefinition: IComposition;
  cacheLocation: string;
  type: SegmentType;
  orgId: string;
  orgSlug: string;
}

interface IExtractedChannel {
  fileId: string,
  channelNumber: number,
  trackIndex: number,
  trackChannelCount: number,
  tc: string, // the ffmpeg complex filter output label "[t{trackIndex}c{channelNumber}]"
  duration: number, // the duration of the specific track
  sampleRate: number, // the sample rate of the audio track
}

/**
 * Splits the audio tracks of a composition into one raw PCM file per channel (cached on S3) and
 * muxes cached channels back into AAC segments.
 */
export class AudioDemux {
  /**
   * Tells whether at least one channel of the composition is missing from the cache.
   *
   * @param definition the composition whose audio channels are inspected
   * @param cacheLocation the base location of the extracted channel files
   * @returns `true` when the definition cannot be analysed or when any channel file is missing
   */
  public static async needsChannelExtract(definition: IComposition, cacheLocation: string): Promise<boolean> {
    const info = AudioDemux.getExtractedChannelsInfo(definition);
    if (!info) {
      return true;
    }

    const { channelMappings } = info;
    const cacheExists = await bluebird.map(channelMappings, (extractedChannel) => AudioDemux
      .channelIsCached(cacheLocation, extractedChannel));

    return cacheExists.some((cached) => !cached);
  }

  /**
   * Muxes already cached audio segments into a single AAC stream wrapped in MPEG-TS.
   *
   * @param cachedSegments the cached segments; each one becomes one ffmpeg input, in array order
   * @param definition the channels to pick from the inputs
   * @returns the content of the generated `.ts` file
   * @throws Error when `definition` is empty or when ffmpeg fails
   */
  public static async muxCachedAAcToAac(
    cachedSegments: { cache: IAudioSegment }[],
    definition: IChannelMuxAacDefinition[],
  ): Promise<Buffer> {
    if (!definition.length) {
      throw new Error('Cannot mux cached audio: the channel definition is empty');
    }

    const outputLocation = path.join(config.mediaPath, `${uuid()}.ts`);
    const { ffmpegCommand, ffmpegPromise } = createFFmpegCommand({
      ffmpegStdoutLines: 15,
    }) as TrackedFFmpegCommand;

    // Sign every URL first and only then register the inputs sequentially: `inputIndex` below must match
    // the order in which the inputs are added to the command, which is not guaranteed when the inputs are
    // added from concurrently running callbacks.
    const signedUrls = await bluebird.all(cachedSegments.map(
      (cachedSegment) => ownS3.signUrl(cachedSegment.cache.s3Path as string),
    ));

    const inputs: IInput[] = [];
    cachedSegments.forEach((cachedSegment, index) => {
      ffmpegCommand.input(`cache:${signedUrls[index]}`);
      inputs.push({
        inputIndex: index,
        streams: {
          0: {
            channels: definition
              .filter((def) => def.trackIndex === cachedSegment.cache.streamIndex)
              .map((channel) => ({
                label: channel.label,
                index: channel.channel,
              })),
            layout: cachedSegment.cache.layout as string,
            channelsCount: cachedSegment.cache.channels,
          },
        },
      });
    });

    let bitrate;
    if (definition.length > 1) {
      bitrate = definition.length * 64;
      const filter = generateChannelMappingFilters({ inputs });
      ffmpegCommand.outputOptions(['-filter_complex', filter, '-map [out]']);
    } else {
      // A single channel is placed on output channel c2 of a 5.1 layout.
      bitrate = 6 * 64;
      ffmpegCommand.outputOptions(['-af', `pan=5.1|c2=c${definition[0].channel}`]);
    }

    ffmpegCommand.outputOptions([
      '-muxdelay 0',
      '-copyts',
      '-c:a aac',
      `-b:a ${bitrate}k`,
      '-ar 48000',
      '-f mpegts',
    ]);
    ffmpegCommand.output(outputLocation);

    try {
      ffmpegCommand.run();
      await ffmpegPromise;
      return fs.readFileSync(outputLocation);
    } finally {
      // Remove the temporary output even when ffmpeg or the read fails.
      AudioDemux.removeLocalFiles([outputLocation]);
    }
  }

  /**
   * Downloads one segment of every requested raw PCM channel and muxes them into AAC.
   *
   * @param cacheLocation the base location of the extracted channel files
   * @param definition the channels to mux
   * @param segment the zero based index of the segment
   * @param lastSegment when set, its duration is used to compute the byte offset of `segment`
   * @returns the muxed output, or `null` when at least one channel segment could not be fetched
   */
  public static async muxChannelsToAac(
    cacheLocation: string,
    definition: IChannelMuxAacDefinition[],
    segment: number,
    lastSegment?: ISegment,
  ): Promise<Nullable<Buffer>> {
    const outputLocation = path.join(config.mediaPath, `${uuid()}.ts`);
    // Every temporary file is tracked by its full path so that the cleanup below really removes it.
    const localPaths: string[] = [outputLocation];

    try {
      const channelsPart: Nullable<IChannelMuxAacDefinition>[] = await bluebird.all(
        definition.map(async (channel) => {
          const body = await AudioDemux.getChannelSegment(
            cacheLocation,
            channel.fileId,
            channel.trackIndex,
            channel.channel,
            segment,
            channel.duration,
            lastSegment,
          );
          if (!body) {
            return null;
          }

          const localPath = path.join(
            config.mediaPath,
            `${channel.fileId}_${channel.trackIndex}_${channel.channel}_${segment}.raw`,
          );
          localPaths.push(localPath);
          fs.writeFileSync(localPath, body);

          return {
            localPath,
            fileId: channel.fileId,
            trackIndex: channel.trackIndex,
            channel: channel.channel,
            label: channel.label,
            duration: channel.duration,
            totalStartTime: channel.totalStartTime,
          };
        }),
      );

      const channels = _.compact(channelsPart);
      // All the channels are required; a partial mux would produce a wrong layout.
      if (!channels.length || channels.length !== definition.length) {
        return null;
      }

      const { ffmpegCommand, ffmpegPromise } = createFFmpegCommand({
        ffmpegStdoutLines: 15,
      }) as TrackedFFmpegCommand;

      const sortedInputs = AudioDemux.sortInputsByLabel(channels);
      sortedInputs.forEach((channel) => {
        // The cached files are headerless, so the raw format has to be described explicitly.
        ffmpegCommand.input(channel.localPath as string);
        ffmpegCommand.inputOptions(['-f', 's16le']);
        ffmpegCommand.inputOptions(['-ar', '48000']);
        ffmpegCommand.inputOptions(['-ac', '1']);
      });

      let bitrate;
      if (channels.length > 1) {
        bitrate = channels.length * 64;
        const chMap = channels.map((_ch, idx) => `[${idx}:a]`).join('');
        const filter = `${chMap}amerge=inputs=${channels.length}[a]`;
        ffmpegCommand.outputOptions(['-filter_complex', filter, '-map [a]']);
      } else {
        // A single channel is placed on output channel c2 of a 5.1 layout.
        bitrate = 6 * 64;
        ffmpegCommand.outputOptions(['-af', 'pan=5.1|c2=c0']);
      }

      ffmpegCommand.outputOptions([
        '-muxdelay 0',
        `-output_ts_offset ${channels[0].totalStartTime + 1}`,
        '-copyts',
        '-c:a aac',
        `-b:a ${bitrate}k`,
        '-ar 48000',
      ]);
      ffmpegCommand.output(outputLocation);
      ffmpegCommand.run();
      await ffmpegPromise;

      return fs.readFileSync(outputLocation);
    } finally {
      // Runs for the success, the "missing channel" and the failure paths alike.
      AudioDemux.removeLocalFiles(localPaths);
    }
  }

  /**
   * Orders the inputs by the position of their label in the channel layout that matches the number
   * of inputs.
   *
   * @param inputs the channels to order
   * @returns the inputs, indexed by their position in the layout
   * @throws Error when the number of inputs matches no known layout or a label is not part of the layout
   */
  private static sortInputsByLabel(inputs: IChannelMuxAacDefinition[]): IChannelMuxAacDefinition[] {
    const chanelLayout = AudioDemux.getChannelLayout(inputs.length);
    const channelMapping = channelLayoutToNameMapping[chanelLayout];
    const sortedArray: IChannelMuxAacDefinition[] = [];

    // tslint:disable prefer-for-of
    for (const input of inputs) {
      const chLabel = input.label;
      const sortedPosition = channelMapping.indexOf(chLabel);
      if (sortedPosition < 0) {
        // Writing to index -1 would silently drop the input and ffmpeg would later fail on a missing stream.
        throw new Error(`Channel label "${chLabel}" is not part of the ${chanelLayout} layout`);
      }
      sortedArray[sortedPosition] = input;
    }

    return sortedArray;
  }

  /**
   * Maps a channel count to the name of the matching channel layout.
   *
   * @param channels the number of channels
   * @throws Error when no layout is known for the channel count
   */
  private static getChannelLayout(channels: number): 'mono' | 'stereo' | '5.1' | '7.1' {
    switch (channels) {
      case 1:
        return 'mono';
      case 2:
        return 'stereo';
      case 6:
        return '5.1';
      case 8:
        return '7.1';
      default:
        throw new Error(`Unrecognized number of channels: ${channels}`);
    }
  }

  /**
   * Removes the given local files, ignoring the ones that do not exist.
   * The removal is best effort: a failure is logged and the remaining files are still processed, so that
   * calling this from a `finally` block never hides the original error.
   *
   * @param files the paths to remove
   */
  private static removeLocalFiles(files: string[]): void {
    files.forEach((file) => {
      try {
        if (fs.existsSync(file)) {
          fs.unlinkSync(file);
        }
      } catch (err) {
        log.warn(`Failed to remove local file ${file}: ${(err as Error).message}`);
      }
    });
  }

  /**
   * Builds the S3 coordinates of the raw file that stores one channel.
   *
   * @param cacheLocation the base location; the file name is appended to it as is
   * @param fileId the file id
   * @param trackIndex the index of the track
   * @param channelNumber the index of the channel
   */
  private static getAudioChannelPath(
    cacheLocation: string,
    fileId: string,
    trackIndex: string | number,
    channelNumber: string | number,
  ): IAudioChannelPath {
    const fileName = `${fileId}_${trackIndex}_${channelNumber}.raw`;
    const filesLocation = `${cacheLocation}${fileName}`;
    const { bucket, key } = ownS3.parseUrl(filesLocation);

    return {
      fileName,
      bucket,
      key,
    };
  }

  /**
   * Checks if the file associated with the channel exists on S3.
   * Only the existence is verified (HEAD request); any error of the request counts as "not cached".
   *
   * @param cacheLocation the base location of the extracted channel files
   * @param channel the extracted channel; its file id, track index and channel number identify the file
   * @returns whether the channel was cached or not
   */
  private static async channelIsCached(cacheLocation: string, {
    fileId,
    trackIndex,
    channelNumber,
  }: IExtractedChannel): Promise<boolean> {
    const { bucket, key } = AudioDemux.getAudioChannelPath(cacheLocation, fileId, trackIndex, channelNumber);
    try {
      await ownS3.s3.headObject({
        Bucket: bucket,
        Key: key,
      }).promise();

      return true;
      // We could compare the extracted channel duration with the file duration to make sure the channel
      // is complete and for the right file (in case it is replaced with another file checking for the duration
      // would prevent inconsistencies in some cases).
    } catch {
      return false;
    }
  }

  /**
   * Downloads the bytes of one segment from the raw file of a channel, using an S3 range request.
   * The offsets assume 48000 samples per second and 2 bytes per sample.
   *
   * @param cacheLocation the base location of the extracted channel files
   * @param fileId the file id
   * @param trackIndex the index of the track
   * @param channelNumber the index of the channel
   * @param segment the zero based index of the segment
   * @param duration the duration of the requested segment, in seconds
   * @param lastSegment when set, its duration is used to compute where `segment` starts
   * @returns the segment bytes, or `null` when the request fails or returns no body
   */
  private static async getChannelSegment(
    cacheLocation: string,
    fileId: string,
    trackIndex: number,
    channelNumber: number,
    segment: number,
    duration: number,
    lastSegment?: ISegment,
  ): Promise<Nullable<Body>> {
    const location = AudioDemux.getAudioChannelPath(cacheLocation, fileId, trackIndex, channelNumber);
    const sampleRate = 48000; // samples per second
    const pcmSampleSize = 2; // bytes
    let byteEnd;
    let byteStart;

    // NOTE(review): rounding can produce an odd size, which would start a segment in the middle of a
    // 2 byte sample - confirm the durations always yield a whole number of samples.
    const segmentSize = Math.round(sampleRate * pcmSampleSize * duration);
    if (lastSegment) {
      const lastSegmentSize = Math.round(sampleRate * pcmSampleSize * lastSegment.duration);
      byteStart = Math.round(lastSegmentSize * segment);
      byteEnd = Math.round(byteStart + segmentSize - 1);
      log.debug(`Last audio segment: start: ${byteStart} | end: ${byteEnd} | segment: ${segmentSize}`);
    } else {
      byteEnd = Math.round(segmentSize * (segment + 1) - 1);
      byteStart = Math.round(segmentSize * segment);
    }

    try {
      const pcmSegment = await ownS3.s3.getObject({
        Bucket: location.bucket,
        Key: location.key,
        Range: `bytes=${byteStart}-${byteEnd}`,
      }).promise();

      if (pcmSegment && pcmSegment.Body) {
        return pcmSegment.Body;
      }
    } catch (err) {
      log.error(`Error getting audio channel: ${channelNumber}, track: ${trackIndex}, segment: ${segment} for file: ${fileId}. Error: ${(err as Error).message}`);
    }

    return null;
  }

  /**
   * Collects, for the first resource of every audio sequence in the first segment of the composition,
   * the channels to extract and the `channelsplit` filters that produce them.
   *
   * @param definition the composition to analyse
   * @returns the channels, the filters and the source of the first audio resource, or `null` when the
   * definition has no usable audio
   * @throws rethrows any unexpected error raised while reading the definition
   */
  private static getExtractedChannelsInfo(definition: IComposition): {
    channelMappings: IExtractedChannel[],
    filters: string[],
    sourceUrl: string,
    sourceFileId: string,
  } | null {
    try {
      if (!definition || !definition.segments || !definition.segments.length) {
        log.error(`Extract channels failed: no definition/segments - ${JSON.stringify(definition)}`);
        return null;
      }

      const segment = definition.segments[0];
      if (!segment || !segment.sequences || !segment.sequences.length) {
        log.error(`Extract channels failed: no segment/sequences - ${JSON.stringify(segment)}`);
        return null;
      }

      const audioResources: IResource[] = [];
      const audioSequences = _.filter(segment.sequences, (s) => s.type === 'audio');
      if (!audioSequences || !audioSequences.length) {
        log.error(`Extract channels failed: no audio sequences - ${JSON.stringify(audioSequences)}`);
        return null;
      }

      for (const audioSequence of audioSequences) {
        if (audioSequence.resources && audioSequence.resources.length) {
          audioResources.push(audioSequence.resources[0]);
        }
      }

      // Audio sequences without any resource leave nothing to read the source from.
      if (!audioResources.length) {
        log.error(`Extract channels failed: no audio resources - ${JSON.stringify(audioSequences)}`);
        return null;
      }

      const sourceUrl = audioResources[0].track.file?.fileLocator?.url as string;
      const sourceFileId = audioResources[0].track.file?.id as string;
      log.debug(`--- Audio source URL: ${sourceUrl}`);

      const channelMappings: IExtractedChannel[] = [];
      const filters: string[] = [];
      for (const audioResource of audioResources) {
        // ToDo These guards look unnecessary. Investigate.
        const track = audioResource.track;
        if (track) {
          const trackProperties = track.properties.probeResult;
          if (trackProperties) {
            const trackIndex = trackProperties.index as number;
            const channels = trackProperties.channels;
            const fileId = track.fileId;

            // Channel counts without a named layout use the value computed by the helper instead.
            let channelLayout: string;
            try {
              channelLayout = AudioDemux.getChannelLayout(channels);
            } catch {
              channelLayout = unknownChannelLayoutValue(channels);
            }

            const trackChannels = _.range(channels).map((c) => `[t${trackIndex}c${c}]`);
            if (!trackProperties.sampleRate) {
              log.warn(`The sample rate for file id ${fileId} track number ${trackIndex} does not exist`);
            }

            channelMappings.push(...trackChannels.map((tc, channelNumber) => ({
              fileId,
              tc,
              trackIndex,
              trackChannelCount: channels,
              channelNumber,
              duration: Number.parseFloat(trackProperties.duration),
              sampleRate: Number.parseInt(trackProperties.sampleRate ?? '48000', 10),
            })));

            filters.push(`[0:${trackIndex}]channelsplit=channel_layout=${channelLayout}${trackChannels.join('')}`);
          }
        }
      }

      return {
        channelMappings,
        filters,
        sourceUrl,
        sourceFileId,
      };
    } catch (err) {
      // JSON.stringify of an Error is "{}", so the message is logged explicitly as well.
      const message = err instanceof Error ? err.message : String(err);
      log.error(`Error processing extract channels definition: ${JSON.stringify(err)}, message: ${message}`);
      throw err;
    }
  }

  public constructor(private readonly task: IAudioDemuxTask) {}

  /** Runs the task: extracts every channel that is not cached yet. */
  public async run(): Promise<void> {
    return this.generatePcm();
  }

  /**
   * Extracts every channel of the composition that is not cached yet into a raw PCM file (signed 16 bit
   * little endian, 48000 Hz) and uploads it to the cache location.
   *
   * ffmpeg writes each channel to a named pipe that is uploaded to S3 while ffmpeg is running.
   * Failures are logged and followed by a best effort cleanup (ffmpeg process, local pipes, S3 files);
   * they are NOT propagated to the caller.
   */
  private async generatePcm(): Promise<void> {
    let ffmpegCommand: FfmpegCommand | null = null;
    const { compositionDefinition: definition, cacheLocation } = this.task;
    const channelS3FileLocators: S3FileLocator[] = [];
    // Declared outside of the try block so that the error path can remove the pipes as well.
    const localPaths: { location: IAudioChannelPath, localPath: string }[] = [];

    try {
      const extractedChannels = AudioDemux.getExtractedChannelsInfo(definition);
      if (!extractedChannels) {
        return;
      }

      const {
        channelMappings,
        filters,
        sourceUrl,
        sourceFileId,
      } = extractedChannels;

      const trackedFFmpegCommand = createFFmpegCommand({
        ffmpegTimeout: config.worker.extractChannels.timeout,
        ffmpegStdoutLines: 15,
      }) as TrackedFFmpegCommand;
      const { ffmpegPromise } = trackedFFmpegCommand;
      const command = trackedFFmpegCommand.ffmpegCommand;
      ffmpegCommand = command;

      const signedUrl = await ownS3.signUrl(sourceUrl, { Expires: 7200 });
      command.input(signedUrl);
      command.inputOptions('-reconnect 1');
      command.inputOptions('-reconnect_delay_max 10');
      command.inputOptions(['-vn', '-sn', '-dn']);

      // Explicit hex value for unknown channel layouts
      for (const { trackIndex, trackChannelCount } of channelMappings) {
        try {
          AudioDemux.getChannelLayout(trackChannelCount);
        } catch {
          const channelLayout = unknownChannelLayoutValue(trackChannelCount);
          command.inputOptions([
            `-channel_layout:${trackIndex} ${channelLayout}`,
          ]);
        }
      }

      // Map from tc to whether the channel is cached or not
      const cachedChannels = new Map<string, boolean>();
      await bluebird.map(channelMappings, async (channel) => {
        cachedChannels.set(channel.tc, await AudioDemux.channelIsCached(cacheLocation, channel));
      }, { concurrency: 4 });

      // We don't regenerate already cached channels so we need to send the output from the complex filter
      // to a null sink.
      channelMappings.filter(({ tc }) => cachedChannels.get(tc))
        .forEach(({ tc }) => filters.push(`${tc}anullsink`));

      command.outputOptions(['-filter_complex', filters.join(';')]);

      // We generate output only for channels not cached yet.
      // eslint-disable-next-line object-curly-newline
      for (const { fileId, channelNumber, trackIndex, tc } of channelMappings) {
        if (!cachedChannels.get(tc)) {
          const location = AudioDemux.getAudioChannelPath(cacheLocation, fileId, trackIndex, channelNumber);
          const localPath = path.join(config.mediaPath, location.fileName);

          // A leftover from a previous run is replaced by a fresh named pipe.
          if (fs.existsSync(localPath)) {
            fs.unlinkSync(localPath);
          }
          spawnSync('mkfifo', [localPath], { stdio: 'ignore' });

          command.output(localPath);
          command.outputOptions(['-map', tc]);
          command.outputOptions(['-ar', '48000']);
          command.outputOptions(['-c:a', 'pcm_s16le']);
          command.format('s16le');

          localPaths.push({ location, localPath });
        }
      }

      log.debug(`Extract locations: ${JSON.stringify(localPaths)}`);
      log.debug('Cached values:', cachedChannels.entries());

      // enable extended output
      command.on('stderr', (stdLine: string) => {
        if (stdLine.match(/^\[.+] \[(error|warning)].+/gm)) {
          log.warn(stdLine);
        }
      });

      // if there is at least one channel not cached we run the command
      const notCachedCount = [...cachedChannels.values()].filter((cached) => !cached).length;
      if (notCachedCount > 0) {
        log.info(`Running ffmpeg command for a number of ${notCachedCount} channels on file id: ${sourceFileId}.`);
        command.run();
      } else {
        log.info(`No channel needs to be cached for definition with file id: ${sourceFileId}`);
        return;
      }

      // Every pipe is uploaded while ffmpeg writes to it.
      const s3Promises: Promise<unknown>[] = [];
      localPaths.forEach((lPath) => {
        const channelFsFileLocator = new FsFileLocator({ path: lPath.localPath });
        const channelS3FileLocator = new S3FileLocator(lPath.location);
        s3Promises.push(channelS3FileLocator.uploadFrom(channelFsFileLocator));
        channelS3FileLocators.push(channelS3FileLocator);
      });

      await Promise.all([
        ffmpegPromise,
        ...s3Promises,
      ]);

      log.debug(`Extract pcm: removing local paths: ${JSON.stringify(localPaths)}`);
      AudioDemux.removeLocalFiles(localPaths.map((localPath) => localPath.localPath));
      log.debug(`Extract channels finished for: ${sourceUrl}`);
    } catch (error) {
      if (error instanceof Error) {
        log.error(`Extract channels failed: ${JSON.stringify(error)}, message: ${error.message}`);
      } else {
        log.error(`Extracted channels failed with non-error object: ${JSON.stringify(error)}`);
      }

      log.info('Attempting to stop ffmpeg process.');
      try {
        if (ffmpegCommand) {
          ffmpegCommand.kill('SIGKILL');
        }
      } catch (stopError) {
        log.warn(`Failed to stop ffmpeg command: ${(stopError as Error).message}`);
      }

      // The pipes created for this run would otherwise stay on the local disk.
      log.info('Attempting to remove local extraction files.');
      AudioDemux.removeLocalFiles(localPaths.map((localPath) => localPath.localPath));

      // try to clear the files created in s3.
      log.info('Attempting to remove partially extracted channels.');
      try {
        await Promise.all(channelS3FileLocators.map((s3Locator) => s3Locator.deleteFileLocator()));
      } catch (deleteError) {
        log.error(`Failed to remove partially extracted channels at ${cacheLocation}: ${(deleteError as Error).message}`);
      }
    }
  }
}