import {
  SbServerOptions,
  SbEntityProvision,
  SbEntityProvisionOption,
  SgEntityProvisionType,
  SbQueueEntityProvision,
  SbTopicEntityProvision,
  SbTopicSubscriptionEntityProvision,
  SbRuleEntityProvision,
  SbManagementDefaultsAdapter,
  SbLinkedEntityProvisionOption,
  SbManagementClientAdapter,
  SbClientOptions,
} from '../interfaces';
import { SbLogger, NoopLogger } from '../utils';
import { SbQueue, SbTopic, SbSubscription, SbRule } from '../models';
import { normalizeDefaults, resolveRules } from './defaults';

/**
 * Builds a cache key by joining all parts with `:`.
 */
function uniqueKey(...keys: string[]): string {
  return keys.join(':');
}

/**
 * Awaits `task` and returns its value; when the task rejects, returns `fallback` instead.
 *
 * NOTE(review): every rejection is converted to the fallback, not only "entity not found".
 * Callers in this file treat a falsy result as "entity is missing" - confirm that swallowing
 * other failures (e.g. connectivity or authorization) is intended.
 */
async function safeAsync<T>(task: Promise<T>, fallback: T | null = null): Promise<T | null> {
  try {
    return await task;
  } catch (err) {
    // Deliberate best-effort: the rejection reason is discarded.
    return fallback;
  }
}

/**
 * Verifies (and optionally creates) service bus entities through a management client,
 * remembering every entity it has already verified so each one is looked up only once
 * per configurator instance.
 */
export class SbConfigurator {
  private sbLogger: SbLogger = NoopLogger.shared;

  /** Entities already verified by this instance, keyed by name (or by `uniqueKey` for nested entities). */
  private readonly cache = {
    queue: new Map<string, SbQueue>(),
    topic: new Map<string, SbTopic>(),
    subscription: new Map<string, SbSubscription>(),
    rule: new Map<string, SbRule>(),
  };

  private defaults: SbManagementDefaultsAdapter;

  /**
   * @param managementClient Client used to get, upsert and delete entities.
   * @param options Server or client options; `options.logger` (when set) replaces the no-op logger and
   * `options.management.defaults` is normalized into the defaults used for provisioning.
   */
  constructor(public readonly managementClient: SbManagementClientAdapter, options: SbServerOptions | SbClientOptions) {
    if (options.logger) {
      this.sbLogger = options.logger;
    }
    this.defaults = normalizeDefaults((options.management as any).defaults);
  }

  /**
   * Verifies a queue, creating it when it is missing and the provision type is `verifyCreate`,
   * or unconditionally when `forceCreate` is set. Linked dead-letter / forward entities are verified as well.
   *
   * @param queueName Name of the queue.
   * @param provision Provision type or full provision object for the queue.
   * @param forceCreate When true, upserts the queue regardless of whether it already exists.
   * @returns The queue (from cache when previously verified), or `undefined` when the provision type is `skip`
   * or the queue was not found and not created.
   */
  async verifyQueue(queueName: string, provision: SbEntityProvisionOption<SbQueueEntityProvision>, forceCreate = false): Promise<SbQueue> {
    // Already verified by this configurator: hand back the cached entity,
    // consistent with the topic / subscription / rule verifiers.
    if (this.cache.queue.has(queueName)) {
      return this.cache.queue.get(queueName);
    }

    const normalized = this.normalizeEntityProvision(provision, this.defaults.entities.queue);
    if (normalized.type === 'skip') {
      return undefined;
    }

    let queue = await safeAsync(this.managementClient.getQueue(queueName));
    if (forceCreate || this.shouldCreate(queue, normalized.type)) {
      // The last argument is true when the lookup above returned nothing.
      queue = await this.managementClient.upsertQueue(queueName, normalized.params, !queue);
    }

    if (queue) {
      // Add to cache so we won't verify this one again.
      this.cache.queue.set(queueName, queue);
      await this.resolveLinkedEntity(queue.forwardDeadLetteredMessagesTo, normalized, normalized.deadLetter, this.defaults.entities);
      await this.resolveLinkedEntity(queue.forwardTo, normalized, normalized.forward, this.defaults.entities);
    }

    this.sbLogger.log(`Queue ${queueName} verified.`);
    return queue;
  }

  /** Deletes the queue through the management client and logs the deletion. */
  async deleteQueue(queueName: string) {
    await this.managementClient.deleteQueue(queueName);
    this.sbLogger.log(`Queue ${queueName} deleted.`);
  }

  /**
   * Verifies a topic, creating it when it is missing and the provision type is `verifyCreate`,
   * or unconditionally when `forceCreate` is set.
   *
   * @param topicName Name of the topic.
   * @param provision Provision type or full provision object for the topic.
   * @param forceCreate When true, upserts the topic regardless of whether it already exists.
   * @returns The topic (from cache when previously verified), or `undefined` when the provision type is `skip`
   * or the topic was not found and not created.
   */
  async verifyTopic(topicName: string, provision: SbEntityProvisionOption<SbTopicEntityProvision>, forceCreate = false): Promise<SbTopic> {
    // Already verified by this configurator.
    if (this.cache.topic.has(topicName)) {
      return this.cache.topic.get(topicName);
    }

    const normalized = this.normalizeEntityProvision(provision, this.defaults.entities.topic);
    if (normalized.type === 'skip') {
      return undefined;
    }

    let topic: SbTopic = await safeAsync(this.managementClient.getTopic(topicName));
    if (forceCreate || this.shouldCreate(topic, normalized.type)) {
      topic = await this.managementClient.upsertTopic(topicName, normalized.params, !topic);
    }

    // Add to cache so we won't verify this one again.
    if (topic) {
      this.cache.topic.set(topicName, topic);
    }

    this.sbLogger.log(`Topic ${topicName} verified.`);
    return topic;
  }

  /** Deletes the topic through the management client and logs the deletion. */
  async deleteTopic(topicName: string) {
    await this.managementClient.deleteTopic(topicName);
    this.sbLogger.log(`Topic ${topicName} deleted.`);
  }

  /**
   * Verifies a subscription of a topic. When the provision carries a `topic` provision the topic is verified
   * first. Afterwards linked dead-letter / forward entities and the rules resolved for the subscription
   * are verified.
   *
   * @param topicName Name of the topic owning the subscription.
   * @param subscriptionName Name of the subscription.
   * @param provision Provision type or full provision object for the subscription.
   * @param forceCreate When true, upserts the subscription regardless of whether it already exists.
   * @returns The subscription (from cache when previously verified), or `undefined` when the provision type
   * is `skip` or the subscription was not found and not created.
   */
  async verifySubscription(
    topicName: string,
    subscriptionName: string,
    provision: SbEntityProvisionOption<SbTopicSubscriptionEntityProvision>,
    forceCreate = false,
  ): Promise<SbSubscription> {
    const uniqueName = uniqueKey(topicName, subscriptionName);

    // Already verified by this configurator.
    if (this.cache.subscription.has(uniqueName)) {
      return this.cache.subscription.get(uniqueName);
    }

    const normalized = this.normalizeEntityProvision(provision, this.defaults.entities.subscription);
    if (normalized.type === 'skip') {
      return undefined;
    }

    if (normalized.topic) {
      await this.verifyTopic(topicName, normalized.topic);
    }

    let subscription: SbSubscription = await safeAsync(this.managementClient.getSubscription(topicName, subscriptionName));
    if (forceCreate || this.shouldCreate(subscription, normalized.type)) {
      subscription = await this.managementClient.upsertSubscription(topicName, subscriptionName, normalized.params, !subscription);
    }

    if (subscription) {
      // Add to cache so we won't verify this one again.
      this.cache.subscription.set(uniqueName, subscription);
      await this.resolveLinkedEntity(subscription.forwardDeadLetteredMessagesTo, normalized, normalized.deadLetter, this.defaults.entities);
      await this.resolveLinkedEntity(subscription.forwardTo, normalized, normalized.forward, this.defaults.entities);
    }

    const rules = resolveRules(topicName, subscriptionName, normalized, this.defaults);
    if (rules.length > 0) {
      // Rules are verified concurrently.
      await Promise.all(rules.map(rule => this.verifyRule(topicName, subscriptionName, rule)));
    }

    this.sbLogger.log(`Subscription ${topicName}-${subscriptionName} verified.`);
    return subscription;
  }

  /** Deletes the subscription through the management client and logs the deletion. */
  async deleteSubscription(topicName: string, subscriptionName: string) {
    await this.managementClient.deleteSubscription(topicName, subscriptionName);
    this.sbLogger.log(`Subscription ${topicName}-${subscriptionName} deleted.`);
  }

  /**
   * Verifies a rule of a subscription, creating it when it is missing and the provision type is `verifyCreate`.
   *
   * @param topicName Name of the topic owning the subscription.
   * @param subscriptionName Name of the subscription owning the rule.
   * @param provision Rule provision; `provision.ruleName` identifies the rule.
   * @returns The rule (from cache when previously verified), or `undefined` when the provision type is `skip`
   * or the rule was not found and not created.
   */
  async verifyRule(topicName: string, subscriptionName: string, provision: SbRuleEntityProvision): Promise<SbRule> {
    const { ruleName } = provision;
    const uniqueName = uniqueKey(topicName, subscriptionName, ruleName);

    // Already verified by this configurator.
    if (this.cache.rule.has(uniqueName)) {
      return this.cache.rule.get(uniqueName);
    }
    if (provision.type === 'skip') {
      return undefined;
    }

    let rule: SbRule = await safeAsync(this.managementClient.getRule(topicName, subscriptionName, ruleName));
    if (this.shouldCreate(rule, provision.type)) {
      rule = await this.managementClient.upsertRule(topicName, subscriptionName, ruleName, provision.params, !rule);
    }

    // Add to cache so we won't verify this one again.
    if (rule) {
      this.cache.rule.set(uniqueName, rule);
    }
    return rule;
  }

  /** Deletes the rule through the management client and logs the deletion. */
  async deleteRule(topicName: string, subscriptionName: string, ruleName: string) {
    await this.managementClient.deleteRule(topicName, subscriptionName, ruleName);
    this.sbLogger.log(`Rule ${topicName}-${subscriptionName}-${ruleName} deleted.`);
  }

  /**
   * Returns true when the subscription provision has a dependency, which is a topic
   * whose own provision type is not `skip`. A missing provision is treated as `skip`.
   */
  subscriptionHasDependency(provision?: SbEntityProvisionOption<SbTopicSubscriptionEntityProvision>) {
    const subscriptionProvision = this.normalizeEntityProvision(provision || 'skip', this.defaults.entities.subscription);
    if (subscriptionProvision.type === 'skip' || !subscriptionProvision.topic) {
      return false;
    }
    const topicProvision = this.normalizeEntityProvision(subscriptionProvision.topic, this.defaults.entities.topic);
    return topicProvision.type !== 'skip';
  }

  /**
   * Verifies an entity that a parent entity links to (dead-letter or forward target).
   *
   * @param entityName Name of the linked entity; nothing happens when it is empty.
   * @param parentProvision Provision of the parent; its `type` is used when the link has no provision of its own.
   * @param linkedEntity Link configuration; nothing happens when it is not set.
   * @param defaults Per-entity default params, indexed by the linked entity type.
   * @returns The normalized provision used for the linked entity, or `undefined` when nothing was verified.
   */
  private async resolveLinkedEntity(
    entityName: string,
    parentProvision: SbEntityProvision<any>,
    linkedEntity: SbLinkedEntityProvisionOption,
    defaults: SbManagementDefaultsAdapter['entities'],
  ): Promise<SbEntityProvision<any>> {
    if (!entityName || !linkedEntity) {
      return undefined;
    }

    // NOTE(review): the two branches differ only in the verifier they call; they are presumably kept
    // separate so each call is type-checked against its own entity defaults - confirm before merging them.
    if (linkedEntity.type === 'queue') {
      const defaultParams = defaults[linkedEntity.type];
      const linkedProvision = this.normalizeEntityProvision(linkedEntity.provision || parentProvision.type, defaultParams);
      await this.verifyQueue(entityName, linkedProvision);
      return linkedProvision;
    } else {
      const defaultParams = defaults[linkedEntity.type];
      const linkedProvision = this.normalizeEntityProvision(linkedEntity.provision || parentProvision.type, defaultParams);
      await this.verifyTopic(entityName, linkedProvision);
      return linkedProvision;
    }
  }

  /**
   * Turns a provision option into a provision object: a string becomes `{ type }`, an object is shallow-copied.
   * For `verifyCreate` the params are the defaults overlaid with the provision's own params.
   */
  private normalizeEntityProvision<T extends SbEntityProvision<any>, Z extends T['params']>(
    provision: SbEntityProvisionOption<T>,
    defaults: Z,
  ): T {
    const result: T = typeof provision === 'string' ? ({ type: provision } as any) : { ...(provision as any) };
    if (result.type === 'verifyCreate') {
      // Own params win over defaults; a missing `params` contributes nothing.
      result.params = { ...(defaults as any), ...(result.params || {}) };
    }
    return result;
  }

  /** True when the lookup result is falsy and the provision type is `verifyCreate`. */
  private shouldCreate<T>(result: T, provisionType: SbEntityProvision<any>['type']): boolean {
    return !result && provisionType === 'verifyCreate';
  }
}