import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs';
import { PartialObserver } from 'rxjs/Observer';

import { Command, ObservableOrValue } from './Interfaces';
import { asObservable, handleError, isObservable } from './Utils';

// NOTE(review): `Command` and `ObservableOrValue` are declared in ./Interfaces,
// which is not visible from this file. The type arguments applied to them below
// (`Command<T>`, `ObservableOrValue<T>`) are assumptions -- confirm their arity
// against the declarations.

/**
 * Action invoked to perform a command execution. Receives the execution
 * parameter and returns either a plain value or an observable of values.
 */
export type ExecutionAction<T = any> = (parameter: any) => ObservableOrValue<T>;

/**
 * Predicate that decides, from the current condition value (and optionally an
 * execution parameter), whether a command may execute.
 */
export type InterrogationAction<T = any> = (
  condition: T,
  parameter?: any,
) => boolean;

/**
 * Command built on observables: wraps an execution action, tracks whether an
 * execution is in flight, and derives `canExecute` from an optional condition
 * observable combined with the in-flight state.
 *
 * The command is itself a `Subscription`; the subjects and internal
 * subscriptions it creates are registered on it.
 */
export class ObservableCommand<T = any, TCondition = any> extends Subscription
  implements Command<T> {
  /**
   * Default interrogation: booleans are returned unchanged, any other value is
   * treated as `true` unless it is `null` or `undefined`.
   */
  static coerceCondition<T>(condition: T) {
    if (typeof condition === 'boolean') {
      return condition;
    }

    return condition != null;
  }

  protected isExecutingSubject: BehaviorSubject<boolean>;
  protected conditionSubject: BehaviorSubject<TCondition | undefined>;
  protected canExecuteSubject: BehaviorSubject<boolean>;
  protected requestsSubject: Subject<any>;
  protected resultsSubject: Subject<T>;
  protected thrownErrorsSubject: Subject<any>;

  /**
   * @param executeAction Action run for every execution.
   * @param interrogationAction Predicate applied to the current condition value
   *   (defaults to `ObservableCommand.coerceCondition`).
   * @param condition Optional observable feeding the condition value.
   * @param initialCondition Optional condition value used before `condition`
   *   emits.
   */
  constructor(
    protected readonly executeAction: ExecutionAction<T>,
    protected readonly interrogationAction: InterrogationAction<
      TCondition | undefined
    > = x => ObservableCommand.coerceCondition(x),
    private condition?: Observable<TCondition>,
    private initialCondition?: TCondition,
  ) {
    super();

    this.isExecutingSubject = this.addSubscription(
      new BehaviorSubject<boolean>(false),
    );
    this.conditionSubject = this.addSubscription(
      new BehaviorSubject<TCondition | undefined>(initialCondition),
    );
    this.requestsSubject = this.addSubscription(new Subject<any>());
    this.resultsSubject = this.addSubscription(new Subject<T>());
    this.thrownErrorsSubject = this.addSubscription(new Subject<any>());

    if (condition != null) {
      // mirror every condition notification into the condition subject
      this.add(condition.subscribe(this.conditionSubject));
    }

    this.canExecuteSubject = this.addSubscription(
      new BehaviorSubject<boolean>(
        condition == null ||
          this.conditionValue == null ||
          this.interrogationAction(this.conditionValue),
      ),
    );

    const canExecute =
      condition == null
        ? asObservable(true)
        : this.conditionSubject.map(x => this.interrogationAction(x));

    this.add(
      canExecute
        .combineLatest(
          this.isExecutingSubject,
          // executable only when the condition allows it and nothing is running
          (ce, ie) => ce === true && ie === false,
        )
        .catch(e => {
          // a failing condition/interrogation is reported and disables the command
          handleError(e, this.thrownErrorsSubject);

          return asObservable(false);
        })
        .distinctUntilChanged()
        .subscribe(this.canExecuteSubject),
    );
  }

  /** Distinct stream of the "execution in flight" flag. */
  get isExecutingObservable() {
    return this.isExecutingSubject.distinctUntilChanged();
  }

  /** Current value of the "execution in flight" flag. */
  get isExecuting() {
    return this.isExecutingSubject.getValue();
  }

  /** Distinct stream of condition values. */
  get conditionObservable() {
    return this.conditionSubject.distinctUntilChanged();
  }

  /** Distinct stream of the `canExecute` flag. */
  get canExecuteObservable() {
    return this.canExecuteSubject.distinctUntilChanged();
  }

  /** Most recent condition value. */
  get conditionValue() {
    return this.conditionSubject.getValue();
  }

  /** Most recent `canExecute` flag. */
  get canExecute() {
    return this.canExecuteSubject.getValue();
  }

  /** Marker used to recognise command instances; always returns `true`. */
  isCommand() {
    return true;
  }

  /**
   * Applies the interrogation action to the current condition value and the
   * given parameter. The in-flight state is not part of this check.
   */
  canExecuteFor(parameter: any) {
    return this.interrogationAction(
      this.conditionSubject.getValue(),
      parameter,
    );
  }

  /**
   * Builds the observable that performs one execution when subscribed.
   *
   * If `canExecute` is currently `false`, an error observable is returned
   * instead (the error is also passed to `handleError`).
   *
   * @param parameter Value handed to the execution action and pushed to
   *   `requests`.
   * @returns A shared observable of the execution results.
   */
  observeExecution(parameter?: any): Observable<T> {
    if (this.canExecute === false) {
      const error = new Error('canExecute currently forbids execution');

      handleError(error);

      return Observable.throw(error);
    }

    // true from the moment this execution raises isExecuting until one of its
    // own callbacks lowers it again
    let pending = false;

    return (
      Observable.of(parameter)
        .do(x => {
          this.requestsSubject.next(x);
          pending = true;
          this.isExecutingSubject.next(true);
        })
        .flatMap(x => asObservable(this.executeAction(x)))
        .do(
          x => {
            this.resultsSubject.next(x);
            pending = false;
            this.isExecutingSubject.next(false);
          },
          e => {
            // capture the error, but don't swallow it
            handleError(e, this.thrownErrorsSubject);
            pending = false;
            this.isExecutingSubject.next(false);
          },
          () => {
            pending = false;
            this.isExecutingSubject.next(false);
          },
        )
        // If the consumer unsubscribes before the action emits or terminates,
        // none of the callbacks above run; without this the command would stay
        // "executing" (and therefore non-executable) forever. The flag keeps
        // this a no-op on the normal paths, and the closed check avoids pushing
        // into a subject that has already been unsubscribed.
        .finally(() => {
          if (pending && !this.isExecutingSubject.closed) {
            pending = false;
            this.isExecutingSubject.next(false);
          }
        })
        // this will prevent execution if nobody is subscribing to the result
        .share()
    );
  }

  /**
   * Runs the command by subscribing to `observeExecution(parameter)`.
   *
   * Accepts either an observer object or individual callbacks. When no error
   * callback is supplied a no-op is passed in its place.
   *
   * @returns The subscription of the running execution.
   */
  execute(parameter?: any, observer?: PartialObserver<T>): Subscription;
  execute(
    parameter?: any,
    next?: (value: T) => void,
    error?: (error: any) => void,
    complete?: () => void,
  ): Subscription;
  execute(
    parameter?: any,
    observerOrNext?: PartialObserver<T> | ((value: T) => void),
    error: (error: any) => void = () => {
      return;
    },
    complete?: () => void,
  ): Subscription {
    const obs = this.observeExecution(parameter);

    return obs.subscribe.apply(obs, [observerOrNext, error, complete]);
  }

  /** Stream of parameters, one per started execution. */
  get requests() {
    return this.requestsSubject.asObservable();
  }

  /** Stream of values produced by executions. */
  get results() {
    return this.resultsSubject.asObservable();
  }

  /** Read-only view of the subject that is handed to `handleError`. */
  get thrownErrors() {
    return this.thrownErrorsSubject.asObservable();
  }
}

/**
 * Factory for `ObservableCommand`.
 *
 * The first two arguments may be given in either order: whichever one is an
 * observable becomes the condition, and a function in the other slot becomes
 * the execution action. Without an execution action the command echoes its
 * parameter. Without a condition the initial condition is forced to `true`.
 */
export function command<T>(): Command<T>;
// tslint:disable-next-line:unified-signatures
export function command<T>(execute: ExecutionAction<T>): Command<T>;
export function command<T>(
  canExecute: Observable<boolean>,
  // tslint:disable-next-line:unified-signatures
  execute?: ExecutionAction<T>,
): Command<T>;
export function command<T>(
  execute: ExecutionAction<T>,
  // tslint:disable-next-line:unified-signatures
  canExecute: Observable<boolean>,
): Command<T>;
export function command<T, TCondition>(
  execute: ExecutionAction<T>,
  condition: Observable<TCondition>,
  interrogation: InterrogationAction<TCondition>,
  initialCondition?: TCondition,
): Command<T>;
export function command<T, TCondition>(
  arg1?: ExecutionAction<T> | Observable<TCondition>,
  arg2?: ExecutionAction<T> | Observable<TCondition>,
  interrogation?: InterrogationAction<TCondition | undefined>,
  initialCondition?: any,
): Command<T> {
  let condition: Observable<TCondition> | undefined;
  let execute: ExecutionAction<T> = (x: T) => x;

  if (isObservable(arg1)) {
    condition = arg1;

    if (arg2 instanceof Function) {
      execute = arg2;
    }
  } else if (isObservable(arg2)) {
    condition = arg2;

    if (arg1 instanceof Function) {
      execute = arg1;
    }
  } else {
    // no boolean observable passed in for can execute
    // just check if arg1 is a function
    if (arg1 instanceof Function) {
      execute = arg1;
    }
  }

  // without a condition observable the command is unconditionally enabled
  const effectiveInitialCondition =
    condition == null ? true : initialCondition;

  return new ObservableCommand<T, TCondition>(
    execute,
    interrogation,
    condition,
    effectiveInitialCondition,
  );
}