import * as A from "../../../Array";
import { pipe } from "../../../Function";
import * as O from "../../../Option";
import * as T from "../../Task";
import * as R from "../../XRef";
import * as Pull from "./Pull";

// NOTE(review): the generic parameter lists in this module were reconstructed
// from the surviving `, X>` fragments of a copy whose `<...>` lists had been
// stripped; confirm them against the upstream definition of `T.Task` / `R.Ref`.

/**
 * State needed to pull from a chunked upstream either chunk-by-chunk or
 * element-by-element.
 *
 * - `upstream` produces the next chunk, or fails with an `Option` of `E`.
 * - `done` is set to `true` by `update` once upstream fails with a "none" error.
 * - `cursor` holds the currently buffered chunk and the index of the next
 *   element to hand out.
 */
export class BufferedPull<R, E, A> {
  constructor(
    readonly upstream: T.Task<R, O.Option<E>, ReadonlyArray<A>>,
    readonly done: R.Ref<boolean>,
    readonly cursor: R.Ref<[ReadonlyArray<A>, number]>
  ) {}
}

/**
 * Runs `fa` only while `self.done` is `false`; once the flag is `true` the
 * result is `Pull.end` instead and `fa` is not executed.
 */
export const ifNotDone = <R1, E1, A1>(fa: T.Task<R1, O.Option<E1>, A1>) => <R, E, A>(
  self: BufferedPull<R, E, A>
): T.Task<R1, O.Option<E1>, A1> =>
  pipe(
    self.done.get,
    T.chain((b) => (b ? Pull.end : fa))
  );

/**
 * Pulls `self.upstream` once (unless already done) and refreshes the cursor.
 *
 * - success: the new chunk is stored in `cursor` with the index reset to 0;
 * - failure with a "none" error: `done` is set to `true`, then `Pull.end`;
 * - failure with "some" error `e`: `Pull.fail(e)`.
 */
export const update = <R, E, A>(self: BufferedPull<R, E, A>) =>
  pipe(
    self,
    ifNotDone(
      pipe(
        self.upstream,
        T.foldM(
          O.fold(
            () =>
              pipe(
                self.done.set(true),
                T.chain(() => Pull.end)
              ),
            (e) => Pull.fail(e)
          ),
          (a) => self.cursor.set([a, 0])
        )
      )
    )
  );

/**
 * Pulls a single element.
 *
 * If the cursor index is still inside the buffered chunk, that element is
 * returned and the index advances by one. Otherwise the cursor is reset to an
 * empty chunk, `update` fetches the next chunk, and the pull is retried.
 */
export const pullElement = <R, E, A>(self: BufferedPull<R, E, A>): T.Task<R, O.Option<E>, A> =>
  pipe(
    self,
    ifNotDone(
      pipe(
        self.cursor,
        // `modify` returns the task to run next together with the new cursor state.
        R.modify(([c, i]): [T.Task<R, O.Option<E>, A>, [ReadonlyArray<A>, number]] => {
          if (i >= c.length) {
            // Buffer exhausted: refill from upstream, then try again.
            return [
              pipe(
                update(self),
                T.chain(() => pullElement(self))
              ),
              [[], 0]
            ];
          } else {
            return [T.pure(c[i]), [c, i + 1]];
          }
        }),
        // The value produced by `modify` is itself a task; run it.
        T.flatten
      )
    )
  );

/**
 * Pulls whatever remains of the buffered chunk as an array.
 *
 * If the cursor index is still inside the buffered chunk, the result is
 * `A.dropLeft_(chunk, idx)` (the not-yet-consumed part) and the cursor is
 * reset to an empty chunk. Otherwise `update` fetches the next chunk and the
 * pull is retried.
 */
export const pullArray = <R, E, A>(
  self: BufferedPull<R, E, A>
): T.Task<R, O.Option<E>, ReadonlyArray<A>> =>
  pipe(
    self,
    ifNotDone(
      pipe(
        self.cursor,
        R.modify(([chunk, idx]): [
          T.Task<R, O.Option<E>, ReadonlyArray<A>>,
          [ReadonlyArray<A>, number]
        ] => {
          if (idx >= chunk.length) {
            // Buffer exhausted: refill from upstream, then try again.
            return [T.chain_(update(self), () => pullArray(self)), [[], 0]];
          } else {
            // Hand out the remainder and leave the buffer empty.
            return [T.pure(A.dropLeft_(chunk, idx)), [[], 0]];
          }
        }),
        T.flatten
      )
    )
  );

/**
 * Builds a `BufferedPull` around `pull`, starting with `done = false` and an
 * empty buffered chunk at index 0.
 */
export const make = <R, E, A>(pull: T.Task<R, O.Option<E>, ReadonlyArray<A>>) =>
  pipe(
    T.do,
    T.bindS("done", () => R.makeRef(false)),
    T.bindS("cursor", () => R.makeRef<[ReadonlyArray<A>, number]>([[], 0])),
    T.map(({ cursor, done }) => new BufferedPull<R, E, A>(pull, done, cursor))
  );