// ets_tracing: off

// port of: https://github.com/zio/zio-query/blob/3f9f4237ca2d879b629163f23fe79045eb29f0b0/zio-query/shared/src/main/scala/zio/query/DataSource.scala
import "@effect-ts/system/Operator"

import * as C from "@effect-ts/core/Collections/Immutable/Chunk"
import * as Tp from "@effect-ts/core/Collections/Immutable/Tuple"
import * as T from "@effect-ts/core/Effect"
import * as E from "@effect-ts/core/Either"
import type * as O from "@effect-ts/core/Option"
import * as St from "@effect-ts/core/Structural"
import type { _A, _E } from "@effect-ts/core/Utils"
import { LazyGetter } from "@effect-ts/core/Utils"

import * as CR from "../CompletedRequestMap/index.js"
import type { Request } from "../Request/index.js"

/**
 * A `DataSource[R, A]` requires an environment `R` and is capable of executing
 * requests of type `A`.
 *
 * Data sources must implement the method `runAll` which takes a collection of
 * requests and returns an effect with a `CompletedRequestMap` containing a
 * mapping from requests to results. The type of the collection of requests is
 * a `Chunk[Chunk[A]]`. The outer `Chunk` represents batches of requests that
 * must be performed sequentially. The inner `Chunk` represents a batch of
 * requests that can be performed in parallel. This allows data sources to
 * introspect on all the requests being executed and optimize the query.
 *
 * Data sources will typically be parameterized on a subtype of `Request[A]`,
 * though that is not strictly necessary as long as the data source can map
 * the request type to a `Request[A]`. Data sources can then pattern match on
 * the collection of requests to determine the information requested, execute
 * the query, and place the results into the `CompletedRequestMap` using
 * [[CompletedRequestMap.empty]] and [[CompletedRequestMap.insert]]. Data
 * sources must provide results for all requests received. Failure to do so
 * will cause a query to die with a `QueryFailure` when run.
 */
export class DataSource<R, A> {
  readonly _tag = "DataSource"

  constructor(
    /**
     * The data source's identifier.
     */
    public readonly identifier: string,
    /**
     * Execute a collection of requests. The outer `Chunk` represents batches
     * of requests that must be performed sequentially. The inner `Chunk`
     * represents a batch of requests that can be performed in parallel.
     */
    public readonly runAll: (
      requests: C.Chunk<C.Chunk<A>>
    ) => T.Effect<R, never, CR.CompletedRequestMap>
  ) {}

  /**
   * Structural equality: two data sources are equal when their identifiers
   * are equal.
   */
  [St.equalsSym](that: unknown): boolean {
    return that instanceof DataSource && this.identifier === that.identifier
  }

  /**
   * Structural hash, derived from the identifier only so that it agrees with
   * the equality defined above.
   */
  @LazyGetter()
  get [St.hashSym](): number {
    return St.hashString(this.identifier)
  }
}

/**
 * Returns `true` when both data sources carry the same identifier.
 */
export function equals<R, A>(a: DataSource<R, A>, b: DataSource<R, A>): boolean {
  return a.identifier === b.identifier
}

/**
 * Defect raised by `batchN_` / `batchN` when the requested batch size is not
 * an integer greater than or equal to 1. The offending value is kept in `n`.
 */
export class InvalidBatchConfig {
  readonly _tag = "InvalidBatchConfig"
  readonly message = "batchN: n must be at least 1 and must be an integer"
  constructor(readonly n: number) {}
}

/**
 * Returns a data source that executes at most `n` requests in parallel.
 *
 * The batch size is validated each time the returned data source is run:
 * if `n` is smaller than 1 or is not an integer the effect dies with an
 * `InvalidBatchConfig`.
 */
export function batchN_<R, A>(self: DataSource<R, A>, n: number): DataSource<R, A> {
  return new DataSource(`${self.identifier}.batchN(${n})`, (requests) => {
    if (n < 1 || !Number.isInteger(n)) {
      return T.die(new InvalidBatchConfig(n))
    } else {
      return self.runAll(
        // split every parallel batch into groups of at most `n` requests and
        // append the groups, in order, as sequential batches
        C.reduce_(requests, C.empty<C.Chunk<A>>(), (x, y) =>
          C.concat_(x, C.grouped_(y, n))
        )
      )
    }
  })
}

/**
 * Returns a data source that executes at most `n` requests in parallel.
 * @ets_data_first batchN_
 */
export function batchN(n: number) {
  return <R, A>(self: DataSource<R, A>): DataSource<R, A> => batchN_(self, n)
}

/**
 * Returns a new data source that executes requests of type `B` using the
 * specified function to transform `B` requests into requests that this data
 * source can execute.
 */
export function contramap_<R, A, B>(
  self: DataSource<R, A>,
  description: string,
  f: (a: B) => A
): DataSource<R, B> {
  return new DataSource(`${self.identifier}.contramap(${description})`, (requests) =>
    self.runAll(C.map_(requests, (_) => C.map_(_, f)))
  )
}

/**
 * Returns a new data source that executes requests of type `B` using the
 * specified function to transform `B` requests into requests that this data
 * source can execute.
 * @ets_data_first contramap_
 */
export function contramap<A, B>(description: string, f: (a: B) => A) {
  return <R>(self: DataSource<R, A>): DataSource<R, B> =>
    contramap_(self, description, f)
}

/**
 * Returns a new data source that executes requests of type `B` using the
 * specified effectual function to transform `B` requests into requests that
 * this data source can execute.
 */
export function contramapM_<R, A, R1, B>(
  self: DataSource<R, A>,
  description: string,
  f: (b: B) => T.Effect<R1, never, A>
): DataSource<R & R1, B> {
  return new DataSource(`${self.identifier}.contramapM(${description})`, (requests) =>
    T.chain_(
      // outer batches are traversed with `forEach_`, the requests of each
      // inner batch with `forEachPar_`
      T.forEach_(requests, (_) => T.forEachPar_(_, f)),
      self.runAll
    )
  )
}

/**
 * Returns a new data source that executes requests of type `B` using the
 * specified effectual function to transform `B` requests into requests that
 * this data source can execute.
 * @ets_data_first contramapM_
 */
export function contramapM<R1, A, B>(
  description: string,
  f: (b: B) => T.Effect<R1, never, A>
) {
  return <R>(self: DataSource<R, A>): DataSource<R & R1, B> =>
    contramapM_(self, description, f)
}

/**
 * Returns a new data source that executes requests of type `C` using the
 * specified function to transform `C` requests into requests that either
 * this data source or that data source can execute.
 */
export function eitherWith_<R, A, R1, B, C>(
  self: DataSource<R, A>,
  description: string,
  that: DataSource<R1, B>,
  f: (c: C) => E.Either<A, B>
): DataSource<R & R1, C> {
  return new DataSource(
    `${self.identifier}.eitherWith(${that.identifier})(${description})`,
    (requests) =>
      T.map_(
        T.forEach_(requests, (requests) => {
          // route each request of the batch to `self` (left) or `that` (right)
          const {
            tuple: [as, bs]
          } = C.partitionMap_(requests, f)
          return T.zipWithPar_(
            self.runAll(C.single(as)),
            that.runAll(C.single(bs)),
            CR.concat
          )
        }),
        // merge the per-batch maps into a single result map
        (_) => C.reduce_(_, CR.empty, CR.concat)
      )
  )
}

/**
 * Returns a new data source that executes requests of type `C` using the
 * specified function to transform `C` requests into requests that either
 * this data source or that data source can execute.
 * @ets_data_first eitherWith_
 */
export function eitherWith<R1, A, B, C>(
  description: string,
  that: DataSource<R1, B>,
  f: (c: C) => E.Either<A, B>
) {
  return <R>(self: DataSource<R, A>) => eitherWith_(self, description, that, f)
}

/**
 * Provides this data source with its required environment.
 */
export function provide_<R, A>(
  self: DataSource<R, A>,
  description: string,
  r: R
): DataSource<unknown, A> {
  return provideSome(`_ => ${description}`, () => r)(self)
}

/**
 * Provides this data source with its required environment.
 * @ets_data_first provide_
 */
export function provide<R>(description: string, r: R) {
  return <A>(self: DataSource<R, A>): DataSource<unknown, A> =>
    provide_(self, description, r)
}

/**
 * Provides this data source with part of its required environment.
 */
export function provideSome_<R, A, R0>(
  self: DataSource<R, A>,
  description: string,
  f: (r: R0) => R
): DataSource<R0, A> {
  return new DataSource(`${self.identifier}.provideSome(${description})`, (requests) =>
    T.provideSome_(self.runAll(requests), f)
  )
}

/**
 * Provides this data source with part of its required environment.
 * @ets_data_first provideSome_
 */
export function provideSome<R, R0>(description: string, f: (r: R0) => R) {
  return <A>(self: DataSource<R, A>): DataSource<R0, A> =>
    provideSome_(self, description, f)
}

/**
 * Returns a new data source that executes requests by sending them to this
 * data source and that data source, returning the results from the first
 * data source to complete and safely interrupting the loser.
 */
export function race_<R, A, R1, A1 extends A>(
  self: DataSource<R, A>,
  that: DataSource<R1, A1>
): DataSource<R & R1, A1> {
  return new DataSource(`${self.identifier}.race(${that.identifier})`, (requests) =>
    T.race_(self.runAll(requests), that.runAll(requests))
  )
}

/**
 * Returns a new data source that executes requests by sending them to this
 * data source and that data source, returning the results from the first
 * data source to complete and safely interrupting the loser.
 * @ets_data_first race_
 */
export function race<R1, A1>(that: DataSource<R1, A1>) {
  return <R>(self: DataSource<R, A1>): DataSource<R & R1, A1> => race_(self, that)
}

/**
 * A data source that executes requests that can be performed in parallel in
 * batches but does not further optimize batches of requests that must be
 * performed sequentially.
 *
 * Sequential batches are folded one after another into a single
 * `CompletedRequestMap`. Requests already contained in the accumulated map
 * are filtered out before `run` is called, and `run` is skipped entirely
 * when no new request remains in a batch.
 */
export function makeBatched(identifier: string) {
  return <R, A>(
    run: (requests: C.Chunk<A>) => T.Effect<R, never, CR.CompletedRequestMap>
  ): DataSource<R, A> =>
    new DataSource(identifier, (requests) =>
      T.reduce_(requests, CR.empty, (crm, requests) => {
        const newRequests = C.filter_(requests, (e) => !CR.contains_(crm, e))
        return C.isEmpty(newRequests)
          ? T.succeed(crm)
          : T.map_(run(newRequests), (_) => CR.concat(crm, _))
      })
    )
}

/**
 * Constructs a data source from a pure function.
 */
export function fromFunction(identifier: string) {
  return <A extends Request<never, any>>(f: (a: A) => _A<A>): DataSource<unknown, A> =>
    makeBatched(identifier)((requests) =>
      T.succeed(
        C.reduce_(requests, CR.empty, (crm, k) => CR.insert_(crm, k, E.right(f(k))))
      )
    )
}

/**
 * Constructs a data source from a pure function that takes a list of
 * requests and returns a list of results of the same size. Each item in the
 * result list must correspond to the item at the same index in the request
 * list.
 */
export function fromFunctionBatched(identifier: string) {
  return <A extends Request<never, any>>(
    f: (a: C.Chunk<A>) => C.Chunk<_A<A>>
  ): DataSource<unknown, A> =>
    fromFunctionBatchedM(identifier)((as) => T.succeed(f(as)))
}

/**
 * Constructs a data source from an effectual function that takes a list of
 * requests and returns a list of results of the same size. Each item in the
 * result list must correspond to the item at the same index in the request
 * list.
 *
 * If `f` fails, the failure is recorded as the result of every request of
 * the batch.
 */
export function fromFunctionBatchedM(identifier: string) {
  return <R, A extends Request<any, any>>(
    f: (a: C.Chunk<A>) => T.Effect<R, _E<A>, C.Chunk<_A<A>>>
  ): DataSource<R, A> =>
    makeBatched(identifier)((requests) => {
      const a: T.Effect<
        R,
        never,
        C.Chunk<Tp.Tuple<[A, E.Either<_E<A>, _A<A>>]>>
      > = T.fold_(
        f(requests),
        // failure: pair every request with the same error
        (e) => C.map_(requests, (_) => Tp.tuple(_, E.left(e))),
        // success: pair requests and results positionally
        (bs) =>
          C.zip_(
            requests,
            C.map_(bs, (_) => E.right(_))
          )
      )
      return T.map_(a, (_) =>
        C.reduce_(_, CR.empty, (crm, { tuple: [k, v] }) => CR.insert_(crm, k, v))
      )
    })
}

/**
 * Constructs a data source from a pure function that takes a list of
 * requests and returns a list of optional results of the same size. Each
 * item in the result list must correspond to the item at the same index in
 * the request list.
 */
export function fromFunctionBatchedOption(identifier: string) {
  return <A extends Request<never, any>>(
    f: (a: C.Chunk<A>) => C.Chunk<O.Option<_A<A>>>
  ): DataSource<unknown, A> =>
    fromFunctionBatchedOptionM(identifier)((as) => T.succeed(f(as)))
}

/**
 * Constructs a data source from an effectual function that takes a list of
 * requests and returns a list of optional results of the same size. Each
 * item in the result list must correspond to the item at the same index in
 * the request list.
 *
 * If `f` fails, the failure is recorded as the result of every request of
 * the batch.
 */
export function fromFunctionBatchedOptionM(identifier: string) {
  return <A extends Request<any, any>, E, R>(
    f: (a: C.Chunk<A>) => T.Effect<R, E, C.Chunk<O.Option<_A<A>>>>
  ): DataSource<R, A> =>
    makeBatched(identifier)((requests) => {
      const a: T.Effect<
        R,
        never,
        C.Chunk<Tp.Tuple<[A, E.Either<E, O.Option<_A<A>>>]>>
      > = T.fold_(
        f(requests),
        // failure: pair every request with the same error
        (e) => C.map_(requests, (_) => Tp.tuple(_, E.left(e))),
        // success: pair requests and optional results positionally
        (bs) =>
          C.zip_(
            requests,
            C.map_(bs, (_) => E.right(_))
          )
      )
      return T.map_(a, (_) =>
        C.reduce_(_, CR.empty, (crm, { tuple: [k, v] }) =>
          CR.insertOption_(crm, k, v)
        )
      )
    })
}