import type { Cache } from '../Cache'
import type { DataSource } from '../DataSource'
import type { DataSourceAspect } from '../DataSourceAspect'
import type { Described } from '../Described'
import type { BlockedRequest } from './BlockedRequest'
import type { Sequential } from './Sequential'
import type { List } from '@principia/base/List'

import * as C from '@principia/base/Chunk'
import * as Ev from '@principia/base/Eval'
import { identity, pipe } from '@principia/base/function'
import * as HS from '@principia/base/HashSet'
import * as I from '@principia/base/IO'
import * as L from '@principia/base/List'
import * as Ref from '@principia/base/Ref'

import * as DS from '../DataSource'
import * as Par from './Parallel'

/*
 * NOTE(review): the generic type-argument lists in this module were reconstructed
 * (`BlockedRequests<R>`, `DataSource<R, A>`, `Sequential<R>`, `Par.Parallel<R>`,
 * `I.IO<R, never, void>`, ...). Confirm arities against the declarations in
 * `../DataSource`, `./BlockedRequest`, `./Sequential`, `./Parallel` and
 * `@principia/base/IO`.
 */

/**
 * The empty collection of blocked requests.
 */
export class Empty {
  readonly _tag = 'Empty'
  readonly _R!: (_: unknown) => void
}

/**
 * Shared instance of the `Empty` node.
 */
export const empty: BlockedRequests<unknown> = new Empty()

/**
 * Node combining two collections of blocked requests (`left` and `right`).
 * `step` visits both sides and adds their requests to the same `Parallel`
 * accumulator.
 */
export class Both<R> {
  readonly _tag = 'Both'
  readonly _R!: (_: R) => void
  constructor(readonly left: BlockedRequests<R>, readonly right: BlockedRequests<R>) {}
}

/**
 * Builds a `Both` node from `left` and `right`.
 */
export function both<R, R1>(left: BlockedRequests<R>, right: BlockedRequests<R1>): BlockedRequests<R & R1> {
  return new Both<R & R1>(left, right)
}

/**
 * Leaf node: a single blocked request together with the data source that
 * should execute it.
 */
export class Single<R, A> {
  readonly _tag = 'Single'
  readonly _R!: (_: R) => void
  readonly _A!: () => A
  constructor(readonly dataSource: DataSource<R, A>, readonly blockedRequest: BlockedRequest<A>) {}
}

/**
 * Builds a `Single` node from a data source and a blocked request.
 */
export function single<R, A>(dataSource: DataSource<R, A>, blockedRequest: BlockedRequest<A>): BlockedRequests<R> {
  return new Single(dataSource, blockedRequest)
}

/**
 * Node ordering two collections of blocked requests: `step` defers `right`
 * to the list of sequential continuations while `left` is processed.
 */
export class Then<R> {
  readonly _tag = 'Then'
  readonly _R!: (_: R) => void
  constructor(readonly left: BlockedRequests<R>, readonly right: BlockedRequests<R>) {}
}

/**
 * Builds a `Then` node from `left` and `right`.
 */
export function then<R, R1>(left: BlockedRequests<R>, right: BlockedRequests<R1>): BlockedRequests<R & R1> {
  return new Then<R & R1>(left, right)
}

/**
 * A tree of blocked requests, discriminated by `_tag`.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type BlockedRequests<R> = Empty | Both<R> | Then<R> | Single<R, any>

/**
 * Rebuilds the tree, replacing the data source of every `Single` leaf with
 * `f.apply(dataSource)`. The shape of the tree and the blocked requests are
 * kept as they are.
 *
 * @param br the tree to transform
 * @param f the aspect applied to each leaf's data source
 * @returns a new tree with transformed data sources
 */
export function mapDataSources<R, R1>(br: BlockedRequests<R>, f: DataSourceAspect<R1>): BlockedRequests<R & R1> {
  // The traversal is expressed in `Eval` and executed with `Ev.run`.
  const go = (br: BlockedRequests<R>, f: DataSourceAspect<R1>): Ev.Eval<BlockedRequests<R & R1>> =>
    Ev.gen(function* (_) {
      switch (br._tag) {
        case 'Empty': {
          return empty
        }
        case 'Both': {
          return both(yield* _(go(br.left, f)), yield* _(go(br.right, f)))
        }
        case 'Then': {
          return then(yield* _(go(br.left, f)), yield* _(go(br.right, f)))
        }
        case 'Single': {
          return single(f.apply(br.dataSource), br.blockedRequest)
        }
      }
    })
  return Ev.run(go(br, f))
}

/**
 * Rebuilds the tree, replacing the data source of every `Single` leaf with
 * `DS.gives_(dataSource, f)`.
 *
 * @param br the tree to transform
 * @param f the described environment transformation passed to `DS.gives_`
 * @returns a new tree with transformed data sources
 */
export function gives_<R, R0>(br: BlockedRequests<R>, f: Described<(r0: R0) => R>): BlockedRequests<R0> {
  const go = (br: BlockedRequests<R>, f: Described<(r0: R0) => R>): Ev.Eval<BlockedRequests<R0>> =>
    Ev.gen(function* (_) {
      switch (br._tag) {
        case 'Empty': {
          return empty
        }
        case 'Both': {
          return both(yield* _(go(br.left, f)), yield* _(go(br.right, f)))
        }
        case 'Then': {
          return then(yield* _(go(br.left, f)), yield* _(go(br.right, f)))
        }
        case 'Single': {
          return single(DS.gives_(br.dataSource, f), br.blockedRequest)
        }
      }
    })
  return Ev.run(go(br, f))
}

/**
 * Executes all blocked requests in `br`.
 *
 * The tree is flattened into a list of `Sequential` batches. For every batch,
 * each `[dataSource, requests]` entry is run through `dataSource.runAll`, the
 * result of every blocked request is written to its `result` ref, and any
 * additional request returned by the data source is stored in `cache`.
 *
 * @param br the blocked requests to execute
 * @param cache the cache that receives results nobody was blocked on
 * @returns an effect that performs the execution
 */
export function run_<R>(br: BlockedRequests<R>, cache: Cache): I.IO<R, never, void> {
  return I.defer(() =>
    pipe(
      flatten(br),
      I.foreachUnit((requestsByDataSource) =>
        // NOTE(review): `foreachUnitC_` is presumably the concurrent variant of `foreachUnit_` — confirm.
        I.foreachUnitC_(requestsByDataSource.toIterable, ([dataSource, sequential]) =>
          I.gen(function* (_) {
            const completedRequests = yield* _(
              dataSource.runAll(
                C.map_(
                  sequential,
                  C.map((r) => r.request)
                )
              )
            )

            // Flatten the nested chunks into one chunk of blocked requests.
            const blockedRequests = pipe(sequential, C.chain(identity))

            // Requests the data source completed that no blocked request was waiting on.
            let leftovers = completedRequests.requests
            for (const request of C.map_(blockedRequests, (blocked) => blocked.request)) {
              leftovers = HS.remove_(leftovers, request)
            }

            yield* _(
              I.foreachUnit_(blockedRequests, (blocked) =>
                blocked.result.set(completedRequests.lookup(blocked.request))
              )
            )

            yield* _(
              I.foreachUnit_(leftovers, (r) =>
                pipe(
                  Ref.make(completedRequests.lookup(r)),
                  I.chain((ref) => cache.put(r, ref))
                )
              )
            )
          })
        )
      )
    )
  )
}

/**
 * Flattens a tree of blocked requests into a list of `Sequential` batches.
 *
 * Each round applies `step` to every pending tree, merges the collected
 * `Parallel` requests into the accumulated list and recurses on the deferred
 * (sequential) remainders until none are left.
 */
function flatten<R>(blockedRequests: BlockedRequests<R>): List<Sequential<R>> {
  const go = (brs: List<BlockedRequests<R>>, flattened: List<Sequential<R>>): Ev.Eval<List<Sequential<R>>> =>
    Ev.gen(function* (_) {
      const [parallel, sequential] = L.foldl_(
        brs,
        [Par.empty<R>(), L.empty<BlockedRequests<R>>()] as const,
        ([parallel, sequential], blockedRequest) => {
          const [par, seq] = step(blockedRequest)
          return [parallel['++'](par), L.concat_(sequential, seq)] as const
        }
      )
      const updated = merge(flattened, parallel)
      if (L.isEmpty(sequential)) {
        return L.reverse(updated)
      } else {
        return yield* _(go(sequential, updated))
      }
    })
  return Ev.run(go(L.list(blockedRequests), L.empty()))
}

/**
 * Takes one step in evaluating a tree of blocked requests.
 *
 * @param c the tree to split
 * @returns a pair of the requests collected into a `Parallel` in this step
 * and the list of trees that were deferred by a `Then` node
 */
function step<R>(c: BlockedRequests<R>): readonly [Par.Parallel<R>, List<BlockedRequests<R>>] {
  const go = (
    blockedRequests: BlockedRequests<R>,
    stack: List<BlockedRequests<R>>,
    parallel: Par.Parallel<R>,
    sequential: List<BlockedRequests<R>>
  ): Ev.Eval<readonly [Par.Parallel<R>, List<BlockedRequests<R>>]> =>
    Ev.gen(function* (_) {
      switch (blockedRequests._tag) {
        case 'Empty': {
          if (L.isEmpty(stack)) {
            return [parallel, sequential]
          } else {
            return yield* _(go(L.unsafeHead(stack) as BlockedRequests<R>, L.tail(stack), parallel, sequential))
          }
        }
        case 'Then': {
          switch (blockedRequests.left._tag) {
            case 'Empty': {
              // Nothing to do on the left: continue with `right`. Recursing into the
              // empty `left` node instead would discard every request held in `right`.
              return yield* _(go(blockedRequests.right, stack, parallel, sequential))
            }
            case 'Then': {
              // Re-associate: (l then r) then right  ==>  l then (r then right)
              return yield* _(
                go(
                  then(blockedRequests.left.left, then(blockedRequests.left.right, blockedRequests.right)),
                  stack,
                  parallel,
                  sequential
                )
              )
            }
            case 'Both': {
              // Distribute: (l both r) then right  ==>  (l then right) both (r then right)
              return yield* _(
                go(
                  both(
                    then(blockedRequests.left.left, blockedRequests.right),
                    then(blockedRequests.left.right, blockedRequests.right)
                  ),
                  stack,
                  parallel,
                  sequential
                )
              )
            }
            case 'Single': {
              // Process the leaf now and defer `right` to the sequential list.
              return yield* _(go(blockedRequests.left, stack, parallel, L.append(blockedRequests.right)(sequential)))
            }
          }
        }
        // eslint-disable-next-line no-fallthrough
        case 'Both': {
          // Process `left` now and remember `right` for later in this same step.
          return yield* _(go(blockedRequests.left, L.append(blockedRequests.right)(stack), parallel, sequential))
        }
        case 'Single': {
          if (L.isEmpty(stack)) {
            return [parallel['++'](Par.from(blockedRequests.dataSource, blockedRequests.blockedRequest)), sequential]
          } else {
            return yield* _(
              go(
                L.unsafeHead(stack) as BlockedRequests<R>,
                L.tail(stack),
                parallel['++'](Par.from(blockedRequests.dataSource, blockedRequests.blockedRequest)),
                sequential
              )
            )
          }
        }
      }
    })
  return Ev.run(go(c, L.empty(), Par.empty(), L.empty()))
}

/**
 * Counts the elements of an iterable by materialising it into an array;
 * returns `0` when `it` is `undefined`.
 */
const getIterableSize = (it: Iterable<unknown> | undefined): number => (it ? Array.from(it).length : 0)

/**
 * Merges the requests collected in one step (`parallel`) into the list of
 * `Sequential` batches accumulated so far.
 *
 * - If there are no batches yet, the result is a single batch made from `parallel`.
 * - If `parallel` is empty, the list is returned unchanged.
 * - If both the head batch and `parallel` have exactly one key, `parallel` is
 *   combined into the head batch with `++`.
 * - Otherwise `parallel` is added as a batch of its own.
 */
function merge<R>(sequential: List<Sequential<R>>, parallel: Par.Parallel<R>): List<Sequential<R>> {
  if (L.isEmpty(sequential)) {
    return L.list(parallel.sequential)
  } else if (parallel.isEmpty) {
    return sequential
  } else if (getIterableSize(L.unsafeHead(sequential)?.keys) === 1 && getIterableSize(parallel.keys) === 1) {
    // NOTE(review): this only checks that each side has a single key, not that the two keys
    // are the same data source — confirm whether an equality check is intended here.
    return pipe(
      L.unsafeHead(sequential) as Sequential<R>,
      (s) => s['++'](parallel.sequential),
      (s) => L.append(s)(L.tail(sequential))
    )
  } else {
    // NOTE(review): `flatten` reverses the final list, which suggests batches are expected to be
    // added at the front; verify that `L.append` (rather than a prepend) gives the intended order.
    return L.append(parallel.sequential)(sequential)
  }
}