// NOTE(review): the copy of this file under review had every generic argument list (`<...>`)
// stripped and all line breaks collapsed. The type arguments below are reconstructed from how the
// values are used — confirm them against the repository version before merging.
import {
  type OperationOptionsBase,
  type ProcessErrorArgs,
  ServiceBusClient,
  type ServiceBusMessage,
  type ServiceBusMessageBatch,
  type ServiceBusReceivedMessage,
  type ServiceBusReceiver
} from "@azure/service-bus"
import { Cause, Context, Effect, Exit, FiberSet, Layer, type Scope } from "effect-app"
import { InfraLogger } from "../logger.js"

/**
 * Decorates an effect with logging and tracing under a single label:
 * logs `name` before running it, logs `name + " done"` after it succeeds,
 * and wraps the whole thing in a log span and a tracing span called `name`.
 */
const withSpanAndLog = (name: string) => <A, E, R>(self: Effect.Effect<A, E, R>) =>
  Effect.logInfo(name).pipe(
    Effect.andThen(self),
    Effect.tap(Effect.logInfo(name + " done")),
    Effect.withLogSpan(name),
    Effect.withSpan(name)
  )

/**
 * Creates a `ServiceBusClient` for `url` as a scoped resource; the client is
 * closed when the surrounding scope is released.
 */
function makeClient(url: string) {
  return Effect.acquireRelease(
    Effect.sync(() => new ServiceBusClient(url)).pipe(withSpanAndLog("ServiceBus.client.create")),
    (client) => Effect.promise(() => client.close()).pipe(withSpanAndLog("ServiceBus.client.close"))
  )
}

/** Service key for the shared `ServiceBusClient`. */
export class ServiceBusClientTag
  extends Context.Opaque<ServiceBusClientTag, ServiceBusClient>()("@services/Client", { make: makeClient })
{
  /** Layer providing a client connected to `url`. */
  static readonly layer = (url: string) => this.toLayer(this.make(url))
}

/**
 * Creates a scoped Service Bus sender for `queueName` from the client in context.
 * The sender is closed when the scope is released.
 */
const makeSender_ = Effect.fnUntraced(function*(queueName: string) {
  const serviceBusClient = yield* ServiceBusClientTag

  return yield* Effect.acquireRelease(
    Effect.sync(() => serviceBusClient.createSender(queueName)).pipe(
      withSpanAndLog(`ServiceBus.sender.create ${queueName}`)
    ),
    (sender) => Effect.promise(() => sender.close()).pipe(withSpanAndLog(`ServiceBus.sender.close ${queueName}`))
  )
})

/**
 * Builds the `Sender` service implementation for the queue `name`.
 *
 * @returns the queue name plus a `sendMessages` function that forwards to the
 * underlying sender, passing the Effect-supplied abort signal along.
 */
const makeSender = Effect.fnUntraced(function*(name: string) {
  const sender = yield* makeSender_(name)

  const sendMessages = Effect.fnUntraced(function*(
    messages: ServiceBusMessage | ServiceBusMessage[] | ServiceBusMessageBatch,
    options?: Omit<OperationOptionsBase, "abortSignal">
  ) {
    // `abortSignal` is owned by Effect, which is why callers cannot pass their own.
    return yield* Effect.promise((abortSignal) => sender.sendMessages(messages, { ...options, abortSignal }))
  })

  return { name, sendMessages }
})

/** Service for sending messages to a single queue. */
export class Sender extends Context.Opaque<Sender, {
  name: string
  sendMessages: (
    messages: ServiceBusMessage | ServiceBusMessage[] | ServiceBusMessageBatch,
    options?: Omit<OperationOptionsBase, "abortSignal">
  ) => Effect.Effect<void>
}>()("Sender", { make: makeSender }) {
  /** Layer providing a `Sender` for the queue `name`. */
  static readonly layer = (name: string) => this.toLayer(this.make(name))
}

/**
 * Creates a distinct service key named `ServiceBus.Sender.<queueName>`, with a
 * `layer` property that builds it through `Sender.make(queueName)`.
 */
export const SenderTag = <Id>() => <Key extends string>(queueName: Key) => {
  const tag = Context.Service<Id, Sender>(`ServiceBus.Sender.${queueName}`)

  return Object.assign(tag, {
    layer: Layer.effect(
      tag,
      Sender.make(queueName).pipe(Effect.map(Sender.of))
    )
  })
}

/**
 * Builds the `Receiver` service implementation for the queue `name`.
 *
 * The result exposes `make` / `makeSession` for obtaining a scoped raw receiver
 * and `subscribe` for running Effect-based message handlers against it.
 */
const makeReceiver = Effect.fnUntraced(function*(name: string) {
  const serviceBusClient = yield* ServiceBusClientTag

  /**
   * Acquires a receiver (a session receiver when `sessionId` is given) as a
   * scoped resource. On release it first runs `waitTillEmpty`, then closes the
   * receiver.
   */
  const acquireReceiver = Effect.fnUntraced(
    function*(queueName: string, waitTillEmpty: Effect.Effect<void>, sessionId?: string) {
      return yield* Effect.acquireRelease(
        (sessionId
          ? Effect.promise(() => serviceBusClient.acceptSession(queueName, sessionId))
          : Effect.sync(() => serviceBusClient.createReceiver(queueName)))
          .pipe(withSpanAndLog(`ServiceBus.receiver.create ${queueName}.${sessionId}`)),
        (r) =>
          waitTillEmpty.pipe(
            withSpanAndLog(`ServiceBus.receiver.waitTillEmpty ${queueName}.${sessionId}`),
            Effect.andThen(
              Effect.promise(() => r.close()).pipe(
                withSpanAndLog(`ServiceBus.receiver.close ${queueName}.${sessionId}`)
              )
            ),
            withSpanAndLog(`ServiceBus.receiver.release ${queueName}.${sessionId}`)
          )
      )
    }
  )

  const make = (waitTillEmpty: Effect.Effect<void>) => acquireReceiver(name, waitTillEmpty)

  const makeSession = (sessionId: string, waitTillEmpty: Effect.Effect<void>) =>
    acquireReceiver(name, waitTillEmpty, sessionId)

  return {
    name,
    make,
    makeSession,

    /**
     * Subscribes `hndlr` to the queue (or to one session of it) for the lifetime
     * of the surrounding scope.
     *
     * Handlers run as fibers in a `FiberSet`. On scope release the subscription
     * is closed first; the receiver is then closed only once that set is empty.
     */
    subscribe: Effect.fnUntraced(function*<RMsg, RErr>(hndlr: MessageHandlers<RMsg, RErr>, sessionId?: string) {
      const fs = yield* FiberSet.make()
      const fr = yield* FiberSet.runtime(fs)<RMsg | RErr>()

      // Polls every 250 ms until no handler fiber is left in the set.
      const wait = Effect.gen(function*() {
        if ((yield* FiberSet.size(fs)) > 0) {
          yield* InfraLogger.logDebug("Waiting ServiceBusFiberSet to be empty: " + (yield* FiberSet.size(fs)))
        }
        while ((yield* FiberSet.size(fs)) > 0) yield* Effect.sleep("250 millis")
      })

      const r = yield* sessionId
        ? makeSession(sessionId, wait)
        : make(wait)

      /**
       * Runs `effect` as a fiber in the set and exposes its outcome as a Promise
       * for the Azure SDK callbacks.
       *
       * A failure is rejected with a real `Error` carrying the rendered cause:
       * the SDK passes the rejection on as `ProcessErrorArgs.error`, so a bare
       * string would leave handlers without `.message` or `.stack`.
       */
      const runEffect = <E>(effect: Effect.Effect<void, E, RMsg | RErr>) =>
        new Promise<void>((resolve, reject) =>
          fr(effect).addObserver((exit) => {
            if (Exit.isSuccess(exit)) {
              resolve(exit.value)
            } else {
              reject(new Error(Cause.pretty(exit.cause)))
            }
          })
        )

      yield* Effect.acquireRelease(
        Effect
          .sync(() => {
            const s = r.subscribe({
              // A failure inside the error handler is only logged, never rethrown.
              processError: (err) =>
                runEffect(
                  hndlr
                    .processError(err)
                    .pipe(
                      Effect.catchCause((cause) => Effect.logError(`ServiceBus Error ${sessionId}`, cause))
                    )
                ),
              // DO NOT CATCH ERRORS here as they should return to the queue!
              processMessage: (msg) => runEffect(hndlr.processMessage(msg))
            })
            return { close: Effect.promise(() => s.close()) }
          })
          .pipe(withSpanAndLog(`ServiceBus.subscription.create ${sessionId}`)),
        (subscription) =>
          subscription.close.pipe(
            withSpanAndLog(`ServiceBus.subscription.close ${sessionId}`)
          )
        // NOTE(review): cast kept from the original, which did not document why it is needed.
      ) as Effect.Effect<void, never, Scope.Scope>
    })
  }
})

/** Service for receiving messages from a single queue. */
export class Receiver extends Context.Opaque<Receiver, {
  name: string
  make: (waitTillEmpty: Effect.Effect<void>) => Effect.Effect<ServiceBusReceiver, never, Scope.Scope>
  makeSession: (
    sessionId: string,
    waitTillEmpty: Effect.Effect<void>
  ) => Effect.Effect<ServiceBusReceiver, never, Scope.Scope>
  subscribe<RMsg, RErr>(
    hndlr: MessageHandlers<RMsg, RErr>,
    sessionId?: string
  ): Effect.Effect<void, never, Scope.Scope | RMsg | RErr>
}>()("Receiver") {
  static readonly make = makeReceiver
  /** Layer providing a `Receiver` for the queue `name`. */
  static readonly layer = (name: string) => this.toLayer(makeReceiver(name))
}

/**
 * Creates a distinct service key named `ServiceBus.Receiver.<queueName>`, with a
 * `layer` property that builds it through `makeReceiver(queueName)`.
 */
export const ReceiverTag = <Id>() => <Key extends string>(queueName: Key) => {
  const tag = Context.Service<Id, Receiver>(`ServiceBus.Receiver.${queueName}`)

  return Object.assign(tag, {
    layer: Layer.effect(
      tag,
      makeReceiver(queueName).pipe(Effect.map(Receiver.of))
    )
  })
}

/**
 * Combined layer: a `Sender` for `queue` and a `Receiver` for `queueDrain`,
 * which falls back to `queue` when omitted.
 */
export const SenderReceiver = (queue: string, queueDrain?: string) =>
  Layer.mergeAll(Sender.layer(queue), Receiver.layer(queueDrain ?? queue))

/** Effect-based counterparts of the Service Bus subscription callbacks. */
export interface MessageHandlers<RMsg, RErr> {
  /**
   * Handler that processes messages from service bus.
   *
   * @param message - A message received from Service Bus.
   */
  processMessage(message: ServiceBusReceivedMessage): Effect.Effect<void, never, RMsg>
  /**
   * Handler that processes errors that occur during receiving.
   * @param args - The error and additional context to indicate where
   * the error originated.
   */
  processError(args: ProcessErrorArgs): Effect.Effect<void, never, RErr>
}