import * as A from "../../Array";
import type { Either } from "../../Either";
import * as E from "../../Either";
import { flow, pipe } from "../../Function";
import type { Option } from "../../Option";
import * as O from "../../Option";
import { sequential } from "../ExecutionStrategy";
import type { Exit } from "../Exit";
import * as C from "../Exit/Cause";
import * as M from "../Managed";
import * as T from "../Task";
import type { XQueue } from "../XQueue";
import * as XQ from "../XQueue";
import * as XR from "../XRef";
import * as Pull from "./internal/Pull";
import * as Take from "./internal/Take";
import type { Transducer } from "./internal/Transducer";
import type { EIO, IO, RIO } from "./model";
import { Stream } from "./model";
/**
* Creates a stream from an array of values
*/
export const fromArray = (c: ReadonlyArray): IO =>
new Stream(
pipe(
T.do,
T.bindS("doneRef", () => XR.makeRef(false)),
T.letS("pull", ({ doneRef }) =>
pipe(
doneRef,
XR.modify, ReadonlyArray>, boolean>((done) =>
done || c.length === 0 ? [Pull.end, true] : [T.pure(c), true]
),
T.flatten
)
),
T.map(({ pull }) => pull),
T.toManaged()
)
);
/**
* Creates a stream from a task producing a value of type `A` or an empty Stream
*/
export const fromTaskOption = (fa: T.Task, A>): Stream =>
new Stream(
pipe(
M.do,
M.bindS("doneRef", () => pipe(XR.makeRef(false), T.toManaged())),
M.letS("pull", ({ doneRef }) =>
pipe(
doneRef,
XR.modify((b) =>
b
? [Pull.end, true]
: [
pipe(
fa,
T.map((a) => [a])
),
true
]
),
T.flatten
)
),
M.map(({ pull }) => pull)
)
);
/**
* Creates a stream from a task producing a value of type `A`
*/
export const fromTask = (ef: T.Task): Stream => pipe(ef, T.first(O.some), fromTaskOption);
/**
* Creates a stream from an asynchronous callback that can be called multiple times.
* The registration of the callback can possibly return the stream synchronously.
* The optionality of the error type `E` can be used to signal the end of the stream,
* by setting it to `None`.
*/
export const asyncOption = (
register: (
resolve: (
next: T.Task, ReadonlyArray>,
offerCb?: (e: Exit) => void
) => T.IO>
) => Option>,
outputBuffer = 16
): Stream =>
new Stream(
pipe(
M.do,
M.bindS("output", () => pipe(XQ.makeBounded>(outputBuffer), T.toManaged())),
M.bindS("runtime", () => pipe(T.runtime(), T.toManaged())),
M.bindS("maybeStream", ({ output, runtime }) =>
M.total(() =>
register((k, cb) => pipe(Take.fromPull(k), T.chain(output.offer), (x) => runtime.runCancel(x, cb)))
)
),
M.bindS("pull", ({ maybeStream, output }) =>
O.fold_(
maybeStream,
() =>
pipe(
M.do,
M.bindS("done", () => XR.makeManagedRef(false)),
M.map(({ done }) =>
pipe(
done.get,
T.chain((b) =>
b
? Pull.end
: pipe(
output.take,
T.chain(Take.done),
T.onError(() =>
pipe(
done.set(true),
T.chain(() => output.shutdown)
)
)
)
)
)
)
),
(s) =>
pipe(
output.shutdown,
T.toManaged(),
M.chain(() => s.proc)
)
)
),
M.map(({ pull }) => pull)
)
);
/**
* Creates a stream from an asynchronous callback that can be called multiple times.
* The optionality of the error type `E` can be used to signal the end of the stream,
* by setting it to `None`.
*/
export const async = (
register: (
resolve: (
next: T.Task, ReadonlyArray>,
offerCb?: (e: Exit) => void
) => T.IO>
) => void,
outputBuffer = 16
): Stream =>
asyncOption((cb) => {
register(cb);
return O.none();
}, outputBuffer);
/**
* Creates a stream from an asynchronous callback that can be called multiple times.
* The registration of the callback returns either a canceler or synchronously returns a stream.
* The optionality of the error type `E` can be used to signal the end of the stream, by
* setting it to `None`.
*/
export const asyncInterruptEither = (
register: (
resolve: (
next: T.Task, ReadonlyArray>,
offerCb?: (e: Exit) => void
) => T.IO>
) => Either, Stream>,
outputBuffer = 16
): Stream =>
new Stream(
pipe(
M.do,
M.bindS("output", () => pipe(XQ.makeBounded>(outputBuffer), T.toManaged())),
M.bindS("runtime", () => pipe(T.runtime(), T.toManaged())),
M.bindS("eitherStream", ({ output, runtime }) =>
M.total(() =>
register((k, cb) => pipe(Take.fromPull(k), T.chain(output.offer), (x) => runtime.runCancel(x, cb)))
)
),
M.bindS("pull", ({ eitherStream, output }) =>
E.fold_(
eitherStream,
(canceler) =>
pipe(
M.do,
M.bindS("done", () => XR.makeManagedRef(false)),
M.map(({ done }) =>
pipe(
done.get,
T.chain((b) =>
b
? Pull.end
: pipe(
output.take,
T.chain(Take.done),
T.onError(() =>
pipe(
done.set(true),
T.chain(() => output.shutdown)
)
)
)
)
)
),
M.ensuring(canceler)
),
(s) =>
pipe(
output.shutdown,
T.toManaged(),
M.chain(() => s.proc)
)
)
),
M.map(({ pull }) => pull)
)
);
/**
* Creates a stream from an asynchronous callback that can be called multiple times.
* The registration of the callback returns either a canceler or synchronously returns a stream.
* The optionality of the error type `E` can be used to signal the end of the stream, by
* setting it to `None`.
*/
export const asyncInterrupt = (
register: (
cb: (
next: T.Task, ReadonlyArray>,
offerCb?: (e: Exit) => void
) => T.IO>
) => T.Canceler,
outputBuffer = 16
): Stream => asyncInterruptEither((cb) => E.left(register(cb)), outputBuffer);
export const fail = (e: E): EIO => fromTask(T.fail(e));
/**
* Creates a stream from a task producing chunks of `A` values until it fails with None.
*/
export const repeatTaskChunkOption = (ef: T.Task, ReadonlyArray>): Stream =>
new Stream(
pipe(
M.do,
M.bindS("done", () => XR.makeManagedRef(false)),
M.letS("pull", ({ done }) =>
pipe(
done.get,
T.chain((b) =>
b
? Pull.end
: pipe(
ef,
T.tapError(
O.fold(
() => done.set(true),
() => T.unit()
)
)
)
)
)
),
M.map(({ pull }) => pull)
)
);
const ensuringFirst_ = (stream: Stream, finalizer: T.Task) =>
new Stream(M.ensuringFirst_(stream.proc, finalizer));
/**
* Creates a stream from a task producing values of type `A` until it fails with None.
*/
export const repeatTaskOption: (fa: T.Task, A>) => Stream = flow(
T.map(A.pure),
repeatTaskChunkOption
);
/**
* Creates a stream from an `XQueue` of values
*/
export const fromArrayXQueue = (queue: XQueue>): Stream =>
repeatTaskChunkOption(
T.catchAllCause_(queue.take, (c) =>
T.chain_(queue.isShutdown, (down) => (down && C.isInterrupt(c) ? Pull.end : Pull.halt(c)))
)
);
/**
* Creates a stream from an `XQueue` of values. The queue will be shutdown once the stream is closed.
*/
export const fromArrayXQueueWithShutdown = (
queue: XQueue>
): Stream => ensuringFirst_(fromArrayXQueue(queue), queue.shutdown);
/**
* Creates a stream from an `XQueue` of values
*/
export const fromXQueue = (queue: XQueue): Stream =>
pipe(
queue,
XQ.takeBetween(1, Number.MAX_SAFE_INTEGER),
T.catchAllCause((c) =>
T.chain_(queue.isShutdown, (down) => (down && C.isInterrupt(c) ? Pull.end : Pull.halt(c)))
),
repeatTaskChunkOption
);
/**
* Creates a stream from an `XQueue` of values. The queue will be shutdown once the stream is closed.
*/
export const fromXQueueWithShutdown = (queue: XQueue): Stream =>
ensuringFirst_(fromXQueue(queue), queue.shutdown);
/**
* The `Stream` that dies with the error.
*/
export const die = (e: unknown): IO => fromTask(T.die(e));
/**
* The stream that dies with an exception described by `message`.
*/
export const dieMessage = (message: string) => fromTask(T.dieMessage(message));
/**
* The empty stream
*/
export const empty: IO = new Stream(M.succeed(Pull.end));
/**
* The infinite stream of iterative function application: a, f(a), f(f(a)), f(f(f(a))), ...
*/
export const iterate = (a: A, f: (a: A) => A): IO =>
new Stream(pipe(XR.makeRef(a), T.toManaged(), M.map(flow(XR.getAndUpdate(f), T.map(A.pure)))));
export const suspend = (thunk: () => Stream): Stream =>
new Stream(M.suspend(() => thunk().proc));
/**
* Creates a single-valued stream from a managed resource
*/
export const managed = (ma: M.Managed): Stream =>
new Stream(
pipe(
M.do,
M.bindS("doneRef", () => XR.makeManagedRef(false)),
M.bindS("finalizer", () => M.makeManagedReleaseMap(sequential())),
M.letS("pull", ({ doneRef, finalizer }) =>
T.uninterruptibleMask(({ restore }) =>
pipe(
doneRef.get,
T.chain((done) =>
done
? Pull.end
: pipe(
T.do,
T.bindS("a", () =>
pipe(
ma.task,
T.map(([_, __]) => __),
T.local((r: R) => [r, finalizer] as [R, M.ReleaseMap]),
restore,
T.onError(() => doneRef.set(true))
)
),
T.tap(() => doneRef.set(true)),
T.map(({ a }) => [a]),
T.first(O.some)
)
)
)
)
),
M.map(({ pull }) => pull)
)
);
/**
* Creates a one-element stream that never fails and executes the finalizer when it ends.
*/
export const finalizer = (finalizer: T.RIO): RIO => bracket((_) => finalizer)(T.unit());
/**
* Applies an aggregator to the stream, which converts one or more elements
* of type `A` into elements of type `B`.
*/
export const aggregate_ = (stream: Stream, transducer: Transducer) =>
new Stream(
pipe(
M.do,
M.bindS("pull", () => stream.proc),
M.bindS("push", () => transducer.push),
M.bindS("done", () => XR.makeManagedRef(false)),
M.letS("run", ({ done, pull, push }) =>
pipe(
done.get,
T.chain((b) =>
b
? Pull.end
: pipe(
pull,
T.foldM(
O.fold(
() =>
pipe(
done.set(true),
T.chain(() => pipe(push(O.none()), T.asSomeError))
),
(e) => Pull.fail(e)
),
(os) => pipe(push(O.some(os)), T.asSomeError)
)
)
)
)
),
M.map(({ run }) => run)
)
);
/**
* Applies an aggregator to the stream, which converts one or more elements
* of type `A` into elements of type `B`.
*/
export const aggregate = (transducer: Transducer) => (stream: Stream) =>
aggregate_(stream, transducer);
/**
* Creates a new `Stream` from a managed effect that yields chunks.
* The effect will be evaluated repeatedly until it fails with a `None`
* (to signify stream end) or a `Some` (to signify stream failure).
*
* The stream evaluation guarantees proper acquisition and release of the
* `Managed`.
*/
export const apply = (proc: M.Managed, ReadonlyArray>>) => new Stream(proc);
/**
* Creates a stream from a single value that will get cleaned up after the
* stream is consumed
*/
export const bracket_ = (acquire: T.Task, release: (a: A) => T.Task) =>
managed(M.make_(acquire, release));
/**
* Creates a stream from a single value that will get cleaned up after the
* stream is consumed
*/
export const bracket = (release: (a: A) => T.Task) => (acquire: T.Task) =>
bracket_(acquire, release);
/**
* Creates a stream from a single value that will get cleaned up after the
* stream is consumed
*/
export const bracketExit_ = (
acquire: T.Task,
release: (a: A, exit: Exit) => T.Task
) => managed(M.makeExit_(acquire, release));
/**
* Creates a stream from a single value that will get cleaned up after the
* stream is consumed
*/
export const bracketExit = (release: (a: A, exit: Exit) => T.Task) => <
R,
E
>(
acquire: T.Task
) => bracketExit_(acquire, release);