import { FSharpChoice$2_$union, Choice_tryValueIfChoice1Of2, Choice_tryValueIfChoice2Of2 } from "./Choice.ts";
import { Option, value } from "./Option.ts";
import { IDisposable } from "./Util.ts";

/**
 * Receiver of push-based notifications: zero or more values through `OnNext`,
 * an error through `OnError`, and completion through `OnCompleted`.
 */
export interface IObserver<T> {
  OnNext: (x: T) => void;
  OnError: (e: any) => void;
  OnCompleted: () => void;
}

/**
 * Observer built from plain callbacks. `onError` and `onCompleted` are
 * optional and default to no-ops.
 */
export class Observer<T> implements IObserver<T> {
  public OnNext: (x: T) => void;
  public OnError: (e: any) => void;
  public OnCompleted: () => void;

  constructor(onNext: (x: T) => void, onError?: (e: any) => void, onCompleted?: () => void) {
    this.OnNext = onNext;
    this.OnError = onError || ((_e: any) => { return; });
    this.OnCompleted = onCompleted || (() => { return; });
  }
}

/** Source of notifications; `Subscribe` returns a handle used to unsubscribe. */
export interface IObservable<T> {
  Subscribe: (o: IObserver<T>) => IDisposable;
}

/** Observable whose `Subscribe` behaviour is the function given to the constructor. */
class Observable<T> implements IObservable<T> {
  public Subscribe: (o: IObserver<T>) => IDisposable;

  constructor(subscribe: (o: IObserver<T>) => IDisposable) {
    this.Subscribe = subscribe;
  }
}

/**
 * Runs `f` and hands its result to `succeed`; anything thrown is passed to `fail`.
 *
 * Note: `succeed` runs inside the `try`, so an exception thrown by `succeed`
 * itself is also routed to `fail`.
 */
function protect<T>(f: () => T, succeed: (x: T) => void, fail: (e: any) => void): void {
  try {
    return succeed(f());
  } catch (e) {
    fail(e);
  }
}

/** Subscribes `callback` to `source` and discards the subscription handle. */
export function add<T>(callback: (x: T) => void, source: IObservable<T>): void {
  source.Subscribe(new Observer(callback));
}

/**
 * Projects each element through `chooser` and forwards the unwrapped result
 * when the chooser returns a non-nullish option; other elements are dropped.
 * An exception thrown by `chooser` is reported through `OnError`.
 */
export function choose<T, U>(chooser: (x: T) => Option<U>, source: IObservable<T>): Observable<U> {
  return new Observable<U>((observer) =>
    source.Subscribe(new Observer<T>(
      (t) => protect(
        () => chooser(t),
        (u) => {
          if (u != null) {
            observer.OnNext(value(u));
          }
        },
        // Forward through arrows so `this` stays bound to the downstream observer.
        (e) => observer.OnError(e)),
      (e) => observer.OnError(e),
      () => observer.OnCompleted())));
}

/**
 * Forwards only the elements for which `predicate` returns a truthy value.
 * An exception thrown by `predicate` is reported through `OnError`.
 */
export function filter<T>(predicate: (x: T) => boolean, source: IObservable<T>): IObservable<T> {
  return new Observable<T>((observer) =>
    source.Subscribe(new Observer<T>(
      (t) => protect(
        () => predicate(t),
        (keep) => {
          // Forward the element itself rather than going through an option,
          // so null/undefined (or option-valued) elements are not lost.
          if (keep) {
            observer.OnNext(t);
          }
        },
        (e) => observer.OnError(e)),
      (e) => observer.OnError(e),
      () => observer.OnCompleted())));
}

/**
 * Forwards `mapping(x)` for every element `x` of `source`.
 * An exception thrown by `mapping` is reported through `OnError`.
 */
export function map<T, U>(mapping: (x: T) => U, source: IObservable<T>): IObservable<U> {
  return new Observable<U>((observer) =>
    source.Subscribe(new Observer<T>(
      (t) => {
        protect(
          () => mapping(t),
          (u) => observer.OnNext(u),
          (e) => observer.OnError(e));
      },
      (e) => observer.OnError(e),
      () => observer.OnCompleted())));
}

/**
 * Interleaves the notifications of both sources. The first error stops the
 * merged stream; completion is signalled once both sources have completed.
 * Disposing the returned handle disposes both inner subscriptions.
 */
export function merge<T>(source1: IObservable<T>, source2: IObservable<T>): IObservable<T> {
  return new Observable<T>((observer) => {
    let stopped = false;
    let completed1 = false;
    let completed2 = false;

    const h1 = source1.Subscribe(new Observer<T>(
      (v) => {
        if (!stopped) {
          observer.OnNext(v);
        }
      },
      (e) => {
        if (!stopped) {
          stopped = true;
          observer.OnError(e);
        }
      },
      () => {
        if (!stopped) {
          completed1 = true;
          if (completed2) {
            stopped = true;
            observer.OnCompleted();
          }
        }
      }));

    const h2 = source2.Subscribe(new Observer<T>(
      (v) => {
        if (!stopped) {
          observer.OnNext(v);
        }
      },
      (e) => {
        if (!stopped) {
          stopped = true;
          observer.OnError(e);
        }
      },
      () => {
        if (!stopped) {
          completed2 = true;
          if (completed1) {
            stopped = true;
            observer.OnCompleted();
          }
        }
      }));

    return {
      Dispose() {
        h1.Dispose();
        h2.Dispose();
      },
    };
  });
}

/**
 * Emits `[previous, current]` for every element after the first one of each
 * subscription.
 */
export function pairwise<T>(source: IObservable<T>): IObservable<[T, T]> {
  return new Observable<[T, T]>((observer) => {
    // Track "seen a value" explicitly: testing `last != null` would drop
    // pairs whose previous element is legitimately null or undefined.
    let hasLast = false;
    let last: T;
    return source.Subscribe(new Observer<T>(
      (next) => {
        if (hasLast) {
          observer.OnNext([last, next]);
        }
        last = next;
        hasLast = true;
      },
      (e) => observer.OnError(e),
      () => observer.OnCompleted()));
  });
}

/**
 * Splits `source` into the elements that satisfy `predicate` (first) and
 * those that do not (second).
 */
export function partition<T>(predicate: (x: T) => boolean, source: IObservable<T>): [Observable<T>, Observable<T>] {
  return [filter(predicate, source), filter((x) => !predicate(x), source)];
}

/**
 * Folds `collector` over the elements, starting from `state`, and emits every
 * intermediate accumulator. An exception thrown by `collector` is reported
 * through `OnError` and leaves the accumulator unchanged.
 */
export function scan<U, T>(collector: (u: U, t: T) => U, state: U, source: IObservable<T>): IObservable<U> {
  return new Observable<U>((observer) => {
    // Per-subscription accumulator: every subscriber starts from the initial
    // `state` instead of continuing from another subscriber's progress.
    let acc = state;
    return source.Subscribe(new Observer<T>(
      (t) => {
        protect(
          () => collector(acc, t),
          (u) => {
            acc = u;
            observer.OnNext(u);
          },
          (e) => observer.OnError(e));
      },
      (e) => observer.OnError(e),
      () => observer.OnCompleted()));
  });
}

/**
 * Routes each element through `splitter`; values extracted by
 * `Choice_tryValueIfChoice1Of2` go to the first observable and values extracted
 * by `Choice_tryValueIfChoice2Of2` go to the second.
 */
export function split<T, U1, U2>(splitter: (x: T) => FSharpChoice$2_$union<U1, U2>, source: IObservable<T>): [Observable<U1>, Observable<U2>] {
  return [
    choose<T, U1>((v) => Choice_tryValueIfChoice1Of2(splitter(v)), source),
    choose<T, U2>((v) => Choice_tryValueIfChoice2Of2(splitter(v)), source),
  ];
}

/** Subscribes `callback` to `source` and returns the subscription handle. */
export function subscribe<T>(callback: (x: T) => void, source: IObservable<T>): IDisposable {
  return source.Subscribe(new Observer(callback));
}