import {
  Application,
  Binding,
  Context,
  ContextEventType,
  ContextObserver,
  ContextTags,
  ContextView,
  CoreBindings,
  filterByTag,
  inject,
  lifeCycleObserver,
  LifeCycleObserver,
  service,
} from '@loopback/core';
import { Queue } from "bull";
import { QueueService } from '../services';
import { BullQueueComponentBindings, BullQueueComponentTags } from '../keys';

/**
 * This class will be bound to the application as a `LifeCycleObserver` during
 * `boot`.
 *
 * On `init` it subscribes a context observer to the application. Whenever a
 * binding whose `ContextTags.KEY` tag equals
 * `BullQueueComponentTags.QUEUE_KEY` is bound, every consumer binding in the
 * injected view whose `filterTags` all match the new binding's tags is
 * resolved, and the resolved functions are registered as the processor of the
 * queue returned by `QueueService.getQueue(binding.key)`.
 */
@lifeCycleObserver('')
export class QueuesObserver implements LifeCycleObserver {
  constructor(
    @inject(CoreBindings.APPLICATION_INSTANCE)
    public application: Application,
    @service(QueueService)
    public queueServices: QueueService,
    @inject.view(
      filterByTag({
        [BullQueueComponentBindings.BULL_QUEUE_TYPE.key]:
          BullQueueComponentTags.QUEUE_TYPE_CONSUMER,
      }),
    )
    private queueConsumers: ContextView,
  ) {}

  /**
   * This method will be invoked when the application initializes. It will be
   * called at most once for a given application instance.
   *
   * Subscribes the queue-binding observer to the application context.
   */
  async init(): Promise<void> {
    const queueObserver: ContextObserver = {
      filter: binding =>
        binding.tagMap[ContextTags.KEY] === BullQueueComponentTags.QUEUE_KEY,
      // Arrow function so `this` stays the QueuesObserver instance.
      observe: async (
        event: ContextEventType,
        binding: Readonly<Binding<unknown>>,
        context: Context,
      ) => {
        if (event !== 'bind') return;
        await this.attachConsumers(binding, context);
      },
    };
    this.application.subscribe(queueObserver);
  }

  /**
   * This method will be invoked when the application starts.
   */
  async start(): Promise<void> {}

  /**
   * This method will be invoked when the application stops.
   */
  async stop(): Promise<void> {
    // Add your logic for stop
  }

  /**
   * Resolves every consumer whose `filterTags` match `binding` and registers
   * them as the processor of the queue identified by `binding.key`.
   * Does nothing when no consumer matches or when the queue service returns
   * no queue for that key.
   *
   * @param binding - The queue binding that was just bound.
   * @param context - The context used to resolve the consumer bindings.
   */
  private async attachConsumers(
    binding: Readonly<Binding<unknown>>,
    context: Context,
  ): Promise<void> {
    const pendingConsumers = this.queueConsumers.bindings
      .filter(consumerBinding => this.matchesFilterTags(consumerBinding, binding))
      .map(consumerBinding => consumerBinding.getValue(context));
    if (pendingConsumers.length === 0) return;

    const consumerFns: unknown[] = await Promise.all(pendingConsumers);
    const queueName = binding.key;
    const queue: Queue | undefined = await this.queueServices.getQueue(queueName);

    queue?.process((job, done) => {
      const runs = consumerFns.map(async consumerFn => {
        if (typeof consumerFn !== 'function') {
          throw new Error("Invalid consumer function");
        }
        return consumerFn({
          data: job.data,
          jobInfo: {
            id: job.id,
            queueName,
            created: job.timestamp,
            processedOn: job.processedOn,
          },
        });
      });

      // Two-argument `then` (rather than `.then().catch()`) so that an
      // exception thrown by `done()` itself cannot trigger a second `done`.
      Promise.all(runs).then(
        () => {
          done();
        },
        (err: unknown) => {
          console.log("Queue process error", err);
          // Report the failure to Bull; without this the job is never
          // completed and remains active until it stalls.
          done(err instanceof Error ? err : new Error(String(err)));
        },
      );
    });
  }

  /**
   * Tells whether every entry of the consumer binding's `filterTags` tag is
   * present with a strictly equal value in the queue binding's tag map.
   * A consumer without a `filterTags` tag never matches.
   *
   * @param consumerBinding - A binding from the consumer view.
   * @param queueBinding - The queue binding being observed.
   */
  private matchesFilterTags(
    consumerBinding: Readonly<Binding<unknown>>,
    queueBinding: Readonly<Binding<unknown>>,
  ): boolean {
    const filterTags: Record<string, unknown> | undefined =
      consumerBinding.tagMap.filterTags;
    if (!filterTags) return false;
    return Object.entries(filterTags).every(
      ([key, value]) => queueBinding.tagMap[key] === value,
    );
  }
}