/**
 * Shared libav demux session. Opens a libav demuxer over a NormalizedSource
 * and provides a linear, cancellable packet pump.
 *
 * Phase 1 API: deliberately minimal. The first consumer is the AVI/ASF/FLV
 * transcode path (src/convert/transcode-libav.ts), which is strictly linear.
 * No seek, no track swapping — those were added to hybrid/fallback's
 * private pumps for playback reasons. When those paths migrate here, the
 * API will grow to cover their needs.
 *
 * The shared timestamp sanitizers (sanitizePacketTimestamp,
 * sanitizeFrameTimestamp) also live here. They were previously duplicated
 * in convert/remux.ts and strategies/hybrid/decoder.ts. The duplicates
 * stay put in Phase 1 with TODO pointers; migration is a follow-up.
 */

import { loadLibav, type LibavVariant } from "../strategies/fallback/libav-loader.js";
import { pickLibavVariant } from "../strategies/fallback/variant-routing.js";
import { prepareLibavInput } from "./libav-http-reader.js";
import type { MediaContext, TransportConfig } from "../types.js";
import type { NormalizedSource } from "./source.js";

// ─────────────────────────────────────────────────────────────────────────
// Structural types (mirror libav.js' shape without dragging in its types)
// ─────────────────────────────────────────────────────────────────────────

export interface LibavStream {
  index: number;
  codec_type: number;
  codec_id: number;
  codecpar: number;
  time_base_num?: number;
  time_base_den?: number;
}

export interface LibavPacket {
  data: Uint8Array;
  pts: number;
  ptshi?: number;
  duration?: number;
  durationhi?: number;
  flags: number;
  stream_index: number;
  time_base_num?: number;
  time_base_den?: number;
}

export interface LibavFrame {
  data: unknown;
  format: number;
  channels?: number;
  ch_layout_nb_channels?: number;
  sample_rate?: number;
  nb_samples?: number;
  pts?: number;
  ptshi?: number;
  width?: number;
  height?: number;
}

interface LibavRuntime {
  AVMEDIA_TYPE_VIDEO: number;
  AVMEDIA_TYPE_AUDIO: number;
  AVERROR_EOF: number;
  EAGAIN: number;
  mkreadaheadfile(name: string, blob: Blob): Promise<void>;
  unlinkreadaheadfile(name: string): Promise<void>;
  ff_init_demuxer_file(name: string): Promise<[number, LibavStream[]]>;
  ff_read_frame_multi(
    fmt_ctx: number,
    pkt: number,
    opts?: { limit?: number },
  ): Promise<[number, Record<number, LibavPacket[]>]>;
  av_packet_alloc(): Promise<number>;
  av_packet_free?(pkt: number): Promise<void>;
  avformat_close_input_js(ctx: number): Promise<void>;
  f64toi64?(val: number): [number, number];
}

// ─────────────────────────────────────────────────────────────────────────
// Session
// ─────────────────────────────────────────────────────────────────────────

export interface LibavDemuxSession {
  readonly libav: LibavRuntime;
  readonly fmtCtx: number;
  readonly streams: LibavStream[];
  /** First video stream in libav's stream order, or null if there is none. */
  readonly videoStream: LibavStream | null;
  /** First audio stream in libav's stream order, or null if there is none. */
  readonly audioStream: LibavStream | null;
  /**
   * How the input is fed to libav: `"http-range"` when it is streamed via
   * HTTP Range requests, `"blob"` otherwise.
   */
  readonly transport: "http-range" | "blob";
  /**
   * Linear read-to-EOF pump. Invokes the callbacks for each
   * ff_read_frame_multi batch (audio is handed over before video per
   * batch, matching the audio-first ordering that the hybrid/fallback
   * playback pumps use — see POSTMORTEMS.md entry 1).
   *
   * Checks the AbortSignal (and whether the session was destroyed) before
   * each read and after each callback; once either trips, the pump returns
   * without calling `onEof`. Invokes `onEof` once when the demuxer returns
   * EOF. Throws if the read rejects or returns an error other than
   * EOF/EAGAIN. Does NOT handle seek.
   */
  pump(cb: {
    onVideoPackets?: (pkts: LibavPacket[]) => Promise<void>;
    onAudioPackets?: (pkts: LibavPacket[]) => Promise<void>;
    onEof?: () => Promise<void>;
    signal?: AbortSignal;
  }): Promise<void>;
  /** Release the packet, demuxer and input. Safe to call more than once. */
  destroy(): Promise<void>;
}

export interface OpenLibavDemuxOptions {
  source: NormalizedSource;
  filename: string;
  context: MediaContext;
  transport?: TransportConfig;
  /** Override automatic variant picking. Defaults to pickLibavVariant(context). */
  variant?: LibavVariant;
}

/**
 * Load a libav.js variant, attach `opts.source` under `opts.filename`, and
 * open a demuxer over it.
 *
 * If allocation or demuxer initialisation fails (e.g. the container is not
 * recognised), everything acquired so far is released before the error is
 * rethrown — the caller never receives a session it could `destroy()`.
 */
export async function openLibavDemux(opts: OpenLibavDemuxOptions): Promise<LibavDemuxSession> {
  const variant: LibavVariant = opts.variant ?? pickLibavVariant(opts.context);
  const libav = (await loadLibav(variant)) as unknown as LibavRuntime;
  const inputHandle = await prepareLibavInput(
    libav as unknown as Parameters<typeof prepareLibavInput>[0],
    opts.filename,
    opts.source,
    opts.transport,
  );

  /** Run one cleanup step, swallowing its failure so later steps still run. */
  async function bestEffort(step: () => unknown): Promise<void> {
    try {
      await step();
    } catch {
      /* ignore: cleanup is best-effort */
    }
  }

  let readPkt: number;
  try {
    readPkt = await libav.av_packet_alloc();
  } catch (err) {
    await bestEffort(() => inputHandle.detach());
    throw err;
  }

  let fmtCtx: number;
  let streams: LibavStream[];
  try {
    [fmtCtx, streams] = await libav.ff_init_demuxer_file(opts.filename);
  } catch (err) {
    // Previously these leaked on a demuxer-init failure.
    await bestEffort(() => libav.av_packet_free?.(readPkt));
    await bestEffort(() => inputHandle.detach());
    throw err;
  }

  const videoStream = streams.find((s) => s.codec_type === libav.AVMEDIA_TYPE_VIDEO) ?? null;
  const audioStream = streams.find((s) => s.codec_type === libav.AVMEDIA_TYPE_AUDIO) ?? null;

  let destroyed = false;

  async function pump(cb: Parameters<LibavDemuxSession["pump"]>[0]): Promise<void> {
    const halted = (): boolean => destroyed || cb.signal?.aborted === true;

    while (!halted()) {
      let readErr: number;
      let packets: Record<number, LibavPacket[]>;
      try {
        [readErr, packets] = await libav.ff_read_frame_multi(fmtCtx, readPkt, {
          // 16 KB batch — chosen so each read produces a handful of
          // packets, keeping downstream queues bounded. Same rationale
          // as the hybrid/fallback pumps (see CLAUDE.md note).
          limit: 16 * 1024,
        });
      } catch (err) {
        // libav.js can reject with non-Error values; don't report "undefined".
        const detail = err instanceof Error ? err.message : String(err);
        throw new Error(`libav-demux: ff_read_frame_multi failed: ${detail}`);
      }
      if (halted()) return;

      const videoPackets = videoStream ? packets[videoStream.index] : undefined;
      const audioPackets = audioStream ? packets[audioStream.index] : undefined;

      // Audio-first ordering. Audio decode is cheap; video decode can
      // be expensive. Feeding audio first ensures the audio consumer
      // has samples to work with before any long video-decode block.
      if (cb.onAudioPackets && audioPackets && audioPackets.length > 0) {
        await cb.onAudioPackets(audioPackets);
      }
      if (halted()) return;

      if (cb.onVideoPackets && videoPackets && videoPackets.length > 0) {
        await cb.onVideoPackets(videoPackets);
      }
      // A consumer may abort or destroy from inside a callback; a pump that
      // was told to stop must not report EOF (or a read error) afterwards.
      if (halted()) return;

      if (readErr === libav.AVERROR_EOF) {
        if (cb.onEof) await cb.onEof();
        return;
      }
      // 0 = more data; -EAGAIN = nothing available yet, try again.
      if (readErr && readErr !== -libav.EAGAIN) {
        throw new Error(`libav-demux: ff_read_frame_multi returned ${readErr}`);
      }
    }
  }

  async function destroy(): Promise<void> {
    // Idempotent: a second call must not free/close the same libav handles again.
    if (destroyed) return;
    destroyed = true;
    await bestEffort(() => libav.av_packet_free?.(readPkt));
    await bestEffort(() => libav.avformat_close_input_js(fmtCtx));
    await bestEffort(() => inputHandle.detach());
  }

  return {
    libav,
    fmtCtx,
    streams,
    videoStream,
    audioStream,
    transport: inputHandle.transport,
    pump,
    destroy,
  };
}

// ─────────────────────────────────────────────────────────────────────────
// Timestamp sanitizers (extracted from convert/remux.ts + hybrid/decoder.ts)
//
// libav can hand us packets/frames with pts = AV_NOPTS_VALUE (encoded as
// ptshi = -2147483648, pts = 0) for inputs whose demuxer can't determine
// presentation times. AVI is the canonical example. Downstream consumers
// that treat pts as int64 overflow and throw.
//
// The sanitizer replaces invalid pts with a synthetic microsecond counter,
// and normalizes valid pts to a 1/1e6 time_base so consumers don't need
// to track the source time_base per packet.
// ─────────────────────────────────────────────────────────────────────────

/**
 * Sanitize a libav packet's timestamp. Mutates `pkt` in place.
 *
 * - If the packet has AV_NOPTS_VALUE (or a non-finite pts), pts is replaced
 *   with `nextUs()`.
 * - Otherwise the (pts, ptshi) pair is decoded as an int64 in
 *   `fallbackTimeBase` (default 1/1_000_000, i.e. already µs) and converted
 *   to µs. The packet's own time_base fields are not consulted.
 * - If the converted value is not a safe integer, `nextUs()` is used instead.
 *
 * Afterwards `pts` holds the full µs value, `ptshi` is -1 for negative values
 * and 0 otherwise, and time_base is 1/1_000_000.
 */
export function sanitizePacketTimestamp(
  pkt: LibavPacket,
  nextUs: () => number,
  fallbackTimeBase?: [number, number],
): void {
  const stamp = (us: number, ptshi: number): void => {
    pkt.pts = us;
    pkt.ptshi = ptshi;
    pkt.time_base_num = 1;
    pkt.time_base_den = 1_000_000;
  };

  const lo = pkt.pts ?? 0;
  const hi = pkt.ptshi ?? 0;
  const isInvalid = (hi === -2147483648 && lo === 0) || !Number.isFinite(lo);
  if (isInvalid) {
    stamp(nextUs(), 0);
    return;
  }

  // With ptshi present, (pts, ptshi) is libav.js' split int64 and the low
  // word can arrive negative (bit 31 set, read as a signed int32). Add 2^32
  // back in that case, as libav.js' own i64tof64 does — otherwise e.g.
  // pts = -1 (hi = -1, lo = -1) would decode as -(2^32 + 1). A packet with
  // no ptshi is treated as carrying a plain number in pts.
  const pts64 =
    pkt.ptshi === undefined ? lo : hi * 0x100000000 + lo + (lo < 0 ? 0x100000000 : 0);

  const tb = fallbackTimeBase ?? [1, 1_000_000];
  const us = Math.round((pts64 * 1_000_000 * tb[0]) / tb[1]);
  if (Number.isFinite(us) && Math.abs(us) <= Number.MAX_SAFE_INTEGER) {
    stamp(us, us < 0 ? -1 : 0);
    return;
  }

  stamp(nextUs(), 0);
}

/**
 * Convert a raw libav packet's pts to seconds using the given stream
 * time_base, or return `null` if the packet lacks a valid pts. Used by
 * the hybrid + fallback strategies to track the demuxer's read-ahead
 * progress (the signal behind `