import { Handoff } from "@effect/core/stream/Stream/operations/_internal/Handoff"
import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

// NOTE(review): the generic parameter lists in this file had been stripped
// (e.g. `Effect, Chunk>`), leaving it syntactically invalid. They are restored
// below so that the shape matches the surviving residue; the exact type names
// (`Maybe`, `Exit`, `R | R2`, `E | E2`) are a reconstruction and are presumed
// to be ambient tsplus globals like `Stream`/`Effect`/`Chunk` — confirm
// against the package's type definitions.

/**
 * Combines the chunks from this stream and the specified stream by repeatedly
 * applying the function `f` to extract a chunk using both sides and
 * conceptually "offer" it to the destination stream. `f` can maintain some
 * internal state to control the combining process, with the initial state
 * being specified by `s`.
 *
 * Each side is driven by its own `producer`, which only reads from its
 * upstream after the matching latch has been offered a value. `pullLeft` and
 * `pullRight` therefore first offer to the latch and then take the next
 * result from the corresponding handoff.
 *
 * @param that - The stream to combine with `self`.
 * @param s - The initial state passed to `f`.
 * @param f - Receives the current state and the two pull effects, and yields
 * an exit that is unwrapped via `Effect.done(exit).unsome` to drive
 * `Stream.unfoldChunkEffect`.
 *
 * @tsplus static effect/core/stream/Stream.Aspects combineChunks
 * @tsplus pipeable effect/core/stream/Stream combineChunks
 */
export function combineChunks<R, E, A, R2, E2, A2, S, A3>(
  that: Stream<R2, E2, A2>,
  s: S,
  f: (
    s: S,
    pullLeft: Effect<R, Maybe<E>, Chunk<A>>,
    pullRight: Effect<R2, Maybe<E2>, Chunk<A2>>
  ) => Effect<R | R2, never, Exit<Maybe<E | E2>, readonly [Chunk<A3>, S]>>
) {
  return (self: Stream<R, E, A>): Stream<R | R2, E | E2, A3> =>
    new StreamInternal(
      Channel.unwrapScoped(
        Do(($) => {
          // One handoff per side carries the produced `Take` values; one latch
          // per side lets the consumer request the next read.
          const left = $(Handoff.make<Take<E, A>>())
          const right = $(Handoff.make<Take<E2, A2>>())
          const latchL = $(Handoff.make<void>())
          const latchR = $(Handoff.make<void>())

          // Pipe each upstream channel into its producer and fork the run
          // with `forkScoped`.
          concreteStream(self)
          $((self.channel >> producer(left, latchL)).runScoped.forkScoped)
          concreteStream(that)
          $((that.channel >> producer(right, latchR)).runScoped.forkScoped)

          // A pull signals the latch, then takes the next `Take` from the
          // handoff and converts it with `take.done`.
          const pullLeft = latchL.offer(undefined)
            .zipRight(left.take.flatMap((take) => take.done))
          const pullRight = latchR.offer(undefined)
            .zipRight(right.take.flatMap((take) => take.done))

          const stream = Stream.unfoldChunkEffect(
            s,
            (state) =>
              f(state, pullLeft, pullRight).flatMap((exit) => Effect.done(exit).unsome)
          )
          concreteStream(stream)
          return stream.channel
        })
      )
    )
}

/**
 * Channel that forwards upstream input into `handoff`, one read per latch
 * signal.
 *
 * It first waits on `latch.take`, then reads once from upstream:
 * - on a chunk, offers `Take.chunk(chunk)` and recurses;
 * - on a failure cause, offers `Take.failCause(cause)` and does not recurse;
 * - on upstream completion, offers `Take.end` and recurses, so it waits on
 *   the latch again.
 *
 * @param handoff - Destination for the produced `Take` values.
 * @param latch - Taken from before every upstream read.
 */
function producer<Err, Elem>(
  handoff: Handoff<Take<Err, Elem>>,
  latch: Handoff<void>
): Channel<never, Err, Chunk<Elem>, unknown, never, never, unknown> {
  return (
    Channel.fromEffect(latch.take).flatMap(() =>
      Channel.readWithCause(
        (chunk: Chunk<Elem>) =>
          Channel.fromEffect(handoff.offer(Take.chunk(chunk)))
            .flatMap(() => producer(handoff, latch)),
        (cause) => Channel.fromEffect(handoff.offer(Take.failCause(cause))),
        () =>
          Channel.fromEffect(handoff.offer(Take.end))
            .flatMap(() => producer(handoff, latch))
      )
    )
  )
}