/**
 * Split a stream by a predicate. The faster stream may advance by up to
 * `buffer` elements further than the slower one.
 *
 * Every element of the source stream is first run through the effectful
 * function `p`, which yields an `Either`. Elements that resolve to a `Left`
 * are emitted by the first stream of the returned pair, elements that resolve
 * to a `Right` are emitted by the second one.
 *
 * @param p - Effectful function classifying each element as `Left` or `Right`.
 * @param buffer - Buffer size handed to `distributedWith` (defaults to `16`).
 * @returns A function that takes the source stream and produces a scoped
 * effect yielding the pair `[lefts, rights]`. The effect dies with a
 * descriptive message if `distributedWith` does not hand back exactly two
 * queues.
 *
 * @tsplus static effect/core/stream/Stream.Aspects partitionEither
 * @tsplus pipeable effect/core/stream/Stream partitionEither
 */
export function partitionEither<A, R2, E2, A2, A3>(
  p: (a: A) => Effect<R2, E2, Either<A2, A3>>,
  buffer = 16
) {
  return <R, E>(self: Stream<R, E, A>): Effect<
    R | R2 | Scope,
    E | E2,
    readonly [Stream<never, E | E2, A2>, Stream<never, E | E2, A3>]
  > =>
    self
      .mapEffect(p)
      // Fan out to two queues. The decider returns a predicate over what is
      // presumably the target queue's index: `Left` values are routed to
      // queue 0 and `Right` values to queue 1.
      .distributedWith(2, buffer, (either) =>
        either.fold(
          () => Effect.succeed((index) => index === 0),
          () => Effect.succeed((index) => index === 1)
        ))
      .flatMap((dequeues) => {
        if (dequeues.length === 2) {
          return Effect.succeed([
            // First queue: keep only the `Left` side.
            Stream.fromQueueWithShutdown(dequeues.unsafeHead!)
              .flattenExitMaybe
              .collectLeft,
            // Second queue: keep only the `Right` side.
            Stream.fromQueueWithShutdown(dequeues.unsafeLast!)
              .flattenExitMaybe
              .collectRight
          ])
        }
        // Two queues were requested, so any other count is treated as a
        // defect rather than a typed failure.
        return Effect.dieMessage(
          `Stream.partitionEither: expected two streams but received: ${dequeues.length}`
        )
      })
}