import { getRequestContext, setupRequestContextWithCustomSpan } from "@effect-app/infra/api/setupRequest"
import { reportNonInterruptedFailure } from "@effect-app/infra/QueueMaker/errors"
import { type QueueBase, QueueMeta } from "@effect-app/infra/QueueMaker/service"
import { subMinutes } from "date-fns"
import type { NonEmptyReadonlyArray } from "effect-app/Array"
import * as Effect from "effect-app/Effect"
import * as Option from "effect-app/Option"
import * as S from "effect-app/Schema"
import { type NonEmptyString255 } from "effect-app/Schema"
import { pretty } from "effect-app/utils"
import * as Fiber from "effect/Fiber"
import * as Tracer from "effect/Tracer"
import { SqlClient } from "effect/unstable/sql"
import { SQLModel } from "../adapters/SQL.js"
import { InfraLogger } from "../logger.js"
import { messagingSpanArgs } from "../otel.js"

/** Branded finite-number schema used as the (generated) id of a row in the `queue` table. */
export const QueueId = S.Finite.pipe(S.brand("QueueId"))
export type QueueId = typeof QueueId.Type

// TODO: let the model track and Auto Generate versionColumn on every update instead
/**
 * Builds a queue that is persisted in the SQL table `queue`.
 *
 * Messages are published as rows whose `name` column is `queueName`; the drain side
 * polls for rows whose `name` column is `queueDrainName`. A row is eligible for
 * processing when `finishedAt` is NULL and `processingAt` is either NULL or older
 * than 15 minutes (so a claim that was never finished is picked up again).
 *
 * NOTE(review): `EvtE` / `DrainEvtE` are declared but not referenced in the visible
 * signature — presumably they are the encoded-type arguments of `schema` /
 * `drainSchema`; confirm.
 *
 * @param queueName - value stored in the `name` column of published messages.
 * @param queueDrainName - value of the `name` column that the drain polls for.
 * @param schema - codec for the body of published messages.
 * @param drainSchema - codec for the body of drained messages.
 * @returns an object with `publish` and `drain`, cast to `QueueBase`.
 */
export const makeSQLQueue = Effect.fnUntraced(function*<
  Evt extends { id: S.StringId; _tag: string },
  DrainEvt extends { id: S.StringId; _tag: string },
  EvtE,
  DrainEvtE
>(
  queueName: NonEmptyString255,
  queueDrainName: NonEmptyString255,
  schema: S.Codec,
  drainSchema: S.Codec
) {
  // Columns shared by the publish-side (Queue) and drain-side (Drain) models.
  const base = {
    id: SQLModel.Generated(QueueId),
    meta: SQLModel.JsonFromString(QueueMeta),
    name: S.NonEmptyString255,
    createdAt: SQLModel.DateTimeInsert,
    updatedAt: SQLModel.DateTimeUpdate,
    // TODO: at+owner
    processingAt: SQLModel.FieldOption(S.Date),
    finishedAt: SQLModel.FieldOption(S.Date),
    etag: S.String
    // TODO: use a SQLModel thing that auto updates it?
    // TODO: record locking.. / optimistic locking
    // rowVersion: SQLModel.DateTimeFromNumberWithNow
  }

  class Queue extends SQLModel.Class("Queue")({ body: SQLModel.JsonFromString(schema), ...base }) {}
  class Drain extends SQLModel.Class("Drain")({ body: SQLModel.JsonFromString(drainSchema), ...base }) {}

  const sql = yield* SqlClient.SqlClient

  // Both repositories target the same table; they differ only in how `body` is decoded.
  const queueRepo = yield* SQLModel.makeRepository(Queue, {
    tableName: "queue",
    spanPrefix: "QueueRepo",
    idColumn: "id",
    versionColumn: "etag"
  })
  const drainRepo = yield* SQLModel.makeRepository(Drain, {
    tableName: "queue",
    spanPrefix: "DrainRepo",
    idColumn: "id",
    versionColumn: "etag"
  })

  const decodeDrain = S.decodeEffectConcurrently(Drain)

  // Selects at most one unfinished row for `queueDrainName` that is either unclaimed
  // or was claimed more than 15 minutes ago.
  const drain = Effect.gen(function*() {
    const limit = subMinutes(new Date(), 15)
    return yield* sql`SELECT * FROM queue WHERE name = ${queueDrainName} AND finishedAt IS NULL AND (processingAt IS NULL OR processingAt < ${limit.getTime()}) LIMIT 1`
  })

  const q = {
    /** Inserts one message row for `queueName`, unclaimed and unfinished, with a fresh etag. */
    offer: Effect.fnUntraced(function*(body: Evt, meta: typeof QueueMeta.Type) {
      yield* queueRepo.insertVoid(Queue.insert.make({
        body,
        meta,
        name: queueName,
        processingAt: Option.none(),
        finishedAt: Option.none(),
        etag: crypto.randomUUID()
      }))
    }),

    /**
     * Polls until a row is available, then marks it as being processed (sets
     * `processingAt` to now) and returns the result of that update.
     * Sleeps `250` between polls when nothing is available.
     */
    take: Effect.gen(function*() {
      while (true) {
        // disable sql tracer otherwise we spam it..
        const [first] = yield* drain.pipe(Effect.withTracerEnabled(false))
        if (first) {
          const dec = yield* decodeDrain(first)
          // createdAt / updatedAt are dropped on purpose: they are not passed to `Drain.update.make`.
          const { createdAt, updatedAt, ...rest } = dec
          return yield* drainRepo.update(
            // `etag: crypto.randomUUID()` is not set here: auto in lib
            Drain.update.make({ ...rest, processingAt: Option.some(new Date()) })
          )
        }
        yield* Effect.sleep(250)
      }
    }),

    /** Marks the given row as finished by setting `finishedAt` to now. */
    finish: Effect.fn(function*({ createdAt, updatedAt, ...rest }: Drain) {
      // `etag: crypto.randomUUID()` is not set here: auto in lib
      return yield* drainRepo.updateVoid(Drain.update.make({ ...rest, finishedAt: Option.some(new Date()) }))
    })
  }

  const queue = {
    /**
     * Publishes one or more messages: annotates the current span with the batch size
     * and message tags, then offers every message together with the current request
     * context as its meta.
     */
    publish: Effect.fn(`publish ${queueName}`, {
      kind: "producer",
      attributes: {
        "messaging.system": "sql",
        "messaging.operation.name": "publish",
        "messaging.destination.name": queueName
      }
    })(function*(
      ...messages: NonEmptyReadonlyArray
    ) {
      yield* Effect.annotateCurrentSpan({
        "messaging.batch.message_count": messages.length,
        "messaging.message.types": messages.map((_) => _._tag)
      })
      const requestContext = yield* getRequestContext
      yield* Effect.forEach(messages, (m) => q.offer(m, requestContext), { discard: true })
    }),

    /**
     * Returns an effect that repeatedly takes a message, runs `handleEvent` on its
     * body and then marks the message as finished.
     *
     * @param handleEvent - handler invoked with the decoded body of each message.
     * @param sessionId - optional id, added as the conversation id on the spans.
     */
    drain: (
      handleEvent: (ks: DrainEvt) => Effect.Effect,
      sessionId?: string
    ) => {
      // NOTE(review): the "MemQueue" prefix looks copied from an in-memory queue;
      // kept unchanged because it is a runtime string — confirm whether it should
      // name the SQL queue instead.
      const silenceAndReportError = reportNonInterruptedFailure({ name: "MemQueue.drain." + queueDrainName })

      // Runs the handler for a single message inside a "process" consumer span,
      // with the request context restored from the message's meta.
      const processMessage = Effect.fnUntraced(function*({ body, meta }: Drain) {
        let effect = InfraLogger
          .logDebug(`[${queueDrainName}] Processing incoming message`)
          .pipe(
            Effect.annotateLogs({ body: pretty(body), meta: pretty(meta) }),
            Effect.andThen(handleEvent(body)),
            silenceAndReportError,
            (_) => {
              const args = messagingSpanArgs({
                operation: "process",
                system: "sql",
                destination: queueDrainName,
                messageId: body.id,
                conversationId: sessionId,
                extra: { "messaging.message.type": body._tag, "messaging.message.body": body }
              }, "consumer")
              return setupRequestContextWithCustomSpan(
                _,
                meta,
                args.name,
                { captureStackTrace: false, kind: args.kind, attributes: args.attributes }
              )
            }
          )
        // Link to the publisher's span when the message carries one.
        if (meta.span) {
          effect = Effect.withParentSpan(effect, Tracer.externalSpan(meta.span))
        }
        return yield* effect
      })

      return Effect.fn(`receive ${queueDrainName}`, {
        kind: "consumer",
        attributes: {
          "messaging.system": "sql",
          "messaging.operation.name": "receive",
          "messaging.destination.name": queueDrainName,
          ...(sessionId !== undefined && { "messaging.message.conversation_id": sessionId })
        }
      })(function*() {
        const x = yield* q.take
        // Processing runs uninterruptibly in a forked fiber that is joined right away;
        // the message is marked as finished once processing has completed.
        yield* processMessage(x).pipe(
          Effect.uninterruptible,
          Effect.forkChild,
          Effect.flatMap(Fiber.join),
          Effect.tap(q.finish(x))
        )
        // Each receive iteration goes through `silenceAndReportError` and is repeated forever.
      }, (effect) => effect.pipe(silenceAndReportError, Effect.forever))()
    }
  }

  return queue as QueueBase
})