// tslint:disable max-classes-per-file no-use-before-declare
import $$observable from 'symbol-observable'
import {
  ArrayValues,
  Disposer,
  PartialObserver,
  SignalType,
  Subscribable,
  Subscriber,
  Subscription,
  SubscriptionObserver,
  Unary
} from './types.h'

// NOTE(review): the generic type arguments used throughout this file were
// reconstructed from usage — confirm their arity against ./types.h.

/* istanbul ignore next */
const $$toStringTag: symbol =
  (typeof Symbol === 'function' && Symbol.toStringTag) || ('@@toStringTag' as any)

/**
 * Builds a subscriber that emits every element of `arrayLike` in index order
 * and then completes. Emission stops silently as soon as the observer reports
 * itself closed.
 */
const fromArray = <T>(arrayLike: ArrayLike<T>): Subscriber<T> => {
  return (observer) => {
    for (let index = 0; index < arrayLike.length; index += 1) {
      if (observer.closed) {
        return
      }
      observer.next(arrayLike[index])
    }
    observer.complete()
  }
}

/**
 * Builds a subscriber that emits every value produced by `iterable` and then
 * completes.
 *
 * A fresh iterator is requested on every subscription, so each subscriber sees
 * the full sequence rather than whatever a previous subscriber left behind.
 */
const fromIterable = <T>(iterable: Iterable<T>): Subscriber<T> => {
  return (observer) => {
    const iterator = iterable[Symbol.iterator]()
    // True while a step has been pulled but not yet fully delivered; in that
    // state an early exit must give the iterator a chance to release resources.
    let abandonedMidStep = false
    let failure: { e: any } | undefined
    let step = iterator.next()
    try {
      while (!step.done) {
        abandonedMidStep = true
        if (observer.closed) {
          return
        }
        observer.next(step.value)
        abandonedMidStep = false
        step = iterator.next()
      }
      observer.complete()
    } catch (e) {
      // Wrapped so that a falsy thrown value is still recognised below.
      failure = { e }
    } finally {
      try {
        /* istanbul ignore next */
        if (abandonedMidStep && iterator.return) {
          iterator.return()
        }
      } finally {
        if (failure) {
          observer.error(failure.e)
        }
      }
    }
  }
}

/**
 * Runs the subscription's disposer at most once: the stored disposer is
 * cleared before it is invoked. A function disposer is called directly;
 * otherwise its `unsubscribe` method is called when present.
 */
function cleanupSubscription<T>(subscription: ObservableSubscription<T>) {
  const disposer = subscription._disposer
  if (!disposer) {
    return
  }
  subscription._disposer = undefined
  if (typeof disposer === 'function') {
    disposer()
  } else {
    if (disposer.unsubscribe) {
      disposer.unsubscribe()
    }
  }
}

/** Marks the subscription closed and drops its reference to the observer. */
function closeSubscription<T>(subscription: ObservableSubscription<T>) {
  subscription._observer = undefined
  subscription._closed = true
}

/**
 * Delivers a `next`, `error` or `complete` signal to the subscription's
 * observer. Signals sent to a closed subscription are ignored.
 *
 * `error` and `complete` close the subscription before the observer callback
 * runs. An `error` signal with no `error` handler on the observer runs cleanup
 * and rethrows the value.
 */
function notifySubscription<T>(
  subscription: ObservableSubscription<T>,
  type: SignalType,
  value: any
) {
  if (subscription.closed) {
    return
  }
  const observer = subscription._observer
  /* istanbul ignore next */
  if (!observer) {
    return
  }
  switch (type) {
    case SignalType.next:
      if (observer.next) {
        observer.next(value)
      }
      break
    case SignalType.error:
      closeSubscription(subscription)
      if (observer.error) {
        observer.error(value)
      } else {
        cleanupSubscription(subscription)
        throw value
      }
      break
    case SignalType.complete:
      closeSubscription(subscription)
      if (observer.complete) {
        observer.complete()
      }
      break
  }
  if (subscription.closed) {
    cleanupSubscription(subscription)
  }
}

/**
 * Links one observer to one run of a subscriber function and owns the disposer
 * that the subscriber returns.
 */
class ObservableSubscription<T> implements Subscription {
  // NOTE(review): declared without an initializer — confirm whether a runtime
  // value was intended here.
  // @ts-ignore
  [$$toStringTag]: 'Subscription'
  _disposer: Disposer | undefined
  _observer: PartialObserver<T> | undefined
  _closed: boolean = false

  /**
   * Calls `observer.start` (when present) and then runs `source`.
   *
   * @param observer Receiver of the signals.
   * @param source Subscriber function; its return value is kept as the disposer.
   * @throws Whatever `source` throws, when `observer` has no `error` handler.
   */
  constructor(observer: PartialObserver<T>, source: Subscriber<T>) {
    this._observer = observer
    if (observer.start) {
      observer.start(this)
    }
    // `start` may unsubscribe; in that case the subscriber must never run.
    if (this.closed) {
      return
    }
    const subscriptionObserver = new ConcreteObserver<T>(this)
    try {
      this._disposer = source(subscriptionObserver)
    } catch (error) {
      if (!observer.error) {
        throw error
      }
      subscriptionObserver.error(error)
    }
    // If the subscriber finished synchronously, the disposer it returned was
    // not yet stored when the subscription closed — run it now so it is not
    // leaked.
    if (this.closed) {
      cleanupSubscription(this)
    }
  }

  /** Whether the subscription has been closed. */
  get closed() {
    return this._closed
  }

  /** Closes the subscription and runs its disposer; a no-op once closed. */
  unsubscribe() {
    if (this.closed) {
      return
    }
    closeSubscription(this)
    cleanupSubscription(this)
  }
}

/**
 * Observer handed to subscriber functions; forwards every signal to its
 * subscription through `notifySubscription`.
 */
class ConcreteObserver<T> implements SubscriptionObserver<T> {
  // The trailing semicolon keeps the computed member below from being parsed
  // as an index access on this declaration.
  _subscription: ObservableSubscription<T>;
  // NOTE(review): declared without an initializer — confirm whether a runtime
  // value was intended here.
  // @ts-ignore
  [$$toStringTag]: 'Subscription Observer'

  constructor(subscription: ObservableSubscription<T>) {
    this._subscription = subscription
  }

  /** Whether the underlying subscription has been closed. */
  get closed() {
    return this._subscription.closed
  }

  next(value: T) {
    notifySubscription(this._subscription, SignalType.next, value)
  }

  error(reason: any) {
    notifySubscription(this._subscription, SignalType.error, reason)
  }

  complete() {
    notifySubscription(this._subscription, SignalType.complete, undefined)
  }
}

/**
 * A push-based source of values. The subscriber function given to the
 * constructor is run once for every `subscribe` call.
 */
export class Observable<T> implements Subscribable<T> {
  /**
   * Creates an observable that emits the given arguments in order and then
   * completes. When invoked with a constructor as `this`, that constructor is
   * used to build the result.
   */
  static of<TS extends any[]>(...args: TS): Observable<ArrayValues<TS>>
  static of() {
    const C = typeof this === 'function' ? this : Observable
    return new C(fromArray(arguments))
  }

  /**
   * Converts an observable-like object or an iterable into an observable.
   *
   * @param ish Object exposing the `$$observable` method, or an iterable.
   * @returns The object's own observable when it is an `Observable` built by
   *   the receiving constructor; otherwise a new wrapping instance.
   * @throws {TypeError} When `ish` is `null`/`undefined`, when its
   *   `$$observable` method returns a non-object, or when it is neither
   *   observable-like nor iterable.
   */
  static from<T>(ish: Subscribable<T> | Observable<T> | Iterable<T>): Observable<T> {
    const C = typeof this === 'function' ? this : Observable
    // Built lazily so that valid inputs are never stringified.
    const notAnObject = () => new TypeError(`${ish} is not an object`)
    if (ish == null) {
      throw notAnObject()
    }
    if ((ish as any)[$$observable]) {
      const observable = (ish as any)[$$observable]()
      if (Object(observable) !== observable) {
        throw notAnObject()
      }
      if (observable instanceof Observable && observable.constructor === C) {
        return observable as Observable<T>
      }
      return new C<T>((observer) => observable.subscribe(observer))
    }
    if (typeof Symbol === 'function' && Symbol.iterator && (ish as any)[Symbol.iterator]) {
      // Pass the iterable itself so every subscription gets a fresh iterator.
      return new C<T>(fromIterable(ish as Iterable<T>))
    }
    // For old browsers that don't support @@iterator
    /* istanbul ignore next */
    if (Array.isArray(ish)) {
      return new C<T>(fromArray(ish))
    }
    throw notAnObject()
  }

  // NOTE(review): declared without an initializer — confirm whether a runtime
  // value was intended here.
  // @ts-ignore
  [$$toStringTag]: 'Observable'
  private _subscribe: Subscriber<T>

  /**
   * @param _subscribe Subscriber function run for each subscription.
   * @throws {TypeError} When called without `new` or when `_subscribe` is not
   *   a function.
   */
  constructor(_subscribe: Subscriber<T>) {
    // This check should stay in case the ES6->ES5 transpiling is enabled.
    /* istanbul ignore next */
    if (!(this instanceof Observable)) {
      throw new TypeError('Observable cannot be called as a function')
    }
    if (typeof _subscribe !== 'function') {
      throw new TypeError('Observable initializer must be a function')
    }
    this._subscribe = _subscribe
  }

  /**
   * Starts a new run of the subscriber function.
   *
   * @param next Either an observer object, or the `next` callback.
   * @param error `error` callback; used only when `next` is not an observer object.
   * @param complete `complete` callback; used only when `next` is not an observer object.
   * @returns The subscription for this run.
   */
  subscribe(
    next?: PartialObserver<T> | ((value: T) => void),
    error?: (reason: any) => void,
    complete?: () => void
  ) {
    let observer: any
    if (typeof next !== 'object' || next === null) {
      observer = { next, error, complete }
    } else {
      observer = next
    }
    return new ObservableSubscription<T>(observer, this._subscribe)
  }

  /**
   * Threads this observable through the given unary functions from left to
   * right and returns the final result. With no arguments, returns `this`.
   */
  pipe(): Observable<T>
  pipe<A>(op1: Unary<Observable<T>, A>): A
  pipe<A, B>(op1: Unary<Observable<T>, A>, op2: Unary<A, B>): B
  pipe<A, B, C>(op1: Unary<Observable<T>, A>, op2: Unary<A, B>, op3: Unary<B, C>): C
  pipe<A, B, C, D>(
    op1: Unary<Observable<T>, A>,
    op2: Unary<A, B>,
    op3: Unary<B, C>,
    op4: Unary<C, D>
  ): D
  pipe<A, B, C, D, E>(
    op1: Unary<Observable<T>, A>,
    op2: Unary<A, B>,
    op3: Unary<B, C>,
    op4: Unary<C, D>,
    op5: Unary<D, E>
  ): E
  pipe<A, B, C, D, E, F>(
    op1: Unary<Observable<T>, A>,
    op2: Unary<A, B>,
    op3: Unary<B, C>,
    op4: Unary<C, D>,
    op5: Unary<D, E>,
    op6: Unary<E, F>
  ): F
  pipe<A, B, C, D, E, F, G>(
    op1: Unary<Observable<T>, A>,
    op2: Unary<A, B>,
    op3: Unary<B, C>,
    op4: Unary<C, D>,
    op5: Unary<D, E>,
    op6: Unary<E, F>,
    op7: Unary<F, G>
  ): G
  pipe<A, B, C, D, E, F, G, H>(
    op1: Unary<Observable<T>, A>,
    op2: Unary<A, B>,
    op3: Unary<B, C>,
    op4: Unary<C, D>,
    op5: Unary<D, E>,
    op6: Unary<E, F>,
    op7: Unary<F, G>,
    op8: Unary<G, H>
  ): H
  pipe<A, B, C, D, E, F, G, H, I>(
    op1: Unary<Observable<T>, A>,
    op2: Unary<A, B>,
    op3: Unary<B, C>,
    op4: Unary<C, D>,
    op5: Unary<D, E>,
    op6: Unary<E, F>,
    op7: Unary<F, G>,
    op8: Unary<G, H>,
    op9: Unary<H, I>
  ): I
  pipe(): any {
    if (!arguments.length) {
      return this
    }
    // tslint:disable-next-line no-this-assignment
    let result: any = this
    for (let i = 0; i < arguments.length; i += 1) {
      result = arguments[i](result)
    }
    return result
  }

  /** Interop hook: an observable is its own `$$observable`. */
  [$$observable]() {
    return this
  }
}