import type { AnyrowSDKConfig } from "./sdk.types.gen" import { serviceMap } from "./sdk.map.gen" type _SSEEvent = { data: string; event?: string; id?: string; retry?: number } type _TypedWebSocket = { close(code?: number, reason?: string): void off(event: "close" | "error" | "message" | "open", handler: (...args: never[]) => void): void on(event: "close", handler: (code: number, reason: string) => void): void on(event: "error", handler: (error: unknown) => void): void on(event: "message", handler: (data: string) => void): void on(event: "open", handler: () => void): void readonly readyState: number send(data: ArrayBuffer | ArrayBufferView | object | string): void } type _ServiceEntry = { idempotent?: boolean invalidate?: readonly string[] method: string params?: readonly string[] path: string sse?: boolean ws?: boolean } type _ServiceMapNode = _ServiceEntry | { [key: string]: _ServiceMapNode } type _ServiceMap = { [key: string]: _ServiceMapNode } type _RequestOptions = { body?: ReadableStream | Blob | ArrayBuffer | Uint8Array cookies?: Record form?: Record headers?: Record idempotencyKey?: string json?: unknown lastEventId?: string params?: Record protocols?: string | string[] reconnectToken?: string search?: Record signal?: AbortSignal timeout?: number } type _RequestMeta = { invalidatedBy: string[] isStale: boolean selector: string seqSnapshot: number } class _ClientError extends Error { readonly body: unknown readonly data: unknown readonly response: Response readonly status: number constructor(init: { body: unknown; data: unknown; message: string; response: Response; status: number }) { super(init.message) ;(Error as unknown as { captureStackTrace?: (t: object, c: Function) => void }).captureStackTrace?.(this, _ClientError) this.name = "ClientError" this.body = init.body this.data = init.data this.response = init.response this.status = init.status } } class _BadRequestError extends _ClientError { constructor(init: ConstructorParameters[0]) { super(init); this.name = "BadRequestError" } } class _UnauthorizedError extends _ClientError { constructor(init: ConstructorParameters[0]) { super(init); this.name = "UnauthorizedError" } } class _ForbiddenError extends _ClientError { constructor(init: ConstructorParameters[0]) { super(init); this.name = "ForbiddenError" } } class _NotFoundError extends _ClientError { constructor(init: ConstructorParameters[0]) { super(init); this.name = "NotFoundError" } } class _ConflictError extends _ClientError { constructor(init: ConstructorParameters[0]) { super(init); this.name = "ConflictError" } } class _UnprocessableEntityError extends _ClientError { constructor(init: ConstructorParameters[0]) { super(init); this.name = "UnprocessableEntityError" } } class _RateLimitError extends _ClientError { constructor(init: ConstructorParameters[0]) { super(init); this.name = "RateLimitError" } } class _InternalServerError extends _ClientError { constructor(init: ConstructorParameters[0]) { super(init); this.name = "InternalServerError" } } class _BadGatewayError extends _ClientError { constructor(init: ConstructorParameters[0]) { super(init); this.name = "BadGatewayError" } } class _ServiceUnavailableError extends _ClientError { constructor(init: ConstructorParameters[0]) { super(init); this.name = "ServiceUnavailableError" } } class _GatewayTimeoutError extends _ClientError { constructor(init: ConstructorParameters[0]) { super(init); this.name = "GatewayTimeoutError" } } const _STATUS_ERROR_MAP: Record[0]) => _ClientError> = { 400: _BadRequestError, 401: _UnauthorizedError, 403: _ForbiddenError, 404: _NotFoundError, 409: _ConflictError, 422: _UnprocessableEntityError, 429: _RateLimitError, 500: _InternalServerError, 502: _BadGatewayError, 503: _ServiceUnavailableError, 504: _GatewayTimeoutError, } export class AnyrowSDK { state: Record #config: AnyrowSDKConfig #fetchFn: typeof fetch #resourceCache = new Map>() #searchSerializer: (query: Record) => URLSearchParams #staleTime: number #staleUntil: Map | null #patternRegexCache = new Map() #invalidationSeq = 0 #staleMaxEntries: number #maxSourcesPerTarget: number #maxErrorMessageChars: number #sseMaxBufferChars: number #disposeCtrl = new AbortController() #disposed = false constructor(config: AnyrowSDKConfig) { const ownState = config.state ?? {} this.state = ownState this.#config = { ...config, state: ownState } this.#fetchFn = config.fetch ?? globalThis.fetch this.#searchSerializer = config.buildSearchParams ?? ((q: Record) => this.#serializeSearch(q)) this.#staleTime = config.invalidation?.staleTime ?? 0 this.#staleUntil = this.#staleTime > 0 ? new Map() : null this.#staleMaxEntries = config.invalidation?.staleMaxEntries ?? 1000 this.#maxSourcesPerTarget = Math.max(config.invalidation?.maxSourcesPerTarget ?? 16, 1) this.#maxErrorMessageChars = config.maxErrorMessageChars ?? 512 this.#sseMaxBufferChars = config.sseMaxBufferChars ?? 1024 * 1024 const self = this function makeNodeProxy(node: _ServiceMapNode, path: string[]): object { const actionCache = new Map) => unknown>() const childCache = new Map() return new Proxy({} as Record, { get: (_, key: string | symbol) => { if (typeof key === "symbol") return undefined const child = (node as Record)[key] if (child === undefined) return undefined /* leaf: entry has a "method" string field */ if (typeof (child as Record)["method"] === "string") { const cached = actionCache.get(key) if (cached) return cached const entry = child as _ServiceEntry const entryPath = self.#toColonParams(entry.path) let fn: (input?: Record) => unknown if (entry.ws) { fn = (input?: Record) => self.#connectWS(entry, entryPath, (input ?? {}) as _RequestOptions) } else if (entry.sse) { fn = (input?: Record) => self.#requestSSE(entry, entryPath, (input ?? {}) as _RequestOptions) } else { fn = (input?: Record) => self.#request(entry, input ?? {}) } Object.defineProperty(fn, "name", { value: [...path, key].join(".") }) actionCache.set(key, fn) return fn } /* namespace: recurse */ const cachedChild = childCache.get(key) if (cachedChild) return cachedChild const childProxy = makeNodeProxy(child as _ServiceMapNode, [...path, key]) childCache.set(key, childProxy) return childProxy }, }) } return new Proxy(this, { get: (target, key: string | symbol) => { if (typeof key === "symbol") return Reflect.get(target, key) const cached = target.#resourceCache.get(key) if (cached) return cached const node = (serviceMap as _ServiceMap)[key] if (node === undefined) return Reflect.get(target, key) /* root-level leaf (single-segment operationId) */ if (typeof (node as Record)["method"] === "string") { const entry = node as _ServiceEntry const entryPath = target.#toColonParams(entry.path) let fn: (input?: Record) => unknown if (entry.ws) { fn = (input?: Record) => target.#connectWS(entry, entryPath, (input ?? {}) as _RequestOptions) } else if (entry.sse) { fn = (input?: Record) => target.#requestSSE(entry, entryPath, (input ?? {}) as _RequestOptions) } else { fn = (input?: Record) => target.#request(entry, input ?? {}) } Object.defineProperty(fn, "name", { value: key }) target.#resourceCache.set(key, fn as unknown as Record) return fn } /* namespace node — check for single _call promotion */ const nodeRecord = node as Record const nodeKeys = Object.keys(nodeRecord) if (nodeKeys.length === 1 && nodeKeys[0] === "_call") { const entry = nodeRecord["_call"] as _ServiceEntry const entryPath = target.#toColonParams(entry.path) let fn: (input?: Record) => unknown if (entry.ws) { fn = (input?: Record) => target.#connectWS(entry, entryPath, (input ?? {}) as _RequestOptions) } else if (entry.sse) { fn = (input?: Record) => target.#requestSSE(entry, entryPath, (input ?? {}) as _RequestOptions) } else { fn = (input?: Record) => target.#request(entry, input ?? {}) } Object.defineProperty(fn, "name", { value: key }) target.#resourceCache.set(key, fn as unknown as Record) return fn } const proxy = makeNodeProxy(node as _ServiceMapNode, [key]) target.#resourceCache.set(key, proxy as Record) return proxy }, }) as AnyrowSDK } #interpolatePath(path: string, params: Record): string { return path.replace(/:(\w+)/g, (_, key: string) => { const val = params[key] if (val === undefined) throw new Error(`Missing path param: ${key}`) return encodeURIComponent(val) }) } #toColonParams(path: string): string { return path.replace(/\{(\w+)\}/g, ":$1") } #resolveInvalidationTargets( targets: readonly string[], params: Record | undefined, ): string[] { const resolved: string[] = [] for (const target of targets) { if (!params || !target.includes(":")) { resolved.push(target); continue } const spaceIdx = target.indexOf(" ") const targetMethod = target.slice(0, spaceIdx) const targetPath = target.slice(spaceIdx + 1) let hasUnresolved = false const replaced = targetPath.replace(/:(\w+)/g, (match, key: string) => { const val = params[key] if (val === undefined) { hasUnresolved = true; return match } return encodeURIComponent(val) }) if (hasUnresolved) continue resolved.push(`${targetMethod} ${replaced}`) } return resolved } #pathMatchesPattern(concretePath: string, pattern: string): boolean { let re = this.#patternRegexCache.get(pattern) if (!re) { const escaped = pattern .replace(/:[^\/]+/g, "\x00") .replace(/[.*+?^${}()|[\]\\]/g, "\\$&") .replace(/\x00/g, "[^/]+") re = new RegExp(`^${escaped}$`) this.#patternRegexCache.set(pattern, re) } return re.test(concretePath) } #lookupStale( concreteSelector: string, concretePath: string, method: string, now: number, ): { by: string[]; isStale: boolean } { const allBy: string[] = [] const exact = this.#staleUntil?.get(concreteSelector) if (exact && exact.until > now) allBy.push(...exact.by) else if (exact) this.#staleUntil?.delete(concreteSelector) const expired: string[] = [] if (this.#staleUntil) { for (const [key, entry] of this.#staleUntil) { if (entry.until <= now) { expired.push(key); continue } if (!key.includes(":")) continue const spaceIdx = key.indexOf(" ") const keyMethod = key.slice(0, spaceIdx) const keyPattern = key.slice(spaceIdx + 1) if (keyMethod !== method) continue if (this.#pathMatchesPattern(concretePath, keyPattern)) { allBy.push(...entry.by) } } for (const key of expired) this.#staleUntil.delete(key) } return { by: [...new Set(allBy)], isStale: allBy.length > 0 } } #createTypedWebSocket(url: string, protocols?: string | string[]): _TypedWebSocket { const ws = protocols ? new WebSocket(url, protocols) : new WebSocket(url) const listenerMap = new WeakMap<(...args: never[]) => void, EventListener>() const sendBuffer: Array = [] let buffering = true ws.addEventListener("open", () => { buffering = false for (const msg of sendBuffer) ws.send(msg as Parameters[0]) sendBuffer.length = 0 }) function close(code?: number, reason?: string) { buffering = false sendBuffer.length = 0 ws.close(code, reason) } function on(event: string, handler: (...args: never[]) => void): void { let wrapped: EventListener switch (event) { case "message": wrapped = (e: Event) => (handler as (data: string) => void)((e as MessageEvent).data) break case "open": wrapped = () => (handler as () => void)() break case "close": wrapped = (e: Event) => (handler as (code: number, reason: string) => void)((e as CloseEvent).code, (e as CloseEvent).reason) break case "error": wrapped = (e: Event) => (handler as (error: unknown) => void)(e) break default: return } listenerMap.set(handler, wrapped) ws.addEventListener(event, wrapped) } function off(event: string, handler: (...args: never[]) => void): void { const wrapped = listenerMap.get(handler) if (wrapped) { ws.removeEventListener(event, wrapped) listenerMap.delete(handler) } } function send(data: ArrayBuffer | ArrayBufferView | object | string) { let payload: ArrayBuffer | ArrayBufferView | string if (typeof data === "string") { payload = data } else if (data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { payload = data } else { payload = JSON.stringify(data) } if (buffering) { sendBuffer.push(payload) } else { ws.send(payload as Parameters[0]) } } const typed: _TypedWebSocket = { close, off, on, get readyState() { return ws.readyState }, send } Object.defineProperty(typed, "_ws", { enumerable: false, value: ws }) return typed } #serializeSearch(query: Record): URLSearchParams { const params = new URLSearchParams() const coerce = (v: unknown): string | null => { if (v === undefined || v === null) return null if (v instanceof Date) return v.toISOString() if (typeof v === "symbol") return null return String(v) } for (const [k, v] of Object.entries(query)) { if (Array.isArray(v)) { for (const item of v) { const s = coerce(item); if (s !== null) params.append(k, s) } } else { const s = coerce(v); if (s !== null) params.set(k, s) } } return params } #buildURL(path: string, opts: _RequestOptions): string { let resolvedPath = path if (opts.params) resolvedPath = this.#interpolatePath(path, opts.params) const baseUrl = new URL(this.#config.baseURL) const basePath = baseUrl.pathname.endsWith("/") ? baseUrl.pathname : `${baseUrl.pathname}/` const relative = resolvedPath.startsWith("/") ? resolvedPath.slice(1) : resolvedPath const url = new URL(`${basePath}${relative}`, baseUrl) for (const [k, v] of baseUrl.searchParams.entries()) url.searchParams.append(k, v) if (opts.search) { const sp = this.#searchSerializer(opts.search) if (this.#config.sortSearchParams) sp.sort() for (const [k, v] of sp.entries()) url.searchParams.append(k, v) } return url.toString() } async #buildHeaders( opts: _RequestOptions, ctx: { method: string; path: string }, ): Promise { const headers = new Headers() if (this.#config.headers) { const resolved = typeof this.#config.headers === "function" ? await this.#config.headers(ctx) : this.#config.headers for (const [k, v] of Object.entries(resolved)) { if (v !== undefined) headers.set(k, v) } } if (opts.headers) { for (const [k, v] of Object.entries(opts.headers)) { headers.set(k, v) } } if (opts.cookies) { const existing = headers.get("cookie") const pairs = Object.entries(opts.cookies) .map(([k, v]) => `${encodeURIComponent(k)}=${encodeURIComponent(v)}`) .join("; ") if (pairs) { headers.set("cookie", existing ? `${existing}; ${pairs}` : pairs) } } return headers } async #doRequest( method: string, path: string, opts: _RequestOptions, isRetry: boolean, requestMeta?: _RequestMeta, ): Promise<{ response: Response }> { const url = this.#buildURL(path, opts) const headers = await this.#buildHeaders(opts, { method, path }) let body: BodyInit | undefined if (opts.body !== undefined) { headers.set("content-type", "application/octet-stream") body = opts.body as BodyInit } else if (opts.json !== undefined) { headers.set("content-type", "application/json") try { body = JSON.stringify(opts.json) } catch (e) { throw new _ClientError({ body: opts.json, data: null, message: `JSON serialization failed: ${e instanceof Error ? e.message : String(e)}`, response: new Response(null, { status: 0 }), status: 0, }) } } else if (opts.form !== undefined) { const hasFiles = Object.values(opts.form).some( (v) => (typeof File !== "undefined" && v instanceof File) || (typeof Blob !== "undefined" && v instanceof Blob) || (typeof FileList !== "undefined" && v instanceof FileList) || (Array.isArray(v) && v.some( (item) => (typeof File !== "undefined" && item instanceof File) || (typeof Blob !== "undefined" && item instanceof Blob), )), ) if (hasFiles) { const fd = new FormData() for (const [k, v] of Object.entries(opts.form)) { if (v === undefined || v === null) continue if (typeof FileList !== "undefined" && v instanceof FileList) { for (let i = 0; i < v.length; i++) { const f = v[i]; if (f) fd.append(k, f) } } else if (Array.isArray(v)) { for (const item of v) { if (item instanceof File || item instanceof Blob) fd.append(k, item) else fd.append(k, String(item)) } } else if (v instanceof File || v instanceof Blob) { fd.append(k, v) } else { fd.append(k, String(v)) } } body = fd } else { headers.set("content-type", "application/x-www-form-urlencoded") const sp = new URLSearchParams() for (const [k, v] of Object.entries(opts.form)) { if (v === undefined || v === null) continue if (v instanceof Date) { sp.set(k, v.toISOString()); continue } if (typeof v === "symbol") continue sp.set(k, String(v)) } body = sp.toString() } } if (this.#config.onRequest) { const reqCtx: { body?: BodyInit; headers: Headers; invalidatedBy?: string[]; isStale?: boolean; method: string; path: string; selector?: string; state: Record; url: string } = { body, headers, method, path, state: this.#config.state ?? {}, url } if (requestMeta) { reqCtx.invalidatedBy = requestMeta.invalidatedBy reqCtx.isStale = requestMeta.isStale reqCtx.selector = requestMeta.selector } for (const hook of this.#config.onRequest) { await hook(reqCtx) } if (reqCtx.body !== body) body = reqCtx.body } const { signal, cleanup } = this.#buildSignal(opts) const init: RequestInit = { body, headers, method, signal } if (this.#config.credentials) init.credentials = this.#config.credentials if (this.#config.mode) init.mode = this.#config.mode const _logOp = `${method.toUpperCase()} ${path}` const _logStart = Date.now() this.#config.onLog?.({ duration_ms: 0, event: "request_start", level: "debug", operation: _logOp }) try { signal?.throwIfAborted() let response = await this.#fetchFn(url, init) if (response.status === 401 && this.#config.onAuthExpired && !isRetry && !(body instanceof FormData)) { const newToken = await this.#config.onAuthExpired() if (newToken != null) { const retryHeaders = new Headers(headers) const authName = this.#config.authHeaderName ?? "Authorization" const authPrefix = this.#config.authHeaderPrefix ?? "Bearer " retryHeaders.set(authName, `${authPrefix}${newToken}`) response = await this.#fetchFn(url, { ...init, headers: retryHeaders }) } } this.#config.onLog?.({ duration_ms: Date.now() - _logStart, event: "response_received", level: response.status >= 400 ? "warn" : "info", operation: _logOp, status: response.status, }) if (this.#config.onResponse) { const resCtx: { invalidatedBy?: string[]; isRetry: boolean; isStale?: boolean; method: string; path: string; request: Request; response: Response; retry: () => Promise; selector?: string; state: Record; url: string } = { isRetry, method, path, request: new Request(url, init), response, retry: () => { if (isRetry) throw new Error("Max 1 retry per request") return this.#doRequest(method, path, opts, true, requestMeta) .then(async (r) => { if (!r.response.ok) throw await this.#parseAsClientError(r.response) return r.response }) }, state: this.#config.state ?? {}, url, } if (requestMeta) { resCtx.invalidatedBy = requestMeta.invalidatedBy resCtx.isStale = requestMeta.isStale resCtx.selector = requestMeta.selector } for (const hook of this.#config.onResponse) { const result = await hook(resCtx) if (result instanceof Response) { response = result resCtx.response = result } } } return { response } } catch (err) { const _errStatus = typeof (err as { status?: unknown })?.status === "number" ? (err as { status: number }).status : undefined this.#config.onLog?.({ duration_ms: Date.now() - _logStart, error: err, event: "error", level: "error", operation: _logOp, ...(_errStatus !== undefined ? { status: _errStatus } : {}), }) throw err } finally { cleanup() } } #parseBody(response: Response): Promise { if (response.status === 204) return Promise.resolve(null) const rawCt = response.headers.get("content-type") ?? "" const ct = rawCt.split(";")[0]?.trim().toLowerCase() ?? "" if (ct === "application/json" || ct.endsWith("+json")) { return response.json() } if (ct === "application/octet-stream" || ct === "application/pdf") { return response.arrayBuffer() } if (ct.startsWith("text/")) return response.text() /* unknown content type — binary-safe fallback */ return response.arrayBuffer() } async #parseErrorBody(response: Response): Promise { try { return await response.json() } catch { return undefined } } async #parseAsClientError(response: Response): Promise<_ClientError> { const preserved = response.clone() const body = await this.#parseErrorBody(response) const msgVal = typeof body === "object" && body !== null && "message" in body ? (body as Record)["message"] : undefined const rawMsg = typeof msgVal === "string" ? msgVal : null const safeMsg = rawMsg !== null ? rawMsg.replace(/[\x00-\x1f]/g, "").slice(0, this.#maxErrorMessageChars) : `HTTP ${response.status}` const Cls = _STATUS_ERROR_MAP[response.status] ?? _ClientError return new Cls({ body, data: body, message: safeMsg, response: preserved, status: response.status }) } #buildSignal(opts: _RequestOptions, isStream?: boolean): { cleanup: () => void; signal: AbortSignal } { const userSignal = opts.signal const timeout = isStream ? undefined : (opts.timeout ?? this.#config.timeout) if (!timeout && !userSignal) return { cleanup: () => {}, signal: this.#disposeCtrl.signal } const ctrl = new AbortController() let timer: ReturnType | undefined if (timeout) { timer = setTimeout(() => ctrl.abort(new Error("timeout")), timeout) if (typeof (timer as unknown as { unref?: () => void }).unref === "function") { (timer as unknown as { unref: () => void }).unref() } } const abort = (reason?: unknown) => { if (!ctrl.signal.aborted) ctrl.abort(reason) } const onDispose = () => abort(this.#disposeCtrl.signal.reason) const onUserAbort = () => abort(userSignal?.reason) if (this.#disposeCtrl.signal.aborted) { abort(this.#disposeCtrl.signal.reason) } else { this.#disposeCtrl.signal.addEventListener("abort", onDispose, { once: true }) } if (userSignal?.aborted) { abort(userSignal.reason) } else if (userSignal) { userSignal.addEventListener("abort", onUserAbort, { once: true }) } return { cleanup: () => { if (timer !== undefined) clearTimeout(timer) this.#disposeCtrl.signal.removeEventListener("abort", onDispose) userSignal?.removeEventListener("abort", onUserAbort) }, signal: ctrl.signal, } } async #requestThrow( method: string, path: string, opts: _RequestOptions, requestMeta?: _RequestMeta, ): Promise { const { response } = await this.#doRequest(method, path, opts, false, requestMeta) if (!response.ok) { throw await this.#parseAsClientError(response) } return this.#parseBody(response) } async #requestSafe( method: string, path: string, opts: _RequestOptions, requestMeta?: _RequestMeta, ): Promise<{ data: unknown error: unknown response: Response status: number }> { let doResponse: { response: Response } try { doResponse = await this.#doRequest(method, path, opts, false, requestMeta) } catch (e) { if (e instanceof _ClientError) return { data: null, error: e, response: e.response, status: e.status } throw e } const { response } = doResponse if (!response.ok) { const preserved = response.clone() const error = await this.#parseErrorBody(response) return { data: null, error, response: preserved, status: response.status, } } let data: unknown try { data = await this.#parseBody(response) } catch (e) { return { data: null, error: e, response, status: response.status } } return { data, error: null, response, status: response.status } } async #request(entry: _ServiceEntry, input: Record): Promise { const path = this.#toColonParams(entry.path) const method = entry.method const opts = input as _RequestOptions if (entry.idempotent) { const existing = opts.headers?.["Idempotency-Key"] ?? opts.headers?.["idempotency-key"] if (existing === undefined) { const key = opts.idempotencyKey ?? crypto.randomUUID() opts.headers = { ...(opts.headers ?? {}), "Idempotency-Key": key } } } const params = opts.params if (entry.params) { for (const p of entry.params) { if (!params || params[p] === undefined) { const err = new _ClientError({ body: null, data: null, message: `Missing required path param \`${p}\` for ${method} ${entry.path}`, response: new Response(null, { status: 0 }), status: 0, }) if (!this.#config.throwOnError) return { data: null, error: err, response: err.response, status: 0 } throw err } } } const cp = params ? this.#interpolatePath(path, params) : path const cs = `${method} ${cp}` const requestMeta = this.#buildRequestMeta(cs, cp, method) if (!this.#config.throwOnError) { const r = await this.#requestSafe(method, path, opts, requestMeta) if (r.status >= 200 && r.status < 300) { this.#markStale(entry.invalidate??[], params, cs) if (requestMeta?.isStale) this.#clearStale(cs, cp, method, requestMeta.seqSnapshot) } return r } const data = await this.#requestThrow(method, path, opts, requestMeta) this.#markStale(entry.invalidate??[], params, cs) if (requestMeta?.isStale) this.#clearStale(cs, cp, method, requestMeta?.seqSnapshot ?? 0) return data } #requestSSE(entry: _ServiceEntry, path: string, opts: _RequestOptions): AsyncIterable<_SSEEvent> { return { [Symbol.asyncIterator]: () => this.#doSSE(entry, path, opts) } } #connectWS(entry: _ServiceEntry, path: string, opts: _RequestOptions): _TypedWebSocket { let url = this.#buildURL(path, opts).replace(/^https:\/\//, "wss://").replace(/^http:\/\//, "ws://") if (opts.reconnectToken) { const sep = url.includes("?") ? "&" : "?" url = `${url}${sep}reconnect_token=${encodeURIComponent(opts.reconnectToken)}` } const ws = this.#createTypedWebSocket(url, opts.protocols) if (entry.invalidate && entry.invalidate.length > 0) { const invalidate = entry.invalidate const params = opts.params const cs = `WS ${params ? this.#interpolatePath(path, params) : path}` ws.on("open", () => this.#markStale(invalidate, params, cs)) } return ws } #buildRequestMeta(concreteSelector: string, concretePath: string, method: string): _RequestMeta | undefined { if (!this.#staleUntil) return undefined const now = Date.now() const { by, isStale } = this.#lookupStale(concreteSelector, concretePath, method, now) return { invalidatedBy: by, isStale, selector: concreteSelector, seqSnapshot: this.#invalidationSeq } } #markStale(invalidate: readonly string[], params: Record | undefined, mutationSelector: string): void { if (!this.#staleUntil || invalidate.length === 0) return const seq = ++this.#invalidationSeq const until = Date.now() + this.#staleTime const resolved = this.#resolveInvalidationTargets(invalidate, params) for (const target of resolved) { const existing = this.#staleUntil.get(target) if (existing) { if (!existing.by.includes(mutationSelector) && existing.by.length < this.#maxSourcesPerTarget) existing.by.push(mutationSelector) existing.until = until existing.seq = seq } else { this.#staleUntil.set(target, { by: [mutationSelector], seq, until }) } } if (this.#staleUntil.size > this.#staleMaxEntries) for (const [k, entry] of this.#staleUntil) if (entry.until <= Date.now()) this.#staleUntil.delete(k) } #clearStale(concreteSelector: string, concretePath: string, method: string, seqSnapshot: number): void { if (!this.#staleUntil) return const exact = this.#staleUntil.get(concreteSelector) if (exact && exact.seq <= seqSnapshot) this.#staleUntil.delete(concreteSelector) const toDelete: string[] = [] for (const [key, entry] of this.#staleUntil) { if (entry.seq > seqSnapshot) continue if (!key.includes(":")) continue const spaceIdx = key.indexOf(" ") const keyMethod = key.slice(0, spaceIdx) const keyPattern = key.slice(spaceIdx + 1) if (keyMethod === method && this.#pathMatchesPattern(concretePath, keyPattern)) { toDelete.push(key) } } for (const key of toDelete) this.#staleUntil.delete(key) } dispose(): void { if (this.#disposed) return this.#disposed = true this.#disposeCtrl.abort() this.#staleUntil?.clear() this.#resourceCache.clear() this.#patternRegexCache.clear() } async *#doSSE(entry: _ServiceEntry, path: string, opts: _RequestOptions): AsyncGenerator<_SSEEvent> { const method = entry.method const url = this.#buildURL(path, opts) const headers = await this.#buildHeaders(opts, { method, path }) headers.set("accept", "text/event-stream") if (opts.lastEventId) { headers.set("last-event-id", opts.lastEventId) } const { signal, cleanup } = this.#buildSignal(opts, true) let reader: ReadableStreamDefaultReader | undefined try { if (this.#config.onRequest) { const reqCtx: { headers: Headers; invalidatedBy?: string[]; isStale?: boolean; method: string; path: string; selector?: string; state: Record; url: string } = { headers, method, path, state: this.#config.state ?? {}, url } for (const hook of this.#config.onRequest) { await hook(reqCtx) } } const sseInit: RequestInit = { headers, method, signal } if (this.#config.credentials) sseInit.credentials = this.#config.credentials if (this.#config.mode) sseInit.mode = this.#config.mode signal?.throwIfAborted() let response = await this.#fetchFn(url, sseInit) if (this.#config.onResponse) { const resCtx: { invalidatedBy?: string[]; isRetry: boolean; isStale?: boolean; method: string; path: string; request: Request; response: Response; retry: () => Promise; selector?: string; state: Record; url: string } = { isRetry: false, method, path, request: new Request(url, sseInit), response, retry: () => { throw new Error("SSE streams do not support retry") }, state: this.#config.state ?? {}, url, } for (const hook of this.#config.onResponse) { const result = await hook(resCtx) if (result instanceof Response) { response = result; resCtx.response = result } } } if (!response.ok) { throw await this.#parseAsClientError(response) } if (entry.invalidate && entry.invalidate.length > 0) { const cs = `${method} ${opts.params ? this.#interpolatePath(path, opts.params) : path}` this.#markStale(entry.invalidate, opts.params, cs) } if (!response.body) return const decoder = new TextDecoder() reader = response.body.getReader() let buffer = "" const maxBuffer = this.#sseMaxBufferChars while (true) { const { done, value } = await reader.read() if (done) break buffer += decoder.decode(value, { stream: true }) if (buffer.length > maxBuffer) throw new Error(`SSE buffer exceeded ${maxBuffer} characters`) const blocks = buffer.split(/\r\n\r\n|\r\n\r|\r\n\n|\r\r\n|\n\r\n|\n\r|\r\r|\n\n/) buffer = blocks.pop() ?? "" for (const block of blocks) { if (block.trim() === "") continue const event = this.#parseSSEBlock(block) if (event) yield event } } if (buffer.trim() !== "") { const event = this.#parseSSEBlock(buffer) if (event) yield event } } finally { if (reader) { try { await reader.cancel() } catch {} reader.releaseLock() } cleanup() } } #parseSSEBlock(block: string): _SSEEvent | undefined { const lines = block.split(/\r\n|\r|\n/) let isComment = true let data: string | undefined let event: string | undefined let id: string | undefined let retry: number | undefined for (const line of lines) { if (line.startsWith(":")) continue isComment = false const colonIdx = line.indexOf(":") let field: string let val: string if (colonIdx === -1) { field = line val = "" } else { field = line.slice(0, colonIdx) val = line.slice(colonIdx + 1) if (val.startsWith(" ")) val = val.slice(1) } switch (field) { case "data": data = data === undefined ? val : `${data}\n${val}` break case "event": event = val break case "id": if (!val.includes("\0")) id = val break case "retry": { const nr = Number(val) if (Number.isFinite(nr) && nr >= 0) retry = nr break } } } if (isComment || data === undefined) return undefined const evt: _SSEEvent = { data } if (event !== undefined) evt.event = event if (id !== undefined) evt.id = id if (retry !== undefined) evt.retry = retry return evt } } export { _ClientError as ClientError, _BadRequestError as BadRequestError, _UnauthorizedError as UnauthorizedError, _ForbiddenError as ForbiddenError, _NotFoundError as NotFoundError, _ConflictError as ConflictError, _UnprocessableEntityError as UnprocessableEntityError, _RateLimitError as RateLimitError, _InternalServerError as InternalServerError, _BadGatewayError as BadGatewayError, _ServiceUnavailableError as ServiceUnavailableError, _GatewayTimeoutError as GatewayTimeoutError, } export function isClientError(e: unknown): e is _ClientError { return e instanceof _ClientError }