import Future from './Future';
import Lock from './Lock';

/**
 * A single processing step: takes an input value and returns either the
 * output directly or a thenable that settles to it.
 */
export interface Transform<I, O> {
  (input: I): O | PromiseLike<O>;
}

/**
 * The callable part of a pipeline. In addition to the plain `Transform`
 * signature it accepts a thenable as input and always returns a `Future`.
 */
export interface PipelineFn<I, O> extends Transform<I, O> {
  (input: I | PromiseLike<I>): Future<O>;
}

/**
 * A callable pipeline that can be extended with further steps via `pipe`.
 */
export interface Pipeline<I, O> extends PipelineFn<I, O> {
  /**
   * Append `transform` after this pipeline and return the combined pipeline.
   *
   * @param transform Step that consumes this pipeline's output.
   * @param lock Optional lock used for the new step; `pipe()` creates a
   *   fresh `Lock` when it is omitted.
   */
  pipe<T>(transform: Transform<O, T>, lock?: Lock): Pipeline<I, T>;
}

/**
 * Build a pipeline that runs the receiver (`this`) first and then
 * `transform`.
 *
 * The function is meant to be invoked as a method (`pipeline.pipe(next)`), so
 * that `this` is the preceding step. `Pipeline()` below calls it without a
 * receiver; `previous` is then `undefined` and is handed to `.then()` as-is
 * (presumably a pass-through, as with native promises — confirm against
 * `Future.then`).
 *
 * @param transform Step to run on the output of the preceding step.
 * @param lock Lock acquired for every invocation of the returned pipeline;
 *   defaults to a new `Lock` per `pipe()` call.
 * @returns A callable pipeline that also carries its own `pipe` method.
 */
export function pipe<A, B, C>(
  this: Transform<A, B>,
  transform: Transform<B, C>,
  lock = new Lock()
): Pipeline<A, C> {
  // Captured because the inner `function` expression has its own `this`.
  const previous = this;

  const pipeline: Pipeline<A, C> = Object.assign(
    // Kept as a *named* function expression so the runtime `name` stays "Pipeline".
    function Pipeline(input: A | PromiseLike<A>) {
      // The lock is requested synchronously, at call time, before any step runs.
      const guard_promise = lock.wait_and_lock();

      const future = new Future<C>((resolve, reject) => {
        Future.resolve(input)
          .then(previous)
          // NOTE(review): presumably delays the chain until the guard is
          // available while passing the value through — confirm against
          // `Future.await`.
          .await(guard_promise)
          .then(transform)
          // Settle the outer future first ...
          .then(resolve, reject)
          // ... then fetch the guard and release it, whether the step
          // succeeded or failed (the previous `.then` handles both outcomes).
          .then(() => guard_promise)
          .then((guard) => guard.release_async());
      });

      return future;
    },
    { pipe }
  );

  return pipeline;
}

/**
 * Create a pipeline consisting of the single step `transform`.
 *
 * @param transform The first (and so far only) step.
 * @param lock Lock used for this step; defaults to a new `Lock`.
 * @returns A pipeline that can be called directly or extended with `.pipe()`.
 */
export function Pipeline<A, B>(
  transform: Transform<A, B>,
  lock = new Lock()
): Pipeline<A, B> {
  // `pipe` declares a `this` parameter but is deliberately called here with
  // no receiver (there is no preceding step), which the checker rejects.
  // @ts-ignore
  return pipe(transform, lock);
}

export default Pipeline;

// Expose `pipe`, `default` and `Pipeline` as getter properties on the
// exported function itself (presumably for CommonJS-style consumers that
// receive the function as the whole module — confirm with the build setup).
Object.defineProperties(Pipeline, {
  pipe: { get: () => pipe },
  default: { get: () => Pipeline },
  Pipeline: { get: () => Pipeline },
});