import { Logger } from 'besonders-logger' import { CID } from 'multiformats/cid' import ReconnectingWebSocket, { type Options as PartysocketOptions } from 'partysocket/ws' export type { PartysocketOptions } const { WARN, LOG, DEBUG, ERROR } = Logger.setup(Logger.INFO) // eslint-disable-line unused-imports/no-unused-vars /** * `nameBaseUrl` is required for both `watchNameRaw` and `IpnsWatcher`. The * legacy `https://name.web3.storage` endpoint (used by the old hardcoded * `NAME_WS_URL`/`NAME_HTTP_URL` constants) is **shut down** — these classes * will not work without a real, configured naming service that supports * WebSocket subscriptions to `/name//watch` and HTTP GET on `/name/`. * * As of writing no such public service exists, so most callers should expect * these to fail at runtime and handle the error in their `onError` handler. */ function buildWsUrl(nameBaseUrl: string, name: string) { const base = nameBaseUrl.replace(/\/+$/, '').replace(/^http/, 'ws') return `${base}/${name}/watch` } function buildHttpUrl(nameBaseUrl: string, name: string) { const base = nameBaseUrl.replace(/\/+$/, '') return `${base}/${name}` } function requireNameBaseUrl(nameBaseUrl: string | undefined, fn: string): string { if (!nameBaseUrl) { throw new Error( `[${fn}] nameBaseUrl is required. The legacy default ` + `https://name.web3.storage is shut down. Pass the base URL of a naming ` + `service that supports WebSocket subscriptions to /name//watch ` + `and HTTP GET on /name/.`, ) } return nameBaseUrl } export interface W3NameRecord { value: string // e.g. "/ipfs/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi" seq?: number validity?: string } /** * Debug info provided when a stale WebSocket connection is detected. */ export interface StaleConnectionInfo { /** When the WebSocket connection was established */ connectedAt: Date /** When we last received a WebSocket message */ lastMessageAt: Date | null /** How long since last message (ms) */ silenceDuration: number /** The stale value from WebSocket */ staleValue: string | null /** The current value from HTTP */ currentValue: string } /** * Enriched IPNS update with parsed CID and change detection. * Backwards compatible - `.value` still works as before. */ export interface IpnsUpdate { /** Raw IPNS value string (e.g. '/ipfs/bafy...') — same as W3NameRecord.value */ value: string /** Parsed CID if value is valid IPFS path, null otherwise */ cid: CID | null /** Previous value (null on first update) */ lastValue: string | null /** Whether this is a change from lastValue */ isNew: boolean /** Original W3NameRecord for access to seq/validity */ record: W3NameRecord } /** * Parse CID from IPNS value string (e.g. "/ipfs/bafybeig...") * @returns CID if valid, null otherwise */ function parseCidFromIpnsValue(value: string): CID | null { try { // Strip /ipfs/ prefix if present const cidStr = value.startsWith('/ipfs/') ? value.slice(6) : value return CID.parse(cidStr) } catch { DEBUG('[parseCidFromIpnsValue] failed to parse:', value) return null } } export interface WatchRawOptions { /** Called when the IPNS record is updated */ onUpdate: (record: W3NameRecord) => void /** Called when an error occurs */ onError?: (error: Event | Error) => void /** Called when the connection is opened */ onOpen?: () => void /** Called when the connection is closed */ onClose?: (event: CloseEvent) => void } export interface WatchRawSubscription { /** Close the WebSocket connection */ close: () => void /** The underlying WebSocket instance */ ws: WebSocket } /** * Low-level WebSocket watcher for IPNS (no reconnect logic). * Use this when you want full control over connection lifecycle. * For most cases, prefer `watchName` or `IpnsWatcher` which handle reconnection. * * @param nameBaseUrl - Base URL of a naming service that supports `/name//watch` (WebSocket) and `/name/` (HTTP) * @param name - The IPNS name/key to watch * @param options - Callback options * @returns Subscription with close() and ws * * @example * ```ts * const sub = watchNameRaw('https://name.example.com', 'k51qzi5u...', { * onUpdate: (record) => console.log('Update:', record.value), * onClose: () => console.log('Disconnected - handle reconnect yourself'), * }) * ``` */ export function watchNameRaw(nameBaseUrl: string, name: string, options: WatchRawOptions): WatchRawSubscription { const resolvedBase = requireNameBaseUrl(nameBaseUrl, 'watchNameRaw') const url = buildWsUrl(resolvedBase, name) DEBUG('[watchNameRaw] connecting to', url) const ws = new WebSocket(url) ws.onopen = () => { LOG('[watchNameRaw] connected to', name) options.onOpen?.() } ws.onmessage = (event) => { try { const record: W3NameRecord = JSON.parse(event.data) DEBUG('[watchNameRaw] received update for', name, record) options.onUpdate(record) } catch (err) { WARN('[watchNameRaw] failed to parse message:', event.data, err) options.onError?.(err instanceof Error ? err : new Error(String(err))) } } ws.onerror = (event) => { WARN('[watchNameRaw] error for', name, event) options.onError?.(event) } ws.onclose = (event) => { DEBUG('[watchNameRaw] closed for', name, 'code:', event.code) options.onClose?.(event) } return { close: () => { DEBUG('[watchNameRaw] closing connection for', name) ws.close() }, ws, } } export interface IpnsWatcherOptions { /** Called when the IPNS record is updated (enriched payload with CID and change detection) */ onUpdate: (update: IpnsUpdate) => void | Promise /** Called when an error occurs */ onError?: (error: Error | Event) => void /** Called when the connection is opened/reconnected */ onConnected?: () => void /** Called when the connection is closed */ onDisconnected?: () => void /** Fetch current IPNS state on first connect (default: false) */ fetchInitialState?: boolean /** Fetch current IPNS state on reconnect to catch missed updates (default: true) */ catchUpOnReconnect?: boolean /** If true, call onUpdate even when value hasn't changed (default: false) */ includeUnchanged?: boolean /** * Enable periodic liveness checks via HTTP to detect zombie connections (default: true). * When enabled, periodically fetches current IPNS value and forces reconnect if it * differs from the last WebSocket update. */ livenessCheck?: boolean /** * Liveness check interval in milliseconds (default: 3600000 = 1 hour). * Only used when livenessCheck is enabled. */ livenessCheckInterval?: number /** * Called when a stale connection is detected (WebSocket missed updates). * Provides debug info about the connection state. */ onStaleConnection?: (info: StaleConnectionInfo) => void /** * Partysocket options (passed through to ReconnectingWebSocket). * Useful options: startClosed, maxReconnectionDelay, minReconnectionDelay, etc. * @see https://github.com/partykit/partykit/tree/main/packages/partysocket */ wsOptions?: PartysocketOptions } /** * Robust IPNS watcher with auto-reconnect and catch-up logic. * Uses partysocket for reliable WebSocket reconnection. * * @example * ```ts * const watcher = new IpnsWatcher('k51qzi5uqu...', { * onUpdate: (update) => console.log('New CID:', update.cid?.toString()), * onError: (err) => console.error('Error:', err), * }) * * // Later, to stop watching: * watcher.close() * ``` */ const DEFAULT_LIVENESS_INTERVAL = 3600000 // 1 hour export class IpnsWatcher { private name: string private nameBaseUrl: string private ws: ReconnectingWebSocket private lastKnownValue: string | null = null private options: IpnsWatcherOptions private isFirstConnect = true private livenessTimer: ReturnType | null = null private connectedAt: Date | null = null private lastMessageAt: Date | null = null constructor(nameBaseUrl: string, name: string, options: IpnsWatcherOptions) { this.nameBaseUrl = requireNameBaseUrl(nameBaseUrl, 'IpnsWatcher') this.name = name this.options = options const url = buildWsUrl(this.nameBaseUrl, name) DEBUG('[IpnsWatcher] creating for', name) this.ws = new ReconnectingWebSocket(url, [], { maxReconnectionDelay: 900000, // 15min minReconnectionDelay: 5000, reconnectionDelayGrowFactor: 2, maxRetries: Infinity, ...options.wsOptions, }) this.ws.onopen = () => { LOG('[IpnsWatcher] connected to', name) this.connectedAt = new Date() options.onConnected?.() // Check for current state on first connect if requested if (this.isFirstConnect && (options.fetchInitialState ?? false)) { this.checkForMissedUpdates() } // Check for missed updates on reconnect else if (!this.isFirstConnect && (options.catchUpOnReconnect ?? true)) { this.checkForMissedUpdates() } this.isFirstConnect = false // Start liveness checking (default: enabled) if (options.livenessCheck !== false) { this.startLivenessCheck() } } this.ws.onmessage = (event) => { this.lastMessageAt = new Date() try { const record: W3NameRecord = JSON.parse(event.data as string) DEBUG('[IpnsWatcher] received update for', name, record) const lastValue = this.lastKnownValue const isNew = record.value !== lastValue const cid = parseCidFromIpnsValue(record.value) // Skip unchanged values unless includeUnchanged is set if (!isNew && !options.includeUnchanged) { DEBUG('[IpnsWatcher] skipping unchanged value for', name) return } // Update lastKnownValue after the skip check this.lastKnownValue = record.value const update: IpnsUpdate = { value: record.value, cid, lastValue, isNew, record, } void options.onUpdate(update) } catch (err) { WARN('[IpnsWatcher] failed to parse message:', event.data, err) options.onError?.(err instanceof Error ? err : new Error(String(err))) } } this.ws.onerror = (event) => { // Extract meaningful error info instead of logging entire ErrorEvent const errorMsg = event instanceof ErrorEvent ? event.message : 'WebSocket error' // "Unexpected EOF" is a normal disconnection - partysocket will auto-reconnect // Log at INFO level as it's expected behavior, not an error if (errorMsg === 'Unexpected EOF') { LOG('[IpnsWatcher] error for', name, ':', errorMsg, '(auto-reconnect enabled)') } else { WARN('[IpnsWatcher] error for', name, ':', errorMsg) } // Still call the error handler for unexpected errors if (errorMsg !== 'Unexpected EOF') { options.onError?.(event) } } this.ws.onclose = () => { DEBUG('[IpnsWatcher] disconnected from', name) this.stopLivenessCheck() this.connectedAt = null this.lastMessageAt = null options.onDisconnected?.() } } /** * Resolve current IPNS value via HTTP API to catch missed updates */ private async checkForMissedUpdates(): Promise { try { DEBUG('[IpnsWatcher] checking for missed updates for', this.name) const response = await fetch(buildHttpUrl(this.nameBaseUrl, this.name)) if (!response.ok) { if (response.status === 404) { DEBUG('[IpnsWatcher] IPNS not yet published:', this.name) return } throw new Error(`HTTP ${response.status}: ${response.statusText}`) } const record: W3NameRecord = await response.json() const lastValue = this.lastKnownValue const isNew = record.value !== lastValue const cid = parseCidFromIpnsValue(record.value) // Skip unchanged values unless includeUnchanged is set if (!isNew && !this.options.includeUnchanged) { DEBUG('[IpnsWatcher] no new updates for', this.name) return } const logMsg = lastValue === null ? '[IpnsWatcher] fetched initial state for' : '[IpnsWatcher] caught missed update for' LOG(logMsg, this.name, { previous: lastValue, current: record.value, }) // Update lastKnownValue after the skip check this.lastKnownValue = record.value const update: IpnsUpdate = { value: record.value, cid, lastValue, isNew, record, } void this.options.onUpdate(update) } catch (err) { WARN('[IpnsWatcher] failed to check for missed updates:', this.name, err) this.options.onError?.(err instanceof Error ? err : new Error(String(err))) } } /** * Start periodic liveness checks to detect zombie connections. */ private startLivenessCheck(): void { this.stopLivenessCheck() // Clear any existing timer const interval = this.options.livenessCheckInterval ?? DEFAULT_LIVENESS_INTERVAL DEBUG('[IpnsWatcher] starting liveness check for', this.name, 'interval:', interval) this.livenessTimer = setInterval(() => { void this.performLivenessCheck() }, interval) } /** * Stop periodic liveness checks. */ private stopLivenessCheck(): void { if (this.livenessTimer !== null) { DEBUG('[IpnsWatcher] stopping liveness check for', this.name) clearInterval(this.livenessTimer) this.livenessTimer = null } } /** * Perform a single liveness check via HTTP. * If the HTTP value differs from lastKnownValue, the connection is stale. */ private async performLivenessCheck(): Promise { try { DEBUG('[IpnsWatcher] performing liveness check for', this.name) const response = await fetch(buildHttpUrl(this.nameBaseUrl, this.name)) if (!response.ok) { if (response.status === 404) { // IPNS not published - if we also have null, that's consistent if (this.lastKnownValue === null) { DEBUG('[IpnsWatcher] liveness check OK (both null) for', this.name) return } // We have a value but HTTP says 404 - this shouldn't happen normally WARN('[IpnsWatcher] liveness check inconsistent (we have value, HTTP 404) for', this.name) return } throw new Error(`HTTP ${response.status}: ${response.statusText}`) } const record: W3NameRecord = await response.json() // Check if values match if (record.value === this.lastKnownValue) { DEBUG('[IpnsWatcher] liveness check OK for', this.name) return } // Stale connection detected! const now = new Date() const silenceDuration = this.lastMessageAt ? now.getTime() - this.lastMessageAt.getTime() : this.connectedAt ? now.getTime() - this.connectedAt.getTime() : 0 const staleInfo: StaleConnectionInfo = { connectedAt: this.connectedAt ?? now, lastMessageAt: this.lastMessageAt, silenceDuration, staleValue: this.lastKnownValue, currentValue: record.value, } WARN('[IpnsWatcher] stale connection detected for', this.name, { connectedAt: staleInfo.connectedAt.toISOString(), lastMessageAt: staleInfo.lastMessageAt?.toISOString() ?? 'never', silenceDuration: `${Math.round(silenceDuration / 1000)}s`, staleValue: staleInfo.staleValue, currentValue: staleInfo.currentValue, }) // Notify via callback this.options.onStaleConnection?.(staleInfo) // Fire immediate update with the current value const lastValue = this.lastKnownValue const cid = parseCidFromIpnsValue(record.value) this.lastKnownValue = record.value const update: IpnsUpdate = { value: record.value, cid, lastValue, isNew: true, record, } void this.options.onUpdate(update) // Force reconnect to get a fresh connection LOG('[IpnsWatcher] forcing reconnect due to stale connection for', this.name) this.ws.reconnect() } catch (err) { // Don't treat HTTP errors as stale - could be network issue WARN('[IpnsWatcher] liveness check failed for', this.name, err) } } /** * Manually start/reconnect the WebSocket. * Only needed if you used `wsOptions: { startClosed: true }`. */ start(): void { LOG('[IpnsWatcher] starting watcher for', this.name) this.ws.reconnect() } /** * Alias for close() - for backward compatibility */ stop(): void { this.close() } /** * Close the WebSocket connection and stop watching */ close(): void { LOG('[IpnsWatcher] closing watcher for', this.name) this.stopLivenessCheck() this.ws.close() } /** * Get the last known IPNS value */ get lastValue(): string | null { return this.lastKnownValue } /** * Get the WebSocket ready state */ get readyState(): number { return this.ws.readyState } } /** * Create an IPNS watcher with auto-reconnect and catch-up logic. * Convenience function that creates and returns an IpnsWatcher instance. * * @param nameBaseUrl - Base URL of a naming service that supports `/name//watch` (WebSocket) and `/name/` (HTTP) * @param name - The IPNS name/key to watch (e.g. "k51qzi5u...") * @param options - Callback options for handling events * @returns An IpnsWatcher instance with close() method */ export function watchName(nameBaseUrl: string, name: string, options: IpnsWatcherOptions): IpnsWatcher { return new IpnsWatcher(nameBaseUrl, name, options) } /** * Watch an IPNS name and return updates as an async iterator. * Includes auto-reconnect - iterator continues through disconnections. * * @param nameBaseUrl - Base URL of a naming service that supports `/name//watch` (WebSocket) and `/name/` (HTTP) * @param name - The IPNS name/key to watch * @param signal - Optional AbortSignal to stop the watch * * @example * ```ts * const controller = new AbortController() * for await (const update of watchNameIterator('https://name.example.com', 'k51qzi5u...', controller.signal)) { * console.log('Update:', update.cid?.toString()) * } * ``` */ export async function* watchNameIterator( nameBaseUrl: string, name: string, signal?: AbortSignal, ): AsyncGenerator { const queue: IpnsUpdate[] = [] let resolve: (() => void) | null = null let error: Error | null = null const watcher = new IpnsWatcher(nameBaseUrl, name, { onUpdate: (update) => { queue.push(update) resolve?.() }, onError: (err) => { error = err instanceof Error ? err : new Error('WebSocket error') resolve?.() }, }) signal?.addEventListener('abort', () => { watcher.close() }) try { while (!signal?.aborted) { if (queue.length > 0) { yield queue.shift()! } else if (error) { // Log error but continue - partysocket will reconnect WARN('[watchNameIterator] error occurred, continuing:', error) error = null } else { await new Promise((r) => { resolve = r }) resolve = null } } } finally { watcher.close() } }