import { IBus, IStreamableBus } from "./base"; import {  pump, DuplexStream, ReadableStream, WritableStream, wrapDuplexStream, ReadableStreamDefaultReader } from "../streams"; import { IteratorType, sequenceIterator, parallelIterator, createRandomIterator, createRoundRobinIterator, } from "../utils"; export type FanoutBusTargetsParamType = IBus[] | ((message: T) => IBus[]); // TODO - weighted bus export class FanoutBus implements IStreamableBus { private getTargets: (message: T) => IBus[]; constructor(private _targets: FanoutBusTargetsParamType, private _iterator: IteratorType>) { this.getTargets = typeof _targets === "function" ? _targets : () => _targets; } dispatch(message: T) { return new DuplexStream((input, output) => { const writer = output.getWriter(); let spare: ReadableStream = input, child: ReadableStream; let pending = 0; this._iterator(this.getTargets(message), (target: IBus) => { let response = target.dispatch(message); if (response == null) { return Promise.resolve(); } [spare, child] = spare.tee(); response = wrapDuplexStream(response); pending++; return child .pipeThrough(response) .pipeTo(new WritableStream({ write(chunk) { return writer.write(chunk); }, close: () => pending--, abort: () => pending-- })) }) .then(writer.close.bind(writer)) .catch(writer.abort.bind(writer)) .catch((e) => { }); }); } } export const createFanoutBus = (targets: FanoutBusTargetsParamType, iterator: IteratorType>) => new FanoutBus(targets, iterator); /** * Executes a message against all target busses in one after the other. */ export class SequenceBus extends FanoutBus { constructor(targets: FanoutBusTargetsParamType) { super(targets, sequenceIterator); } } export const createSequenceBus = (targets: FanoutBusTargetsParamType) => new SequenceBus(targets); /** * Executes a message against all target busses at the same time. */ export class ParallelBus extends FanoutBus { constructor(targets: FanoutBusTargetsParamType) { super(targets, parallelIterator); } } export const createParallelBus = (targets: FanoutBusTargetsParamType) => new ParallelBus(targets); /** * Executes a message against one target bus that is rotated with each message. */ export class RoundRobinBus extends FanoutBus { constructor(targets: FanoutBusTargetsParamType) { super(targets, createRoundRobinIterator()); } } export const createRoundRobinBus = (targets: FanoutBusTargetsParamType) => new RoundRobinBus(targets); /** * Executes a message against one target bus that is selected at random. */ export class RandomBus extends FanoutBus { constructor(targets: FanoutBusTargetsParamType, weights?: number[]) { super(targets, createRandomIterator(weights)); } } export const createRandomBus = (targets: FanoutBusTargetsParamType, weights?: number[]) => new RandomBus(targets, weights);