/**
 * Create a TripleStreamBatcher with the given options
 *
 * @param {BatcherOptions} [options={}] - Batcher configuration
 * @returns {TripleStreamBatcher} New batcher instance
 */
export function createTripleStreamBatcher(options?: BatcherOptions): TripleStreamBatcher;
/**
 * TripleStreamBatcher - Efficient batching for bulk RDF triple operations
 *
 * Accumulates triples into batches for efficient transfer between BEAM and Oxigraph.
 * Supports configurable batch sizes, timeouts, backpressure, and async streaming.
 *
 * @example
 * const batcher = new TripleStreamBatcher({ batchSize: 100, timeout: 50 });
 *
 * batcher.onBatch(async (batch) => {
 *   await oxigraphBridge.insertBatch(batch);
 *   return { success: true };
 * });
 *
 * for (const triple of triples) {
 *   batcher.addTriple(triple);
 * }
 *
 * await batcher.flush();
 */
export class TripleStreamBatcher {
    /**
     * Create a new TripleStreamBatcher
     *
     * @param {BatcherOptions} [options={}] - Batcher configuration
     */
    constructor(options?: BatcherOptions);
    /** @type {number} */
    batchSize: number;
    /** @type {number} */
    timeout: number;
    /** @type {number} */
    maxQueueSize: number;
    /** @type {Triple[]} */
    queue: Triple[];
    /** @type {BatcherState} */
    state: BatcherState;
    /** @type {NodeJS.Timeout|null} */
    timeoutHandle: NodeJS.Timeout | null;
    /** @type {((batch: Triple[]) => Promise<BatchResult>)|null} */
    batchCallback: ((batch: Triple[]) => Promise<BatchResult>) | null;
    /** @type {BatchMetrics} */
    metrics: BatchMetrics;
    /** @type {number} */
    startTime: number;
    /** @type {number[]} */
    latencies: number[];
    /** @type {boolean} */
    backpressureActive: boolean;
    /**
     * Check if batcher is ready to accept triples
     *
     * @returns {boolean} True if batcher can accept triples
     */
    isReady(): boolean;
    /**
     * Add a single triple to the batch queue
     *
     * @param {Triple} triple - Triple to add
     * @throws {Error} If batcher is destroyed or paused
     * @throws {TypeError} If triple is invalid
     */
    addTriple(triple: Triple): void;
    /**
     * Add multiple triples to the batch queue
     *
     * @param {Triple[]} triples - Array of triples to add
     * @throws {Error} If batcher is destroyed
     * @throws {TypeError} If triples is not an array or contains invalid triples
     */
    addTriples(triples: Triple[]): void;
    /**
     * Force flush any pending triples in the queue
     *
     * @returns {Promise<void>} Resolves when flush is complete
     */
    flush(): Promise<void>;
    /**
     * Register a callback for when a batch is ready
     *
     * The callback receives the batch of triples and should return a BatchResult.
     * If the callback returns { slow: true }, backpressure will be applied.
     *
     * @param {(batch: Triple[]) => Promise<BatchResult>} callback - Batch handler
     */
    onBatch(callback: (batch: Triple[]) => Promise<BatchResult>): void;
    /**
     * Stream triples from an async iterable source
     *
     * Consumes an async iterable of triples and batches them efficiently.
     * Handles backpressure automatically.
     *
     * @param {AsyncIterable<Triple>} asyncIterable - Async source of triples
     * @returns {Promise<BatchMetrics>} Final metrics after streaming completes
     */
    streamTriples(asyncIterable: AsyncIterable<Triple>): Promise<BatchMetrics>;
    /**
     * Get current batcher metrics
     *
     * @returns {BatchMetrics} Current metrics
     */
    getMetrics(): BatchMetrics;
    /**
     * Get current queue size
     *
     * @returns {number} Number of triples in queue
     */
    getQueueSize(): number;
    /**
     * Get current batcher state
     *
     * @returns {BatcherState} Current state
     */
    getState(): BatcherState;
    /**
     * Resume batcher after backpressure pause
     */
    resume(): void;
    /**
     * Destroy the batcher and release resources
     */
    destroy(): void;
    /**
     * Start the timeout timer for partial batch flushing
     *
     * @private
     */
    private _startTimeout;
    /**
     * Clear the timeout timer
     *
     * @private
     */
    private _clearTimeout;
    /**
     * Send the current batch to the callback
     *
     * @private
     * @returns {Promise<void>} NOTE(review): resolution type is not visible in
     *   this declaration file; assumed void — confirm against the implementation.
     */
    private _sendBatch;
}
export default TripleStreamBatcher;
/**
 * Batcher state machine states
 */
export type BatcherState = "Idle" | "Accumulating" | "Flushing" | "Paused" | "Destroyed";
/**
 * Triple object structure for RDF triples
 */
export type Triple = {
    /**
     * - RDF subject term
     */
    subject: any;
    /**
     * - RDF predicate term
     */
    predicate: any;
    /**
     * - RDF object term
     */
    object: any;
    /**
     * - Optional RDF graph term
     */
    graph?: any;
};
/**
 * Batch callback result indicating consumer status
 */
export type BatchResult = {
    /**
     * - Whether batch was processed successfully
     */
    success: boolean;
    /**
     * - Whether consumer is slow (triggers backpressure)
     */
    slow?: boolean;
    /**
     * - Error message if failed
     */
    error?: string;
};
/**
 * Batcher options configuration
 */
export type BatcherOptions = {
    /**
     * - Number of triples per batch
     */
    batchSize?: number;
    /**
     * - Timeout in ms before flushing partial batch
     */
    timeout?: number;
    /**
     * - Maximum queue size before backpressure
     */
    maxQueueSize?: number;
};
/**
 * Metrics for batch operations
 */
export type BatchMetrics = {
    /**
     * - Total triples processed
     */
    totalTriples: number;
    /**
     * - Total batches sent
     */
    totalBatches: number;
    /**
     * - Average batch size
     */
    avgBatchSize: number;
    /**
     * - Average batch latency in milliseconds
     */
    avgLatencyMs: number;
    /**
     * - Triples per second
     */
    throughput: number;
    /**
     * - Number of backpressure events
     */
    backpressureEvents: number;
};