import { Duration, Effect, pipe, S, Schedule, Stream } from "effect-app" import { HttpHeaders, HttpServerResponse } from "effect-app/http" import { reportError } from "../../errorReporter.js" import { storeId } from "../../Store/Memory.js" import { setupRequestContextFromCurrent } from "../setupRequest.js" // Tell the client to retry every 10 seconds if connectivity is lost const setRetry = Stream.succeed("retry: 10000") const keepAlive = Stream.fromEffectSchedule(Effect.succeed(":keep-alive"), Schedule.fixed(Duration.seconds(15))) let connId = BigInt(0) export const makeSSE = ( schema: S.Codec ) => (events: Stream.Stream<{ evt: A; namespace: string }, E, R>) => Effect .gen(function*() { const id = connId++ const ctx = yield* Effect.context() const res = HttpServerResponse.stream( // workaround for different scoped behaviour for streams in Bun // https://discord.com/channels/795981131316985866/1098177242598756412/1389646879675125861 Effect .gen(function*() { const ns = yield* storeId yield* Effect.annotateCurrentSpan({ connectionId: id.toString() }) yield* Effect.logInfo("$ start listening to events, id: " + id.toString() + ", ns: " + ns) yield* Effect.addFinalizer(() => Effect.logInfo("$ end listening to events, id: " + id.toString() + ", ns: " + ns) ) const enc = new TextEncoder() const encode = S.encodeEffect(S.fromJsonString(S.toCodecJson(schema))) const eventStream = Stream.mapEffect( Stream.filter(events, (_) => _.namespace === ns), (_) => encode(_.evt) .pipe(Effect.map((data) => `id: ${_.evt.id}\ndata: ${data}`)) ) const stream = pipe( setRetry, Stream.merge(keepAlive), // Keep this unary so pipe receives a function, not a Stream value. (self) => Stream.merge(self, eventStream, { haltStrategy: "either" }), Stream.tapCause((cause) => Effect.logError("SSE error, id: " + id.toString() + ", ns: " + ns, cause)), Stream.map((_) => enc.encode(_ + "\n\n")) ) return stream }) .pipe( Stream.unwrap, Stream.tapCause(reportError("Request")), Stream.provide(ctx) ), { contentType: "text/event-stream", headers: HttpHeaders.fromInput({ "content-type": "text/event-stream", "cache-control": "no-cache", "x-accel-buffering": "no", "connection": "keep-alive" // if (req.httpVersion !== "2.0") }) } ) return res }) .pipe(Effect.tapCause(reportError("Request")), setupRequestContextFromCurrent("events"))