import * as A from "../../Array";
import * as E from "../../Either";
import { flow, identity, pipe } from "../../Function";
import type { Option } from "../../Option";
import * as O from "../../Option";
import type { Exit } from "../Exit";
import type { Cause } from "../Exit/Cause";
import * as C from "../Exit/Cause";
import * as M from "../Managed";
import * as Semaphore from "../Semaphore";
import * as T from "../Task";
import * as XP from "../XPromise";
import * as XQ from "../XQueue";
import * as XR from "../XRef";
import { fromArray, fromTask, managed, repeatTaskChunkOption } from "./constructors";
import { foreachManaged } from "./destructors";
import * as BPull from "./internal/BufferedPull";
import * as Pull from "./internal/Pull";
import * as Take from "./internal/Take";
import type { IO, RIO } from "./model";
import { Chain, Stream } from "./model";

/*
 * NOTE(review): the generic type-parameter lists in this copy of the file
 * appear to have been stripped (see residue such as `T.first>(O.some)` or
 * `T.Task>` below). Restore them from version control before compiling;
 * the comments in this file describe the value-level behaviour only.
 */

/**
 * Creates a stream that emits the single value `a`.
 *
 * Implemented by handing the one-element array `[a]` to `fromArray`.
 *
 * @param a - the value to emit
 */
export const pure = (a: A): IO => fromArray([a]);

/**
 * Effectfully transforms the chunks emitted by this stream.
 *
 * Every chunk pulled from `fa` is passed to `f`, and the output of the
 * resulting task replaces the chunk. The error of `f` is wrapped with
 * `O.some` through `T.first` (presumably to keep it distinct from the `None`
 * error used to signal end-of-stream — confirm against `Pull`).
 *
 * @param fa - the source stream
 * @param f - task-returning function applied to each chunk
 */
export const mapChunksM_ = (
  fa: Stream,
  f: (chunks: ReadonlyArray) => T.Task>
): Stream =>
  new Stream(
    pipe(
      fa.proc,
      M.map((e) =>
        pipe(
          e,
          T.chain((x) => pipe(f(x), T.first>(O.some)))
        )
      )
    )
  );

/**
 * Effectfully transforms the chunks emitted by this stream.
 *
 * Curried, data-last form of `mapChunksM_`.
 */
export const mapChunksM = (f: (chunks: ReadonlyArray) => T.Task>) => (
  fa: Stream
): Stream => mapChunksM_(fa, f);

/**
 * Transforms the chunks emitted by this stream with a pure function.
 *
 * Delegates to `mapChunksM_`, lifting the result of `f` with `T.pure`.
 *
 * @param fa - the source stream
 * @param f - function applied to each chunk
 */
export const mapChunks_ = (fa: Stream, f: (chunks: ReadonlyArray) => ReadonlyArray) =>
  mapChunksM_(fa, flow(f, T.pure));

/**
 * Transforms the chunks emitted by this stream with a pure function.
 *
 * Curried, data-last form of `mapChunks_`.
 */
export const mapChunks = (f: (chunks: ReadonlyArray) => ReadonlyArray) => (fa: Stream) =>
  mapChunks_(fa, f);

/**
 * Transforms the elements emitted by this stream.
 *
 * Applies `f` to every element of every chunk (`A.map(f)` over each chunk).
 *
 * @param fa - the source stream
 * @param f - function applied to each element
 */
export const map_ = (fa: Stream, f: (a: A) => B): Stream => mapChunks_(fa, A.map(f));

/**
 * Transforms the elements emitted by this stream.
 *
 * Curried, data-last form of `map_`.
 */
export const map = (f: (a: A) => B) => (fa: Stream): Stream => map_(fa, f);

/**
 * Maps over elements of the stream with the specified effectful function.
 *
 * The source pull is wrapped with `BPull.make` so that elements can be taken
 * one at a time with `BPull.pullElement`. For each element `f` is run; its
 * error is wrapped with `O.some` and its result is emitted as a one-element
 * chunk.
 *
 * @param fa - the source stream
 * @param f - task-returning function applied to each element
 */
export const mapM_ = (
  fa: Stream,
  f: (a: A) => T.Task
): Stream =>
  new Stream(
    pipe(
      fa.proc,
      M.mapM(BPull.make),
      M.map((pull) =>
        pipe(
          pull,
          BPull.pullElement,
          T.chain((o) =>
            pipe(
              f(o),
              T.bimap(O.some, (o1) => [o1] as [B])
            )
          )
        )
      )
    )
  );

/**
 * Maps over elements of the stream with the specified effectful function.
 *
 * Curried, data-last form of `mapM_`.
 */
export const mapM = (f: (o: A) => T.Task) => (fa: Stream) => mapM_(fa, f);

/**
 * Transforms the errors of the stream with `f`.
 *
 * `f` is applied inside the optional error of every pull via
 * `T.first(O.map(f))`, so a `None` error is passed through untouched.
 *
 * @param pab - the source stream
 * @param f - function applied to each error
 */
export const first_ = (pab: Stream, f: (e: E) => D) =>
  new Stream(pipe(pab.proc, M.map(T.first(O.map(f)))));

/**
 * Transforms the errors of the stream with `f`.
 *
 * Curried, data-last form of `first_`.
 */
export const first = (f: (e: E) => D) => (pab: Stream) => first_(pab, f);

/**
 * Alias of `first`.
 */
export const mapError = first;

/**
 * Transforms the failure cause of the stream with `f`.
 *
 * The cause of each failed pull is run through `C.sequenceCauseOption`:
 * - when that yields `None`, the pull fails with `C.fail(O.none())` and `f`
 *   is not called (presumably this is the end-of-stream signal — confirm
 *   against `Pull`);
 * - otherwise `f` is applied to the extracted cause and the result is
 *   re-wrapped with `O.some`.
 *
 * @param stream - the source stream
 * @param f - function applied to the failure cause
 */
export const mapErrorCause_ = (stream: Stream, f: (e: Cause) => Cause) =>
  new Stream(
    pipe(
      stream.proc,
      M.map(
        T.mapErrorCause((cause) =>
          pipe(
            C.sequenceCauseOption(cause),
            O.fold(
              () => C.fail(O.none()),
              (c) => C.map_(f(c), O.some)
            )
          )
        )
      )
    )
  );

/**
 * Transforms the failure cause of the stream with `f`.
 *
 * Curried, data-last form of `mapErrorCause_`.
 */
export const mapErrorCause = (f: (e: Cause) => Cause) => (stream: Stream) =>
  mapErrorCause_(stream, f);

/**
 * Returns a stream made of the concatenation in strict order of all the streams
 * produced by passing each element of this stream to `f`.
 *
 * This function only allocates the state and hands it to `Chain`, whose
 * `apply()` produces the actual pull:
 * - `outerStream`: the pull of `fa`;
 * - `currOuterChunk`: a ref initialised to `[[], 0]` (a chunk plus a number,
 *   presumably the read position inside that chunk — confirm in `Chain`);
 * - `currInnerStream`: a ref initialised to `Pull.end`;
 * - `innerFinalizer`: a finalizer ref initialised to `M.noopFinalizer`.
 *
 * @param fa - the outer stream
 * @param f - function producing an inner stream for each outer element
 */
export const chain_ = (fa: Stream, f: (a: A) => Stream) => {
  type R_ = R & Q;
  type E_ = E | D;

  return new Stream(
    pipe(
      M.do,
      M.bindS("outerStream", () => fa.proc),
      M.bindS("currOuterChunk", () =>
        T.toManaged()(
          XR.makeRef<[ReadonlyArray, number]>([[], 0])
        )
      ),
      M.bindS("currInnerStream", () =>
        T.toManaged()(XR.makeRef, ReadonlyArray>>(Pull.end))
      ),
      M.bindS("innerFinalizer", () => M.finalizerRef(M.noopFinalizer) as M.Managed>),
      M.map(({ currInnerStream, currOuterChunk, innerFinalizer, outerStream }) =>
        new Chain(f, outerStream, currOuterChunk, currInnerStream, innerFinalizer).apply()
      )
    )
  );
};

/**
 * Returns a stream made of the concatenation in strict order of all the streams
 * produced by passing each element of this stream to `f`.
 *
 * Curried, data-last form of `chain_`.
 */
export const chain = (f: (a: A) => Stream) => (
  fa: Stream
): Stream => chain_(fa, f);

/**
 * Flattens this stream-of-streams into a stream made of the concatenation in
 * strict order of all the streams.
 *
 * Equivalent to `chain_(ffa, identity)`.
 */
export const flatten = (ffa: Stream>): Stream => chain_(ffa, identity);

/**
 * Turns a stream of `Either` values into a stream of their right-hand values:
 * each element is folded with `E.fold(fail, pure)`, so a left value is handed
 * to `fail` and a right value is re-emitted with `pure`.
 *
 * NOTE(review): `fail` is not among the names imported at the top of this
 * file and is not defined in the visible part of it — confirm that it
 * resolves to the stream constructor and not to some ambient global.
 */
export const absolve: (stream: Stream>) => Stream = chain(
  E.fold(fail, pure)
);

/**
 * Creates a stream from `T.ask()` (presumably emitting the environment —
 * confirm against `Task.ask`).
 */
export const ask = (): RIO => fromTask(T.ask());

/**
 * Creates a stream from `ask()` with `f` mapped over its output.
 *
 * @param f - pure function applied to the value produced by `ask()`
 */
export const asks = (f: (_: R) => A): Stream => map_(ask(), f);

/**
 * Creates a stream from `ask()` with the task-returning `f` mapped over its
 * output via `mapM_`.
 *
 * @param f - task-returning function applied to the value produced by `ask()`
 */
export const asksTask = (f: (_: R0) => T.Task): Stream => mapM_(ask(), f);

/**
 * Creates a stream by chaining the stream-returning `f` onto `ask()`.
 *
 * @param f - stream-returning function applied to the value produced by `ask()`
 */
export const asksM = (f: (_: R0) => Stream) => chain_(ask(), f);

/**
 * Statefully and effectfully maps over the elements of this stream to produce
 * new elements.
 *
 * The state lives in a managed ref initialised to `z`. For every element
 * pulled from the source, the current state is read, `f(state, element)` is
 * run, the first component of its result is stored as the new state and the
 * second component is emitted as a one-element chunk. Errors are wrapped with
 * `O.some`.
 *
 * @param stream - the source stream
 * @param z - the initial state
 * @param f - task-returning step function yielding `[nextState, output]`
 */
export const mapAccumM_ = (
  stream: Stream,
  z: Z,
  f: (z: Z, a: A) => T.Task
) =>
  new Stream(
    pipe(
      M.do,
      M.bindS("state", () => XR.makeManagedRef(z)),
      M.bindS("pull", () => pipe(stream.proc, M.mapM(BPull.make))),
      M.map(({ pull, state }) =>
        pipe(
          pull,
          BPull.pullElement,
          T.chain((o) =>
            pipe(
              T.do,
              T.bindS("s", () => state.get),
              T.bindS("t", ({ s }) => f(s, o)),
              T.tap(({ t }) => state.set(t[0])),
              T.map(({ t }) => [t[1]]),
              T.first(O.some)
            )
          )
        )
      )
    )
  );

/**
 * Statefully and effectfully maps over the elements of this stream to produce
 * new elements.
 *
 * Curried, data-last form of `mapAccumM_`.
 */
export const mapAccumM = (z: Z) => (f: (z: Z, a: A) => T.Task) => (
  stream: Stream
) => mapAccumM_(stream, z, f);

/**
 * Statefully maps over the elements of this stream with a pure step function.
 *
 * Delegates to `mapAccumM_`, lifting the result of `f` with `T.pure`.
 *
 * @param stream - the source stream
 * @param z - the initial state
 * @param f - step function returning `[nextState, output]`
 */
export const mapAccum_ = (stream: Stream, z: Z, f: (z: Z, a: A) => [Z, B]) =>
  mapAccumM_(stream, z, (z, a) => T.pure(f(z, a)));

/**
 * Statefully maps over the elements of this stream with a pure step function.
 *
 * Curried, data-last form of `mapAccum_`.
 */
export const mapAccum = (z: Z) => (f: (z: Z, a: A) => [Z, B]) => (stream: Stream) =>
  mapAccum_(stream, z, f);

/**
 * Maps each element to an `Iterable` and emits the contents of every iterable
 * in order.
 *
 * Each iterable is materialised with `Array.from` and the results are
 * concatenated per chunk with `A.chain_`.
 *
 * @param stream - the source stream
 * @param f - function producing an iterable for each element
 */
export const mapConcat_ = (stream: Stream, f: (a: A) => Iterable) =>
  mapChunks_(stream, (chunks) => A.chain_(chunks, (a) => Array.from(f(a))));

/**
 * Maps each element to an `Iterable` and emits the contents of every iterable
 * in order.
 *
 * Curried, data-last form of `mapConcat_`.
 */
export const mapConcat = (f: (a: A) => Iterable) => (stream: Stream) => mapConcat_(stream, f);

/**
 * Maps each element to an array and emits the contents of every array in
 * order (`A.chain_` over each chunk).
 *
 * @param stream - the source stream
 * @param f - function producing an array for each element
 */
export const mapConcatChunk_ = (stream: Stream, f: (a: A) => ReadonlyArray) =>
  mapChunks_(stream, (chunks) => A.chain_(chunks, f));

/**
 * Maps each element to an array and emits the contents of every array in
 * order.
 *
 * Curried, data-last form of `mapConcatChunk_`.
 */
export const mapConcatChunk = (f: (a: A) => ReadonlyArray) => (stream: Stream) =>
  mapConcatChunk_(stream, f);

/**
 * Effectfully maps each element to an array and emits the contents of every
 * array in order.
 *
 * Runs `f` per element with `mapM`, then flattens the produced arrays with
 * `mapConcatChunk(identity)`.
 *
 * @param stream - the source stream
 * @param f - task-returning function producing an array for each element
 */
export const mapConcatChunkM_ = (
  stream: Stream,
  f: (a: A) => T.Task>
) => pipe(stream, mapM(f), mapConcatChunk(identity));

/**
 * Effectfully maps each element to an array and emits the contents of every
 * array in order.
 *
 * Curried, data-last form of `mapConcatChunkM_`.
 */
export const mapConcatChunkM = (f: (a: A) => T.Task>) => (
  stream: Stream
) => mapConcatChunkM_(stream, f);

/**
 * Effectfully maps each element to an iterable and emits the contents of
 * every iterable in order.
 *
 * The output of `f` is materialised with `Array.from` and then handled by
 * `mapConcatChunkM`.
 *
 * NOTE(review): unlike its siblings, no curried `mapConcatM` counterpart is
 * visible in this part of the file.
 *
 * @param stream - the source stream
 * @param f - task-returning function producing an iterable for each element
 */
export const mapConcatM_ = (stream: Stream, f: (a: A) => T.Task>) =>
  pipe(
    stream,
    mapConcatChunkM((a) => T.map_(f(a), (_) => Array.from(_)))
  );

/**
 * Maps over elements of the stream with the specified effectful function,
 * executing up to `n` invocations of `f` concurrently. Transformed elements
 * will be emitted in the original order.
 *
 * How it is wired:
 * - `out` is a bounded queue of capacity `n` holding one task per source
 *   element; each task awaits the promise `p` that will receive that
 *   element's result. Because the tasks are offered in source order and the
 *   consumer takes and runs them one by one, output order follows input order.
 * - `permits` is a semaphore with `n` permits; each invocation of `f` runs in
 *   a forked fiber holding one permit.
 * - `errorSignal` is a promise raced against every invocation of `f`; it is
 *   halted with the cause of the first failing invocation.
 * - The producer (the `foreachManaged` loop) is itself forked with `M.fork`.
 *   If it fails, `Pull.halt(cause)` is offered to `out`; if it completes, all
 *   `n` permits are acquired first (presumably so that every in-flight
 *   invocation has finished) and then `Pull.end` is offered.
 *
 * @param n - maximum number of concurrent invocations of `f`; also the
 *   capacity of the internal result queue
 */
export const mapMPar_ = (n: number) => (
  stream: Stream,
  f: (a: A) => T.Task
): Stream =>
  new Stream(
    pipe(
      M.do,
      M.bindS("out", () => T.toManaged()(XQ.makeBounded, B>>(n))),
      M.bindS("errorSignal", () => T.toManaged()(XP.make())),
      M.bindS("permits", () => T.toManaged()(Semaphore.makeSemaphore(n))),
      M.tap(({ errorSignal, out, permits }) =>
        pipe(
          stream,
          foreachManaged((a) =>
            pipe(
              T.do,
              T.bindS("p", () => XP.make()),
              T.bindS("latch", () => XP.make()),
              // Enqueue the (not yet completed) result slot before starting the work
              T.tap(({ p }) => out.offer(pipe(p, XP.await, T.first(O.some)))),
              T.tap(({ latch, p }) =>
                pipe(
                  latch,
                  // Make sure we start evaluation before moving on to the next element
                  XP.succeed(undefined),
                  T.chain(() =>
                    pipe(
                      errorSignal,
                      XP.await,
                      // Interrupt evaluation if another task fails
                      T.raceFirst(f(a)),
                      // Notify other tasks of a failure
                      T.tapCause((e) => pipe(errorSignal, XP.halt(e))),
                      // Transfer the result to the consuming stream
                      T.to(p)
                    )
                  ),
                  Semaphore.withPermit(permits),
                  T.fork
                )
              ),
              // Block the producer until the forked fiber has actually started
              T.tap(({ latch }) => XP.await(latch)),
              T.asUnit
            )
          ),
          M.foldCauseM(
            (c) => T.toManaged()(out.offer(Pull.halt(c))),
            () =>
              pipe(
                Semaphore.withPermits(n)(permits)(T.unit()),
                T.chain(() => out.offer(Pull.end)),
                T.toManaged()
              )
          ),
          M.fork
        )
      ),
      M.map(({ out }) =>
        pipe(
          out.take,
          T.flatten,
          T.map((o) => [o])
        )
      )
    )
  );

/**
 * Maps over elements of the stream with the specified effectful function,
 * executing up to `n` invocations of `f` concurrently. Transformed elements
 * will be emitted in the original order.
 *
 * Curried, data-last form of `mapMPar_`.
 */
export const mapMPar = (n: number) => (f: (a: A) => T.Task) => (
  stream: Stream
) => mapMPar_(n)(stream, f);

/**
 * Creates a stream from an asynchronous callback that can be called multiple times.
 * The registration of the callback itself returns a task. The optionality of the
 * error type `E` can be used to signal the end of the stream, by setting it to `None`.
 *
 * How it is wired:
 * - a bounded queue `output` of capacity `outputBuffer` buffers what the
 *   callback delivers;
 * - every value passed to the callback is converted with `Take.fromPull`,
 *   offered to `output`, and executed through `runtime.runCancel`, forwarding
 *   the optional `offerCb`;
 * - the resulting pull takes from `output` and unwraps it with `Take.done`;
 *   on error it sets the `done` flag and shuts the queue down, after which
 *   every further pull returns `Pull.end`.
 *
 * @param register - receives the callback and returns the task that performs
 *   the registration
 * @param outputBuffer - capacity of the internal queue (defaults to 16)
 */
export const asyncTask = (
  register: (
    cb: (
      next: T.Task, ReadonlyArray>,
      offerCb?: (e: Exit) => void
    ) => T.IO>
  ) => T.Task,
  outputBuffer = 16
): Stream =>
  pipe(
    M.do,
    M.bindS("output", () => pipe(XQ.makeBounded>(outputBuffer), T.toManaged())),
    M.bindS("runtime", () => pipe(T.runtime(), T.toManaged())),
    M.tap(({ output, runtime }) =>
      T.toManaged()(
        register((k, cb) =>
          pipe(Take.fromPull(k), T.chain(output.offer), (x) => runtime.runCancel(x, cb))
        )
      )
    ),
    M.bindS("done", () => XR.makeManagedRef(false)),
    M.letS("pull", ({ done, output }) =>
      pipe(
        done.get,
        T.chain((b) =>
          b
            ? Pull.end
            : pipe(
                output.take,
                T.chain(Take.done),
                T.onError(() =>
                  pipe(
                    done.set(true),
                    T.chain(() => output.shutdown)
                  )
                )
              )
        )
      )
    ),
    M.map(({ pull }) => pull),
    managed,
    chain(repeatTaskChunkOption)
  );