import { isEmptyObject } from "../../../is" import { setUnEnumerable } from "../../../objects" import { defineIndexes, IKvDefineIndexesOptions, IKvIndex } from "./defineIndexes" import { Kvlite } from "./Kvlite" import { IKvFilter, IKvUpdateOp, IKvFindOptions } from "./type" import { filterToSql, updateOpToSql } from "./util" import { inputDoc, outputDoc } from "./util" export interface IKvliteCollectionOptions {} /** * KvliteCollection * * 实现类似 MongoDB Collection 的接口 */ export class KvliteCollection { collectionName!: string _kvlite!: Kvlite _options!: IKvliteCollectionOptions /** 预编译的 SQL */ _stmts!: ReturnType constructor(kvlite: Kvlite, collectionName: string, options?: IKvliteCollectionOptions) { this.collectionName = collectionName const sqlite = kvlite._sqlite const exists = sqlite .prepare(`SELECT name FROM sqlite_master WHERE type='table' AND name='${collectionName}';`) .get() if (!exists) { sqlite.exec(`CREATE TABLE ${collectionName} (key TEXT PRIMARY KEY, data JSON);`) } setUnEnumerable(this, "_kvlite", kvlite) setUnEnumerable(this, "_options", options) setUnEnumerable(this, "_stmts", this._prepareStmts()) } /** 初始化预编译语句 */ _prepareStmts() { let sqlite = this._kvlite._sqlite let collectionName = this.collectionName let stmtGet = sqlite.prepare(`SELECT data FROM ${collectionName} WHERE key = ?;`) let stmtSet = sqlite.prepare(`INSERT OR REPLACE INTO ${collectionName} (key, data) VALUES (?, ?);`) let stmtDel = sqlite.prepare(`DELETE FROM ${collectionName} WHERE key = ?;`) let stmtAll = sqlite.prepare(`SELECT data FROM ${collectionName};`) let stmtKeys = sqlite.prepare(`SELECT key FROM ${collectionName};`) let stmtCount = sqlite.prepare(`SELECT COUNT(*) FROM ${collectionName};`) let stmtClear = sqlite.prepare(`DELETE FROM ${collectionName};`) return { stmtGet, stmtSet, stmtDel, stmtAll, stmtKeys, stmtCount, stmtClear, } } // ========================================================================== /** * 获取数据 */ get(key: string): TData | undefined { let { stmtGet } = this._stmts let re: any = stmtGet.get(key) return outputDoc(re?.data) } /** * 设置数据 */ set(key: string, data: TData) { let { stmtSet } = this._stmts return stmtSet.run(key, inputDoc(data)) } /** * 更新 data 中的 JSON 字段 */ update(filter: string | IKvFilter, updateOp: IKvUpdateOp): any { if (isEmptyObject(updateOp)) return let sqlite = this._kvlite._sqlite let { sql, parameters } = updateOpToSql(this.collectionName, filter, updateOp) return sqlite.prepare(sql).run(parameters) } /** * 删除数据 */ delete(key: string) { let { stmtDel } = this._stmts return stmtDel.run(key) } /** * 获取所有数据 */ getAll(): TData[] { let { stmtAll } = this._stmts return stmtAll.all().map((row: any) => { return outputDoc(row.data) }) } /** * 获取所有 key */ keys(): string[] { let { stmtKeys } = this._stmts return stmtKeys.all().map((row: any) => { return row.key }) } /** * 根据 filter 在数据集合中查找数据 */ find(filter: IKvFilter, findOptions?: IKvFindOptions): [string, TData][] { let { sql, parameters, limitOffset } = filterToSql(this.collectionName, filter, findOptions) let finParameters = parameters.concat(limitOffset) let sqlite = this._kvlite._sqlite // console.log(">>> find sql", sql, finParameters) return sqlite .prepare(sql) .all(finParameters) .map((row: any) => { return [row.key, outputDoc(row.data)] }) } /** * 遍历数据集合 */ forEach(each: (keyValue: [string, TData], index: number) => void) { let i = 0 let sqlite = this._kvlite._sqlite let stmt = sqlite.prepare(`SELECT key, data FROM ${this.collectionName};`) for (const row of stmt.iterate() as any) { each([row.key, outputDoc(row.data)], i) i++ } } /** * 遍历数据集合,如果有回调函数返回 true,则停止遍历 */ some(each: (data: TData, key: string, index: number) => boolean) { let i = 0 let sqlite = this._kvlite._sqlite let stmt = sqlite.prepare(`SELECT key, data FROM ${this.collectionName};`) for (const row of stmt.iterate() as any) { if (each(outputDoc(row.data), row.key, i)) { return true } i++ } return false } /** * 返回数据集合的数量 */ count(filter?: IKvFilter): number { let result const sqlite = this._kvlite._sqlite const { stmtCount } = this._stmts if (filter && !isEmptyObject(filter)) { let { sql, parameters } = filterToSql(this.collectionName, filter) let finSql = `SELECT COUNT(*) FROM ${this.collectionName} WHERE ${sql.split("WHERE")[1]}` // console.log(">>> finSql", finSql) // console.log(">>> parameters", parameters) result = sqlite.prepare(finSql).get(parameters) } else { result = stmtCount.get() } if (result && typeof result === "object" && "COUNT(*)" in result) { return result["COUNT(*)"] as number } else { throw new Error("Invalid count result") } } /** * 清空数据集合 */ clear() { let { stmtClear } = this._stmts return stmtClear.run() } /** * 定义索引 * 指定 data 中哪些字段需要创建索引,可以加快查询速度 * * field 为 false 会删除索引 * * @example * defineIndexes({ id:{unique:true}, name:true, age:{type:'number'} }) */ defineIndexes(indexes: { [field: string]: IKvIndex }, options?: IKvDefineIndexesOptions) { let sqlite = this._kvlite._sqlite let collectionName = this.collectionName return defineIndexes(sqlite, collectionName, indexes, options) } /** * 高性能的遍历数据集合,每次返回一批数据 * 最终会调用 each 函数,直到所有数据遍历完成 * @param options * @param each */ eachBatch( options: { filter: IKvFilter limit?: number projection?: string[] }, each: (list: [string, TData][], batch: number) => void ) { this._eachBatchCore(options, each, false) } /** * 异步高性能批量遍历,支持异步 each 回调 * @param options * @param each */ async eachBatchAsync( options: { filter: IKvFilter limit?: number projection?: string[] }, each: (list: [string, TData][], batch: number) => Promise ) { await this._eachBatchCore(options, each, true) } /** * 执行事务 * 执行一个函数,保证事务的完整性。 * 如果有大量的数据操作,如批量插入、删除、更新,使用事务能大幅度提高性能 */ runTransaction(func: () => any) { let sqlite = this._kvlite._sqlite const adopt = sqlite.transaction(func) adopt() } /** * 内部复用的批量遍历核心方法 */ private _eachBatchCore( options: { filter: IKvFilter limit?: number projection?: string[] }, each: (list: [string, TData][], batch: number) => void | Promise, isAsync: boolean ) { const batchSize = options.limit || 100 let offset = 0 let batchNumber = 0 const { sql, parameters } = filterToSql(this.collectionName, options.filter, { limit: batchSize, offset: offset, projection: options.projection, }) let sqlite = this._kvlite._sqlite const stmt = sqlite.prepare(sql) const runBatch = async () => { while (true) { let finParameters = parameters.concat([batchSize, offset]) const result = stmt.all(finParameters) if (result.length === 0) break const mapped = result.map((raw: any) => [raw.key, outputDoc(raw.data)]) if (isAsync) { await (each as any)(mapped, batchNumber) } else { ;(each as any)(mapped, batchNumber) } offset += batchSize batchNumber++ } } if (isAsync) { return runBatch() } else { // 用同步方式执行 let done = false let err: any = null runBatch().catch((e) => { done = true err = e }) if (err) throw err } } /** * 将集合所属的 Kvlite 导出为 Buffer 数据,可以保存为文件 */ toBuffer() { return this._kvlite.toBuffer() } }