import BullQueue, { Queue } from "bull";
import { injectable, BindingScope, ContextTags, Application, inject, CoreBindings, Binding, config } from '@loopback/core';
import { BullQueueComponentBindings, BullQueueComponentTags } from "../keys";
import { BullQueueComponentOptions, SeawaspQueue } from "../types";

/** Prefix under which every queue is bound in the application context. */
const QUEUE_BINDING_PREFIX = 'queue.';

/**
 * Grace period passed to `Queue#clean` (milliseconds per the Bull reference):
 * only jobs older than this are removed.
 */
const CLEAN_GRACE_MS = 1000;

/**
 * Job states purged by `cleanQueue`, in the order they are cleaned.
 */
const CLEANED_STATUSES = ['delayed', 'active', 'completed', 'wait', 'failed'] as const;

/**
 * Singleton service that creates Bull queues, binds them into the LoopBack
 * application context under `queue.<name>`, and offers small helpers to look
 * them up, enqueue jobs and clean them.
 */
@injectable({ scope: BindingScope.SINGLETON })
export class QueueService {
  constructor(
    @inject(CoreBindings.APPLICATION_INSTANCE)
    public application: Application,
    @config({ fromBinding: BullQueueComponentBindings.COMPONENT })
    private readonly config: BullQueueComponentOptions
  ) {}

  /**
   * Creates one Bull queue per name and binds it as `queue.<name>`.
   *
   * Names that are already bound are skipped, so calling this twice with the
   * same name does not construct a second queue and orphan the first one.
   * A failure while creating or binding one queue is logged and does not
   * prevent the remaining queues from being initialized.
   *
   * @param queueNames - Names of the queues to create.
   * @param options - Optional settings; `options.tags` is merged over the
   *   default binding tags (later keys win).
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- kept loose for existing callers
  initializeQueue(queueNames: string[], options?: any): void {
    for (const queueName of queueNames) {
      const bindingKey = `${QUEUE_BINDING_PREFIX}${queueName}`;

      // Re-binding would replace the existing queue without closing it.
      if (this.application.isBound(bindingKey)) {
        continue;
      }

      try {
        const queue: SeawaspQueue = new BullQueue(queueName, this.config);

        const tags = {
          [ContextTags.KEY]: BullQueueComponentTags.QUEUE_KEY,
          [BullQueueComponentBindings.BULL_QUEUE_TYPE.key]: BullQueueComponentTags.QUEUE_TYPE_QUEUE,
          [BullQueueComponentBindings.BULL_QUEUE_NAME.key]: queueName,
          // Spreading `undefined` is a no-op, so no explicit guard is needed.
          ...options?.tags,
        };

        queue.on('error', e => console.error('Error event', e));

        this.application.bind(bindingKey).to(queue).tag(tags);
      } catch (err) {
        // Best effort: report which queue failed and carry on with the rest.
        console.error(`Failed to initialize queue "${queueName}"`, err);
      }
    }
  }

  /**
   * Resolves a queue from the application context.
   *
   * @param keyName - Either the bare queue name (`jobs`) or the full binding
   *   key (`queue.jobs`). A bare name is prefixed with `queue.`; in particular
   *   a queue literally named `queue` resolves to `queue.queue`.
   * @returns Whatever the application context resolves for that key.
   */
  async getQueue(keyName: string): Promise<Queue | undefined> {
    const bindingKey = keyName.startsWith(QUEUE_BINDING_PREFIX)
      ? keyName
      : `${QUEUE_BINDING_PREFIX}${keyName}`;
    return this.application.getValueOrPromise<Queue>(bindingKey);
  }

  /**
   * Resolves every binding tagged with the queue key tag.
   *
   * @returns The resolved values, in the order the bindings were found.
   */
  async getQueues(): Promise<(Queue | undefined)[]> {
    const queueBindings = this.application.findByTag({
      [ContextTags.KEY]: BullQueueComponentTags.QUEUE_KEY,
    });
    return Promise.all(
      queueBindings.map(queueBinding => this.application.get<Queue | undefined>(queueBinding.key)),
    );
  }

  /**
   * Adds a job to the named queue.
   *
   * @param queueName - Bare queue name or full `queue.<name>` key.
   * @param data - Job payload handed to `Queue#add`.
   * @param options - Optional; only a truthy `delay` is forwarded to Bull,
   *   every other field is ignored.
   * @returns The result of `Queue#add`, or `undefined` when the queue did not
   *   resolve to a value.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- kept loose for existing callers
  async sendMessage(queueName: string, data: unknown, options?: any) {
    const queue = await this.getQueue(queueName);
    if (options?.delay) {
      return queue?.add(data, { delay: options.delay });
    }
    return queue?.add(data);
  }

  /**
   * Removes jobs older than the grace period from the named queue, one job
   * state at a time (delayed, active, completed, wait, failed).
   *
   * Does nothing when the queue did not resolve to a value.
   *
   * @param queueName - Bare queue name or full `queue.<name>` key.
   */
  async cleanQueue(queueName: string): Promise<void> {
    const queue = await this.getQueue(queueName);
    if (!queue) {
      return;
    }
    // Sequential on purpose: keeps the original cleaning order.
    for (const status of CLEANED_STATUSES) {
      await queue.clean(CLEAN_GRACE_MS, status);
    }
  }
}