/* eslint-disable @typescript-eslint/no-explicit-any */ import { Array, Context, Effect, flow, type NonEmptyReadonlyArray, Option, Order, pipe, Ref, Result, Semaphore, Struct } from "effect-app" import { NonEmptyString255 } from "effect-app/Schema" import { get } from "effect-app/utils" import { InfraLogger } from "../logger.js" import type { FieldValues } from "../Model/filter/types.js" import { codeFilter, codeFilter3_ } from "./codeFilter.js" import { type FilterArgs, type PersistenceModelType, type Store, type StoreConfig, StoreMaker } from "./service.js" import { makeUpdateETag } from "./utils.js" export function memFilter(f: FilterArgs) { type M = U extends undefined ? T : Pick return ((c: T[]): M[] => { const select = (r: T[]): M[] => { const sel = f.select if (!sel) return r as M[] return r.map((i) => { const [keys, subKeys] = pipe( sel, Array.partition((r) => typeof r === "string" ? Result.fail(String(r)) : Result.succeed(r as { key: string; subKeys: string[] }) ) ) const n = Struct.pick(i, keys) subKeys.forEach((subKey) => { n[subKey.key] = i[subKey.key]!.map(Struct.pick(subKey.subKeys as never[])) }) return n as M }) } const skip = f?.skip const limit = f?.limit const ords = Option.map(Option.fromNullishOr(f.order), (_) => _.map((_) => Order.make((self, that) => { // TODO: inspect data types for the right comparison? const selfV = get(self, _.key) ?? false const thatV = get(that, _.key) ?? false if (selfV === thatV) { return 0 } if (_.direction === "ASC") { return selfV < thatV ? -1 : 1 } return selfV < thatV ? 1 : -1 }) )) if (Option.isSome(ords)) { c = Array.sortBy(...ords.value)(c) } if (!skip && limit === 1) { return select( Array.findFirst(c, f.filter ? codeFilter(f.filter) : (_) => Option.some(_)).pipe( Option.map(Array.make), Option.getOrElse( () => [] ) ) ) } let r = f.filter ? Array.filter(c, (x) => codeFilter3_(f.filter!, x)) : c if (skip) { r = Array.drop(r, skip) } if (limit !== undefined) { r = Array.take(r, limit) } return select(r) }) } const defaultNs: NonEmptyString255 = NonEmptyString255("primary") export class storeId extends Context.Reference("StoreId", { defaultValue: (): NonEmptyString255 => defaultNs }) {} function logQuery(f: FilterArgs, defaultValues?: any) { return InfraLogger .logDebug("mem query") .pipe(Effect.annotateLogs({ filter: JSON.stringify( f.filter, undefined, 2 ), order: JSON.stringify(f.order, undefined, 2), select: JSON.stringify(f.select, undefined, 2), defaultValues: JSON.stringify(defaultValues, undefined, 2), skip: f.skip, limit: f.limit })) } export function makeMemoryStoreInt( modelName: string, idKey: IdKey, namespace: string, seed?: Effect.Effect, E, R>, _defaultValues?: Partial ) { type PM = PersistenceModelType return Effect.gen(function*() { const updateETag = makeUpdateETag(modelName) const items_ = yield* seed ?? Effect.sync(() => []) const defaultValues = _defaultValues ?? {} const items = new Map([...items_].map((_) => [_[idKey], { _etag: undefined, ...defaultValues, ..._ }] as const)) const store = Ref.makeUnsafe>(items) const sem = Semaphore.makeUnsafe(1) const withPermit = sem.withPermits(1) const values = Effect.map(Ref.get(store), (s) => s.values()) const all = Effect.map(values, Array.fromIterable) const batchSet = (items: NonEmptyReadonlyArray) => Effect .forEach(items, (i) => Effect.flatMap(s.find(i[idKey]), (current) => updateETag(i, idKey, current))) .pipe( Effect .tap((items) => Ref .get(store) .pipe( Effect .map((m) => { const mut = m as Map items.forEach((e) => mut.set(e[idKey], e)) return mut }), Effect .flatMap((_) => Ref.set(store, _)) ) ), Effect .map((_) => _), withPermit ) const batchRemove = (items: NonEmptyReadonlyArray) => Ref .get(store) .pipe( Effect .map((m) => { return new Map([...m].filter(([_k]) => !items.includes(_k))) }), Effect .flatMap((_) => Ref.set(store, _)) ) .pipe( withPermit ) const s: Store = { seedNamespace: () => Effect.void, queryRaw: (query) => all .pipe( // Effect.tap(() => logQuery(query, defaultValues)), Effect.map(query.memory), Effect.withSpan("Memory.queryRaw [effect-app/infra/Store]", { attributes: { "repository.model_name": modelName, "repository.namespace": namespace } }, { captureStackTrace: false }) ), all: all.pipe(Effect.withSpan("Memory.all [effect-app/infra/Store]", { attributes: { modelName, namespace } }, { captureStackTrace: false })), find: (id) => Ref .get(store) .pipe( Effect.map((_) => Option.fromNullishOr(_.get(id))), Effect .withSpan("Memory.find [effect-app/infra/Store]", { attributes: { modelName, namespace } }, { captureStackTrace: false }) ), filter: (f) => all .pipe( Effect.tap(() => logQuery(f, defaultValues)), Effect.map(memFilter(f)), Effect.withSpan("Memory.filter [effect-app/infra/Store]", { attributes: { "repository.model_name": modelName, "repository.namespace": namespace } }, { captureStackTrace: false }) ), set: (e) => s .find(e[idKey]) .pipe( Effect.flatMap((current) => updateETag(e, idKey, current)), Effect .tap((e) => Ref.get(store).pipe( Effect.map((_) => new Map([..._, [e[idKey], e]])), Effect.flatMap((_) => Ref.set(store, _)) ) ), withPermit, Effect .withSpan("Memory.set [effect-app/infra/Store]", { attributes: { "repository.model_name": modelName, "repository.namespace": namespace } }, { captureStackTrace: false }) ), batchRemove: (items: NonEmptyReadonlyArray) => pipe( Effect .sync(() => items) // align with CosmosDB .pipe( Effect.filterOrFail((_) => _.length <= 100, () => "BatchRemove: a batch may not exceed 100 items"), Effect.orDie, Effect.andThen(batchRemove), Effect .withSpan("Memory.batchRemove [effect-app/infra/Store]", { attributes: { "repository.model_name": modelName, "repository.namespace": namespace } }, { captureStackTrace: false }) ) ), batchSet: (items: readonly [PM, ...PM[]]) => pipe( Effect .sync(() => items) // align with CosmosDB .pipe( Effect.filterOrFail((_) => _.length <= 100, () => "BatchSet: a batch may not exceed 100 items"), Effect.orDie, Effect.andThen(batchSet), Effect .withSpan("Memory.batchSet [effect-app/infra/Store]", { attributes: { "repository.model_name": modelName, "repository.namespace": namespace } }, { captureStackTrace: false }) ) ), bulkSet: flow( batchSet, (_) => _.pipe(Effect.withSpan("Memory.bulkSet [effect-app/infra/Store]", { attributes: { "repository.model_name": modelName, "repository.namespace": namespace } }, { captureStackTrace: false })) ) } return s }) } export const makeMemoryStore = () => ({ make: Effect.fnUntraced(function*( modelName: string, idKey: IdKey, seed?: Effect.Effect, E, R>, config?: StoreConfig ) { const primary = yield* makeMemoryStoreInt( modelName, idKey, "primary", seed, config?.defaultValues ) const ctx = yield* Effect.context() const stores = new Map([["primary", primary]]) const semaphores = new Map() const getSem = (ns: string) => { let sem = semaphores.get(ns) if (!sem) { sem = Semaphore.makeUnsafe(1) semaphores.set(ns, sem) } return sem } const ensureStore = (namespace: string) => getSem(namespace).withPermits(1)(Effect.suspend(() => { const store = stores.get(namespace) if (store) return Effect.succeed(store) if (config?.allowNamespace && !config.allowNamespace(namespace)) { throw new Error(`Namespace ${namespace} not allowed!`) } return makeMemoryStoreInt(modelName, idKey, namespace, seed, config?.defaultValues) .pipe( Effect.orDie, Effect.provide(ctx), Effect.tap((store) => Effect.sync(() => stores.set(namespace, store))) ) })) const getStore = !config?.allowNamespace ? Effect.succeed(primary) : storeId.asEffect().pipe(Effect.flatMap((namespace) => ensureStore(namespace))) const s: Store = { seedNamespace: (namespace) => ensureStore(namespace).pipe(Effect.asVoid), all: Effect.flatMap(getStore, (_) => _.all), queryRaw: (...args) => Effect.flatMap(getStore, (_) => _.queryRaw(...args)), find: (...args) => Effect.flatMap(getStore, (_) => _.find(...args)), filter: (...args) => Effect.flatMap(getStore, (_) => _.filter(...args)), set: (...args) => Effect.flatMap(getStore, (_) => _.set(...args)), batchSet: (...args) => Effect.flatMap(getStore, (_) => _.batchSet(...args)), bulkSet: (...args) => Effect.flatMap(getStore, (_) => _.bulkSet(...args)), batchRemove: (...args) => Effect.flatMap(getStore, (_) => _.batchRemove(...args)) } return s }) }) export const MemoryStoreLive = StoreMaker.toLayer(Effect.sync(() => makeMemoryStore()))