/* eslint-disable @typescript-eslint/no-explicit-any */
import { Context, Effect, Fiber, FiberSet, Layer, Option, type Tracer } from "effect-app"
import { reportRequestError, reportUnknownRequestError } from "./api/reportError.js"
import { InfraLogger } from "./logger.js"
const getRootParentSpan = Effect.gen(function*() {
let span: Tracer.AnySpan | null = yield* Effect.currentSpan.pipe(
Effect.catchTag("NoSuchElementError", () => Effect.succeed(null))
)
if (!span) return span
while (span._tag === "Span" && Option.isSome(span.parent)) {
span = span.parent.value
}
return span
})
export const setRootParentSpan = (self: Effect.Effect) =>
getRootParentSpan.pipe(Effect.andThen((span) => span ? Effect.withParentSpan(self, span) : self))
const make = Effect.gen(function*() {
const set = yield* FiberSet.make()
const add = (...fibers: Fiber.Fiber[]) =>
Effect.sync(() => fibers.forEach((_) => FiberSet.addUnsafe(set, _)))
const addAll = (fibers: readonly Fiber.Fiber[]) =>
Effect.sync(() => fibers.forEach((_) => FiberSet.addUnsafe(set, _)))
const join = FiberSet.size(set).pipe(
Effect.andThen((count) => InfraLogger.logInfo(`Joining ${count} current fibers on the RequestFiberSet`)),
Effect.andThen(FiberSet.join(set))
)
const run = FiberSet.run(set)
const register = (self: Effect.Effect) =>
self.pipe(Effect.forkChild, Effect.tap(add), Effect.andThen(Fiber.join))
// const waitUntilEmpty = Effect.gen(function*() {
// const currentSize = yield* FiberSet.size(set)
// if (currentSize === 0) {
// return
// }
// yield* Effect.logInfo("Waiting RequestFiberSet to be empty: " + currentSize)
// while ((yield* FiberSet.size(set)) > 0) yield* Effect.sleep("250 millis")
// yield* Effect.logDebug("RequestFiberSet is empty")
// })
// TODO: loop and interrupt all fibers in the set continuously?
const interrupt = Fiber.interruptAll(set)
/**
* Forks the effect into a new fiber attached to the RequestFiberSet scope. Because the
* new fiber isn't attached to the parent, when the fiber executing the
* returned effect terminates, the forked fiber will continue running.
* The fiber will be interrupted when the RequestFiberSet scope is closed.
*
* Reports errors.
*/
function forkDaemonReport(self: Effect.Effect) {
return self.pipe(
reportRequestError,
Effect.uninterruptible,
run
)
}
/**
* Forks the effect into a new fiber attached to the RequestFiberSet scope. Because the
* new fiber isn't attached to the parent, when the fiber executing the
* returned effect terminates, the forked fiber will continue running.
* The fiber will be interrupted when the RequestFiberSet scope is closed.
*
* Reports unexpected errors.
*/
function forkDaemonReportUnexpected(self: Effect.Effect) {
return self
.pipe(
reportUnknownRequestError,
Effect.uninterruptible,
run
)
}
return {
interrupt,
join,
run,
add,
addAll,
register,
forkDaemonReport,
forkDaemonReportUnexpected
}
})
/**
* Whenever you fork a fiber for a Request, and you want to prevent dependent services to close prematurely on interruption,
* like the ServiceBus Sender, you should register these fibers in this FiberSet.
*/
export class RequestFiberSet extends Context.Service()("RequestFiberSet", { make }) {
static readonly Live = Layer.effect(this, this.make)
static readonly register = (self: Effect.Effect) =>
this.asEffect().pipe(Effect.andThen((_) => _.register(self)))
static readonly run = (self: Effect.Effect) =>
this.asEffect().pipe(Effect.andThen((_) => _.run(self)))
static readonly forkDaemonReport = (self: Effect.Effect) =>
this.asEffect().pipe(Effect.andThen((_) => _.forkDaemonReport(self)))
static readonly forkDaemonReportUnexpected = (self: Effect.Effect) =>
this.asEffect().pipe(Effect.andThen((_) => _.forkDaemonReportUnexpected(self)))
}