import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * Channel shape shared by every stage of the BOM-detecting decoder.
 *
 * NOTE(review): `R` and `E` are not declared anywhere in view, and `Chunk` /
 * `Stream` carry no type arguments — the generic parameters look like they
 * were lost somewhere; confirm against the upstream file before relying on
 * these annotations.
 */
type DecodingChannel = Channel<R, E, Chunk, unknown, E, Chunk, unknown>

/**
 * Builds a stream transformer that collects the first `bomSize` elements of
 * its input, hands that prefix to `processBom`, and decodes everything that
 * follows with the pipeline `processBom` returned.
 *
 * @param bomSize - How many leading elements to gather before `processBom`
 *   is consulted.
 * @param processBom - Receives the gathered prefix and returns a pair: the
 *   chunk to decode in place of that prefix, and the decoding pipeline to use
 *   from then on.
 * @returns A function mapping the source stream to the decoded stream.
 */
export function utfDecodeDetectingBom(
  bomSize: number,
  processBom: (
    bom: Chunk
  ) => readonly [Chunk, (stream: Stream) => Stream]
) {
  return (stream: Stream): Stream => {
    concreteStream(stream)
    // The source channel is piped (`>>`) into the detector, which starts
    // with an empty buffer.
    return new StreamInternal(
      Channel.suspend(
        stream.channel >> lookingForBom(Chunk.empty(), bomSize, processBom)
      )
    )
  }
}

/**
 * Channel used once the decoding pipeline is known: every chunk read from
 * upstream is wrapped in a single-chunk stream, run through
 * `decodingPipeline`, and the resulting channel is emitted before reading
 * the next chunk.
 *
 * NOTE(review): each chunk goes through the pipeline as its own stream;
 * whether the pipeline copes with input split across chunk boundaries cannot
 * be told from this file — confirm.
 *
 * @param decodingPipeline - Stream transformation applied to each chunk.
 * @returns A channel that forwards upstream failures via `Channel.fail` and
 *   yields `Channel.unit` when upstream is done.
 */
function passThrough(
  decodingPipeline: (stream: Stream) => Stream
): DecodingChannel {
  return Channel.readWith(
    (incoming: Chunk) => {
      const decoded = decodingPipeline(Stream.fromChunk(incoming))
      concreteStream(decoded)
      return decoded.channel.flatMap(() => passThrough(decodingPipeline))
    },
    (error) => Channel.fail(error),
    () => Channel.unit
  )
}

/**
 * Channel used while the prefix is still being gathered. Incoming chunks are
 * appended to `buffer` until at least `bomSize` elements are available; the
 * first `bomSize` elements then go to `processBom`, and its replacement chunk
 * plus the remaining elements are decoded with the pipeline it returned.
 * After that the channel continues as `passThrough`.
 *
 * @param buffer - Elements accumulated so far (fewer than `bomSize` on every
 *   recursive call made from here).
 * @param bomSize - Number of elements that make up the prefix given to
 *   `processBom`.
 * @param processBom - Maps the prefix to the chunk to decode in its place and
 *   the decoding pipeline to use.
 * @returns A channel that forwards upstream failures via `Channel.fail`.
 */
function lookingForBom(
  buffer: Chunk,
  bomSize: number,
  processBom: (
    bom: Chunk
  ) => readonly [Chunk, (stream: Stream) => Stream]
): DecodingChannel {
  return Channel.readWith(
    (incoming: Chunk) => {
      const accumulated = buffer + incoming
      const hasFullPrefix = accumulated.length >= bomSize

      // Not enough input yet: keep buffering.
      if (!hasFullPrefix) {
        return lookingForBom(accumulated, bomSize, processBom)
      }

      const [prefix, remainder] = accumulated.splitAt(bomSize)
      const [payload, pipeline] = processBom(prefix)
      const decoded = pipeline(Stream.fromChunk(payload + remainder))
      concreteStream(decoded)
      return decoded.channel.flatMap(() => passThrough(pipeline))
    },
    (error) => Channel.fail(error),
    () => {
      // Upstream ended before `bomSize` elements arrived.
      if (buffer.isEmpty) {
        return Channel.unit
      }

      // Whatever was buffered is still handed to `processBom`, even though
      // it is shorter than `bomSize`.
      const [payload, pipeline] = processBom(buffer)
      const decoded = pipeline(Stream.fromChunk(payload))
      concreteStream(decoded)
      return decoded.channel.flatMap(() => passThrough(pipeline))
    }
  )
}