import { SinkInternal } from "@effect/core/stream/Sink/operations/_internal/SinkInternal"
import { Handoff } from "@effect/core/stream/Stream/operations/_internal/Handoff"
import { StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

// NOTE(review): the generic parameter lists in this file were restored after
// being lost (the previous text contained fragments such as `Effect]>` and
// `Handoff.make>()` that do not parse). They were reconstructed from the
// surviving channel annotations and from how each value is used below;
// confirm them against the upstream `@effect/core` sources before merging.
// `Chunk`, `Cause`, `Effect`, `Scope`, `Deferred`, `Channel`, `Sink`, `Stream`
// and `Do` are not imported here and are presumably tsplus globals.

/**
 * Message handed from the sink-side consumer to the stream-side producer.
 */
type Signal<E, A> = Emit<A> | Halt<E> | End

/**
 * Carries a chunk of elements that the producer should write downstream.
 */
export class Emit<A> {
  readonly _tag = "Emit"
  constructor(readonly elements: Chunk<A>) {}
}

/**
 * Carries the cause with which the producer should fail.
 */
export class Halt<E> {
  readonly _tag = "Halt"
  constructor(readonly error: Cause<E>) {}
}

/**
 * Tells the producer that there is no more input and it should finish.
 */
export class End {
  readonly _tag = "End"
}

/**
 * Peels off enough material from the stream to construct a `Z` using the
 * provided `Sink` and then returns both the `Z` and the rest of the
 * `Stream` in a scope. Like all scoped values, the provided stream is valid
 * only within the scope.
 *
 * Implementation outline:
 * - a `Deferred` receives the sink's result (or its failure);
 * - a `Handoff` transports `Signal`s from the consumer (which keeps reading
 *   the source after the sink is done) to the producer (which backs the
 *   returned stream);
 * - the source is run against the consumer in a fiber forked into the
 *   scope, and the returned effect completes once the `Deferred` does.
 *
 * @param sink - The sink used to build the `Z`; its leftovers become the
 * first elements of the returned stream.
 * @returns A function taking the source stream and returning a scoped effect
 * that yields the pair `[z, remainingStream]`.
 *
 * @tsplus static effect/core/stream/Stream.Aspects peel
 * @tsplus pipeable effect/core/stream/Stream peel
 */
export function peel<R2, E2, A2, Z>(sink: Sink<R2, E2, A2, A2, Z>) {
  return <R, E extends E2, A extends A2>(
    self: Stream<R, E, A>
  ): Effect<R | R2 | Scope, E | E2, readonly [Z, Stream<never, E, A2>]> =>
    Do(($) => {
      const deferred = $(Deferred.make<E | E2, Z>())
      const handoff = $(Handoff.make<Signal<E, A2>>())

      const consumer = sink.exposeLeftover
        .foldSink(
          // Sink failed: fail the deferred with the same error, then fail
          // the consumer sink itself. `>` is the operator used by the
          // original code to sequence the two sinks.
          (e) => Sink.fromEffect(deferred.fail(e)) > Sink.fail(e),
          ([z1, leftovers]) => {
            // After the sink has produced `z1`, forward everything that is
            // still read from the source to the handoff as signals.
            const loop: Channel<
              never,
              E,
              Chunk<A2>,
              unknown,
              E | E2,
              Chunk<A2>,
              void
            > = Channel.readWithCause(
              (chunk: Chunk<A2>) =>
                Channel.fromEffect(handoff.offer(new Emit(chunk))).flatMap(() => loop),
              (cause) =>
                Channel.fromEffect(handoff.offer(new Halt(cause))).flatMap(() =>
                  Channel.failCause(cause)
                ),
              () => Channel.fromEffect(handoff.offer(new End())).flatMap(() => Channel.unit)
            )

            // Order matters: complete the deferred with `z1`, then offer the
            // sink's leftovers as the first `Emit`, and only then start
            // forwarding the rest of the input.
            return new SinkInternal(
              Channel.fromEffect(deferred.succeed(z1)).flatMap(() =>
                Channel.fromEffect(handoff.offer(new Emit(leftovers)))
              ).flatMap(() => loop)
            )
          }
        )

      // Backs the returned stream: takes one signal at a time from the
      // handoff and either writes the chunk and recurses, fails, or ends.
      const producer: Channel<
        never,
        unknown,
        unknown,
        unknown,
        E,
        Chunk<A2>,
        void
      > = Channel.unwrap(
        handoff.take.map((signal) => {
          switch (signal._tag) {
            case "Emit": {
              return Channel.write(signal.elements).flatMap(() => producer)
            }
            case "Halt": {
              return Channel.failCause(signal.error)
            }
            case "End": {
              return Channel.unit
            }
          }
        })
      )

      return self
        // A failure of the source stream also fails the deferred, so the
        // `deferred.await` below does not wait on a result that never comes.
        .tapErrorCause((cause) => deferred.failCause(cause))
        .runScoped(consumer)
        .forkScoped
        .flatMap(() => deferred.await)
        .map((z) => [z, new StreamInternal(producer)] as const)
    }).flatten
}