/* eslint-disable @typescript-eslint/prefer-promise-reject-errors */
import { type OperationOptionsBase, type ProcessErrorArgs, ServiceBusClient, type ServiceBusMessage, type ServiceBusMessageBatch, type ServiceBusReceivedMessage, type ServiceBusReceiver } from "@azure/service-bus"
import { Cause, Context, Effect, Exit, FiberSet, Layer, type Scope } from "effect-app"
import { InfraLogger } from "../logger.js"
const withSpanAndLog = (name: string) => (self: Effect.Effect) =>
Effect.logInfo(name).pipe(
Effect.andThen(self),
Effect.tap(Effect.logInfo(name + " done")),
Effect.withLogSpan(name),
Effect.withSpan(name)
)
function makeClient(url: string) {
return Effect.acquireRelease(
Effect.sync(() => new ServiceBusClient(url)).pipe(withSpanAndLog("ServiceBus.client.create")),
(client) => Effect.promise(() => client.close()).pipe(withSpanAndLog("ServiceBus.client.close"))
)
}
export class ServiceBusClientTag
extends Context.Opaque()("@services/Client", { make: makeClient })
{
static readonly layer = (url: string) => this.toLayer(this.make(url))
}
const makeSender_ = Effect.fnUntraced(function*(queueName: string) {
const serviceBusClient = yield* ServiceBusClientTag
return yield* Effect.acquireRelease(
Effect.sync(() => serviceBusClient.createSender(queueName)).pipe(
withSpanAndLog(`ServiceBus.sender.create ${queueName}`)
),
(sender) => Effect.promise(() => sender.close()).pipe(withSpanAndLog(`ServiceBus.sender.close ${queueName}`))
)
})
const makeSender = Effect.fnUntraced(function*(name: string) {
const sender = yield* makeSender_(name)
const sendMessages = Effect.fnUntraced(function*(
messages: ServiceBusMessage | ServiceBusMessage[] | ServiceBusMessageBatch,
options?: Omit
) {
return yield* Effect.promise((abortSignal) => sender.sendMessages(messages, { ...options, abortSignal }))
})
return { name, sendMessages }
})
export class Sender extends Context.Opaque
) => Effect.Effect
}>()("Sender", { make: makeSender }) {
static readonly layer = (name: string) => this.toLayer(this.make(name))
}
export const SenderTag = () => (queueName: Key) => {
const tag = Context.Service(`ServiceBus.Sender.${queueName}`)
return Object.assign(tag, {
layer: Layer.effect(
tag,
Sender.make(queueName).pipe(Effect.map(Sender.of))
)
})
}
const makeReceiver = Effect.fnUntraced(function*(name: string) {
const serviceBusClient = yield* ServiceBusClientTag
const makeReceiver = Effect.fnUntraced(
function*(queueName: string, waitTillEmpty: Effect.Effect, sessionId?: string) {
return yield* Effect.acquireRelease(
(sessionId
? Effect.promise(() => serviceBusClient.acceptSession(queueName, sessionId))
: Effect.sync(() => serviceBusClient.createReceiver(queueName)))
.pipe(withSpanAndLog(`ServiceBus.receiver.create ${queueName}.${sessionId}`)),
(r) =>
waitTillEmpty.pipe(
withSpanAndLog(`ServiceBus.receiver.waitTillEmpty ${queueName}.${sessionId}`),
Effect.andThen(
Effect.promise(() => r.close()).pipe(
withSpanAndLog(`ServiceBus.receiver.close ${queueName}.${sessionId}`)
)
),
withSpanAndLog(`ServiceBus.receiver.release ${queueName}.${sessionId}`)
)
)
}
)
const make = (waitTillEmpty: Effect.Effect) => makeReceiver(name, waitTillEmpty)
const makeSession = (sessionId: string, waitTillEmpty: Effect.Effect) =>
makeReceiver(name, waitTillEmpty, sessionId)
return {
name,
make,
makeSession,
subscribe: Effect.fnUntraced(function*(hndlr: MessageHandlers, sessionId?: string) {
const fs = yield* FiberSet.make()
const fr = yield* FiberSet.runtime(fs)()
const wait = Effect
.gen(function*() {
if ((yield* FiberSet.size(fs)) > 0) {
yield* InfraLogger.logDebug("Waiting ServiceBusFiberSet to be empty: " + (yield* FiberSet.size(fs)))
}
while ((yield* FiberSet.size(fs)) > 0) yield* Effect.sleep("250 millis")
})
const r = yield* sessionId
? makeSession(
sessionId,
wait
)
: make(wait)
const runEffect = (effect: Effect.Effect) =>
new Promise((resolve, reject) =>
fr(effect)
.addObserver((exit) => {
if (Exit.isSuccess(exit)) {
resolve(exit.value)
} else {
// disable @typescript-eslint/prefer-promise-reject-errors
reject(Cause.pretty(exit.cause))
}
})
)
yield* Effect.acquireRelease(
Effect
.sync(() => {
const s = r
.subscribe({
processError: (err) =>
runEffect(
hndlr
.processError(err)
.pipe(
Effect.catchCause((cause) => Effect.logError(`ServiceBus Error ${sessionId}`, cause))
)
),
processMessage: (msg) => runEffect(hndlr.processMessage(msg))
// DO NOT CATCH ERRORS here as they should return to the queue!
})
return { close: Effect.promise(() => s.close()) }
})
.pipe(withSpanAndLog(`ServiceBus.subscription.create ${sessionId}`)),
(subscription) =>
subscription.close.pipe(
withSpanAndLog(`ServiceBus.subscription.close ${sessionId}`)
)
) as Effect.Effect // wth is going on here
})
}
})
export class Receiver extends Context.Opaque) => Effect.Effect
makeSession: (
sessionId: string,
waitTillEmpty: Effect.Effect
) => Effect.Effect
subscribe(
hndlr: MessageHandlers,
sessionId?: string
): Effect.Effect
}>()("Receiver") {
static readonly make = makeReceiver
static readonly layer = (name: string) => this.toLayer(makeReceiver(name))
}
export const ReceiverTag = () => (queueName: Key) => {
const tag = Context.Service(`ServiceBus.Receiver.${queueName}`)
return Object.assign(tag, {
layer: Layer.effect(
tag,
makeReceiver(queueName).pipe(Effect.map(Receiver.of))
)
})
}
export const SenderReceiver = (queue: string, queueDrain?: string) =>
Layer.mergeAll(Sender.layer(queue), Receiver.layer(queueDrain ?? queue))
export interface MessageHandlers {
/**
* Handler that processes messages from service bus.
*
* @param message - A message received from Service Bus.
*/
processMessage(message: ServiceBusReceivedMessage): Effect.Effect
/**
* Handler that processes errors that occur during receiving.
* @param args - The error and additional context to indicate where
* the error originated.
*/
processError(args: ProcessErrorArgs): Effect.Effect
}