import type { Driver } from "@effect/core/io/Schedule/Driver"
import { concreteStream, StreamInternal } from "@effect/core/stream/Stream/operations/_internal/StreamInternal"

/**
 * Repeats each element of the stream using the provided schedule. When the
 * schedule is finished, then the output of the schedule will be emitted into
 * the stream. Repetitions are done in addition to the first execution, which
 * means using `Schedule.recurs(1)` actually results in the original element
 * plus one additional recurrence, so each value of the stream is emitted
 * twice in total.
 *
 * This function accepts two conversion functions, which allow the output of
 * this stream and the output of the provided schedule to be unified into a
 * single type. For example, `Either` or a similar data type.
 *
 * @param schedule - Schedule whose driver is consulted after every emission
 *   of an element to decide whether that element is emitted again.
 * @param f - Converts a stream element before it is written downstream.
 * @param g - Converts the value read from `driver.last` (once the driver
 *   stops accepting the current element) before it is written downstream.
 * @returns A function that takes the source stream and returns the
 *   repeating stream.
 *
 * NOTE(review): the type annotations in this file look truncated - generic
 * parameter lists and type arguments seem to be missing (e.g. `Schedule`,
 * `Stream`, `Chunk` without arguments, and `Channel, unknown, E, Chunk, void>`
 * in the helpers below). Confirm against the upstream source before relying
 * on the signatures as written.
 *
 * @tsplus static effect/core/stream/Stream.Aspects repeatElementsWith
 * @tsplus pipeable effect/core/stream/Stream repeatElementsWith
 */
export function repeatElementsWith(
  schedule: Schedule,
  f: (a: A) => C1,
  g: (b: B) => C2
) {
  return (self: Stream): Stream => {
    concreteStream(self)
    return new StreamInternal(
      // NOTE(review): `>>` is applied to channel values here; presumably a
      // tsplus operator that pipes `self.channel` into the channel on the
      // right - confirm.
      self.channel >> Channel.unwrap(
        schedule.driver
          .map((driver) => {
            // `loop` refers to itself through the `feed` callback, so every
            // chunk read from upstream is handled by the same reader; it is
            // declared with an explicit type before being returned.
            const loop: Channel<
              R | R2,
              E,
              Chunk,
              unknown,
              E,
              Chunk,
              void
            > = Channel.readWith(
              // Upstream produced a chunk: emit its elements with repetition.
              (chunk: Chunk) => feed(loop, driver, f, g, chunk),
              // Upstream failed: fail with the same error.
              (err) => Channel.fail(err),
              // Upstream finished: finish as well.
              () => Channel.unit
            )
            return loop
          })
      )
    )
  }
}

/**
 * Handles the next element of `input`: writes `f(a)` for the first element
 * `a` and then hands `a`, together with the rest of the chunk
 * (`input.drop(1)`), to `step`. When `input.head` has no value, `loop` is
 * returned instead.
 *
 * NOTE(review): assumes `input.head` is an optional value whose `fold` takes
 * the "empty" result first and the "present" handler second - confirm.
 *
 * @param loop - Channel to continue with once `input` has no more elements.
 * @param driver - Schedule driver, passed through to `step`.
 * @param f - Converts a stream element before it is written.
 * @param g - Converts the schedule output; passed through to `step`.
 * @param input - Elements still to be emitted.
 * @returns The channel that emits the remaining elements of `input`.
 */
function feed(
  loop: Channel, unknown, E, Chunk, void>,
  driver: Driver,
  f: (a: A) => C1,
  g: (b: B) => C2,
  input: Chunk
): Channel, unknown, E, Chunk, void> {
  return input.head.fold(
    loop,
    (a) =>
      Channel.write(Chunk.single(f(a))).flatMap(() =>
        step(loop, driver, f, g, input.drop(1), a)
      )
  )
}

/**
 * Decides what happens after `value` has been emitted: either it is emitted
 * again, or the schedule output is emitted and processing moves on to the
 * remaining `input`.
 *
 * @param loop - Channel to continue with once `input` has no more elements.
 * @param driver - Schedule driver that is advanced with `value`.
 * @param f - Converts a stream element before it is written.
 * @param g - Converts the value of `driver.last` before it is written.
 * @param input - Elements of the current chunk that follow `value`.
 * @param value - The element currently being repeated.
 * @returns The channel built from whichever of the two effects below is used.
 */
function step(
  loop: Channel, unknown, E, Chunk, void>,
  driver: Driver,
  f: (a: A) => C1,
  g: (b: B) => C2,
  input: Chunk,
  value: A
): Channel, unknown, E, Chunk, void> {
  // Advance the driver with `value`; the result of this effect is a channel
  // that writes `f(value)` once more and then calls `step` again with the
  // same `value` and `input`.
  const advance = driver
    .next(value)
    .as(
      Channel.write(Chunk.single(f(value))).flatMap(() =>
        step(loop, driver, f, g, input, value)
      )
    )
  // Read `driver.last`, run `driver.reset`, and produce a channel that writes
  // `g(b)` and then continues with the rest of `input` via `feed`. The error
  // type is annotated as `never`; presumably `orDie` is what removes the
  // failure of `driver.last` from the error channel - confirm.
  const reset: Effect<
    R | R2,
    never,
    Channel, unknown, E, Chunk, void>
  > = driver.last
    .orDie
    .tap(() => driver.reset)
    .map(
      (b) =>
        Channel.write(Chunk.single(g(b))).flatMap(() =>
          feed(loop, driver, f, g, input)
        )
    )
  // NOTE(review): `|` is applied to two effects; presumably a tsplus operator
  // that falls back to `reset` when `advance` fails - confirm.
  return Channel.unwrap(advance | reset)
}