// =============================================================================
// Cosmos DB Store adapter for effect-app infra.
//
// Implements the `Store` interface from ./service.js on top of Azure Cosmos DB:
//   - makeMapId / makeReverseMapId: translate between the domain id key (`idKey`)
//     and the `id` field Cosmos requires on every document.
//   - CosmosDbOperationError: wrapper for non-2xx bulk/batch responses; raised
//     via Effect.die (treated as a defect, not a typed failure).
//   - makeCosmosStore(cfg).make(name, idKey, seed?, config?):
//       * creates container `${prefix}${name}` on first use, partitioned on
//         `/_partitionKey` with partition-key version 2 (large-key hashing) and
//         optional unique-key policy from config.uniqueKeys.
//       * namespacing: partition values are prefixed `${ns}::` for any
//         namespace other than "primary"; namespaces are resolved from
//         `storeId` and gated by config.allowNamespace (throws when rejected).
//       * seeding: guarded by a marker document (id = containerId, or
//         `${containerId}::${ns}`, ttl: -1) and memoized per namespace in
//         seedCache via Effect.cached; the primary namespace is seeded eagerly
//         at the bottom of `make`.
//       * optimistic concurrency: presence of `_etag` selects Replace (with
//         ifMatch) vs Create; response codes 412/404/409 fail with
//         OptimisticConcurrencyException, other non-2xx die with
//         CosmosDbOperationError.
//       * bulkSet: chunks operations by config.maxBulkSize ?? 10 and delays
//         every chunk after the first by 150ms to keep RU consumption low.
//   - CosmosStoreLayer(cfg): wires StoreMaker to a CosmosClientLayer built from
//     the redacted cfg.url and cfg.dbName.
//
// NOTE(review): this file appears whitespace-mangled and its generic
// type-parameter lists look stripped by tag-like removal (e.g. `new Map>()`,
// `Effect.Effect, E, R>`, the stray `>` in makeReverseMapId) — it will not
// compile as-is; recover the original formatting/types from version control.
// NOTE(review): in bulkSetInternal the `r2` check excludes statusCode 424, but
// the immediately following `r3` check does not, so 424 responses still die —
// confirm whether 424 (failed dependent) was meant to be tolerated.
// NOTE(review): the recurring "don't use this … partition key dont match"
// comments suggest partitionKey hints were deliberately dropped from bulk and
// batch operations; confirm before reintroducing them.
// =============================================================================
/* eslint-disable @typescript-eslint/no-explicit-any */ import { Array, Duration, Effect, Layer, type NonEmptyReadonlyArray, Option, pipe, Redacted, Struct } from "effect-app" import { toNonEmptyArray } from "effect-app/Array" import { dropUndefinedT, mutable } from "effect-app/utils" import { CosmosClient, CosmosClientLayer } from "../adapters/cosmos-client.js" import { OptimisticConcurrencyException } from "../errors.js" import { InfraLogger } from "../logger.js" import type { FieldValues } from "../Model/filter/types.js" import { type RawQuery } from "../Model/query.js" import { buildWhereCosmosQuery3, logQuery } from "./Cosmos/query.js" import { storeId } from "./Memory.js" import { type FilterArgs, type PersistenceModelType, type StorageConfig, type Store, type StoreConfig, StoreMaker } from "./service.js" const makeMapId = (idKey: IdKey) => ({ [idKey]: id, ...e }: Encoded) => ({ ...e, id }) const makeReverseMapId = (idKey: IdKey) => ({ id, ...t }: PersistenceModelType & { id: string }>) => ({ ...t, [idKey]: id }) as any as PersistenceModelType class CosmosDbOperationError { constructor(readonly message: string, readonly raw?: unknown) {} } // TODO: Retry operation when running into RU limit. const makeCosmosStore = Effect.fnUntraced(function*({ prefix }: StorageConfig) { const { db } = yield* CosmosClient return { make: Effect.fnUntraced(function*( name: string, idKey: IdKey, seed?: Effect.Effect, E, R>, config?: StoreConfig ) { const mapId = makeMapId(idKey) const mapReverseId = makeReverseMapId(idKey) type PM = PersistenceModelType type PMCosmos = PersistenceModelType & { id: string }> const containerId = `${prefix}${name}` yield* Effect.promise(() => db.containers.createIfNotExists(dropUndefinedT({ id: containerId, uniqueKeyPolicy: config?.uniqueKeys ? { uniqueKeys: config.uniqueKeys } : undefined, partitionKey: { paths: ["/_partitionKey"], version: 2 // support large partitionkeys so that the hash is not based on just the first 100 bytes! 
} })) ) const basePartitionKey = config?.partitionValue() ?? "primary" const nsPrefix = (ns: string) => ns === "primary" ? "" : `${ns}::` const nsPartitionValue = (ns: string, e?: Encoded) => { const base = config?.partitionValue(e) ?? "primary" return `${nsPrefix(ns)}${base}` } const nsBasePartitionKey = (ns: string) => `${nsPrefix(ns)}${basePartitionKey}` const resolveNamespace = !config?.allowNamespace ? Effect.succeed("primary") : storeId.asEffect().pipe(Effect.map((namespace) => { if (namespace !== "primary" && !config.allowNamespace!(namespace)) { throw new Error(`Namespace ${namespace} not allowed!`) } return namespace })) const defaultValues = config?.defaultValues ?? {} const container = db.container(containerId) const bulk = container.items.bulk.bind(container.items) const execBatch = container.items.batch.bind(container.items) // TODO: move the marker to a separate container and get rid of the checks on every query // then need to clean up the actual data.. perhaps first do with a config toggle to prescribe to it. const importedMarkerId = containerId const ctx = yield* Effect.context() const seedCache = new Map>() const makeSeedEffect = (ns: string) => { const markerId = ns === "primary" ? 
importedMarkerId : `${importedMarkerId}::${ns}` return Effect .promise(() => container .item(markerId, markerId) .read<{ id: string }>() .then(({ resource }) => Option.fromNullishOr(resource)) ) .pipe( Effect.flatMap((marker) => { if (Option.isSome(marker)) return Effect.void return InfraLogger.logInfo(`Creating mock data for ${name} (namespace: ${ns})`).pipe( Effect.andThen(seed!), Effect.flatMap((m) => Effect.flatMapOption( Effect.succeed(toNonEmptyArray([...m])), (a) => bulkSetInternal(a, ns).pipe(Effect.orDie) ) ), Effect.andThen( Effect.promise(() => container.items.create({ _partitionKey: markerId, id: markerId, ttl: -1 }) ) ), Effect.provide(ctx), Effect.orDie ) }), Effect.withLogSpan(`Cosmos.seedCheck ${name} in ${ns} [effect-app/infra/Store]`), Effect.withSpan("Cosmos.seed [effect-app/infra/Store]", { attributes: { name, namespace: ns } }) ) } const seedNamespace = Effect.fn("seedNamespace")(function*(ns: string) { if (!seed) return let cached = seedCache.get(ns) if (!cached) { cached = yield* Effect.cached(Effect.uninterruptible(makeSeedEffect(ns))) seedCache.set(ns, cached) } yield* cached }) const bulkSetInternal = (items: NonEmptyReadonlyArray, ns: string) => Effect .gen(function*() { // TODO: disable batching if need atomicity // we delay and batch to keep low amount of RUs const b = [...items] .map( (x) => [ x, Option.match(Option.fromNullishOr(x._etag), { onNone: () => dropUndefinedT({ operationType: "Create" as const, resourceBody: { ...Struct.omit(x, ["_etag", idKey]), id: x[idKey], _partitionKey: nsPartitionValue(ns, x) } // don't use this or we get an error that the request and some item partition key dont match - makese no sense // partitionKey: config?.partitionValue(x) }), onSome: (eTag) => dropUndefinedT({ operationType: "Replace" as const, id: x[idKey], resourceBody: { ...Struct.omit(x, ["_etag", idKey]), id: x[idKey], _partitionKey: nsPartitionValue(ns, x) }, ifMatch: eTag // don't use this or we get an error that the request and some item 
partition key dont match - makese no sense // partitionKey: config?.partitionValue(x) }) }) ] as const ) const batches = Array.chunksOf(b, config?.maxBulkSize ?? 10) const batchResult = yield* Effect.forEach( batches .map((x, i) => [i, x] as const), ([i, batch]) => Effect .promise(() => bulk(batch.map(([, op]) => op))) .pipe( Effect .delay(Duration.millis(i === 0 ? 0 : 150)), Effect .flatMap((responses) => Effect.gen(function*() { const r = responses.find((x) => x.statusCode === 412 || x.statusCode === 404 || x.statusCode === 409 ) if (r) { return yield* Effect.fail( new OptimisticConcurrencyException( { type: name, id: JSON.stringify(r.resourceBody?.["id"]), code: r.statusCode, raw: responses } ) ) } const r2 = responses.find( (x) => x.statusCode !== 424 && (x.statusCode > 299 || x.statusCode < 200) ) if (r2) { return yield* Effect.die( new CosmosDbOperationError( "not able to update records: " + r2.statusCode, responses ) ) } const r3 = responses.find( (x) => x.statusCode > 299 || x.statusCode < 200 ) if (r3) { return yield* Effect.die( new CosmosDbOperationError( "not able to update records: " + r3.statusCode, responses ) ) } return batch.map(([e], i) => ({ ...e, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion _etag: responses[i]!.eTag })) }) ) ) ) return batchResult.flat() as unknown as NonEmptyReadonlyArray }) .pipe( Effect.withSpan("Cosmos.bulkSet [effect-app/infra/Store]", { attributes: { "repository.container_id": containerId, "repository.model_name": name, namespace: ns } }, { captureStackTrace: false }) ) const bulkSet = (items: NonEmptyReadonlyArray) => resolveNamespace.pipe(Effect.flatMap((ns) => bulkSetInternal(items, ns))) const batchSet = (items: NonEmptyReadonlyArray) => { return resolveNamespace .pipe(Effect.flatMap((ns) => Effect .suspend(() => { const batch = [...items].map( (x) => [ x, Option.match(Option.fromNullishOr(x._etag), { onNone: () => ({ operationType: "Create" as const, resourceBody: { ...Struct.omit(x, ["_etag", 
idKey]), id: x[idKey], _partitionKey: nsPartitionValue(ns, x) } // don't use this or we get an error that the request and some item partition key dont match - makese no sense // partitionKey: config?.partitionValue(x) }), onSome: (eTag) => ({ operationType: "Replace" as const, id: x[idKey], resourceBody: { ...Struct.omit(x, ["_etag", idKey]), id: x[idKey], _partitionKey: nsPartitionValue(ns, x) }, // don't use this or we get an error that the request and some item partition key dont match - makese no sense // partitionKey: config?.partitionValue(x) ifMatch: eTag }) }) ] as const ) const ex = batch.map(([, c]) => c) return Effect .promise(() => execBatch(ex, ex[0]?.resourceBody._partitionKey)) .pipe(Effect.flatMap(Effect.fnUntraced(function*(x) { const result = x.result ?? [] const firstFailed = result.find( (x: any) => x.statusCode > 299 || x.statusCode < 200 ) if (firstFailed) { const code = firstFailed.statusCode ?? 0 if (code === 412 || code === 404 || code === 409) { return yield* new OptimisticConcurrencyException({ type: name, id: "batch", code }) } return yield* Effect.die( new CosmosDbOperationError("not able to update record: " + code) ) } return batch.map(([e], i) => ({ ...e, _etag: result[i]?.eTag })) as unknown as NonEmptyReadonlyArray }))) }) .pipe(Effect .withSpan("Cosmos.batchSet [effect-app/infra/Store]", { attributes: { "repository.container_id": containerId, "repository.model_name": name, namespace: ns } }, { captureStackTrace: false })) )) } const s: Store = { seedNamespace: (ns) => seedNamespace(ns), queryRaw: (query: RawQuery) => Effect .all({ q: Effect.sync(() => query.cosmos({ name })), ns: resolveNamespace }) .pipe( Effect.tap(({ q }) => logQuery(q)), Effect.flatMap(({ ns, q }) => Effect .promise(() => container .items .query(q, { partitionKey: nsBasePartitionKey(ns) }) .fetchAll() .then(({ resources }) => resources.map( (_) => ({ ...defaultValues, ...mapReverseId(_ as any) }) as Out ) ) ) .pipe( Effect.withSpan("Cosmos.queryRaw 
[effect-app/infra/Store]", { attributes: { "repository.container_id": containerId, "repository.model_name": name, namespace: ns } }, { captureStackTrace: false }) ) ) ), batchRemove: (ids, partitionKey?: string) => resolveNamespace.pipe(Effect.flatMap((ns) => Effect .promise(() => execBatch( mutable(ids.map((id) => dropUndefinedT({ operationType: "Delete" as const, id // don't use this or we get an error that the request and some item partition key dont match - makese no sense // partitionKey: config?.partitionValue({ [idKey]: id } as Encoded) }) )), partitionKey ?? nsBasePartitionKey(ns) ) ) .pipe( Effect.withSpan("Cosmos.batchRemove [effect-app/infra/Store]", { attributes: { "repository.container_id": containerId, "repository.model_name": name, namespace: ns } }, { captureStackTrace: false }) ) )), all: Effect .all({ q: Effect.sync(() => ({ query: `SELECT * FROM ${name}`, parameters: [] })), ns: resolveNamespace }) .pipe( Effect.tap(({ q }) => logQuery(q)), Effect.flatMap(({ ns, q }) => Effect .promise(() => container .items .query(q, { partitionKey: nsBasePartitionKey(ns) }) .fetchAll() .then(({ resources }) => resources.map( (_) => ({ ...defaultValues, ...mapReverseId(_) }) ) ) ) .pipe( Effect.withSpan("Cosmos.all [effect-app/infra/Store]", { attributes: { "repository.container_id": containerId, "repository.model_name": name, namespace: ns } }, { captureStackTrace: false }) ) ) ), /** * May return duplicate results for "join_find", when matching more than once. */ filter: ( f: FilterArgs ) => { const skip = f?.skip const limit = f?.limit const filter = f.filter type M = U extends undefined ? Encoded : Pick return Effect .all({ q: Effect.sync(() => buildWhereCosmosQuery3( idKey, filter ? 
[{ t: "where-scope", result: filter, relation: "some" }] : [], name, defaultValues, f.select as | NonEmptyReadonlyArray | undefined, f.order as NonEmptyReadonlyArray<{ key: string; direction: "ASC" | "DESC" }> | undefined, skip, limit ) ), ns: resolveNamespace }) .pipe( Effect.tap(({ q }) => logQuery(q)), Effect .flatMap(({ ns, q }) => Effect .promise(() => f.select ? container .items .query(q, { partitionKey: nsBasePartitionKey(ns) }) .fetchAll() .then(({ resources }) => resources.map((_) => ({ ...pipe( defaultValues, Struct.pick(f.select!.filter((_) => typeof _ === "string") as never[]) ), ...mapReverseId(_ as any) })) ) : container .items .query<{ f: M }>(q, { partitionKey: nsBasePartitionKey(ns) }) .fetchAll() .then(({ resources }) => resources.map(({ f }) => ({ ...defaultValues, ...mapReverseId(f as any) }) as any) ) ) .pipe( Effect.withSpan("Cosmos.filter [effect-app/infra/Store]", { attributes: { "repository.container_id": containerId, "repository.model_name": name, namespace: ns } }, { captureStackTrace: false }) ) ) ) }, find: (id) => resolveNamespace.pipe(Effect.flatMap((ns) => Effect .promise(() => container .item(id, nsPartitionValue(ns, { [idKey]: id } as Encoded)) .read() .then(({ resource }) => Option.fromNullishOr(resource).pipe( Option.map((_) => ({ ...defaultValues, ...mapReverseId(_) })) ) ) ) .pipe(Effect .withSpan("Cosmos.find [effect-app/infra/Store]", { attributes: { "repository.container_id": containerId, "repository.model_name": name, partitionValue: nsPartitionValue(ns, { [idKey]: id } as Encoded), namespace: ns, id } }, { captureStackTrace: false })) )), set: (e) => resolveNamespace.pipe(Effect.flatMap((ns) => Option .match( Option .fromNullishOr(e._etag), { onNone: () => Effect.promise(() => container.items.create({ ...mapId(e), _partitionKey: nsPartitionValue(ns, e) }) ), onSome: (eTag) => Effect.promise(() => container.item(e[idKey], nsPartitionValue(ns, e)).replace( { ...mapId(e), _partitionKey: nsPartitionValue(ns, e) }, { 
accessCondition: { type: "IfMatch", condition: eTag } } ) ) } ) .pipe( Effect .flatMap((x) => { if (x.statusCode === 412 || x.statusCode === 404 || x.statusCode === 409) { return Effect.fail( new OptimisticConcurrencyException({ type: name, id: e[idKey], code: x.statusCode }) ) } if (x.statusCode > 299 || x.statusCode < 200) { return Effect.die( new CosmosDbOperationError( "not able to update record: " + x.statusCode ) ) } return Effect.sync(() => ({ ...e, _etag: x.etag })) }), Effect .withSpan("Cosmos.set [effect-app/infra/Store]", { attributes: { "repository.container_id": containerId, "repository.model_name": name, namespace: ns, id: e[idKey] } }, { captureStackTrace: false }) ) )), batchSet, bulkSet } // Eagerly seed primary namespace on initialization yield* seedNamespace("primary") return s }) } }) export function CosmosStoreLayer(cfg: StorageConfig) { return StoreMaker .toLayer(makeCosmosStore(cfg)) .pipe(Layer.provide(CosmosClientLayer(Redacted.value(cfg.url), cfg.dbName))) }