import { type Noop, noop } from '../../utils/noop' export type TeardownLogic = Unsubscribable | Noop | void export type UnaryFunction = (source: TSource) => TReturn export type MonoTypeOperatorFunction = OperatorFunction< TValue, TError, TValue, TError > export type OperatorFunction< TValueBefore = any, TErrorBefore = any, TValueAfter = any, TErrorAfter = any, > = UnaryFunction, Subscribable> export type Observer = { next: (value: TValue) => void error: (err: TError) => void complete: () => void } export type Unsubscribable = { unsubscribe(): void } export type InferObservableValue = TObservable extends Observable ? TValue : never export function isObservable(x: unknown): x is Observable { return typeof x === 'object' && x !== null && 'subscribe' in x } export function pipeReducer(previousValue: any, next: UnaryFunction) { return next(previousValue) } export function promisifyObservable(observable: Observable) { let abort = noop const promise = new Promise((resolve, reject) => { let isDone = false const onDone = () => { if (isDone) return isDone = true reject(new ObservableAbortError('This operation was aborted.')) obs$.unsubscribe() } const obs$ = observable.subscribe({ next: (data) => { isDone = true resolve(data) onDone() }, error: (data) => { isDone = true reject(data) onDone() }, complete: () => { isDone = true onDone() }, }) abort = onDone }) return { promise, abort } } export class ObservableAbortError extends Error { constructor(message?: string) { super(message) this.name = 'ObservableAbortError' Object.setPrototypeOf(this, ObservableAbortError.prototype) } } export class Subscribable { constructor(public onSubscribe: (observer: Observer) => TeardownLogic) {} subscribe(observer?: Partial>): Unsubscribable { let teardownRef: TeardownLogic | null = null let isDone = false let unsubscribed = false let teardownImmediately = false const unsubscribe = () => { if (unsubscribed) return if (teardownRef === null) { teardownImmediately = true return } unsubscribed = true if (typeof teardownRef === 'function') { teardownRef() } else if (teardownRef) { teardownRef.unsubscribe() } } teardownRef = this.onSubscribe({ next: (value) => { if (isDone) return observer?.next?.(value) }, error: (err) => { if (isDone) return isDone = true observer?.error?.(err) unsubscribe() }, complete: () => { if (isDone) return isDone = true observer?.complete?.() unsubscribe() }, }) if (teardownImmediately) { unsubscribe() } return { unsubscribe, } } } export class Observable extends Subscribable { constructor(onSubscribe: (observer: Observer) => TeardownLogic) { super(onSubscribe) } pipe(): Observable pipe( op1: OperatorFunction, ): Observable pipe( op1: OperatorFunction, op2: OperatorFunction, ): Observable pipe( op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, ): Observable pipe( op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, ): Observable pipe( op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, ): Observable pipe(...operations: OperatorFunction[]): Observable { return operations.reduce(pipeReducer, this) } }