import { zipChunks } from "@effect/core/stream/Stream/operations/_internal/zipChunks"

// NOTE(review): the type-parameter lists in this module were missing (the file
// did not parse). They have been restored from the shapes the value-level code
// requires; confirm the parameter order on `zipAllWith` against any caller that
// passes explicit type arguments.

/**
 * State of the zipping loop, i.e. which side(s) must be pulled next.
 *
 * - `A` is the element type of the left stream, `A2` of the right stream.
 */
type State<A, A2> = DrainLeft | DrainRight | PullBoth | PullLeft<A2> | PullRight<A>

/** The right side has ended: only the left side is pulled from now on. */
class DrainLeft {
  readonly _tag = "DrainLeft"
}

/** The left side has ended: only the right side is pulled from now on. */
class DrainRight {
  readonly _tag = "DrainRight"
}

/** No chunk is pending on either side: pull both sides. */
class PullBoth {
  readonly _tag = "PullBoth"
}

/** A right chunk is pending; the left side must be pulled to pair with it. */
class PullLeft<A> {
  readonly _tag = "PullLeft"
  constructor(readonly rightChunk: Chunk<A>) {}
}

/** A left chunk is pending; the right side must be pulled to pair with it. */
class PullRight<A> {
  readonly _tag = "PullRight"
  constructor(readonly leftChunk: Chunk<A>) {}
}

/**
 * Zips this stream with another point-wise. The provided functions will be
 * used to create elements for the composed stream.
 *
 * The functions `left` and `right` will be used if the streams have different
 * lengths and one of the streams has ended before the other.
 *
 * @param that - The stream to zip with.
 * @param left - Maps elements of this stream once `that` has ended.
 * @param right - Maps elements of `that` once this stream has ended.
 * @param both - Combines one element from each stream.
 *
 * @tsplus static effect/core/stream/Stream.Aspects zipAllWith
 * @tsplus pipeable effect/core/stream/Stream zipAllWith
 */
export function zipAllWith<R2, E2, A2, A, A3>(
  that: Stream<R2, E2, A2>,
  left: (a: A) => A3,
  right: (a2: A2) => A3,
  both: (a: A, a2: A2) => A3
) {
  return <R, E>(self: Stream<R, E, A>): Stream<R | R2, E | E2, A3> =>
    self.combineChunks(
      that,
      // Widen the initial state so the combinator's state type is the full union.
      new PullBoth() as State<A, A2>,
      pull(left, right, both)
    )
}

/**
 * Zips two non-empty chunks with `f` and computes the follow-up state from the
 * remainder reported by `zipChunks`.
 *
 * A non-empty left remainder must be paired with the right side next
 * (`PullRight`), a non-empty right remainder with the left side (`PullLeft`);
 * an empty remainder means both sides are pulled again (`PullBoth`).
 *
 * NOTE(review): presumably the `Either` returned by `zipChunks` carries the
 * unconsumed tail of the longer chunk - confirm against `zipChunks`.
 */
function zipWithChunks<A, A2, A3>(
  leftChunk: Chunk<A>,
  rightChunk: Chunk<A2>,
  f: (a: A, a2: A2) => A3
): readonly [Chunk<A3>, State<A, A2>] {
  const [out, either] = zipChunks(leftChunk, rightChunk, f)
  return either.fold(
    (leftChunk) =>
      leftChunk.isEmpty ?
        [out, new PullBoth()] :
        [out, new PullRight(leftChunk)],
    (rightChunk) =>
      rightChunk.isEmpty ?
        [out, new PullBoth()] :
        [out, new PullLeft(rightChunk)]
  )
}

/**
 * Builds the step function handed to `combineChunks`.
 *
 * Each step receives the current state plus the two pull effects (which fail
 * with `Maybe.none` at end of stream, or `Maybe.some(error)` on failure) and
 * produces an `Exit` holding either the next output chunk with the next state,
 * or the termination / failure signal.
 */
function pull<A, A2, A3>(
  left: (a: A) => A3,
  right: (a2: A2) => A3,
  both: (a: A, a2: A2) => A3
) {
  return <R, E, R2, E2>(
    state: State<A, A2>,
    pullLeft: Effect<R, Maybe<E>, Chunk<A>>,
    pullRight: Effect<R2, Maybe<E2>, Chunk<A2>>
  ): Effect<R | R2, never, Exit<Maybe<E | E2>, readonly [Chunk<A3>, State<A, A2>]>> => {
    switch (state._tag) {
      case "DrainLeft": {
        // Right side is done: forward left chunks mapped with `left`; any
        // failure of the left pull (end or error) is passed through as-is.
        return pullLeft.foldEffect(
          (err) => Effect.succeed(Exit.fail(err)),
          (leftChunk) =>
            Effect.succeed(Exit.succeed([leftChunk.map(left), new DrainLeft()]))
        )
      }
      case "DrainRight": {
        // Left side is done: forward right chunks mapped with `right`.
        return pullRight.foldEffect(
          (err) => Effect.succeed(Exit.fail(err)),
          (rightChunk) =>
            Effect.succeed(
              Exit.succeed([rightChunk.map(right), new DrainRight()])
            )
        )
      }
      case "PullBoth": {
        // `unsome` moves end-of-stream into the success channel as `None`, so
        // the remaining error channel only carries real failures.
        return pullLeft
          .unsome
          .zipPar(pullRight.unsome)
          .foldEffect(
            (err) => Effect.succeed(Exit.fail(Maybe.some(err))),
            ([l, r]) => {
              if (l._tag === "Some" && r._tag === "Some") {
                const leftChunk = l.value
                const rightChunk = r.value
                if (leftChunk.isEmpty && rightChunk.isEmpty) {
                  // Nothing to emit yet: pull both sides again.
                  return pull(left, right, both)(new PullBoth(), pullLeft, pullRight)
                } else if (leftChunk.isEmpty) {
                  // Keep the right chunk pending and fetch a left chunk.
                  return pull(left, right, both)(
                    new PullLeft(rightChunk),
                    pullLeft,
                    pullRight
                  )
                } else if (rightChunk.isEmpty) {
                  // Keep the left chunk pending and fetch a right chunk.
                  return pull(left, right, both)(
                    new PullRight(leftChunk),
                    pullLeft,
                    pullRight
                  )
                } else {
                  return Effect.succeed(
                    Exit.succeed(zipWithChunks(leftChunk, rightChunk, both))
                  )
                }
              } else if (l._tag === "Some" && r._tag === "None") {
                // Right ended first: emit the left chunk and drain the left side.
                return Effect.succeed(
                  Exit.succeed([l.value.map(left), new DrainLeft()])
                )
              } else if (l._tag === "None" && r._tag === "Some") {
                // Left ended first: emit the right chunk and drain the right side.
                return Effect.succeed(
                  Exit.succeed([r.value.map(right), new DrainRight()])
                )
              } else {
                // Both sides ended: signal end of stream.
                return Effect.succeed(Exit.fail(Maybe.none))
              }
            }
          )
      }
      case "PullLeft": {
        return pullLeft.foldEffect(
          (option) =>
            option.fold(
              // Left ended: flush the pending right chunk, then drain the right.
              Effect.succeed<Exit<Maybe<E | E2>, readonly [Chunk<A3>, State<A, A2>]>>(
                Exit.succeed([state.rightChunk.map(right), new DrainRight()])
              ),
              (err) =>
                Effect.succeed<Exit<Maybe<E | E2>, readonly [Chunk<A3>, State<A, A2>]>>(
                  Exit.fail(Maybe.some(err))
                )
            ),
          (leftChunk) =>
            leftChunk.isEmpty ?
              // Empty left chunk: keep waiting with the same pending right chunk.
              pull(left, right, both)(
                new PullLeft(state.rightChunk),
                pullLeft,
                pullRight
              ) :
              state.rightChunk.isEmpty ?
              // Pending right chunk is empty: the fresh left chunk becomes pending.
              pull(left, right, both)(new PullRight(leftChunk), pullLeft, pullRight) :
              Effect.succeed(
                Exit.succeed(zipWithChunks(leftChunk, state.rightChunk, both))
              )
        )
      }
      case "PullRight": {
        return pullRight.foldEffect(
          (option) =>
            option.fold(
              // Right ended: flush the pending left chunk, then drain the left.
              Effect.succeed<Exit<Maybe<E | E2>, readonly [Chunk<A3>, State<A, A2>]>>(
                Exit.succeed([state.leftChunk.map(left), new DrainLeft()])
              ),
              (err) =>
                Effect.succeed<Exit<Maybe<E | E2>, readonly [Chunk<A3>, State<A, A2>]>>(
                  Exit.fail(Maybe.some(err))
                )
            ),
          (rightChunk) =>
            rightChunk.isEmpty ?
              // Empty right chunk: keep waiting with the same pending left chunk.
              pull(left, right, both)(
                new PullRight(state.leftChunk),
                pullLeft,
                pullRight
              ) :
              state.leftChunk.isEmpty ?
              // Pending left chunk is empty: the fresh right chunk becomes pending.
              pull(left, right, both)(new PullLeft(rightChunk), pullLeft, pullRight) :
              Effect.succeed(
                Exit.succeed(zipWithChunks(state.leftChunk, rightChunk, both))
              )
        )
      }
    }
  }
}