import {
concreteStream,
StreamInternal
} from "@effect/core/stream/Stream/operations/_internal/StreamInternal"
/**
 * Splits elements on a delimiter and transforms the splits into desired
 * output.
 *
 * Every time the complete `delimiter` sequence is observed in the stream, the
 * elements accumulated before it are emitted as one `Chunk<A>` and the
 * delimiter elements themselves are dropped. Elements still pending when the
 * upstream ends (or fails) are emitted as a final chunk before the end (or
 * failure) is propagated. Matching state is carried across upstream chunk
 * boundaries, so a delimiter that straddles two input chunks is still found.
 *
 * @param delimiter - The sequence of elements that marks a split point.
 * @returns A function turning a stream of `A` into a stream of `Chunk<A>`.
 *
 * @tsplus static effect/core/stream/Stream.Aspects splitOnChunk
 * @tsplus pipeable effect/core/stream/Stream splitOnChunk
 */
export function splitOnChunk<A>(delimiter: Chunk<A>) {
  return <R, E>(self: Stream<R, E, A>): Stream<R, E, Chunk<A>> => {
    // Presumably narrows `self` to its internal representation so that
    // `.channel` can be accessed below.
    concreteStream(self)
    // Feed the upstream channel into the splitting channel, starting with no
    // leftover elements and no partially matched delimiter. `R` and `E` cannot
    // be inferred from the arguments, so they are passed explicitly.
    return new StreamInternal(self.channel >> next<R, E, A>(delimiter, Maybe.none, 0))
  }
}
/**
 * Builds the channel that performs the actual splitting for `splitOnChunk`.
 *
 * The channel reads one upstream chunk at a time, scans it element by element
 * while tracking how much of `delimiter` has been matched so far, writes every
 * completed split downstream, and then recurses with the unfinished state.
 *
 * @param delimiter - The sequence of elements that marks a split point.
 * @param leftover - Elements read since the last completed split (including
 * any partially matched delimiter prefix), or `Maybe.none` if there are none.
 * @param delimiterIndex - Number of leading delimiter elements currently
 * matched at the end of `leftover`.
 * @returns A channel that consumes `Chunk<A>` and emits `Chunk<Chunk<A>>`,
 * flushing `leftover` as a last split when upstream fails or completes.
 */
function next<R, E, A>(
  delimiter: Chunk<A>,
  leftover: Maybe<Chunk<A>>,
  delimiterIndex: number
): Channel<R, E, Chunk<A>, unknown, E, Chunk<Chunk<A>>, unknown> {
  return Channel.readWithCause(
    (inputChunk: Chunk<A>) => {
      // Collects every split completed while scanning this input chunk.
      const buffer = Chunk.builder<Chunk<A>>()
      const [carry, delimiterCursor] = inputChunk.reduce(
        [leftover.getOrElse(Chunk.empty<A>()), delimiterIndex] as const,
        ([carry, delimiterCursor], a) => {
          const concatenated = carry.append(a)
          if (
            delimiterCursor < delimiter.length &&
            a === delimiter.unsafeGet(delimiterCursor)
          ) {
            if (delimiterCursor + 1 === delimiter.length) {
              // Full delimiter matched: emit everything before it and start
              // over with an empty carry.
              buffer.append(concatenated.take(concatenated.length - delimiter.length))
              return [Chunk.empty<A>(), 0]
            }
            // Partial match extended by one element.
            return [concatenated, delimiterCursor + 1]
          }
          // Mismatch: restart matching, counting `a` as the first delimiter
          // element if it equals the delimiter's head.
          // NOTE(review): the cursor only ever falls back to 0 or 1, so a
          // delimiter with a repeated prefix can be missed (e.g. delimiter
          // [a, a, b] is not found in a, a, a, b).
          // NOTE(review): with an empty delimiter the guard above is always
          // false and `unsafeHead` is read on an empty chunk — confirm that
          // this case is either unreachable or acceptable.
          return [concatenated, a === delimiter.unsafeHead ? 1 : 0]
        }
      )
      return (
        Channel.write(buffer.build()).flatMap(() =>
          next(
            delimiter,
            carry.isNonEmpty ? Maybe.some(carry) : Maybe.none,
            delimiterCursor
          )
        )
      )
    },
    // Upstream failed: flush any pending elements as one last split, then
    // re-raise the original cause.
    (cause) =>
      leftover.fold(
        Channel.failCause(cause),
        (chunk) => Channel.write(Chunk.single(chunk)).flatMap(() => Channel.failCause(cause))
      ),
    // Upstream completed: flush any pending elements as one last split, then
    // finish with the upstream's done value.
    (done) =>
      leftover.fold(
        Channel.succeed(done),
        (chunk) => Channel.write(Chunk.single(chunk)).flatMap(() => Channel.succeed(done))
      )
  )
}