import * as A from "../../Array";
import * as E from "../../Either";
import { flow, identity, pipe } from "../../Function";
import type { Option } from "../../Option";
import * as O from "../../Option";
import type { Exit } from "../Exit";
import type { Cause } from "../Exit/Cause";
import * as C from "../Exit/Cause";
import * as M from "../Managed";
import * as Semaphore from "../Semaphore";
import * as T from "../Task";
import * as XP from "../XPromise";
import * as XQ from "../XQueue";
import * as XR from "../XRef";
import { fromArray, fromTask, managed, repeatTaskChunkOption } from "./constructors";
import { foreachManaged } from "./destructors";
import * as BPull from "./internal/BufferedPull";
import * as Pull from "./internal/Pull";
import * as Take from "./internal/Take";
import type { IO, RIO } from "./model";
import { Chain, Stream } from "./model";
/**
* Creates a single-valued pure stream
*/
export const pure = (a: A): IO => fromArray([a]);
/**
* Taskfully transforms the chunks emitted by this stream.
*/
export const mapChunksM_ = (
fa: Stream,
f: (chunks: ReadonlyArray) => T.Task>
): Stream =>
new Stream(
pipe(
fa.proc,
M.map((e) =>
pipe(
e,
T.chain((x) => pipe(f(x), T.first>(O.some)))
)
)
)
);
/**
* Taskfully transforms the chunks emitted by this stream.
*/
export const mapChunksM = (f: (chunks: ReadonlyArray) => T.Task>) => (
fa: Stream
): Stream => mapChunksM_(fa, f);
/**
* Transforms the chunks emitted by this stream.
*/
export const mapChunks_ = (fa: Stream, f: (chunks: ReadonlyArray) => ReadonlyArray) =>
mapChunksM_(fa, flow(f, T.pure));
/**
* Transforms the chunks emitted by this stream.
*/
export const mapChunks = (f: (chunks: ReadonlyArray) => ReadonlyArray) => (fa: Stream) =>
mapChunks_(fa, f);
/**
* Transforms the chunks emitted by this stream.
*/
export const map_ = (fa: Stream, f: (a: A) => B): Stream => mapChunks_(fa, A.map(f));
/**
* Transforms the chunks emitted by this stream.
*/
export const map = (f: (a: A) => B) => (fa: Stream): Stream => map_(fa, f);
/**
* Maps over elements of the stream with the specified effectful function.
*/
export const mapM_ = (
fa: Stream,
f: (a: A) => T.Task
): Stream =>
new Stream(
pipe(
fa.proc,
M.mapM(BPull.make),
M.map((pull) =>
pipe(
pull,
BPull.pullElement,
T.chain((o) =>
pipe(
f(o),
T.bimap(O.some, (o1) => [o1] as [B])
)
)
)
)
)
);
/**
* Maps over elements of the stream with the specified effectful function.
*/
export const mapM = (f: (o: A) => T.Task) => (fa: Stream) => mapM_(fa, f);
export const first_ = (pab: Stream, f: (e: E) => D) =>
new Stream(pipe(pab.proc, M.map(T.first(O.map(f)))));
export const first = (f: (e: E) => D) => (pab: Stream) => first_(pab, f);
export const mapError = first;
export const mapErrorCause_ = (stream: Stream, f: (e: Cause) => Cause) =>
new Stream(
pipe(
stream.proc,
M.map(
T.mapErrorCause((cause) =>
pipe(
C.sequenceCauseOption(cause),
O.fold(
() => C.fail(O.none()),
(c) => C.map_(f(c), O.some)
)
)
)
)
)
);
export const mapErrorCause = (f: (e: Cause) => Cause) => (stream: Stream) =>
mapErrorCause_(stream, f);
/**
* Returns a stream made of the concatenation in strict order of all the streams
* produced by passing each element of this stream to `f0`
*/
export const chain_ = (fa: Stream, f: (a: A) => Stream) => {
type R_ = R & Q;
type E_ = E | D;
return new Stream(
pipe(
M.do,
M.bindS("outerStream", () => fa.proc),
M.bindS("currOuterChunk", () =>
T.toManaged()(
XR.makeRef<[ReadonlyArray, number]>([[], 0])
)
),
M.bindS("currInnerStream", () =>
T.toManaged()(XR.makeRef, ReadonlyArray>>(Pull.end))
),
M.bindS("innerFinalizer", () => M.finalizerRef(M.noopFinalizer) as M.Managed>),
M.map(({ currInnerStream, currOuterChunk, innerFinalizer, outerStream }) =>
new Chain(f, outerStream, currOuterChunk, currInnerStream, innerFinalizer).apply()
)
)
);
};
/**
* Returns a stream made of the concatenation in strict order of all the streams
* produced by passing each element of this stream to `f0`
*/
export const chain = (f: (a: A) => Stream) => (
fa: Stream
): Stream => chain_(fa, f);
/**
* Flattens this stream-of-streams into a stream made of the concatenation in
* strict order of all the streams.
*/
export const flatten = (ffa: Stream>): Stream =>
chain_(ffa, identity);
export const absolve: (stream: Stream>) => Stream = chain(
E.fold(fail, pure)
);
export const ask = (): RIO => fromTask(T.ask());
export const asks = (f: (_: R) => A): Stream => map_(ask(), f);
export const asksTask = (f: (_: R0) => T.Task): Stream => mapM_(ask(), f);
export const asksM = (f: (_: R0) => Stream) => chain_(ask(), f);
/**
* Statefully and effectfully maps over the elements of this stream to produce
* new elements.
*/
export const mapAccumM_ = (
stream: Stream,
z: Z,
f: (z: Z, a: A) => T.Task
) =>
new Stream(
pipe(
M.do,
M.bindS("state", () => XR.makeManagedRef(z)),
M.bindS("pull", () => pipe(stream.proc, M.mapM(BPull.make))),
M.map(({ pull, state }) =>
pipe(
pull,
BPull.pullElement,
T.chain((o) =>
pipe(
T.do,
T.bindS("s", () => state.get),
T.bindS("t", ({ s }) => f(s, o)),
T.tap(({ t }) => state.set(t[0])),
T.map(({ t }) => [t[1]]),
T.first(O.some)
)
)
)
)
)
);
export const mapAccumM = (z: Z) => (f: (z: Z, a: A) => T.Task) => (
stream: Stream
) => mapAccumM_(stream, z, f);
export const mapAccum_ = (stream: Stream, z: Z, f: (z: Z, a: A) => [Z, B]) =>
mapAccumM_(stream, z, (z, a) => T.pure(f(z, a)));
export const mapAccum = (z: Z) => (f: (z: Z, a: A) => [Z, B]) => (stream: Stream) =>
mapAccum_(stream, z, f);
export const mapConcat_ = (stream: Stream, f: (a: A) => Iterable) =>
mapChunks_(stream, (chunks) => A.chain_(chunks, (a) => Array.from(f(a))));
export const mapConcat = (f: (a: A) => Iterable) => (stream: Stream) => mapConcat_(stream, f);
export const mapConcatChunk_ = (stream: Stream, f: (a: A) => ReadonlyArray) =>
mapChunks_(stream, (chunks) => A.chain_(chunks, f));
export const mapConcatChunk = (f: (a: A) => ReadonlyArray) => (stream: Stream) =>
mapConcatChunk_(stream, f);
export const mapConcatChunkM_ = (
stream: Stream,
f: (a: A) => T.Task>
) => pipe(stream, mapM(f), mapConcatChunk(identity));
export const mapConcatChunkM = (f: (a: A) => T.Task>) => (
stream: Stream
) => mapConcatChunkM_(stream, f);
export const mapConcatM_ = (stream: Stream, f: (a: A) => T.Task>) =>
pipe(
stream,
mapConcatChunkM((a) => T.map_(f(a), (_) => Array.from(_)))
);
/**
* Maps over elements of the stream with the specified effectful function,
* executing up to `n` invocations of `f` concurrently. Transformed elements
* will be emitted in the original order.
*/
export const mapMPar_ = (n: number) => (
stream: Stream,
f: (a: A) => T.Task
): Stream =>
new Stream(
pipe(
M.do,
M.bindS("out", () => T.toManaged()(XQ.makeBounded, B>>(n))),
M.bindS("errorSignal", () => T.toManaged()(XP.make())),
M.bindS("permits", () => T.toManaged()(Semaphore.makeSemaphore(n))),
M.tap(({ errorSignal, out, permits }) =>
pipe(
stream,
foreachManaged((a) =>
pipe(
T.do,
T.bindS("p", () => XP.make()),
T.bindS("latch", () => XP.make()),
T.tap(({ p }) => out.offer(pipe(p, XP.await, T.first(O.some)))),
T.tap(({ latch, p }) =>
pipe(
latch,
// Make sure we start evaluation before moving on to the next element
XP.succeed(undefined),
T.chain(() =>
pipe(
errorSignal,
XP.await,
// Interrupt evaluation if another task fails
T.raceFirst(f(a)),
// Notify other tasks of a failure
T.tapCause((e) => pipe(errorSignal, XP.halt(e))),
// Transfer the result to the consuming stream
T.to(p)
)
),
Semaphore.withPermit(permits),
T.fork
)
),
T.tap(({ latch }) => XP.await(latch)),
T.asUnit
)
),
M.foldCauseM(
(c) => T.toManaged()(out.offer(Pull.halt(c))),
() =>
pipe(
Semaphore.withPermits(n)(permits)(T.unit()),
T.chain(() => out.offer(Pull.end)),
T.toManaged()
)
),
M.fork
)
),
M.map(({ out }) =>
pipe(
out.take,
T.flatten,
T.map((o) => [o])
)
)
)
);
/**
* Maps over elements of the stream with the specified effectful function,
* executing up to `n` invocations of `f` concurrently. Transformed elements
* will be emitted in the original order.
*/
export const mapMPar = (n: number) => (f: (a: A) => T.Task) => (
stream: Stream
) => mapMPar_(n)(stream, f);
/**
* Creates a stream from an asynchronous callback that can be called multiple times
* The registration of the callback itself returns a task. The optionality of the
* error type `E` can be used to signal the end of the stream, by setting it to `None`.
*/
export const asyncTask = (
register: (
cb: (
next: T.Task, ReadonlyArray>,
offerCb?: (e: Exit) => void
) => T.IO>
) => T.Task,
outputBuffer = 16
): Stream =>
pipe(
M.do,
M.bindS("output", () => pipe(XQ.makeBounded>(outputBuffer), T.toManaged())),
M.bindS("runtime", () => pipe(T.runtime(), T.toManaged())),
M.tap(({ output, runtime }) =>
T.toManaged()(
register((k, cb) => pipe(Take.fromPull(k), T.chain(output.offer), (x) => runtime.runCancel(x, cb)))
)
),
M.bindS("done", () => XR.makeManagedRef(false)),
M.letS("pull", ({ done, output }) =>
pipe(
done.get,
T.chain((b) =>
b
? Pull.end
: pipe(
output.take,
T.chain(Take.done),
T.onError(() =>
pipe(
done.set(true),
T.chain(() => output.shutdown)
)
)
)
)
)
),
M.map(({ pull }) => pull),
managed,
chain(repeatTaskChunkOption)
);