import {
  Table,
  Column,
  Default,
  Length,
  AllowNull,
  BeforeSave,
  DataType,
  BeforeDestroy,
  BeforeCreate,
  AfterDestroy,
  HasMany,
  HasOne,
  DefaultScope,
  AfterSave,
} from "sequelize-typescript";
import { api, redis } from "actionhero";
import { Op } from "sequelize";
import { Source } from "./Source";
import { Option } from "./Option";
import { OptionHelper } from "./../modules/optionHelper";
import { StateMachine } from "./../modules/stateMachine";
import { Destination } from "./Destination";
import { AppOps } from "../modules/ops/app";
import { LockableHelper } from "../modules/lockableHelper";
import { ConfigWriter } from "../modules/configWriter";
import { APIData } from "../modules/apiData";
import { AppConfigurationObject } from "../classes/codeConfig";
import { AppRefreshQuery } from "./AppRefreshQuery";
import { CommonModel } from "../classes/commonModel";
import { AppsCache } from "../modules/caches/appsCache";
import { CLS } from "../modules/cls";

/** Plain key/value options accepted by an App. */
export interface SimpleAppOptions extends OptionHelper.SimpleOptions {}

/** Every state an App can be in; also used as the values of the `state` ENUM column. */
const STATES = ["draft", "ready", "deleted"] as const;

/**
 * Allowed state changes, handed to `StateMachine.transition` in the `updateState` hook.
 * Both transitions into "ready" carry a check that validates the App's options.
 */
const STATE_TRANSITIONS = [
  {
    from: "draft",
    to: "ready",
    checks: [(instance: App) => instance.validateOptions()],
  },
  { from: "draft", to: "deleted", checks: [] },
  { from: "ready", to: "deleted", checks: [] },
  {
    from: "deleted",
    to: "ready",
    checks: [(instance: App) => instance.validateOptions()],
  },
];

/**
 * Sequelize model for the "apps" table.
 *
 * The default scope only includes Apps whose state is "ready"; use
 * `App.scope(null)` to query regardless of state.
 *
 * NOTE(review): the `Promise` return types in this file had lost their type
 * arguments; confirm whether `CommonModel` also expects one (e.g. `CommonModel<App>`).
 */
@DefaultScope(() => ({
  where: { state: "ready" },
}))
@Table({ tableName: "apps", paranoid: false })
export class App extends CommonModel {
  /** Prefix for this model's ids (presumably consumed by CommonModel - confirm). */
  idPrefix() {
    return "app";
  }

  @Length({ min: 0, max: 191 })
  @Default("")
  @Column
  name: string;

  @AllowNull(false)
  @Column
  type: string;

  @Column
  locked: string;

  @AllowNull(false)
  @Default("draft")
  @Column(DataType.ENUM(...STATES))
  state: typeof STATES[number];

  @HasOne(() => AppRefreshQuery)
  appRefreshQuery: AppRefreshQuery;

  // the underscores are needed as "options" is an internal method on sequelize instances
  @HasMany(() => Option, {
    foreignKey: "ownerId",
    scope: { ownerType: "app" },
  })
  __options: Option[];

  @HasMany(() => Source)
  sources: Source[];

  /**
   * Collect the option descriptions for this App's plugin.
   *
   * Starts from the plugin's `appOptions()` method when it is a function
   * (otherwise an empty object), then adds a `{ type }` entry for every static
   * plugin option that declares a type and is not already present.
   */
  async appOptions() {
    const { pluginApp } = await this.getPlugin();
    const staticAppOptions = pluginApp.options;
    const appOptions =
      typeof pluginApp.methods.appOptions === "function"
        ? await pluginApp.methods.appOptions()
        : {};

    for (const staticOption of staticAppOptions) {
      if (staticOption.type && !appOptions[staticOption.key]) {
        appOptions[staticOption.key] = { type: staticOption.type };
      }
    }

    return appOptions;
  }

  /** Read this App's options through `OptionHelper.getOptions`. */
  async getOptions(sourceFromEnvironment = true, obfuscatePasswords = false) {
    return OptionHelper.getOptions(
      this,
      sourceFromEnvironment,
      obfuscatePasswords
    );
  }

  /** Store this App's options through `OptionHelper.setOptions`. */
  async setOptions(options: SimpleAppOptions, externallyValidate = true) {
    return OptionHelper.setOptions(this, options, externallyValidate);
  }

  /**
   * React to options having been set.
   *
   * When something changed, the Apps cache is invalidated; additionally, for a
   * persisted App that is not a draft, "api.rpc.app.disconnect" is sent across
   * the cluster with this App's id.
   */
  async afterSetOptions(hasChanges: boolean) {
    if (hasChanges) await App.invalidateCache();

    if (hasChanges && this.state !== "draft" && !this.isNewRecord) {
      await redis.doCluster(
        "api.rpc.app.disconnect",
        [this.id],
        undefined,
        true
      );
    }
  }

  /**
   * Validate options against the plugin's option specification.
   *
   * @param options Options to validate; defaults to the App's stored options.
   * @param externallyValidate When true, the allowed values for each option are
   *   taken from `appOptions()`; otherwise every option gets an empty list.
   * @throws Error when no pluginApp exists for this App's type.
   */
  async validateOptions(options?: SimpleAppOptions, externallyValidate = true) {
    if (!options) options = await this.getOptions(true);

    const { pluginApp } = await this.getPlugin();
    if (!pluginApp)
      throw new Error(`cannot find a pluginApp for type ${this.type}`);

    const appOptions = externallyValidate ? await this.appOptions() : {};
    const optionsSpec: OptionHelper.OptionsSpec = pluginApp.options.map(
      (opt) => ({
        ...opt,
        options: appOptions[opt.key]?.options ?? [],
      })
    );

    return OptionHelper.validateOptions(this, options, optionsSpec);
  }

  /** Look up the plugin for this App through `OptionHelper.getPlugin`. */
  async getPlugin() {
    return OptionHelper.getPlugin(this);
  }

  /** Remember `connection` as this App's persistent connection. */
  async setConnection(connection: any) {
    api.plugins.persistentConnections[this.id] = connection;
  }

  /**
   * Return this App's persistent connection, falling back to `connect(null)`
   * when none is stored.
   */
  async getConnection() {
    const connection = api.plugins.persistentConnections[this.id];
    if (!connection) return this.connect(null);
    return connection;
  }

  /** Connect this App through `AppOps.connect`. */
  async connect(options?: SimpleAppOptions) {
    return AppOps.connect(this, options, null);
  }

  /** Disconnect this App through `AppOps.disconnect`. */
  async disconnect() {
    return AppOps.disconnect(this, undefined);
  }

  /** Test this App through `AppOps.test`, optionally with alternate options. */
  async test(options?: SimpleAppOptions) {
    return AppOps.test(this, options);
  }

  /**
   * Determine the parallelism limit for this App.
   *
   * @returns The result of the plugin's `parallelism` method, or `Infinity`
   *   when the plugin does not define one.
   */
  async getParallelism(): Promise<number> {
    const { pluginApp } = await this.getPlugin();
    const method = pluginApp.methods.parallelism;
    if (!method) return Infinity;

    const appOptions = await this.getOptions();
    return method({ app: this, appOptions });
  }

  /**
   * Increment or decrement this App's parallelism counter in redis.
   *
   * @param direction "incr" to claim a slot, "decr" to release one.
   * @returns `true` when decrementing or when the new count is within the
   *   limit; `false` when an increment went over the limit (the increment is
   *   undone before returning).
   */
  async checkAndUpdateParallelism(direction: "incr" | "decr") {
    const key = this.parallelismKey();
    // named `client` so the actionhero `redis` import is not shadowed
    const client = api.redis.clients.client;
    const limit = await this.getParallelism();
    const count = await client[direction](key);

    if (count < 0) {
      // invalid. how did this happen? reset it
      await client.set(key, 0);
    }

    if (count <= limit || direction === "decr") {
      return true;
    } else {
      // move it back down because incremented
      await client.decr(key);
      return false;
    }
  }

  /** Redis key holding this App's parallelism counter. */
  parallelismKey() {
    return `app:${this.id}:ratelimit:parallel`;
  }

  /**
   * Build the API representation of this App, with options read using
   * `getOptions(false, true)` and the refresh query loaded without a scope.
   */
  async apiData() {
    const options = await this.getOptions(false, true);
    const icon = await this._getIcon();
    const provides = this.provides();
    const { plugin, pluginApp } = await this.getPlugin();
    const refreshQueryAvailable = await this.refreshQueryAvailable();
    const appRefreshQuery: AppRefreshQuery = await this.$get(
      "appRefreshQuery",
      { scope: null }
    );

    return {
      id: this.id,
      name: this.name,
      icon,
      type: this.type,
      state: this.state,
      locked: this.locked,
      options,
      provides,
      pluginName: plugin.name,
      pluginApp,
      refreshQueryAvailable,
      createdAt: APIData.formatDate(this.createdAt),
      updatedAt: APIData.formatDate(this.updatedAt),
      appRefreshQuery: appRefreshQuery
        ? await appRefreshQuery.apiData()
        : null,
    };
  }

  /** Whether this App's plugin defines an `appQuery` method. */
  async refreshQueryAvailable() {
    const { pluginApp } = await this.getPlugin();
    if (typeof pluginApp?.methods?.appQuery !== "function") {
      return false;
    }
    return true;
  }

  /**
   * Determine if this App can provide Source or Destination Connections
   */
  provides() {
    // true when any registered plugin has a connection for this App's type in the given direction
    const hasConnection = (direction: "import" | "export") =>
      api.plugins.plugins.some((p) =>
        p?.connections?.some(
          (c) => c.apps.includes(this.type) && c.direction === direction
        )
      );

    const source = hasConnection("import");
    const destination = hasConnection("export");

    return { source, destination };
  }

  /** Id used in config files: generated from the name when the id is a default one. */
  getConfigId() {
    return this.idIsDefault() ? ConfigWriter.generateId(this.name) : this.id;
  }

  /**
   * Build the config-file representation of this App.
   *
   * @returns The configuration object, or `undefined` when the App has no name.
   */
  async getConfigObject(): Promise<AppConfigurationObject> {
    const { type, name } = this;
    // nothing to write for a nameless App, so skip the lookups below
    if (!name) return;

    const options = await this.getOptions(false);
    const appRefreshQuery = await this.$get("appRefreshQuery");

    return {
      class: "App",
      id: this.getConfigId(),
      name,
      type,
      options,
      refresh: appRefreshQuery
        ? {
            query: appRefreshQuery.refreshQuery,
            recurringFrequency: appRefreshQuery.recurringFrequency,
          }
        : undefined,
    };
  }

  /** Icon of this App's plugin, if the plugin is found. */
  async _getIcon() {
    const { plugin } = await this.getPlugin();
    return plugin?.icon;
  }

  // --- Class Methods --- //

  // Disconnect all Apps from their persistent connections
  // (or only the App with the given id), regardless of state
  static async disconnect(id?: string) {
    const apps = id
      ? await App.scope(null).findAll({ where: { id } })
      : await App.scope(null).findAll();

    for (const app of apps) await app.disconnect();
  }

  /** Refuse creation when it would exceed the plugin's `maxInstances` for this type. */
  @BeforeCreate
  static async checkMaxInstances(instance: App) {
    const count = await App.scope(null).count({
      where: { type: instance.type },
    });
    const { pluginApp } = await instance.getPlugin();

    if (
      pluginApp &&
      pluginApp.maxInstances &&
      pluginApp.maxInstances < count + 1
    ) {
      throw new Error(
        `cannot create a new ${instance.type} app, only ${pluginApp.maxInstances} allowed`
      );
    }
  }

  @BeforeSave
  static async validateType(instance: App) {
    await instance.getPlugin(); // will throw if not found
  }

  /** Run the state machine over `STATE_TRANSITIONS` before saving. */
  @BeforeSave
  static async updateState(instance: App) {
    await StateMachine.transition(instance, STATE_TRANSITIONS);
  }

  /** Delegate the lock check on save to `LockableHelper`, passing "state" as its extra argument. */
  @BeforeSave
  static async noUpdateIfLocked(instance: App) {
    await LockableHelper.beforeSave(instance, ["state"]);
  }

  /** Delegate the lock check on destroy to `LockableHelper`. */
  @BeforeDestroy
  static async noDestroyIfLocked(instance: App) {
    await LockableHelper.beforeDestroy(instance);
  }

  /** Refuse deletion while any Source or Destination (in any state) references this App. */
  @BeforeDestroy
  static async ensureNotInUse(instance: App) {
    const sources = await Source.scope(null).findAll({
      where: { appId: instance.id },
    });
    if (sources.length > 0) {
      throw new Error(
        `cannot delete this app, source ${sources[0].id} relies on it`
      );
    }

    const destinations = await Destination.scope(null).findAll({
      where: { appId: instance.id },
    });
    if (destinations.length > 0) {
      throw new Error(
        `cannot delete this app, destination ${destinations[0].id} relies on it`
      );
    }
  }

  /** Refuse deletion when it would drop below the plugin's `minInstances` for this type. */
  @BeforeDestroy
  static async checkMinInstances(instance: App) {
    const count = await App.scope(null).count({
      where: { type: instance.type },
    });
    const { pluginApp } = await instance.getPlugin();

    if (
      pluginApp &&
      pluginApp.minInstances &&
      pluginApp.minInstances > count - 1
    ) {
      throw new Error(
        `cannot delete this ${instance.type} app, at least ${pluginApp.minInstances} required`
      );
    }
  }

  /** Remove the Option rows owned by the destroyed App. */
  @AfterDestroy
  static async destroyOptions(instance: App) {
    return Option.destroy({
      where: {
        ownerId: instance.id,
        ownerType: "app",
      },
    });
  }

  /** Remove the AppRefreshQuery rows belonging to the destroyed App. */
  @AfterDestroy
  static async destroyAppRefreshQueries(instance: App) {
    return AppRefreshQuery.destroy({
      where: {
        appId: instance.id,
      },
    });
  }

  /** Delete the destroyed App's parallelism counter from redis. */
  @AfterDestroy
  static async removeParallelismKey(instance: App) {
    const key = instance.parallelismKey();
    // named `client` so the actionhero `redis` import is not shadowed
    const client = api.redis.clients.client;
    return client.del(key);
  }

  /**
   * Invalidate the local Apps cache immediately and register a
   * `CLS.afterCommit` callback that sends "api.rpc.app.invalidateCache"
   * across the cluster.
   */
  @AfterSave
  @AfterDestroy
  static async invalidateCache() {
    AppsCache.invalidate();
    await CLS.afterCommit(
      async () => await redis.doCluster("api.rpc.app.invalidateCache")
    );
  }
}