/** * Credits where they're due: this is a custom combination/implementation of reconnecting-websocket and the Django Channels JS client! */ import {mc_log} from '../settings'; export interface RawChannelsWebsocketMessage { action: string response_status: number, data: string } export interface Options { maxReconnectionDelay?: number, minReconnectionDelay: number, reconnectionDelayGrowFactor: number, connectionTimeout?: number, maxRetries?: number, debug?: boolean, url?: string, protocols?: string | string[] } interface _Options extends Options { [key: string]: any, constructor: new(url: string, protocols?: string | string[]) => WebSocket } export interface EventListener { (event?: any): any; } export interface EventListeners { [key: string]: [EventListener, any][]; } const isWebSocket = (constructor: any) => constructor && constructor.CLOSING === 2; const isGlobalWebSocket = () => typeof WebSocket !== 'undefined' && isWebSocket(WebSocket); export class WebSocketOptions implements Options { [key: string]: any; maxReconnectionDelay = 10000; minReconnectionDelay = 1500; reconnectionDelayGrowFactor = 1.3; connectionTimeout = 4000; maxRetries: number = Infinity; constructor(public debug: boolean) { }; public static get WebSocket() { return isGlobalWebSocket() ? WebSocket : null; } } const initReconnectionDelay = (config: WebSocketOptions) => (config.minReconnectionDelay + Math.random() * config.minReconnectionDelay); const updateReconnectionDelay = (config: WebSocketOptions, previousDelay: number) => { const newDelay = previousDelay * config.reconnectionDelayGrowFactor; return (newDelay > config.maxReconnectionDelay) ? config.maxReconnectionDelay : newDelay; }; const LEVEL_0_EVENTS = ['onopen', 'onclose', 'onmessage', 'onerror']; export default class DemultiplexedReconnectingWebsocket { ws: WebSocket; connectingTimeout: number; reconnectDelay = 0; retriesCount = 0; shouldRetry = true; savedOnClose: any = null; listeners: EventListeners = {}; config: WebSocketOptions = new WebSocketOptions(false); log: Function; streams: { [key: string]: Function } = {}; // Map stream to callback for stream default_cb: Function; // Default callback url: string; protocols: string | string[]; constructor(options: WebSocketOptions) { this.url = options.url; this.protocols = options.protocols; Object.keys(this.config) .filter(key => options.hasOwnProperty(key)) .forEach(key => this.config[key] = options[key]); this.log = (...params: any[]) => mc_log('RWS:', ...params); } public get isOpen() { return typeof this.ws !== 'undefined' && this.ws.readyState === WebSocket.OPEN; } public get isClosed() { return typeof this.ws === 'undefined' || this.ws.readyState === WebSocket.CLOSED; } public get isClosing() { return typeof this.ws !== 'undefined' && this.ws.readyState === WebSocket.CLOSING; } public get isConnecting() { return typeof this.ws !== 'undefined' && this.ws.readyState === WebSocket.CONNECTING; } connect(url?: string, force = false) { this.log('Connecting..', url); if (!force && !this.shouldRetry) { return; } // Create a new WebSocket instance to wrap this.createOrOverwriteWebSocket(url ? url : this.url); this.savedOnClose = null; }; createUrl(url: string) { let _url; // Use wss:// if running on https:// const scheme = window.location.protocol === 'https:' ? 'wss' : 'ws'; const base_url = `${scheme}://${window.location.host}`; if (url === undefined) { _url = this.url; } else { // Support relative URLs if (url[0] == '/') { _url = `${base_url}${url}`; } else { _url = url; } } _url = typeof _url === 'function' ? _url() : _url; this.url = _url; // Set the url for this websocket to the one spit out. return _url } private createOrOverwriteWebSocket(url: string) { let that = this; this.url = this.createUrl(url); if (WebSocketOptions.WebSocket) { this.ws = new WebSocketOptions.WebSocket(this.url, this.protocols); } this.connectingTimeout = window.setTimeout(() => { that.log('Timeout!'); that.ws.close(); that.emitError('ETIMEDOUT', 'Connection timeout'); }, that.config.connectionTimeout); this.ws.addEventListener('open', () => { clearTimeout(this.connectingTimeout); that.log('Websocket open!'); that.reconnectDelay = initReconnectionDelay(this.config); that.log('Reconnection delay:', this.reconnectDelay); that.retriesCount = 0; }); this.ws.addEventListener('close', this.closeHandler); // // // Reassign the eventlisteners Object.keys(this.listeners).forEach(type => { this.listeners[type].forEach(([listener, options]) => { this.ws.addEventListener(type, listener, options); }); }); // because when closing with fastClose=true, it is saved and set to null to avoid double calls this.ws.onclose = this.ws.onclose || this.savedOnClose; this.ws.onmessage = function () { that._onmessage.apply(that, arguments); }; } /** * Starts listening for messages on the websocket, demultiplexing if necessary. * * @param {Function} [cb] Callback to be execute when a message * arrives. The callback will receive `action` and `stream` parameters * * @example * const webSocketBridge = new WebSocketBridge(); * webSocketBridge.connect(); * webSocketBridge.listen(function(action, stream) { * console.this.log(action, stream); * }); */ listen = (cb: Function) => { this.default_cb = cb; }; _onmessage(event: RawChannelsWebsocketMessage) { const msg = JSON.parse(event.data); let action; let stream; if (msg.stream !== undefined) { action = msg.payload; stream = msg.stream; const stream_cb = this.streams[stream]; stream_cb ? stream_cb(action, stream) : null; } else { action = msg; stream = null; this.default_cb ? this.default_cb(action, stream) : null; } }; demultiplex(stream: string, cb: Function) { this.streams[stream] = cb; }; send(data: any) { this.ws.send(JSON.stringify(data)) }; /** * Returns an object to send messages to a specific stream * * @param {String} stream The stream name * @return {Object} convenience object to send messages to `stream`. * @example * webSocketBridge.stream('mystream').send({prop1: 'value1', prop2: 'value1'}) */ stream(stream: string) { return { send: (payload: object) => { const msg = { stream, payload: payload }; this.send(msg); } } }; /** * Not using dispatchEvent, otherwise we must use a DOM Event object * Deferred because we want to handle the close event before this */ emitError = (code: string, msg: string) => setTimeout(() => { const err = new Error(msg); err.code = code; if (Array.isArray(this.listeners.error)) { this.listeners.error.forEach(([fn]) => fn(err)); } if (this.ws.onerror) { this.ws.onerror(err); } }, 0); handleClose() { this.log('Handling close.. Should retry? ', this.shouldRetry); this.retriesCount++; this.log('Retries count:', this.retriesCount); if (this.retriesCount > this.config.maxRetries) { this.emitError('EHOSTDOWN', 'Too many failed connection attempts'); return; } if (!this.reconnectDelay) { this.reconnectDelay = initReconnectionDelay(this.config); } else { this.reconnectDelay = updateReconnectionDelay(this.config, this.reconnectDelay); } if (this.shouldRetry) { this.log("Will retry after:", this.reconnectDelay); let that = this; setTimeout(function () { that.connect.apply(that, arguments); }, this.reconnectDelay); } }; get closeHandler() { let that = this; return function () { that.handleClose.apply(that); } } close(code = 1000, reason = '', {keepClosed = false, fastClose = true, delay = 0} = {}) { let that = this; this.log('close - params:', { reason, keepClosed, fastClose, delay, retriesCount: that.retriesCount, maxRetries: this.config.maxRetries }); this.shouldRetry = !keepClosed && this.retriesCount <= this.config.maxRetries; if (delay) { this.reconnectDelay = delay; } this.ws.close(code, reason); if (fastClose) { const fakeCloseEvent = { code, reason, wasClean: true, }; // execute close listeners soon with a fake closeEvent // and remove them from the WS instance so they // don't get fired on the real close. this.handleClose(); this.ws.removeEventListener('close', this.closeHandler); // run and remove level2 if (Array.isArray(this.listeners.close)) { that.listeners.close.forEach(([listener, options]) => { listener(fakeCloseEvent); that.ws.removeEventListener('close', listener, options); }); } // run and remove level0 if (this.ws.onclose) { this.savedOnClose = this.ws.onclose; this.ws.onclose(fakeCloseEvent); this.ws.onclose = function () { }; } } }; addEventListener(type: string, listener: EventListener, options?: any) { if (Array.isArray(this.listeners[type])) { if (!this.listeners[type].some(([l]) => l === listener)) { this.listeners[type].push([listener, options]); } } else { this.listeners[type] = [[listener, options]]; } this.ws.addEventListener(type, listener, options); }; removeEventListener(type: string, listener: EventListener, options?: any) { if (Array.isArray(this.listeners[type])) { this.listeners[type] = this.listeners[type].filter(([l]) => l !== listener); } this.ws.removeEventListener(type, listener, options); }; }