import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * A stream-to-stream transformation. `branchAfter` asks the caller for one of
 * these once the leading elements of the stream are known.
 */
interface Pipeline<R, E, A, R2, E2, B> {
  (stream: Stream<R, E, A>): Stream<R2, E2, B>
}

/**
 * Reads the first `n` values from the stream and uses them to choose the
 * pipeline that will be used for the remainder of the stream.
 *
 * @param n - Number of leading values to collect before `f` is invoked.
 * @param f - Selects the pipeline from the collected values.
 *
 * @tsplus static effect/core/stream/Stream.Aspects branchAfter
 * @tsplus pipeable effect/core/stream/Stream branchAfter
 */
export function branchAfter<R, E, A, R2, E2, B>(
  n: number,
  f: (output: Chunk<A>) => Pipeline<R, E, A, R2, E2, B>
) {
  return (self: Stream<R, E, A>): Stream<R | R2, E | E2, B> => {
    concreteStream(self)
    return new StreamInternal(
      Channel.suspend(self.channel >> collecting(Chunk.empty<A>(), n, f))
    )
  }
}

/**
 * Channel stage that accumulates upstream chunks in `buffer` until it holds
 * at least `n` elements.
 *
 * - Once enough elements are available, the buffer is split at `n`: the
 *   first part is handed to `f` to obtain the pipeline, the remainder is run
 *   through that pipeline, and processing continues with `emitting`.
 * - If upstream fails, the failure cause is propagated unchanged.
 * - If upstream ends first, an empty buffer completes immediately; otherwise
 *   `f` is applied to whatever was buffered and the resulting pipeline is run
 *   over an empty stream.
 */
function collecting<R, E, A, R2, E2, B>(
  buffer: Chunk<A>,
  n: number,
  f: (output: Chunk<A>) => Pipeline<R, E, A, R2, E2, B>
): Channel<R | R2, E, Chunk<A>, unknown, E | E2, Chunk<B>, unknown> {
  return Channel.readWithCause(
    (chunk: Chunk<A>) => {
      // `+` is the tsplus operator on Chunk; it appends the new chunk to
      // what has been buffered so far.
      const newBuffer = buffer + chunk
      if (newBuffer.length >= n) {
        const [inputs, inputs1] = newBuffer.splitAt(n)
        const pipeline = f(inputs)
        // Elements past the first `n` are already part of the "remainder".
        const stream = pipeline(Stream.fromChunk(inputs1))
        concreteStream(stream)
        return stream.channel.flatMap(() => emitting(pipeline))
      }
      return collecting(newBuffer, n, f)
    },
    // `cause` is an already-evaluated value, so use the eager constructor
    // (same as `emitting`) rather than the lazy `failCauseSync`.
    (cause) => Channel.failCause(cause),
    () => {
      if (buffer.isEmpty) {
        return Channel.unit
      }
      const pipeline = f(buffer)
      const stream = pipeline(Stream.empty)
      concreteStream(stream)
      return stream.channel
    }
  )
}

/**
 * Channel stage used after the pipeline has been chosen: every upstream chunk
 * is wrapped in a stream, passed through `pipeline`, and the resulting
 * channel is run before reading the next chunk. Upstream failures are
 * propagated unchanged and upstream completion ends the channel.
 */
function emitting<R, E, A, R2, E2, B>(
  pipeline: Pipeline<R, E, A, R2, E2, B>
): Channel<R | R2, E, Chunk<A>, unknown, E | E2, Chunk<B>, unknown> {
  return Channel.readWithCause(
    (chunk: Chunk<A>) => {
      const stream = pipeline(Stream.fromChunk(chunk))
      concreteStream(stream)
      return stream.channel.flatMap(() => emitting(pipeline))
    },
    (cause) => Channel.failCause(cause),
    () => Channel.unit
  )
}