import { Handoff } from "@effect/core/stream/Stream/operations/_internal/Handoff"
import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

// NOTE(review): the generic type parameter lists / type arguments in this file
// appear to have been stripped (e.g. `Handoff.make>()`, `Handoff>`,
// `Channel, void>`). The code tokens below are left exactly as found; restore
// the type arguments from the upstream source before compiling.

/**
 * Combines this stream and the specified stream deterministically, using the
 * stream of boolean values `b` to control which stream to pull from next.
 * `true` indicates to pull from this stream and `false` indicates to pull
 * from the specified stream. Only consumes as many elements as requested by
 * `b`. If either this stream or the specified stream is exhausted, further
 * requests for values from that stream will be ignored.
 *
 * Implementation outline: two handoffs (`left` for this stream, `right` for
 * `that`) are created; each source stream is run in its own scoped, forked
 * fiber that feeds its handoff through `producer`. The returned stream's
 * channel reads the booleans of `b` and lets `process` pull from the matching
 * handoff.
 *
 * @param that - The stream pulled from when `b` emits `false`.
 * @param b - The stream of booleans selecting which side to pull from next.
 * @returns A function taking this stream (`self`) and returning the
 * interleaved stream.
 *
 * @tsplus static effect/core/stream/Stream.Aspects interleaveWith
 * @tsplus pipeable effect/core/stream/Stream interleaveWith
 */
export function interleaveWith(
  that: Stream,
  b: Stream
) {
  return (self: Stream): Stream => {
    // `concreteStream` is called before each `.channel` access below
    // (presumably a type-narrowing assertion to the internal representation —
    // confirm against StreamInternal).
    concreteStream(self)
    return new StreamInternal(
      Channel.unwrapScoped(
        Do(($) => {
          // One handoff per source stream; `process` takes from these.
          const left = $(Handoff.make>())
          const right = $(Handoff.make>())
          // Fork a scoped fiber that runs `self`, re-emitting each chunk via
          // `Channel.writeChunk` and passing the result to `producer(left)`.
          // NOTE(review): `>>` is a tsplus operator here — presumably "pipe
          // this channel's output into the next"; confirm against the Channel
          // operator declarations.
          $(
            (self.channel.concatMap(Channel.writeChunk) >> producer(left))
              .runScoped
              .forkScoped
          )
          concreteStream(that)
          // Same as above for `that`, feeding the `right` handoff.
          $(
            (that.channel.concatMap(Channel.writeChunk) >> producer(right))
              .runScoped
              .forkScoped
          )
          concreteStream(b)
          // Drive the output from the boolean stream; both sides start as
          // "not done".
          return b.channel.concatMap(Channel.writeChunk) >> process(left, right, false, false)
        })
      )
    )
  }
}

/**
 * Channel that forwards everything it reads into `handoff` as `Take` values.
 *
 * - On an input value: offers `Take.single(value)` and then recurses to read
 *   the next input.
 * - On an upstream failure cause: offers `Take.failCause(cause)` and does not
 *   recurse.
 * - On upstream completion: offers `Take.end` and does not recurse.
 *
 * @param handoff - The handoff that receives the `Take` values.
 * @returns A channel that performs the forwarding described above.
 */
function producer(
  handoff: Handoff>
): Channel {
  return Channel.readWithCause(
    (value: A | A2) =>
      Channel.fromEffect(handoff.offer(Take.single(value)))
        .flatMap(() => producer(handoff)),
    (cause) => Channel.fromEffect(handoff.offer(Take.failCause(cause))),
    () => Channel.fromEffect(handoff.offer(Take.end))
  )
}

/**
 * Channel that reads booleans and, for each one, takes from the `left`
 * handoff (`true`) or the `right` handoff (`false`) and writes the result
 * downstream.
 *
 * The `leftDone` / `rightDone` flags are threaded through the recursion: a
 * boolean that asks for a side already marked done is skipped and the next
 * boolean is read. The channel finishes with `Channel.unit` when the boolean
 * input ends, or when one side reports its end while the other side is
 * already marked done. A failure cause — whether taken from a handoff or read
 * from the boolean input — fails the channel with that cause.
 *
 * NOTE(review): the three arguments to `take.fold` are read here as
 * end / failure cause / chunk, in that order, based on the callback parameter
 * names; confirm against `Take.fold`.
 *
 * @param left - Handoff taken from when the boolean read is `true`.
 * @param right - Handoff taken from when the boolean read is `false`.
 * @param leftDone - Whether the left side has already reported its end.
 * @param rightDone - Whether the right side has already reported its end.
 * @returns The channel implementing the selection loop described above.
 */
function process(
  left: Handoff>,
  right: Handoff>,
  leftDone: boolean,
  rightDone: boolean
): Channel, void> {
  return Channel.readWithCause(
    (bool: boolean) => {
      // `true` requests the left side, unless it is already done.
      if (bool && !leftDone) {
        return Channel.fromEffect(left.take).flatMap((take) =>
          take.fold(
            // Left just ended: stop if right is done too, otherwise keep
            // going with the left side marked done.
            rightDone ?
              Channel.unit :
              process(left, right, true, rightDone),
            (cause) => Channel.failCause(cause),
            // Write what was taken, then read the next boolean.
            (chunk) => Channel.write(chunk).flatMap(() => process(left, right, leftDone, rightDone))
          )
        )
      }
      // `false` requests the right side, unless it is already done.
      if (!bool && !rightDone) {
        return Channel.fromEffect(right.take).flatMap((take) =>
          take.fold(
            // Right just ended: stop if left is done too, otherwise keep
            // going with the right side marked done.
            leftDone ?
              Channel.unit :
              process(left, right, leftDone, true),
            (cause) => Channel.failCause(cause),
            (chunk) => Channel.write(chunk).flatMap(() => process(left, right, leftDone, rightDone))
          )
        )
      }
      // The requested side is already done: ignore this boolean and read the
      // next one.
      return process(left, right, leftDone, rightDone)
    },
    // Failure of the boolean input fails the channel with the same cause.
    (cause) => Channel.failCause(cause),
    // The boolean input ended: finish.
    () => Channel.unit
  )
}