import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { PartialObserver } from '../Observer';
import { TeardownLogic } from '../Subscription';
/**
 * Perform a side effect for every emission on the source Observable, but return
 * an Observable that is identical to the source.
 *
 * Intercepts each emission on the source and runs a function, but returns an
 * output which is identical to the source.
 *
 * Returns a mirrored Observable of the source Observable, but modified so that
 * the provided Observer is called to perform a side effect for every value,
 * error, and completion emitted by the source. Any errors that are thrown in
 * the aforementioned Observer or handlers are safely sent down the error path
 * of the output Observable.
 *
 * This operator is useful for debugging your Observables for the correct values
 * or performing other side effects.
 *
 * Note: this is different to a `subscribe` on the Observable. If the Observable
 * returned by `do` is not subscribed, the side effects specified by the
 * Observer will never happen. `do` therefore simply spies on existing
 * execution, it does not trigger an execution to happen like `subscribe` does.
 *
 * @example <caption>Map every click to the clientX position of that click, while also logging the click event</caption>
 * var clicks = Rx.Observable.fromEvent(document, 'click');
 * var positions = clicks
 *   .do(ev => console.log(ev))
 *   .map(ev => ev.clientX);
 * positions.subscribe(x => console.log(x));
 *
 * @see {@link map}
 * @see {@link subscribe}
 *
 * @param {Observer|function} [nextOrObserver] A normal Observer object or a
 * callback for `next`.
 * @param {function} [error] Callback for errors in the source.
 * @param {function} [complete] Callback for the completion of the source.
 * @return {Observable} An Observable identical to the source, but runs the
 * specified Observer or callback(s) for each item.
 * @method do
 * @name do
 * @owner Observable
 */
export function _do<T>(nextOrObserver?: PartialObserver<T> | ((x: T) => void),
                       error?: (e: any) => void,
                       complete?: () => void): Observable<T> {
  // `this` is the source Observable the operator is invoked on; the operator
  // object is handed to `lift`, which produces the returned Observable.
  return this.lift(new DoOperator(nextOrObserver, error, complete));
}
/**
 * Call signatures of the `do` operator for a source emitting values of type `T`.
 *
 * It can be invoked either with up to three callbacks (`next`, `error`,
 * `complete`) or with a single (partial) Observer object. Both forms return an
 * Observable of the same value type as the source.
 */
export interface DoSignature<T> {
  (next: (x: T) => void, error?: (e: any) => void, complete?: () => void): Observable<T>;
  (observer: PartialObserver<T>): Observable<T>;
}
/**
 * Operator object created by `_do` and passed to `lift`.
 *
 * It only stores the side-effect handlers; the actual work happens in the
 * `DoSubscriber` it creates each time `call` is invoked.
 */
class DoOperator<T> implements Operator<T, T> {
  /**
   * @param nextOrObserver Observer object or `next` callback to run as a side effect.
   * @param error Callback to run as a side effect on source errors.
   * @param complete Callback to run as a side effect on source completion.
   */
  constructor(private nextOrObserver?: PartialObserver<T> | ((x: T) => void),
              private error?: (e: any) => void,
              private complete?: () => void) {
  }

  /**
   * Subscribes to `source` through a new `DoSubscriber` that wraps the
   * downstream `subscriber` and carries the stored handlers.
   *
   * @param subscriber The downstream subscriber that receives the mirrored notifications.
   * @param source The upstream object exposing `_subscribe`.
   * @return Whatever `source._subscribe` returns, used as teardown logic.
   */
  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    return source._subscribe(new DoSubscriber(subscriber, this.nextOrObserver, this.error, this.complete));
  }
}
/**
 * Subscriber that runs the user's side-effect handlers for each notification
 * and then mirrors the notification to the downstream destination. If a
 * handler reports a synchronously thrown error, that error is sent to the
 * destination's error path instead of the original notification.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class DoSubscriber<T> extends Subscriber<T> {

  /** Subscriber wrapping the user-supplied observer/callbacks (the side effects). */
  private safeSubscriber: Subscriber<T>;

  /**
   * @param destination The downstream subscriber that receives the mirrored notifications.
   * @param nextOrObserver Observer object or `next` callback to run as a side effect.
   * @param error Callback to run as a side effect on source errors.
   * @param complete Callback to run as a side effect on source completion.
   */
  constructor(destination: Subscriber<T>,
              nextOrObserver?: PartialObserver<T> | ((x: T) => void),
              error?: (e: any) => void,
              complete?: () => void) {
    super(destination);

    const safeSubscriber = new Subscriber<T>(nextOrObserver, error, complete);
    // Opt in to synchronous error reporting: the handlers below inspect
    // `syncErrorThrown` / `syncErrorValue` after every call instead of
    // relying on an exception propagating out of the side effect.
    safeSubscriber.syncErrorThrowable = true;
    // Register the side-effect subscriber with this subscriber
    // (presumably so both are torn down together).
    this.add(safeSubscriber);
    this.safeSubscriber = safeSubscriber;
  }

  /**
   * Runs the `next` side effect, then forwards the value downstream, or the
   * side effect's error if one was recorded.
   */
  protected _next(value: T): void {
    const { safeSubscriber } = this;
    safeSubscriber.next(value);
    if (safeSubscriber.syncErrorThrown) {
      this.destination.error(safeSubscriber.syncErrorValue);
    } else {
      this.destination.next(value);
    }
  }

  /**
   * Runs the `error` side effect, then forwards an error downstream: the side
   * effect's own error if one was recorded, otherwise the source error.
   */
  protected _error(err: any): void {
    const { safeSubscriber } = this;
    safeSubscriber.error(err);
    if (safeSubscriber.syncErrorThrown) {
      this.destination.error(safeSubscriber.syncErrorValue);
    } else {
      this.destination.error(err);
    }
  }

  /**
   * Runs the `complete` side effect, then completes the destination, or
   * errors it if the side effect recorded an error.
   */
  protected _complete(): void {
    const { safeSubscriber } = this;
    safeSubscriber.complete();
    if (safeSubscriber.syncErrorThrown) {
      this.destination.error(safeSubscriber.syncErrorValue);
    } else {
      this.destination.complete();
    }
  }
}