/*
Copyright 2017-2021 Norman Breau

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

import { getInstance } from './instance';
import {Readable} from 'stream';
import {IDatabaseConnection} from './IDatabaseConnection';
import {IQueryable} from './IQueryable';
import { IConfig } from './IConfig';
import { IsolationLevel } from './IsolationLevel';
import { IDatabasePosition } from './IDatabasePosition';

/**
 * Delay, in milliseconds, between the last activity on a connection and the
 * first "lingering connection" warning.
 */
export const LINGER_WARNING: number = 60000;

/**
 * Fallback query timeout, in milliseconds, used when the configuration does
 * not provide a usable `database.query_timeout`.
 */
export const DEFAULT_QUERY_TIMEOUT: number = 3600000;

/**
 * Interval, in milliseconds, between repeated linger warnings once the first
 * warning has been emitted.
 */
export const RECURRING_WARNING_TIMER: number = 60000;

const TAG: string = 'DatabaseConnection';

/**
 * Do not call `new Database` directly. Use `Database.getConnection` to create a `DatabaseConnection` object.
 *
 * Besides delegating to the implementation hooks (`_query`, `_stream`, `_close`),
 * this base class tracks inactivity: every `query()`/`stream()` call re-arms a
 * timer, and if the connection stays idle a warning containing the instantiation
 * stack is logged, then repeated at a fixed interval until the connection is
 * used again or closed.
 *
 * @abstract
 * @implements `IDatabaseConnection`
 * @class
 */
export abstract class DatabaseConnection implements IDatabaseConnection {
    private $api: any;
    private $readOnly: boolean;
    private $timeout: number;
    private $lingerTimer: NodeJS.Timeout;
    private $lingerInterval: NodeJS.Timeout;
    private $lingerTickCount: number;
    private $instantiationStack: string;
    private $open: boolean;

    /**
     * @param api The underlying database API object, returned as-is by `getAPI()`
     * @param isReadOnly `true` if the connection was created without write access
     * @param instantiationStack Stack trace captured where the connection was requested;
     *                           the first `Error:` occurrence is rewritten to `Warning:`
     */
    public constructor(api: TAPI, isReadOnly: boolean, instantiationStack: string) {
        this.$api = api;
        this.$readOnly = isReadOnly;
        this.$instantiationStack = (instantiationStack || '').replace(/Error:/, 'Warning:');
        this.$open = true;

        const config: IConfig = getInstance().getConfig();
        let timeout: number = config.database ? config.database.query_timeout : null;
        // isNaN(null) is false, so a missing value must be tested explicitly;
        // otherwise the default would never be applied when the database
        // configuration (or its query_timeout) is absent.
        if (timeout === null || timeout === undefined || isNaN(timeout)) {
            timeout = DEFAULT_QUERY_TIMEOUT;
        }
        this.$timeout = timeout;

        this.$lingerTickCount = 0;
        this.$armLingerWarning();
    }

    // Role flags supplied by the concrete connection implementation.
    public abstract isMaster(): boolean;
    public abstract isReplication(): boolean;
    public abstract hasReplicationEnabled(): boolean;

    /**
     * Logs a warning stating how long the connection has been idle, followed
     * by the instantiation stack. The reported duration is derived from the
     * initial delay plus one recurring interval per elapsed tick.
     */
    private $triggerLingerWarning(): void {
        const elapsed = (LINGER_WARNING + (RECURRING_WARNING_TIMER * this.$lingerTickCount)) / 1000;
        getInstance().getLogger().warn(TAG, `Database connection has lingered for ${elapsed}s of inactivity.\n\n${this.$instantiationStack}`);
    }

    /**
     * Replaces the stored instantiation stack. The value is stored verbatim.
     *
     * @param stack A stacktrace
     */
    public setInstantiationStack(stack: string): void {
        this.$instantiationStack = stack;
    }

    /**
     * Gets the callback stacktrace to determine what opened
     * this connection. Useful for debugging lingering connections.
     * @returns string - A stacktrace
     */
    public getInstantiationStack(): string {
        return this.$instantiationStack;
    }

    /**
     * Cancels the pending first-warning timer and the recurring warning
     * interval, and resets the tick counter.
     */
    private $disarmLingerWarnings(): void {
        clearTimeout(this.$lingerTimer);
        clearInterval(this.$lingerInterval);
        this.$lingerTickCount = 0;
    }

    /**
     * Restarts the inactivity countdown. After `LINGER_WARNING` ms a first
     * warning is logged, then another every `RECURRING_WARNING_TIMER` ms.
     */
    private $armLingerWarning(): void {
        this.$disarmLingerWarnings();

        // A closed connection cannot linger. Arming here would start timers
        // that nothing ever clears, because close() returns early once closed.
        if (!this.$open) {
            return;
        }

        this.$lingerTimer = setTimeout(() => {
            this.$triggerLingerWarning();
            this.$lingerInterval = setInterval(() => {
                this.$lingerTickCount++;
                this.$triggerLingerWarning();
            }, RECURRING_WARNING_TIMER);
        }, LINGER_WARNING);
    }

    /**
     * Gets the underlying Database API
     * @returns any
     */
    public getAPI(): TAPI {
        return this.$api;
    }

    public abstract formatQuery(query: IQueryable, params?: any): string;

    /**
     * Returns true if connection was created without
     * write access
     * @returns boolean
     */
    public isReadOnly(): boolean {
        return this.$readOnly;
    }

    /**
     * Sets the timeout of this connection
     *
     * @param timeout in milliseconds
     * @throws TypeError if `timeout` is not a number
     */
    public setTimeout(timeout: number): void {
        if (isNaN(timeout)) {
            throw new TypeError('setTimeout expects a number in parameter 1.');
        }
        this.$timeout = timeout;
    }

    /**
     * Returns the current timeout setting
     * @returns number in milliseconds
     */
    public getTimeout(): number {
        return this.$timeout;
    }

    /**
     * Queries the database for a dataset.
     *
     * Runs `onPreQuery`, executes the query, passes the raw result through
     * `onPostProcess`, and always runs `onPostQuery` afterwards, whether the
     * execution or post-processing succeeded or threw.
     *
     * @param {Query} query The database query
     * @async
     * @returns Promise resolving to the post-processed result
     */
    public async query(query: IQueryable): Promise {
        this.$armLingerWarning();

        const queryStr: string = query.getQuery(this);
        const params = query.getParametersForQuery();

        await query.onPreQuery(this);

        try {
            const results = await this._query(queryStr, params);
            return await (query.onPostProcess(this, results as any) as any);
        }
        finally {
            // Runs on success and on failure; any error thrown above
            // (including a thrown `null`) still propagates to the caller.
            await query.onPostQuery(this);
        }
    }

    /**
     * Executes the query and returns the results as a stream.
     *
     * @param query The database query
     * @param streamOptions Stream options
     * @returns Readable
     */
    public stream(query: IQueryable, streamOptions?: any): Readable {
        this.$armLingerWarning();

        const params = query.getParametersForQuery();
        const queryStr: string = query.getQuery(this);

        return this._stream(queryStr, params, streamOptions);
    }

    /**
     * Closes the connection. May error if connection has an active transaction.
     * if `forceClose` boolean is true, it will force close the connection, regardless
     * of transaction state.
     *
     * Does nothing if the connection is already closed. The connection is only
     * marked closed, and its linger warnings disarmed, after `_close` resolves.
     *
     * @param forceClose optional boolean
     * @async
     * @returns Promise
     */
    public async close(forceClose: boolean = false): Promise {
        if (this.isClosed()) {
            return;
        }

        await this._close(forceClose);
        this.$open = false;
        this.$disarmLingerWarnings();
    }

    /**
     * Returns true if the connection has been closed.
     */
    public isClosed(): boolean {
        return !this.$open;
    }

    /**
     * Implementation method to start a transaction.
     *
     * @abstract
     * @async
     * @returns Promise
     */
    public abstract startTransaction(isolationLevel?: IsolationLevel): Promise;

    /**
     * Implementation method to determine if the connection is in an active transaction.
     *
     * @abstract
     * @returns boolean
     */
    public abstract isTransaction(): boolean;

    /**
     * Ends a transaction. if `requiresRollback` is `true`, then `rollback()` is invoked.
     * Otherwise, `commit()` is invoked.
     *
     * @abstract
     * @async
     * @param requiresRollback optional boolean
     * @returns Promise
     */
    public abstract endTransaction(requiresRollback?: boolean): Promise;

    /**
     * Commits a transaction. This will end a transaction.
     *
     * @abstract
     * @async
     * @returns Promise
     */
    public abstract commit(): Promise;

    /**
     * Rolls back a transaction. This will end a transaction.
     *
     * @abstract
     * @async
     * @returns Promise
     */
    public abstract rollback(): Promise;

    /**
     * Implementation to close the connection, if `forceClose` is true, close the connection no matter what.
     * Silently error if it means the connection is closed.
     *
     * @param forceClose boolean, if `true`, should close the connection no matter what.
     * @async
     * @returns Promise
     */
    protected abstract _close(forceClose: boolean): Promise;

    /**
     * Implementation method to return a dataset from the database
     *
     * @param query The database query
     * @param params The query parameters
     * @async
     * @returns Promise
     */
    protected abstract _query(query: string, params?: any): Promise;

    /**
     * Implementation method to return a dataset from the database like `query()`,
     * but returns a `Readable` stream instead.
     *
     * @param query The database query
     * @param params The query parameters
     * @param streamOptions `Readable` stream options
     * @returns `Readable`
     */
    protected abstract _stream(query: string, params?: any, streamOptions?: any): Readable;

    /**
     * @since 8.1.0
     */
    public abstract getCurrentDatabasePosition(): Promise;
}