import { Readable } from 'stream';
import { IncomingMessage, MessageObjectInfo, BinaryMessage } from '../messages';

/** Streams of a single transaction, keyed by the dot-joined path of each binary item. */
interface TransactionStreams {
  [path: string]: Readable;
}

/**
 * Tracks open transactions and routes incoming binary chunks to the stream
 * registered for the chunk's transaction id and path.
 */
export class TransactionManager {
  /** Open transactions by transaction id; each entry has the shape `{ streams }`. */
  protected transactions = new Map();

  /**
   * Registers the streams described by `message.meta.messageObjectInfo` under
   * `message.transactionId`.
   *
   * Nothing is registered when the transaction id or `meta` is falsy, or when
   * `getStreams` yields no streams for the message.
   *
   * @param message Message that opens the transaction.
   */
  public beginTransaction(message: IncomingMessage): void {
    const { transactionId, meta } = message;
    if (!transactionId || !meta) {
      return;
    }

    const streams = this.getStreams(meta.messageObjectInfo);
    if (!streams) {
      return;
    }

    this.transactions.set(transactionId, { streams });
  }

  /**
   * Forgets the transaction identified by `message.transactionId`.
   *
   * Only the map entry is removed; the streams themselves are not touched here.
   * A falsy or unknown transaction id is ignored.
   *
   * @param message Message that closes the transaction.
   */
  public endTransaction(message: IncomingMessage): void {
    const { transactionId } = message;
    if (!transactionId) {
      return;
    }

    // Map#delete is already a no-op for keys that are not present.
    this.transactions.delete(transactionId);
  }

  /**
   * Pushes one binary chunk into the stream registered for the message's
   * transaction id and path.
   *
   * A `null` chunk is forwarded as `push(null)`; any other chunk is forwarded
   * as `push(chunk.buffer)`.
   *
   * @param binaryMessage Chunk together with the transaction id and path it belongs to.
   * @returns Whatever the stream's `push` call returned.
   * @throws Error when no stream is registered for that transaction id and path.
   */
  public addChunk(binaryMessage: BinaryMessage): boolean {
    const { transactionId, path, chunk } = binaryMessage;
    const transaction = this.transactions.get(transactionId);

    // Own-property check so inherited keys (e.g. "toString") never resolve to a stream.
    const isRegistered = transaction !== undefined && Object.hasOwn(transaction.streams, path);
    if (!isRegistered) {
      throw new Error(`Not found stream ${binaryMessage.path} for transaction ${transactionId}.`);
    }

    const target = transaction.streams[path];
    return target.push(chunk === null ? null : chunk.buffer);
  }

  /**
   * Builds the path-to-stream lookup for a message.
   *
   * Each entry of `info.binaryData` is stored under its `path` segments joined
   * with `.`, with the entry's `data` as the value.
   *
   * @param info Binary object description of the message, if any.
   * @returns The lookup, or `null` when `info` is missing or `binarySize` is not greater than 0.
   */
  protected getStreams(info?: MessageObjectInfo): TransactionStreams | null {
    if (!info || !(info.binarySize > 0)) {
      return null;
    }

    const lookup: TransactionStreams = {};
    info.binaryData.forEach(({ path, data }) => {
      // NOTE(review): the cast hides the declared type of `data`; confirm it really is a Readable.
      lookup[path.join('.')] = data as any;
    });
    return lookup;
  }
}