// ets_tracing: off

import * as CS from "../../../../Cause/index.js"
import * as CK from "../../../../Collections/Immutable/Chunk/index.js"
import * as Tp from "../../../../Collections/Immutable/Tuple/index.js"
import * as T from "../../../../Effect/index.js"
import * as E from "../../../../Either/index.js"
import * as Ex from "../../../../Exit/index.js"
import * as O from "../../../../Option/index.js"
import type * as C from "../core.js"
import * as ZipChunks from "./_internal/zipChunks.js"
import * as CombineChunks from "./combineChunks.js"

/**
 * State in which both sides are pulled on every step. `excess` carries the
 * not-yet-paired elements left over from the previous step: either left-hand
 * elements (`Left`) or right-hand elements (`Right`).
 */
class Running<W1, W2> {
  readonly _tag = "Running"
  constructor(readonly excess: E.Either<CK.Chunk<W1>, CK.Chunk<W2>>) {}
}

/**
 * State carrying leftover left-hand elements. While in this state only the
 * right-hand side is pulled.
 */
class LeftDone<W1> {
  readonly _tag = "LeftDone"
  constructor(readonly excessL: CK.Chunk<W1>) {}
}

/**
 * State carrying leftover right-hand elements. While in this state only the
 * left-hand side is pulled.
 */
class RightDone<W2> {
  readonly _tag = "RightDone"
  constructor(readonly excessR: CK.Chunk<W2>) {}
}

/**
 * Terminal state: the next step signals the end of the zipped stream.
 */
class End {
  readonly _tag = "End"
}

type State<W1, W2> = Running<W1, W2> | LeftDone<W1> | RightDone<W2> | End

/**
 * Combines the carried-over excess with the freshly pulled chunks, zips what
 * can be paired and computes the next state.
 *
 * @param f combines one left-hand and one right-hand element
 * @param leftUpd chunk pulled from the left side, or `None` if nothing was pulled
 * @param rightUpd chunk pulled from the right side, or `None` if nothing was pulled
 * @param excess unpaired elements carried over from the previous step
 * @returns a successful exit holding the chunk to emit and the next state, or
 * a failure with `O.none` when neither side provided an update
 */
function handleSuccess<A, A1, B>(
  f: (a: A, a1: A1) => B,
  leftUpd: O.Option<CK.Chunk<A>>,
  rightUpd: O.Option<CK.Chunk<A1>>,
  excess: E.Either<CK.Chunk<A>, CK.Chunk<A1>>
): Ex.Exit<O.Option<never>, Tp.Tuple<[CK.Chunk<B>, State<A, A1>]>> {
  // Split the carried-over excess into a left and a right part; exactly one of
  // them can be non-empty.
  const [leftExcess, rightExcess] = E.fold_(
    excess,
    (l) => [l, CK.empty<A1>()] as const,
    (r) => [CK.empty<A>(), r] as const
  )

  // Prepend the excess to whatever was pulled on this step.
  const left = O.fold_(
    leftUpd,
    () => leftExcess,
    (upd) => CK.concat_(leftExcess, upd)
  )
  const right = O.fold_(
    rightUpd,
    () => rightExcess,
    (upd) => CK.concat_(rightExcess, upd)
  )

  const {
    tuple: [emit, newExcess]
  } = ZipChunks.zipChunks_(left, right, f)

  // Both sides produced a chunk: keep pulling from both.
  if (leftUpd._tag === "Some" && rightUpd._tag === "Some") {
    return Ex.succeed(Tp.tuple(emit, new Running(newExcess)))
  }

  // Neither side produced anything: signal the end of the stream.
  if (leftUpd._tag === "None" && rightUpd._tag === "None") {
    return Ex.fail(O.none)
  }

  // Exactly one side produced a chunk: continue only while unpaired elements
  // remain, otherwise finish on the next step.
  const newState: State<A, A1> =
    newExcess._tag === "Left"
      ? CK.isEmpty(newExcess.left)
        ? new End()
        : new LeftDone(newExcess.left)
      : CK.isEmpty(newExcess.right)
      ? new End()
      : new RightDone(newExcess.right)

  return Ex.succeed(Tp.tuple(emit, newState))
}

/**
 * Zips this stream with another point-wise and applies the function to the paired elements.
 *
 * The new stream will end when one of the sides ends.
 *
 * @param self the left-hand stream
 * @param that the right-hand stream
 * @param f combines one element of `self` with one element of `that`
 */
export function zipWith_<R, R1, E, E1, A, A1, B>(
  self: C.Stream<R, E, A>,
  that: C.Stream<R1, E1, A1>,
  f: (a: A, a1: A1) => B
): C.Stream<R1 & R, E | E1, B> {
  return CombineChunks.combineChunks_(
    self,
    that,
    new Running(E.left(CK.empty())) as State<A, A1>,
    // NOTE(review): `p1` / `p2` are presumably the pull effects of `self` and
    // `that`, and `T.optional` presumably surfaces end-of-stream as `None` -
    // confirm against `combineChunks_`.
    (st, p1, p2) => {
      switch (st._tag) {
        case "End": {
          return T.succeed(Ex.fail(O.none))
        }
        case "Running": {
          // Pull both sides in parallel.
          return T.catchAllCause_(
            T.zipWithPar_(T.optional(p1), T.optional(p2), (leftUpd, rightUpd) =>
              handleSuccess(f, leftUpd, rightUpd, st.excess)
            ),
            (e) => T.succeed(Ex.halt(CS.map_(e, O.some)))
          )
        }
        case "LeftDone": {
          // Only the right side is pulled; the left input is the stored excess.
          return T.catchAllCause_(
            T.map_(T.optional(p2), (rightUpd) =>
              handleSuccess(f, O.none, rightUpd, E.left(st.excessL))
            ),
            (e) => T.succeed(Ex.halt(CS.map_(e, O.some)))
          )
        }
        case "RightDone": {
          // Only the left side is pulled; the right input is the stored excess.
          return T.catchAllCause_(
            T.map_(T.optional(p1), (leftUpd) =>
              handleSuccess(f, leftUpd, O.none, E.right(st.excessR))
            ),
            (e) => T.succeed(Ex.halt(CS.map_(e, O.some)))
          )
        }
      }
    }
  )
}

/**
 * Zips this stream with another point-wise and applies the function to the paired elements.
 *
 * The new stream will end when one of the sides ends.
 *
 * @ets_data_first zipWith_
 */
export function zipWith<R1, E1, A, A1, B>(
  that: C.Stream<R1, E1, A1>,
  f: (a: A, a1: A1) => B
): <R, E>(self: C.Stream<R, E, A>) => C.Stream<R1 & R, E | E1, B> {
  return (self) => zipWith_(self, that, f)
}