import {
  Desc,
  Event,
  EventStream,
  EventStreamSeed,
  isValue,
  MethodDesc,
  Observer,
  valueEvent,
} from "./abstractions"
import GlobalScheduler from "./scheduler"
import { StreamTransformer, transform } from "./transform"
import { nop } from "./util"

export type VoidFunction = () => void

/**
 * Delay function used by `bufferWithTime` and `bufferWithTimeOrCount`. Your implementation should
 * call the given void function to cause a buffer flush.
 *
 * The return value is optional. When one is returned it is kept as a handle and handed to the
 * global scheduler's `clearTimeout` if the buffer is flushed before the callback fires.
 */
export type DelayFunction = (f: VoidFunction) => any

// TODO: scoped versions

/**
 * Creates an operator that collects source values into arrays and emits each array when the
 * delay elapses.
 *
 * @param delay Either a number, which is passed to the global scheduler's `setTimeout` as the
 *   timeout, or a custom `DelayFunction` that decides when the flush callback is invoked.
 * @returns A function that applies the buffering to the given stream or stream seed.
 */
export function bufferWithTime(
  delay: number | DelayFunction
): (src: EventStream | EventStreamSeed) => EventStreamSeed {
  // Number.MAX_VALUE as the count means the length check never matches, so only the delay
  // (or the end of the source) triggers a flush.
  return (src) =>
    bufferWithTimeOrCount(["bufferWithTime", [delay]], src, delay, Number.MAX_VALUE)
}

/**
 * Creates an operator that collects source values into arrays and emits an array each time it
 * holds exactly `count` values. Values left over when the source ends are emitted before the end.
 *
 * @param count Number of values per emitted array.
 * @returns A function that applies the buffering to the given stream or stream seed.
 */
export function bufferWithCount(
  count: number
): (src: EventStream | EventStreamSeed) => EventStreamSeed {
  return (src) => bufferWithTimeOrCount(["bufferWithCount", [count]], src, undefined, count)
}

/**
 * Shared implementation behind `bufferWithTime` and `bufferWithCount`.
 *
 * After every input and after every non-final flush the buffer is inspected: if it holds exactly
 * `count` values it is flushed immediately, otherwise (when a delay is configured) a delayed
 * flush is scheduled unless one is already pending.
 */
function bufferWithTimeOrCount(
  desc: MethodDesc,
  src: EventStream | EventStreamSeed,
  delay?: number | DelayFunction,
  count?: number
): EventStreamSeed {
  const delayFunc = toDelayFunction(delay)

  function flushOrSchedule(buf: Buffer): void {
    if (buf.values.length === count) {
      buf.flush()
      return
    }
    if (delayFunc !== undefined) {
      buf.schedule(delayFunc)
    }
  }

  return buffer(desc, src, flushOrSchedule, flushOrSchedule)
}

/**
 * Mutable per-subscription state: the values collected so far, the downstream observers and the
 * bookkeeping for a pending delayed flush.
 *
 * `V` is the element type of the buffered values. It defaults to `any` because the handler
 * aliases and call sites in this file refer to `Buffer` without a type argument.
 */
class Buffer<V = any> {
  constructor(onFlush: BufferHandler, onInput: BufferHandler) {
    this.onFlush = onFlush
    this.onInput = onInput
  }

  delay?: DelayFunction
  onInput: BufferHandler
  onFlush: BufferHandler
  onValue: Observer = (e) => undefined
  onEnd: Observer = (e) => undefined
  /** Handle returned by the delay function for the pending flush, if it returned one. */
  scheduled: number | null = null
  /**
   * True while a delayed flush has been requested and has not run yet. Tracked separately from
   * `scheduled` because a `DelayFunction` is free to return nothing (or a falsy handle such as 0).
   */
  pending: boolean = false
  ended: boolean = false
  values: V[] = []

  /**
   * Cancels any pending delayed flush, emits the collected values (if any) as one array and then
   * either signals the end (when the source has ended) or invokes `onFlush`.
   */
  flush(): void {
    if (this.pending) {
      this.pending = false
      if (this.scheduled != null) {
        // NOTE(review): a handle produced by a custom DelayFunction is also passed to the global
        // scheduler here, as before; confirm that is acceptable for non-scheduler handles.
        GlobalScheduler.scheduler.clearTimeout(this.scheduled)
        this.scheduled = null
      }
    }

    if (this.values.length > 0) {
      const valuesToPush = this.values
      // Swap in a fresh array before notifying, so values pushed during the callback land in
      // the next batch.
      this.values = []
      this.onValue(valuesToPush)
      if (this.ended) {
        this.onEnd()
      } else {
        this.onFlush(this)
      }
    } else if (this.ended) {
      this.onEnd()
    }
  }

  /**
   * Requests a delayed flush through `delay` unless one is already pending.
   *
   * The pending flag is raised before `delay` is invoked so that a delay function which calls
   * the callback synchronously leaves the buffer in the "nothing pending" state afterwards.
   */
  schedule(delay: DelayFunction): void {
    if (this.pending) {
      return
    }
    this.pending = true
    const handle = delay(() => this.flush())
    // Only remember the handle if the flush has not already happened synchronously.
    if (this.pending && handle != null) {
      this.scheduled = handle
    }
  }
}

/**
 * Normalizes the `delay` argument: a number becomes a delay function backed by the global
 * scheduler's `setTimeout`; a function or `undefined` is returned unchanged.
 */
function toDelayFunction(
  delay: number | DelayFunction | undefined
): DelayFunction | undefined {
  if (typeof delay === "number") {
    const delayMs = delay
    return (f) => GlobalScheduler.scheduler.setTimeout(f, delayMs)
  }
  return delay
}

type BufferHandler = (buffer: Buffer) => any

/**
 * Builds the buffering transform around a `Buffer` instance created per subscription.
 *
 * @param desc Description passed through to `transform`.
 * @param src Source stream or stream seed.
 * @param onInput Called with the buffer after each source value has been appended.
 * @param onFlush Called with the buffer after a flush that emitted values while the source has
 *   not ended.
 */
function buffer(
  desc: MethodDesc,
  src: EventStream | EventStreamSeed,
  onInput: BufferHandler = nop,
  onFlush: BufferHandler = nop
): EventStreamSeed {
  const transformer: StreamTransformer = (subscribe) => (onValue, onEnd = nop) => {
    const buf = new Buffer(onFlush, onInput)
    buf.onValue = onValue
    buf.onEnd = onEnd
    // NOTE(review): a pending delayed flush is not cancelled when the subscription returned
    // below is disposed; confirm whether downstream may still be notified after unsubscribing.
    return subscribe(
      (value) => {
        buf.values.push(value)
        onInput(buf)
      },
      () => {
        buf.ended = true
        // With a flush pending, the end is emitted by that flush; otherwise flush right away.
        if (!buf.pending) {
          buf.flush()
        }
      }
    )
  }
  return transform(desc, transformer)(src)
}