import type { ErasedChannel, ErasedExecutor } from "@effect/core/stream/Channel/ChannelExecutor" import type { ChildExecutorDecision } from "@effect/core/stream/Channel/ChildExecutorDecision" import type { UpstreamPullRequest } from "@effect/core/stream/Channel/UpstreamPullRequest" import type { UpstreamPullStrategy } from "@effect/core/stream/Channel/UpstreamPullStrategy" export const SubexecutorSym = Symbol.for("@effect/core/stream/Channel/Subexecutor") export type SubexecutorSym = typeof SubexecutorSym /** * @tsplus type effect/core/stream/Channel/Subexecutor */ export interface Subexecutor { readonly [SubexecutorSym]: SubexecutorSym readonly close: (exit: Exit) => Effect | undefined readonly enqueuePullFromChild: (child: PullFromChild) => Subexecutor } /** * @tsplus type effect/core/stream/Channel/Subexecutor.Ops */ export interface SubexecutorOps {} export const Subexecutor: SubexecutorOps = {} /** * Execute upstreamExecutor and for each emitted element, spawn a child * channel and continue with processing it by `PullFromChild`. */ export class PullFromUpstream implements Subexecutor { readonly _tag = "PullFromUpstream" readonly [SubexecutorSym]: SubexecutorSym = SubexecutorSym constructor( readonly upstreamExecutor: ErasedExecutor, readonly createChild: (_: unknown) => ErasedChannel, readonly lastDone: unknown, readonly activeChildExecutors: ImmutableQueue | undefined>, readonly combineChildResults: (x: unknown, y: unknown) => unknown, readonly combineWithChildResult: (x: unknown, y: unknown) => unknown, readonly onPull: (_: UpstreamPullRequest) => UpstreamPullStrategy, readonly onEmit: (_: unknown) => ChildExecutorDecision ) {} close(exit: Exit): Effect | undefined { const fin1 = this.upstreamExecutor.close(exit) const fins = this.activeChildExecutors .map((child) => (child != null ? child.childExecutor.close(exit) : undefined)) .append(fin1) const result = fins.reduce( undefined as Effect> | undefined, (acc, next) => { if (acc != null && next != null) { return acc.zipWith(next.exit, (a, b) => a > b) } else if (acc != null) { return acc } else if (next != null) { return next.exit } else { return undefined } } ) return result == null ? result : result.flatMap((exit) => Effect.done(exit)) as Effect } enqueuePullFromChild(child: PullFromChild): Subexecutor { return new PullFromUpstream( this.upstreamExecutor, this.createChild, this.lastDone, this.activeChildExecutors.append(child), this.combineChildResults, this.combineWithChildResult, this.onPull, this.onEmit ) } } /** * Execute the childExecutor and on each emitted value, decide what to do by * `onEmit`. */ export class PullFromChild implements Subexecutor { readonly _tag = "PullFromChild" readonly [SubexecutorSym]: SubexecutorSym = SubexecutorSym constructor( readonly childExecutor: ErasedExecutor, readonly parentSubexecutor: Subexecutor, readonly onEmit: (_: unknown) => ChildExecutorDecision ) {} close(exit: Exit): Effect | undefined { const fin1 = this.childExecutor.close(exit) const fin2 = this.parentSubexecutor.close(exit) if (fin1 != null && fin2 != null) { return fin1 .exit .zipWith(fin2.exit, (a, b) => a > b) .flatMap((exit) => Effect.done(exit)) } else if (fin1 != null) { return fin1 } else if (fin2 != null) { return fin2 } else { return undefined } } enqueuePullFromChild(_: PullFromChild): Subexecutor { return this } } export class DrainChildExecutors implements Subexecutor { readonly _tag = "DrainChildExecutors" readonly [SubexecutorSym]: SubexecutorSym = SubexecutorSym constructor( readonly upstreamExecutor: ErasedExecutor, readonly lastDone: unknown, readonly activeChildExecutors: ImmutableQueue | undefined>, readonly upstreamDone: Exit, readonly combineChildResults: (x: unknown, y: unknown) => unknown, readonly combineWithChildResult: (x: unknown, y: unknown) => unknown, readonly onPull: (_: UpstreamPullRequest) => UpstreamPullStrategy ) {} close(exit: Exit): Effect | undefined { const fin1 = this.upstreamExecutor.close(exit) const fins = this.activeChildExecutors .map((child) => (child != null ? child.childExecutor.close(exit) : undefined)) .append(fin1) return fins.reduce( undefined as Effect> | undefined, (acc, next) => { if (acc != null && next != null) { return acc.zipWith(next.exit, (a, b) => a > b) } else if (acc != null) { return acc } else if (next != null) { return next.exit } else { return undefined } } ) } enqueuePullFromChild(child: PullFromChild): Subexecutor { return new DrainChildExecutors( this.upstreamExecutor, this.lastDone, this.activeChildExecutors.append(child), this.upstreamDone, this.combineChildResults, this.combineWithChildResult, this.onPull ) } } export class Emit implements Subexecutor { readonly _tag = "Emit" readonly [SubexecutorSym]: SubexecutorSym = SubexecutorSym constructor(readonly value: unknown, readonly next: Subexecutor) {} close(exit: Exit): Effect | undefined { return this.next.close(exit) } enqueuePullFromChild(_child: PullFromChild): Subexecutor { return this } } /** * @tsplus macro remove */ export function concreteSubexecutor( _: Subexecutor ): asserts _ is | PullFromUpstream | PullFromChild | DrainChildExecutors | Emit { // } /** * @tsplus static effect/core/stream/Channel/Subexecutor.Ops PullFromUpstream */ export function pullFromUpstream( upstreamExecutor: ErasedExecutor, createChild: (_: unknown) => ErasedChannel, lastDone: unknown, activeChildExecutors: ImmutableQueue | undefined>, combineChildResults: (x: unknown, y: unknown) => unknown, combineWithChildResult: (x: unknown, y: unknown) => unknown, onPull: (_: UpstreamPullRequest) => UpstreamPullStrategy, onEmit: (_: unknown) => ChildExecutorDecision ): Subexecutor { return new PullFromUpstream( upstreamExecutor, createChild, lastDone, activeChildExecutors, combineChildResults, combineWithChildResult, onPull, onEmit ) } /** * @tsplus static effect/core/stream/Channel/Subexecutor.Ops PullFromChild */ export function pullFromChild( childExecutor: ErasedExecutor, parentSubexecutor: Subexecutor, onEmit: (_: unknown) => ChildExecutorDecision ): Subexecutor { return new PullFromChild(childExecutor, parentSubexecutor, onEmit) } /** * @tsplus static effect/core/stream/Channel/Subexecutor.Ops DrainChildExecutors */ export function drainChildExecutors( upstreamExecutor: ErasedExecutor, lastDone: unknown, activeChildExecutors: ImmutableQueue | undefined>, upstreamDone: Exit, combineChildResults: (x: unknown, y: unknown) => unknown, combineWithChildResult: (x: unknown, y: unknown) => unknown, onPull: (_: UpstreamPullRequest) => UpstreamPullStrategy ): Subexecutor { return new DrainChildExecutors( upstreamExecutor, lastDone, activeChildExecutors, upstreamDone, combineChildResults, combineWithChildResult, onPull ) } /** * @tsplus static effect/core/stream/Channel/Subexecutor.Ops Emit */ export function emit(value: unknown, next: Subexecutor): Subexecutor { return new Emit(value, next) }