import {
  defer,
  GroupedObservable,
  ObservableInput,
  OperatorFunction,
  pipe,
  Subject,
} from 'rxjs';
import { groupBy, map, mergeScan, scan } from 'rxjs/operators';

/**
 * Accumulator shape used internally by the stateful operators below.
 * `output` is optional only so that the seed value (which carries just the
 * initial state) fits the same type as the values produced by the callback.
 */
interface StateAndOptionalOutput<T, O> {
  state: T;
  output?: O;
}

/** The value a stateful callback must produce: the next state plus the value to emit. */
export type StateAndOutput<T, O> = Required<StateAndOptionalOutput<T, O>>;

/**
 * Like `mergeScan`, but threads a separate piece of state through the
 * callback and emits only the `output` part of each result downstream.
 *
 * @param fn - Receives the current state and the next input; returns an
 *   observable input yielding the next state together with the value to emit.
 * @param initialState - State handed to `fn` for the first input.
 * @param concurrency - Forwarded to `mergeScan` as its concurrency limit
 *   (defaults to 1).
 * @returns An operator mapping inputs of type `I` to outputs of type `O`.
 */
export function mergeMapWithState<T, I, O>(
  fn: (state: T, input: I) => ObservableInput<StateAndOutput<T, O>>,
  initialState: T,
  concurrency = 1
): OperatorFunction<I, O> {
  return pipe(
    mergeScan(
      (
        { state }: StateAndOptionalOutput<T, O>,
        input: I
      ): ObservableInput<StateAndOptionalOutput<T, O>> => fn(state, input),
      { state: initialState },
      concurrency
    ),
    // Only the seed lacks `output`; every value reaching this point was
    // produced by `fn`, whose return type requires `output`.
    map(({ output }) => output!)
  );
}

/**
 * Like `scan`, but threads a separate piece of state through the callback
 * and emits only the `output` part of each result downstream.
 *
 * @param fn - Receives the current state and the next input; returns the
 *   next state together with the value to emit.
 * @param initialState - State handed to `fn` for the first input.
 * @returns An operator mapping inputs of type `I` to outputs of type `O`.
 */
export function mapWithState<T, I, O>(
  fn: (state: T, input: I) => StateAndOutput<T, O>,
  initialState: T
): OperatorFunction<I, O> {
  return pipe(
    scan(
      ({ state }: StateAndOptionalOutput<T, O>, input: I): StateAndOptionalOutput<T, O> =>
        fn(state, input),
      { state: initialState }
    ),
    // The seed is passed to `scan` as an explicit initial value; every value
    // reaching this point was produced by `fn`, whose return type requires
    // `output`.
    map(({ output }) => output!)
  );
}

/** A `GroupedObservable` that can be ended on demand via `close()`. */
export interface CloseableGroupedObservable<K, T> extends GroupedObservable<K, T> {
  close(): void;
}

/**
 * An RX OperatorFunction similar to `groupBy`.
 * The returned GroupedObservable has a `close()` method.
 *
 * Each group's duration notifier is a `Subject` stored under the group's key;
 * `close()` makes that subject emit, which ends the group. Calling `close()`
 * when no subject is registered for the key (for example after the group was
 * already closed) does nothing.
 *
 * The key-to-subject bookkeeping is created per subscription, so subscribing
 * to the resulting observable more than once does not let one subscription's
 * `close()` affect another subscription's groups.
 *
 * NOTE: the subject is looked up by key at the time `close()` is called, so
 * calling `close()` on a handle whose group has already ended will close a
 * newer group that was opened for the same key in the meantime.
 *
 * @param keyFunc - Computes the grouping key for each source value.
 * @returns An operator emitting one closeable group per active key.
 */
export function closeableGroupBy<T, K>(
  keyFunc: (t: T) => K
): OperatorFunction<T, CloseableGroupedObservable<K, T>> {
  return (source) =>
    defer(() => {
      // Created inside `defer` so every subscription owns an independent map.
      const keyToSubject = new Map<K, Subject<void>>();

      return source.pipe(
        groupBy(keyFunc, {
          duration: (group$) => {
            // Duration selector function, the group will close when this subject emits a value
            const subject = new Subject<void>();
            keyToSubject.set(group$.key, subject);
            return subject;
          },
        }),
        map(
          (group$: GroupedObservable<K, T>): CloseableGroupedObservable<K, T> =>
            Object.assign(group$, {
              close: (): void => {
                const subject = keyToSubject.get(group$.key);
                if (subject !== undefined) {
                  subject.next();
                  keyToSubject.delete(group$.key);
                }
              },
            })
        )
      );
    });
}