import type { ActorRef } from "./actor.js";
import type { ActorSystem, SpawnOptions } from "./actor_system.js";
import type { ChildCounts, ChildInfo } from "./dynamic_supervisor.js";

/**
 * Options for a Producer stage.
 */
export interface ProducerOptions {
  /** Max buffered events when no demand. Default: 10000 */
  bufferSize?: number;
  /** Keep "first" or "last" events when buffer overflows. Default: "last" */
  bufferKeep?: "first" | "last";
  /** Dispatcher strategy. Default: { type: "demand" } */
  dispatcher?: DispatcherConfig;
  /**
   * Demand mode. Default: "forward"
   * - "forward": demand from consumers is processed immediately (default)
   * - "accumulate": demand is accumulated but not forwarded to handleDemand.
   *   Call `producer.demand("forward")` to resume after all consumers have subscribed.
   *   Inspired by Elixir's `{:producer, state, demand: :accumulate}`.
   */
  demand?: "accumulate" | "forward";
}

/** Hash function for PartitionDispatcher. Return partition index or null to discard. */
export type PartitionHashFunction = (event: any) => number | null;

/** Dispatcher configuration for producers. */
export type DispatcherConfig =
  | { type: "demand" }
  | { type: "broadcast" }
  | { type: "partition"; partitions: number; hash?: PartitionHashFunction };

/**
 * Options for subscribing a consumer to a producer.
 */
export interface SubscriptionOptions {
  /** Max events in flight per subscription. Default: 1000 */
  maxDemand?: number;
  /** Threshold to request more events. Default: floor(maxDemand * 0.75) */
  minDemand?: number;
  /**
   * Cancel behavior. Default: "temporary"
   * - "permanent": consumer crashes if producer cancels
   * - "transient": consumer ignores normal cancels, crashes on others
   * - "temporary": consumer always ignores cancels (default)
   */
  cancel?: "permanent" | "transient" | "temporary";
  /** Partition to subscribe to (required for PartitionDispatcher). */
  partition?: number | string;
}

/**
 * Child specification handed to `ConsumerSupervisor.start`.
 *
 * NOTE(review): the per-field semantics are not visible in this declaration
 * file; the notes below are inferred from names and should be confirmed
 * against the implementation.
 */
export interface ChildSpec {
  /** Actor class used for children (untyped in this declaration). */
  actorClass: any;
  /** Optional argument list; presumably forwarded to each child - confirm. */
  args?: any[];
  /**
   * Spawn options for children.
   *
   * NOTE(review): the omitted key is assumed to be "args" because `args` is a
   * sibling field on this spec - confirm against the source `.ts` file.
   */
  spawnOptions?: Omit<SpawnOptions, "args">;
}

/**
 * Options accepted by `ConsumerSupervisor.start`.
 *
 * NOTE(review): presumably a restart-intensity limit (at most `maxRestarts`
 * restarts within `periodMs` milliseconds); defaults are not visible here -
 * confirm against the implementation.
 */
export interface ConsumerSupervisorOptions {
  maxRestarts?: number;
  periodMs?: number;
}

/** Subscription reference returned from subscribe(). */
export interface SubscriptionRef {
  tag: string;
  producerRef: ActorRef;
}

/**
 * Callbacks for a Producer stage.
 *
 * The `any` default keeps un-parameterized references to this interface valid.
 *
 * @template TState The producer's internal state type
 */
export interface ProducerCallbacks<TState = any> {
  /** Initialize state. Receives spawn args. */
  init: (...args: any[]) => TState;
  /** Called when demand arrives. Return [events, newState]. */
  handleDemand: (demand: number, state: TState) => [events: any[], newState: TState];
}

/**
 * Callbacks for a Consumer stage.
 *
 * The `any` default keeps un-parameterized references to this interface valid.
 *
 * @template TState The consumer's internal state type
 */
export interface ConsumerCallbacks<TState = any> {
  /** Initialize state (optional). */
  init?: (...args: any[]) => TState;
  /** Called when events arrive from a producer. Return new state. */
  handleEvents: (
    events: any[],
    from: { tag: string; producerRef: ActorRef },
    state: TState,
  ) => TState;
}

/**
 * Callbacks for a ProducerConsumer stage.
 *
 * The `any` default keeps un-parameterized references to this interface valid.
 *
 * @template TState The stage's internal state type
 */
export interface ProducerConsumerCallbacks<TState = any> {
  /** Initialize state (optional). */
  init?: (...args: any[]) => TState;
  /** Called when events arrive. Return [outputEvents, newState]. */
  handleEvents: (
    events: any[],
    from: { tag: string; producerRef: ActorRef },
    state: TState,
  ) => [events: any[], newState: TState];
}

/**
 * Callbacks for a ConsumerSupervisor stage.
 *
 * NOTE(review): no method declared in this file accepts this interface;
 * confirm where it is consumed.
 *
 * @template TState The supervisor's internal state type
 */
export interface ConsumerSupervisorCallbacks<TState = any> {
  /** Initialize state (optional). */
  init?: (...args: any[]) => TState;
}

/**
 * A Producer stage that emits events in response to consumer demand.
 *
 * @example
 * ```typescript
 * const producer = Producer.start(system, {
 *   init: () => 0,
 *   handleDemand: (demand, counter) => {
 *     const events = Array.from({length: demand}, (_, i) => counter + i);
 *     return [events, counter + demand];
 *   },
 * });
 * ```
 */
export declare class Producer {
  private readonly actorRef;
  private readonly system;
  private constructor();
  /**
   * Start a new Producer stage.
   *
   * @template TState State type shared by `init` and `handleDemand`
   * @param system The actor system
   * @param callbacks Producer callbacks (init, handleDemand)
   * @param options Producer options (bufferSize, bufferKeep, dispatcher, demand)
   * @param spawnOptions Spawn options for the underlying actor (name, etc.)
   * @param args Arguments passed to callbacks.init()
   */
  static start<TState = any>(
    system: ActorSystem,
    callbacks: ProducerCallbacks<TState>,
    options?: ProducerOptions,
    spawnOptions?: SpawnOptions,
    ...args: any[]
  ): Producer;
  /** Stop the producer and cancel all subscriptions. */
  stop(): Promise<void>;
  /**
   * Switch the producer's demand mode.
   * Call `producer.demand("forward")` after subscribing all consumers
   * to resume event production when started with `demand: "accumulate"`.
   */
  demand(mode: "forward"): void;
  /** Get the producer's ActorRef (for subscribing consumers). */
  getRef(): ActorRef;
}

/**
 * A Consumer stage that receives events from upstream producers.
 *
 * @example
 * ```typescript
 * const consumer = Consumer.start(system, {
 *   handleEvents: (events, _from, state) => {
 *     console.log("Received:", events);
 *     return state;
 *   },
 * });
 *
 * await consumer.subscribe(producer.getRef(), { maxDemand: 10 });
 * ```
 */
export declare class Consumer {
  private readonly actorRef;
  private readonly system;
  private readonly actor;
  private constructor();
  /**
   * Start a new Consumer stage.
   *
   * @template TState State type shared by `init` and `handleEvents`
   * @param system The actor system
   * @param callbacks Consumer callbacks (handleEvents, optional init)
   * @param spawnOptions Spawn options for the underlying actor
   * @param args Arguments passed to callbacks.init()
   */
  static start<TState = any>(
    system: ActorSystem,
    callbacks: ConsumerCallbacks<TState>,
    spawnOptions?: SpawnOptions,
    ...args: any[]
  ): Consumer;
  /**
   * Subscribe to a producer. Returns a subscription reference.
   *
   * @param producerRef The producer's ActorRef
   * @param options Subscription options (maxDemand, minDemand, cancel, partition)
   */
  subscribe(producerRef: ActorRef, options?: SubscriptionOptions): Promise<SubscriptionRef>;
  /**
   * Cancel a subscription.
   *
   * @param ref The subscription to cancel
   */
  cancel(ref: SubscriptionRef): boolean;
  /** Stop the consumer and cancel all subscriptions. */
  stop(): Promise<void>;
  /** Get the consumer's ActorRef. */
  getRef(): ActorRef;
}

/**
 * A stage that subscribes to producers and exposes information about its
 * child actors via `whichChildren()` / `countChildren()`.
 *
 * NOTE(review): presumably starts children from the given `ChildSpec` for
 * incoming events, in the manner of Elixir's ConsumerSupervisor - confirm
 * against the implementation.
 */
export declare class ConsumerSupervisor {
  private readonly actorRef;
  private readonly system;
  private readonly actor;
  private constructor();
  /**
   * Start a new ConsumerSupervisor stage.
   *
   * @param system The actor system
   * @param childSpec Specification of the child actor
   * @param options Supervisor options (maxRestarts, periodMs)
   * @param spawnOptions Spawn options for the underlying actor
   */
  static start(
    system: ActorSystem,
    childSpec: ChildSpec,
    options?: ConsumerSupervisorOptions,
    spawnOptions?: SpawnOptions,
  ): ConsumerSupervisor;
  /**
   * Subscribe to a producer. Resolves with the subscription reference.
   *
   * @param producerRef The producer's ActorRef
   * @param options Subscription options
   */
  subscribe(producerRef: ActorRef, options?: SubscriptionOptions): Promise<SubscriptionRef>;
  /**
   * Cancel a subscription.
   *
   * @param ref The subscription to cancel
   */
  cancel(ref: SubscriptionRef): boolean;
  /** Resolve with information about the supervisor's children. */
  whichChildren(): Promise<ChildInfo[]>;
  /** Resolve with counts of the supervisor's children. */
  countChildren(): Promise<ChildCounts>;
  /** Stop the supervisor. */
  stop(): Promise<void>;
  /** Get the supervisor's ActorRef. */
  getRef(): ActorRef;
}

/**
 * A ProducerConsumer stage that transforms events between upstream and downstream.
 *
 * @example
 * ```typescript
 * const multiplier = ProducerConsumer.start(system, {
 *   init: (factor: number) => factor,
 *   handleEvents: (events, _from, factor) => {
 *     return [events.map(e => e * factor), factor];
 *   },
 * });
 *
 * await multiplier.subscribe(producer.getRef(), { maxDemand: 10 });
 * // Then: consumer.subscribe(multiplier.getRef(), { maxDemand: 10 });
 * ```
 */
export declare class ProducerConsumer {
  private readonly actorRef;
  private readonly system;
  private readonly actor;
  private constructor();
  /**
   * Start a new ProducerConsumer stage.
   *
   * @template TState State type shared by `init` and `handleEvents`
   * @param system The actor system
   * @param callbacks ProducerConsumer callbacks (handleEvents, optional init)
   * @param producerOptions Options for the producer side (bufferSize, bufferKeep, dispatcher, demand)
   * @param spawnOptions Spawn options for the underlying actor
   * @param args Arguments passed to callbacks.init()
   */
  static start<TState = any>(
    system: ActorSystem,
    callbacks: ProducerConsumerCallbacks<TState>,
    producerOptions?: ProducerOptions,
    spawnOptions?: SpawnOptions,
    ...args: any[]
  ): ProducerConsumer;
  /**
   * Subscribe to an upstream producer. Resolves with the subscription reference.
   */
  subscribe(producerRef: ActorRef, options?: SubscriptionOptions): Promise<SubscriptionRef>;
  /**
   * Cancel an upstream subscription.
   */
  cancelUpstream(ref: SubscriptionRef): boolean;
  /** Stop the stage, cancelling all upstream and downstream subscriptions. */
  stop(): Promise<void>;
  /** Get the stage's ActorRef (for subscribing downstream consumers or upstream producers). */
  getRef(): ActorRef;
}
//# sourceMappingURL=gen_stage.d.ts.map