import io, { Socket } from 'socket.io-client';
import { Observable } from 'lib0/observable';

import { logger } from '../utils';
import {
  SESSION_ADD_EVENT,
  SESSION_AUTO_CREATED,
  SESSION_STOPPED_EVENT,
  SESSION_SUBSCRIBE_EVENT,
  SESSION_UNSUBSCRIBE_EVENT,
  SOCKET_SET_USER_EVENT,
  REMOTE_SESSION_RECORDING_START,
  REMOTE_SESSION_RECORDING_STOP,
  SESSION_STARTED_EVENT,
  SESSION_SAVE_BUFFER_EVENT,
} from '../config';
import {
  type ISession,
  type IUserAttributes,
  ATTR_MULTIPLAYER_SESSION_CLIENT_ID,
} from '@multiplayer-app/session-recorder-common';

/**
 * Upper bound shared by socket.io's own `reconnectionAttempts` option and by
 * the service-level `attempts` counter checked in `checkReconnectionAttempts`.
 */
const MAX_RECONNECTION_ATTEMPTS = 2;

/** Names of the server events that the service re-emits to its observers. */
export type SocketServiceEvents =
  | typeof SESSION_STOPPED_EVENT
  | typeof SESSION_AUTO_CREATED
  | typeof REMOTE_SESSION_RECORDING_START
  | typeof REMOTE_SESSION_RECORDING_STOP
  | typeof SESSION_SAVE_BUFFER_EVENT;

/** Server events forwarded one-to-one from the socket to local observers. */
const RELAYED_EVENTS: readonly SocketServiceEvents[] = [
  SESSION_STOPPED_EVENT,
  SESSION_AUTO_CREATED,
  REMOTE_SESSION_RECORDING_START,
  REMOTE_SESSION_RECORDING_STOP,
  SESSION_SAVE_BUFFER_EVENT,
];

export interface SocketServiceOptions {
  /** Sent to the server as the `x-api-key` auth field. */
  apiKey: string;
  /** URL handed to `io()`. */
  socketUrl: string;
  /**
   * When truthy (and `socketUrl`/`apiKey` are set), `init` and `updateConfigs`
   * open the connection eagerly; otherwise it is opened on first use.
   */
  keepAlive?: boolean;
  /** Optional client id, sent as an extra auth field when present. */
  clientId?: string;
}

/** An outgoing event held back until the socket is ready. */
interface QueuedEvent {
  name: string;
  data: unknown;
}

/**
 * Thin wrapper around a socket.io client socket.
 *
 * Outgoing events are queued while the socket is not ready, the last user
 * identity and session subscription are replayed on every `'ready'`, and a
 * fixed set of server events is re-emitted to observers of this object.
 */
export class SocketService extends Observable<SocketServiceEvents> {
  private socket: Socket | null = null;
  /** Events waiting for the next `'ready'`. */
  private queue: QueuedEvent[] = [];
  /** True between `io()` and the first `'ready'` / `'disconnect'` / `'connect_error'`. */
  private isConnecting = false;
  /** True only after the server sent `'ready'`; reset on disconnect/error/close. */
  private isConnected = false;
  /** Number of `_initConnection` runs since construction or the last `close()`. */
  private attempts = 0;
  private sessionId: string | null = null;
  private options: SocketServiceOptions;
  private isInitialized = false;
  /** Last payload passed to `setUser`, replayed on every `'ready'`. */
  private lastUserPayload: {
    userAttributes: IUserAttributes | null;
    clientId?: string;
  } | null = null;
  /** Last session passed to `subscribeToSession`, replayed on every `'ready'`. */
  private lastSubscribeSession: ISession | null = null;

  constructor() {
    super();
    this.options = {
      apiKey: '',
      socketUrl: '',
      keepAlive: false,
    };
  }

  /**
   * Initialize the socket service
   * @param config - Socket service configuration
   */
  public init(config: SocketServiceOptions): void {
    this.options = {
      ...this.options,
      ...config,
    };
    this.isInitialized = true;

    if (this.shouldAutoConnect()) {
      this._initConnection();
    }
  }

  /**
   * Update the socket service configuration
   * @param config - Partial configuration to update
   */
  public updateConfigs(config: Partial<SocketServiceOptions>): void {
    // If any config changed, reconnect if connected
    const changedKeys = Object.keys(config) as (keyof SocketServiceOptions)[];
    const hasChanges = changedKeys.some(
      (key) => config[key] !== undefined && config[key] !== this.options[key],
    );
    if (!hasChanges) return;

    this.options = { ...this.options, ...config };

    if (this.socket?.connected) {
      // close() never rejects; `void` marks the promise as intentionally un-awaited.
      void this.close().then(() => {
        if (this.shouldAutoConnect()) {
          this._initConnection();
        }
      });
    }
  }

  /** True when the options ask for an eager connection and contain what it needs. */
  private shouldAutoConnect(): boolean {
    return Boolean(
      this.options.keepAlive && this.options.socketUrl && this.options.apiKey,
    );
  }

  /**
   * Detach and disconnect the current socket, if any.
   *
   * Used before opening a replacement connection so the previous socket cannot
   * keep its listeners, reconnect on its own, and deliver `'ready'` or relayed
   * events a second time.
   */
  private disposeSocket(): void {
    const stale = this.socket;
    if (!stale) return;
    this.socket = null;
    stale.off(); // no arguments: removes every listener registered on the socket
    stale.disconnect();
  }

  /**
   * Open a new connection and wire up all socket listeners.
   * No-op while a connection attempt is in flight, while ready, or before `init`.
   */
  private _initConnection(): void {
    if (this.isConnecting || this.isConnected || !this.isInitialized) return;

    // A socket left over from a previous disconnect/connect_error would
    // otherwise stay alive next to the one created below.
    this.disposeSocket();

    this.attempts++;
    this.isConnecting = true;

    const socket = io(this.options.socketUrl, {
      path: '/v0/radar/ws',
      auth: {
        'x-api-key': this.options.apiKey,
        ...(this.options.clientId
          ? { [ATTR_MULTIPLAYER_SESSION_CLIENT_ID]: this.options.clientId }
          : {}),
      },
      reconnectionAttempts: MAX_RECONNECTION_ATTEMPTS,
      transports: ['websocket'],
    });
    this.socket = socket;

    socket.on('ready', () => {
      this.isConnecting = false;
      this.isConnected = true;
      logger.info('SocketService', 'Connected to server');

      // Re-establish identity and session on every (re)connect: socket.io's auto-reconnect
      // fires 'ready' again after a drop, but the server has no memory of the previous
      // setUser/subscribe calls — the client must replay them.
      if (this.lastUserPayload && this.socket) {
        this.socket.emit(SOCKET_SET_USER_EVENT, this.lastUserPayload);
      }
      if (this.lastSubscribeSession) {
        this._emitSubscribe(this.lastSubscribeSession);
      }

      this.flushQueue();
    });

    socket.on('disconnect', () => {
      this.isConnecting = false;
      this.isConnected = false;
      logger.info('SocketService', 'Disconnected from server');
    });

    socket.on('connect_error', (err: unknown) => {
      this.isConnecting = false;
      this.isConnected = false;
      this.checkReconnectionAttempts();
      logger.error('SocketService', 'Error connecting to server', err);
    });

    // Forward each relayed server event, payload untouched, to local observers.
    for (const eventName of RELAYED_EVENTS) {
      socket.on(eventName, (data: unknown) => {
        this.emit(eventName, [data]);
      });
    }
  }

  /**
   * Called on every `'connect_error'`.
   *
   * NOTE(review): the only caller sets `isConnected = false` right before this
   * runs, and `flushQueue` only loops while `isConnected` is true, so this call
   * currently sends nothing and the queue is kept. Confirm whether the queue
   * was meant to be dropped once the attempt limit is reached.
   */
  private checkReconnectionAttempts(): void {
    if (this.attempts >= MAX_RECONNECTION_ATTEMPTS) {
      this.flushQueue();
    }
  }

  /**
   * Emit immediately when the socket is ready; otherwise queue the event and
   * make sure a connection attempt is under way.
   */
  private emitSocketEvent(name: string, data: unknown): void {
    if (this.socket && this.isConnected) {
      this.socket.emit(name, data);
      return;
    }
    this.queue.push({ data, name });
    this._initConnection();
  }

  /** Send queued events in FIFO order for as long as the socket stays ready. */
  private flushQueue(): void {
    while (this.queue.length > 0 && this.isConnected) {
      const event = this.queue.shift();
      if (event && this.socket) {
        this.socket.emit(event.name, event.data);
      }
    }
  }

  /**
   * Send a session event to the server (queued while not connected).
   * @param event - Payload emitted under `SESSION_ADD_EVENT`
   */
  public send(event: any): void {
    this.emitSocketEvent(SESSION_ADD_EVENT, event);
  }

  /**
   * Subscribe to a session and announce that it started.
   * @param session - Session to subscribe to
   */
  public subscribeToSession(session: ISession): void {
    // Remember the session so reconnects replay subscribe + started.
    this.lastSubscribeSession = session;
    this._emitSubscribe(session);
  }

  /**
   * Emit the subscribe + started pair for `session` if the socket is ready,
   * otherwise trigger a connection and rely on the `'ready'` replay.
   */
  private _emitSubscribe(session: ISession): void {
    this.sessionId = session.shortId || session._id;
    const subscribePayload = {
      projectId: session.project,
      workspaceId: session.workspace,
      debugSessionId: this.sessionId,
      sessionType: session.creationType,
    };
    const startedPayload = { debugSessionId: session._id };

    // Send directly (not via the queue). The queue would cause a duplicate emit on
    // the next 'ready' alongside the replay, since 'ready' replays lastSubscribeSession.
    if (this.socket && this.isConnected) {
      this.socket.emit(SESSION_SUBSCRIBE_EVENT, subscribePayload);
      this.socket.emit(SESSION_STARTED_EVENT, startedPayload);
    } else {
      // Not connected: 'ready' will replay via lastSubscribeSession once the socket is up.
      this._initConnection();
    }
  }

  /**
   * Unsubscribe from the current session (if one was subscribed) and stop
   * replaying the subscription on reconnect.
   * @param stopSession - When truthy, also emit `SESSION_STOPPED_EVENT`
   */
  public unsubscribeFromSession(stopSession?: boolean) {
    if (this.sessionId) {
      this.emitSocketEvent(SESSION_UNSUBSCRIBE_EVENT, {
        debugSessionId: this.sessionId,
      });
      if (stopSession) {
        this.emitSocketEvent(SESSION_STOPPED_EVENT, {});
      }
    }
    this.lastSubscribeSession = null;
  }

  /**
   * Set the user identity for this connection.
   * @param data - User attributes (or `null`) and optional client id
   */
  public setUser(data: {
    userAttributes: IUserAttributes | null;
    clientId?: string;
  }): void {
    // Remember the last identity so 'ready' (initial connect + every reconnect) can replay it.
    // Send directly here rather than queuing: the queue would cause a duplicate emit on the
    // next 'ready' alongside the replay.
    this.lastUserPayload = data;
    if (this.socket && this.isConnected) {
      this.socket.emit(SOCKET_SET_USER_EVENT, data);
    } else {
      // Not connected yet: 'ready' will replay lastUserPayload once the socket is up.
      this._initConnection();
    }
  }

  /**
   * Close the connection.
   *
   * When the underlying socket is connected, waits 500 ms, then unsubscribes
   * from the current session, resets the connection state and disconnects.
   * When it is not connected, resolves immediately and leaves all state as is.
   *
   * NOTE(review): the 500 ms delay presumably lets in-flight emits go out, and
   * a socket that exists but is still connecting is not torn down here —
   * confirm both are intended.
   *
   * @returns A promise that resolves once the close sequence has finished
   */
  public close(): Promise<void> {
    return new Promise((resolve) => {
      if (!this.socket?.connected) {
        resolve();
        return;
      }
      setTimeout(() => {
        this.unsubscribeFromSession();
        this.attempts = 0;
        this.isConnected = false;
        this.isConnecting = false;
        this.socket?.disconnect();
        this.socket = null;
        resolve();
      }, 500);
    });
  }
}