import * as Cause from "../Cause.js"
import type * as Channel from "../Channel.js"
import * as Chunk from "../Chunk.js"
import * as Deferred from "../Deferred.js"
import * as Effect from "../Effect.js"
import * as Effectable from "../Effectable.js"
import * as Exit from "../Exit.js"
import { dual, pipe } from "../Function.js"
import type * as GroupBy from "../GroupBy.js"
import * as Option from "../Option.js"
import { pipeArguments } from "../Pipeable.js"
import { hasProperty, type Predicate } from "../Predicate.js"
import * as Queue from "../Queue.js"
import * as Ref from "../Ref.js"
import * as Scope from "../Scope.js"
import type * as Stream from "../Stream.js"
import type * as Take from "../Take.js"
import type { NoInfer } from "../Types.js"
import * as channel from "./channel.js"
import * as channelExecutor from "./channel/channelExecutor.js"
import * as core from "./core-stream.js"
import * as stream from "./stream.js"
import * as take from "./take.js"

/**
 * Key passed to `Symbol.for` to obtain the `GroupBy` type identifier.
 *
 * @internal
 */
const GroupBySymbolKey = "effect/GroupBy"

/**
 * Symbol used to brand `GroupBy` values; `isGroupBy` looks for it.
 *
 * @internal
 */
export const GroupByTypeId: GroupBy.GroupByTypeId = Symbol.for(
  GroupBySymbolKey
) as GroupBy.GroupByTypeId

/**
 * Value stored under `GroupByTypeId` on every `GroupBy` built by `make`.
 * Each entry is an identity function over `never`, one per type parameter.
 */
const groupByVariance = {
  /* c8 ignore next */
  _R: (_: never) => _,
  /* c8 ignore next */
  _E: (_: never) => _,
  /* c8 ignore next */
  _K: (_: never) => _,
  /* c8 ignore next */
  _V: (_: never) => _
}

/**
 * Buffer size used by every operator in this module when the caller does not
 * supply `options.bufferSize`.
 */
const defaultBufferSize = 16

/**
 * Returns `true` when `u` carries the `GroupByTypeId` property.
 *
 * @internal
 */
export const isGroupBy = (u: unknown): u is GroupBy.GroupBy<unknown, unknown, unknown, unknown> =>
  hasProperty(u, GroupByTypeId)

/**
 * Turns a `GroupBy` back into a single stream.
 *
 * For every `[key, queue]` pair emitted by `self.grouped`, the queue is read
 * as a stream (`fromQueue` with `shutdown: true`), its `Take`s are flattened,
 * and the result is handed to `f` together with the key. The streams returned
 * by `f` are merged with `flatMap` using unbounded concurrency.
 *
 * @param f - Builds the output stream for one group.
 * @param options - `bufferSize` is forwarded to `flatMap`; defaults to 16.
 *
 * @internal
 */
export const evaluate = dual<
  <K, V, A, E2, R2>(
    f: (key: K, stream: Stream.Stream<V, never, never>) => Stream.Stream<A, E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => <E, R>(self: GroupBy.GroupBy<K, V, E, R>) => Stream.Stream<A, E2 | E, R2 | R>,
  <K, V, E, R, A, E2, R2>(
    self: GroupBy.GroupBy<K, V, E, R>,
    f: (key: K, stream: Stream.Stream<V, never, never>) => Stream.Stream<A, E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => Stream.Stream<A, E2 | E, R2 | R>
>(
  // The data-first form is recognised by its first argument being a GroupBy.
  (args) => isGroupBy(args[0]),
  <K, V, E, R, A, E2, R2>(
    self: GroupBy.GroupBy<K, V, E, R>,
    f: (key: K, stream: Stream.Stream<V, never, never>) => Stream.Stream<A, E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ): Stream.Stream<A, E2 | E, R2 | R> =>
    stream.flatMap(
      self.grouped,
      ([key, queue]) => f(key, stream.flattenTake(stream.fromQueue(queue, { shutdown: true }))),
      { concurrency: "unbounded", bufferSize: options?.bufferSize ?? defaultBufferSize }
    )
)

/**
 * Keeps only the groups whose key satisfies `predicate`.
 *
 * The queue of every rejected group is shut down before the group is dropped.
 *
 * @internal
 */
export const filter = dual<
  <K>(predicate: Predicate<NoInfer<K>>) => <V, E, R>(self: GroupBy.GroupBy<K, V, E, R>) => GroupBy.GroupBy<K, V, E, R>,
  <K, V, E, R>(self: GroupBy.GroupBy<K, V, E, R>, predicate: Predicate<K>) => GroupBy.GroupBy<K, V, E, R>
>(2, <K, V, E, R>(self: GroupBy.GroupBy<K, V, E, R>, predicate: Predicate<K>): GroupBy.GroupBy<K, V, E, R> =>
  make(
    pipe(
      self.grouped,
      stream.filterEffect((tuple) => {
        if (predicate(tuple[0])) {
          return pipe(Effect.succeed(tuple), Effect.as(true))
        }
        // Rejected group: shut its queue down, then filter the pair out.
        return pipe(Queue.shutdown(tuple[1]), Effect.as(false))
      })
    )
  ))

/**
 * Keeps only the first `n` groups, in the order `self.grouped` emits them.
 *
 * Groups are numbered with `zipWithIndex`; the queue of every group whose
 * index is `>= n` is shut down before the group is dropped.
 *
 * @internal
 */
export const first = dual<
  (n: number) => <K, V, E, R>(self: GroupBy.GroupBy<K, V, E, R>) => GroupBy.GroupBy<K, V, E, R>,
  <K, V, E, R>(self: GroupBy.GroupBy<K, V, E, R>, n: number) => GroupBy.GroupBy<K, V, E, R>
>(2, <K, V, E, R>(self: GroupBy.GroupBy<K, V, E, R>, n: number): GroupBy.GroupBy<K, V, E, R> =>
  make(
    pipe(
      stream.zipWithIndex(self.grouped),
      stream.filterEffect((tuple) => {
        const index = tuple[1]
        const queue = tuple[0][1]
        if (index < n) {
          return pipe(Effect.succeed(tuple), Effect.as(true))
        }
        return pipe(Queue.shutdown(queue), Effect.as(false))
      }),
      // Drop the index again, leaving the original `[key, queue]` pair.
      stream.map((tuple) => tuple[0])
    )
  ))

/**
 * Wraps a stream of `[key, dequeue]` pairs into a `GroupBy` value.
 *
 * @internal
 */
export const make = <K, V, E, R>(
  grouped: Stream.Stream<readonly [K, Queue.Dequeue<Take.Take<V, E>>], E, R>
): GroupBy.GroupBy<K, V, E, R> => ({
  [GroupByTypeId]: groupByVariance,
  pipe() {
    return pipeArguments(this, arguments)
  },
  grouped
})

// Circular with Stream

/**
 * Groups the elements of a stream using an effectful function that returns a
 * `[key, value]` pair for each element.
 *
 * How it is wired (all inside one scoped effect):
 * - `output` is a bounded queue of `[key, dequeue]` pairs; it is shut down when
 *   the scope is released.
 * - `ref` remembers, per key, the index of the queue assigned to that key.
 * - `add` comes from `distributedWithDynamicCallback`; running it yields a new
 *   `[index, queue]` pair.
 * - `decider` is a `Deferred` holding the routing function. It is completed
 *   only after `add` exists, because the routing function itself uses `add`.
 *   For a key seen for the first time it allocates a queue via `add`, records
 *   the index, and publishes `[key, mappedQueue]` on `output`. In both cases it
 *   answers with the predicate `n === index`.
 *
 * NOTE(review): presumably `distributedWithDynamicCallback` delivers each
 * element to the queues whose index satisfies the returned predicate, and
 * calls the last callback with the upstream exit — confirm in `stream.ts`.
 *
 * @param f - Computes the `[key, value]` pair of an element.
 * @param options - `bufferSize` sizes both `output` and the distribution
 *   buffer; defaults to 16.
 *
 * @internal
 */
export const groupBy = dual<
  <A, K, V, E2, R2>(
    f: (a: A) => Effect.Effect<readonly [K, V], E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => <E, R>(self: Stream.Stream<A, E, R>) => GroupBy.GroupBy<K, V, E2 | E, R2 | R>,
  <A, E, R, K, V, E2, R2>(
    self: Stream.Stream<A, E, R>,
    f: (a: A) => Effect.Effect<readonly [K, V], E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => GroupBy.GroupBy<K, V, E | E2, R | R2>
>(
  (args) => stream.isStream(args[0]),
  <A, E, R, K, V, E2, R2>(
    self: Stream.Stream<A, E, R>,
    f: (a: A) => Effect.Effect<readonly [K, V], E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ): GroupBy.GroupBy<K, V, E | E2, R | R2> =>
    make(
      stream.unwrapScoped(
        Effect.gen(function*() {
          const bufferSize = options?.bufferSize ?? defaultBufferSize
          const decider = yield* Deferred.make<(key: K, value: V) => Effect.Effect<Predicate<number>>>()
          const output = yield* Effect.acquireRelease(
            Queue.bounded<Exit.Exit<readonly [K, Queue.Dequeue<Take.Take<V, E | E2>>], Option.Option<E | E2>>>(
              bufferSize
            ),
            (queue) => Queue.shutdown(queue)
          )
          const ref = yield* Ref.make<Map<K, number>>(new Map())
          const add = yield* pipe(
            stream.mapEffectSequential(self, f),
            stream.distributedWithDynamicCallback(
              bufferSize,
              // Routing waits until the decider below has been installed.
              ([key, value]) => Effect.flatMap(Deferred.await(decider), (f) => f(key, value)),
              (exit) => Queue.offer(output, exit)
            )
          )
          yield* Deferred.succeed(decider, (key, _) =>
            pipe(
              Ref.get(ref),
              Effect.map((map) => Option.fromNullable(map.get(key))),
              Effect.flatMap(Option.match({
                // First element for this key: allocate a queue and publish the group.
                onNone: () =>
                  Effect.flatMap(add, ([index, queue]) =>
                    Effect.zipRight(
                      Ref.update(ref, (map) => map.set(key, index)),
                      pipe(
                        Queue.offer(
                          output,
                          Exit.succeed(
                            [
                              key,
                              // The group only exposes the value half of each
                              // `[key, value]` pair, wrapped as a one-element Take.
                              mapDequeue(queue, (exit) =>
                                new take.TakeImpl(pipe(
                                  exit,
                                  Exit.map((tuple) => Chunk.of(tuple[1]))
                                )))
                            ] as const
                          )
                        ),
                        Effect.as<Predicate<number>>((n: number) => n === index)
                      )
                    )),
                // Known key: route to the queue index recorded earlier.
                onSome: (index) => Effect.succeed<Predicate<number>>((n: number) => n === index)
              }))
            ))
          return stream.flattenExitOption(stream.fromQueue(output, { shutdown: true }))
        })
      )
    )
)

/**
 * Maps every element of a stream with an effectful function, with the
 * execution strategy chosen by `options`.
 *
 * - When `options.key` is given, the stream is partitioned with `groupByKey`
 *   and `f` is applied sequentially inside each group (`evaluate` then merges
 *   the groups).
 * - Otherwise `stream.matchConcurrency` picks between `mapEffectSequential`
 *   and a concurrent variant: `flatMap` with `concurrency: n` when
 *   `options.unordered` is set, `mapEffectPar` otherwise.
 *
 * NOTE(review): presumably `matchConcurrency` uses the first callback when no
 * concurrency is requested and the second with the resolved parallelism —
 * confirm in `stream.ts`.
 *
 * @internal
 */
export const mapEffectOptions = dual<
  {
    <A, A2, E2, R2>(
      f: (a: A) => Effect.Effect<A2, E2, R2>,
      options?: {
        readonly concurrency?: number | "unbounded" | undefined
        readonly unordered?: boolean | undefined
      }
    ): <E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A2, E2 | E, R2 | R>
    <A, A2, E2, R2, K>(
      f: (a: A) => Effect.Effect<A2, E2, R2>,
      options: {
        readonly key: (a: A) => K
        readonly bufferSize?: number | undefined
      }
    ): <E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A2, E2 | E, R2 | R>
  },
  {
    <A, E, R, A2, E2, R2>(
      self: Stream.Stream<A, E, R>,
      f: (a: A) => Effect.Effect<A2, E2, R2>,
      options?: {
        readonly concurrency?: number | "unbounded" | undefined
        readonly unordered?: boolean | undefined
      }
    ): Stream.Stream<A2, E2 | E, R2 | R>
    <A, E, R, A2, E2, R2, K>(
      self: Stream.Stream<A, E, R>,
      f: (a: A) => Effect.Effect<A2, E2, R2>,
      options: {
        readonly key: (a: A) => K
        readonly bufferSize?: number | undefined
      }
    ): Stream.Stream<A2, E2 | E, R2 | R>
  }
>(
  // Data-last calls start with the mapping function; data-first with a stream.
  (args) => typeof args[0] !== "function",
  (<A, E, R, A2, E2, R2, K>(
    self: Stream.Stream<A, E, R>,
    f: (a: A) => Effect.Effect<A2, E2, R2>,
    options?: {
      readonly key?: ((a: A) => K) | undefined
      readonly concurrency?: number | "unbounded" | undefined
      readonly unordered?: boolean | undefined
      readonly bufferSize?: number | undefined
    }
  ): Stream.Stream<A2, E2 | E, R2 | R> => {
    if (options?.key) {
      return evaluate(
        groupByKey(self, options.key, { bufferSize: options.bufferSize }),
        (_, s) => stream.mapEffectSequential(s, f)
      )
    }

    return stream.matchConcurrency(
      options?.concurrency,
      () => stream.mapEffectSequential(self, f),
      (n) =>
        options?.unordered ?
          stream.flatMap(self, (a) => stream.fromEffect(f(a)), { concurrency: n }) :
          stream.mapEffectPar(self, n, f)
    )
  }) as any
)

/**
 * Runs `f` on every element and emits a copy of the element extended with the
 * result stored under `tag` (`{ ...element, [tag]: result }`).
 *
 * `options` is forwarded unchanged to `mapEffectOptions`.
 *
 * NOTE(review): the data-last signature advertises `bufferSize` while the
 * data-first signature and the implementation advertise `unordered`. The
 * mismatch is kept as found; confirm which set of options is intended.
 *
 * @internal
 */
export const bindEffect = dual<
  <N extends string, A, B, E2, R2>(
    tag: Exclude<N, keyof A>,
    f: (_: NoInfer<A>) => Effect.Effect<B, E2, R2>,
    options?: {
      readonly concurrency?: number | "unbounded" | undefined
      readonly bufferSize?: number | undefined
    }
  ) => <E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<
    { [K in keyof A | N]: K extends keyof A ? A[K] : B },
    E | E2,
    R | R2
  >,
  <A, E, R, N extends string, B, E2, R2>(
    self: Stream.Stream<A, E, R>,
    tag: Exclude<N, keyof A>,
    f: (_: NoInfer<A>) => Effect.Effect<B, E2, R2>,
    options?: {
      readonly concurrency?: number | "unbounded" | undefined
      readonly unordered?: boolean | undefined
    }
  ) => Stream.Stream<
    { [K in keyof A | N]: K extends keyof A ? A[K] : B },
    E | E2,
    R | R2
  >
>((args) => typeof args[0] !== "string", <A, E, R, N extends string, B, E2, R2>(
  self: Stream.Stream<A, E, R>,
  tag: Exclude<N, keyof A>,
  f: (_: A) => Effect.Effect<B, E2, R2>,
  options?: {
    readonly concurrency?: number | "unbounded" | undefined
    readonly unordered?: boolean | undefined
  }
) =>
  mapEffectOptions(self, (k) =>
    Effect.map(
      f(k),
      (a) => ({ ...k, [tag]: a } as { [K in keyof A | N]: K extends keyof A ? A[K] : B })
    ), options))

/**
 * Returns a view of `dequeue` whose taken elements are transformed with `f`.
 */
const mapDequeue = <A, B>(dequeue: Queue.Dequeue<A>, f: (a: A) => B): Queue.Dequeue<B> => new MapDequeue(dequeue, f)

/**
 * `Dequeue` adapter: state queries and `shutdown` are delegated unchanged to
 * the wrapped dequeue, while every operation that removes elements applies
 * `f` to each element it returns.
 */
class MapDequeue<in out A, out B> extends Effectable.Class<B> implements Queue.Dequeue<B> {
  readonly [Queue.DequeueTypeId] = {
    _Out: (_: never) => _
  }

  constructor(
    readonly dequeue: Queue.Dequeue<A>,
    readonly f: (a: A) => B
  ) {
    super()
  }

  capacity(): number {
    return Queue.capacity(this.dequeue)
  }

  get size(): Effect.Effect<number> {
    return Queue.size(this.dequeue)
  }

  unsafeSize(): Option.Option<number> {
    return this.dequeue.unsafeSize()
  }

  get awaitShutdown(): Effect.Effect<void> {
    return Queue.awaitShutdown(this.dequeue)
  }

  isActive(): boolean {
    return this.dequeue.isActive()
  }

  get isShutdown(): Effect.Effect<boolean> {
    return Queue.isShutdown(this.dequeue)
  }

  get shutdown(): Effect.Effect<void> {
    return Queue.shutdown(this.dequeue)
  }

  get isFull(): Effect.Effect<boolean> {
    return Queue.isFull(this.dequeue)
  }

  get isEmpty(): Effect.Effect<boolean> {
    return Queue.isEmpty(this.dequeue)
  }

  get take(): Effect.Effect<B> {
    return pipe(Queue.take(this.dequeue), Effect.map((a) => this.f(a)))
  }

  get takeAll(): Effect.Effect<Chunk.Chunk<B>> {
    return pipe(Queue.takeAll(this.dequeue), Effect.map(Chunk.map((a) => this.f(a))))
  }

  takeUpTo(max: number): Effect.Effect<Chunk.Chunk<B>> {
    return pipe(Queue.takeUpTo(this.dequeue, max), Effect.map(Chunk.map((a) => this.f(a))))
  }

  takeBetween(min: number, max: number): Effect.Effect<Chunk.Chunk<B>> {
    return pipe(Queue.takeBetween(this.dequeue, min, max), Effect.map(Chunk.map((a) => this.f(a))))
  }

  takeN(n: number): Effect.Effect<Chunk.Chunk<B>> {
    return pipe(Queue.takeN(this.dequeue, n), Effect.map(Chunk.map((a) => this.f(a))))
  }

  poll(): Effect.Effect<Option.Option<B>> {
    return pipe(Queue.poll(this.dequeue), Effect.map(Option.map((a) => this.f(a))))
  }

  pipe() {
    return pipeArguments(this, arguments)
  }

  // The effect form of this dequeue is a single `take`.
  commit() {
    return this.take
  }
}

/**
 * Runs `effect`, turning a failure whose cause consists only of interruption
 * into a successful `void`; any other cause is left untouched.
 *
 * NOTE(review): used around offers to per-group queues, presumably because a
 * group's queue may already have been shut down by its consumer (see `filter`
 * and `first`), which interrupts the offer — confirm against `Queue` semantics.
 */
const ignoreInterruptedOnly = <A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A | void, E, R> =>
  Effect.catchSomeCause(
    effect,
    (cause) =>
      Cause.isInterruptedOnly(cause) ?
        Option.some(Effect.void) :
        Option.none()
  )

/**
 * Groups the elements of a stream by a pure key function.
 *
 * The source stream is piped into a channel (`loop`) that is drained on a
 * fiber forked into the enclosing scope. For each incoming chunk:
 * - elements are bucketed per key with `groupByIterable`;
 * - a key seen for the first time gets a new bounded queue, which is stored in
 *   `map` and announced on the outer queue as `[key, queue]`;
 * - the bucket is offered to the key's queue as one chunk `Take`.
 *
 * On upstream failure the cause is offered to the outer queue only. On normal
 * completion `take.end` is offered to every group queue and then to the outer
 * queue. The outer queue is shut down by a finalizer registered on the scope.
 *
 * @param f - Computes the key of an element.
 * @param options - `bufferSize` is the capacity of each group queue; defaults
 *   to 16. The outer queue is unbounded.
 *
 * @internal
 */
export const groupByKey = dual<
  <A, K>(
    f: (a: A) => K,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => <E, R>(self: Stream.Stream<A, E, R>) => GroupBy.GroupBy<K, A, E, R>,
  <A, E, R, K>(
    self: Stream.Stream<A, E, R>,
    f: (a: A) => K,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => GroupBy.GroupBy<K, A, E, R>
>(
  (args) => typeof args[0] !== "function",
  <A, E, R, K>(
    self: Stream.Stream<A, E, R>,
    f: (a: A) => K,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ): GroupBy.GroupBy<K, A, E, R> => {
    const bufferSize = options?.bufferSize ?? defaultBufferSize

    const loop = (
      map: Map<K, Queue.Queue<Take.Take<A, E>>>,
      outerQueue: Queue.Queue<Take.Take<readonly [K, Queue.Queue<Take.Take<A, E>>], E>>
    ): Channel.Channel<never, Chunk.Chunk<A>, E, E, unknown, unknown, R> =>
      core.readWithCause({
        onInput: (input: Chunk.Chunk<A>) =>
          core.flatMap(
            core.fromEffect(
              Effect.forEach(groupByIterable(input, f), ([key, values]) => {
                const innerQueue = map.get(key)
                if (innerQueue === undefined) {
                  // New key: register the queue and announce the group before
                  // delivering its first chunk.
                  return pipe(
                    Queue.bounded<Take.Take<A, E>>(bufferSize),
                    Effect.flatMap((innerQueue) =>
                      pipe(
                        Effect.sync(() => {
                          map.set(key, innerQueue)
                        }),
                        Effect.zipRight(
                          Queue.offer(outerQueue, take.of([key, innerQueue] as const))
                        ),
                        Effect.zipRight(
                          ignoreInterruptedOnly(Queue.offer(innerQueue, take.chunk(values)))
                        )
                      )
                    )
                  )
                }
                return ignoreInterruptedOnly(Queue.offer(innerQueue, take.chunk(values)))
              }, { discard: true })
            ),
            () => loop(map, outerQueue)
          ),
        onFailure: (cause) => core.fromEffect(Queue.offer(outerQueue, take.failCause(cause))),
        onDone: () =>
          core.fromEffect(
            pipe(
              Effect.forEach(
                map.entries(),
                ([_, innerQueue]) => ignoreInterruptedOnly(Queue.offer(innerQueue, take.end)),
                { discard: true }
              ),
              Effect.zipRight(Queue.offer(outerQueue, take.end))
            )
          )
      })

    return make(stream.unwrapScopedWith((scope) =>
      Effect.gen(function*() {
        const map = new Map<K, Queue.Queue<Take.Take<A, E>>>()
        const queue = yield* Queue.unbounded<Take.Take<readonly [K, Queue.Queue<Take.Take<A, E>>], E>>()
        yield* Scope.addFinalizer(scope, Queue.shutdown(queue))
        return yield* stream.toChannel(self).pipe(
          core.pipeTo(loop(map, queue)),
          channel.drain,
          channelExecutor.runIn(scope),
          Effect.forkIn(scope),
          Effect.as(stream.flattenTake(stream.fromQueue(queue, { shutdown: true })))
        )
      })
    ))
  }
)

/**
 * A variant of `groupBy` that retains the insertion order of keys.
 *
 * Walks `iterable` once and returns one `[key, values]` entry per distinct
 * key, ordered by the first occurrence of each key; the values of an entry
 * keep their original relative order. Keys are compared the way `Map`
 * compares them.
 *
 * @internal
 */
const groupByIterable = dual<
  <V, K>(f: (value: V) => K) => (iterable: Iterable<V>) => Chunk.Chunk<[K, Chunk.Chunk<V>]>,
  <V, K>(iterable: Iterable<V>, f: (value: V) => K) => Chunk.Chunk<[K, Chunk.Chunk<V>]>
>(2, <V, K>(iterable: Iterable<V>, f: (value: V) => K): Chunk.Chunk<[K, Chunk.Chunk<V>]> => {
  // `groups` fixes the output order; `byKey` points at the same arrays for lookup.
  const groups: Array<[K, Array<V>]> = []
  const byKey = new Map<K, Array<V>>()
  for (const value of iterable) {
    const key = f(value)
    const existing = byKey.get(key)
    if (existing === undefined) {
      const values: Array<V> = [value]
      groups.push([key, values])
      byKey.set(key, values)
    } else {
      existing.push(value)
    }
  }
  return Chunk.unsafeFromArray(
    groups.map((tuple) => [tuple[0], Chunk.unsafeFromArray(tuple[1])])
  )
})