import { Defer, makeDefer, EventTrigger } from './utility';

declare global {
    interface SymbolConstructor {
        readonly observable: unique symbol;
    }
}

// Install a `Symbol.observable` well-known symbol when the runtime lacks one.
if (!Symbol.observable)
    Reflect.set(Symbol, 'observable', Symbol('observable'));

/**
 * Receiver of the notifications produced by a {@link SubscriberFunction}.
 *
 * The `any` default keeps unparameterised references (`Observer['next']`)
 * valid.
 */
export interface Observer<T = any> {
    /** Delivers the next value of the sequence. */
    next(value: T): void;
    /** Terminates the sequence with a failure. */
    error(reason: string | Error): void;
    /** Terminates the sequence successfully. */
    complete(): void;
}

/** Handle returned by `subscribe()`. */
export interface Subscription {
    /** Stops further notifications from being delivered. */
    unsubscribe(): void;
    /** `true` once the subscription no longer delivers notifications. */
    readonly closed: boolean;
}

/** Anything that can be subscribed to with callback functions. */
export interface Subscribable<T = any> {
    [Symbol.observable](): Subscribable<T>;

    subscribe(
        onNext: Observer<T>['next'],
        onError?: Observer<T>['error'],
        onComplete?: Observer<T>['complete']
    ): Subscription;
}

/**
 * Producer logic of an {@link Observable}: receives the observer to notify
 * and may return a teardown function that releases its resources.
 */
export type SubscriberFunction<T = any> = (
    observer: Observer<T>
) => (() => void) | void;

/**
 * Push-based value sequence that can be consumed either through callbacks
 * (`subscribe`) or through `for await ... of`.
 *
 * Every new iteration/subscription invokes the subscriber function again.
 */
export class Observable<T = any> implements Subscribable<T> {
    constructor(private subscriber: SubscriberFunction<T>) {}

    [Symbol.observable]() {
        return this;
    }

    /**
     * Runs the subscriber function and yields every value it emits.
     *
     * Values are buffered in a queue of deferreds, so a producer that is
     * faster than the consumer does not lose values. The iteration ends when
     * the producer calls `complete()`, and throws the reason handed to
     * `error()`. The teardown returned by the subscriber function is invoked
     * at most once: when the producer terminates, or when the consumer stops
     * iterating early (`break`, `return()`, a thrown error).
     *
     * NOTE(review): relies on `Defer` from `./utility` exposing `promise`,
     * `resolve()` and `reject()` like a conventional deferred — confirm.
     */
    async *[Symbol.asyncIterator](): AsyncGenerator<T, void, undefined> {
        const queue: Defer<T>[] = [new Defer()];
        let teardown: (() => void) | void;
        let done = false;
        let released = false;

        // Idempotent: the teardown may be requested by the producer
        // (error/complete) as well as by the consumer (early exit).
        const release = () => {
            if (released || typeof teardown !== 'function') return;

            released = true;
            teardown();
        };

        const observer: Observer<T> = {
            next(value) {
                if (done) return;

                // Settle the pending slot, then open a new one for the
                // following notification.
                queue[queue.length - 1].resolve(value);
                queue.push(new Defer());
            },
            error(reason) {
                if (!done) {
                    queue[queue.length - 1].reject(reason);
                    done = true;
                }
                release();
            },
            complete() {
                if (!done) {
                    // Settled without a successor slot: this deferred marks
                    // the end of the sequence and carries no value.
                    queue[queue.length - 1].resolve();
                    done = true;
                }
                release();
            }
        };

        teardown = this.subscriber(observer);

        // The producer may have terminated synchronously, i.e. before its
        // teardown function was known to `release()`.
        if (done) release();

        try {
            for (;;) {
                const value = await queue[0].promise;

                queue.shift();

                // `next()` always pushes a successor slot in the same tick it
                // resolves one, so an empty queue means the slot just read
                // was settled by `complete()`.
                if (!queue.length) return;

                yield value;
            }
        } finally {
            // Reached on completion, on error and on early consumer exit:
            // ignore late notifications and free the producer's resources.
            done = true;
            release();
        }
    }

    /**
     * Creates an Observable that emits every item of a sync or async iterable
     * and then completes; an error thrown by the iteration is forwarded to
     * `error()`.
     *
     * @param list - source of the values
     */
    static fromStream<T = any>(
        list: Iterable<T> | AsyncIterable<T>
    ): Observable<T> {
        return new this<T>(({ next, complete, error }) => {
            let stopped = false;

            void (async () => {
                try {
                    for await (const item of list) {
                        if (stopped) break;

                        next(item);
                    }
                    if (!stopped) complete();
                } catch (bug) {
                    // The thrown value is forwarded unchanged; the cast only
                    // satisfies the declared `error()` parameter type.
                    if (!stopped) error(bug as string | Error);
                }
            })();

            return () => {
                stopped = true;
            };
        });
    }

    /** Creates an Observable that emits the given items and completes. */
    static of<T = any>(...items: T[]): Observable<T> {
        return this.fromStream(items);
    }

    /**
     * Consumes the whole sequence.
     *
     * @returns a promise of the last emitted value (`undefined` when nothing
     *          was emitted); it rejects when the sequence errors.
     */
    async toPromise(): Promise<T> {
        let last: T | undefined;

        for await (const item of this) last = item;

        // `undefined` is only possible for an empty sequence; the cast keeps
        // the previously inferred result type for existing callers.
        return last as T;
    }

    /**
     * Consumes the sequence with callbacks.
     *
     * No callback is invoked once the subscription is closed. Note that
     * `unsubscribe()` is observed by the consuming loop when the next value
     * arrives, so the producer's teardown runs at that moment.
     *
     * @param onNext - called with every emitted value
     * @param onError - called when the sequence (or a callback) fails
     * @param onComplete - called when the sequence completes
     */
    subscribe(
        onNext: Observer<T>['next'],
        onError?: Observer<T>['error'],
        onComplete?: Observer<T>['complete']
    ): Subscription {
        let closed = false;

        void (async () => {
            try {
                for await (const item of this) {
                    if (closed) break;

                    onNext(item);
                }
                if (!closed && typeof onComplete === 'function') onComplete();
            } catch (error) {
                if (!closed && typeof onError === 'function')
                    onError(error as string | Error);
            } finally {
                // Completed, failed or unsubscribed: nothing more will come.
                closed = true;
            }
        })();

        return {
            unsubscribe: () => {
                closed = true;
            },
            get closed() {
                return closed;
            }
        };
    }

    /**
     * Converts an iterable, an async iterable or another subscribable into an
     * Observable.
     *
     * @param observable - the source to wrap
     */
    static from<T = any>(
        observable: Iterable<T> | AsyncIterable<T> | Subscribable<T>
    ): Observable<T> {
        if (Symbol.iterator in observable)
            return this.of(...(observable as Iterable<T>));

        if (Symbol.asyncIterator in observable)
            return this.fromStream(observable as AsyncIterable<T>);

        const source = observable as Subscribable<T>;

        return new this<T>(({ next, error, complete }) => {
            const subscription = source.subscribe(next, error, complete);

            // Call through the subscription so `unsubscribe` keeps its
            // receiver; a detached method reference would lose `this`.
            return () => subscription.unsubscribe();
        });
    }

    /**
     * Creates an Observable of the events named `name` fired by `target`;
     * an `error` event of the target fails the sequence.
     *
     * Uses `on()`/`off()` when the target provides them and falls back to
     * `addEventListener()`/`removeEventListener()` otherwise.
     *
     * @param target - event source
     * @param name - event name to listen for
     */
    static fromEvent<T = any>(
        target: EventTrigger,
        name: string
    ): Observable<T> {
        return new this<T>(({ next, error }) => {
            if (typeof target.on === 'function')
                target.on(name, next).on('error', error);
            else {
                target.addEventListener(name, next);
                target.addEventListener('error', error);
            }

            // Teardown: detach exactly the listeners attached above.
            return () => {
                if (typeof target.off === 'function')
                    target.off(name, next).off('error', error);
                else {
                    target.removeEventListener(name, next);
                    target.removeEventListener('error', error);
                }
            };
        });
    }
}