import { MergeDecision } from "@effect/core/stream/Channel/MergeDecision"
import { concreteSink, SinkInternal } from "@effect/core/stream/Sink/operations/_internal/SinkInternal"

/**
 * Runs both sinks in parallel on the input, using the specified merge
 * function as soon as one result or the other has been computed.
 *
 * Incoming chunks are published to a bounded hub; each sink reads from its
 * own subscription to that hub, and the two sink channels are combined with
 * `mergeWith` using `leftDone` / `rightDone`.
 *
 * @param that - The sink to race against `self`.
 * @param leftDone - Invoked with the exit of `self`; returns the merge
 *   decision applied to `that`.
 * @param rightDone - Invoked with the exit of `that`; returns the merge
 *   decision applied to `self`.
 * @param capacity - Capacity passed to `Hub.bounded` for the hub that fans
 *   the input out to both sinks. Defaults to `16`.
 * @returns A function that takes `self` and yields the combined sink.
 *
 * @tsplus static effect/core/stream/Sink.Aspects raceWith
 * @tsplus pipeable effect/core/stream/Sink raceWith
 */
export function raceWith<R1, E1, In1, L1, Z1, E, Z, Z2>(
  that: Sink<R1, E1, In1, L1, Z1>,
  leftDone: (exit: Exit<E, Z>) => MergeDecision<R1, E1, Z1, E | E1, Z2>,
  rightDone: (exit: Exit<E1, Z1>) => MergeDecision<R1, E, Z, E | E1, Z2>,
  capacity = 16
) {
  return <R, In, L>(self: Sink<R, E, In, L, Z>): Sink<R | R1, E | E1, In & In1, L | L1, Z2> =>
    Sink.unwrapScoped(
      Do(($) => {
        // The hub carries either an upstream exit or a chunk of input, which is
        // the element shape `Channel.toHub` / `Channel.fromHubScoped` work with.
        const hub = $(Hub.bounded<Either<Exit<never, unknown>, Chunk<In & In1>>>(capacity))

        // Both subscriptions are taken before `reader` is built, and therefore
        // before anything is published to the hub.
        const c1 = $(Channel.fromHubScoped(hub))
        const c2 = $(Channel.fromHubScoped(hub))

        // Forwards this sink's upstream input into the hub.
        const reader = Channel.toHub(hub)

        // Narrow both sinks to their internal representation so `.channel`
        // is accessible below.
        concreteSink(self)
        concreteSink(that)

        // Feed each sink from its own subscription and race the two.
        const writer = (c1 >> self.channel).mergeWith(c2 >> that.channel, leftDone, rightDone)

        // Run publisher and consumers together: when the reader side finishes
        // first, wait for the writer and adopt its exit; when the writer
        // finishes, complete immediately with its exit.
        const channel = reader.mergeWith(
          writer,
          () => MergeDecision.await((exit) => Effect.done(exit)),
          (done) => MergeDecision.done(Effect.done(done))
        )

        return new SinkInternal<R | R1, E | E1, In & In1, L | L1, Z2>(channel)
      })
    )
}