import { Handoff } from "@effect/core/stream/Stream/operations/_internal/Handoff" import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal" /** * Combines the elements from this stream and the specified stream by * repeatedly applying the function `f` to extract an element using both sides * and conceptually "offer" it to the destination stream. `f` can maintain * some internal state to control the combining process, with the initial * state being specified by `s`. * * Where possible, prefer `Stream.combineChunks` for a more efficient * implementation. * * @tsplus static effect/core/stream/Stream.Aspects combine * @tsplus pipeable effect/core/stream/Stream combine */ export function combine( that: Stream, s: S, f: ( s: S, pullLeft: Effect, A>, pullRight: Effect, A2> ) => Effect, readonly [A3, S]>> ) { return (self: Stream): Stream => new StreamInternal( Channel.unwrapScoped( Do(($) => { const left = $(Handoff.make, A>>()) const right = $(Handoff.make, A2>>()) const latchL = $(Handoff.make()) const latchR = $(Handoff.make()) concreteStream(self) $( (self.channel.concatMap((chunk) => Channel.writeChunk(chunk)) >> producer(left, latchL)) .runScoped .forkScoped ) concreteStream(that) $( (that.channel.concatMap((chunk) => Channel.writeChunk(chunk)) >> producer(right, latchR)) .runScoped .forkScoped ) const pullLeft = latchL.offer(undefined) .zipRight(left.take.flatMap((exit) => Effect.done(exit))) const pullRight = latchR.offer(undefined) .zipRight(right.take.flatMap((exit) => Effect.done(exit))) const stream = Stream.unfoldEffect(s, (s) => f(s, pullLeft, pullRight).flatMap((exit) => Effect.done(exit).unsome)) concreteStream(stream) return stream.channel }) ) ) } function producer( handoff: Handoff, Elem>>, latch: Handoff ): Channel { return ( Channel.fromEffect(latch.take).flatMap(() => Channel.readWithCause( (value) => Channel.fromEffect(handoff.offer(Exit.succeed(value))) .flatMap(() => producer(handoff, latch)), (cause) => Channel.fromEffect(handoff.offer(Exit.failCause(cause.map(Maybe.some)))), () => Channel.fromEffect(handoff.offer(Exit.fail(Maybe.none))).flatMap(() => producer(handoff, latch) ) ) ) ) }