// Signal-based buffering between an upstream channel and a downstream consumer.
// Every element placed on the shared queue is a pair [take, deferred]; the
// consumer completes `deferred` when it dequeues the pair.
//
// NOTE(review): the generic type arguments in the signatures below look like
// they were stripped (e.g. `Effect, Deferred]>>`, `Ref>`), and the statements
// have been collapsed onto one line without separators. Restore both from the
// upstream source before compiling.
// NOTE(review): `$` inside `Do(($) => ...)` presumably runs an effect and
// yields its result, and `>>` presumably pipes the left channel into the right
// one; both come from tooling that is not visible here -- confirm.
//
// bufferSignal(effect, channel)
//   - Runs `effect` to obtain the queue.
//   - Creates a Deferred, completes it immediately with `undefined`, and
//     stores it in a Ref, so the Ref starts out holding an already-completed
//     Deferred.
//   - Runs `channel >> producer(queue, ref)` via `.runScoped.forkScoped`
//     (presumably a background fiber tied to the enclosing scope -- confirm).
//   - Returns `consumer(queue)`, wrapped with `Channel.unwrapScoped`.
//
// producer(queue, ref)
//   - `Channel.readWith` with three handlers:
//       input chunk: makes a fresh Deferred, offers
//                    [Take.chunk(input), deferred] to the queue, and passes the
//                    offer result `added` to `Effect.when` together with
//                    `ref.set(deferred)` (presumably the Ref is only updated
//                    when the offer was accepted -- confirm), then continues
//                    with `producer(queue, ref)` again.
//       error:       terminate(queue, ref, Take.fail(err)).
//       done:        terminate(queue, ref, Take.end).
//
// consumer(queue)
//   - Builds the self-referential channel `process`: takes one
//     [take, deferred] pair from the queue, completes `deferred` with
//     `undefined`, then folds `take` with three cases: `Channel.unit`,
//     `Channel.failCause(cause)`, or `Channel.write(a)` followed by `process`
//     again.
//   - NOTE(review): the fold cases are presumably end / failure / value, in
//     that order -- confirm against `Take.fold`.
//
// terminate(queue, ref, take)
//   - Reads the Deferred currently stored in `ref` and awaits it before doing
//     anything else.
//   - Then makes a new Deferred, offers [take, deferred] to the queue, stores
//     the new Deferred in `ref`, and awaits it. Within this file that Deferred
//     is completed by `consumer` when it dequeues the pair, so the returned
//     channel only finishes after the terminal take has been picked up.
export function bufferSignal( effect: Effect, Deferred]>>, channel: Channel, unknown> ): Channel, void> { return Channel.unwrapScoped( Do(($) => { const queue = $(effect) const start = $(Deferred.make()) $(start.succeed(undefined)) const ref = $(Ref.make(start)) $((channel >> producer(queue, ref)).runScoped.forkScoped) return consumer(queue) }) ) } function producer( queue: Queue, Deferred]>, ref: Ref> ): Channel, unknown, never, never, unknown> { return Channel.readWith( (input: Chunk) => Channel.fromEffect( Do(($) => { const deferred = $(Deferred.make()) const added = $(queue.offer([Take.chunk(input), deferred] as const)) $(Effect.when(added, ref.set(deferred))) }) ).zipRight(producer(queue, ref)), (err) => terminate(queue, ref, Take.fail(err)), () => terminate(queue, ref, Take.end) ) } function consumer( queue: Queue, Deferred]> ): Channel, void> { const process: Channel< never, unknown, unknown, unknown, E, Chunk, void > = Channel.fromEffect(queue.take).flatMap( ([take, deferred]) => Channel.fromEffect(deferred.succeed(undefined)).flatMap(() => take.fold( Channel.unit, (cause) => Channel.failCause(cause), (a) => Channel.write(a).flatMap(() => process) ) ) ) return process } function terminate( queue: Queue, Deferred]>, ref: Ref>, take: Take ): Channel, unknown, never, never, unknown> { return Channel.fromEffect( Do(($) => { const latch = $(ref.get) $(latch.await) const deferred = $(Deferred.make()) $(queue.offer([take, deferred])) $(ref.set(deferred)) $(deferred.await) }) ) }