import { pipe } from "../../../Function"; import type { Option } from "../../../Option"; import * as O from "../../../Option"; import * as M from "../../Managed"; import * as T from "../../Task"; // Contract notes for transducers: // - When a None is received, the transducer must flush all of its internal state // and remain empty until subsequent Some(Chunk) values. // // Stated differently, after a first push(None), all subsequent push(None) must // result in empty []. export class Transducer { constructor(readonly push: M.Managed>) => T.Task>>) {} } /** * Contract notes for transducers: * - When a None is received, the transducer must flush all of its internal state * and remain empty until subsequent Some(Chunk) values. * * Stated differently, after a first push(None), all subsequent push(None) must * result in empty []. */ export const transducer = ( push: M.Managed>) => T.Task>> ) => new Transducer(push); /** * Compose this tansducer with another transducer, resulting in a composite transducer. */ export const then = (that: Transducer) => ( self: Transducer ): Transducer => transducer( pipe( self.push, M.mapBoth(that.push, (pushLeft, pushRight) => O.fold( () => pipe( pushLeft(O.none()), T.chain((cl) => cl.length === 0 ? pushRight(O.none()) : pipe( pushRight(O.some(cl)), T.mapBoth(pushRight(O.none()), (a, b) => [...a, ...b]) ) ) ), (inputs) => pipe( pushLeft(O.some(inputs)), T.chain((cl) => pushRight(O.some(cl))) ) ) ) ) );