import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * Splits the elements of the stream on every occurrence of `delimiter` and
 * emits each run of elements found between two occurrences as its own chunk.
 *
 * The delimiter elements themselves are dropped. Elements are compared with
 * `===`. Two adjacent delimiters produce an empty chunk. Elements still
 * pending when the upstream ends (or fails) are emitted as a final chunk
 * before the end (or failure) is propagated.
 *
 * @param delimiter - The sequence of elements to split on.
 *
 * @tsplus static effect/core/stream/Stream.Aspects splitOnChunk
 * @tsplus pipeable effect/core/stream/Stream splitOnChunk
 */
export function splitOnChunk<A>(delimiter: Chunk<A>) {
  // The failure table depends only on the delimiter, so build it once here
  // instead of once per input chunk.
  const failure = buildFailureTable(delimiter)
  return <R, E>(self: Stream<R, E, A>): Stream<R, E, Chunk<A>> => {
    concreteStream(self)
    return new StreamInternal(self.channel >> next<R, E, A>(delimiter, Maybe.none, 0, failure))
  }
}

/**
 * Builds the Knuth-Morris-Pratt failure table for `delimiter`.
 *
 * `table[i]` is the length of the longest proper prefix of
 * `delimiter[0..i]` that is also a suffix of it, i.e. how many delimiter
 * elements are still matched after a mismatch that follows position `i`.
 */
function buildFailureTable<A>(delimiter: Chunk<A>): ReadonlyArray<number> {
  const table = new Array<number>(delimiter.length).fill(0)
  let matched = 0
  for (let i = 1; i < delimiter.length; i++) {
    const current = delimiter.unsafeGet(i)
    while (matched > 0 && current !== delimiter.unsafeGet(matched)) {
      matched = table[matched - 1] ?? 0
    }
    if (current === delimiter.unsafeGet(matched)) {
      matched += 1
    }
    table[i] = matched
  }
  return table
}

/**
 * Channel that reads one input chunk at a time, writes every completed split
 * found in it, and then continues with the updated matching state.
 *
 * @param delimiter - The sequence of elements to split on.
 * @param leftover - Elements read since the last emitted split (including any
 * partially matched delimiter prefix), or `Maybe.none` when there are none.
 * @param delimiterIndex - Number of leading delimiter elements currently
 * matched at the end of `leftover`.
 * @param failure - Failure table for `delimiter` (see `buildFailureTable`).
 */
function next<R, E, A>(
  delimiter: Chunk<A>,
  leftover: Maybe<Chunk<A>>,
  delimiterIndex: number,
  failure: ReadonlyArray<number>
): Channel<R, E, Chunk<A>, unknown, E, Chunk<Chunk<A>>, unknown> {
  return Channel.readWithCause(
    (inputChunk: Chunk<A>) => {
      const buffer = Chunk.builder<Chunk<A>>()
      const [carry, delimiterCursor] = inputChunk.reduce(
        [leftover.getOrElse(Chunk.empty<A>()), delimiterIndex] as const,
        ([pending, matched], a) => {
          const concatenated = pending.append(a)
          // On a mismatch, fall back to the longest delimiter prefix that is
          // still a suffix of what has been read. Resetting straight to 0/1
          // would miss delimiters whose prefix overlaps itself, e.g.
          // delimiter [1, 1, 2] inside input [1, 1, 1, 2].
          let cursor = matched
          while (cursor > 0 && a !== delimiter.unsafeGet(cursor)) {
            cursor = failure[cursor - 1] ?? 0
          }
          // The length guard keeps an empty delimiter from ever matching
          // (and from being indexed out of bounds).
          if (cursor < delimiter.length && a === delimiter.unsafeGet(cursor)) {
            if (cursor + 1 === delimiter.length) {
              // Full delimiter matched: emit everything before it and start
              // over with an empty carry.
              buffer.append(concatenated.take(concatenated.length - delimiter.length))
              return [Chunk.empty<A>(), 0]
            }
            return [concatenated, cursor + 1]
          }
          // No element of the delimiter matches here; cursor is 0.
          return [concatenated, cursor]
        }
      )
      return (
        Channel.write(buffer.build()).flatMap(() =>
          next<R, E, A>(
            delimiter,
            carry.isNonEmpty ? Maybe.some(carry) : Maybe.none,
            delimiterCursor,
            failure
          )
        )
      )
    },
    // Upstream failed: flush any pending elements, then propagate the cause.
    (cause) =>
      leftover.fold(
        Channel.failCause(cause),
        (chunk) => Channel.write(Chunk.single(chunk)).flatMap(() => Channel.failCause(cause))
      ),
    // Upstream finished: flush any pending elements, then complete.
    (done) =>
      leftover.fold(
        Channel.succeed(done),
        (chunk) => Channel.write(Chunk.single(chunk)).flatMap(() => Channel.succeed(done))
      )
  )
}