import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * Halts the evaluation of this stream when the provided IO completes. The
 * given IO will be forked as part of the returned stream, and its success
 * will be discarded.
 *
 * An element in the process of being pulled will not be interrupted when the
 * IO completes. See `interruptWhen` for this behavior.
 *
 * If the IO completes with a failure, the stream will emit that failure.
 *
 * @param io - The effect whose completion halts the stream. It is forked in
 *   the scope of the returned stream.
 * @returns A function that takes the source stream and returns the stream
 *   that halts once `io` has completed.
 *
 * @tsplus static effect/core/stream/Stream.Aspects haltWhen
 * @tsplus pipeable effect/core/stream/Stream haltWhen
 */
export function haltWhen<R2, E2, Z>(io: Effect<R2, E2, Z>) {
  return <R, E, A>(self: Stream<R, E, A>): Stream<R | R2, E | E2, A> => {
    // NOTE(review): `concreteStream` comes from the internal module imported
    // above; presumably it narrows `self` so that `.channel` can be read.
    concreteStream(self)
    return new StreamInternal(
      Channel.unwrapScoped(
        // Fork `io`, then pipe the source channel into a writer that checks
        // the forked fiber before every read from upstream.
        io.forkScoped.map((fiber) => self.channel >> writer<R, E, A, E2, Z>(fiber))
      )
    )
  }
}

/**
 * Builds the channel that forwards upstream chunks until `fiber` completes.
 *
 * On each step the fiber is polled:
 * - If the poll yields no exit, one upstream input is read. A chunk is
 *   written downstream and the writer recurses; an upstream error fails the
 *   channel with that error; upstream completion ends the channel.
 * - If the poll yields an exit, a failed exit fails the channel with its
 *   cause and a successful exit ends the channel (the success value is
 *   discarded).
 *
 * @param fiber - The fiber whose completion is being watched.
 * @returns A channel that passes chunks through while `fiber` has no exit.
 */
function writer<R, E, A, E2, Z>(
  fiber: Fiber<E2, Z>
): Channel<R, E | E2, Chunk<A>, unknown, E | E2, Chunk<A>, void> {
  return Channel.unwrap(
    fiber.poll.map((option) =>
      option.fold(
        Channel.readWith(
          (input: Chunk<A>) => Channel.write(input).flatMap(() => writer<R, E, A, E2, Z>(fiber)),
          (err) => Channel.fail(err),
          () => Channel.unit
        ),
        (exit) =>
          exit.fold(
            (cause) => Channel.failCause(cause),
            (): Channel<R, E | E2, Chunk<A>, unknown, E | E2, Chunk<A>, void> => Channel.unit
          )
      )
    )
  )
}