import { AbortedError, WebSocketClosedError } from "./errors.ts";
import { createEventEmitter, type On, type OnParametersFor } from "./events.ts";
import { jsonFormat } from "./formats/json.ts";
import { execProcedure, type Procedure } from "./procedure.ts";
import { type MessageFormat, recv, send } from "./sendrecv.ts";
import { closeTag, type TaggedWebSocket } from "./taggedws.ts";
import { waitForEvents } from "./wait.ts";

/**
 * Interface implemented by the WebSocketClient itself and also by the behaviors
 * that wrap it. As long as the wrapper accepts and returns a type implementing
 * `WebSocketLike`, it should be compatible with other wrappers.
 *
 * @typeParam TSend - Outgoing message payload type
 * @typeParam TRecv - Incoming message payload type (defaults to `TSend`)
 * @typeParam TConn - Connection type (the return type of the
 * {@link WebSocketLike.open `open()`} method)
 */
// NOTE(review): the `TConn = void` default and the type-argument order of
// `Procedure` below are reconstructed assumptions — confirm against
// ./procedure.ts and existing implementers of this interface.
export interface WebSocketLike<TSend, TRecv = TSend, TConn = void> {
  open(): Promise<TConn>;
  close(code?: number, reason?: string): Promise<void>;
  send(message: TSend): void;
  exec<TResult>(
    procedure: Procedure<TSend, TRecv, TResult>,
    options?: WebSocketProcedureOptions,
  ): Promise<TResult>;
  on: On<WebSocketEventMap<TRecv>>;
}

/**
 * Options accepted by the {@link WebSocketClient} constructor.
 *
 * @typeParam TSend - Outgoing message payload type
 * @typeParam TRecv - Incoming message payload type (defaults to `TSend`)
 */
export interface WebSocketOptions<TSend, TRecv = TSend> {
  /**
   * URL of the WebSocket endpoint. Usually starts with `ws://` or `wss://`.
   */
  url: string;

  /**
   * The {@link MessageFormat} object providing methods to serialize (format)
   * and deserialize (parse) message payloads. When omitted, the client falls
   * back to `jsonFormat()`.
   */
  format?: MessageFormat<TSend, TRecv>;
}

/**
 * Events dispatched by {@link WebSocketClient}, keyed by event name.
 *
 * @typeParam TRecv - Incoming message payload type
 */
export type WebSocketEventMap<TRecv> = {
  /**
   * The `message` event is dispatched when a message is received
   * and successfully parsed on an opened WebSocket connection.
   *
   * This event can be suppressed in two cases:
   * 1. parser returned `recvIgnore()`,
   * 2. a {@link WebSocketClient.exec procedure} is running with
   *    `suppressMessageEvents` option enabled.
   *
   * @param message - Parsed message payload
   */
  message: (message: TRecv) => void;

  /**
   * The `close` event is dispatched when a connection is closed "externally" -
   * either by the server, or abruptly (because the server went away, because
   * the client went offline, etc). Importantly, this event is not dispatched
   * if the connection was closed explicitly from the client by calling
   * {@link WebSocketClient.close ws.close()}.
   *
   * @param code Status code sent by the server
   * @param reason Reason for closure specified by the server
   */
  close: (code: number, reason: string) => void;

  /**
   * The `recverror` event is dispatched when parser throws an error.
   */
  recverror: (error: any) => void;
};

export interface WebSocketProcedureOptions {
  /**
   * When set to true, `message` events are not dispatched as long as
   * the procedure is running. This is useful when incoming messages are handled
   * by the procedure itself.
   *
   * @default false
   */
  suppressMessageEvents?: boolean;
}

/**
 * WebSocket client that serializes outgoing and parses incoming payloads with
 * a {@link MessageFormat}, exposes `message` / `close` / `recverror` events and
 * can run generator-based procedures on top of the current connection.
 *
 * @typeParam TSend - Outgoing message payload type
 * @typeParam TRecv - Incoming message payload type (defaults to `TSend`)
 */
export class WebSocketClient<TSend, TRecv = TSend>
  implements WebSocketLike<TSend, TRecv, void> {
  /** Current socket; `null` when there is no open or opening connection. */
  private ws: TaggedWebSocket | null = null;
  private readonly emitter = createEventEmitter<WebSocketEventMap<TRecv>>();
  private readonly url: string;
  private readonly format: MessageFormat<TSend, TRecv>;
  /**
   * Number of currently running procedures that asked for `message` events to
   * be suppressed. Events are dispatched only while this counter is zero.
   */
  private suppressMessageDepth = 0;

  constructor(options: WebSocketOptions<TSend, TRecv>) {
    this.url = options.url;
    this.format = options.format ?? jsonFormat();
  }

  /**
   * Connect to the WebSocket endpoint. Once resolved, the underlying connection
   * is guaranteed to be in `WebSocket.OPEN` ready state. Always await the
   * `ws.open()` promise before sending messages or executing procedures.
   *
   * If connection is cancelled by calling `ws.close()` before it's opened,
   * this method rejects with {@link AbortedError `AbortedError`}.
   *
   * If the connection is abruptly closed before it's opened, this method
   * rejects with {@link WebSocketClosedError `WebSocketClosedError`} which may
   * contain status code and closure reason.
   *
   * ```ts
   * const ws = new WebSocketClient({ url: 'wss://example.com' });
   * try {
   *   await ws.open();
   * } catch (err) {
   *   if (err instanceof AbortedError) {
   *     console.log("Connection was cancelled");
   *   } else if (err instanceof WebSocketClosedError) {
   *     console.log("Connection closed abruptly", err.code, err.reason);
   *   }
   * }
   * ```
   *
   * @throws Error if a connection is already open or being opened
   */
  async open(): Promise<void> {
    if (this.ws) {
      throw new Error("Connection already open or opening");
    }

    const ws: TaggedWebSocket = new WebSocket(this.url);
    this.ws = ws;

    const eventType = await waitForEvents(ws, ["open", "error"]);
    if (eventType === "error") {
      // Only reset the field if it still refers to this socket; a newer
      // connection may have been started in the meantime.
      if (this.ws === ws) {
        this.ws = null;
      }

      // The tag is set by `close()`, i.e. the client itself cancelled.
      if (ws[closeTag]) {
        throw new AbortedError("Connection aborted");
      }

      // Give a `close` event one timer tick to arrive so its details can be
      // attached to the error; otherwise continue without them.
      const closeEvent = await new Promise<CloseEvent | undefined>(
        (resolve) => {
          const handleClose = (event: CloseEvent) => resolve(event);
          ws.addEventListener("close", handleClose);
          setTimeout(() => {
            ws.removeEventListener("close", handleClose);
            resolve(undefined);
          }, 0);
        },
      );

      throw new WebSocketClosedError(
        "Connection was closed before it was established",
        closeEvent,
      );
    }

    this.listen(ws);
  }

  /**
   * Closes current WebSocket connection (if there is one). Calling this method
   * does not cause the `close` event to be dispatched; instead, await it to be
   * sure that the connection is closed.
   *
   * Calling this method aborts any currently running operation:
   * 1. Any unresolved `ws.open()` promise will reject with
   *    {@link AbortedError `AbortedError`}.
   * 2. Any unfinished procedure will reject with
   *    {@link AbortedError `AbortedError`}.
   *
   * It's not necessary to await the `ws.close()` promise. Immediately after
   * this method is called, a new connection can be opened by calling
   * {@link WebSocketClient.open `ws.open()`}.
   *
   * @param code Optional [status code](https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1).
   * Only use codes within the 3000-4999 range, and only use codes within the
   * 4000-4999 range for application-specific statuses.
   * @param reason Optional closure reason
   */
  async close(code?: number, reason?: string): Promise<void> {
    if (!this.ws) {
      return;
    }

    const ws = this.ws;
    // Tag the socket before detaching it so that other code observing this
    // socket can tell the closure was requested by the client.
    ws[closeTag] = true;
    this.ws = null;

    ws.close(code, reason);
    await waitForEvents(ws, ["close", "error"]);
  }

  /**
   * Sends a message using the current WebSocket connection. Message payload
   * will be serialized using the provided {@link WebSocketOptions.format format}
   * before sending.
   *
   * @throws Error if the connection is not in `WebSocket.OPEN` ready state
   */
  send(message: TSend): void {
    this.assertOpen(this.ws);
    send(message, { ws: this.ws, format: this.format });
  }

  /**
   * Executes a procedure. Procedure is a series of commands to be executed
   * and conditions to be checked on top of the current WebSocket connection.
   * This allows writing code in a more straight-forward and linear manner
   * instead of relying on event listeners.
   *
   * Procedure is [a generator](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/function*).
   * It receives an object with commands as its argument. It can yield commands,
   * and the yielded command will be executed using current WebSocket
   * connection.
   *
   * ```ts
   * await ws.exec(function* ({ send, recv, expect }) {
   *   yield send('ping');
   *   const message = yield recv();
   *   if (message === 'pong') {
   *     yield expect(m => m === 'ok');
   *   }
   * })
   * ```
   *
   * Supported commands are:
   * - `yield send(message)` Sends message payload, same as calling
   *   {@link WebSocketClient.send `ws.send(message)`}
   * - `yield recv()` Returns the next incoming message
   * - `yield expect(predicate)` Checks the next incoming message against
   *   predicate. If predicate succeeds, returns the message. If predicate fails,
   *   `expect` throws {@link UnexpectedMessageError `UnexpectedMessageError`}
   *   which can be handled by the procedure.
   *
   * For example, this procedure will echo incoming messages back to the server,
   * until it receives the message saying "stop":
   *
   * ```ts
   * await ws.exec(function* ({ send, recv, expect }) {
   *   while (true) {
   *     const message = yield recv();
   *     if (message === 'stop') {
   *       break;
   *     }
   *     yield send(message);
   *   }
   * })
   * ```
   *
   * If connection is cancelled by calling `ws.close()` while a procedure is
   * running, this method rejects with {@link AbortedError `AbortedError`}.
   *
   * If the connection is abruptly closed while a procedure is running, this
   * method rejects with {@link WebSocketClosedError `WebSocketClosedError`}
   * containing status code and closure reason.
   *
   * If procedure throws, this method rejects with the thrown error.
   *
   * @returns Promise resolving with the value returned by the procedure
   * @throws Error if the connection is not in `WebSocket.OPEN` ready state
   */
  async exec<TResult>(
    // NOTE(review): type-argument order of `Procedure` is assumed — confirm
    // against ./procedure.ts.
    procedure: Procedure<TSend, TRecv, TResult>,
    { suppressMessageEvents = false }: WebSocketProcedureOptions = {},
  ): Promise<TResult> {
    this.assertOpen(this.ws);
    try {
      if (suppressMessageEvents) {
        this.suppressMessageDepth++;
      }
      return await execProcedure({
        ws: this.ws,
        procedure,
        format: this.format,
      });
    } finally {
      // Always undo the increment, whether the procedure returned or threw.
      if (suppressMessageEvents) {
        this.suppressMessageDepth--;
      }
    }
  }

  /**
   * Sets up event listener and returns a function that unsubscribes the event
   * listener when called.
   *
   * All event listeners set up for the event are guaranteed to run, even if
   * some of them throw an error. In browser environment, an `error` event will
   * then be dispatched on the `window` object.
   *
   * @see {@link WebSocketEventMap}
   */
  on<E extends keyof WebSocketEventMap<TRecv>>(
    ...args: OnParametersFor<WebSocketEventMap<TRecv>, E>
  ): () => void {
    return this.emitter.on(...args);
  }

  /**
   * Narrows `ws` to a non-null socket, throwing unless it is in
   * `WebSocket.OPEN` ready state.
   */
  private assertOpen(ws: WebSocket | null): asserts ws is WebSocket {
    if (ws?.readyState !== WebSocket.OPEN) {
      throw new Error(
        "Connection is not open. Did you forget to await ws.open()?",
      );
    }
  }

  /**
   * Attaches `message` and `close` listeners to a freshly opened socket.
   * Both listeners do nothing once `this.ws` no longer refers to `ws`, which
   * is the case after `close()` was called or another socket replaced it.
   */
  private listen(ws: WebSocket) {
    ws.addEventListener("message", (event) => {
      if (this.ws === ws) {
        const result = recv(event, { format: this.format });
        switch (result.type) {
          case "message":
            // Skip dispatch while any procedure suppresses message events.
            if (this.suppressMessageDepth === 0) {
              this.emitter.dispatch("message", result.message);
            }
            break;
          case "error":
            this.emitter.dispatch("recverror", result.error);
            break;
          // Any other result type is dropped without dispatching an event.
        }
      }
    });

    ws.addEventListener("close", (event) => {
      if (this.ws === ws) {
        this.ws = null;
        this.emitter.dispatch("close", event.code, event.reason);
      }
    });
  }
}