{"version":3,"sources":["src/common/Stream.ts"],"names":[],"mappings":"AAMA,OAAO,EAAE,OAAO,EAAE,MAAM,WAAW,CAAC;AACpC,OAAO,EAAE,KAAK,EAAE,MAAM,SAAS,CAAC;AAEhC,MAAM,WAAW,YAAY,CAAC,OAAO;IACjC,KAAK,EAAE,OAAO,CAAC;IACf,MAAM,EAAE,OAAO,CAAC;IAChB,YAAY,EAAE,MAAM,CAAC;CACxB;AAED,qBAAa,MAAM,CAAC,OAAO;IACvB,OAAO,CAAC,MAAM,CAAS;IACvB,OAAO,CAAC,mBAAmB,CAAa;IACxC,OAAO,CAAC,gBAAgB,CAA+B;IACvD,OAAO,CAAC,WAAW,CAAkB;IACrC,OAAO,CAAC,gBAAgB,CAAkD;gBAEvD,QAAQ,CAAC,EAAE,MAAM;aAMzB,QAAQ,EAAI,OAAO;aAInB,EAAE,EAAI,MAAM;IAIhB,SAAS,8BAef;IAEM,KAAK,IAAI,IAAI;IAWb,gBAAgB,CAAC,WAAW,EAAE,YAAY,CAAC,OAAO,CAAC,GAAG,IAAI;IAcjE,OAAO,CAAC,aAAa,CAIpB;CACJ;AAGD,qBAAa,YAAY,CAAC,OAAO;IAC7B,OAAO,CAAC,eAAe,CAA+B;IACtD,OAAO,CAAC,WAAW,CAAa;IAChC,OAAO,CAAC,YAAY,CAAkB;IACtC,OAAO,CAAC,YAAY,CAAS;gBAEV,QAAQ,EAAE,MAAM,EAAE,WAAW,EAAE,KAAK,CAAC,YAAY,CAAC,OAAO,CAAC,CAAC,EAAE,OAAO,EAAE,MAAM,IAAI;aAMxF,QAAQ,EAAI,OAAO;aAInB,QAAQ,EAAI,MAAM;IAItB,IAAI,uCAcV;IAEM,KAAK,aAMX;CACJ","file":"Stream.d.ts","sourcesContent":["// Copyright (c) Microsoft Corporation. All rights reserved.\n// Licensed under the MIT license.\n\nimport { InvalidOperationError } from \"./Error\";\nimport { createNoDashGuid } from \"./Guid\";\nimport { IStringDictionary } from \"./IDictionary\";\nimport { Promise } from \"./Promise\";\nimport { Queue } from \"./Queue\";\n\nexport interface IStreamChunk<TBuffer> {\n    isEnd: boolean;\n    buffer: TBuffer;\n    timeReceived: number;\n}\n\nexport class Stream<TBuffer> {\n    private privId: string;\n    private privReaderIdCounter: number = 1;\n    private privStreambuffer: Array<IStreamChunk<TBuffer>>;\n    private privIsEnded: boolean = false;\n    private privReaderQueues: IStringDictionary<Queue<IStreamChunk<TBuffer>>>;\n\n    public constructor(streamId?: string) {\n        this.privId = streamId ? streamId : createNoDashGuid();\n        this.privStreambuffer = [];\n        this.privReaderQueues = {};\n    }\n\n    public get isClosed(): boolean {\n        return this.privIsEnded;\n    }\n\n    public get id(): string {\n        return this.privId;\n    }\n\n    public getReader = (): StreamReader<TBuffer> => {\n        const readerId = this.privReaderIdCounter;\n        this.privReaderIdCounter++;\n        const readerQueue = new Queue<IStreamChunk<TBuffer>>();\n        const currentLength = this.privStreambuffer.length;\n        this.privReaderQueues[readerId] = readerQueue;\n        for (let i = 0; i < currentLength; i++) {\n            readerQueue.enqueue(this.privStreambuffer[i]);\n        }\n        return new StreamReader(\n            this.privId,\n            readerQueue,\n            () => {\n                delete this.privReaderQueues[readerId];\n            });\n    }\n\n    public close(): void {\n        if (!this.privIsEnded) {\n            this.writeStreamChunk({\n                buffer: null,\n                isEnd: true,\n                timeReceived: Date.now(),\n            });\n            this.privIsEnded = true;\n        }\n    }\n\n    public writeStreamChunk(streamChunk: IStreamChunk<TBuffer>): void {\n        this.throwIfClosed();\n        this.privStreambuffer.push(streamChunk);\n        for (const readerId in this.privReaderQueues) {\n            if (!this.privReaderQueues[readerId].isDisposed()) {\n                try {\n                    this.privReaderQueues[readerId].enqueue(streamChunk);\n                } catch (e) {\n                    // Do nothing\n                }\n            }\n        }\n    }\n\n    private throwIfClosed = (): void => {\n        if (this.privIsEnded) {\n            throw new InvalidOperationError(\"Stream closed\");\n        }\n    }\n}\n\n// tslint:disable-next-line:max-classes-per-file\nexport class StreamReader<TBuffer> {\n    private privReaderQueue: Queue<IStreamChunk<TBuffer>>;\n    private privOnClose: () => void;\n    private privIsClosed: boolean = false;\n    private privStreamId: string;\n\n    public constructor(streamId: string, readerQueue: Queue<IStreamChunk<TBuffer>>, onClose: () => void) {\n        this.privReaderQueue = readerQueue;\n        this.privOnClose = onClose;\n        this.privStreamId = streamId;\n    }\n\n    public get isClosed(): boolean {\n        return this.privIsClosed;\n    }\n\n    public get streamId(): string {\n        return this.privStreamId;\n    }\n\n    public read = (): Promise<IStreamChunk<TBuffer>> => {\n        if (this.isClosed) {\n            throw new InvalidOperationError(\"StreamReader closed\");\n        }\n\n        return this.privReaderQueue\n            .dequeue()\n            .onSuccessContinueWith((streamChunk: IStreamChunk<TBuffer>) => {\n                if (streamChunk === undefined || streamChunk.isEnd) {\n                    this.privReaderQueue.dispose(\"End of stream reached\");\n                }\n\n                return streamChunk;\n            });\n    }\n\n    public close = (): void => {\n        if (!this.privIsClosed) {\n            this.privIsClosed = true;\n            this.privReaderQueue.dispose(\"StreamReader closed\");\n            this.privOnClose();\n        }\n    }\n}\n"]}