import { equal } from '../util' export type Awaitable = T | PromiseLike export type Mapper = (t: T) => Awaitable export type Filter = (t: T) => Awaitable export type Reducer = (acc: U, t: T) => Awaitable // eslint-disable-next-line @typescript-eslint/no-explicit-any export type NamedEventuals = { [k: string]: Eventual } & { [K in keyof T]: T[K] } export type Join = Eventual ? U : any }> type MutableJoin = WritableEventual ? U : any }> export interface TryMapOptions { // eslint-disable-next-line @typescript-eslint/no-explicit-any onError: (err: any) => void } export type Subscriber = (value: T) => void export interface Eventual { readonly valueReady: boolean value(): Promise subscribe(subscriber: Subscriber): void map(f: Mapper): Eventual tryMap(f: Mapper, options: TryMapOptions): Eventual filter(f: Filter): Eventual pipe(f: (t: T) => Awaitable): void throttle(interval: number): Eventual reduce(f: Reducer, initial: U): Eventual // An asynchronous generator over values pushed into the Eventual // over time. Note: There is no guarantee that all values will be // emitted; some may be skipped e.g. if multiple different values // are pushed into the eventual at almost the same time; only the // last of these may be emitted in this case. values(): AsyncGenerator } export interface WritableEventual extends Eventual { push(value: T): void } export class EventualValue implements WritableEventual { inner: T | undefined promise: Promise | undefined resolvePromise?: (value: T) => void subscribers: Subscriber[] = [] constructor(initial?: T) { this.inner = initial this.promise = new Promise(resolve => { if (this.inner !== undefined) { resolve(this.inner) } else { this.resolvePromise = resolve } }) } get valueReady(): boolean { return this.inner !== undefined } value(): Promise { if (this.promise) { return this.promise } else { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion return Promise.resolve(this.inner!) } } subscribe(subscriber: Subscriber): void { this.subscribers.push(subscriber) if (this.inner !== undefined) { subscriber(this.inner) } } push(value: T): void { if (!equal(this.inner, value)) { this.inner = value this.resolvePromise?.call(this, value) this.promise = undefined this.resolvePromise = undefined this.subscribers.forEach(subscriber => subscriber(value)) } } map(f: Mapper): Eventual { return map(this, f) } tryMap(f: Mapper, options: TryMapOptions): Eventual { return tryMap(this, f, options) } filter(f: Filter): Eventual { return filter(this, f) } pipe(f: (t: T) => Awaitable): void { return pipe(this, f) } throttle(interval: number): Eventual { return throttle(this, interval) } reduce(f: Reducer, initial: U): Eventual { return reduce(this, f, initial) } async *values(): AsyncGenerator { // Creates a promise and exposes its `resolve` method so it can be triggered // externally function defer() { let resolve: ((t: T) => void) | null = null const promise = new Promise(_resolve => { resolve = _resolve }) const deferred: { promise: Promise resolve: (t: T) => void } = { promise, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion resolve: resolve!, } return deferred } // Create the initial promise let next = defer() // Delay this ever so slightly to allow `await next.promise` to be executed // before we resolve the first value. Otherwise we'd skip the value at the // time `values()` is called, because `await next.promise` would await the // second, not the initial promise. setTimeout( () => // Whenever there is a new value, resolve the current promise // and replace it with a new one this.pipe(t => { next.resolve(t) next = defer() }), 0, ) while (true) { yield await next.promise } } } export function mutable(initial?: T): WritableEventual { return new EventualValue(initial) } export function map( source: Eventual, mapper: (t: T) => Awaitable, ): Eventual { const output: WritableEventual = mutable() let previousT: T | undefined let latestT: T | undefined let mapPromise: Promise | undefined source.subscribe(t => { latestT = t if (mapPromise === undefined) { mapPromise = (async () => { while (!equal(latestT, previousT)) { previousT = latestT output.push(await mapper(latestT)) } mapPromise = undefined })() } }) return output } export function tryMap( source: Eventual, mapper: (t: T) => Awaitable, { onError }: TryMapOptions, ): Eventual { const output: WritableEventual = mutable() let previousT: T | undefined let latestT: T | undefined let promiseActive = false source.subscribe(t => { latestT = t if (!promiseActive) { promiseActive = true ;(async () => { while (!equal(latestT, previousT)) { try { previousT = latestT output.push(await mapper(latestT)) } catch (err) { onError(err) } } promiseActive = false })() } }) return output } export function filter(source: Eventual, f: Filter): Eventual { const output: WritableEventual = mutable() let previousT: T | undefined let latestT: T | undefined let mapPromise: Promise | undefined source.subscribe(t => { latestT = t if (mapPromise === undefined) { mapPromise = (async () => { while (!equal(latestT, previousT)) { previousT = latestT if (await f(latestT)) { output.push(latestT) } } mapPromise = undefined })() } }) return output } export function pipe(source: Eventual, fn: (t: T) => Awaitable): void { map(source, fn) } export function throttle(source: Eventual, interval: number): Eventual { const output: WritableEventual = mutable() let latestT: T | undefined let timeout: NodeJS.Timeout | undefined let lastPushed = Date.now() source.subscribe(t => { if (!output.valueReady) { latestT = t output.push(t) lastPushed = Date.now() } else if (!equal(t, latestT)) { latestT = t if (!timeout) { timeout = setTimeout( () => { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion output.push(latestT!) lastPushed = Date.now() timeout = undefined }, Math.max(0, Math.min(interval, Date.now() - lastPushed)), ) } } }) return output } export function timer(milliseconds: number): Eventual { const time = mutable(Date.now()) setInterval(() => time.push(Date.now()), milliseconds) return time } export function reduce( source: Eventual, reducer: (acc: U, t: T) => Awaitable, initial: U, ): Eventual { const output = mutable(initial) let acc: U = initial let previousT: T | undefined let latestT: T | undefined let promiseActive = false source.subscribe(t => { latestT = t if (!promiseActive) { promiseActive = true ;(async () => { while (!equal(latestT, previousT)) { previousT = latestT acc = await reducer(acc, latestT) output.push(acc) } promiseActive = false })() } }) return output } export function join(sources: NamedEventuals): Join { const output: MutableJoin = mutable() const keys = Object.keys(sources) as Array const sourceValues: { // eslint-disable-next-line @typescript-eslint/no-explicit-any [key in keyof T]: T[key] extends Eventual ? U : any } = keys.reduce((out, key) => { out[key] = undefined return out // eslint-disable-next-line @typescript-eslint/no-explicit-any }, {} as any) for (const key of keys) { sources[key].subscribe(value => { sourceValues[key] = value if (!keys.some(key => sourceValues[key] === undefined)) { // NOTE: creating a new JS object is important, otherwise // `output.inner` and `sourceValues` will be the same object // and therefore always be considered identical output.push({ ...sourceValues }) } }) } return output }