import type {
  BetterOmit,
  DocumentByInfo,
  DocumentByName,
  Expand,
  Expression,
  FilterBuilder,
  GenericDatabaseReader,
  GenericDatabaseWriter,
  GenericDataModel,
  Indexes,
  IndexRange,
  IndexRangeBuilder,
  NamedIndex,
  NamedSearchIndex,
  OrderedQuery,
  PaginationOptions,
  PaginationResult,
  Query,
  QueryInitializer,
  SearchFilter,
  SearchFilterBuilder,
  SearchIndexes,
  WithOptionalSystemFields,
  WithoutSystemFields,
} from "convex/server";
import type { GenericId } from "convex/values";
import {
  Array,
  type Cause,
  Chunk,
  Data,
  Effect,
  identity,
  Option,
  type ParseResult,
  pipe,
  Record,
  Schema,
  Stream,
} from "effect";

import type {
  ConfectDocumentByName,
  DataModelFromConfectDataModel,
  GenericConfectDataModel,
  GenericConfectDocument,
  GenericConfectTableInfo,
  GenericEncodedConfectDocument,
  TableInfoFromConfectTableInfo,
  TableNamesInConfectDataModel,
} from "~/src/server/data-model";
import {
  type ConfectDataModelFromConfectSchema,
  type ConfectSystemDataModel,
  confectSystemSchemaDefinition,
  type GenericConfectSchema,
} from "~/src/server/schema";
import { extendWithSystemFields } from "~/src/server/schemas/SystemFields";

/**
 * Effect-based counterpart of a Convex query over a single table.
 *
 * Builder methods (`filter`, `order`) return further queries; terminal
 * methods return an `Effect` or a `Stream` whose values are decoded
 * documents (`ConfectTableInfo["confectDocument"]`).
 */
interface ConfectQuery<
  ConfectTableInfo extends GenericConfectTableInfo,
  TableName extends string,
> {
  /** Narrows the query with a Convex filter expression. */
  filter(
    predicate: (
      q: FilterBuilder<TableInfoFromConfectTableInfo<ConfectTableInfo>>,
    ) => Expression<boolean>,
  ): ConfectQuery<ConfectTableInfo, TableName>;
  /** Fixes the result order; the returned query can no longer be re-ordered. */
  order(
    order: "asc" | "desc",
  ): ConfectOrderedQuery<ConfectTableInfo, TableName>;
  /** Loads one page of decoded documents. */
  paginate(
    paginationOpts: PaginationOptions,
  ): Effect.Effect<PaginationResult<ConfectTableInfo["confectDocument"]>>;
  /** Loads every matching document. */
  collect(): Effect.Effect<ConfectTableInfo["confectDocument"][]>;
  /** Loads at most `n` matching documents. */
  take(n: number): Effect.Effect<ConfectTableInfo["confectDocument"][]>;
  /** Loads the first matching document, if there is one. */
  first(): Effect.Effect<Option.Option<ConfectTableInfo["confectDocument"]>>;
  /**
   * Loads the single matching document, if there is one; fails with
   * `NotUniqueError` when more than one document matches.
   */
  unique(): Effect.Effect<
    Option.Option<ConfectTableInfo["confectDocument"]>,
    NotUniqueError
  >;
  /** Streams matching documents one at a time. */
  stream(): Stream.Stream<ConfectTableInfo["confectDocument"]>;
}

/** A `ConfectQuery` whose order has already been chosen (no `order` method). */
interface ConfectOrderedQuery<
  ConfectTableInfo extends GenericConfectTableInfo,
  TableName extends string,
> extends Omit<ConfectQuery<ConfectTableInfo, TableName>, "order"> {}

/** Failure raised by `unique()` when a query yields more than one document. */
export class NotUniqueError extends Data.TaggedError("NotUniqueError") {}

/**
 * `ConfectQuery` implementation that wraps a Convex `Query` and decodes every
 * document it yields with the table's schema.
 */
class ConfectQueryImpl<
  ConfectTableInfo extends GenericConfectTableInfo,
  TableName extends string,
> implements ConfectQuery<ConfectTableInfo, TableName> {
  q: Query<TableInfoFromConfectTableInfo<ConfectTableInfo>>;
  tableSchema: Schema.Schema<
    ConfectTableInfo["confectDocument"],
    ConfectTableInfo["encodedConfectDocument"]
  >;
  tableName: TableName;

  /**
   * @param q - Underlying Convex query (ordered or not).
   * @param tableSchema - Schema used to decode documents of this table.
   * @param tableName - Name of the table being queried.
   */
  constructor(
    q:
      | Query<TableInfoFromConfectTableInfo<ConfectTableInfo>>
      | OrderedQuery<TableInfoFromConfectTableInfo<ConfectTableInfo>>,
    tableSchema: Schema.Schema<
      ConfectTableInfo["confectDocument"],
      ConfectTableInfo["encodedConfectDocument"]
    >,
    tableName: TableName,
  ) {
    // This is some trickery, copied from convex-js. I suspect there's a better way.
    this.q = q as Query<TableInfoFromConfectTableInfo<ConfectTableInfo>>;
    this.tableSchema = tableSchema;
    this.tableName = tableName;
  }

  /** Decodes one stored document via `decodeDocument` for this table. */
  decode(
    convexDocument: ConfectTableInfo["encodedConfectDocument"],
  ): ConfectTableInfo["confectDocument"] {
    return decodeDocument(this.tableName, this.tableSchema, convexDocument);
  }

  /** Returns a new query wrapping `this.q.filter(predicate)`. */
  filter(
    predicate: (
      q: FilterBuilder<TableInfoFromConfectTableInfo<ConfectTableInfo>>,
    ) => Expression<boolean>,
  ) {
    return new ConfectQueryImpl(
      this.q.filter(predicate),
      this.tableSchema,
      this.tableName,
    );
  }

  /** Returns a new query wrapping `this.q.order(order)`. */
  order(order: "asc" | "desc"): ConfectQueryImpl<ConfectTableInfo, TableName> {
    return new ConfectQueryImpl(
      this.q.order(order),
      this.tableSchema,
      this.tableName,
    );
  }

  /**
   * Runs `paginate` on the underlying query and decodes each document of the
   * returned page. `splitCursor` and `pageStatus` are copied to the result
   * only when they are truthy on the Convex result.
   */
  paginate(
    paginationOpts: PaginationOptions,
  ): Effect.Effect<PaginationResult<ConfectTableInfo["confectDocument"]>> {
    return pipe(
      Effect.Do,
      Effect.bind("paginationResult", () =>
        Effect.promise(() => this.q.paginate(paginationOpts)),
      ),
      Effect.let("parsedPage", ({ paginationResult }) =>
        pipe(
          paginationResult.page,
          Array.map((document) => this.decode(document)),
        ),
      ),
      Effect.map(({ paginationResult, parsedPage }) => ({
        page: parsedPage,
        isDone: paginationResult.isDone,
        continueCursor: paginationResult.continueCursor,
        /* v8 ignore next -- @preserve */
        ...(paginationResult.splitCursor
          ? { splitCursor: paginationResult.splitCursor }
          : {}),
        /* v8 ignore next -- @preserve */
        ...(paginationResult.pageStatus
          ? { pageStatus: paginationResult.pageStatus }
          : {}),
      })),
    );
  }

  // It could be better to implement collect() with stream()
  /** Runs `collect` on the underlying query and decodes every document. */
  collect(): Effect.Effect<ConfectTableInfo["confectDocument"][]> {
    return pipe(
      Effect.promise(() => this.q.collect()),
      Effect.map(Array.map((document) => this.decode(document))),
    );
  }

  /** Collects the first `n` elements of `stream()` into an array. */
  take(n: number): Effect.Effect<ConfectTableInfo["confectDocument"][]> {
    return pipe(
      this.stream(),
      Stream.take(n),
      Stream.runCollect,
      Effect.map((chunk) => Chunk.toArray(chunk)),
    );
  }

  /** Returns the head of `stream()`, or `None` when the stream is empty. */
  first(): Effect.Effect<Option.Option<ConfectTableInfo["confectDocument"]>> {
    return pipe(this.stream(), Stream.runHead);
  }

  /**
   * Pulls at most two documents from `stream()`. If a second document exists
   * the effect fails with `NotUniqueError`; otherwise it succeeds with the
   * first document wrapped in an `Option` (`None` for an empty result).
   */
  unique(): Effect.Effect<
    Option.Option<ConfectTableInfo["confectDocument"]>,
    NotUniqueError
  > {
    return pipe(
      this.stream(),
      Stream.take(2),
      Stream.runCollect,
      Effect.andThen((chunk) =>
        pipe(
          chunk,
          // Index 1 being present means at least two documents matched.
          Chunk.get(1),
          Option.match({
            onSome: () => Effect.fail(new NotUniqueError()),
            onNone: () => Effect.succeed(Chunk.get(chunk, 0)),
          }),
        ),
      ),
    );
  }

  /**
   * Streams the underlying query as an async iterable, decoding each document.
   * Iteration errors are converted to defects with `Stream.orDie`, so the
   * stream's error channel is `never`.
   */
  stream(): Stream.Stream<ConfectTableInfo["confectDocument"]> {
    return pipe(
      Stream.fromAsyncIterable(this.q, identity),
      Stream.map((document) => this.decode(document)),
      Stream.orDie,
    );
  }
}

/**
 * Entry point for querying a table: pick how to scan it (`fullTableScan`,
 * `withIndex`, `withSearchIndex`) or use it directly as a `ConfectQuery`.
 */
interface ConfectQueryInitializer<
  ConfectTableInfo extends GenericConfectTableInfo,
  TableName extends string,
> extends ConfectQuery<ConfectTableInfo, TableName> {
  /** Queries the table without using an index. */
  fullTableScan(): ConfectQuery<ConfectTableInfo, TableName>;
  /** Queries the table through the named index, optionally range-restricted. */
  withIndex<
    IndexName extends keyof Indexes<
      TableInfoFromConfectTableInfo<ConfectTableInfo>
    >,
  >(
    indexName: IndexName,
    indexRange?:
      | ((
          q: IndexRangeBuilder<
            DocumentByInfo<TableInfoFromConfectTableInfo<ConfectTableInfo>>,
            NamedIndex<
              TableInfoFromConfectTableInfo<ConfectTableInfo>,
              IndexName
            >,
            0
          >,
        ) => IndexRange)
      | undefined,
  ): ConfectQuery<ConfectTableInfo, TableName>;
  /** Queries the table through the named search index. */
  withSearchIndex<
    IndexName extends keyof SearchIndexes<
      TableInfoFromConfectTableInfo<ConfectTableInfo>
    >,
  >(
    indexName: IndexName,
    searchFilter: (
      q: SearchFilterBuilder<
        DocumentByInfo<TableInfoFromConfectTableInfo<ConfectTableInfo>>,
        NamedSearchIndex<
          TableInfoFromConfectTableInfo<ConfectTableInfo>,
          IndexName
        >
      >,
    ) => SearchFilter,
  ): ConfectOrderedQuery<ConfectTableInfo, TableName>;
}

/**
 * `ConfectQueryInitializer` implementation wrapping a Convex
 * `QueryInitializer`. Every plain `ConfectQuery` method delegates to
 * `fullTableScan()`.
 */
class ConfectQueryInitializerImpl<
  ConfectTableInfo extends GenericConfectTableInfo,
  TableName extends string,
> implements ConfectQueryInitializer<ConfectTableInfo, TableName> {
  q: QueryInitializer<TableInfoFromConfectTableInfo<ConfectTableInfo>>;
  tableSchema: Schema.Schema<
    ConfectTableInfo["confectDocument"],
    ConfectTableInfo["encodedConfectDocument"]
  >;
  tableName: TableName;

  /**
   * @param q - Underlying Convex query initializer.
   * @param tableSchema - Schema used to decode documents of this table.
   * @param tableName - Name of the table being queried.
   */
  constructor(
    q: QueryInitializer<TableInfoFromConfectTableInfo<ConfectTableInfo>>,
    tableSchema: Schema.Schema<
      ConfectTableInfo["confectDocument"],
      ConfectTableInfo["encodedConfectDocument"]
    >,
    tableName: TableName,
  ) {
    this.q = q;
    this.tableSchema = tableSchema;
    this.tableName = tableName;
  }

  /** Wraps `this.q.fullTableScan()` in a `ConfectQueryImpl`. */
  fullTableScan(): ConfectQuery<ConfectTableInfo, TableName> {
    return new ConfectQueryImpl(
      this.q.fullTableScan(),
      this.tableSchema,
      this.tableName,
    );
  }

  /** Wraps `this.q.withIndex(indexName, indexRange)` in a `ConfectQueryImpl`. */
  withIndex<
    IndexName extends keyof Indexes<
      TableInfoFromConfectTableInfo<ConfectTableInfo>
    >,
  >(
    indexName: IndexName,
    indexRange?:
      | ((
          q: IndexRangeBuilder<
            DocumentByInfo<TableInfoFromConfectTableInfo<ConfectTableInfo>>,
            NamedIndex<
              TableInfoFromConfectTableInfo<ConfectTableInfo>,
              IndexName
            >,
            0
          >,
        ) => IndexRange)
      | undefined,
  ): ConfectQuery<ConfectTableInfo, TableName> {
    return new ConfectQueryImpl(
      this.q.withIndex(indexName, indexRange),
      this.tableSchema,
      this.tableName,
    );
  }

  /**
   * Wraps `this.q.withSearchIndex(indexName, searchFilter)` in a
   * `ConfectQueryImpl`.
   */
  withSearchIndex<
    IndexName extends keyof SearchIndexes<
      TableInfoFromConfectTableInfo<ConfectTableInfo>
    >,
  >(
    indexName: IndexName,
    searchFilter: (
      q: SearchFilterBuilder<
        DocumentByInfo<TableInfoFromConfectTableInfo<ConfectTableInfo>>,
        NamedSearchIndex<
          TableInfoFromConfectTableInfo<ConfectTableInfo>,
          IndexName
        >
      >,
    ) => SearchFilter,
  ): ConfectOrderedQuery<ConfectTableInfo, TableName> {
    return new ConfectQueryImpl(
      this.q.withSearchIndex(indexName, searchFilter),
      this.tableSchema,
      this.tableName,
    );
  }

  /** Shorthand for `fullTableScan().filter(predicate)`. */
  filter(
    predicate: (
      q: FilterBuilder<TableInfoFromConfectTableInfo<ConfectTableInfo>>,
    ) => Expression<boolean>,
  ): ConfectQuery<ConfectTableInfo, TableName> {
    return this.fullTableScan().filter(predicate);
  }

  /** Shorthand for `fullTableScan().order(order)`. */
  order(
    order: "asc" | "desc",
  ): ConfectOrderedQuery<ConfectTableInfo, TableName> {
    return this.fullTableScan().order(order);
  }

  /** Shorthand for `fullTableScan().paginate(paginationOpts)`. */
  paginate(
    paginationOpts: PaginationOptions,
  ): Effect.Effect<PaginationResult<ConfectTableInfo["confectDocument"]>> {
    return this.fullTableScan().paginate(paginationOpts);
  }

  /** Shorthand for `fullTableScan().collect()`. */
  collect(): Effect.Effect<ConfectTableInfo["confectDocument"][]> {
    return this.fullTableScan().collect();
  }

  /** Shorthand for `fullTableScan().take(n)`. */
  take(n: number): Effect.Effect<ConfectTableInfo["confectDocument"][]> {
    return this.fullTableScan().take(n);
  }

  /** Shorthand for `fullTableScan().first()`. */
  first(): Effect.Effect<Option.Option<ConfectTableInfo["confectDocument"]>> {
    return this.fullTableScan().first();
  }

  /** Shorthand for `fullTableScan().unique()`. */
  unique(): Effect.Effect<
    Option.Option<ConfectTableInfo["confectDocument"]>,
    NotUniqueError
  > {
    return this.fullTableScan().unique();
  }

  /** Shorthand for `fullTableScan().stream()`. */
  stream(): Stream.Stream<ConfectTableInfo["confectDocument"]> {
    return this.fullTableScan().stream();
  }
}

/** Maps each table name of a data model to the schema for its documents. */
export type DatabaseSchemasFromConfectDataModel<
  ConfectDataModel extends GenericConfectDataModel,
> = {
  [TableName in keyof ConfectDataModel & string]: Schema.Schema<
    ConfectDataModel[TableName]["confectDocument"],
    ConfectDataModel[TableName]["encodedConfectDocument"]
  >;
};

/** Database reader for user tables plus a `system` reader for system tables. */
export interface ConfectDatabaseReader<
  ConfectDataModel extends GenericConfectDataModel,
> extends ConfectBaseDatabaseReader<ConfectDataModel> {
  system: ConfectBaseDatabaseReader<ConfectSystemDataModel>;
}

/** Read-only database operations, expressed with `Effect` and `Option`. */
export interface ConfectBaseDatabaseReader<
  ConfectDataModel extends GenericConfectDataModel,
> {
  /** Starts a query against `tableName`. */
  query<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    tableName: TableName,
  ): ConfectQueryInitializer<ConfectDataModel[TableName], TableName>;
  /** Loads and decodes the document with the given id, if it exists. */
  get<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    id: GenericId<TableName>,
  ): Effect.Effect<
    Option.Option<ConfectDataModel[TableName]["confectDocument"]>
  >;
  /** Returns the id typed for `tableName`, or `None` if it does not belong to it. */
  normalizeId<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    tableName: TableName,
    id: string,
  ): Option.Option<GenericId<TableName>>;
}

/**
 * `ConfectBaseDatabaseReader` implementation over a Convex reader that has no
 * `system` property (see `BaseDatabaseReader`).
 */
export class ConfectBaseDatabaseReaderImpl<
  ConfectDataModel extends GenericConfectDataModel,
> implements ConfectBaseDatabaseReader<ConfectDataModel> {
  db: BaseDatabaseReader<DataModelFromConfectDataModel<ConfectDataModel>>;
  databaseSchemas: DatabaseSchemasFromConfectDataModel<ConfectDataModel>;

  /**
   * @param db - Underlying Convex reader.
   * @param databaseSchemas - Per-table schemas used for decoding.
   */
  constructor(
    db: BaseDatabaseReader<DataModelFromConfectDataModel<ConfectDataModel>>,
    databaseSchemas: DatabaseSchemasFromConfectDataModel<ConfectDataModel>,
  ) {
    this.db = db;
    this.databaseSchemas = databaseSchemas;
  }

  /** Decodes a stored document of `tableName` via `decodeDocument`. */
  decode<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    tableName: TableName,
    convexDocument: ConfectDataModel[TableName]["encodedConfectDocument"],
  ): ConfectDataModel[TableName]["confectDocument"] {
    return decodeDocument(
      tableName,
      this.databaseSchemas[tableName],
      convexDocument,
    );
  }

  /**
   * Finds the first table in `databaseSchemas` for which `normalizeId`
   * accepts `id`; `None` when no table does.
   */
  tableName(
    id: GenericId<TableNamesInConfectDataModel<ConfectDataModel>>,
  ): Option.Option<TableNamesInConfectDataModel<ConfectDataModel>> {
    return Array.findFirst(Record.keys(this.databaseSchemas), (tableName) =>
      Option.isSome(this.normalizeId(tableName, id)),
    );
  }

  /** Wraps the nullable result of `db.normalizeId` in an `Option`. */
  normalizeId<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    tableName: TableName,
    id: string,
  ): Option.Option<GenericId<TableName>> {
    return Option.fromNullable(this.db.normalizeId(tableName, id));
  }

  /**
   * Loads the document with `id` and decodes it with the schema of the table
   * the id resolves to. Returns `None` when `db.get` yields `null`. The table
   * lookup runs even when no document was found, and an id that resolves to
   * no known table becomes a defect (`Effect.orDie`).
   */
  get<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    id: GenericId<TableName>,
  ): Effect.Effect<
    Option.Option<ConfectDataModel[TableName]["confectDocument"]>
  > {
    return Effect.gen(this, function* () {
      const optionConvexDoc = yield* Effect.promise(() => this.db.get(id)).pipe(
        Effect.map(Option.fromNullable),
      );

      const tableName = yield* this.tableName(id).pipe(Effect.orDie);

      return pipe(
        optionConvexDoc,
        Option.map((convexDoc) => this.decode(tableName, convexDoc)),
      );
    });
  }

  /** Starts a query on `tableName` using that table's schema for decoding. */
  query<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    tableName: TableName,
  ): ConfectQueryInitializer<ConfectDataModel[TableName], TableName> {
    return new ConfectQueryInitializerImpl(
      this.db.query(tableName),
      this.databaseSchemas[tableName],
      tableName,
    );
  }
}

/**
 * `ConfectDatabaseReader` implementation over a full Convex reader. Its
 * `system` reader is built from `db.system` and the schemas derived from
 * `confectSystemSchemaDefinition`.
 */
export class ConfectDatabaseReaderImpl<
  ConfectDataModel extends GenericConfectDataModel,
> implements ConfectDatabaseReader<ConfectDataModel> {
  db: GenericDatabaseReader<DataModelFromConfectDataModel<ConfectDataModel>>;
  databaseSchemas: DatabaseSchemasFromConfectDataModel<ConfectDataModel>;
  system: ConfectBaseDatabaseReader<ConfectSystemDataModel>;

  /**
   * @param db - Underlying Convex reader.
   * @param databaseSchemas - Per-table schemas used for decoding.
   */
  constructor(
    db: GenericDatabaseReader<DataModelFromConfectDataModel<ConfectDataModel>>,
    databaseSchemas: DatabaseSchemasFromConfectDataModel<ConfectDataModel>,
  ) {
    this.db = db;
    this.databaseSchemas = databaseSchemas;
    this.system = new ConfectBaseDatabaseReaderImpl(
      this.db.system,
      databaseSchemasFromConfectSchema(
        confectSystemSchemaDefinition.confectSchema,
      ),
    );
  }

  /** Decodes a stored document of `tableName` via `decodeDocument`. */
  decode<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    tableName: TableName,
    convexDocument: ConfectDataModel[TableName]["encodedConfectDocument"],
  ): ConfectDataModel[TableName]["confectDocument"] {
    return decodeDocument(
      tableName,
      this.databaseSchemas[tableName],
      convexDocument,
    );
  }

  /**
   * Finds the first table in `databaseSchemas` for which `normalizeId`
   * accepts `id`; `None` when no table does.
   */
  tableName(
    id: GenericId<TableNamesInConfectDataModel<ConfectDataModel>>,
  ): Option.Option<TableNamesInConfectDataModel<ConfectDataModel>> {
    return Array.findFirst(Record.keys(this.databaseSchemas), (tableName) =>
      Option.isSome(this.normalizeId(tableName, id)),
    );
  }

  /** Wraps the nullable result of `db.normalizeId` in an `Option`. */
  normalizeId<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    tableName: TableName,
    id: string,
  ): Option.Option<GenericId<TableName>> {
    return Option.fromNullable(this.db.normalizeId(tableName, id));
  }

  /**
   * Loads the document with `id` and decodes it with the schema of the table
   * the id resolves to. Returns `None` when `db.get` yields `null`. The table
   * lookup runs even when no document was found, and an id that resolves to
   * no known table becomes a defect (`Effect.orDie`).
   */
  get<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    id: GenericId<TableName>,
  ): Effect.Effect<
    Option.Option<ConfectDataModel[TableName]["confectDocument"]>
  > {
    return Effect.gen(this, function* () {
      const optionConvexDoc = yield* Effect.promise(() => this.db.get(id)).pipe(
        Effect.map(Option.fromNullable),
      );

      const tableName = yield* this.tableName(id).pipe(Effect.orDie);

      return pipe(
        optionConvexDoc,
        Option.map((convexDoc) => this.decode(tableName, convexDoc)),
      );
    });
  }

  /** Starts a query on `tableName` using that table's schema for decoding. */
  query<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    tableName: TableName,
  ): ConfectQueryInitializer<ConfectDataModel[TableName], TableName> {
    return new ConfectQueryInitializerImpl(
      this.db.query(tableName),
      this.databaseSchemas[tableName],
      tableName,
    );
  }
}

/** Read and write database operations, expressed with `Effect` and `Option`. */
export interface ConfectDatabaseWriter<
  ConfectDataModel extends GenericConfectDataModel,
> {
  /** Starts a query against `tableName`. */
  query<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    tableName: TableName,
  ): ConfectQueryInitializer<ConfectDataModel[TableName], TableName>;
  /** Loads and decodes the document with the given id, if it exists. */
  get<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    id: GenericId<TableName>,
  ): Effect.Effect<
    Option.Option<ConfectDataModel[TableName]["confectDocument"]>
  >;
  /** Returns the id typed for `tableName`, or `None` if it does not belong to it. */
  normalizeId<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    tableName: TableName,
    id: string,
  ): Option.Option<GenericId<TableName>>;
  /** Encodes `value` and inserts it into `table`, yielding the new id. */
  insert<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    table: TableName,
    value: WithoutSystemFields<
      ConfectDocumentByName<ConfectDataModel, TableName>
    >,
  ): Effect.Effect<GenericId<TableName>, ParseResult.ParseError>;
  /** Applies a partial update to the document with the given id. */
  patch<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    id: GenericId<TableName>,
    value: Partial<
      WithoutSystemFields<ConfectDocumentByName<ConfectDataModel, TableName>>
    >,
  ): Effect.Effect<void, ParseResult.ParseError | Cause.NoSuchElementException>;
  /** Replaces the document with the given id. */
  replace<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    id: GenericId<TableName>,
    value: WithOptionalSystemFields<
      ConfectDocumentByName<ConfectDataModel, TableName>
    >,
  ): Effect.Effect<void>;
  /** Deletes the document with the given id. */
  delete(id: GenericId<string>): Effect.Effect<void>;
}

/**
 * `ConfectDatabaseWriter` implementation over a Convex writer. Reads are
 * delegated to an internal `ConfectDatabaseReaderImpl` built on the same `db`.
 */
export class ConfectDatabaseWriterImpl<
  ConfectDataModel extends GenericConfectDataModel,
> implements ConfectDatabaseWriter<ConfectDataModel> {
  databaseSchemas: DatabaseSchemasFromConfectDataModel<ConfectDataModel>;
  db: GenericDatabaseWriter<DataModelFromConfectDataModel<ConfectDataModel>>;
  reader: ConfectDatabaseReader<ConfectDataModel>;

  /**
   * @param db - Underlying Convex writer.
   * @param databaseSchemas - Per-table schemas used for encoding and decoding.
   */
  constructor(
    db: GenericDatabaseWriter<DataModelFromConfectDataModel<ConfectDataModel>>,
    databaseSchemas: DatabaseSchemasFromConfectDataModel<ConfectDataModel>,
  ) {
    this.db = db;
    this.databaseSchemas = databaseSchemas;
    this.reader = new ConfectDatabaseReaderImpl(db, databaseSchemas);
  }

  /**
   * Finds the first table in `databaseSchemas` for which `normalizeId`
   * accepts `id`; `None` when no table does.
   */
  tableName(
    id: GenericId<TableNamesInConfectDataModel<ConfectDataModel>>,
  ): Option.Option<TableNamesInConfectDataModel<ConfectDataModel>> {
    return Array.findFirst(Record.keys(this.databaseSchemas), (tableName) =>
      Option.isSome(this.normalizeId(tableName, id)),
    );
  }

  /** Delegates to `reader.query`. */
  query<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    tableName: TableName,
  ): ConfectQueryInitializer<ConfectDataModel[TableName], TableName> {
    return this.reader.query(tableName);
  }

  /** Delegates to `reader.get`. */
  get<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    id: GenericId<TableName>,
  ): Effect.Effect<
    Option.Option<ConfectDataModel[TableName]["confectDocument"]>
  > {
    return this.reader.get(id);
  }

  /** Wraps the nullable result of `db.normalizeId` in an `Option`. */
  normalizeId<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    tableName: TableName,
    id: string,
  ): Option.Option<GenericId<TableName>> {
    return Option.fromNullable(this.db.normalizeId(tableName, id));
  }

  /**
   * Encodes `value` with the schema of `table` and inserts the encoded
   * document. Fails with `ParseError` when encoding fails.
   */
  insert<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    table: TableName,
    value: WithoutSystemFields<
      ConfectDocumentByName<ConfectDataModel, TableName>
    >,
  ): Effect.Effect<GenericId<TableName>, ParseResult.ParseError> {
    return pipe(
      value,
      Schema.encode(this.databaseSchemas[table]),
      Effect.andThen((encodedValue) =>
        Effect.promise(() =>
          this.db.insert(
            table,
            encodedValue as Expand<
              BetterOmit<
                DocumentByName<
                  DataModelFromConfectDataModel<ConfectDataModel>,
                  TableName
                >,
                "_creationTime" | "_id"
              >
            >,
          ),
        ),
      ),
    );
  }

  /**
   * Patches a document by read-modify-replace:
   *
   * 1. resolve the table of `id` (fails with `NoSuchElementException` when no
   *    table matches);
   * 2. load the current document (a missing document is a defect:
   *    `InvalidIdProvidedForPatch`);
   * 3. decode it with the table schema, then fold `value` into it: keys whose
   *    patch value is `undefined` are removed, all other keys are overwritten;
   * 4. encode the merged document and write it with `db.replace`.
   *
   * Decoding or encoding failures surface as `ParseError`.
   */
  patch<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    id: GenericId<TableName>,
    value: Partial<
      WithoutSystemFields<ConfectDocumentByName<ConfectDataModel, TableName>>
    >,
  ): Effect.Effect<
    void,
    ParseResult.ParseError | Cause.NoSuchElementException
  > {
    return Effect.gen(this, function* () {
      const tableName = yield* this.tableName(id);

      const tableSchema = this.databaseSchemas[tableName];

      const originalConvexDoc = yield* Effect.promise(() =>
        this.db.get(id),
      ).pipe(
        Effect.andThen(
          (
            doc: DocumentByName<
              DataModelFromConfectDataModel<ConfectDataModel>,
              TableName
            > | null,
          ) =>
            doc
              ? Effect.succeed(doc)
              : Effect.die(new InvalidIdProvidedForPatch()),
        ),
      );

      const originalConfectDoc =
        yield* Schema.decodeUnknown(tableSchema)(originalConvexDoc);

      const updatedConvexDoc = yield* pipe(
        value,
        Record.reduce(originalConfectDoc, (acc, value, key) =>
          value === undefined
            ? Record.remove(acc, key)
            : Record.set(acc, key, value),
        ),
        Schema.encodeUnknown(tableSchema),
      );

      yield* Effect.promise(() =>
        this.db.replace(
          id,
          updatedConvexDoc as Expand<
            BetterOmit<
              DocumentByName<
                DataModelFromConfectDataModel<ConfectDataModel>,
                TableName
              >,
              "_creationTime" | "_id"
            >
          >,
        ),
      );
    });
  }

  /**
   * Passes `value` straight to `db.replace`.
   *
   * NOTE(review): unlike `insert` and `patch`, `value` is not run through
   * `Schema.encode` before being written — confirm this is intended.
   */
  replace<TableName extends TableNamesInConfectDataModel<ConfectDataModel>>(
    id: GenericId<TableName>,
    value: WithOptionalSystemFields<
      ConfectDocumentByName<ConfectDataModel, TableName>
    >,
  ): Effect.Effect<void> {
    return Effect.promise(() => this.db.replace(id, value));
  }

  /** Calls `db.delete(id)`. */
  delete(id: GenericId<string>): Effect.Effect<void> {
    return Effect.promise(() => this.db.delete(id));
  }
}

/**
 * Extracts the `tableSchema` of every table definition in `confectSchema`,
 * keyed by table name.
 */
export const databaseSchemasFromConfectSchema = <
  ConfectSchema extends GenericConfectSchema,
>(
  confectSchema: ConfectSchema,
) =>
  Record.map(
    confectSchema,
    ({ tableSchema }) => tableSchema,
  ) as DatabaseSchemasFromConfectDataModel<
    ConfectDataModelFromConfectSchema<ConfectSchema>
  >;

/** Defect raised by `patch` when `db.get` finds no document for the id. */
class InvalidIdProvidedForPatch extends Data.TaggedError(
  "InvalidIdProvidedForPatch",
) {}

/**
 * Synchronously decodes a stored document with `tableSchema` extended by the
 * system fields for `tableName` (`extendWithSystemFields`). Properties that
 * the extended schema does not declare are treated as errors
 * (`onExcessProperty: "error"`). `Schema.decodeUnknownSync` throws when
 * decoding fails.
 */
const decodeDocument = <
  TableName extends string,
  ConvexDocument extends GenericEncodedConfectDocument,
  ConfectDocument extends GenericConfectDocument,
>(
  tableName: TableName,
  tableSchema: Schema.Schema<ConfectDocument, ConvexDocument>,
  convexDocument: ConvexDocument,
): ConfectDocument =>
  Schema.decodeUnknownSync(extendWithSystemFields(tableName, tableSchema), {
    onExcessProperty: "error",
  })(convexDocument);

// Would be better if this were exported from `convex/server`
/** A Convex database reader without its `system` property. */
type BaseDatabaseReader<DataModel extends GenericDataModel> = Omit<
  GenericDatabaseReader<DataModel>,
  "system"
>;