import type { DebouncedFunc } from 'lodash'
import throttle from 'lodash/throttle'
import { isPromise } from '../lib/is-promise'
import { Emitter } from './event-emitter'

/** A value that may be available immediately or via a promise. */
type MaybePromise<T> = T | Promise<T>

/** Configuration accepted by {@link Queue}. */
export interface QueueOptions<T> {
  /**
   * Wait passed to the throttled flush (lodash `throttle`'s `wait` argument).
   * Falsy values, including `0`, fall back to `10_000`.
   */
  flushInterval?: number
  /**
   * Delivery callback. Returning (or resolving to) `false` signals that the
   * items were not delivered and must stay queued.
   */
  onSend?: (items: T[]) => MaybePromise<unknown>
}

/**
 * Collects items and delivers them in batches through `onSend` on a throttled
 * schedule. Emits `'processed'` with the delivered items after each
 * successful delivery.
 */
export class Queue<T> extends Emitter {
  /** Items that have been delivered, in delivery order. */
  readonly flushed: T[] = []
  /** Items waiting to be delivered. */
  readonly queue = new Set<T>()
  /** Throttled trigger for `send` (trailing edge only). */
  readonly scheduleSend: DebouncedFunc<() => void>
  /** True while a delivery started by `send` has not finished yet. */
  private flushing = false

  constructor(options: QueueOptions<T> = {}) {
    super()

    if (options.onSend) {
      this.onSend = options.onSend
    }

    // Wrap in an arrow function so `send` always runs with this instance as
    // `this`, independent of how the throttled wrapper is invoked.
    this.scheduleSend = throttle(() => this.send(), options.flushInterval || 10_000, {
      leading: false,
      trailing: true,
    })
  }

  /** Queues `item` and schedules a throttled delivery. */
  add(item: T): void {
    this.queue.add(item)
    // Trigger the throttled function, which will eventually flush the queue
    this.scheduleSend()
  }

  /** Immediately runs a pending throttled delivery, if one is scheduled. */
  flush(): void {
    this.scheduleSend.flush()
  }

  /** Delivered items followed by the items still waiting in the queue. */
  get all(): T[] {
    return this.flushed.concat(Array.from(this.queue))
  }

  /** Drops every pending item and forgets every delivered item. */
  reset(): void {
    this.queue.clear()
    this.flushed.length = 0
  }

  // Placeholder method to be overridden by subclasses or via configuration options
  protected onSend(_items: T[]): MaybePromise<unknown> {
    return true
  }

  /**
   * Simplistic promise queue
   * Attempt to deliver all items in the queue, and then remove them from it.
   * If `this.onSend` returns (or resolves to) false, the items stay queued.
   * If `this.onSend` throws or rejects, the items stay queued and the error
   * is not caught here.
   * If we are already flushing, we'll delay the attempt by scheduling another flush.
   *
   * @param force - When true, attempt delivery even if a previous delivery is
   *   still in flight.
   */
  send(force = false): void {
    // schedule another `send` if we're waiting for a previous delivery to finish
    if (this.flushing && !force) {
      setTimeout(() => this.scheduleSend(), 0)
      return
    }

    // Nothing to deliver; leave `flushing` untouched so a forced call on an
    // empty queue cannot clear the flag of a delivery that is still pending.
    if (!this.queue.size) {
      return
    }

    const items = Array.from(this.queue)

    const finalize = (send?: unknown) => {
      if (send !== false) {
        this.flushed.push(...items)
        // Remove only what was delivered: items added while an asynchronous
        // delivery was in flight must stay queued for the next send.
        for (const item of items) {
          this.queue.delete(item)
        }
        this.emit('processed', items)
      }
    }

    this.flushing = true
    let pending = false
    try {
      const shouldSend = this.onSend(items)

      // handle promise case (browsers that don't support sendBeacon)
      if (isPromise(shouldSend)) {
        // The flag is released only once the promise settles, so that
        // overlapping sends are deferred while delivery is in flight.
        pending = true
        shouldSend.then(finalize).finally(() => {
          this.flushing = false
        })
      } else {
        finalize(shouldSend)
      }
    } finally {
      // Synchronous completion (or a synchronous throw) ends the flush here.
      if (!pending) {
        this.flushing = false
      }
    }
  }
}