/* eslint-disable @typescript-eslint/no-explicit-any */
import { Context, Effect, Fiber, FiberSet, Layer, Option, type Tracer } from "effect-app"
import { reportRequestError, reportUnknownRequestError } from "./api/reportError.js"
import { InfraLogger } from "./logger.js"

/**
 * Resolves the outermost ancestor of the current span.
 *
 * Starts from `Effect.currentSpan` and follows `parent` links for as long as the
 * span is a `"Span"` that has a parent. The walk therefore stops at the first span
 * without a parent, or at the first span whose `_tag` is not `"Span"`.
 *
 * Yields `null` when there is no current span.
 */
const getRootParentSpan = Effect.gen(function*() {
  let span: Tracer.AnySpan | null = yield* Effect.currentSpan.pipe(
    // No active span is not an error here; it is represented as `null`.
    Effect.catchTag("NoSuchElementError", () => Effect.succeed(null))
  )
  if (!span) return span
  while (span._tag === "Span" && Option.isSome(span.parent)) {
    span = span.parent.value
  }
  return span
})

/**
 * Runs `self` with the root ancestor of the current span as its parent span.
 * When there is no current span, `self` is returned unchanged.
 *
 * @param self The effect to re-parent.
 */
export const setRootParentSpan = <A, E, R>(self: Effect.Effect<A, E, R>) =>
  getRootParentSpan.pipe(Effect.andThen((span) => span ? Effect.withParentSpan(self, span) : self))

/**
 * Builds the RequestFiberSet service: a single `FiberSet` plus the operations
 * that add fibers to it, run effects in it, join it and interrupt it.
 */
const make = Effect.gen(function*() {
  const set = yield* FiberSet.make()

  /** Adds every fiber in the given array to the set. */
  const addAll = (fibers: readonly Fiber.Fiber<any, any>[]) =>
    Effect.sync(() => fibers.forEach((_) => FiberSet.addUnsafe(set, _)))

  /** Variadic form of `addAll`. */
  const add = (...fibers: Fiber.Fiber<any, any>[]) => addAll(fibers)

  /** Logs how many fibers are currently in the set, then joins the set. */
  const join = FiberSet.size(set).pipe(
    Effect.andThen((count) => InfraLogger.logInfo(`Joining ${count} current fibers on the RequestFiberSet`)),
    Effect.andThen(FiberSet.join(set))
  )

  /** Forks an effect into the set (`FiberSet.run` bound to this set). */
  const run = FiberSet.run(set)

  /**
   * Forks `self` as a child fiber, adds that fiber to the set, and then joins it,
   * so the caller still receives the result of `self`.
   */
  const register = <A, E, R>(self: Effect.Effect<A, E, R>) =>
    self.pipe(Effect.forkChild, Effect.tap(add), Effect.andThen(Fiber.join))

  // const waitUntilEmpty = Effect.gen(function*() {
  //   const currentSize = yield* FiberSet.size(set)
  //   if (currentSize === 0) {
  //     return
  //   }
  //   yield* Effect.logInfo("Waiting RequestFiberSet to be empty: " + currentSize)
  //   while ((yield* FiberSet.size(set)) > 0) yield* Effect.sleep("250 millis")
  //   yield* Effect.logDebug("RequestFiberSet is empty")
  // })

  // TODO: loop and interrupt all fibers in the set continuously?
  /** Interrupts all fibers in the set via `Fiber.interruptAll`. */
  const interrupt = Fiber.interruptAll(set)

  /**
   * Forks the effect into a new fiber attached to the RequestFiberSet scope. Because the
   * new fiber isn't attached to the parent, when the fiber executing the
   * returned effect terminates, the forked fiber will continue running.
   * The fiber will be interrupted when the RequestFiberSet scope is closed.
   *
   * Reports errors.
   *
   * NOTE(review): the effect is wrapped in `Effect.uninterruptible` before it is
   * forked; confirm how that interacts with interruption on scope close.
   */
  function forkDaemonReport<A, E, R>(self: Effect.Effect<A, E, R>) {
    return self.pipe(
      reportRequestError,
      Effect.uninterruptible,
      run
    )
  }

  /**
   * Forks the effect into a new fiber attached to the RequestFiberSet scope. Because the
   * new fiber isn't attached to the parent, when the fiber executing the
   * returned effect terminates, the forked fiber will continue running.
   * The fiber will be interrupted when the RequestFiberSet scope is closed.
   *
   * Reports unexpected errors.
   *
   * NOTE(review): the effect is wrapped in `Effect.uninterruptible` before it is
   * forked; confirm how that interacts with interruption on scope close.
   */
  function forkDaemonReportUnexpected<A, E, R>(self: Effect.Effect<A, E, R>) {
    return self.pipe(
      reportUnknownRequestError,
      Effect.uninterruptible,
      run
    )
  }

  return {
    interrupt,
    join,
    run,
    add,
    addAll,
    register,
    forkDaemonReport,
    forkDaemonReportUnexpected
  }
})

/**
 * Whenever you fork a fiber for a Request and want to prevent dependent services,
 * like the ServiceBus Sender, from closing prematurely on interruption,
 * you should register these fibers in this FiberSet.
 *
 * The static helpers below resolve the service from the environment and delegate
 * to the instance method of the same name.
 */
export class RequestFiberSet extends Context.Service<RequestFiberSet>()("RequestFiberSet", { make }) {
  /** Layer that provides the service built by `make`. */
  static readonly Live = Layer.effect(this, this.make)

  /** Accessor for the instance `register`. */
  static readonly register = <A, E, R>(self: Effect.Effect<A, E, R>) =>
    this.asEffect().pipe(Effect.andThen((_) => _.register(self)))

  /** Accessor for the instance `run`. */
  static readonly run = <A, E, R>(self: Effect.Effect<A, E, R>) =>
    this.asEffect().pipe(Effect.andThen((_) => _.run(self)))

  /** Accessor for the instance `forkDaemonReport`. */
  static readonly forkDaemonReport = <A, E, R>(self: Effect.Effect<A, E, R>) =>
    this.asEffect().pipe(Effect.andThen((_) => _.forkDaemonReport(self)))

  /** Accessor for the instance `forkDaemonReportUnexpected`. */
  static readonly forkDaemonReportUnexpected = <A, E, R>(self: Effect.Effect<A, E, R>) =>
    this.asEffect().pipe(Effect.andThen((_) => _.forkDaemonReportUnexpected(self)))
}