import type { PaginationList, PaginationListWithCount } from './apiTypes.ts' import { Readable } from 'node:stream' type FetchPage = ( pageno: number, ) => | PaginationList | PromiseLike> | PaginationListWithCount | PromiseLike> export default class PaginationStream extends Readable { private _fetchPage: FetchPage private _nitems?: number private _pageno = 0 private _items: T[] = [] private _itemsRead = 0 constructor(fetchPage: FetchPage) { super({ objectMode: true }) this._fetchPage = fetchPage } override async _read() { if (this._items.length > 0) { this._itemsRead++ process.nextTick(() => this.push(this._items.pop())) return } if (this._nitems != null && this._itemsRead >= this._nitems) { process.nextTick(() => this.push(null)) return } try { const { items, ...rest } = await this._fetchPage(++this._pageno) if ('count' in rest) { this._nitems = rest.count } // Some endpoints can return a non-zero `count` even when the current query/page yields no // items (e.g. filtering by date range). An empty page is a reliable signal that we are done. if (items.length === 0) { process.nextTick(() => this.push(null)) return } this._items = Array.from(items) this._items.reverse() this._read() } catch (err) { this.emit('error', err) } } }