import {
  ReadableStream,
  WritableStream,
  TransformStream,
  ReadableStreamDefaultController,
  WritableStreamDefaultController,
} from "./std";
import { readAllChunks } from "./utils";

/**
 * Object a {@link DuplexStreamHandler} may return. Its `close` callback is
 * invoked with the reason when one side of the duplex stream is cancelled or
 * aborted.
 */
export type DuplexStreamResponse = { close(reason?: any): any };

/**
 * Callback that wires up a duplex stream: it reads what was written to the
 * duplex stream from `input` and writes what the duplex stream should emit to
 * `output`.
 */
export type DuplexStreamHandler = (
  input: ReadableStream,
  output: WritableStream
) => any | DuplexStreamResponse;

/**
 * Rendezvous queue between one producer and one consumer.
 *
 * A `push` stays pending until a matching `shift` takes the value, and a
 * `shift` stays pending until a value is pushed.
 */
export class ChunkQueue<T = unknown> {
  /** Resolvers of `shift()` calls that are waiting for a value. */
  private _reads: Array<(value: T) => any> = [];
  /** Pending writes as `[value, resolve, reject]` tuples. */
  private _writes: Array<[T, () => any, (reason: any) => any]> = [];

  /**
   * Offers a value to the queue.
   *
   * @returns A promise that resolves once the value has been handed to a
   *   reader, or rejects with the reason given to {@link cancel}.
   */
  push(value: T): Promise<void> {
    const waitingReader = this._reads.shift();
    if (waitingReader) {
      waitingReader(value);
      return Promise.resolve();
    }
    return new Promise<void>((resolve, reject) => {
      this._writes.push([value, resolve, reject]);
    });
  }

  /** Number of writes that have not been consumed yet. */
  get size() {
    return this._writes.length;
  }

  /**
   * Takes the oldest pending value, or waits for the next `push`.
   * Taking a value resolves the promise of the `push` that supplied it.
   */
  shift(): Promise<T> {
    const pending = this._writes.shift();
    if (pending) {
      const [value, resolve] = pending;
      resolve();
      return Promise.resolve(value);
    }
    return new Promise<T>((resolve) => {
      this._reads.push(resolve);
    });
  }

  /**
   * Drops every pending write and rejects its promise with `reason`.
   * Waiting readers are left untouched.
   */
  cancel(reason?: any) {
    const writes = this._writes;
    this._writes = [];
    for (const [, , reject] of writes) {
      reject(reason);
    }
  }
}

/**
 * A writable/readable pair connected through a {@link ChunkQueue}: chunks
 * written to `writable` come out of `readable`.
 */
class ReadableWritableStream<T = unknown> {
  readonly writable: WritableStream;
  readonly readable: ReadableStream;

  /**
   * @param stream Owning duplex stream; its `$response.close` (if any) is
   *   called when this pair is cancelled or aborted.
   */
  constructor(stream: DuplexStream) {
    let readerController: ReadableStreamDefaultController;
    // Explicit flag rather than testing the cancel reason for truthiness:
    // cancel() may legitimately be called with no (or a falsy) reason.
    let cancelled = false;
    let lastWrite: Promise<void> = Promise.resolve();
    const queue = new ChunkQueue<T>();

    const close = (reason?: any) => {
      queue.cancel(reason);
      // `$response` is only assigned after the handler has returned, so it is
      // still undefined if we get here while the handler is running.
      const response = stream.$response;
      if (response && response.close) {
        response.close(reason);
      }
    };

    this.readable = new ReadableStream({
      start(controller) {
        readerController = controller;
      },
      pull() {
        return queue.shift().then((value) => {
          readerController.enqueue(value);
        });
      },
      cancel(reason) {
        cancelled = true;
        close(reason);
      },
    });

    this.writable = new WritableStream({
      write: (chunk) => {
        // Need to eat the chunk here: streams re-throw any exception that
        // occurs in sink.write().
        if (cancelled) return;
        return (lastWrite = queue.push(chunk as T));
      },
      close() {
        if (cancelled) return;
        // Close the readable side once the last write has settled, whether it
        // was consumed or rejected.
        const closeReader = () => {
          readerController.close();
        };
        return lastWrite.then(closeReader, closeReader);
      },
      abort(reason) {
        if (cancelled) return;
        readerController.error(reason);
        close(reason);
      },
    });
  }
}

/**
 * Coerces a value into a `TransformStream`-shaped object.
 *
 * - An object with both `readable` and `writable` is returned as-is.
 * - A `ReadableStream` is piped into the readable side of a new
 *   `TransformStream`.
 * - Anything else is awaited; an array result is enqueued element by element,
 *   any other non-nullish result is enqueued as a single chunk, and the
 *   stream is then closed.
 *
 * NOTE(review): this relies on the `./std` transform controller exposing
 * `close()`; the current WHATWG controller calls this `terminate()` — confirm.
 */
export function wrapDuplexStream(value: any): TransformStream {
  if (value && value.readable && value.writable) {
    return value;
  }

  if (value instanceof ReadableStream) {
    const readable: ReadableStream = value;
    return new TransformStream({
      start(controller) {
        readable
          .pipeTo(
            new WritableStream({
              write(chunk) {
                controller.enqueue(chunk);
              },
              abort(error) {
                controller.error(error);
              },
              close() {
                controller.close();
              },
            })
          )
          // A source failure has already been forwarded by abort() above;
          // swallow the pipe promise so it does not surface as an unhandled
          // rejection.
          .catch(() => {});
      },
    });
  }

  return new TransformStream({
    async start(controller) {
      const v = await value;
      if (v != null) {
        if (Array.isArray(v)) {
          v.forEach((i) => controller.enqueue(i));
        } else {
          controller.enqueue(v);
        }
      }
      controller.close();
    },
  });
}

/**
 * A `TransformStream`-shaped object whose behaviour is supplied by a handler.
 *
 * Chunks written to {@link writable} are readable by the handler through its
 * `input` argument; chunks the handler writes to its `output` argument come
 * out of {@link readable}.
 */
export class DuplexStream implements TransformStream {
  private _input: ReadableWritableStream;
  private _output: ReadableWritableStream;
  /** Whatever the handler returned, or `{}` if it returned a falsy value. */
  public $response: DuplexStreamResponse;

  /** @param handler Invoked synchronously, once, during construction. */
  constructor(handler: DuplexStreamHandler) {
    const input = (this._input = new ReadableWritableStream(this));
    const output = (this._output = new ReadableWritableStream(this));
    this.$response = handler(input.readable, output.writable) || {};
  }

  /** Creates a duplex stream whose readable side is closed without chunks. */
  static empty() {
    return new DuplexStream((input, output) => {
      output.getWriter().close();
    });
  }

  /** Creates a duplex stream that emits each element of `items`, then closes. */
  static fromArray(items: any[]) {
    return new DuplexStream((input, output) => {
      const writer = output.getWriter();
      items.forEach((item) => writer.write(item));
      writer.close();
    });
  }

  /**
   * Makes the stream thenable by delegating to `readAllChunks(this)`.
   *
   * NOTE(review): because of this method, `await`-ing a DuplexStream (or
   * returning one from an async function) invokes `readAllChunks` on it —
   * presumably draining the readable side; confirm against `./utils`.
   */
  then(resolve?: (value: any) => any, reject?: (reason: any) => any) {
    return readAllChunks(this).then(resolve, reject);
  }

  /** Writable side; feeds the handler's `input`. */
  get writable(): WritableStream {
    return this._input.writable;
  }

  /** Readable side; fed by the handler's `output`. */
  get readable(): ReadableStream {
    return this._output.readable;
  }
}

/** Functional alias for `new DuplexStream(handler)`. */
export const createDuplexStream = (handler: DuplexStreamHandler) =>
  new DuplexStream(handler);