// ets_tracing: off // port of: https://github.com/zio/zio-query/blob/3f9f4237ca2d879b629163f23fe79045eb29f0b0/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala import * as A from "@effect-ts/core/Collections/Immutable/Chunk" import * as HS from "@effect-ts/core/Collections/Immutable/HashSet" import * as L from "@effect-ts/core/Collections/Immutable/List" import * as T from "@effect-ts/core/Effect" import { _R } from "@effect-ts/core/Effect" import * as REF from "@effect-ts/core/Effect/Ref" import type * as E from "@effect-ts/core/Either" import { pipe, tuple } from "@effect-ts/core/Function" import * as O from "@effect-ts/core/Option" import * as St from "@effect-ts/core/Structural" import * as S from "@effect-ts/core/Sync" import type { Cache } from "../../Cache/index.js" import * as CRM from "../../CompletedRequestMap/index.js" import * as DS from "../../DataSource/index.js" import type { DataSourceAspect } from "../../DataSourceAspect/index.js" import type { BlockedRequest } from "../BlockedRequest/index.js" import * as PL from "../Parallel/index.js" import * as SQ from "../Sequential/index.js" function scalaTail(a: L.List): L.List { return L.size(a) === 0 ? L.empty() : L.tail(a) } class Both { readonly _tag = "Both"; readonly [_R]!: (r: R) => void constructor(readonly left: BlockedRequests, readonly right: BlockedRequests) {} } class Empty { readonly _tag = "Empty"; readonly [_R]!: (r: R) => void } class Single { readonly _tag = "Single"; readonly [_R]!: (r: R) => void constructor( readonly dataSource: DS.DataSource, readonly blockedRequest: BlockedRequest ) {} } class Then { readonly _tag = "Then"; readonly [_R]!: (r: R) => void constructor( public readonly left: BlockedRequests, public readonly right: BlockedRequests ) {} } /** * `BlockedRequests` captures a collection of blocked requests as a data * structure. By doing this the library is able to preserve information about * which requests must be performed sequentially and which can be performed in * parallel, allowing for maximum possible batching and pipelining while * preserving ordering guarantees. */ export type BlockedRequests = Both | Empty | Single | Then /** * Combines this collection of blocked requests with the specified collection * of blocked requests, in parallel. */ export function both(that: BlockedRequests) { return (self: BlockedRequests): BlockedRequests => both_(self, that) } export function both_(self: BlockedRequests, that: BlockedRequests) { return new Both(self, that) } /** * Combines this collection of blocked requests with the specified collection * of blocked requests, in sequence. */ export function then(that: BlockedRequests) { return (self: BlockedRequests): BlockedRequests => then_(self, that) } export function then_(self: BlockedRequests, that: BlockedRequests) { return new Then(self, that) } /** * Transforms all data sources with the specified data source aspect, which * can change the environment type of data sources but must preserve the * request type of each data source. */ export function mapDataSources( f: DataSourceAspect ): (fa: BlockedRequests) => BlockedRequests { return (fa) => S.run(mapDataSourcesSafe(fa, f)) } /** * Transforms all data sources with the specified data source aspect, which * can change the environment type of data sources but must preserve the * request type of each data source. */ export function mapDataSources_( fa: BlockedRequests, f: DataSourceAspect ): BlockedRequests { return S.run(mapDataSourcesSafe(fa, f)) } /** * Transforms all data sources with the specified data source aspect, which * can change the environment type of data sources but must preserve the * request type of each data source. */ function mapDataSourcesSafe( fa: BlockedRequests, f: DataSourceAspect ): S.UIO> { return S.gen(function* (_) { switch (fa._tag) { case "Empty": return new Empty() case "Both": return new Both( yield* _(mapDataSourcesSafe(fa.left, f)), yield* _(mapDataSourcesSafe(fa.right, f)) ) case "Then": return new Then( yield* _(mapDataSourcesSafe(fa.left, f)), yield* _(mapDataSourcesSafe(fa.right, f)) ) case "Single": { return new Single(f(fa.dataSource), fa.blockedRequest) } } }) } /** * Provides each data source with part of its required environment. */ export function provideSome( description: string, f: (a: R0) => R ): (fa: BlockedRequests) => BlockedRequests { return (fa) => S.run(provideSomeSafe(description, f)(fa)) } /** * Provides each data source with part of its required environment. */ export function provideSomeSafe( description: string, f: (a: R0) => R ): (fa: BlockedRequests) => S.UIO> { return (fa) => S.gen(function* (_) { switch (fa._tag) { case "Empty": return new Empty() case "Both": return new Both( yield* _(provideSomeSafe(description, f)(fa.left)), yield* _(provideSomeSafe(description, f)(fa.right)) ) case "Then": return new Then( yield* _(provideSomeSafe(description, f)(fa.left)), yield* _(provideSomeSafe(description, f)(fa.right)) ) case "Single": { return new Single( DS.provideSome(description, f)(fa.dataSource), fa.blockedRequest ) } } }) } /** * The empty collection of blocked requests. */ export const empty: BlockedRequests = new Empty() /** * Constructs a collection of blocked requests from the specified blocked * request and data source. */ export function single( dataSource: DS.DataSource, blockedRequest: BlockedRequest ): BlockedRequests { return new Single(dataSource as DS.DataSource, blockedRequest) } /** * Merges a collection of requests that must be executed sequentially with a * collection of requests that can be executed in parallel. If the * collections are both from the same single data source then the requests * can be pipelined while preserving ordering guarantees. */ export function merge( sequential: L.List>, parallel: PL.Parallel ): L.List> { if (L.isEmpty(sequential)) return L.of(PL.sequential(parallel)) if (PL.isEmpty(parallel)) return sequential // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const seqHead = L.unsafeFirst(sequential)! const seqHeadKeys = L.from(SQ.keys(seqHead)) const parKeys = L.from(PL.keys(parallel)) if ( L.size(seqHeadKeys) === 1 && L.size(parKeys) === 1 && St.equals(seqHeadKeys, parKeys) ) { return L.prepend_(scalaTail(sequential), SQ.add_(seqHead, PL.sequential(parallel))) } return L.concat_(L.of(PL.sequential(parallel)), sequential) } /** * Flattens a collection of blocked requests into a collection of pipelined * and batched requests that can be submitted for execution. */ export function flatten( blockedRequests: BlockedRequests ): L.List> { let current = L.of(blockedRequests) let flattened = L.empty>() // eslint-disable-next-line no-constant-condition while (1) { const [parallel, sequential] = L.reduce_( current, tuple<[PL.Parallel, L.List>]>(PL.empty, L.empty()), ([parallel, sequential], blockedRequest) => { const [par, seq] = step(blockedRequest) return tuple(PL.add(parallel)(par), L.concat_(sequential, seq)) } ) flattened = merge(flattened, parallel) if (L.isEmpty(sequential)) return L.reverse(flattened) current = sequential } throw new Error("absurd") } class StepFrame { constructor( readonly blockedRequests: BlockedRequests, readonly stack: L.List>, readonly parallel: PL.Parallel, readonly sequential: L.List> ) {} } /** * Takes one step in evaluating a collection of blocked requests, returning a * collection of blocked requests that can be performed in parallel and a * list of blocked requests that must be performed sequentially after those * requests. */ export function step( c: BlockedRequests ): readonly [PL.Parallel, L.List>] { let current = new StepFrame(c, L.empty(), PL.empty, L.empty()) // eslint-disable-next-line no-constant-condition while (1) { switch (current.blockedRequests._tag) { case "Empty": { const head = L.first(current.stack) if (O.isSome(head)) { current = new StepFrame( head.value, scalaTail(current.stack), current.parallel, current.sequential ) } else { return tuple(current.parallel, current.sequential) } break } case "Both": { current = new StepFrame( current.blockedRequests.left, L.prepend_(current.stack, current.blockedRequests.right), current.parallel, current.sequential ) break } case "Single": { const head = L.first(current.stack) if (O.isSome(head)) { current = new StepFrame( head.value, scalaTail(current.stack), PL.add_( current.parallel, PL.apply( current.blockedRequests.dataSource, current.blockedRequests.blockedRequest ) ), current.sequential ) } else { return tuple( PL.add_( current.parallel, PL.apply( current.blockedRequests.dataSource, current.blockedRequests.blockedRequest ) ), current.sequential ) } break } case "Then": { const { left, right } = current.blockedRequests switch (left._tag) { case "Empty": { current = new StepFrame( right, current.stack, current.parallel, current.sequential ) break } case "Then": { current = new StepFrame( then_(left.left, then_(left.right, current.blockedRequests.right)), current.stack, current.parallel, current.sequential ) break } case "Both": { const l = left.left const r = left.right current = new StepFrame( both_(then_(l, right), then_(r, right)), current.stack, current.parallel, current.sequential ) break } case "Single": { current = new StepFrame( left, current.stack, current.parallel, L.prepend_(current.sequential, current.blockedRequests.right) ) break } } } } } throw new Error("absurd") } /** * Executes all requests, submitting requests to each data source in * parallel. */ export function run(cache: Cache) { return (self: BlockedRequests): T.Effect => T.forEach_(flatten(self), (requestsByDataSource) => T.forEachPar_(SQ.toIterable(requestsByDataSource), ([dataSource, sequential]) => pipe( T.do, T.bind("completedRequests", () => dataSource.runAll( A.map_(A.from(sequential), (_) => A.map_(_, (br) => br.request)) ) ), T.bind("blockedRequests", () => T.succeed(A.flatten(sequential))), T.bind("leftovers", (_) => { const arg1 = CRM.requests(_.completedRequests) const arg2 = A.map_(_.blockedRequests, (a) => a.request) const a = HS.difference_(arg1, arg2) return T.succeed(a) }), T.tap((_) => T.forEach_(_.blockedRequests, (blockedRequest) => REF.set_( blockedRequest.result as REF.Ref>>, CRM.lookup_(_.completedRequests, blockedRequest.request) ) ) ), T.tap((_) => T.forEach_(_.leftovers, (request) => T.chain_(REF.makeRef(CRM.lookup_(_.completedRequests, request)), (res) => cache.put(request, res) ) ) ), T.chain(() => T.unit) ) ) ) }