// ets_tracing: off import * as A from "../../../Collections/Immutable/Chunk/index.js" import * as Tp from "../../../Collections/Immutable/Tuple/index.js" import * as T from "../../../Effect/index.js" import { pipe } from "../../../Function/index.js" import * as O from "../../../Option/index.js" import * as Ref from "../../../Ref/index.js" import * as Pull from "../Pull/index.js" export class BufferedPull { constructor( readonly upstream: T.Effect, A.Chunk>, readonly done: Ref.Ref, readonly cursor: Ref.Ref, number]>> ) {} } export function make(upstream: T.Effect, A.Chunk>) { return pipe( T.do, T.bind("done", () => Ref.makeRef(false)), T.bind("cursor", () => Ref.makeRef(Tp.tuple(A.empty(), 0))), T.map(({ cursor, done }) => new BufferedPull(upstream, done, cursor)) ) } export function ifNotDone_( self: BufferedPull, fa: T.Effect, A1> ): T.Effect, A1> { return T.chain_(Ref.get(self.done), (_) => { if (_) { return Pull.end } else { return fa } }) } /** * @ets_data_first ifNotDone_ */ export function ifNotDone(fa: T.Effect, A1>) { return (self: BufferedPull) => ifNotDone_(self, fa) } export function update( self: BufferedPull ): T.Effect, void> { return ifNotDone_( self, T.foldM_( self.upstream, O.fold( () => T.zipRight_(Ref.set_(self.done, true), Pull.end), (e) => Pull.fail(e) ), (chunk) => Ref.set_(self.cursor, Tp.tuple(chunk, 0)) ) ) } export function pullElement( self: BufferedPull ): T.Effect, A> { return ifNotDone_( self, pipe( self.cursor, Ref.modify(({ tuple: [chunk, idx] }) => { if (idx >= A.size(chunk)) { return Tp.tuple( T.zipRight_(update(self), pullElement(self)), Tp.tuple(A.empty(), 0) ) } else { return Tp.tuple( T.succeed(A.unsafeGet_(chunk, idx)), Tp.tuple(A.empty(), idx + 1) ) } }), T.flatten ) ) } export function pullChunk( self: BufferedPull ): T.Effect, A.Chunk> { return ifNotDone_( self, pipe( self.cursor, Ref.modify(({ tuple: [chunk, idx] }) => { if (idx >= A.size(chunk)) { return Tp.tuple( T.zipRight_(update(self), pullChunk(self)), Tp.tuple(A.empty(), 0) ) } else { return Tp.tuple(T.succeed(A.drop_(chunk, idx)), Tp.tuple(A.empty(), 0)) } }), T.flatten ) ) }