import * as E from "../../Either"; import { pipe } from "../../Function"; import * as O from "../../Option"; import * as C from "../Exit/Cause"; import * as M from "../Managed"; import * as T from "../Task"; import * as Sink from "./internal/Sink"; import type { Stream } from "./model"; /** * Runs the sink on the stream to produce either the sink's result or an error. */ export const runManaged_ = ( stream: Stream, sink: Sink.Sink ): M.Managed => pipe( M.both_(stream.proc, sink.push), M.mapM(([pull, push]) => { const go: T.Task = T.foldCauseM_( pull, (c): T.Task => pipe( C.sequenceCauseOption(c), O.fold( () => T.foldCauseM_( push(O.none()), (c) => pipe( c, C.map(([_]) => _), C.sequenceCauseEither, E.fold(T.halt, T.pure) ), () => T.die("empty stream / empty sinks") ), T.halt ) ), (os) => T.foldCauseM_( push(O.some(os)), (c): T.Task => pipe( c, C.map(([_]) => _), C.sequenceCauseEither, E.fold(T.halt, T.pure) ), () => go ) ); return go; }) ); /** * Runs the sink on the stream to produce either the sink's result or an error. */ export const runManaged = (sink: Sink.Sink) => (stream: Stream) => runManaged_(stream, sink); /** * Runs the sink on the stream to produce either the sink's result or an error. */ export const run_ = (stream: Stream, sink: Sink.Sink) => M.useNow(runManaged_(stream, sink)); /** * Runs the sink on the stream to produce either the sink's result or an error. */ export const run = (sink: Sink.Sink) => (stream: Stream) => run_(stream, sink); export const runCollect = (stream: Stream) => run_(stream, Sink.collectAll()); /** * Runs the stream and collects all of its elements to an array. */ export const runDrain = (stream: Stream): T.Task => pipe( stream, foreach((_) => T.unit()) ); /** * Consumes all elements of the stream, passing them to the specified callback. */ export const foreach_ = (stream: Stream, f: (a: A) => T.Task) => run_(stream, Sink.foreach(f)); /** * Consumes all elements of the stream, passing them to the specified callback. */ export const foreach = (f: (a: A) => T.Task) => (stream: Stream) => foreach_(stream, f); /** * Like `foreach`, but returns a `Managed` so the finalization order * can be controlled. */ export const foreachManaged_ = (stream: Stream, f: (a: A) => T.Task) => runManaged_(stream, Sink.foreach(f)); /** * Like `foreach`, but returns a `Managed` so the finalization order * can be controlled. */ export const foreachManaged = (f: (a: A) => T.Task) => (stream: Stream) => foreachManaged_(stream, f);