import { EMPTY, noop, Observable, OperatorFunction, Subject, Subscription } from 'rxjs'
import { multicast } from 'rxjs/operators'

/**
 * Subscribes to any emitted observable, but "cancels" it immediately when another one arrives.
 * _Canceling_ means:
 * * the source observable is immediately unsubscribed from
 * * the re-emitted hot observable is completed or failed (if `cancelError` is specified and different from `undefined`)
 *
 * The observables are re-emitted immediately as *hot*.
 *
 * When the outer source completes, the resulting observable completes only after the most recently
 * accepted observable has terminated. Errors of the outer source are forwarded as-is.
 *
 * **Warning:** The source observable is always subscribed to, therefore this concurrency strategy cannot prevent eventual side-effects when cancelling.
 *
 * @param cancelError when provided the re-emitted observable will throw the provided error instead of completing when it is cancelled
 * @typeParam T type of both the accepted and emitted observable
 * @returns an operator that maps a stream of observables to a stream of their hot, cancellable counterparts
 * @category Operator
 */
export function concurrentLatest<T> (cancelError?: any): OperatorFunction<Observable<T>, Observable<T>> {
  return source => new Observable<Observable<T>>(subscriber => {
    // State describing the most recently accepted inner observable (if it is still running).
    let pendingConnection: Subscription | null = null
    let pending$: Observable<T> = EMPTY
    let pendingSubject: Subject<T> | null = null

    // NOTE(review): only the subscription to `source` is returned as teardown; a still-connected
    // inner observable is not unsubscribed when the consumer unsubscribes — confirm this is intended.
    return source.subscribe({
      next: observable => {
        // Cancel the previous inner observable: first terminate what its consumers see ...
        if (pendingSubject) {
          if (cancelError !== undefined) {
            pendingSubject.error(cancelError)
          } else {
            pendingSubject.complete()
          }
          pendingSubject = null
        }
        // ... then stop listening to its source.
        if (pendingConnection) {
          pendingConnection.unsubscribe()
          pendingConnection = null
        }

        // Route the new observable through a dedicated subject and connect right away,
        // which makes the re-emitted observable hot.
        const subject = new Subject<T>()
        pendingSubject = subject
        const current$ = multicast(() => subject)(observable)
        pending$ = current$
        const connection = current$.connect()
        pendingConnection = connection

        current$.toPromise().finally(() => {
          connection.unsubscribe()
          // This callback runs asynchronously; by then a newer observable may have taken over,
          // so the shared state is reset only if this connection is still the current one.
          if (pendingConnection === connection) {
            pendingConnection = null
            pending$ = EMPTY
            pendingSubject = null
          }
        }).catch(noop) // a failing inner observable must not cause an unhandled rejection

        subscriber.next(current$)
      },
      error: err => subscriber.error(err),
      complete: () => {
        // Defer completion until the last accepted observable has terminated (successfully or not).
        pending$.toPromise().finally(() => subscriber.complete()).catch(noop)
      }
    })
  })
}