import type { Charset } from "@effect/core/stream/Stream/operations/_internal/Charset" import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal" import { stringChunkFrom } from "@effect/core/stream/Stream/operations/_internal/stringChunkFrom" const emptyByteChunk = Chunk.empty() const emptyStringChunk = Chunk.empty() export function utfDecodeFixedLength( charset: Charset, fixedLength: number ) { return (stream: Stream): Stream => { concreteStream(stream) return new StreamInternal( Channel.suspend( stream.channel >> readThenTransduce(emptyByteChunk, charset, fixedLength) ) ) } } function readThenTransduce( buffer: Chunk, charset: Charset, fixedLength: number ): Channel, unknown, E, Chunk, unknown> { return Channel.readWith( (received: Chunk) => { const [string, buffered] = process(buffer, received, charset, fixedLength) return ( Channel.write(string).flatMap(() => readThenTransduce(buffered, charset, fixedLength)) ) }, (err) => Channel.fail(err), () => buffer.isEmpty ? Channel.unit : Channel.write(stringChunkFrom(buffer, charset)) ) } function process( buffered: Chunk, received: Chunk, charset: Charset, fixedLength: number ): readonly [Chunk, Chunk] { const bytes = buffered + received const remainder = bytes.length % fixedLength if (remainder === 0) { return [stringChunkFrom(bytes, charset), emptyByteChunk] } if (bytes.length > fixedLength) { const [fullChunk, rest] = bytes.splitAt(bytes.length - remainder) return [stringChunkFrom(fullChunk, charset), rest] } return [emptyStringChunk, bytes.materialize] }