/*
 * NOTE(review): generic type-parameter lists appear to have been stripped from
 * this copy of the file (see e.g. `T.Task>` or `combineChunks(that)>(`).
 * The code is reproduced token-for-token below; restore the type arguments
 * from version control before compiling.
 */
import * as A from "../../Array";
import type { Either } from "../../Either";
import * as E from "../../Either";
import { constVoid, flow, identity, pipe, tuple } from "../../Function";
import type { NonEmptyArray } from "../../NonEmptyArray";
import type { Option } from "../../Option";
import * as O from "../../Option";
import type { HasClock } from "../Clock";
import { sequential } from "../ExecutionStrategy";
import type { Exit } from "../Exit";
import * as Ex from "../Exit";
import type { Cause } from "../Exit/Cause";
import * as C from "../Exit/Cause";
import * as F from "../Fiber";
import * as M from "../Managed";
import * as Sc from "../Schedule";
import type { Task } from "../Task";
import * as T from "../Task";
import type { Ref } from "../XRef";
import * as XR from "../XRef";
import * as XRM from "../XRefM";
import { fromTask, repeatTaskOption } from "./constructors";
import * as H from "./internal/Handoff";
import * as Pull from "./internal/Pull";
import * as Take from "./internal/Take";
import { flatten } from "./methods";
import { Stream } from "./model";

/**
 * Creates a stream from a task that itself produces a stream: the task is
 * lifted with `fromTask` and the resulting stream-of-streams is flattened.
 */
export const unwrap = (fa: T.Task>): Stream => flatten(fromTask(fa));

/**
 * Creates a stream from a `Schedule` that does not require any further
 * input. The stream will emit an element for each value output from the
 * schedule, continuing for as long as the schedule continues.
 *
 * Implemented by obtaining the schedule's driver (`Sc.driver`), repeatedly
 * running `driver.next` with a void input via `repeatTaskOption`, and
 * unwrapping the resulting task-of-stream.
 */
export const fromSchedule: (schedule: Sc.Schedule) => Stream = flow(
  Sc.driver,
  T.map((driver) => repeatTaskOption(driver.next(constVoid()))),
  unwrap
);

/**
 * Creates a stream by effectfully peeling off the "layers" of a value of type `Z`.
 *
 * On every pull the current state is passed to `f`:
 * - if `f` fails, the pull fails with that error (`Pull.fail`);
 * - if `f` yields `None`, the stream is marked as done and the pull ends;
 * - if `f` yields `Some([a, z])`, `z` is stored as the next state and `a` is emitted.
 *
 * @param z - the initial state
 * @param f - effectful step function producing the next emission and state, or `None` to stop
 */
export const unfoldChunkM = (z: Z) => (
  f: (z: Z) => T.Task, Z]>>
): Stream =>
  new Stream(
    pipe(
      M.do,
      // `done` remembers that `f` already returned `None`, so `f` is not run again.
      M.bindS("done", () => XR.makeManagedRef(false)),
      // `ref` holds the current unfold state, starting at `z`.
      M.bindS("ref", () => XR.makeManagedRef(z)),
      M.letS("pull", ({ done, ref }) =>
        pipe(
          done.get,
          T.chain((isDone) =>
            isDone
              ? Pull.end
              : pipe(
                  ref.get,
                  T.chain(f),
                  T.foldM(
                    Pull.fail,
                    O.fold(
                      // No more layers: record completion before ending the pull.
                      () =>
                        pipe(
                          done.set(true),
                          T.chain(() => Pull.end)
                        ),
                      // Store the next state, then emit the produced value.
                      ([a, z]) =>
                        pipe(
                          ref.set(z),
                          T.map(() => a)
                        )
                    )
                  )
                )
          )
        )
      ),
      M.map(({ pull }) => pull)
    )
  );

/**
 * Combines the chunks from this stream and the specified stream by repeatedly applying the
 * function `f` to extract a chunk using both sides and conceptually "offer"
 * it to the destination stream. `f` can maintain some internal state to control
 * the combining process, with the initial state being specified by `z`.
 *
 * `f` receives the current state plus the pull of each side and returns an
 * `Exit`; that exit is turned back into a task and drives `unfoldChunkM`.
 *
 * @param that - the other stream
 * @param z - the initial combining state
 * @param f - step function: (state, pull of `self`, pull of `that`) to an Exit of [chunk, next state]
 */
export const combineChunks = (that: Stream) => (z: Z) => (
  f: (
    z: Z,
    s: T.Task, ReadonlyArray>,
    t: T.Task, ReadonlyArray>
  ) => T.Task, readonly [ReadonlyArray, Z]>>
) => (self: Stream): Stream =>
  new Stream(
    pipe(
      M.do,
      M.bindS("left", () => self.proc),
      M.bindS("right", () => that.proc),
      M.bindS(
        "pull",
        ({ left, right }) =>
          unfoldChunkM(z)((z) =>
            pipe(
              f(z, left, right),
              // `T.done` lifts the Exit back into a Task; `T.optional` presumably moves a
              // `None` failure into a `None` success so that `unfoldChunkM` sees it as
              // "no more layers" - TODO confirm against the Task module.
              T.chain((ex) => T.optional(T.done(ex)))
            )
          ).proc
      ),
      M.map(({ pull }) => pull)
    )
  );

/**
 * Zips two chunks element-wise with `f`, up to the length of the shorter one.
 *
 * @returns a pair of the zipped chunk and the unpaired remainder: `Left` with
 * the rest of `fa` when `fa` is strictly longer, otherwise `Right` with the
 * rest of `fb` (obtained via `A.dropLeft_`).
 */
function __zipChunks(
  fa: ReadonlyArray,
  fb: ReadonlyArray,
  f: (a: A, b: B) => C
): [ReadonlyArray, E.Either, ReadonlyArray>] {
  const fc: C[] = [];
  const len = Math.min(fa.length, fb.length);
  for (let i = 0; i < len; i++) {
    fc[i] = f(fa[i], fb[i]);
  }
  if (fa.length > fb.length) {
    return [fc, E.left(A.dropLeft_(fa, fb.length))];
  }
  return [fc, E.right(A.dropLeft_(fb, fa.length))];
}

/**
 * Zips this stream with another point-wise and applies the function to the paired elements.
 *
 * The new stream will end when one of the sides ends.
 *
 * By default both sides are pulled in parallel to preserve async semantics; pass
 * `ps = "seq"` (or use `mapBoth_`) for a sequential alternative.
 *
 * @param stream - the left stream
 * @param that - the right stream
 * @param f - combines one element of each side
 * @param ps - `"par"` (default) pulls both sides with `T.mapBothPar`, `"seq"` with `T.mapBoth`
 */
export function mapBothPar_(
  stream: Stream,
  that: Stream,
  f: (a: O, a1: O2) => O3,
  ps: "seq"
): Stream;
export function mapBothPar_(
  stream: Stream,
  that: Stream,
  f: (a: O, a1: O2) => O3,
  ps?: "par" | "seq"
): Stream;
export function mapBothPar_(
  stream: Stream,
  that: Stream,
  f: (a: O, a1: O2) => O3,
  ps: "par" | "seq" = "par"
): Stream {
  // Combining state threaded through `combineChunks`:
  // - Running:   both sides are pulled; `excess` carries unpaired elements of one side.
  // - LeftDone:  unpaired elements remain on the left (`excessL`); only the right side is pulled.
  // - RightDone: unpaired elements remain on the right (`excessR`); only the left side is pulled.
  // - End:       nothing left to pair; the next step ends the stream.
  type End = { _tag: "End" };
  type RightDone = { _tag: "RightDone"; excessR: NonEmptyArray };
  type LeftDone = { _tag: "LeftDone"; excessL: NonEmptyArray };
  type Running = {
    _tag: "Running";
    excess: Either, ReadonlyArray>;
  };
  type State = End | Running | LeftDone | RightDone;
  // Pairs up whatever is available and decides the next state.
  // `leftUpd` / `rightUpd` are the freshly pulled chunks (`None` when that side
  // was not pulled or produced nothing); `excess` is the carried-over remainder.
  const handleSuccess = (
    leftUpd: Option>,
    rightUpd: Option>,
    excess: Either, ReadonlyArray>
  ): Exit, readonly [ReadonlyArray, State]> => {
    // Split the carried-over excess into a left and a right part; at most one is non-empty.
    const [leftExcess, rightExcess] = pipe(
      excess,
      E.fold(
        (l) => tuple<[ReadonlyArray, ReadonlyArray]>(l, []),
        (r) => tuple<[ReadonlyArray, ReadonlyArray]>([], r)
      )
    );
    // Append each freshly pulled chunk (if any) after that side's carried-over elements.
    const [left, right] = [
      pipe(
        leftUpd,
        O.fold(
          () => leftExcess,
          (upd) => [...leftExcess, ...upd] as ReadonlyArray
        )
      ),
      pipe(
        rightUpd,
        O.fold(
          () => rightExcess,
          (upd) => [...rightExcess, ...upd] as ReadonlyArray
        )
      )
    ];
    const [emit, newExcess] = __zipChunks(left, right, f);
    if (O.isSome(leftUpd) && O.isSome(rightUpd)) {
      // Both sides produced a chunk: keep running with whatever was not paired.
      return Ex.succeed(
        tuple<[ReadonlyArray, State]>(emit, { _tag: "Running", excess: newExcess })
      );
    } else if (O.isNone(leftUpd) && O.isNone(rightUpd)) {
      // Neither side produced a chunk: fail with `None` to end the zipped stream.
      return Ex.fail(O.none());
    } else {
      // Exactly one side produced a chunk: emit what could be paired and continue
      // only while a non-empty remainder is left, otherwise move to `End`.
      return Ex.succeed(
        tuple(
          emit,
          pipe(
            newExcess,
            E.fold(
              (l): State => A.isNonEmpty(l) ? { _tag: "LeftDone", excessL: l } : { _tag: "End" },
              (r): State => A.isNonEmpty(r) ? { _tag: "RightDone", excessR: r } : { _tag: "End" }
            )
          )
        )
      );
    }
  };
  return pipe(
    stream,
    combineChunks(that)>({ _tag: "Running", excess: E.left([]) })((st, p1, p2) => {
      // In every pulling branch a failed pull is converted into a *successful* task
      // carrying a failed Exit whose cause is mapped through `O.some`.
      switch (st._tag) {
        case "End": {
          return T.pure(Ex.fail(O.none()));
        }
        case "Running": {
          return pipe(
            p1,
            T.optional,
            ps === "par"
              ? T.mapBothPar(T.optional(p2), (l, r) => handleSuccess(l, r, st.excess))
              : T.mapBoth(T.optional(p2), (l, r) => handleSuccess(l, r, st.excess)),
            T.catchAllCause((e) => T.pure(Ex.failure(pipe(e, C.map(O.some)))))
          );
        }
        case "LeftDone": {
          // Only the right side is pulled; the stored left remainder is paired with it.
          return pipe(
            p2,
            T.optional,
            T.map((r) => handleSuccess(O.none(), r, E.left(st.excessL))),
            T.catchAllCause((e) => T.pure(Ex.failure(pipe(e, C.map(O.some)))))
          );
        }
        case "RightDone": {
          // Only the left side is pulled; the stored right remainder is paired with it.
          return pipe(
            p1,
            T.optional,
            T.map((l) => handleSuccess(l, O.none(), E.right(st.excessR))),
            T.catchAllCause((e) => T.pure(Ex.failure(pipe(e, C.map(O.some)))))
          );
        }
      }
    })
  );
}

/**
 * Data-last (curried) variant of `mapBothPar_`: takes the other stream, the
 * combining function and the pull mode first, and returns a function awaiting
 * the left stream.
 */
export function mapBothPar(
  that: Stream,
  f: (a: O, a1: O2) => O3,
  ps: "seq"
): (stream: Stream) => Stream;
export function mapBothPar(
  that: Stream,
  f: (a: O, a1: O2) => O3,
  ps?: "par" | "seq"
): (stream: Stream) => Stream;
export function mapBothPar(
  that: Stream,
  f: (a: O, a1: O2) => O3,
  ps: "par" | "seq" = "par"
): (stream: Stream) => Stream {
  return (stream) => mapBothPar_(stream, that, f, ps);
}

/**
 * Zips two streams point-wise with `f`, pulling the two sides sequentially
 * (delegates to `mapBothPar_` with `"seq"`).
 */
export const mapBoth_ = (
  stream: Stream,
  that: Stream,
  f: (a: A, a1: A1) => B
) => mapBothPar_(stream, that, f, "seq");

/**
 * Data-last (curried) variant of `mapBoth_`.
 */
export const bothMap = (that: Stream, f: (a: A, a1: A1) => B) => (
  stream: Stream
) => mapBoth_(stream, that, f);

/**
 * Switches over to the stream produced by the provided function in case this one
 * fails. Allows recovery from all causes of failure, including interruption if the
 * stream is uninterruptible.
 *
 * @param stream - the stream to run first
 * @param f - builds the fallback stream from the cause of the failure
 */
export const catchAllCause_ = (
  stream: Stream,
  f: (e: C.Cause) => Stream
): Stream => {
  // NotStarted: nothing opened yet; Self: pulling from `stream`;
  // Other: pulling from the fallback stream produced by `f`.
  type NotStarted = { _tag: "NotStarted" };
  type Self = { _tag: "Self"; pull: Pull.Pull };
  type Other = { _tag: "Other"; pull: Pull.Pull };
  type State = NotStarted | Self | Other;
  return new Stream(
    pipe(
      M.do,
      // Holds the finalizer of whichever stream is currently open (initially a no-op).
      M.bindS("finalizerRef", () => M.finalizerRef(M.noopFinalizer) as M.Managed>),
      M.bindS("ref", () =>
        pipe(
          XR.makeRef>({ _tag: "NotStarted" }),
          T.toManaged()
        )
      ),
      M.letS("pull", ({ finalizerRef, ref }) => {
        // Swaps the stored finalizer for a no-op and runs the previous one with a
        // failed exit built from `cause`; made uninterruptible.
        const closeCurrent = (cause: C.Cause) =>
          pipe(
            finalizerRef,
            XR.getAndSet(M.noopFinalizer),
            T.chain((f) => f(Ex.failure(cause))),
            T.makeUninterruptible
          );
        // Opens `stream` inside a fresh release map, registers a finalizer that
        // releases that map sequentially, and records the obtained pull in `ref`.
        // Only running `stream.proc.task` is wrapped in `restore`.
        const open = (stream: Stream) => (asState: (_: Pull.Pull) => State) =>
          T.uninterruptibleMask(({ restore }) =>
            pipe(
              M.makeReleaseMap,
              T.chain((releaseMap) =>
                pipe(
                  finalizerRef.set((exit) => M.releaseAll(exit, sequential())(releaseMap)),
                  T.chain(() =>
                    pipe(
                      restore(stream.proc.task),
                      T.local((_: R) => [_, releaseMap] as [R, M.ReleaseMap]),
                      // Keep only the second tuple element - presumably the pull, the first
                      // being the managed finalizer; TODO confirm against Managed.
                      T.map(([_, __]) => __),
                      T.tap((pull) => ref.set(asState(pull)))
                    )
                  )
                )
              )
            )
          );
        // Handles a failed pull: a cause that sequences to `None` is re-raised as a
        // `None` failure (treated elsewhere in this file as end-of-stream); a real cause
        // closes the current stream, opens `f(cause)` as `Other` and runs its first pull.
        const failover = (cause: C.Cause>) =>
          pipe(
            cause,
            C.sequenceCauseOption,
            O.fold(
              () => T.fail(O.none()),
              (cause) =>
                pipe(
                  closeCurrent(cause),
                  T.chain(() =>
                    open(f(cause))((pull) => ({ _tag: "Other", pull }))
                  ),
                  T.flatten
                )
            )
          );
        return pipe(
          ref.get,
          T.chain((s) => {
            switch (s._tag) {
              case "NotStarted": {
                // First pull: open the original stream, run its pull, recover on failure.
                return pipe(
                  open(stream)((pull) => ({ _tag: "Self", pull })),
                  T.flatten,
                  T.catchAllCause(failover)
                );
              }
              case "Self": {
                return pipe(s.pull, T.catchAllCause(failover));
              }
              case "Other": {
                // Failures of the fallback stream are not recovered again.
                return s.pull;
              }
            }
          })
        );
      }),
      M.map(({ pull }) => pull)
    )
  );
};

/**
 * Data-last (curried) variant of `catchAllCause_`.
 */
export const catchAllCause = (f: (e: C.Cause) => Stream) => (
  stream: Stream
): Stream => catchAllCause_(stream, f);

/**
 * Pull loop used by `concatAll`.
 *
 * Runs the pull stored in `currStream`. If it fails with a cause that sequences
 * to `None`, the index is advanced: when the index reaches `chunkSize` the
 * result is `Pull.end`, otherwise the stream at that index is opened through
 * `switchStream`, stored in `currStream`, and the loop recurses. Any other
 * cause is propagated with `Pull.halt`.
 *
 * @param streams - the streams to concatenate
 * @param chunkSize - number of streams (`streams.length` at the call site)
 * @param currIndex - index of the next stream to open
 * @param currStream - pull of the stream currently being drained
 * @param switchStream - opens a managed pull (from `M.switchable`; presumably it also releases the previously opened one - TODO confirm)
 */
function go(
  streams: ReadonlyArray>,
  chunkSize: number,
  currIndex: Ref,
  currStream: Ref, ReadonlyArray>>,
  switchStream: (
    x: M.Managed, ReadonlyArray>>
  ) => T.Task, ReadonlyArray>>
): T.Task, ReadonlyArray> {
  return pipe(
    currStream.get,
    // `currStream.get` yields a pull (itself a task); flatten runs it.
    T.flatten,
    T.catchAllCause((x) =>
      O.fold_(
        C.sequenceCauseOption(x),
        () =>
          pipe(
            currIndex,
            // NOTE(review): `i` below is presumably the index *before* the increment - confirm
            // that `XR.getAndUpdate` returns the previous value.
            XR.getAndUpdate((x) => x + 1),
            T.chain((i) =>
              i >= chunkSize
                ? Pull.end
                : pipe(
                    switchStream(streams[i].proc),
                    T.chain(currStream.set),
                    T.apSecond(go(streams, chunkSize, currIndex, currStream, switchStream))
                  )
            )
          ),
        Pull.halt
      )
    )
  );
}

/**
 * Concatenates all of the streams in the array into one stream, draining them
 * in order.
 *
 * The current pull starts out as `Pull.end` and the index at `0`, so the first
 * pull falls through to `go`'s switching branch (presumably opening
 * `streams[0]` - see the note in `go`).
 */
export const concatAll = (streams: Array>): Stream => {
  const chunkSize = streams.length;
  return new Stream(
    pipe(
      M.do,
      M.bindS("currIndex", () => XR.makeManagedRef(0)),
      M.bindS("currStream", () => XR.makeManagedRef, ReadonlyArray>>(Pull.end)),
      M.bindS("switchStream", () => M.switchable, ReadonlyArray>>()),
      M.map(({ currIndex, currStream, switchStream }) =>
        go(streams, chunkSize, currIndex, currStream, switchStream)
      )
    )
  );
};

/**
 * Executes the provided finalizer before this stream's finalizers run
 * (delegates to `M.ensuringFirst_` on the stream's managed pull).
 */
export const ensuringFirst_ = (self: Stream, fin: Task) =>
  new Stream(M.ensuringFirst_(self.proc, fin));

/**
 * Executes the provided finalizer before this stream's finalizers run.
 * Data-last (curried) variant of `ensuringFirst_`.
 */
export const ensuringFirst = (fin: Task) => (self: Stream) => ensuringFirst_(self, fin);

/**
 * Decides when a merged stream ends, as used by `mergeWith_`:
 * - `"Left"`: when the left stream ends;
 * - `"Right"`: when the right stream ends;
 * - `"Either"`: when either stream ends;
 * - `"Both"`: only once both streams have ended.
 */
export type TerminationStrategy = "Left" | "Right" | "Both" | "Either";

/**
 * Merges this stream and the specified stream together to a common element
 * type with the specified mapping functions.
 *
 * The resulting stream terminates when both streams have terminated, unless a
 * different termination strategy is specified.
 *
 * Each side is drained by its own forked fiber, which offers chunks, failures
 * and end signals to a shared hand-off; the returned pull reads from it.
 *
 * @param sa - the left stream
 * @param sb - the right stream
 * @param l - maps elements of `sa`
 * @param r - maps elements of `sb`
 * @param strategy - when the merged stream ends (default `"Both"`)
 */
export const mergeWith_ = (
  sa: Stream,
  sb: Stream,
  l: (a: A) => C,
  r: (b: B) => C1,
  strategy: TerminationStrategy = "Both"
): Stream =>
  new Stream(
    pipe(
      M.do,
      M.bindS("handoff", () => M.fromTask(H.make>())),
      // `done` starts as `None`. As set below: `Some(false)` = one side has ended without
      // ending the merge; `Some(true)` = the merged stream has ended or failed.
      M.bindS("done", () => M.fromTask(XRM.makeRefM>(O.none()))),
      M.bindS("chunksL", () => sa.proc),
      M.bindS("chunksR", () => sb.proc),
      // Builds the producer loop for one side. Each step returns `true` to keep pulling.
      M.letS("handler", ({ done, handoff }) => (pull: Pull.Pull, terminate: boolean) =>
        pipe(
          done.get,
          T.chain((o) => {
            if (o._tag === "Some" && o.value) {
              // The merge is already finished: stop without pulling again.
              return T.succeed(false);
            } else {
              return pipe(
                pull,
                T.result,
                T.chain((exit) =>
                  pipe(
                    done,
                    XRM.modify((o) => {
                      // Right = a chunk; Left(Some(cause)) = failure; Left(None) = this side ended.
                      const causeOrChunk = pipe(
                        exit,
                        Ex.fold(
                          (c): Either>, ReadonlyArray> => E.left(C.sequenceCauseOption(c)),
                          E.right
                        )
                      );
                      if (o._tag === "Some" && o.value) {
                        // Finished while this side was pulling: drop the result and stop.
                        return T.succeed([false, o]);
                      } else if (causeOrChunk._tag === "Right") {
                        // Forward the chunk and keep looping; `done` is left unchanged.
                        return pipe(
                          handoff,
                          H.offer(>Take.chunk(causeOrChunk.right)),
                          T.as([true, o])
                        );
                      } else if (causeOrChunk._tag === "Left" && causeOrChunk.left._tag === "Some") {
                        // A failure on either side fails the merged stream.
                        return pipe(
                          handoff,
                          H.offer(>Take.halt(causeOrChunk.left.value)),
                          T.as([false, O.some(true)])
                        );
                      } else if (
                        causeOrChunk._tag === "Left" &&
                        causeOrChunk.left._tag === "None" &&
                        (terminate || o._tag === "Some")
                      ) {
                        // This side ended and either it is a terminating side or the other
                        // side has already ended: end the merged stream.
                        return pipe(
                          handoff,
                          H.offer(>Take.end),
                          T.as([false, O.some(true)])
                        );
                      } else {
                        // This side ended first and is not a terminating side: record it.
                        return T.succeed([false, O.some(false)]);
                      }
                    })
                  )
                )
              );
            }
          }),
          T.repeatWhile(identity),
          T.fork,
          T.makeInterruptible,
          // Presumably interrupts the forked fiber when the managed scope is released - TODO confirm.
          T.toManaged(F.interrupt)
        )
      ),
      // Start the left producer; its end terminates the merge for "Left" and "Either".
      M.tap(({ chunksL, handler }) =>
        handler(pipe(chunksL, T.map(A.map(l))), strategy === "Left" || strategy === "Either")
      ),
      // Start the right producer; its end terminates the merge for "Right" and "Either".
      M.tap(({ chunksR, handler }) =>
        handler(pipe(chunksR, T.map(A.map(r))), strategy === "Right" || strategy === "Either")
      ),
      // Consumer pull: once finished, poll the hand-off instead of taking from it
      // (presumably `H.poll` does not wait and `T.some` fails with `None` when nothing is
      // pending - TODO confirm); otherwise take the next item.
      M.map(({ done, handoff }) =>
        pipe(
          T.do,
          T.bindS("done", () => done.get),
          T.bindS("take", (s) =>
            s.done._tag === "Some" && s.done.value
              ? pipe(handoff, H.poll, T.some)
              : pipe(handoff, H.take)
          ),
          T.bindS("result", ({ take }) => Take.done(take)),
          T.map(({ result }) => result)
        )
      )
    )
  );

/**
 * Merges this stream and the specified stream together to a common element
 * type with the specified mapping functions.
 *
 * The resulting stream terminates when both streams have terminated, unless a
 * different termination strategy is specified.
 *
 * Data-last (curried) variant of `mergeWith_`.
 */
export const mergeWith = (
  that: Stream,
  l: (a: A) => C,
  r: (b: B) => C1,
  strategy: TerminationStrategy = "Both"
) => (ma: Stream) => mergeWith_(ma, that, l, r, strategy);

/**
 * Merges this stream and the specified stream together.
 *
 * The resulting stream terminates when both streams have terminated, unless a
 * different termination strategy is specified.
 */
export const merge_ = (
  self: Stream,
  that: Stream,
  strategy: TerminationStrategy = "Both"
): Stream =>
  mergeWith_(
    self,
    that,
    (a): A | B => a,
    (b) => b,
    strategy
  );

/**
 * Merges this stream and the specified stream together.
 *
 * The resulting stream terminates when both streams have terminated, unless a
 * different termination strategy is specified.
 *
 * Data-last (curried) variant of `merge_`.
 */
export const merge = (sb: Stream, strategy: TerminationStrategy = "Both") => (
  sa: Stream
) => merge_(sa, sb, strategy);

/**
 * Merges this stream and the specified stream together. The resulting stream
 * terminates when either stream terminates.
 */
export const mergeTerminateEither_ = (
  sa: Stream,
  sb: Stream
): Stream => merge_(sa, sb, "Either");

/**
 * Merges this stream and the specified stream together. The resulting stream
 * terminates when either stream terminates. Data-last (curried) variant.
 */
export const mergeTerminateEither = (sb: Stream) => (sa: Stream) => merge_(sa, sb, "Either");

/**
 * Merges this stream and the specified stream together. The resulting stream
 * terminates when this stream terminates.
 */
export const mergeTerminateLeft_ = (
  sa: Stream,
  sb: Stream
): Stream => merge_(sa, sb, "Left");

/**
 * Merges this stream and the specified stream together. The resulting stream
 * terminates when this stream terminates. Data-last (curried) variant.
 */
export const mergeTerminateLeft = (sb: Stream) => (sa: Stream) => merge_(sa, sb, "Left");

/**
 * Merges this stream and the specified stream together. The resulting stream
 * terminates when the specified stream terminates.
 */
export const mergeTerminateRight_ = (
  sa: Stream,
  sb: Stream
): Stream => merge_(sa, sb, "Right");

/**
 * Merges this stream and the specified stream together. The resulting stream
 * terminates when the specified stream terminates. Data-last (curried) variant.
 */
export const mergeTerminateRight = (sb: Stream) => (sa: Stream) => merge_(sa, sb, "Right");

/**
 * Merges this stream and the specified stream together to produce a stream of
 * eithers: elements of `sa` are wrapped with `E.left`, elements of `sb` with
 * `E.right`.
 */
export const mergeEither_ = (
  sa: Stream,
  sb: Stream,
  strategy: TerminationStrategy = "Both"
): Stream> => mergeWith_(sa, sb, E.left, E.right, strategy);

/**
 * Merges this stream and the specified stream together to produce a stream of
 * eithers. Data-last (curried) variant of `mergeEither_`.
 */
export const mergeEither = (sb: Stream, strategy: TerminationStrategy = "Both") => (
  sa: Stream
): Stream> => mergeEither_(sa, sb, strategy);