import { AbortedError, noError } from "../errors.ts"; import { createEventEmitter, type On, type OnParameters } from "../events.ts"; import { type PackedPromise, packPromise } from "../promise.ts"; import { WebSocketClient, type WebSocketEventMap, type WebSocketLike, type WebSocketOptions } from "../ws.ts"; import type { Retry } from "./retry.ts"; /** * `ResilientWebSocket` wraps the provided WebSocket client and makes it * resilient to network disruptions and abrupt closures. It implements the * {@link WebSocketLike `WebSocketLike`} interface, so it can be further * wrapped to add behaviors. Every time the underlying connection is abruptly * closed, `ResilientWebSocket` tries to reopen it according to the provided * retry policy: * * When opening connection with `resilient.open()`, the first attempt to connect * is made immediately. If it fails, retry policy is applied with attempt = 1. * If retry policy doesn't allow further attempts, or if the connection is not * opened within {@link ResilientWebSocketOptions.maxTimeoutMs `maxTimeoutMs`}, * both the `resilient.open()` and the `resilient.healthy()` promises are * rejected. Otherwise, another attempt is made, and the attempt counter is * incremented. * * When a previously opened connection closes abruptly, the `reconnect` event * is dispatched. The first attempt to connect is made immediately. If it * fails, retry policy is applied with attempt = 1. If retry policy doesn't * allow further attempts, or if the connection is not opened within * {@link ResilientWebSocketOptions.maxTimeoutMs `maxTimeoutMs`}, * the `resilient.open()` promise is rejected, and the `gaveup` event is * dispatched. Otherwise, another attempt is made, and the attempt counter is * incremented. * * Both the `resilient.open()` and the `resilient.healthy()` promises resolve * with the return value of the `open()` method on the wrapped client, and * reject with an error that has the latest reason for connection to fail as * its `cause`. */ export interface ResilientWebSocket extends WebSocketLike { /** * @see {@link WebSocketClient.on} */ on: On>; /** * Returns a promise resolving when connection is opened. If connection is * currently opened ("healthy"), it resolves immediately. Otherwise, it * resolves as soon as current reconnection attempt succeeds. If the attempt * fails ("gives up"), it rejects with an error that has the latest reason * for connection to fail as its `cause`. * * It's recommended to always await for the connection to be healthy before * sending messages with `resilient.send()` or executing procedures with * `resilient.exec()`: * * ```ts * await resilient.healthy(); * await resilient.send('hello'); * ``` * * This method resolves with the return value of the `open()` method on * the wrapped client. * * If connection is cancelled by calling `ws.close()`, this method rejects * with {@link AbortedError `AbortedError`}. * * If connection gives up, this method rejects with an error that has * the latest reason for connection to fail as its `cause`. */ healthy(): Promise; } export interface ResilientWebSocketOptions { /** * Retry policy to be used when reconnecting. Can be one of built-in * policies (`exponentialBackoff()` or `indefinetely()`) or a custom one. * * @see {@link Retry} */ retry: Retry; /** * Maximum duration in milliseconds that the connection can be disrupted. * After trying to reconnect for `maxTimeoutMs`, `ResilientWebSocket` gives * up further attempts to retry. */ maxTimeoutMs: number; /** * Optional list of codes that are considered normal status codes. No * reconnection attempts are made if the connection closes with one of these * codes, and the `close` event is dispatched as usual. */ allowedCloseCodes?: number[]; } type ResilientWebSocketEventMap = ResilientWebSocketOwnEventMap & WebSocketEventMap; type ResilientWebSocketOwnEventMap = { /** * The `close` event is dispatched when a connection is closed "externally" * with one of the {@link ResilientWebSocketOptions.allowedCloseCodes allowed status codes}. * * @see {@link WebSocketEventMap.close} */ close: (code: number, reason: string) => void; /** * The `reconnect` event is dispatched when the connection is closed abruptly * and before any reconnection attempts are made. * * @param code Status code for the abrupt closure sent by the server * @param reason Reason for the abrupt closure specified by the server */ reconnect: (code: number, reason: string) => void; /** * The `gaveup` event is dispatched when no further reconnection attempts can * be made, either because the retry policy doesn't allow it, or the * {@link ResilientWebSocketOptions.maxTimeoutMs `maxTimeoutMs`} timeout has * been reached. * * @param error The latest reason for connection to fail * @param code Status code for the abrupt closure sent by the server * @param reason Reason for the abrupt closure specified by the server */ gaveup: (error: any, code: number, reason: string) => void; }; /** * Creates a new `WebSocketClient` and wraps it with the * {@link ResilientWebSocket `ResilientWebSocket`} behavior. */ export function createResilientWebSocketClient( options: WebSocketOptions & ResilientWebSocketOptions, ): ResilientWebSocket { return makeResilient(new WebSocketClient(options), options); } /** * Wraps an existing `WebSocketClient` with the * {@link ResilientWebSocket `ResilientWebSocket`} behavior. */ export function makeResilient( ws: WebSocketLike, options: ResilientWebSocketOptions, ): ResilientWebSocket { const emitter = createEventEmitter(); let openPromise: PackedPromise | null = null; ws.on("close", (code, reason) => { openPromise = null; if (options.allowedCloseCodes?.includes(code)) { emitter.dispatch("close", code, reason); return; } open().catch((err) => { if (err instanceof AbortedError) { return; } emitter.dispatch("gaveup", err, code, reason); }); emitter.dispatch("reconnect", code, reason); }); async function open(): Promise { if (openPromise) { throw new Error("Connection already open or opening"); } const promise = restore(); const packedPromise = packPromise(promise); openPromise = packedPromise; return promise.catch((err) => { if (openPromise === packedPromise) { openPromise = null; } throw err; }); } async function healthy(): Promise { if (!openPromise) { throw new Error("Connection is not open. Did you forget to await ws.open()?"); } return openPromise(); } async function close(code?: number, reason?: string): Promise { if (!openPromise) { throw new Error("Connection already closed or closing"); } openPromise = null; return ws.close(code, reason); } function on(...args: OnParameters>): () => void { if (args[0] === "close" || args[0] === "gaveup" || args[0] === "reconnect") { return emitter.on(...args); } return ws.on(...args); } function restore(reason: unknown = noError): Promise { let shouldRetry = true; let latestError: unknown = reason; let giveUpTimeout: NodeJS.Timeout | undefined; const attempt = async (): Promise => { let attempt = 0; while (shouldRetry) { try { return await ws.open(); } catch (err) { if (err instanceof AbortedError) { throw err; } latestError = err; // Order of operands for the AND operator is important here. The value // of `shouldRetry` can become false while awaiting for `retry()`. // We don't want to accidentally reset it back to true based on its // stale value captured before await. shouldRetry = (await options.retry({ attempt: ++attempt, latestError, })) && shouldRetry; } } throw new Error(`Could not open connection after ${attempt} attempt(s)`, { cause: latestError, }); }; const giveUp = (): Promise => new Promise((_, reject) => { giveUpTimeout = setTimeout(() => { reject( new Error( `Could not open connection within ${options.maxTimeoutMs}ms`, latestError !== noError ? { cause: latestError } : undefined, ), ); shouldRetry = false; ws.close(); }, options.maxTimeoutMs); }); return Promise.race( [attempt(), Number.isFinite(options.maxTimeoutMs) ? [giveUp()] : []].flat(), ).finally(() => { clearTimeout(giveUpTimeout); }); } return { open, close, send: ws.send.bind(ws), exec: ws.exec.bind(ws), on, healthy, }; }