import type {
  Command,
  CommandMethods,
  CommandResponse,
  Envelope,
  Message,
  MessageTypes,
  Notification,
  UnknownCommandResponse,
} from '../../types/index.ts'
import { logger } from '../../utils/logger.ts'
import { BlipError } from '../bliperror.ts'
import { RetryableError } from '../retryableerror.ts'
import { ConnectionSender, type ConnectionSenderConstructor, OpenConnectionSender } from '../sender.ts'
import { SessionNegotiator } from '../sessionnegotiator.ts'
import { EnvelopeThrottler } from '../throttler.ts'

/**
 * Sender that exchanges envelopes over a single WebSocket opened with the
 * `lime` subprotocol at `wss://[<tenantId>.]ws.<domain>`.
 *
 * The socket is owned by a {@link WebSocketHandle}. Every time a socket opens,
 * a fresh `SessionNegotiator` is created and `negotiate` is awaited before the
 * socket is handed to callers. Incoming envelopes go to the negotiator while
 * it reports `negotiating`, and to `envelopeResolver.resolve` otherwise.
 *
 * NOTE(review): the type arguments on `ConstructorParameters`, `Promise` and
 * `WebSocketHandle` were reconstructed from how the values are used in this
 * file. `Message`, `Command` and `Notification` are written without type
 * arguments — confirm all of them against the declarations in `../sender.ts`
 * and `../../types/index.ts`.
 */
export class WebSocketSender extends OpenConnectionSender {
  private readonly throttler = new EnvelopeThrottler()
  private readonly connectionHandle: WebSocketHandle<Envelope>

  /**
   * Starts connecting immediately; the constructor does not wait for the
   * socket to open.
   *
   * @param options Sender options. `tenantId` (optional) becomes a host
   *   prefix; `node` and `authentication` are passed to the session
   *   negotiation.
   */
  constructor(options: ConstructorParameters<ConnectionSenderConstructor>[0]) {
    super(options)

    const prefix = options.tenantId ? `${options.tenantId}.` : ''

    this.connectionHandle = new WebSocketHandle(
      `wss://${prefix}ws.${this.domain}`,
      // onConnected: negotiate a session over the freshly opened socket.
      (webSocket) => {
        this.sessionNegotiator = new SessionNegotiator(this, (session) => {
          webSocket.send(JSON.stringify(session))
        })
        return this.sessionNegotiator.negotiate({
          node: options.node,
          authentication: options.authentication,
        })
      },
      // onClose: nothing will answer the envelopes that are still pending.
      () => this.envelopeResolver.rejectPendingEnvelopes('Connection was closed'),
      // onMessage: the negotiator gets first refusal while it is negotiating.
      (envelope: Envelope) => {
        if (this.sessionNegotiator?.negotiating) {
          return this.sessionNegotiator.handleEnvelope(envelope)
        }
        this.envelopeResolver.resolve(envelope)
      },
    )
  }

  /**
   * Sends a message after throttling, waiting for the connection and ensuring
   * presence. No response is awaited.
   *
   * @throws Whatever the connection promise rejected with when the socket
   *   could not be established.
   */
  public async sendMessage(message: Message): Promise<void> {
    await this.throttler.throttle('message')
    const webSocket = await this.connectionHandle.get()
    await this.sessionNegotiator?.ensurePresence()
    webSocket.send(JSON.stringify(message))
  }

  /**
   * Sends a command and waits for the response correlated by `command.id`.
   * The whole exchange runs under {@link withRetryPolicy}.
   *
   * @returns The `resource` of a successful response.
   * @throws A `BlipError` built from the response when
   *   `BlipError.isFailedCommandResponse` matches it.
   * @throws Error when the response is neither a failure nor a success.
   */
  public sendCommand(command: Command): Promise<unknown> {
    return this.withRetryPolicy(async () => {
      await this.throttler.throttle('command')
      // The response promise is created before the command is written to the socket.
      const envelopeResponsePromise = this.envelopeResolver.createEnvelopeResponsePromise(command.id)
      const webSocket = await this.connectionHandle.get()
      await this.sessionNegotiator?.ensurePresence(command.uri)
      webSocket.send(JSON.stringify(command))

      const response = (await envelopeResponsePromise) as CommandResponse<
        unknown,
        CommandMethods,
        'success' | 'failure'
      >

      if (BlipError.isFailedCommandResponse(response)) {
        throw BlipError.commandResponseToBlipError(command.uri, response)
      } else if (response.status === 'success') {
        return response.resource
      } else {
        throw new Error(`Unexpected response for command '${command.uri}': ${JSON.stringify(response)}`)
      }
    })
  }

  /** Writes a notification to the socket once the connection is available. */
  public async sendNotification(notification: Notification): Promise<void> {
    const webSocket = await this.connectionHandle.get()
    webSocket.send(JSON.stringify(notification))
  }

  /** Writes a command response to the socket once the connection is available. */
  public async sendCommandResponse(response: UnknownCommandResponse): Promise<void> {
    const webSocket = await this.connectionHandle.get()
    webSocket.send(JSON.stringify(response))
  }

  /**
   * Reports whether the current socket is in the `OPEN` state.
   *
   * Waits for a pending connection attempt to settle first. A connection that
   * could not be established yields `false` instead of a rejection.
   */
  public async isConnected(): Promise<boolean> {
    try {
      const webSocket = await this.connectionHandle.get()
      return webSocket.readyState === WebSocket.OPEN
    } catch {
      return false
    }
  }

  /** Finishes the session negotiator, closes the socket, then closes the base sender. */
  public async close() {
    this.sessionNegotiator?.finish()
    await this.connectionHandle.close()
    await super.close()
  }

  public static login = ConnectionSender.login

  /**
   * Runs `fn`, retrying while `RetryableError.isRetryable` accepts the error
   * and retries remain. Each retry is preceded by a random delay below one
   * second. Every failure is logged as a warning, including those that are
   * not retried.
   *
   * @param fn Operation to run.
   * @param retries Retries still allowed after the current attempt.
   * @throws The last error once it is not retryable or retries are exhausted.
   */
  private async withRetryPolicy<T>(fn: () => Promise<T>, retries = 5): Promise<T> {
    try {
      return await fn()
    } catch (err) {
      logger.warn('WebSocketSender', `Error while sending command, ${retries} retries left`, err)
      if (retries > 0 && RetryableError.isRetryable(err)) {
        // wait before retrying
        await new Promise((resolve) => setTimeout(resolve, Math.random() * 1000))
        return this.withRetryPolicy(fn, retries - 1)
      }

      throw err
    }
  }
}

/**
 * Owns the current WebSocket and replaces it when it closes unexpectedly.
 *
 * `get()` returns a promise for the most recent socket. The promise resolves
 * only after the socket has opened and `onConnected` has completed. After an
 * unexpected close a new connection is started, until three consecutive
 * attempts have closed without `onConnected` completing in between.
 */
class WebSocketHandle<T> {
  private currentWebSocketPromise: Promise<WebSocket>
  private closing = false
  private connectionAttempts = 0

  /**
   * @param url WebSocket URL to connect to.
   * @param onConnected Awaited after each socket opens, before the socket is
   *   handed to callers of `get()`.
   * @param onClose Called whenever a socket closes and `close()` was not requested.
   * @param onMessage Called with each incoming frame parsed as JSON.
   */
  constructor(
    url: string,
    onConnected: (webSocket: WebSocket) => Promise<unknown>,
    onClose: () => void,
    onMessage: (message: T) => void,
  ) {
    this.currentWebSocketPromise = this.track(this.connect(url, onConnected, onClose, onMessage))
  }

  /** Returns the promise for the most recent connection attempt. */
  public get(): Promise<WebSocket> {
    return this.currentWebSocketPromise
  }

  /**
   * Closes the current socket and disables reconnection. Later calls are
   * no-ops. A connection that never came up is treated as already closed.
   */
  public async close(): Promise<void> {
    if (this.closing) {
      return
    }
    this.closing = true

    let current: WebSocket
    try {
      current = await this.currentWebSocketPromise
    } catch {
      // The connection attempt failed, so there is no socket to close.
      return
    }
    current.close()
  }

  /**
   * Marks `promise` as observed so that a connection failure nobody is
   * awaiting yet is not reported as an unhandled rejection. The same promise
   * is returned, so callers of `get()` still receive the rejection.
   */
  private track(promise: Promise<WebSocket>): Promise<WebSocket> {
    promise.catch(() => undefined)
    return promise
  }

  /**
   * Opens one socket, wires its handlers, and resolves with it once it has
   * opened and `onConnected` has completed.
   */
  private async connect(
    url: string,
    onConnected: (webSocket: WebSocket) => Promise<unknown>,
    onClose: () => void,
    onMessage: (message: T) => void,
  ): Promise<WebSocket> {
    const connection = new WebSocket(url, 'lime')
    // Whether the open promise below has already been resolved.
    let opened = false

    await new Promise<void>((resolve, reject) => {
      connection.onopen = () => {
        opened = true
        resolve()
      }

      connection.onclose = () => {
        if (this.closing) {
          return
        }

        this.connectionAttempts++
        if (this.connectionAttempts < 3) {
          this.currentWebSocketPromise = this.track(this.connect(url, onConnected, onClose, onMessage))
        } else {
          const failure = new Error('Failed to connect to WebSocket')
          if (opened) {
            // `reject` would be a no-op after `resolve`; replace the stored
            // promise so `get()` stops handing out this closed socket.
            this.currentWebSocketPromise = this.track(Promise.reject<WebSocket>(failure))
          } else {
            reject(failure)
          }
        }
        onClose()
      }

      connection.onmessage = (event) => {
        let message: T
        try {
          message = JSON.parse(event.data)
        } catch (err) {
          // A malformed frame must not escape the event handler as an uncaught exception.
          logger.warn('WebSocketSender', 'Discarding WebSocket frame that is not valid JSON', err)
          return
        }
        onMessage(message)
      }

      connection.onerror = (err) => {
        // Always reject with an Error; the original event is kept as the cause.
        const detail = 'message' in err ? `: ${err.message}` : ''
        reject(new Error(`WebSocket error${detail}`, { cause: err }))
      }
    })

    await onConnected(connection)
    // The attempt counter restarts once `onConnected` has completed.
    this.connectionAttempts = 0

    return connection
  }
}