// Copyright (C) 2017-2019 Brainbean Apps OU (https://brainbeanapps.com).
// License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import { isFunction, isArray, isNil } from 'lodash'
import { Readable, Transform } from 'stream'
import Promise from 'bluebird'
import * as logger from 'winston'
import { SequenceOptions } from './sequenceOptions'

/**
 * Wraps multiple readable streams into a sequence.
 *
 * Streams are consumed one at a time: every chunk of the current stream is
 * re-pushed by this Readable, and when the current stream emits `end` or
 * `close` (or `error`, when `abortOnError` is falsy) the next stream is
 * requested. The sequence ends when no further stream is available.
 */
export default class Sequence extends Readable {
  /**
   * Defaults merged beneath the options passed to the constructor.
   *
   * - `objectMode`: forwarded to the underlying Readable constructor.
   * - `abortOnError`: when truthy, an error raised by the next-stream getter
   *   or by the current stream is emitted as `error` and the sequence ends.
   *
   * NOTE(review): only the shape is declared here; no value is assigned in
   * this class body, so it is presumably set elsewhere - confirm.
   */
  static DEFAULT_OPTIONS: {
    objectMode: boolean
    abortOnError: boolean
  }

  // Merged options; the shape of SequenceOptions is declared in another module.
  private _options: any
  // Opaque value returned by the getter alongside the stream (an index in
  // array mode); handed back to the getter on the next call.
  private _context: unknown
  // Stream currently being forwarded. After `end`/`close`/`error` it keeps
  // referencing the finished stream so it can be passed to the getter.
  private _stream: Readable | Transform | null
  // True while the current stream is paused by us because push() returned false.
  private _pausedForBackpressure: boolean
  // Set by _destroy(); prevents attaching/pushing once the sequence is torn down.
  private _sequenceDestroyed: boolean
  private readonly _nextStreamGetter?: Function | null
  private readonly _onStreamEndHandler: () => void
  private readonly _onStreamCloseHandler: () => void
  private readonly _onStreamDataHandler: (chunk: any) => void
  private readonly _onStreamErrorHandler: (error: Error) => void

  /**
   * @constructor
   * @param sequence Functor-factory or array of Readable streams. A factory is
   *   called as `factory(previousStream, previousContext)` and may return
   *   (directly or through a promise) a stream, a `[stream, context]` pair, or
   *   a falsy value to end the sequence.
   * @param options Sequence options, merged over `Sequence.DEFAULT_OPTIONS`
   */
  constructor(
    sequence: Function | Readable[],
    options?: SequenceOptions,
  ) {
    options = Object.assign({}, Sequence.DEFAULT_OPTIONS, options)

    super({
      objectMode: options.objectMode,
    })

    this._options = options
    this._context = null
    this._stream = null
    this._pausedForBackpressure = false
    this._sequenceDestroyed = false

    if (isFunction(sequence)) {
      this._nextStreamGetter = sequence
    } else if (isArray(sequence)) {
      const streams = sequence
      // In array mode the context is the index of the stream handed out last.
      this._nextStreamGetter = (
        _oldStream: unknown,
        oldContext: number | null | undefined,
      ) => {
        const index = (isNil(oldContext) ? -1 : oldContext) + 1
        if (index >= streams.length) {
          return null
        }
        return [streams[index], index]
      }
    }

    this._onStreamEndHandler = () => this._onStreamEnd()
    this._onStreamCloseHandler = () => this._onStreamClose()
    this._onStreamErrorHandler = error => this._onStreamError(error)
    this._onStreamDataHandler = (chunk: any) => this._onStreamData(chunk)

    // The first stream is requested lazily, on the first read.
    this.once('read', () => {
      this._obtainNextStream()
    })
  }

  /**
   * https://nodejs.org/api/stream.html#stream_readable_read_size_1
   *
   * Emits the `read` event and lets the current stream flow again if it was
   * paused because the internal buffer was full.
   */
  _read(size: any): void {
    this.emit('read')

    if (this._pausedForBackpressure && this._stream) {
      this._pausedForBackpressure = false
      this._stream.resume()
    }
  }

  /**
   * https://nodejs.org/api/stream.html#stream_readable_destroy_err_callback
   *
   * Detaches from and destroys the current stream, then destroys the sequence.
   */
  _destroy(error: any, callback: any): void {
    this._sequenceDestroyed = true

    const stream = this._stream
    if (stream) {
      this._detachFromStream()
      this._stream = null
      if (error) {
        // Our own error listener was just removed; destroying with an error
        // makes the stream emit `error`, which would throw if left unhandled.
        stream.on('error', () => {})
      }
      stream.destroy(error)
    }

    super._destroy(error, callback)
  }

  /**
   * Obtains next stream
   *
   * Asks the getter for the next stream and attaches to it. Ends the sequence
   * when the getter yields a falsy result; a failing getter either aborts the
   * sequence with an `error` (`abortOnError`) or ends it quietly.
   */
  _obtainNextStream(): void {
    if (this._sequenceDestroyed) {
      return
    }

    const getter = this._nextStreamGetter
    if (!getter) {
      // Nothing can ever be produced; end instead of leaving readers hanging.
      this.push(null)
      return
    }

    Promise.try(() => getter(this._stream, this._context))
      .catch(error => {
        logger.log(
          'error',
          'Error in Sequence while getting next stream:',
          error,
        )
        if (this._options.abortOnError) {
          throw error
        }
        // Otherwise fall through with an undefined result, ending the sequence.
      })
      .then(result => {
        if (this._sequenceDestroyed) {
          // Destroyed while the getter was pending: do not attach or push.
          return
        }

        if (!result) {
          this.push(null)
          return
        }

        if (isArray(result)) {
          ;[this._stream, this._context] = result
        } else {
          this._stream = result
          this._context = null
        }

        if (!this._stream) {
          // Empty slot (e.g. a null array entry): move on instead of stalling.
          this._obtainNextStream()
          return
        }

        this._stream
          .on('end', this._onStreamEndHandler)
          .on('close', this._onStreamCloseHandler)
          .on('error', this._onStreamErrorHandler)
          .on('data', this._onStreamDataHandler)
      })
      .catch(error => {
        logger.log('error', 'Error in Sequence:', error)
        if (this._sequenceDestroyed) {
          return
        }
        this.emit('error', error)
        this.push(null)
      })
  }

  /**
   * Detaches from current stream
   *
   * Removes every listener this sequence registered; the stream reference
   * itself is kept.
   */
  _detachFromStream(): void {
    this._pausedForBackpressure = false

    if (this._stream) {
      this._stream.removeListener('end', this._onStreamEndHandler)
      this._stream.removeListener('close', this._onStreamCloseHandler)
      this._stream.removeListener('error', this._onStreamErrorHandler)
      this._stream.removeListener('data', this._onStreamDataHandler)
    }
  }

  /**
   * Handles stream end event
   */
  _onStreamEnd(): void {
    this._detachFromStream()
    this._obtainNextStream()
  }

  /**
   * Handles stream close event
   */
  _onStreamClose(): void {
    this._detachFromStream()
    this._obtainNextStream()
  }

  /**
   * Handles stream error event
   *
   * With `abortOnError` the error is re-emitted and the sequence ends;
   * otherwise the failed stream is abandoned and the next one is requested.
   */
  _onStreamError(error: Error): void {
    logger.log('error', 'Error in Sequence chain:', error)
    this._detachFromStream()
    if (this._options.abortOnError) {
      this.emit('error', error)
      this.push(null)
    } else {
      this._obtainNextStream()
    }
  }

  /**
   * Handles stream data event
   *
   * Forwards the chunk and pauses the current stream when the internal buffer
   * is full; `_read` resumes it.
   */
  _onStreamData(chunk: any): void {
    if (!this.push(chunk) && this._stream) {
      this._pausedForBackpressure = true
      this._stream.pause()
    }
  }
}