import { AbortedError, WebSocketClosedError } from "../errors.ts"; import { createEventEmitter, type OnParameters } from "../events.ts"; import type { Procedure } from "../procedure.ts"; import { WebSocketClient, type WebSocketEventMap, type WebSocketLike, type WebSocketOptions, } from "../ws.ts"; export interface HealthCheckedWebSocketOptions { /** * Interval between health checks, in milliseconds. The countdown starts when * the previous check succeeds, so the actual time between two checks * may be longer than this interval. */ checkEveryMs: number; /** * Timeout in milliseconds before a running health check is considered failed. */ timeoutMs: number; /** * When connection is closed because the health check has failed, these arguments * are used as status code and reason for closure. */ closeArgs: [code: number, reason: string]; /** * Optional {@link Timers timers} implementation. Useful for using alternative timer * implementations (e.g. running timers from a Web Worker to reduce throttling). */ timers?: Timers; } export interface Timers { setTimeout(handler: () => void, timeout?: number): number; clearTimeout(id: number | undefined): void; } type HealthCheckedWebSocketEventMap = HealthCheckedWebSocketOwnEventMap & WebSocketEventMap; type HealthCheckedWebSocketOwnEventMap = { close: (code: number, reason: string) => void; }; /** * Creates a `WebSocketClient` and wraps it with * {@link makeHealthchecked `makeHealthchecked`}. */ export const createHealthCheckedWebSocketClient = ( options: WebSocketOptions & HealthCheckedWebSocketOptions, ): ((healthcheck: Procedure) => WebSocketLike) => makeHealthChecked(new WebSocketClient(options), options); /** * Wraps the provided `WebSocketClient` and adds a health check. Health check * is a {@link WebSocketClient.exec procedure} that runs periodically. If this * procedure completes successfully, the health check is considered passed. If * it fails, health check is considered failed, the `close` event is dispatched * and the connection is closed. */ export const makeHealthChecked = ( ws: WebSocketLike, options: HealthCheckedWebSocketOptions, ) => (healthcheck: Procedure): WebSocketLike => { const emitter = createEventEmitter>(); const { setTimeout, clearTimeout } = options.timers ?? browserTimers; let scheduledCheck: number | null = null; ws.on("close", (code, reason) => { stop(); emitter.dispatch("close", code, reason); }); async function open(): Promise { const res = await ws.open(); schedule(); return res; } function close(code?: number, reason?: string): Promise { stop(); return ws.close(code, reason); } function on( ...args: OnParameters> ): () => void { if (args[0] === "close") { return emitter.on(...args); } return ws.on(...args); } function schedule() { scheduledCheck = setTimeout(check, options.checkEveryMs); } async function check() { let timeout: number | undefined; try { if (Number.isFinite(options.timeoutMs)) { timeout = setTimeout(fail, options.timeoutMs); } await ws.exec(healthcheck); schedule(); } catch (err) { if (!(err instanceof AbortedError) && !(err instanceof WebSocketClosedError)) { fail(); } } clearTimeout(timeout); } async function fail() { const closeArgs = [3008, "health check failed"] as const; await ws.close(...closeArgs); emitter.dispatch("close", ...closeArgs); } function stop() { if (scheduledCheck) { clearTimeout(scheduledCheck); scheduledCheck = null; } } return { open, close, send: ws.send.bind(ws), exec: ws.exec.bind(ws), on, }; }; const browserTimers: Timers = { setTimeout, clearTimeout, };