import { pipe } from '@effect-ts/core'
import * as Chunk from '@effect-ts/core/Collections/Immutable/Chunk'
import * as Tuple from '@effect-ts/core/Collections/Immutable/Tuple'
import * as T from '@effect-ts/core/Effect'
import * as S from '@effect-ts/core/Effect/Experimental/Stream'
import * as E from '@effect-ts/core/Either'

export * from '@effect-ts/core/Effect/Experimental/Stream'

/** Wraps a plain value in a stream built from `S.fromEffect(T.succeed(a))`. */
export const fromValue = <A>(a: A) => S.fromEffect(T.succeed(a))

/**
 * Runs the effect `f` for every element of the stream except the first one.
 * Elements themselves are passed through unchanged.
 *
 * @param f - effect to run for the second and all following elements
 */
export const streamTapSkipFirst =
  <R1, E1, O, X>(f: (o: O) => T.Effect<R1, E1, X>) =>
  <R, E>(stream: S.Stream<R, E, O>): S.Stream<R & R1, E | E1, O> =>
    pipe(
      stream,
      // The accumulator counts how many elements have been seen so far.
      S.mapAccumEffect(0, (seen: number, o: O) =>
        T.gen(function* (_) {
          if (seen > 0) {
            yield* _(f(o))
          }
          return Tuple.tuple(seen + 1, o)
        }),
      ),
    )

/**
 * Runs the effect `f` for every `Right` value except the first `Right`.
 * `Left` values are passed through untouched and do not count towards the
 * skipped element, so the first `Right` is skipped even when it is preceded
 * by `Left` values.
 *
 * @param f - effect to run for the second and all following `Right` values
 */
export const tapSkipFirstRight =
  <R1, E1, A1, X>(f: (o: A1) => T.Effect<R1, E1, X>) =>
  <R, E, EE1>(stream: S.Stream<R, E, E.Either<EE1, A1>>): S.Stream<R & R1, E | E1, E.Either<EE1, A1>> =>
    pipe(
      stream,
      // The accumulator counts `Right` values only (not stream positions).
      S.mapAccumEffect(0, (rightsSeen: number, val: E.Either<EE1, A1>) =>
        T.gen(function* (_) {
          if (E.isRight(val) && rightsSeen > 0) {
            yield* _(f(val.right))
          }
          return Tuple.tuple(E.isRight(val) ? rightsSeen + 1 : rightsSeen, val)
        }),
      ),
    )

/**
 * Runs the effect `f` for every `Right` value; `Left` values are ignored.
 * The stream elements are passed through unchanged.
 */
export const tapRight =
  <R1, E1, A1, X>(f: (o: A1) => T.Effect<R1, E1, X>) =>
  <R, E, EE1>(stream: S.Stream<R, E, E.Either<EE1, A1>>): S.Stream<R & R1, E | E1, E.Either<EE1, A1>> =>
    pipe(
      stream,
      S.tap((val) => (E.isLeft(val) ? T.succeed(null) : f(val.right))),
    )

/**
 * Runs the effect `f` for every `Left` value; `Right` values are ignored.
 * The stream elements are passed through unchanged.
 */
export const tapLeft =
  <R1, E1, EE1, X>(f: (e: EE1) => T.Effect<R1, E1, X>) =>
  <R, E, A1>(stream: S.Stream<R, E, E.Either<EE1, A1>>): S.Stream<R & R1, E | E1, E.Either<EE1, A1>> =>
    pipe(
      stream,
      S.tap((val) => (E.isLeft(val) ? f(val.left) : T.succeed(null))),
    )

/**
 * Same as `tapRight`, but for callbacks whose effect itself produces an
 * `Either`. The `Either` produced by `f` is discarded; the stream elements
 * are passed through unchanged.
 */
export const tapRightEither =
  <R1, E1, EE2, A1, X>(f: (o: A1) => T.Effect<R1, E1, E.Either<EE2, X>>) =>
  <R, E, EE1>(stream: S.Stream<R, E, E.Either<EE1, A1>>): S.Stream<R & R1, E | E1, E.Either<EE1, A1>> =>
    pipe(
      stream,
      S.tap((val) => (E.isLeft(val) ? T.succeed(null) : f(val.right))),
    )

/**
 * Emits the given `values` first, followed by the elements of `stream`.
 *
 * Implemented with `concat_` rather than `merge_`: merging runs both sides
 * concurrently and therefore does not guarantee that the initial values are
 * emitted before the first element of `stream`.
 */
export const startWith =
  <A2>(...values: A2[]) =>
  <R1, E1, A1>(stream: S.Stream<R1, E1, A1>): S.Stream<R1, E1, A1 | A2> =>
    S.concat_(S.fromChunk(Chunk.from(values)), stream)

/**
 * Emits `E.right(value)` first, followed by the elements of `stream`.
 * Uses `concat_` for the same ordering reason as `startWith`.
 */
export const startWithRight =
  <A2>(value: A2) =>
  <R1, E1, EE1, A1>(stream: S.Stream<R1, E1, E.Either<EE1, A1>>): S.Stream<R1, E1, E.Either<EE1, A1 | A2>> =>
    S.concat_(S.fromIterable([E.right(value)]), stream)

/**
 * Chains every `Right` value into the stream returned by `mapRight`.
 * `Left` values are forwarded as a single-element stream, i.e. they pass
 * through unchanged.
 *
 * @param mapRight - produces the inner stream for a `Right` value
 */
export const chainMapEitherRight =
  <R2, E2, EE2, A2, A1>(mapRight: (a1: A1) => S.Stream<R2, E2, E.Either<EE2, A2>>) =>
  <R1, E1, EE1>(stream: S.Stream<R1, E1, E.Either<EE1, A1>>): S.Stream<R1 & R2, E1 | E2, E.Either<EE1 | EE2, A2>> =>
    S.chain_(
      stream,
      (val): S.Stream<R2, E2, E.Either<EE1 | EE2, A2>> =>
        // Forward the Left itself; returning `stream` here would re-run the
        // whole source stream for every Left value.
        E.isLeft(val) ? fromValue(val) : mapRight(val.right),
    )

/**
 * Like `chainMapEitherRight`, but built on `S.chainParSwitch` with `n = 1`,
 * the same call the original used for switching between inner streams.
 * `Left` values are forwarded as a single-element stream.
 *
 * @param mapRight - produces the inner stream for a `Right` value
 */
export const chainSwitchMapEitherRight =
  <R2, E2, EE2, A2, A1>(mapRight: (a1: A1) => S.Stream<R2, E2, E.Either<EE2, A2>>) =>
  <R1, E1, EE1>(stream: S.Stream<R1, E1, E.Either<EE1, A1>>): S.Stream<R1 & R2, E1 | E2, E.Either<EE1 | EE2, A2>> =>
    S.chainParSwitch(
      (val: E.Either<EE1, A1>): S.Stream<R2, E2, E.Either<EE1 | EE2, A2>> =>
        // Forward the Left itself instead of re-running the source stream.
        E.isLeft(val) ? fromValue(val) : mapRight(val.right),
      1,
    )(stream)

/**
 * Maps every `Right` value through the effect returned by `mapRight`, which
 * may itself yield a `Left` or a `Right`. Existing `Left` values pass through
 * unchanged.
 */
export const mapEffectEitherRight =
  <R2, E2, EE2, A2, A1>(mapRight: (a1: A1) => T.Effect<R2, E2, E.Either<EE2, A2>>) =>
  <R1, E1, EE1>(stream: S.Stream<R1, E1, E.Either<EE1, A1>>): S.Stream<R1 & R2, E1 | E2, E.Either<EE1 | EE2, A2>> =>
    S.mapEffect_(
      stream,
      (val): T.Effect<R2, E2, E.Either<EE1 | EE2, A2>> =>
        E.isLeft(val) ? T.succeed(val) : mapRight(val.right),
    )

/** Transforms the `Left` side of every element with `mapLeft`. */
export const mapEitherLeft =
  <EE1, EE2>(mapLeft: (e1: EE1) => EE2) =>
  <R1, E1, A1>(stream: S.Stream<R1, E1, E.Either<EE1, A1>>): S.Stream<R1, E1, E.Either<EE2, A1>> =>
    S.map_(stream, E.mapLeft(mapLeft))

/** Transforms the `Right` side of every element with `mapRight`. */
export const mapEitherRight =
  <A1, A2>(mapRight: (a1: A1) => A2) =>
  <R1, E1, EE1>(stream: S.Stream<R1, E1, E.Either<EE1, A1>>): S.Stream<R1, E1, E.Either<EE1, A2>> =>
    S.map_(stream, E.map(mapRight))