// ets_tracing: off import "../../Operator/index.js" import * as A from "../../Collections/Immutable/Chunk/index.js" import * as Tp from "../../Collections/Immutable/Tuple/index.js" import { pipe } from "../../Function/index.js" import * as O from "../../Option/index.js" import * as T from "../_internal/effect.js" import * as R from "../_internal/ref.js" import * as Pull from "../Pull/index.js" export class BufferedPull { constructor( readonly upstream: T.Effect, A.Chunk>, readonly done: R.Ref, readonly cursor: R.Ref, number]>> ) {} } export function ifNotDone_( self: BufferedPull, fa: T.Effect, A1> ): T.Effect, A1> { return T.chain_(self.done.get, (b) => (b ? T.fail(O.none) : fa)) } export function ifNotDone(fa: T.Effect, A1>) { return (self: BufferedPull) => ifNotDone_(self, fa) } export function update(self: BufferedPull) { return ifNotDone_( self, T.foldM_( self.upstream, O.fold( () => T.chain_(self.done.set(true), () => Pull.end), (e) => Pull.fail(e) ), (a) => self.cursor.set(Tp.tuple(a, 0)) ) ) } export function pullElement( self: BufferedPull ): T.Effect, A> { return ifNotDone_( self, pipe( self.cursor, R.modify( ({ tuple: [c, i] }): Tp.Tuple<[T.Effect, A>, Tp.Tuple<[A.Chunk, number]>]> => { if (i >= A.size(c)) { return Tp.tuple( T.chain_(update(self), () => pullElement(self)), Tp.tuple(A.empty(), 0) ) } else { return Tp.tuple(T.succeed(A.unsafeGet_(c, i)), Tp.tuple(c, i + 1)) } } ), T.flatten ) ) } export function pullChunk( self: BufferedPull ): T.Effect, A.Chunk> { return ifNotDone_( self, pipe( self.cursor, R.modify( ({ tuple: [chunk, idx] }): Tp.Tuple< [T.Effect, A.Chunk>, Tp.Tuple<[A.Chunk, number]>] > => { if (idx >= A.size(chunk)) { return Tp.tuple( T.chain_(update(self), () => pullChunk(self)), Tp.tuple(A.empty(), 0) ) } else { return Tp.tuple(T.succeed(A.drop_(chunk, idx)), Tp.tuple(A.empty(), 0)) } } ), T.flatten ) ) } export function make(pull: T.Effect, A.Chunk>) { return pipe( T.do, T.bind("done", () => R.makeRef(false)), T.bind("cursor", () => R.makeRef(Tp.tuple<[A.Chunk, number]>(A.empty(), 0))), T.map(({ cursor, done }) => new BufferedPull(pull, done, cursor)) ) }