import type { IncomingMessage as IncomingHttpMessage } from 'http';
import { deserialize } from 'bson';
import type { RawData, WebSocket } from 'ws';
import { set } from '../../messages/utils/set';
import type { IncomingMessage, OutgoingMessage } from '../../messages';
import { Connection } from './Connection';
import { BinaryMessage } from '../../messages';
import { BinaryReadableStream } from '../BinaryReadableStream';

/** Close code passed to the socket whenever `close()` is given a non-empty reason. */
const CLOSE_CODE_WITH_REASON = 3400;

/**
 * Connection implementation backed by a `ws` WebSocket.
 *
 * Text frames are parsed as JSON and forwarded to `onMessage`; binary frames
 * are decoded as BSON and forwarded to `onBinaryMessage`. Outgoing messages
 * are serialized with `JSON.stringify`.
 */
export class WebSocketConnection extends Connection {
  /**
   * @param ws - The socket this connection reads from and writes to.
   * @param request - The HTTP request associated with the socket; its headers
   *   are copied onto the connection.
   */
  constructor(public readonly ws: WebSocket, public readonly request: IncomingHttpMessage) {
    super();
    this.headers = request.headers;

    ws.on('close', () => this.onClose());
    ws.on('message', async (m, isBinary) => {
      try {
        if (isBinary) {
          await this.onBinaryMessage(
            this.deserializeBinaryMessage(m as Buffer),
          );
        } else {
          await this.onMessage(
            this.createIncomingMessageObject(m),
          );
        }
      } catch (e) {
        // The listener is async, so anything escaping here would surface as an
        // unhandled rejection; malformed frames are logged and dropped instead.
        console.error(e);
      }
    });
  }

  /**
   * Sends `message` to the peer as a JSON text frame.
   * The socket write is not awaited; the promise resolves once it has been queued.
   */
  public async reply(message: OutgoingMessage): Promise<void> {
    this.ws.send(JSON.stringify(message));
  }

  /**
   * Sends `message` to the peer as a JSON text frame.
   * The socket write is not awaited; the promise resolves once it has been queued.
   */
  public async send(message: OutgoingMessage): Promise<void> {
    this.ws.send(JSON.stringify(message));
  }

  /** Returns the underlying `ws` socket. */
  public getSocket(): WebSocket {
    return this.ws;
  }

  /**
   * Closes the socket.
   *
   * @param data - Optional close reason. When it is a non-empty string the
   *   socket is closed with code 3400; otherwise no explicit code is passed.
   */
  public async close(data?: string): Promise<void> {
    return this.ws.close(data ? CLOSE_CODE_WITH_REASON : undefined, data);
  }

  /**
   * Decodes a binary frame as BSON and checks that it has the fields a
   * `BinaryMessage` needs: truthy `transactionId` and `path`, and an own
   * `chunk` property (whose value may be falsy).
   *
   * @throws Error when the decoded document is missing any of those fields;
   *   errors raised by the BSON decoder itself propagate unchanged.
   */
  protected deserializeBinaryMessage(data: Buffer): BinaryMessage {
    const message = deserialize(data) as BinaryMessage;

    // The document comes from the peer, so it may carry its own
    // `hasOwnProperty` key; call the prototype method directly.
    const hasChunk = Boolean(message) && Object.prototype.hasOwnProperty.call(message, 'chunk');

    if (message && message.transactionId && message.path && hasChunk) {
      return message;
    }

    // Report only the size: interpolating the buffer would decode the whole
    // binary payload as text into the error message (and the log).
    throw new Error(`Invalid binary message: ${data.length} bytes`);
  }

  /**
   * Parses a text frame as JSON and attaches binary stream placeholders to it
   * (see `addBinaryStream`).
   *
   * @throws SyntaxError when the frame is not valid JSON.
   * @throws Error when the JSON value is not an object (e.g. `null`, a number
   *   or a string), since such a value cannot be a message.
   */
  protected createIncomingMessageObject(msg: RawData): IncomingMessage {
    const parsed: unknown = JSON.parse(msg.toString());

    if (parsed === null || typeof parsed !== 'object') {
      throw new Error('Invalid message: expected a JSON object');
    }

    const message = parsed as IncomingMessage;
    this.addBinaryStream(message);
    return message;
  }

  /**
   * When `message.meta.messageObjectInfo` announces binary content
   * (`binarySize > 0`), creates a `BinaryReadableStream` for every entry of
   * `binaryData`, stores it on the entry and writes it into `message` at the
   * entry's `path`. Messages without such metadata are left untouched.
   */
  protected addBinaryStream(message: IncomingMessage) {
    const { messageObjectInfo } = message.meta || {};

    if (messageObjectInfo && messageObjectInfo.binarySize > 0) {
      messageObjectInfo.binaryData.forEach((item) => {
        item.data = new BinaryReadableStream() as any;
        set(message, item.path, item.data);
      });
    }
  }
}