import type { LookupFn, SsrFPolicy } from "@hanzo/bot/plugin-sdk/tlon";
import { randomUUID } from "node:crypto";
import { Readable } from "node:stream";
import { ensureUrbitChannelOpen, pokeUrbitChannel, scryUrbitPath } from "./channel-ops.js";
import { getUrbitContext, normalizeUrbitCookie } from "./context.js";
import { urbitFetch } from "./fetch.js";

/** Optional sinks for the informational and error messages emitted by the client. */
export type UrbitSseLogger = {
  log?: (message: string) => void;
  error?: (message: string) => void;
};

/** Construction options for {@link UrbitSSEClient}. Every field is optional. */
type UrbitSseOptions = {
  ship?: string;
  ssrfPolicy?: SsrFPolicy;
  lookupFn?: LookupFn;
  fetchImpl?: (input: RequestInfo | URL, init?: RequestInit) => Promise<Response>;
  /** Invoked after a fresh channel id has been chosen and before `connect()` is retried. */
  onReconnect?: (client: UrbitSSEClient) => Promise<void> | void;
  /** Reconnection is enabled unless this is explicitly `false`. */
  autoReconnect?: boolean;
  /** Defaults to 10. */
  maxReconnectAttempts?: number;
  /** Base backoff delay in milliseconds; defaults to 1000. */
  reconnectDelay?: number;
  /** Upper bound for the backoff delay in milliseconds; defaults to 30000. */
  maxReconnectDelay?: number;
  logger?: UrbitSseLogger;
};

/**
 * Client for an Urbit channel: opens the channel, registers subscriptions,
 * reads the server-sent event stream, dispatches events to per-subscription
 * handlers, acknowledges events, and reconnects on a new channel when the
 * stream ends.
 */
export class UrbitSSEClient {
  url: string;
  cookie: string;
  ship: string;
  channelId: string;
  channelUrl: string;
  subscriptions: Array<{
    id: number;
    action: "subscribe";
    ship: string;
    app: string;
    path: string;
  }> = [];
  eventHandlers = new Map<
    number,
    { event?: (data: unknown) => void; err?: (error: unknown) => void; quit?: () => void }
  >();
  aborted = false;
  streamController: AbortController | null = null;
  onReconnect: UrbitSseOptions["onReconnect"] | null;
  autoReconnect: boolean;
  reconnectAttempts = 0;
  maxReconnectAttempts: number;
  reconnectDelay: number;
  maxReconnectDelay: number;
  isConnected = false;
  logger: UrbitSseLogger;
  ssrfPolicy?: SsrFPolicy;
  lookupFn?: LookupFn;
  fetchImpl?: (input: RequestInfo | URL, init?: RequestInit) => Promise<Response>;
  streamRelease: (() => Promise<void>) | null = null;

  // Event ack tracking. An ack is sent as soon as more than `ackThreshold`
  // events have been heard since the last acknowledged one.
  // NOTE(review): an earlier comment here said the channel needs an ack about
  // every 50 events to stay healthy; the threshold actually used is 20.
  private lastHeardEventId = -1;
  private lastAcknowledgedEventId = -1;
  private readonly ackThreshold = 20;

  /**
   * @param url - Base URL of the ship; resolved through `getUrbitContext`.
   * @param cookie - Auth cookie; normalized with `normalizeUrbitCookie`.
   * @param options - Optional behavior overrides, see {@link UrbitSseOptions}.
   */
  constructor(url: string, cookie: string, options: UrbitSseOptions = {}) {
    const ctx = getUrbitContext(url, options.ship);
    this.url = ctx.baseUrl;
    this.cookie = normalizeUrbitCookie(cookie);
    this.ship = ctx.ship;
    this.channelId = UrbitSSEClient.generateChannelId();
    this.channelUrl = new URL(`/~/channel/${this.channelId}`, this.url).toString();
    this.onReconnect = options.onReconnect ?? null;
    this.autoReconnect = options.autoReconnect !== false;
    this.maxReconnectAttempts = options.maxReconnectAttempts ?? 10;
    this.reconnectDelay = options.reconnectDelay ?? 1000;
    this.maxReconnectDelay = options.maxReconnectDelay ?? 30000;
    this.logger = options.logger ?? {};
    this.ssrfPolicy = options.ssrfPolicy;
    this.lookupFn = options.lookupFn;
    this.fetchImpl = options.fetchImpl;
  }

  /** Builds a channel id from the current Unix time in seconds and a random UUID. */
  private static generateChannelId(): string {
    return `${Math.floor(Date.now() / 1000)}-${randomUUID()}`;
  }

  /** Resolves after `ms` milliseconds. */
  private static sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Registers a subscription and its handlers.
   *
   * The subscription is always recorded locally. It is sent to the ship
   * immediately only when the client is already connected; otherwise it is
   * sent as part of the channel-creation body in `connect()`. A failure of the
   * immediate send is reported through the subscription's `err` handler rather
   * than thrown.
   *
   * @returns The numeric id assigned to the subscription.
   */
  async subscribe(params: {
    app: string;
    path: string;
    event?: (data: unknown) => void;
    err?: (error: unknown) => void;
    quit?: () => void;
  }) {
    const subId = this.subscriptions.length + 1;
    const subscription = {
      id: subId,
      action: "subscribe",
      ship: this.ship,
      app: params.app,
      path: params.path,
    } as const;

    this.subscriptions.push(subscription);
    this.eventHandlers.set(subId, { event: params.event, err: params.err, quit: params.quit });

    if (this.isConnected) {
      try {
        await this.sendSubscription(subscription);
      } catch (error) {
        const handler = this.eventHandlers.get(subId);
        handler?.err?.(error);
      }
    }
    return subId;
  }

  /**
   * PUTs a single subscription action to the current channel.
   *
   * @throws Error when the response is not OK and its status is not 204.
   */
  private async sendSubscription(subscription: {
    id: number;
    action: "subscribe";
    ship: string;
    app: string;
    path: string;
  }) {
    const { response, release } = await urbitFetch({
      baseUrl: this.url,
      path: `/~/channel/${this.channelId}`,
      init: {
        method: "PUT",
        headers: {
          "Content-Type": "application/json",
          Cookie: this.cookie,
        },
        body: JSON.stringify([subscription]),
      },
      ssrfPolicy: this.ssrfPolicy,
      lookupFn: this.lookupFn,
      fetchImpl: this.fetchImpl,
      timeoutMs: 30_000,
      auditContext: "tlon-urbit-subscribe",
    });

    try {
      if (!response.ok && response.status !== 204) {
        const errorText = await response.text().catch(() => "");
        throw new Error(
          `Subscribe failed: ${response.status}${errorText ? ` - ${errorText}` : ""}`,
        );
      }
    } finally {
      await release();
    }
  }

  /**
   * Opens the channel (sending all recorded subscriptions as the creation
   * body), starts the event stream, and marks the client connected.
   */
  async connect() {
    await ensureUrbitChannelOpen(
      {
        baseUrl: this.url,
        cookie: this.cookie,
        ship: this.ship,
        channelId: this.channelId,
        ssrfPolicy: this.ssrfPolicy,
        lookupFn: this.lookupFn,
        fetchImpl: this.fetchImpl,
      },
      {
        createBody: this.subscriptions,
        createAuditContext: "tlon-urbit-channel-create",
      },
    );

    await this.openStream();
    this.isConnected = true;
    this.reconnectAttempts = 0;
  }

  /**
   * Issues the GET for the event stream and starts consuming it in the
   * background (the returned promise settles once the response headers have
   * been checked, not when the stream ends).
   *
   * @throws Error when the request fails or the response is not OK.
   */
  async openStream() {
    // Use AbortController with manual timeout so we only abort during initial connection,
    // not after the SSE stream is established and actively streaming.
    const controller = new AbortController();
    const timeoutId = setTimeout(() => controller.abort(), 60_000);
    this.streamController = controller;

    let fetched: Awaited<ReturnType<typeof urbitFetch>>;
    try {
      fetched = await urbitFetch({
        baseUrl: this.url,
        path: `/~/channel/${this.channelId}`,
        init: {
          method: "GET",
          headers: {
            Accept: "text/event-stream",
            Cookie: this.cookie,
          },
        },
        ssrfPolicy: this.ssrfPolicy,
        lookupFn: this.lookupFn,
        fetchImpl: this.fetchImpl,
        signal: controller.signal,
        auditContext: "tlon-urbit-sse-stream",
      });
    } catch (error) {
      // The request never produced a stream; do not keep a stale controller around.
      if (this.streamController === controller) {
        this.streamController = null;
      }
      throw error;
    } finally {
      // Clear the timeout once the connection attempt has settled (headers
      // received or request failed) so the timer never outlives the attempt.
      clearTimeout(timeoutId);
    }

    const { response, release } = fetched;
    this.streamRelease = release;

    if (!response.ok) {
      await release();
      this.streamRelease = null;
      if (this.streamController === controller) {
        this.streamController = null;
      }
      throw new Error(`Stream connection failed: ${response.status}`);
    }

    // Deliberately not awaited: the stream is consumed in the background and
    // failures are fanned out to every registered `err` handler.
    this.processStream(response.body).catch((error) => {
      if (!this.aborted) {
        this.logger.error?.(`Stream error: ${String(error)}`);
        for (const { err } of this.eventHandlers.values()) {
          if (err) {
            err(error);
          }
        }
      }
    });
  }

  /**
   * Reads the stream, splits it into events on blank lines, and hands each
   * event to `processEvent`. When the stream ends (normally or by error) the
   * held fetch resources are released and, unless the client was closed or
   * auto-reconnect is disabled, a reconnection is attempted.
   *
   * Does nothing when `body` is null.
   */
  async processStream(body: ReadableStream<Uint8Array> | Readable | null) {
    if (!body) {
      return;
    }
    // oxlint-disable-next-line typescript/no-explicit-any
    const stream = body instanceof ReadableStream ? Readable.fromWeb(body as any) : body;
    // A streaming decoder keeps the trailing bytes of an incomplete multi-byte
    // UTF-8 sequence until the next chunk arrives, so characters split across
    // chunk boundaries are decoded correctly.
    const decoder = new TextDecoder();
    let buffer = "";

    try {
      for await (const chunk of stream) {
        if (this.aborted) {
          break;
        }
        if (typeof chunk === "string") {
          buffer += chunk;
        } else if (chunk instanceof Uint8Array) {
          buffer += decoder.decode(chunk, { stream: true });
        } else {
          buffer += String(chunk);
        }

        let eventEnd: number;
        while ((eventEnd = buffer.indexOf("\n\n")) !== -1) {
          const eventData = buffer.substring(0, eventEnd);
          buffer = buffer.substring(eventEnd + 2);
          this.processEvent(eventData);
        }
      }
    } finally {
      if (this.streamRelease) {
        const release = this.streamRelease;
        this.streamRelease = null;
        await release();
      }
      this.streamController = null;
      if (!this.aborted && this.autoReconnect) {
        this.isConnected = false;
        this.logger.log?.("[SSE] Stream ended, attempting reconnection...");
        await this.attemptReconnect();
      }
    }
  }

  /**
   * Handles one raw event: reads its `id: ` and `data: ` lines, triggers an
   * ack when enough events have accumulated, and dispatches the JSON payload.
   *
   * Dispatch rules: a `response: "quit"` payload calls the matching `quit`
   * handler; a payload whose `id` has a registered handler goes to that
   * handler's `event`; any other payload carrying `json` is broadcast to every
   * `event` handler. Events without a `data: ` line are ignored, and JSON
   * parse failures are logged rather than thrown.
   */
  processEvent(eventData: string) {
    const lines = eventData.split("\n");
    let data: string | null = null;
    let eventId: number | null = null;

    for (const line of lines) {
      if (line.startsWith("id: ")) {
        eventId = parseInt(line.substring(4), 10);
      }
      if (line.startsWith("data: ")) {
        data = line.substring(6);
      }
    }

    if (!data) {
      return;
    }

    // Track event ID and send ack if needed
    if (eventId !== null && !Number.isNaN(eventId) && eventId > this.lastHeardEventId) {
      const heardId = eventId;
      this.lastHeardEventId = heardId;
      if (heardId - this.lastAcknowledgedEventId > this.ackThreshold) {
        this.logger.log?.(
          `[SSE] Acking event ${heardId} (last acked: ${this.lastAcknowledgedEventId})`,
        );
        this.ack(heardId).catch((err) => {
          this.logger.error?.(`Failed to ack event ${heardId}: ${String(err)}`);
        });
      }
    }

    try {
      const parsed = JSON.parse(data) as { id?: number; json?: unknown; response?: string };

      if (parsed.response === "quit") {
        if (parsed.id) {
          const handlers = this.eventHandlers.get(parsed.id);
          if (handlers?.quit) {
            handlers.quit();
          }
        }
        return;
      }

      if (parsed.id && this.eventHandlers.has(parsed.id)) {
        const { event } = this.eventHandlers.get(parsed.id) ?? {};
        if (event && parsed.json) {
          event(parsed.json);
        }
      } else if (parsed.json) {
        for (const { event } of this.eventHandlers.values()) {
          if (event) {
            event(parsed.json);
          }
        }
      }
    } catch (error) {
      this.logger.error?.(`Error parsing SSE event: ${String(error)}`);
    }
  }

  /** Sends a poke on the current channel via `pokeUrbitChannel`. */
  async poke(params: { app: string; mark: string; json: unknown }) {
    return await pokeUrbitChannel(
      {
        baseUrl: this.url,
        cookie: this.cookie,
        ship: this.ship,
        channelId: this.channelId,
        ssrfPolicy: this.ssrfPolicy,
        lookupFn: this.lookupFn,
        fetchImpl: this.fetchImpl,
      },
      { ...params, auditContext: "tlon-urbit-poke" },
    );
  }

  /** Performs a scry for `path` via `scryUrbitPath`. */
  async scry(path: string) {
    return await scryUrbitPath(
      {
        baseUrl: this.url,
        cookie: this.cookie,
        ssrfPolicy: this.ssrfPolicy,
        lookupFn: this.lookupFn,
        fetchImpl: this.fetchImpl,
      },
      { path, auditContext: "tlon-urbit-scry" },
    );
  }

  /**
   * Update the cookie used for authentication.
   * Call this when re-authenticating after session expiry.
   */
  updateCookie(newCookie: string): void {
    this.cookie = normalizeUrbitCookie(newCookie);
  }

  /**
   * Acknowledges every event up to `eventId` on the current channel.
   *
   * `lastAcknowledgedEventId` is advanced before the request is sent, so a
   * failed ack is not retried until the threshold is crossed again.
   * NOTE(review): presumably this ordering avoids duplicate acks for events
   * that arrive while the request is in flight - confirm before changing.
   *
   * @throws Error when the response is not OK.
   */
  private async ack(eventId: number): Promise<void> {
    this.lastAcknowledgedEventId = eventId;

    const ackData = {
      id: Date.now(),
      action: "ack",
      "event-id": eventId,
    };

    const { response, release } = await urbitFetch({
      baseUrl: this.url,
      path: `/~/channel/${this.channelId}`,
      init: {
        method: "PUT",
        headers: {
          "Content-Type": "application/json",
          Cookie: this.cookie,
        },
        body: JSON.stringify([ackData]),
      },
      ssrfPolicy: this.ssrfPolicy,
      lookupFn: this.lookupFn,
      fetchImpl: this.fetchImpl,
      timeoutMs: 10_000,
      auditContext: "tlon-urbit-ack",
    });

    try {
      if (!response.ok) {
        throw new Error(`Ack failed with status ${response.status}`);
      }
    } finally {
      await release();
    }
  }

  /**
   * Keeps trying to reconnect on a fresh channel until it succeeds, the client
   * is closed, or auto-reconnect is disabled.
   *
   * The wait between attempts doubles from `reconnectDelay` up to
   * `maxReconnectDelay`. After `maxReconnectAttempts` failures an extra 10s
   * pause is taken and the attempt counter starts over. Failures are retried
   * in a loop so a long outage does not build an ever-deeper chain of pending
   * calls.
   */
  async attemptReconnect() {
    for (;;) {
      if (this.aborted || !this.autoReconnect) {
        this.logger.log?.("[SSE] Reconnection aborted or disabled");
        return;
      }

      // If we've hit max attempts, wait longer then reset and keep trying
      if (this.reconnectAttempts >= this.maxReconnectAttempts) {
        this.logger.log?.(
          `[SSE] Max reconnection attempts (${this.maxReconnectAttempts}) reached. Waiting 10s before resetting...`,
        );
        // Wait 10 seconds before resetting and trying again
        const extendedBackoff = 10000; // 10 seconds
        await UrbitSSEClient.sleep(extendedBackoff);
        this.reconnectAttempts = 0; // Reset counter to continue trying
        this.logger.log?.("[SSE] Reconnection attempts reset, resuming reconnection...");
      }

      this.reconnectAttempts += 1;
      const delay = Math.min(
        this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1),
        this.maxReconnectDelay,
      );

      this.logger.log?.(
        `[SSE] Reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms...`,
      );

      await UrbitSSEClient.sleep(delay);

      // close() may have been called while we were waiting; do not open a new
      // channel for a client that has been shut down.
      if (this.aborted || !this.autoReconnect) {
        this.logger.log?.("[SSE] Reconnection aborted or disabled");
        return;
      }

      try {
        this.channelId = UrbitSSEClient.generateChannelId();
        this.channelUrl = new URL(`/~/channel/${this.channelId}`, this.url).toString();
        // Ack bookkeeping belongs to the channel that just ended. Carrying the
        // old high-water marks over would suppress acks on the new channel
        // until its event ids exceed them.
        // NOTE(review): assumes event ids are numbered per channel - if they
        // are not, the only effect of this reset is one early ack.
        this.lastHeardEventId = -1;
        this.lastAcknowledgedEventId = -1;

        if (this.onReconnect) {
          await this.onReconnect(this);
        }

        await this.connect();
        this.logger.log?.("[SSE] Reconnection successful!");
        return;
      } catch (error) {
        this.logger.error?.(`[SSE] Reconnection failed: ${String(error)}`);
      }
    }
  }

  /**
   * Shuts the client down: marks it aborted, aborts the stream request, sends
   * unsubscribe actions for every recorded subscription, deletes the channel,
   * and releases any stream resources still held. Errors from the unsubscribe
   * and delete requests are logged, not thrown.
   */
  async close() {
    this.aborted = true;
    this.isConnected = false;
    this.streamController?.abort();

    try {
      const unsubscribes = this.subscriptions.map((sub) => ({
        id: sub.id,
        action: "unsubscribe",
        subscription: sub.id,
      }));

      {
        const { response, release } = await urbitFetch({
          baseUrl: this.url,
          path: `/~/channel/${this.channelId}`,
          init: {
            method: "PUT",
            headers: {
              "Content-Type": "application/json",
              Cookie: this.cookie,
            },
            body: JSON.stringify(unsubscribes),
          },
          ssrfPolicy: this.ssrfPolicy,
          lookupFn: this.lookupFn,
          fetchImpl: this.fetchImpl,
          timeoutMs: 30_000,
          auditContext: "tlon-urbit-unsubscribe",
        });
        try {
          void response.body?.cancel();
        } finally {
          await release();
        }
      }

      {
        const { response, release } = await urbitFetch({
          baseUrl: this.url,
          path: `/~/channel/${this.channelId}`,
          init: {
            method: "DELETE",
            headers: {
              Cookie: this.cookie,
            },
          },
          ssrfPolicy: this.ssrfPolicy,
          lookupFn: this.lookupFn,
          fetchImpl: this.fetchImpl,
          timeoutMs: 30_000,
          auditContext: "tlon-urbit-channel-close",
        });
        try {
          void response.body?.cancel();
        } finally {
          await release();
        }
      }
    } catch (error) {
      this.logger.error?.(`Error closing channel: ${String(error)}`);
    }

    if (this.streamRelease) {
      const release = this.streamRelease;
      this.streamRelease = null;
      await release();
    }
  }
}