/** * A stream that process atvise server requests in parallel. * @abstract */ export default class QueueStream extends Stream { /** * Creates a new QueueStream with the given options. * @param {Object} [options] The options to use. * @param {number} [options.maxParallel] The maximum of parallel tasks to execute. */ constructor(options?: { maxParallel: number; }); /** * The number of running operations. * @type {Number} */ _processing: number; /** * The number of chunks processed so far. * @type {Number} */ _processed: number; /** * The queued chunks. * @type {*[]} */ _queued: any[]; /** * The maximum of parallel tasks to execute * @type {number} */ _maxParallel: number; /** * The timestamp of the date when the stream was created. * @type {Number} */ _start: number; /** * `true` if there are queued operations or an operation is running right now. * @type {boolean} */ get hasPending(): boolean; /** * `true` if there are no queued operations. * @type {boolean} */ get queueEmpty(): boolean; /** * The number of chunks already processed. * @type {number} */ get processed(): number; /** * The number of processed chunks per second. * @type {number} */ get opsPerSecond(): number; /** * The error message to use when processing a chunk fails. **Must be overridden by all * subclasses!**. * @param {*} chunk The chunk being processed. * @return {string} The error message to use. * @abstract */ processErrorMessage(chunk: any): string; /** * The function to call when a chunk is ready to be processed. **Must be overridden by all * subclasses.**. * @param {*} chunk The chunk to process. * @param {function(err: Error, statusCode: node-opcua~StatusCodes, onSuccess: function)} * handleErrors Call this function to handle errors and bad status codes. When no error occured * and the status code received is fine, `onSuccess` is called. Further processing of valid * chunks, for example Recursions should happen in `onSuccess`. **Note that `onSuccess` is an * asynchronous function with a callback as an argument.**. * @example Basic implementation * class MyQueueStream extends QueueStream { * ... * processChunk(chunk, handle) { * client.session.doSomething((err, result, statusCode) => handle(err, statusCode, done => { * // This is called if err is falsy and status code is node-opcua~StatusCodes.Good * doSomethingWith(result); * done(); * })); * } * ... * } * @example Implement a recursion * class RecursiveQueueStream extends QueueStream { * ... * processChunk(chunk, handle) { * client.session.doSomething((err, result, statusCode) => handle(err, statusCode, done => { * // Write the result back to the stream. * // This means, that `result` will be queued and, as soon as possible, #processChunk will * // be called with `result` as the `chunk` argument. * this.write(result, null, done); * })); * } * ... * } * @example Allowing some invalid status codes * import { StatusCodes } from 'node-opcua'; * * class FriendlyQueueStream extends QueueStream { * ... * processChunk(chunk, handle) { * client.session.doSomething((err, result, statusCode) => { * if (statusCode === StatusCodes.BadUserAccessDenied) { * Logger.warn(`Ignored invalid status: ${statusCode.description}`); * handle(err, StatusCodes.Good, done => done()); * } else { * handle(err, statusCode, done => done()); * } * }); * } * ... * } * @abstract */ processChunk(chunk: any, handleErrors: any): void; /** * Calls {@link QueueStream#processChunk} and handles errors and invalid status codes. * @param {*} chunk The chunk to process. * @emits {*} Emits a `processed-chunk` event once a chunk was processed. */ _processChunk(chunk: any): void; /** * Enqueues the given chunk for processing. * @param {*} chunk The chunk to enqueue. */ _enqueueChunk(chunk: any): void; /** * Calls {@link QueueStream#_enqueueChunk} as soon as the stream's session is opened. * @param {*} chunk The chunk to transform. * @param {string} enc The encoding used. * @param {Function} callback Called once the chunk has been enqueued. */ _transform(chunk: any, enc: string, callback: Function): void; } import Stream from "./Stream";