WOrmPostgresql.mjs

import events from 'events'
import pg from 'pg'
import mongoSql from 'mongo-sql'
import size from 'lodash-es/size.js'
import get from 'lodash-es/get.js'
import map from 'lodash-es/map.js'
import keys from 'lodash-es/keys.js'
import filter from 'lodash-es/filter.js'
import omit from 'lodash-es/omit.js'
import trim from 'lodash-es/trim.js'
import isEqual from 'lodash-es/isEqual.js'
import cloneDeep from 'lodash-es/cloneDeep.js'
import isstr from 'wsemi/src/isstr.mjs'
import isestr from 'wsemi/src/isestr.mjs'
import isarr from 'wsemi/src/isarr.mjs'
import isearr from 'wsemi/src/isearr.mjs'
import iseobj from 'wsemi/src/iseobj.mjs'
import isnum from 'wsemi/src/isnum.mjs'
import isint from 'wsemi/src/isint.mjs'
import isbol from 'wsemi/src/isbol.mjs'
import isDate from 'wsemi/src/isDate.mjs'
import haskey from 'wsemi/src/haskey.mjs'
import pmSeries from 'wsemi/src/pmSeries.mjs'


/**
 * 操作資料庫(PostgreSQL)
 *
 * @class
 * @param {Object} [opt={}] 輸入設定物件,預設{}
 * @param {String} [opt.url='postgresql://127.0.0.1:5432'] 輸入連接資料庫字串,預設'postgresql://127.0.0.1:5432'
 * @param {String} [opt.db='worm'] 輸入使用資料庫名稱字串,預設'worm'
 * @param {String} [opt.cl='test'] 輸入使用資料表名稱字串,預設'test'
 * @param {Boolean} [opt.useCache=false] 輸入是否使用select快取,適用於單程序操作,預設false
 * @returns {Object} 回傳操作資料庫物件,各事件功能詳見說明
 */
function WOrmPostgresql(opt = {}) {

    //_cache
    let _cache = null

    //url
    let url = get(opt, 'url')
    if (!isestr(url)) {
        url = 'postgresql://user:password@127.0.0.1:5432'
    }

    //db
    let db = get(opt, 'db')
    if (!isestr(db)) {
        db = 'worm'
    }

    //cl
    let cl = get(opt, 'cl')
    if (!isestr(cl)) {
        cl = 'test'
    }

    //useCache
    let useCache = get(opt, 'useCache')
    if (!isbol(useCache)) {
        useCache = false
    }

    //getValueType
    function getValueType(value) {
        if (isstr(value)) {
            let isoTimeRegex = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z)?$/
            return isoTimeRegex.test(value) ? 'TIMESTAMPTZ' : 'TEXT'
        }
        else if (isnum(value)) {
            return isint(value) ? 'INTEGER' : 'DOUBLE PRECISION'
        }
        else if (isbol(value)) {
            return 'BOOLEAN'
        }
        else if (value === null) {
            return 'TEXT'
        }
        else {
            return 'JSONB'
        }
    }

    //genSqlForCreateTable
    function genSqlForCreateTable(tableName, pk, obj) {

        //check
        if (!iseobj(obj)) {
            throw new Error(`obj[${obj}] is not an effective object`)
        }

        //columns
        let columns = Object.entries(obj).map(([key, value]) => {
            let type = getValueType(value)
            let c = `  ${key} ${type}`
            if (key === pk) {
                c = `${c} PRIMARY KEY`
            }
            return c
        })

        //c
        let c = `CREATE TABLE ${tableName} (\n${columns.join(',\n')}\n);`
        // console.log('c', c)

        return c
    }

    //genConflictSQL
    function genConflictSQL(obj) {

        //allKeys
        let allKeys = keys(obj)

        //conflictKeys
        let conflictKeys = ['time']

        //updateKeys
        let updateKeys = filter(allKeys, (k) => {
            return !conflictKeys.includes(k)
        })

        //check
        if (size(updateKeys) === 0) {
            return ''
        }

        //updateClause
        let updateClause = updateKeys
            .map(key => `${key} = EXCLUDED.${key}`)
            .join(',\n  ')

        //c
        let c = `
            ON CONFLICT (${conflictKeys.join(', ')}) DO UPDATE SET
                ${updateClause}
        `

        //trim
        c = trim(c)

        return c
    }

    //connectionString
    let connectionString = `${url}/${db}`
    // console.log('connectionString', connectionString)

    //ee
    let ee = new events.EventEmitter()

    //PgClient
    let PgClient = pg.Client
    // console.log('PgClient', PgClient)

    //clearCache
    function clearCache() {
        _cache = null
    }

    //getCacheKey
    function getCacheKey(find = {}, order = {}) {
        return JSON.stringify({
            find,
            order,
        })
    }

    //getCache
    function getCache(find = {}, order = {}) {
        if (iseobj(_cache)) {
            let key = getCacheKey(find, order)
            if (haskey(_cache, key)) {
                return cloneDeep(_cache[key]) //與外部使用數據脫勾
            }
        }
        return null
    }

    //setCache
    function setCache(find = {}, order = {}, data = []) {
        if (!iseobj(_cache)) {
            _cache = {}
        }
        let key = getCacheKey(find, order)
        _cache[key] = cloneDeep(data) //與外部使用數據脫勾
    }

    /**
     * 創建資料表
     *
     * @memberOf WOrmPostgresql
     * @param {String} cl 輸入資料表名字串
     * @param {String} pk 輸入主鍵字串
     * @param {Array|Object} arr 輸入數據物件陣列或數據物件
     * @returns {Promise} 回傳Promise,resolve回傳成功訊息,reject回傳錯誤訊息
     */
    async function createTable(cl, pk, arr) {
        let isErr = false
        let res = null

        //client
        let client = new PgClient({ connectionString })

        //connect
        try {

            //connect
            await client.connect()

        }
        catch (err) {
            isErr = true
            res = err
            client = null
        }

        //check
        if (isErr) {
            return Promise.reject(res)
        }

        try {

            //obj
            let obj = arr
            if (isearr(obj)) {
                obj = obj[0]
            }

            //check
            if (!iseobj(obj)) {
                throw new Error(`obj[${obj}] is not an effective object`)
            }

            //sql
            let sql = genSqlForCreateTable(cl, pk, obj)
            // console.log('sql', sql)

            //select
            await client.query(sql)
                .then(() => {
                    res = {
                        ok: 1,
                    }
                })
                .catch((err) => {
                    isErr = true
                    res = err.message
                })

        }
        catch (err) {
            isErr = true
            res = err
        }
        finally {
            await client.end()
            client = null
        }
        // console.log('res', res)

        //update
        if (useCache) {
            clearCache()
        }

        //check
        if (isErr) {
            return Promise.reject(res)
        }

        return res
    }

    /**
     * 查詢數據
     *
     * @memberOf WOrmPostgresql
     * @param {Object} [find={}] 輸入查詢條件物件
     * @param {Object} [order={}] 輸入排序條件物件
     * @returns {Promise} 回傳Promise,resolve回傳數據,reject回傳錯誤訊息
     */
    async function select(find = {}, order = {}) {
        let isErr = false
        let res = null

        //cache
        if (useCache) {
            let cache = getCache(find, order)
            if (isarr(cache)) {
                return cache
            }
        }

        //client
        let client = new PgClient({ connectionString })

        //connect
        try {

            //connect
            await client.connect()

        }
        catch (err) {
            isErr = true
            res = err
            client = null
        }

        //check
        if (isErr) {
            return Promise.reject(res)
        }

        try {

            //mr
            let mr = mongoSql.sql({
                type: 'select',
                table: cl,
                where: find,
                // limit: 10,
                order,
            })
            // console.log('mr', mr)
            // console.log('mr.query', mr.query)
            // console.log('mr.values', mr.values)

            //select
            let r = await client.query(mr.query, mr.values)
            // console.log('r', r)

            //res
            res = get(r, 'rows')

            //check
            if (!isarr(res)) {
                isErr = true
                res = `can not select by find[${JSON.stringify(find)}]`
            }

            //cache
            if (useCache && !isErr) {
                setCache(find, order, res)
            }

        }
        catch (err) {
            isErr = true
            res = err
        }
        finally {
            await client.end()
            client = null
        }
        // console.log('res', res)

        //check
        if (isErr) {
            return Promise.reject(res)
        }

        return res
    }

    /**
     * 插入數據
     *
     * @memberOf WOrmPostgresql
     * @param {Object|Array} data 輸入數據物件或陣列
     * @returns {Promise} 回傳Promise,resolve回傳插入結果,reject回傳錯誤訊息
     */
    async function insert(data) {
        let isErr = false
        let res = null

        //check
        if (!iseobj(data) && !isearr(data)) {
            return {
                n: 0,
                nInserted: 0,
                ok: 1,
            }
        }

        //cloneDeep
        data = cloneDeep(data)

        //client
        let client = new PgClient({ connectionString })

        //connect
        try {

            //connect
            await client.connect()

        }
        catch (err) {
            isErr = true
            res = err
            client = null
        }

        //check
        if (isErr) {
            return Promise.reject(res)
        }

        try {

            //check
            if (!isarr(data)) {
                data = [data]
            }

            //check time
            data = map(data, function(v, k) {
                if (!isDate(v.time)) {
                    throw new Error(`invalid data[${k}].time[${v.time}]`)
                }
                return v
            })

            //mr
            let mr = mongoSql.sql({
                type: 'insert',
                table: cl,
                values: data,
            })
            // console.log('mr', mr)
            // console.log('mr.query', mr.query)
            // console.log('mr.values', mr.values)

            //nAll, nInsert
            let nAll = size(data)
            let nInsert = nAll //一次插入全部數據加速, 但也因此沒法個別處理conflict, 無法個別計算已插入數量

            //insert
            await client.query(mr.query, mr.values)
                .then(() => {

                    //res
                    res = {
                        n: nAll,
                        nInserted: nInsert,
                        ok: 1,
                    }

                })
                .catch((err) => {
                    isErr = true
                    res = err.message
                })

        }
        catch (err) {
            isErr = true
            res = err
        }
        finally {
            await client.end()
            client = null
        }

        //update
        if (useCache) {
            clearCache()
        }

        //emit
        if (!isErr) {
            try {
                ee.emit('change', 'insert', data, res)
            }
            catch (err) {
                console.log(err)
            }
        }

        //check
        if (isErr) {
            return Promise.reject(res)
        }

        return res
    }

    /**
     * 儲存數據
     *
     * @memberOf WOrmPostgresql
     * @param {Object|Array} data 輸入數據物件或陣列
     * @param {Object} [option={}] 輸入設定物件,預設為{}
     * @param {boolean} [option.autoInsert=true] 輸入是否於儲存時發現原本無數據,則自動改以插入處理,預設為true
     * @returns {Promise} 回傳Promise,resolve回傳儲存結果,reject回傳錯誤訊息
     */
    async function save(data, option = {}) {
        let isErr = false
        let res = null

        //check
        if (!iseobj(data) && !isearr(data)) {
            return []
        }

        //cloneDeep
        data = cloneDeep(data)

        //autoInsert
        let autoInsert = get(option, 'autoInsert', true)

        //client
        let client = new PgClient({ connectionString })

        //connect
        try {

            //connect
            await client.connect()

        }
        catch (err) {
            isErr = true
            res = err
            client = null
        }

        //check
        if (isErr) {
            return Promise.reject(res)
        }

        try {

            //check
            if (!isarr(data)) {
                data = [data]
            }

            //check time
            data = map(data, function(v, k) {
                if (!isDate(v.time)) {
                    throw new Error(`invalid data[${k}].time[${v.time}]`)
                }
                return v
            })

            //pmSeries
            res = await pmSeries(data, async(v) => {

                //rest
                let rest = null

                //_v
                let _v = null
                if (true) {

                    //mr
                    let mr = mongoSql.sql({
                        type: 'select',
                        table: cl,
                        where: {
                            time: v.time,
                        },
                        // limit: 10,
                        // order,
                    })
                    // console.log('mr', mr)
                    // console.log('mr.query', mr.query)
                    // console.log('mr.values', mr.values)

                    //select
                    let r = await client.query(mr.query, mr.values)
                    // console.log('r', r)

                    //rows
                    let rows = get(r, 'rows')
                    // console.log('rows', rows)

                    //_v
                    _v = get(rows, 0, null)
                    // console.log('_v', _v)

                }

                //check
                if (iseobj(_v)) {
                    //存在

                    //_vt
                    let _vt = omit(_v, 'time')
                    // console.log('_vt', _vt)

                    //vt
                    let vt = omit(v, 'time')
                    // console.log('vt', vt)

                    if (isEqual(_vt, vt)) {
                        //相同時不更新

                        //rest
                        rest = {
                            n: 1,
                            nModified: 0,
                            ok: 1,
                        }
                        // console.log('相同時不更新', rest, v)

                    }
                    else {
                        //不相同時須更新

                        //mr
                        let mr = mongoSql.sql({
                            type: 'insert',
                            table: cl,
                            values: v,
                        })
                        // console.log('mr', mr)
                        // console.log('mr.query', mr.query)
                        // console.log('mr.values', mr.values)

                        //添加conflict
                        let conflict = genConflictSQL(v)

                        //sql
                        let sql = `${mr.query} ${conflict}`

                        //save
                        await client.query(sql, mr.values)
                            .then(() => {

                                //rest
                                rest = {
                                    n: 1,
                                    nModified: 1,
                                    ok: 1,
                                }
                                // console.log('不相同時須更新', rest, vt)

                            })
                            .catch((err) => {

                                //rest
                                rest = {
                                    n: 1,
                                    nModified: 0,
                                    ok: 0,
                                    err: err.message,
                                }

                            })

                    }
                }
                else {
                    //不存在

                    //rest
                    rest = {
                        n: 0,
                        nModified: 0,
                        ok: 1,
                    }
                    // console.log('不存在', rest, v)

                }

                //autoInsert
                if (autoInsert && rest.n === 0) {
                    //之前不存在(rest.n)且可自動插入(autoInsert=true)

                    //mr
                    let mr = mongoSql.sql({
                        type: 'insert',
                        table: cl,
                        values: v,
                    })
                    // console.log('mr', mr)
                    // console.log('mr.query', mr.query)
                    // console.log('mr.values', mr.values)

                    //save
                    await client.query(mr.query, mr.values)
                        .then(() => {

                            //rest
                            rest = {
                                n: 1,
                                nInserted: 1,
                                ok: 1,
                            }
                            // console.log('之前不存在且可自動插入', rest, v)

                        })
                        .catch((err) => {

                            //rest
                            rest = {
                                n: 1,
                                nInserted: 0,
                                ok: 0,
                                err: err.message,
                            }

                        })

                }

                return rest
            })

        }
        catch (err) {
            isErr = true
            res = err
        }
        finally {
            await client.end()
            client = null
        }

        //update
        if (useCache) {
            clearCache()
        }

        //emit
        if (!isErr) {
            try {
                ee.emit('change', 'save', data, res)
            }
            catch (err) {
                console.log(err)
            }
        }

        //check
        if (isErr) {
            return Promise.reject(res)
        }

        return res
    }

    /**
     * 刪除數據
     *
     * @memberOf WOrmPostgresql
     * @param {Object|Array} data 輸入數據物件或陣列
     * @returns {Promise} 回傳Promise,resolve回傳刪除結果,reject回傳錯誤訊息
     */
    async function del(data) {
        let isErr = false
        let res = null

        //check
        if (!iseobj(data) && !isearr(data)) {
            return []
        }

        //cloneDeep
        data = cloneDeep(data)

        //client
        let client = new PgClient({ connectionString })

        //connect
        try {

            //connect
            await client.connect()

        }
        catch (err) {
            isErr = true
            res = err
            client = null
        }

        //check
        if (isErr) {
            return Promise.reject(res)
        }

        try {

            // //database, collection
            // let database = client.db(opt.db)
            // let collection = database.collection(opt.cl)

            //check
            if (!isarr(data)) {
                data = [data]
            }

            //check time
            data = map(data, function(v, k) {
                if (!isDate(v.time)) {
                    throw new Error(`invalid data[${k}].time[${v.time}]`)
                }
                return v
            })

            //pmSeries
            res = await pmSeries(data, async(v) => {

                //rest
                let rest = null

                //mr
                let mr = mongoSql.sql({
                    type: 'delete',
                    table: cl,
                    where: {
                        time: v.time,
                    },
                })
                // console.log('mr', mr)
                // console.log('mr.query', mr.query)
                // console.log('mr.values', mr.values)

                //del
                await client.query(mr.query, mr.values)
                    .then((r) => {

                        //res
                        rest = {
                            n: 1,
                            nDeleted: r.rowCount,
                            ok: 1,
                        }

                    })
                    .catch((err) => {

                        //rest
                        rest = {
                            n: 1,
                            nDeleted: 0,
                            ok: 0,
                            err: err.message,
                        }

                    })

                return rest
            })

        }
        catch (err) {
            isErr = true
            res = err
        }
        finally {
            await client.end()
            client = null
        }

        //update
        if (useCache) {
            clearCache()
        }

        //emit
        if (!isErr) {
            try {
                ee.emit('change', 'del', data, res)
            }
            catch (err) {
                console.log(err)
            }
        }

        //check
        if (isErr) {
            return Promise.reject(res)
        }

        return res
    }

    /**
     * 刪除全部數據,需與del分開,避免未傳數據導致直接刪除全表
     *
     * @memberOf WOrmPostgresql
     * @param {Object} [find={}] 輸入刪除條件物件
     * @returns {Promise} 回傳Promise,resolve回傳刪除結果,reject回傳錯誤訊息
     */
    async function delAll(find = {}) {
        let isErr = false
        let res = null

        //client
        let client = new PgClient({ connectionString })

        //connect
        try {

            //connect
            await client.connect()

        }
        catch (err) {
            isErr = true
            res = err
            client = null
        }

        //check
        if (isErr) {
            return Promise.reject(res)
        }

        try {

            // //database, collection
            // let database = client.db(opt.db)
            // let collection = database.collection(opt.cl)

            // //deleteMany
            // res = await collection.deleteMany(find)

            // //res
            // res = {
            //     n: res.deletedCount,
            //     nDeleted: res.deletedCount,
            //     ok: res.acknowledged ? 1 : 0,
            // }

            //mr
            let mr = mongoSql.sql({
                type: 'delete',
                table: cl,
                where: find,
            })
            // console.log('mr', mr)
            // console.log('mr.query', mr.query)
            // console.log('mr.values', mr.values)

            //delAll
            await client.query(mr.query, mr.values)
                .then((r) => {

                    //res
                    res = {
                        n: r.rowCount,
                        nDeleted: r.rowCount,
                        ok: 1,
                    }

                })
                .catch((err) => {
                    isErr = true
                    res = err.message
                })

        }
        catch (err) {
            isErr = true
            res = err
        }
        finally {
            await client.end()
            client = null
        }

        //update
        if (useCache) {
            clearCache()
        }

        //emit
        if (!isErr) {
            try {
                ee.emit('change', 'delAll', null, res)
            }
            catch (err) {
                console.log(err)
            }
        }

        //check
        if (isErr) {
            return Promise.reject(res)
        }

        return res
    }

    //save
    ee.createTable = createTable
    ee.select = select
    ee.insert = insert
    ee.save = save
    ee.del = del
    ee.delAll = delAll

    return ee
}


export default WOrmPostgresql