import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal" import { RingBufferNew } from "@effect/core/support/RingBufferNew" /** * Takes the last specified number of elements from this stream. * * @tsplus static effect/core/stream/Stream.Aspects takeRight * @tsplus pipeable effect/core/stream/Stream takeRight */ export function takeRight(n: number) { return (self: Stream): Stream => { if (n <= 0) { return Stream.empty } concreteStream(self) return new StreamInternal( Channel.unwrap( Effect.sync(new RingBufferNew(n)).map((queue) => { const reader: Channel< never, E, Chunk, unknown, E, Chunk, void > = Channel.readWith( (input: Chunk) => { input.forEach((a) => queue.put(a)) return reader }, (err) => Channel.fail(err), () => Channel.write(queue.toChunk()).flatMap(() => Channel.unit) ) return self.channel >> reader }) ) ) } }