import { SinkInternal } from "@effect/core/stream/Sink/operations/_internal/SinkInternal"

/**
 * Creates a sink from a chunk processing function.
 *
 * The scoped `push` effect is run to obtain the processing function, which is
 * then driven by a channel that feeds it every upstream chunk and finally
 * `Maybe.none` once upstream is done.
 *
 * @param push - Scoped effect yielding the processing function. That function
 *   reports completion through its failure channel as a pair of the sink's
 *   outcome (`Either` of error or result) and the leftover chunk; succeeding
 *   with `void` asks for more input.
 * @returns A sink backed by the channel built from `push`.
 *
 * @tsplus static effect/core/stream/Sink.Ops fromPush
 */
export function fromPush<R, E, In, L, Z>(
  push: Effect<
    R | Scope,
    never,
    (input: Maybe<Chunk<In>>) => Effect<R, readonly [Either<E, Z>, Chunk<L>], void>
  >
): Sink<R, E, In, L, Z> {
  return new SinkInternal(Channel.unwrapScoped(push.map(pull)))
}

/**
 * Builds the channel that repeatedly reads upstream input and hands it to
 * `push` until `push` signals completion.
 *
 * @param push - Processing function; see `fromPush`.
 * @returns A channel that emits the leftovers and then ends with the sink's
 *   error or result.
 */
function pull<R, E, In, L, Z>(
  push: (option: Maybe<Chunk<In>>) => Effect<R, readonly [Either<E, Z>, Chunk<L>], void>
): Channel<R, never, Chunk<In>, unknown, E, Chunk<L>, Z> {
  // Shared completion handler: emit the leftovers first, then terminate the
  // channel with either the sink's error or its result.
  const finish = ([outcome, leftovers]: readonly [Either<E, Z>, Chunk<L>]) =>
    outcome.fold(
      (e) => Channel.write(leftovers).flatMap(() => Channel.fail(e)),
      (z) => Channel.write(leftovers).flatMap(() => Channel.succeed(z))
    )

  return Channel.readWith(
    // Upstream produced a chunk: push it, and keep pulling unless `push`
    // signalled completion.
    (input: Chunk<In>) =>
      Channel.fromEffect(push(Maybe.some(input))).foldChannel(
        finish,
        () => pull(push)
      ),
    // Upstream failed: propagate the failure unchanged.
    (err) => Channel.fail(err),
    // Upstream is done: notify `push` with `Maybe.none`. It is expected to
    // signal completion here; if it still asks for more input, die.
    () =>
      Channel.fromEffect(push(Maybe.none)).foldChannel(
        finish,
        () => Channel.fromEffect(Effect.dieMessage("empty sink"))
      )
  )
}