/**
 * Builds a channel that is backed by a buffer held in `ref`.
 *
 * As long as the buffer is empty (as decided by `isEmpty`), the channel just
 * forwards whatever it reads as its output. Whenever the buffer is non-empty,
 * the buffered value is emitted as output instead.
 *
 * @param empty - The value that stands for an empty buffer.
 * @param isEmpty - Decides whether the current buffer value counts as empty.
 * @param ref - The reference that holds the buffer value.
 *
 * @tsplus static effect/core/stream/Channel.Ops buffer
 */
export function buffer(
  empty: InElem,
  isEmpty: Predicate,
  ref: Ref
): Channel {
  // NOTE(review): the channel is handed to `Channel.suspend` directly rather
  // than as an explicit thunk; presumably tsplus lazy-argument handling makes
  // this deferred — confirm before changing the call shape.
  return Channel.suspend(bufferInternal(empty, isEmpty, ref))
}

/**
 * One step of the buffered channel. Inspects the buffer through `ref.modify`,
 * emits a single element and then re-enters itself for the next step.
 *
 * @param empty - The value that stands for an empty buffer.
 * @param isEmpty - Decides whether the current buffer value counts as empty.
 * @param ref - The reference that holds the buffer value.
 */
function bufferInternal(
  empty: InElem,
  isEmpty: Predicate,
  ref: Ref
): Channel {
  // Continuation shared by both branches: build the next step only after the
  // current write has completed.
  const loop = () => bufferInternal(empty, isEmpty, ref)

  return Channel.unwrap(
    // The callback yields a pair of (channel to run, buffer value).
    // NOTE(review): presumably the second element is what `modify` stores
    // back into the ref — confirm against the Ref API.
    ref.modify((buffered): readonly [Channel, InElem] => {
      if (isEmpty(buffered)) {
        // Nothing buffered: pass one upstream element through, propagate
        // upstream failure / completion unchanged, and keep the buffer value.
        return [
          Channel.readWith(
            (inElem) => Channel.write(inElem).flatMap(loop),
            (inErr) => Channel.fail(inErr),
            (inDone) => Channel.succeed(inDone)
          ),
          buffered
        ]
      }

      // Something is buffered: emit it and pair it with `empty` as the new
      // buffer value.
      return [Channel.write(buffered).flatMap(loop), empty]
    })
  )
}