import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal" /** * Halts the evaluation of this stream when the provided deferred resolves. * * If the deferred completes with a failure, the stream will emit that failure. * * @tsplus static effect/core/stream/Stream.Aspects haltWhenDeferred * @tsplus pipeable effect/core/stream/Stream haltWhenDeferred */ export function haltWhenDeferred(deferred: Deferred) { return (self: Stream): Stream => { const writer: Channel< R, E, Chunk, unknown, E | E2, Chunk, void > = Channel.unwrap( deferred.poll .map((option) => option.fold( Channel.readWith( (input: Chunk) => Channel.write(input).flatMap(() => writer), (err) => Channel.fail(err), () => Channel.unit ), (io) => Channel.unwrap( io.fold( (e) => Channel.fail(e), () => Channel.unit ) ) ) ) ) concreteStream(self) return new StreamInternal(self.channel >> writer) } }