import * as Cause from "../../Cause.js" import * as Chunk from "../../Chunk.js" import * as Effect from "../../Effect.js" import * as Exit from "../../Exit.js" import { pipe } from "../../Function.js" import * as Option from "../../Option.js" import type * as Queue from "../../Queue.js" import type * as Scheduler from "../../Scheduler.js" import type * as Emit from "../../StreamEmit.js" /** @internal */ export const make = ( emit: (f: Effect.Effect, Option.Option, R>) => Promise ): Emit.Emit => { const ops: Emit.EmitOps = { chunk(this: Emit.Emit, as: Chunk.Chunk) { return this(Effect.succeed(as)) }, die(this: Emit.Emit, defect: Err) { return this(Effect.die(defect)) }, dieMessage(this: Emit.Emit, message: string) { return this(Effect.dieMessage(message)) }, done(this: Emit.Emit, exit: Exit.Exit) { return this(Effect.suspend(() => Exit.mapBoth(exit, { onFailure: Option.some, onSuccess: Chunk.of }))) }, end(this: Emit.Emit) { return this(Effect.fail(Option.none())) }, fail(this: Emit.Emit, e: E) { return this(Effect.fail(Option.some(e))) }, fromEffect(this: Emit.Emit, effect: Effect.Effect) { return this(Effect.mapBoth(effect, { onFailure: Option.some, onSuccess: Chunk.of })) }, fromEffectChunk(this: Emit.Emit, effect: Effect.Effect, E, R>) { return this(pipe(effect, Effect.mapError(Option.some))) }, halt(this: Emit.Emit, cause: Cause.Cause) { return this(Effect.failCause(pipe(cause, Cause.map(Option.some)))) }, single(this: Emit.Emit, value: A) { return this(Effect.succeed(Chunk.of(value))) } } return Object.assign(emit, ops) } /** @internal */ export const makePush = ( queue: Queue.Queue | Exit.Exit>, scheduler: Scheduler.Scheduler ): Emit.EmitOpsPush => { let finished = false let buffer: Array = [] let running = false function array(items: ReadonlyArray) { if (finished) return false if (items.length <= 50_000) { buffer.push.apply(buffer, items as Array) } else { for (let i = 0; i < items.length; i++) { buffer.push(items[0]) } } if (!running) { running = true scheduler.scheduleTask(flush, 0) } return true } function flush() { running = false if (buffer.length > 0) { queue.unsafeOffer(buffer) buffer = [] } } function done(exit: Exit.Exit) { if (finished) return finished = true if (exit._tag === "Success") { buffer.push(exit.value) } flush() queue.unsafeOffer(exit._tag === "Success" ? Exit.void : exit) } return { single(value: A) { if (finished) return false buffer.push(value) if (!running) { running = true scheduler.scheduleTask(flush, 0) } return true }, array, chunk(chunk) { return array(Chunk.toReadonlyArray(chunk)) }, done, end() { if (finished) return finished = true flush() queue.unsafeOffer(Exit.void) }, halt(cause: Cause.Cause) { return done(Exit.failCause(cause)) }, fail(error: E) { return done(Exit.fail(error)) }, die(defect: Err): void { return done(Exit.die(defect)) }, dieMessage(message: string): void { return done(Exit.die(new Error(message))) } } }