/* eslint-disable @typescript-eslint/no-explicit-any */
import { Context, Layer, LayerMap } from "effect"
import { Effect, type NonEmptyReadonlyArray, Option, Struct } from "effect-app"
import { toNonEmptyArray } from "effect-app/Array"
import { SqlClient } from "effect/unstable/sql"
import { OptimisticConcurrencyException } from "../errors.js"
import { InfraLogger } from "../logger.js"
import type { FieldValues } from "../Model/filter/types.js"
import { storeId } from "./Memory.js"
import { type FilterArgs, type PersistenceModelType, type StorageConfig, type Store, type StoreConfig, StoreMaker } from "./service.js"
import { buildWhereSQLQuery, logQuery, type SQLDialect, sqliteDialect } from "./SQL/query.js"
import { makeETag } from "./utils.js"
export type WithNsTransactionFn = (effect: Effect.Effect) => Effect.Effect
export class WithNsTransaction
extends Context.Service()("effect-app/WithNsTransaction")
{}
/** @internal */
export const parseRow = (
row: { id: string; _etag: string | null; data: string },
idKey: PropertyKey,
defaultValues: Partial
): PersistenceModelType => {
const data = (typeof row.data === "string" ? JSON.parse(row.data) : row.data) as object
return { ...defaultValues, ...data, [idKey]: row.id, _etag: row._etag ?? undefined } as PersistenceModelType
}
const parseSelectRow = (
row: Record,
idKey: PropertyKey
): any => {
const result: Record = {}
for (const [key, value] of Object.entries(row)) {
if (key === "id") {
result[idKey as string] = value
result["id"] = value
} else if (typeof value === "string") {
try {
result[key] = JSON.parse(value)
} catch {
result[key] = value
}
} else {
result[key] = value
}
}
return result
}
function makeSQLStoreInt(dialect: SQLDialect, jsonColumnType: string) {
return Effect.fnUntraced(function*({ prefix }: StorageConfig) {
const sql = yield* SqlClient.SqlClient
return {
make: Effect.fnUntraced(function*(
name: string,
idKey: IdKey,
seed?: Effect.Effect, E, R>,
config?: StoreConfig
) {
type PM = PersistenceModelType
const tableName = `${prefix}${name}`
const defaultValues = config?.defaultValues ?? {}
const resolveNamespace = !config?.allowNamespace
? Effect.succeed("primary")
: storeId.asEffect().pipe(Effect.map((namespace) => {
if (namespace !== "primary" && !config.allowNamespace!(namespace)) {
throw new Error(`Namespace ${namespace} not allowed!`)
}
return namespace
}))
const ensureTable = sql
.unsafe(
`CREATE TABLE IF NOT EXISTS "${tableName}" (id TEXT NOT NULL, _namespace TEXT NOT NULL DEFAULT 'primary', _etag TEXT, data ${jsonColumnType} NOT NULL, PRIMARY KEY (id, _namespace))`
)
.pipe(
Effect.andThen(
sql.unsafe(
`CREATE TABLE IF NOT EXISTS "_migrations" (id TEXT NOT NULL, version TEXT NOT NULL, PRIMARY KEY (id, version))`
)
),
Effect.orDie,
Effect.asVoid
)
const toRow = (e: PM) => {
const newE = makeETag(e)
const id = newE[idKey] as string
const { _etag, [idKey]: _id, ...rest } = newE as any
const data = JSON.stringify(rest)
return { id, _etag: newE._etag!, data, item: newE }
}
const exec = (query: string, params?: readonly unknown[]) => sql.unsafe(query, params as any).pipe(Effect.orDie)
const setInternal = Effect.fnUntraced(function*(e: PM, ns: string) {
const row = toRow(e)
if (e._etag) {
yield* exec(
`UPDATE "${tableName}" SET _etag = ?, data = ? WHERE id = ? AND _etag = ? AND _namespace = ?`,
[row._etag, row.data, row.id, e._etag, ns]
)
const existing = yield* exec(
`SELECT _etag FROM "${tableName}" WHERE id = ? AND _namespace = ?`,
[row.id, ns]
)
const current = (existing as any[])[0]
if (!current || current._etag !== row._etag) {
if (current) {
return yield* new OptimisticConcurrencyException({
type: name,
id: row.id,
current: current._etag,
found: e._etag,
code: 412
})
}
return yield* new OptimisticConcurrencyException({
type: name,
id: row.id,
current: "",
found: e._etag,
code: 404
})
}
} else {
yield* exec(
`INSERT INTO "${tableName}" (id, _namespace, _etag, data) VALUES (?, ?, ?, ?)`,
[row.id, ns, row._etag, row.data]
)
}
return row.item
})
const bulkSetInternal = (items: NonEmptyReadonlyArray, ns: string) =>
sql
.withTransaction(Effect.forEach(items, (e) => setInternal(e, ns)))
.pipe(
Effect.orDie,
Effect.map((_) => _ as unknown as NonEmptyReadonlyArray)
)
const ctx = yield* Effect.context()
const seedCache = new Map>()
const makeSeedEffect = Effect.fnUntraced(function*(ns: string) {
yield* ensureTable
if (!seed) return
const existing = yield* exec(
`SELECT id FROM "_migrations" WHERE id = ? AND version = ?`,
[`${tableName}::${ns}`, tableName]
)
if ((existing as any[]).length > 0) return
yield* InfraLogger.logInfo(`Seeding data for ${name} (namespace: ${ns})`)
const items = yield* seed.pipe(Effect.provide(ctx), Effect.orDie)
const ne = toNonEmptyArray([...items])
if (Option.isSome(ne)) yield* bulkSetInternal(ne.value, ns)
yield* exec(
`INSERT INTO "_migrations" (id, version) VALUES (?, ?)`,
[`${tableName}::${ns}`, tableName]
)
})
const seedNamespace = (ns: string) => {
let cached = seedCache.get(ns)
if (!cached) {
cached = Effect.cached(Effect.uninterruptible(makeSeedEffect(ns))).pipe(Effect.runSync)
seedCache.set(ns, cached)
}
return cached
}
const s: Store = {
seedNamespace: (ns) => seedNamespace(ns),
all: resolveNamespace.pipe(
Effect.flatMap((ns) =>
exec(`SELECT id, _etag, data FROM "${tableName}" WHERE _namespace = ?`, [ns])
.pipe(
Effect.map((rows) => (rows as any[]).map((r) => parseRow(r, idKey, defaultValues))),
Effect.withSpan("SQL.all [effect-app/infra/Store]", {
attributes: {
"repository.table_name": tableName,
"repository.model_name": name,
"repository.namespace": ns
}
}, { captureStackTrace: false })
)
)
),
find: (id) =>
resolveNamespace.pipe(
Effect.flatMap((ns) =>
exec(`SELECT id, _etag, data FROM "${tableName}" WHERE id = ? AND _namespace = ?`, [id, ns])
.pipe(
Effect.map((rows) => {
const row = (rows as any[])[0]
return row
? Option.some(parseRow(row, idKey, defaultValues))
: Option.none()
}),
Effect.withSpan("SQL.find [effect-app/infra/Store]", {
attributes: { "repository.table_name": tableName, "repository.model_name": name, id }
}, { captureStackTrace: false })
)
)
),
filter: (f: FilterArgs) => {
const filter = f
.filter
type M = U extends undefined ? Encoded
: Pick
return resolveNamespace
.pipe(Effect
.flatMap((ns) =>
Effect
.sync(() => {
const q = buildWhereSQLQuery(
dialect,
idKey,
filter ? [{ t: "where-scope", result: filter, relation: "some" }] : [],
tableName,
defaultValues,
f
.select as
| NonEmptyReadonlyArray
| undefined,
f
.order,
f
.skip,
f
.limit
)
const hasWhere = q
.sql
.includes("WHERE")
const nsSql = hasWhere
? q
.sql
.replace("WHERE", `WHERE _namespace = ? AND`)
: q
.sql
.replace(
`FROM "${tableName}"`,
`FROM "${tableName}" WHERE _namespace = ?`
)
return {
sql: nsSql,
params: [
ns,
...q
.params
]
}
})
.pipe(
Effect
.tap((q) => logQuery(q)),
Effect.flatMap((q) =>
exec(q.sql, q.params).pipe(
Effect.map((rows) => {
if (f.select) {
return (rows as any[]).map((r) => {
const selected = parseSelectRow(r, idKey)
return {
...Struct.pick(
defaultValues as any,
f.select!.filter((_) => typeof _ === "string") as never[]
),
...selected
} as M
})
}
return (rows as any[]).map((r) => parseRow(r, idKey, defaultValues) as any as M)
})
)
),
Effect.withSpan("SQL.filter [effect-app/infra/Store]", {
attributes: { "repository.table_name": tableName, "repository.model_name": name }
}, { captureStackTrace: false })
)
))
},
set: (e) =>
resolveNamespace.pipe(Effect.flatMap((ns) =>
setInternal(e, ns).pipe(
Effect.withSpan("SQL.set [effect-app/infra/Store]", {
attributes: { "repository.table_name": tableName, "repository.model_name": name, id: e[idKey] }
}, { captureStackTrace: false })
)
)),
batchSet: (items) =>
resolveNamespace.pipe(Effect.flatMap((ns) =>
bulkSetInternal(items, ns).pipe(
Effect.withSpan("SQL.batchSet [effect-app/infra/Store]", {
attributes: { "repository.table_name": tableName, "repository.model_name": name }
}, { captureStackTrace: false })
)
)),
bulkSet: (items) =>
resolveNamespace.pipe(Effect.flatMap((ns) =>
bulkSetInternal(items, ns).pipe(
Effect.withSpan("SQL.bulkSet [effect-app/infra/Store]", {
attributes: { "repository.table_name": tableName, "repository.model_name": name }
}, { captureStackTrace: false })
)
)),
batchRemove: (ids) => {
const placeholders = ids.map(() => "?").join(", ")
return resolveNamespace.pipe(Effect.flatMap((ns) =>
exec(
`DELETE FROM "${tableName}" WHERE id IN (${placeholders}) AND _namespace = ?`,
[...ids, ns]
)
.pipe(
Effect.asVoid,
Effect.withSpan("SQL.batchRemove [effect-app/infra/Store]", {
attributes: { "repository.table_name": tableName, "repository.model_name": name }
}, { captureStackTrace: false })
)
))
},
queryRaw: (query) =>
s.all.pipe(
Effect.map(query.memory),
Effect.withSpan("SQL.queryRaw [effect-app/infra/Store]", {
attributes: { "repository.table_name": tableName, "repository.model_name": name }
}, { captureStackTrace: false })
)
}
// Eagerly seed primary namespace on initialization
yield* seedNamespace("primary")
return s
})
}
})
}
type WithNsSqlFn = (
ns: string,
f: (sql: SqlClient.SqlClient) => Effect.Effect
) => Effect.Effect
function makeSQLiteStorePerNs(
withNsSql: WithNsSqlFn,
{ prefix }: StorageConfig
) {
return {
make: Effect.fnUntraced(function*(
name: string,
idKey: IdKey,
seed?: Effect.Effect, E, R>,
config?: StoreConfig
) {
type PM = PersistenceModelType
const tableName = `${prefix}${name}`
const defaultValues = config?.defaultValues ?? {}
const resolveNamespace = !config?.allowNamespace
? Effect.succeed("primary")
: storeId.asEffect().pipe(Effect.map((namespace) => {
if (namespace !== "primary" && !config.allowNamespace!(namespace)) {
throw new Error(`Namespace ${namespace} not allowed!`)
}
return namespace
}))
const toRow = (e: PM) => {
const newE = makeETag(e)
const id = newE[idKey] as string
const { _etag, [idKey]: _id, ...rest } = newE as any
const data = JSON.stringify(rest)
return { id, _etag: newE._etag!, data, item: newE }
}
const exec = (ns: string, query: string, params?: readonly unknown[]) =>
withNsSql(ns, (sql) => sql.unsafe(query, params as any).pipe(Effect.orDie))
const ensureTable = (ns: string) =>
withNsSql(ns, (sql) =>
sql
.unsafe(
`CREATE TABLE IF NOT EXISTS "${tableName}" (id TEXT NOT NULL PRIMARY KEY, _etag TEXT, data JSON NOT NULL)`
)
.pipe(
Effect.andThen(
sql.unsafe(
`CREATE TABLE IF NOT EXISTS "_migrations" (id TEXT NOT NULL, version TEXT NOT NULL, PRIMARY KEY (id, version))`
)
),
Effect.orDie,
Effect.asVoid
))
const setInternal = Effect.fnUntraced(function*(e: PM, ns: string) {
const row = toRow(e)
if (e._etag) {
yield* exec(
ns,
`UPDATE "${tableName}" SET _etag = ?, data = ? WHERE id = ? AND _etag = ?`,
[row._etag, row.data, row.id, e._etag]
)
const existing = yield* exec(
ns,
`SELECT _etag FROM "${tableName}" WHERE id = ?`,
[row.id]
)
const current = (existing as any[])[0]
if (!current || current._etag !== row._etag) {
if (current) {
return yield* new OptimisticConcurrencyException({
type: name,
id: row.id,
current: current._etag,
found: e._etag,
code: 412
})
}
return yield* new OptimisticConcurrencyException({
type: name,
id: row.id,
current: "",
found: e._etag,
code: 404
})
}
} else {
yield* exec(
ns,
`INSERT INTO "${tableName}" (id, _etag, data) VALUES (?, ?, ?)`,
[row.id, row._etag, row.data]
)
}
return row.item
})
const bulkSetInternal = (items: NonEmptyReadonlyArray, ns: string) =>
withNsSql(ns, (sql) =>
sql
.withTransaction(Effect.forEach(items, (e) => setInternal(e, ns)))
.pipe(
Effect.orDie,
Effect.map((_) => _ as unknown as NonEmptyReadonlyArray)
))
const ctx = yield* Effect.context()
const seedCache = new Map>()
const makeSeedEffect = Effect.fnUntraced(function*(ns: string) {
yield* ensureTable(ns)
if (!seed) return
const existing = yield* exec(
ns,
`SELECT id FROM "_migrations" WHERE id = ? AND version = ?`,
[tableName, tableName]
)
if ((existing as any[]).length > 0) return
yield* InfraLogger.logInfo(`Seeding data for ${name} (namespace: ${ns})`)
const items = yield* seed.pipe(Effect.provide(ctx), Effect.orDie)
const ne = toNonEmptyArray([...items])
if (Option.isSome(ne)) yield* bulkSetInternal(ne.value, ns)
yield* exec(
ns,
`INSERT INTO "_migrations" (id, version) VALUES (?, ?)`,
[tableName, tableName]
)
})
const seedNamespace = (ns: string) => {
let cached = seedCache.get(ns)
if (!cached) {
cached = Effect.cached(Effect.uninterruptible(makeSeedEffect(ns))).pipe(Effect.runSync)
seedCache.set(ns, cached)
}
return cached
}
const s: Store = {
seedNamespace: (ns) => seedNamespace(ns),
all: resolveNamespace.pipe(Effect.flatMap((ns) =>
exec(ns, `SELECT id, _etag, data FROM "${tableName}"`)
.pipe(
Effect.map((rows) => (rows as any[]).map((r) => parseRow(r, idKey, defaultValues))),
Effect.withSpan("SQLite.all [effect-app/infra/Store]", {
attributes: {
"repository.table_name": tableName,
"repository.model_name": name,
"repository.namespace": ns
}
}, { captureStackTrace: false })
)
)),
find: (id) =>
resolveNamespace.pipe(
Effect.flatMap((ns) =>
exec(ns, `SELECT id, _etag, data FROM "${tableName}" WHERE id = ?`, [id])
.pipe(
Effect.map((rows) => {
const row = (rows as any[])[0]
return row
? Option.some(parseRow(row, idKey, defaultValues))
: Option.none()
}),
Effect.withSpan("SQLite.find [effect-app/infra/Store]", {
attributes: { "repository.table_name": tableName, "repository.model_name": name, id }
}, { captureStackTrace: false })
)
)
),
filter: (f: FilterArgs) => {
const filter = f
.filter
type M = U extends undefined ? Encoded
: Pick
return resolveNamespace
.pipe(Effect
.flatMap((ns) =>
Effect
.sync(() =>
buildWhereSQLQuery(
sqliteDialect,
idKey,
filter ? [{ t: "where-scope", result: filter, relation: "some" }] : [],
tableName,
defaultValues,
f
.select as
| NonEmptyReadonlyArray
| undefined,
f
.order as NonEmptyReadonlyArray<{ key: string; direction: "ASC" | "DESC" }> | undefined,
f
.skip,
f
.limit
)
)
.pipe(
Effect
.tap((q) => logQuery(q)),
Effect.flatMap((q) =>
exec(ns, q.sql, q.params).pipe(
Effect.map((rows) => {
if (f.select) {
return (rows as any[]).map((r) => {
const selected = parseSelectRow(r, idKey)
return {
...Struct.pick(
defaultValues as any,
f.select!.filter((_) => typeof _ === "string") as never[]
),
...selected
} as M
})
}
return (rows as any[]).map((r) => parseRow(r, idKey, defaultValues) as any as M)
})
)
),
Effect.withSpan("SQLite.filter [effect-app/infra/Store]", {
attributes: { "repository.table_name": tableName, "repository.model_name": name }
}, { captureStackTrace: false })
)
))
},
set: (e) =>
resolveNamespace.pipe(Effect.flatMap((ns) =>
setInternal(e, ns).pipe(
Effect.withSpan("SQLite.set [effect-app/infra/Store]", {
attributes: { "repository.table_name": tableName, "repository.model_name": name, id: e[idKey] }
}, { captureStackTrace: false })
)
)),
batchSet: (items) =>
resolveNamespace.pipe(Effect.flatMap((ns) =>
bulkSetInternal(items, ns).pipe(
Effect.withSpan("SQLite.batchSet [effect-app/infra/Store]", {
attributes: { "repository.table_name": tableName, "repository.model_name": name }
}, { captureStackTrace: false })
)
)),
bulkSet: (items) =>
resolveNamespace.pipe(Effect.flatMap((ns) =>
bulkSetInternal(items, ns).pipe(
Effect.withSpan("SQLite.bulkSet [effect-app/infra/Store]", {
attributes: { "repository.table_name": tableName, "repository.model_name": name }
}, { captureStackTrace: false })
)
)),
batchRemove: (ids) => {
const placeholders = ids.map(() => "?").join(", ")
return resolveNamespace.pipe(Effect.flatMap((ns) =>
exec(
ns,
`DELETE FROM "${tableName}" WHERE id IN (${placeholders})`,
[...ids]
)
.pipe(
Effect.asVoid,
Effect.withSpan("SQLite.batchRemove [effect-app/infra/Store]", {
attributes: { "repository.table_name": tableName, "repository.model_name": name }
}, { captureStackTrace: false })
)
))
},
queryRaw: (query) =>
s.all.pipe(
Effect.map(query.memory),
Effect.withSpan("SQLite.queryRaw [effect-app/infra/Store]", {
attributes: { "repository.table_name": tableName, "repository.model_name": name }
}, { captureStackTrace: false })
)
}
yield* seedNamespace("primary")
return s
})
}
}
export function SQLiteStoreLayer(
cfg: StorageConfig,
options?: { makeSqlClientLayer?: (namespace: string) => Layer.Layer }
) {
if (options?.makeSqlClientLayer) {
return Layer.effectContext(
Effect.gen(function*() {
const layerMap = yield* LayerMap.make(
(namespace: string) => options.makeSqlClientLayer!(namespace),
{ idleTimeToLive: "10 minutes" }
)
const withNsSql: WithNsSqlFn = (ns, f) => SqlClient.SqlClient.use(f).pipe(Effect.provide(layerMap.get(ns)))
const storeMaker = makeSQLiteStorePerNs(withNsSql, cfg)
const withTransaction: WithNsTransactionFn = (effect) =>
storeId.asEffect().pipe(
Effect.flatMap((ns) => withNsSql(ns, (sql) => sql.withTransaction(effect).pipe(Effect.orDie)))
)
return StoreMaker.context(storeMaker).pipe(
Context.add(WithNsTransaction, withTransaction)
)
})
)
}
return StoreMaker
.toLayer(makeSQLStoreInt(sqliteDialect, "JSON")(cfg))
}