// PuriWrapper / PuriTransactionWrapper — overview for readers of this file.
//
// NOTE(review): this file appears to have been collapsed onto three physical lines and to
// have lost most generic type-argument lists (e.g. `type TableName = Extract;`,
// `=> Promise,`, `ubRegister>(`). Because `//` comments run to the end of the line, most of
// the text below is lexically inside comments. The code is left untouched here; restore the
// original from version control rather than reconstructing the type arguments by hand.
//
// PuriWrapper (constructed with a Knex instance and an UpsertBuilder):
//   - raw(sql): returns this.knex.raw(sql).
//   - from(spec) / table(spec): both overload sets share the same implementation,
//     `new Puri(this.knex, spec)`. Per the overloads, spec is a table name, an
//     { alias: tableName } object, or an { alias: Puri } object (subquery).
//   - transaction(callback, options): reads { isolation, readOnly, dbPreset = "w" } from
//     options, dynamically imports "./db", and runs callback inside knex.transaction(...)
//     configured with { isolationLevel: isolation, readOnly }. A PuriTransactionWrapper for
//     the new trx is registered with DB.getTransactionContext().setTransaction(dbPreset, ...)
//     before the callback runs and removed with deleteTransaction(dbPreset) in `finally`.
//       * DB.transactionStorage has no store     -> the work is wrapped in DB.runWithTransaction(...)
//       * the store has a transaction for preset -> started on that transaction's trx and
//                                                   upsertBuilder (nested transaction)
//       * the store has none for this preset     -> started on this.knex / this.upsertBuilder
//     NOTE(review): on the nested path the `finally` still calls deleteTransaction(dbPreset);
//     presumably that also drops the outer transaction's registration instead of restoring
//     it — confirm against TransactionContext.
//   - ubRegister / ubUpsert / ubInsertOnly / ubUpsertOrInsert / ubUpdateBatch: forward to the
//     UpsertBuilder methods register / upsert / insertOnly / upsertOrInsert / updateBatch;
//     all except register also receive this.knex.
//   - debugTransaction(): console.logs the string produced by getTransactionInfo(), coloured
//     with chalk.
//   - getTransactionInfo(): runs `SELECT CONNECTION_ID()` and a query against
//     performance_schema.events_transactions_current; returns an "In Transaction, ..." string
//     when a row exists whose STATE is not "COMMITTED", otherwise "Not in Transaction, ...".
//     NOTE(review): these queries look MySQL-specific — confirm the supported dialects.
//
// PuriTransactionWrapper extends PuriWrapper and passes its Knex.Transaction to super as the
// knex instance; rollback() and commit() await trx.rollback() / trx.commit().
/* oxlint-disable @typescript-eslint/no-explicit-any */ // PuriWrapper uses a wide variety of types. import chalk from "chalk"; import { type Knex } from "knex"; import { type DatabaseSchemaExtend } from "../types/types"; import { type DBPreset } from "./db"; import { Puri } from "./puri"; import { type ColumnKeys, type IdType, type OmitInternalTypeKeys, type PuriTable, } from "./puri.types"; import { type InsertOnlyOptions, type UBRef, type UpsertBuilder, type UpsertOptions, } from "./upsert-builder"; type TableName = Extract; export type TransactionalOptions = { isolation?: Exclude; // snapshot: mssql only dbPreset?: DBPreset; readOnly?: boolean; }; export class PuriWrapper { constructor( public knex: Knex, public upsertBuilder: UpsertBuilder, ) {} raw(sql: string): Knex.Raw { return this.knex.raw(sql); } // Start with a table name. from( tableName: TTable, ): Puri< TSchema, Record>, OmitInternalTypeKeys> >; // Start with a table name + alias. from(spec: { [K in TAlias]: TTable; }): Puri< TSchema, Record>, OmitInternalTypeKeys> >; // Start with a subquery. from(spec: { [K in TAlias]: Puri; }): Puri< TSchema, Record>, OmitInternalTypeKeys> >; from(spec: any): any { return new Puri(this.knex, spec); } // Start with a table name. table( tableName: TTable, ): Puri< TSchema, Record>, OmitInternalTypeKeys> >; // Start with a table name + alias. table(spec: { [K in TAlias]: TTable; }): Puri< TSchema, Record>, OmitInternalTypeKeys> >; // Start with a subquery. table(spec: { [K in TAlias]: Puri; }): Puri< TSchema, Record>, OmitInternalTypeKeys> >; table(spec: any): any { return new Puri(this.knex, spec); } async transaction( callback: (trx: PuriTransactionWrapper) => Promise, options: TransactionalOptions = {}, ): Promise { const { isolation, readOnly, dbPreset = "w" } = options; // Same logic as the @transactional decorator: check whether a transaction context already exists. const { DB } = await import("./db"); const existingContext = DB.transactionStorage.getStore(); // Start a new transaction if there is no AsyncLocalStorage context or no transaction for this preset. const startTransaction = async ( knex: Knex | Knex.Transaction, upsertBuilder: 
UpsertBuilder, ) => { return knex.transaction( async (trx) => { const trxWrapper = new PuriTransactionWrapper(trx, upsertBuilder); // Store the transaction in the TransactionContext. DB.getTransactionContext().setTransaction(dbPreset, trxWrapper); try { return await callback(trxWrapper); } finally { // Remove the transaction. DB.getTransactionContext().deleteTransaction(dbPreset); } }, { isolationLevel: isolation, readOnly }, ); }; // Create a new AsyncLocalStorage context if none exists. if (!existingContext) { return DB.runWithTransaction(() => startTransaction(this.knex, this.upsertBuilder)); } // If a transaction already exists for this preset, create a nested transaction via SAVEPOINT. const existingTrx = existingContext.getTransaction(dbPreset); if (existingTrx) { return startTransaction(existingTrx.trx, existingTrx.upsertBuilder); } else { // A context exists but has no transaction for this preset (run within the same context). return startTransaction(this.knex, this.upsertBuilder); } } ubRegister>( tableName: TTable, row: Partial<{ [K in ColumnKeys]: TSchema[TTable][K] | UBRef; }>, ): UBRef { return this.upsertBuilder.register(tableName, row); } ubUpsert & keyof DatabaseSchemaExtend>( tableName: TTable, options?: UpsertOptions, ): Promise[]> { return this.upsertBuilder.upsert(this.knex, tableName, options); } ubInsertOnly & keyof DatabaseSchemaExtend>( tableName: TTable, options?: InsertOnlyOptions, ): Promise[]> { return this.upsertBuilder.insertOnly(this.knex, tableName, options); } ubUpsertOrInsert & keyof DatabaseSchemaExtend>( tableName: TTable, mode: "upsert" | "insert", options?: UpsertOptions, ): Promise[]> { return this.upsertBuilder.upsertOrInsert(this.knex, tableName, mode, options); } ubUpdateBatch( tableName: TableName, options?: { chunkSize?: number; where?: string | string[] }, ): Promise { return this.upsertBuilder.updateBatch(this.knex, tableName, options); } // For testing the transaction connection. async debugTransaction() { const info = await this.getTransactionInfo(); console.log(`${chalk.cyan("[Puri Transaction]")} ${chalk.magenta(info)}`); } private async getTransactionInfo(): Promise { // Look up the connection ID. 
const [connectionIdRows] = await this.knex.raw(`SELECT CONNECTION_ID() as connection_id`); const connectionId = connectionIdRows[0].connection_id; // Look up the transaction info. const [trxRows] = await this.knex.raw(` SELECT STATE, ISOLATION_LEVEL, THREAD_ID, EVENT_ID FROM performance_schema.events_transactions_current WHERE THREAD_ID = (SELECT THREAD_ID FROM performance_schema.threads WHERE PROCESSLIST_ID = CONNECTION_ID()) `); if (trxRows.length > 0 && trxRows[0].STATE !== "COMMITTED") { const trx = trxRows[0]; return `In Transaction, ConnID: ${connectionId}, ThreadID: ${trx.THREAD_ID}, EventID: ${trx.EVENT_ID}, InnoDB TRX: ${trx.STATE}(${trx.ISOLATION_LEVEL})`; } else { return `Not in Transaction, ConnID: ${connectionId}`; } } } export class PuriTransactionWrapper extends PuriWrapper { constructor( public trx: Knex.Transaction, public upsertBuilder: UpsertBuilder, ) { super(trx, upsertBuilder); } async rollback(): Promise { await this.trx.rollback(); } async commit(): Promise { await this.trx.commit(); } }