import { debounce, throttle } from "../limit"; import { ControlledPromise } from "../promises"; /** * Performs a task whenever it becomes dirty. * * Implements debouncing, throttling, race-condition handling, and pause/resume. * * Largely useful to be able to trigger fetches whenever a value changes. * * Use case is perhaps somewhat niche, but I've had to implement similar logic in multiple places, so here it is. * * Used as a helper for `@matchlighter/fetcher`'s `TimedFetch` and in `@inst_proserv/toolkit`'s `DataFetcher` classes. */ export class ConcurrentTransactor { constructor(private readonly options: { active?: boolean, transact: () => Promise, handleResult: (result: T) => void, debounce?: number | false, max_debounce?: number | false, throttle?: number | false, /** If an older response arrives after a newer request has been issued, should it be ignored? */ filter_stale_results?: boolean, }) { this.options = { active: true, ...options } if (options.throttle) { this._performThrottled = throttle({ interval: options.throttle }, this._performThrottled); } else { this._performThrottled = this._performNow; } if (options.debounce) { this._performThrottledAndDebounced = debounce({ timeToStability: options.debounce, maxTime: options.max_debounce || Infinity }, this._performThrottled); } else { this._performThrottledAndDebounced = this._performThrottled; } if (!options.active) this._paused = true; } private _paused = false; /** The most recently completed transaction number */ private currentState = 0; /** The most recently queued transaction number */ private wantedState = 0; /** The most recently started transaction number */ private requestedState = 0; get isPaused() { return this._paused; } get isCaughtUp() { return this.currentState == this.wantedState; } get isExecuting() { return this.requestedState > this.currentState; } /** Mark the Transactor as dirty and schedule a transaction to be performed */ performSoon(skip_debounce = false) { this.wantedState++; if (this._paused) return; if (skip_debounce) { cancel_limit(this._performThrottledAndDebounced); this._performThrottled(); } else { this._performThrottledAndDebounced(); } } /** Mark the Transactor as dirty and perform a transaction immediately */ performNow() { // TODO Should this respect the paused state? this.wantedState++; cancel_limit(this._performThrottledAndDebounced); cancel_limit(this._performThrottled); this._performNow(); } /** Immediately perform a transaction if the Transactor is dirty */ performNowIfNeeded() { // TODO Should this respect the paused state? if (!this.isCaughtUp) { cancel_limit(this._performThrottledAndDebounced); cancel_limit(this._performThrottled); this._performNow(); } } /** Mark the Transactor as clean, cancelling any pending transactions */ markCaughtUp() { this.currentState = this.wantedState; this.requestedState = this.wantedState; cancel_limit(this._performThrottledAndDebounced); cancel_limit(this._performThrottled); this.resolveTokenPromises(); } /** Prevent the Transactor from starting any transactions (started transactions will continue) */ pause() { this._paused = true; cancel_limit(this._performThrottledAndDebounced); cancel_limit(this._performThrottled); } /** Un-pause the Transactor. Will queue a transaction if dirty */ resume(immediate = false) { this._paused = false; if (!this.isCaughtUp) { if (immediate) { this._performNow(); } else { this._performThrottledAndDebounced(); } } } private caughtupPromise: ControlledPromise; private tokenPromises: Record }> = {}; private resolveTokenPromises() { for (let { token, promise } of Object.values(this.tokenPromises)) { if (token <= this.currentState) { promise.resolve(); delete this.tokenPromises[token]; } } if (this.caughtupPromise && this.isCaughtUp) { this.caughtupPromise.resolve(); this.caughtupPromise = null; } } /** * Wait until the Transactor is caught up to any already-triggered triggers. * Has the same effect as `catchupToAll()` if `filter_stale_results: true`. */ async catchupToCurrent() { const currentToken = this.wantedState; // Already caught up if (this.currentState == currentToken) return; this.tokenPromises[currentToken] ||= { token: currentToken, promise: new ControlledPromise() }; await this.tokenPromises[currentToken].promise; } /** Wait until the Transactor has no pending triggers */ async catchupToAll() { // Already caught up if (this.currentState == this.wantedState) return; this.caughtupPromise ||= new ControlledPromise(); await this.caughtupPromise; } private async performTransaction() { if (this._paused) return; const token = this.wantedState; // We're already up to date if (token == this.currentState) return; // This request has already been made if (this.requestedState >= token) return; this.requestedState = token; const result = await this.options.transact(); if (this.options.filter_stale_results && token < this.wantedState) return; if (this.currentState >= token) return; this.currentState = token; await this.options.handleResult(result); this.resolveTokenPromises(); } private _performNow = () => { this.performTransaction() }; private _performThrottled: typeof this._performNow; private _performThrottledAndDebounced: typeof this._performNow; } function cancel_limit(limit) { limit.cancel_pending?.(); }