import splitUrlParams from 'lib/split-url-params' import ReconnectingWebSocket, { Event } from 'reconnecting-websocket' type CurrentConnectionState = | 'socket_closed' // Socket is closed. | 'socket_join_error' // Error on joining socket. This is useful when you want to restart a chat session. | 'socket_connected' // Joined socket successfully. | 'attach_channel_response' // Attached channel to system. | 'attach_channel_erred' // Error attaching channel. type ConnectionState = { connected: boolean ready: boolean currentState: CurrentConnectionState } // Subscribers set which are used to subscribe to changes in the conversation. // Needs to be outside of the ConversationConnector class so its not recreated for each instance. const subscribers = new Set() // Syncs the lifecycle of the conversation with Preact. Each subscriber will fetch the latest value from the conversation if needed. const emitChange = () => { // Call the callback function for each subscriber subscribers.forEach((callback) => callback()) } export default class ConversationConnector { #connectionListeners: ((_payload: ConnectionState) => boolean | void)[] = [] #statusInterval: ReturnType | null = null #openListener: ((this: ReconnectingWebSocket, ev: Event) => void) | null = null #messageListener = (event) => { const json = JSON.parse(event.data) if (this.#statusInterval) clearInterval(this.#statusInterval) this.#statusInterval = setInterval(() => { if (this.socket?.readyState === ReconnectingWebSocket.OPEN) { this.socket?.send(JSON.stringify({ type: 'ping' })) } }, 30000) try { switch (json.type) { case 'attach_channel_response': if (!json.payload.success) { this.#emitConnectionState({ connected: true, ready: false, currentState: 'attach_channel_erred', }) return } this.#emitConnectionState({ connected: true, ready: true, currentState: 'attach_channel_response', }) break default: break } } catch (err) { console.warn(err) } } #closeListener = () => { this.#emitConnectionState({ connected: false, ready: false, currentState: 'socket_closed', }) } #errorListener = () => { this.#emitConnectionState({ connected: false, ready: false, currentState: 'socket_join_error', }) } accessToken: string = '' channelName: string = '' channelTopic: string = '' url: string = '' socket?: ReconnectingWebSocket async connect( url: string, channelName: string, channelTopic: string, accessToken: string, ) { this.url = url this.accessToken = accessToken this.channelName = channelName this.channelTopic = channelTopic const { url: splittedUrl } = splitUrlParams(this.url) this.socket = new ReconnectingWebSocket(splittedUrl, [], { maxReconnectionDelay: 10000, minReconnectionDelay: 500, reconnectionDelayGrowFactor: 2, }) const attachChannel = { type: 'attach_channel', payload: { accessToken: this.accessToken, channelName: this.channelName, }, } this.#openListener = () => { this.socket?.send(JSON.stringify(attachChannel)) this.#emitConnectionState({ connected: true, ready: false, currentState: 'socket_connected', }) } this.socket?.addEventListener('open', this.#openListener) this.socket?.addEventListener('message', this.#messageListener) this.socket?.addEventListener('close', this.#closeListener) this.socket?.addEventListener('error', this.#errorListener) } disconnect() { this.socket?.close() if (this.#openListener) this.socket?.removeEventListener('open', this.#openListener) this.socket?.removeEventListener('message', this.#messageListener) this.socket?.removeEventListener('close', this.#closeListener) this.socket?.removeEventListener('error', this.#errorListener) this.#connectionListeners = [] } onOpen = (cb) => { this?.socket?.addEventListener('open', cb) } onError = (cb) => { this?.socket?.addEventListener('error', cb) } addListener = (type, cb) => { this?.socket?.addEventListener(type, (event: { data: string }) => cb(JSON.parse(event.data)), ) } onMessage = (cb) => { if (this?.socket) { this.socket.onmessage = (event: { data: string }) => cb(JSON.parse(event.data)) } } removeListener = (type, listener) => { this.socket?.removeEventListener(type, listener) } onConnection(cb: (_payload: ConnectionState) => boolean | void) { this.#connectionListeners.push(cb) } #emitConnectionState(payload: ConnectionState) { this.#connectionListeners = this.#connectionListeners.filter((item) => { const complete = item(payload) // If we only want to execute the callback once, remove it from the listener return !complete }) emitChange() } pushToChannel(command: string, payload: { type: string; error: unknown }) { this.socket?.send(JSON.stringify({ type: command, payload })) } // Adds a callback function to the subscribers set and returns a method to unsubscribe from the store. // This method is used to sync the state with the lifecycle of Preact. static subscribe(callback: Function) { subscribers.add(callback) // Returns a function that removes the callback from the subscribers set. return () => subscribers.delete(callback) } }