// ets_tracing: off

import type * as CK from "../../../../Collections/Immutable/Chunk/index.js"
import { pipe } from "../../../../Function/index.js"
import * as M from "../../../../Managed/index.js"
import * as Q from "../../../../Queue/index.js"
import * as CH from "../../Channel/index.js"
import * as TK from "../../Take/index.js"
import type * as C from "../core.js"

/**
 * Like `Stream#into`, but provides the result as a `Managed` to allow for scope
 * composition.
 *
 * The stream's channel is piped into a writer that re-encodes everything it
 * reads as a `Take`:
 *
 * - each chunk read from upstream is written as `TK.chunk(chunk)`, after which
 *   the writer continues reading;
 * - an upstream failure cause is written as `TK.halt(cause)` and the writer
 *   stops;
 * - upstream completion is written as `TK.end` and the writer stops.
 *
 * Every `Take` produced this way is offered to `queue`; the outputs of those
 * offers are drained and the managed result is mapped to unit.
 *
 * NOTE(review): the generic parameter list and the type arguments of
 * `Stream`, `XQueue`, `Managed`, `Chunk` and `Take` were reconstructed —
 * confirm them against the declarations of those types.
 *
 * @param self the stream whose output is forwarded
 * @param queue the queue receiving one `Take` per chunk plus a final
 * `halt`/`end` signal
 * @returns a `Managed` that runs the forwarding when used
 */
export function runIntoManaged_<R, R1, E extends E1, E1, A, Z>(
  self: C.Stream<R, E, A>,
  queue: Q.XQueue<R1, never, never, unknown, TK.Take<E1, A>, Z>
): M.Managed<R & R1, E | E1, void> {
  // The writer refers to itself, so it needs an explicit type annotation.
  const writer: CH.Channel<
    R,
    E,
    CK.Chunk<A>,
    unknown,
    E,
    TK.Take<E | E1, A>,
    any
  > = CH.readWithCause(
    // Forward the chunk, then keep reading from upstream.
    (in_) => CH.zipRight_(CH.write(TK.chunk(in_)), writer),
    // Terminal signals are written once; the writer does not recurse after them.
    (cause) => CH.write(TK.halt(cause)),
    (_) => CH.write(TK.end)
  )

  return pipe(
    self.channel[">>>"](writer),
    CH.mapOutEffect((_) => Q.offer_(queue, _)),
    CH.drain,
    CH.runManaged,
    M.asUnit
  )
}

/**
 * Like `Stream#into`, but provides the result as a `Managed` to allow for scope
 * composition.
 *
 * Data-last variant: takes the target `queue` first and returns a function
 * that accepts the stream and delegates to `runIntoManaged_`.
 *
 * @param queue the queue receiving the stream's output encoded as `Take`s
 * @returns a function from a stream to the `Managed` built by
 * `runIntoManaged_`
 *
 * @ets_data_first runIntoManaged_
 */
export function runIntoManaged<R1, E1, A, Z>(
  queue: Q.XQueue<R1, never, never, unknown, TK.Take<E1, A>, Z>
) {
  return <R, E extends E1>(self: C.Stream<R, E, A>) =>
    runIntoManaged_(self, queue)
}