// ets_tracing: off

import "../../Operator/index.js"

import type { Cause } from "../../Cause/index.js"
import * as A from "../../Collections/Immutable/Chunk/index.js"
import * as Tp from "../../Collections/Immutable/Tuple/index.js"
import * as E from "../../Either/index.js"
import { pipe } from "../../Function/index.js"
import type * as O from "../../Option/index.js"
import * as T from "../_internal/effect.js"
import * as M from "../_internal/managed.js"
import * as R from "../_internal/ref.js"

/**
 * A push-based consumer: a function that takes an optional chunk of input
 * and returns an effect.
 *
 * The effect succeeds with `void`; its error channel carries a tuple of an
 * `Either<E, Z>` and a chunk of `L`. The helpers in this module (`emit`,
 * `fail`, `halt`) build effects that fail with exactly that tuple shape.
 *
 * NOTE(review): the generic parameter lists in this file were reconstructed
 * from their surviving fragments — confirm names and order against callers.
 */
export interface Push<R, E, I, L, Z> {
  (_: O.Option<A.Chunk<I>>): T.Effect<
    R,
    Tp.Tuple<[E.Either<E, Z>, A.Chunk<L>]>,
    void
  >
}

/**
 * Builds an effect that fails with the tuple `(Right(z), leftover)`.
 *
 * @param z - the value placed on the right side of the `Either`
 * @param leftover - the chunk placed in the second tuple slot
 */
export function emit<I, Z>(
  z: Z,
  leftover: A.Chunk<I>
): T.IO<Tp.Tuple<[E.Either<never, Z>, A.Chunk<I>]>, never> {
  return T.fail(Tp.tuple(E.right(z), leftover))
}

/**
 * Builds an effect that fails with the tuple `(Left(e), leftover)`.
 *
 * @param e - the error placed on the left side of the `Either`
 * @param leftover - the chunk placed in the second tuple slot
 */
export function fail<E, I>(
  e: E,
  leftover: A.Chunk<I>
): T.IO<Tp.Tuple<[E.Either<E, never>, A.Chunk<I>]>, never> {
  return T.fail(Tp.tuple(E.left(e), leftover))
}

/**
 * Halts with the given cause, mapping any error it carries into the tuple
 * `(Left(e), empty chunk)` so the result matches the error shape of `Push`.
 *
 * @param c - the cause to halt with
 */
export function halt<E>(
  c: Cause<E>
): T.IO<Tp.Tuple<[E.Either<E, never>, A.Chunk<never>]>, never> {
  return T.mapError_(T.halt(c), (e) => Tp.tuple(E.left(e), A.empty()))
}

/**
 * Alias of `T.unit`: an effect that succeeds with no value.
 */
export const more = T.unit

/**
 * Decorates a Push with an Effect value that re-initializes it with a fresh state.
 *
 * The returned tuple holds:
 *  1. a push function that reads the current push from a ref and forwards
 *     its input to it, and
 *  2. a `restart` effect that runs `sink` again through the switchable and
 *     stores the resulting push in that ref.
 *
 * @param sink - managed value producing the underlying push
 */
export function restartable<R, E, I, L, Z>(
  sink: M.Managed<R, never, Push<R, E, I, L, Z>>
): M.Managed<R, never, Tp.Tuple<[Push<R, E, I, L, Z>, T.Effect<R, never, void>]>> {
  return pipe(
    M.do,
    M.bind("switchSink", () => M.switchable<R, never, Push<R, E, I, L, Z>>()),
    M.bind("initialSink", ({ switchSink }) => T.toManaged(switchSink(sink))),
    M.bind("currSink", ({ initialSink }) => T.toManaged(R.makeRef(initialSink))),
    M.map(({ currSink, switchSink }) => {
      // Re-acquire the sink via the switchable and publish the new push in the ref.
      const restart = T.chain_(switchSink(sink), currSink.set)
      // Always dispatch to whichever push the ref currently holds.
      const newPush = (input: O.Option<A.Chunk<I>>) =>
        T.chain_(currSink.get, (f) => f(input))

      return Tp.tuple(newPush, restart)
    })
  )
}