/**
 * @since 1.0.0
 */
import * as Arr from "effect/Array"
import type * as Duration from "effect/Duration"
import * as Effect from "effect/Effect"
import * as Either from "effect/Either"
import * as Fiber from "effect/Fiber"
import { dual, pipe } from "effect/Function"
import * as Option from "effect/Option"
import * as Request from "effect/Request"
import * as RequestResolver from "effect/RequestResolver"
import * as Runtime from "effect/Runtime"
import type * as Schema from "effect/Schema"
import * as Scope from "effect/Scope"
import * as Persistence from "./Persistence.js"

/**
 * A request queued in the current batch, paired with the callback that
 * resumes the caller suspended on it.
 */
interface DataLoaderItem<A extends Request.Request<any, any>> {
  readonly request: A
  readonly resume: (effect: Effect.Effect<Request.Request.Success<A>, Request.Request.Error<A>>) => void
}

/**
 * Wraps a resolver so that incoming requests are collected into a batch and
 * forwarded to the underlying resolver together.
 *
 * A batch is flushed either when `options.window` has elapsed since the timer
 * for that batch was started (on the first request queued while no timer is
 * pending), or immediately once it holds `options.maxBatchSize` requests
 * (defaults to `Infinity`, i.e. only the window triggers a flush).
 *
 * The pending timer fiber, if any, is interrupted when the surrounding
 * `Scope` is closed.
 *
 * @since 1.0.0
 * @category combinators
 */
export const dataLoader = dual<
  /**
   * @since 1.0.0
   * @category combinators
   */
  (
    options: {
      readonly window: Duration.DurationInput
      readonly maxBatchSize?: number
    }
  ) => <A extends Request.Request<any, any>>(
    self: RequestResolver.RequestResolver<A, never>
  ) => Effect.Effect<RequestResolver.RequestResolver<A, never>, never, Scope.Scope>,
  /**
   * @since 1.0.0
   * @category combinators
   */
  <A extends Request.Request<any, any>>(
    self: RequestResolver.RequestResolver<A, never>,
    options: {
      readonly window: Duration.DurationInput
      readonly maxBatchSize?: number
    }
  ) => Effect.Effect<RequestResolver.RequestResolver<A, never>, never, Scope.Scope>
>(
  2,
  Effect.fnUntraced(function*<
    A extends Request.Request<any, any>
  >(self: RequestResolver.RequestResolver<A, never>, options: {
    readonly window: Duration.DurationInput
    readonly maxBatchSize?: number
  }) {
    const maxSize = options.maxBatchSize ?? Infinity
    const scope = yield* Effect.scope
    // Capture the runtime (in an interruptible region) so batches can be
    // forked from inside the `Effect.async` registration callback below.
    const runtime = yield* Effect.runtime<never>().pipe(
      Effect.interruptible
    )
    const runFork = Runtime.runFork(runtime)

    // Mutable state shared by every request hitting the returned resolver.
    let batch = new Set<DataLoaderItem<A>>()
    // Fiber running the window timer for the current batch, if one is pending.
    let fiber: Fiber.RuntimeFiber<void> | undefined

    // Sends every queued request to `self` as one batched call, with request
    // caching disabled, and resumes each caller with the exit of its request.
    const process = (items: Iterable<DataLoaderItem<A>>) =>
      Effect.withRequestCaching(
        Effect.forEach(
          items,
          ({ request, resume }) =>
            Effect.request(request, self).pipe(
              Effect.exit,
              Effect.map(resume)
            ),
          { batching: true, discard: true }
        ),
        false
      )

    // Waits for the window, then swaps in a fresh batch and flushes the old one.
    const delayedProcess = Effect.sleep(options.window).pipe(
      Effect.flatMap(() => {
        const currentBatch = batch
        batch = new Set()
        fiber = undefined
        return process(currentBatch)
      })
    )

    yield* Scope.addFinalizer(scope, Effect.suspend(() => fiber ? Fiber.interrupt(fiber) : Effect.void))

    return RequestResolver.fromEffect((request: A) =>
      Effect.async<Request.Request.Success<A>, Request.Request.Error<A>>((resume) => {
        const item: DataLoaderItem<A> = { request, resume }
        batch.add(item)
        if (batch.size >= maxSize) {
          // Batch is full: flush right away and cancel the pending timer.
          const currentBatch = batch
          batch = new Set()
          if (fiber) {
            const parent = Option.getOrThrow(Fiber.getCurrentFiber())
            fiber.unsafeInterruptAsFork(parent.id())
            fiber = undefined
          }
          runFork(process(currentBatch))
        } else if (!fiber) {
          // First request since the last flush: start the window timer.
          fiber = runFork(delayedProcess)
        }
        // On interruption of the caller, drop its item from the batch that is
        // current at that moment.
        return Effect.sync(() => {
          batch.delete(item)
        })
      })
    )
  })
)

/**
 * A request that also carries the schemas describing its result
 * (`Schema.WithResult`).
 *
 * @since 1.0.0
 * @category model
 */
export interface PersistedRequest<R, IE, E, IA, A>
  extends Request.Request<A, E>, Schema.WithResult<A, IA, E, IE, R>
{}

/**
 * @since 1.0.0
 * @category model
 */
export declare namespace PersistedRequest {
  /**
   * @since 1.0.0
   * @category model
   */
  export type Any = PersistedRequest<any, any, any, any, any> | PersistedRequest<any, never, never, any, any>
}

/**
 * Wraps a resolver with a result store obtained from `ResultPersistence`.
 *
 * For each batch of requests the store identified by `options.storeId` is
 * queried first. Requests with a stored result are completed with it; the
 * remaining requests are forwarded to `self` (batched, with request caching
 * disabled), and each result is written to the store before the request is
 * completed.
 *
 * If the store lookup fails, all requests are treated as not stored. Failures
 * while writing a result to the store are logged and otherwise ignored.
 *
 * @since 1.0.0
 * @category combinators
 */
export const persisted: {
  /**
   * @since 1.0.0
   * @category combinators
   */
  <Req extends PersistedRequest.Any>(
    options: {
      readonly storeId: string
      readonly timeToLive: (...args: Persistence.ResultPersistence.TimeToLiveArgs<Req>) => Duration.DurationInput
    }
  ): (
    self: RequestResolver.RequestResolver<Req, never>
  ) => Effect.Effect<
    RequestResolver.RequestResolver<Req, Schema.WithResult.Context<Req>>,
    never,
    Persistence.ResultPersistence | Scope.Scope
  >
  /**
   * @since 1.0.0
   * @category combinators
   */
  <Req extends PersistedRequest.Any>(
    self: RequestResolver.RequestResolver<Req, never>,
    options: {
      readonly storeId: string
      readonly timeToLive: (...args: Persistence.ResultPersistence.TimeToLiveArgs<Req>) => Duration.DurationInput
    }
  ): Effect.Effect<
    RequestResolver.RequestResolver<Req, Schema.WithResult.Context<Req>>,
    never,
    Persistence.ResultPersistence | Scope.Scope
  >
} = dual(2, <Req extends PersistedRequest.Any>(
  self: RequestResolver.RequestResolver<Req, never>,
  options: {
    readonly storeId: string
    readonly timeToLive: (...args: Persistence.ResultPersistence.TimeToLiveArgs<Req>) => Duration.DurationInput
  }
): Effect.Effect<
  RequestResolver.RequestResolver<Req, Schema.WithResult.Context<Req>>,
  never,
  Persistence.ResultPersistence | Scope.Scope
> =>
  Effect.gen(function*() {
    const storage = yield* (yield* Persistence.ResultPersistence).make({
      storeId: options.storeId,
      timeToLive: options.timeToLive as any
    })

    // Splits the requests into [not stored, [request, stored result] pairs].
    // A failing lookup falls back to "nothing stored".
    const partition = (requests: ReadonlyArray<Req>) =>
      storage.getMany(requests as any).pipe(
        Effect.map(
          Arr.partitionMap((stored, i) =>
            Option.match(stored, {
              onNone: () => Either.left(requests[i]),
              onSome: (value) => Either.right([requests[i], value] as const)
            })
          )
        ),
        Effect.orElseSucceed(() => [requests, []] as const)
      )

    // Best-effort write: a failing store write is logged, never propagated.
    const set = (
      request: Req,
      result: Request.Request.Result<Req>
    ): Effect.Effect<void> => Effect.ignoreLogged(storage.set(request as any, result))

    return RequestResolver.makeBatched((requests: Arr.NonEmptyArray<Req>) =>
      Effect.flatMap(partition(requests), ([remaining, results]) => {
        // Requests found in the store are completed straight from it.
        const completeCached = Effect.forEach(
          results,
          ([request, result]) => Request.complete(request, result as any) as Effect.Effect<void>,
          { discard: true }
        )
        // Everything else goes to the wrapped resolver; each exit is stored
        // and then used to complete the corresponding request.
        const completeUncached = pipe(
          Effect.forEach(
            remaining,
            (request) => Effect.exit(Effect.request(request, self)),
            { batching: true }
          ),
          Effect.flatMap((results) =>
            Effect.forEach(
              results,
              (result, i) => {
                const request = remaining[i]
                return Effect.zipRight(
                  set(request, result as any),
                  Request.complete(request, result as any)
                )
              },
              { discard: true }
            )
          ),
          Effect.withRequestCaching(false)
        )
        return Effect.zipRight(completeCached, completeUncached)
      })
    )
  }))