// ets_tracing: off
// port of: https://github.com/zio/zio-query/blob/3f9f4237ca2d879b629163f23fe79045eb29f0b0/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala
import * as A from "@effect-ts/core/Collections/Immutable/Chunk"
import * as HS from "@effect-ts/core/Collections/Immutable/HashSet"
import * as L from "@effect-ts/core/Collections/Immutable/List"
import * as T from "@effect-ts/core/Effect"
import { _R } from "@effect-ts/core/Effect"
import * as REF from "@effect-ts/core/Effect/Ref"
import type * as E from "@effect-ts/core/Either"
import { pipe, tuple } from "@effect-ts/core/Function"
import * as O from "@effect-ts/core/Option"
import * as St from "@effect-ts/core/Structural"
import * as S from "@effect-ts/core/Sync"
import type { Cache } from "../../Cache/index.js"
import * as CRM from "../../CompletedRequestMap/index.js"
import * as DS from "../../DataSource/index.js"
import type { DataSourceAspect } from "../../DataSourceAspect/index.js"
import type { BlockedRequest } from "../BlockedRequest/index.js"
import * as PL from "../Parallel/index.js"
import * as SQ from "../Sequential/index.js"
/**
 * Returns the tail of `a`, yielding the empty list when `a` itself is
 * empty — mirroring Scala's safe "drop head" behaviour instead of failing
 * on an empty list.
 */
function scalaTail(a: L.List): L.List {
  if (L.isEmpty(a)) {
    return L.empty()
  }
  return L.tail(a)
}
/**
 * A collection of blocked requests that can be performed in parallel:
 * `left` and `right` carry no ordering dependency on each other.
 */
class Both {
  readonly _tag = "Both";
  // Phantom member for the (erased) environment type; declared with `!` and
  // never assigned at runtime.
  readonly [_R]!: (r: R) => void
  constructor(readonly left: BlockedRequests, readonly right: BlockedRequests) {}
}
/**
 * The collection containing no blocked requests.
 */
class Empty {
  readonly _tag = "Empty";
  // Phantom member for the (erased) environment type; never assigned at runtime.
  readonly [_R]!: (r: R) => void
}
/**
 * A single request to `dataSource` that is currently blocked, waiting for
 * its result to be written back (see `run` below).
 */
class Single {
  readonly _tag = "Single";
  // Phantom member for the (erased) environment type; never assigned at runtime.
  readonly [_R]!: (r: R) => void
  constructor(
    readonly dataSource: DS.DataSource,
    readonly blockedRequest: BlockedRequest
  ) {}
}
/**
 * A collection of blocked requests that must be performed sequentially:
 * everything in `left` before anything in `right`.
 */
class Then {
  readonly _tag = "Then";
  readonly [_R]!: (r: R) => void
  constructor(readonly left: BlockedRequests, readonly right: BlockedRequests) {}
}
/**
 * `BlockedRequests` captures a collection of blocked requests as a data
 * structure. By doing this the library is able to preserve information about
 * which requests must be performed sequentially and which can be performed in
 * parallel, allowing for maximum possible batching and pipelining while
 * preserving ordering guarantees.
 *
 * Values are built with `empty`, `single`, `both`/`both_` and `then`/`then_`,
 * and consumed by `step`, `flatten` and `run` below.
 */
export type BlockedRequests = Both | Empty | Single | Then
/**
 * Combines this collection of blocked requests with the specified collection
 * of blocked requests, in parallel.
 *
 * Pipeable variant of `both_`.
 */
export function both(that: BlockedRequests) {
  return (self: BlockedRequests): BlockedRequests => {
    return both_(self, that)
  }
}
/**
 * Combines two collections of blocked requests so they may be performed in
 * parallel (data-first variant of `both`).
 */
export function both_(self: BlockedRequests, that: BlockedRequests) {
  const combined = new Both(self, that)
  return combined
}
/**
 * Combines this collection of blocked requests with the specified collection
 * of blocked requests, in sequence.
 *
 * Pipeable variant of `then_`.
 */
export function then(that: BlockedRequests) {
  return (self: BlockedRequests): BlockedRequests => {
    return then_(self, that)
  }
}
/**
 * Combines two collections of blocked requests so `self` is performed before
 * `that` (data-first variant of `then`).
 */
export function then_(self: BlockedRequests, that: BlockedRequests) {
  const combined = new Then(self, that)
  return combined
}
/**
* Transforms all data sources with the specified data source aspect, which
* can change the environment type of data sources but must preserve the
* request type of each data source.
*/
export function mapDataSources(
f: DataSourceAspect
): (fa: BlockedRequests) => BlockedRequests {
return (fa) => S.run(mapDataSourcesSafe(fa, f))
}
/**
 * Transforms all data sources with the specified data source aspect, which
 * can change the environment type of data sources but must preserve the
 * request type of each data source (data-first variant of `mapDataSources`).
 */
export function mapDataSources_(
  fa: BlockedRequests,
  f: DataSourceAspect
): BlockedRequests {
  const computation = mapDataSourcesSafe(fa, f)
  return S.run(computation)
}
/**
* Transforms all data sources with the specified data source aspect, which
* can change the environment type of data sources but must preserve the
* request type of each data source.
*/
function mapDataSourcesSafe(
fa: BlockedRequests,
f: DataSourceAspect
): S.UIO> {
return S.gen(function* (_) {
switch (fa._tag) {
case "Empty":
return new Empty()
case "Both":
return new Both(
yield* _(mapDataSourcesSafe(fa.left, f)),
yield* _(mapDataSourcesSafe(fa.right, f))
)
case "Then":
return new Then(
yield* _(mapDataSourcesSafe(fa.left, f)),
yield* _(mapDataSourcesSafe(fa.right, f))
)
case "Single": {
return new Single(f(fa.dataSource), fa.blockedRequest)
}
}
})
}
/**
 * Provides each data source with part of its required environment.
 */
export function provideSome(
  description: string,
  f: (a: R0) => R
): (fa: BlockedRequests) => BlockedRequests {
  // Build the Sync-producing closure once; run it per input.
  const safe = provideSomeSafe(description, f)
  return (fa) => S.run(safe(fa))
}
/**
* Provides each data source with part of its required environment.
*/
export function provideSomeSafe(
description: string,
f: (a: R0) => R
): (fa: BlockedRequests) => S.UIO> {
return (fa) =>
S.gen(function* (_) {
switch (fa._tag) {
case "Empty":
return new Empty()
case "Both":
return new Both(
yield* _(provideSomeSafe(description, f)(fa.left)),
yield* _(provideSomeSafe(description, f)(fa.right))
)
case "Then":
return new Then(
yield* _(provideSomeSafe(description, f)(fa.left)),
yield* _(provideSomeSafe(description, f)(fa.right))
)
case "Single": {
return new Single(
DS.provideSome(description, f)(fa.dataSource),
fa.blockedRequest
)
}
}
})
}
/**
 * The empty collection of blocked requests.
 *
 * A single shared instance — `BlockedRequests` nodes only have `readonly`
 * fields, so reuse is safe.
 */
export const empty: BlockedRequests = new Empty()
/**
 * Constructs a collection of blocked requests from the specified blocked
 * request and data source.
 *
 * Fix: dropped the redundant `as DS.DataSource` assertion — the parameter
 * is already declared with that exact type, so the cast was a no-op.
 */
export function single(
  dataSource: DS.DataSource,
  blockedRequest: BlockedRequest
): BlockedRequests {
  return new Single(dataSource, blockedRequest)
}
/**
* Merges a collection of requests that must be executed sequentially with a
* collection of requests that can be executed in parallel. If the
* collections are both from the same single data source then the requests
* can be pipelined while preserving ordering guarantees.
*/
export function merge(
sequential: L.List>,
parallel: PL.Parallel
): L.List> {
if (L.isEmpty(sequential)) return L.of(PL.sequential(parallel))
if (PL.isEmpty(parallel)) return sequential
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const seqHead = L.unsafeFirst(sequential)!
const seqHeadKeys = L.from(SQ.keys(seqHead))
const parKeys = L.from(PL.keys(parallel))
if (
L.size(seqHeadKeys) === 1 &&
L.size(parKeys) === 1 &&
St.equals(seqHeadKeys, parKeys)
) {
return L.prepend_(scalaTail(sequential), SQ.add_(seqHead, PL.sequential(parallel)))
}
return L.concat_(L.of(PL.sequential(parallel)), sequential)
}
/**
* Flattens a collection of blocked requests into a collection of pipelined
* and batched requests that can be submitted for execution.
*/
export function flatten(
blockedRequests: BlockedRequests
): L.List> {
let current = L.of(blockedRequests)
let flattened = L.empty>()
// eslint-disable-next-line no-constant-condition
while (1) {
const [parallel, sequential] = L.reduce_(
current,
tuple<[PL.Parallel, L.List>]>(PL.empty, L.empty()),
([parallel, sequential], blockedRequest) => {
const [par, seq] = step(blockedRequest)
return tuple(PL.add(parallel)(par), L.concat_(sequential, seq))
}
)
flattened = merge(flattened, parallel)
if (L.isEmpty(sequential)) return L.reverse(flattened)
current = sequential
}
throw new Error("absurd")
}
class StepFrame {
constructor(
readonly blockedRequests: BlockedRequests,
readonly stack: L.List>,
readonly parallel: PL.Parallel,
readonly sequential: L.List>
) {}
}
/**
* Takes one step in evaluating a collection of blocked requests, returning a
* collection of blocked requests that can be performed in parallel and a
* list of blocked requests that must be performed sequentially after those
* requests.
*/
export function step(
c: BlockedRequests
): readonly [PL.Parallel, L.List>] {
let current = new StepFrame(c, L.empty(), PL.empty, L.empty())
// eslint-disable-next-line no-constant-condition
while (1) {
switch (current.blockedRequests._tag) {
case "Empty": {
const head = L.first(current.stack)
if (O.isSome(head)) {
current = new StepFrame(
head.value,
scalaTail(current.stack),
current.parallel,
current.sequential
)
} else {
return tuple(current.parallel, current.sequential)
}
break
}
case "Both": {
current = new StepFrame(
current.blockedRequests.left,
L.prepend_(current.stack, current.blockedRequests.right),
current.parallel,
current.sequential
)
break
}
case "Single": {
const head = L.first(current.stack)
if (O.isSome(head)) {
current = new StepFrame(
head.value,
scalaTail(current.stack),
PL.add_(
current.parallel,
PL.apply(
current.blockedRequests.dataSource,
current.blockedRequests.blockedRequest
)
),
current.sequential
)
} else {
return tuple(
PL.add_(
current.parallel,
PL.apply(
current.blockedRequests.dataSource,
current.blockedRequests.blockedRequest
)
),
current.sequential
)
}
break
}
case "Then": {
const { left, right } = current.blockedRequests
switch (left._tag) {
case "Empty": {
current = new StepFrame(
right,
current.stack,
current.parallel,
current.sequential
)
break
}
case "Then": {
current = new StepFrame(
then_(left.left, then_(left.right, current.blockedRequests.right)),
current.stack,
current.parallel,
current.sequential
)
break
}
case "Both": {
const l = left.left
const r = left.right
current = new StepFrame(
both_(then_(l, right), then_(r, right)),
current.stack,
current.parallel,
current.sequential
)
break
}
case "Single": {
current = new StepFrame(
left,
current.stack,
current.parallel,
L.prepend_(current.sequential, current.blockedRequests.right)
)
break
}
}
}
}
}
throw new Error("absurd")
}
/**
 * Executes all requests, submitting requests to each data source in
 * parallel.
 *
 * For each layer produced by `flatten` (processed in order), every data
 * source in that layer is queried concurrently; each blocked request's
 * result `Ref` is then completed, and any extra results are cached.
 */
export function run(cache: Cache) {
  return (self: BlockedRequests): T.Effect =>
    // The flattened layers must preserve their ordering, so iterate them
    // sequentially.
    T.forEach_(flatten(self), (requestsByDataSource) =>
      // Within one layer, independent data sources are queried in parallel.
      T.forEachPar_(SQ.toIterable(requestsByDataSource), ([dataSource, sequential]) =>
        pipe(
          T.do,
          // Submit the batched requests to the data source in one call,
          // keeping the chunk-of-chunks grouping intact.
          T.bind("completedRequests", () =>
            dataSource.runAll(
              A.map_(A.from(sequential), (_) => A.map_(_, (br) => br.request))
            )
          ),
          T.bind("blockedRequests", () => T.succeed(A.flatten(sequential))),
          // Set difference: results the data source returned for requests we
          // did not explicitly submit (presumably opportunistic extras —
          // confirm against DataSource semantics).
          T.bind("leftovers", (_) => {
            const arg1 = CRM.requests(_.completedRequests)
            const arg2 = A.map_(_.blockedRequests, (a) => a.request)
            const a = HS.difference_(arg1, arg2)
            return T.succeed(a)
          }),
          // Complete each blocked request by writing its looked-up result
          // into the Ref the requester is waiting on.
          T.tap((_) =>
            T.forEach_(_.blockedRequests, (blockedRequest) =>
              REF.set_(
                blockedRequest.result as REF.Ref>>,
                CRM.lookup_(_.completedRequests, blockedRequest.request)
              )
            )
          ),
          // Store the leftover results in the cache so later queries can
          // reuse them without resubmitting.
          T.tap((_) =>
            T.forEach_(_.leftovers, (request) =>
              T.chain_(REF.makeRef(CRM.lookup_(_.completedRequests, request)), (res) =>
                cache.put(request, res)
              )
            )
          ),
          // Discard the accumulated bindings; the effect is run for its
          // side effects only.
          T.chain(() => T.unit)
        )
      )
    )
}