import { ISubscribable, ISubscription } from "./types";

/** Construction options for {@link Iterator}. */
export interface IIteratorConfig {
  /** Source that the iterator subscribes to in its constructor. */
  subscribable: ISubscribable;
}

/**
 * Pull-style adapter over a push-style subscribable.
 *
 * The constructor subscribes immediately. Values that arrive while nobody is
 * waiting are buffered and handed out, oldest first, by {@link Iterator.next}.
 * When `next()` is called on an empty buffer it returns a promise that is
 * settled by the next value or error notification.
 *
 * Limitations that callers should be aware of:
 * - Only one waiter is tracked; calling `next()` again while a previous call
 *   is still pending replaces the waiter, and the earlier promise is never
 *   settled.
 * - Neither `close()` nor a `complete` notification settles a pending
 *   `next()` promise; use {@link Iterator.isComplete} to detect completion.
 * - A stored error takes priority over buffered values.
 *
 * @typeParam T - Type of the values delivered by the subscribable. Defaults to
 *   `any` so that references without a type argument keep compiling.
 */
export class Iterator<T = any> {
  protected isClosed = false;
  protected subscription: ISubscription;
  /** Values received while no `next()` call was waiting, oldest first. */
  protected values: T[] = [];
  /** Error received while no `next()` call was waiting; `undefined` means none. */
  protected error: any = undefined;
  /** Resolver of the pending `next()` promise, if any. */
  protected resolve?: (value: T) => any;
  /** Rejecter of the pending `next()` promise, if any. */
  protected reject?: (err: any) => any;
  protected isCompleted = false;

  /**
   * Subscribes to `config.subscribable` and starts collecting notifications.
   *
   * @param config - Holds the subscribable to consume.
   */
  constructor(protected config: IIteratorConfig) {
    this.subscription = this.config.subscribable.subscribe({
      next: this.push,
      error: this.setError,
      complete: () => {
        this.isCompleted = true;
      },
    });
  }

  /** @returns `true` once the subscribable has sent its `complete` notification. */
  isComplete(): boolean {
    return this.isCompleted;
  }

  /**
   * Discards buffered values, unsubscribes from the source and marks the
   * iterator closed. Every later `next()` call rejects.
   */
  close(): void {
    this.values = [];
    this.subscription.unsubscribe();
    this.isClosed = true;
  }

  /**
   * Returns the next value from the source.
   *
   * @returns The oldest buffered value, or a promise for the next value to
   *   arrive when the buffer is empty.
   * @throws Error with message "Channel was closed" after {@link Iterator.close}.
   * @throws The stored error if one arrived while nothing was waiting. `Error`
   *   instances are rethrown unchanged; other values are wrapped in an `Error`.
   */
  async next(): Promise<T> {
    if (this.isClosed) {
      throw new Error("Channel was closed");
    }
    // Compare against the `undefined` sentinel instead of truthiness so that
    // falsy error values (null, 0, "") are not silently ignored.
    if (this.error !== undefined) {
      // Wrapping an Error in `new Error(...)` would stringify it and lose the
      // original message and stack, so Error instances are rethrown as-is.
      throw this.error instanceof Error
        ? this.error
        : new Error(String(this.error));
    }
    if (this.values.length > 0) {
      return this.values.shift() as T;
    }
    return new Promise<T>((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }

  /**
   * Handles a value notification: hands it to the pending `next()` call, or
   * buffers it when nothing is waiting.
   */
  protected push = (value: T) => {
    const waiter = this.resolve;
    if (!waiter) {
      this.values.push(value);
      return;
    }
    // Release the waiter before deferring delivery. Otherwise a second value
    // arriving before the timer fires would target the same waiter and be
    // dropped, and a later error would go to the already-settled rejecter
    // instead of being stored.
    this.resolve = undefined;
    this.reject = undefined;
    // Delivery stays deferred to a timer callback, as before.
    setTimeout(() => {
      waiter(value);
    });
  };

  /**
   * Handles an error notification: rejects the pending `next()` call with the
   * raw error, or stores it for the next call when nothing is waiting.
   */
  protected setError = (err: any) => {
    const waiter = this.reject;
    if (!waiter) {
      this.error = err;
      return;
    }
    // Clear both halves of the settled promise so later notifications are
    // buffered or stored rather than sent to a promise that can no longer change.
    this.resolve = undefined;
    this.reject = undefined;
    waiter(err);
  };
}