import * as _ from "lodash";
import DemultiplexedReconnectingWebsocket, {WebSocketOptions} from "../../helpers/reconnectingWebsocket";
import {WebsocketMessage} from "../../resources/baseDemultiplexedHttp.resource";
import {mc_log, MultichatCoreClientSettings} from '../../settings';

// Singletons - maps an endpoint to the WebsocketManager that serves it.
let websocketManagerInstances: { [key: string]: WebsocketManager } = {};

/**
 * Settings needed to build the websocket URL and configure the socket.
 */
export type WebsocketManagerSettings = {
    DEBUG: boolean;
    WSS: boolean;
    WS_DOMAIN: string;
    WS_PORT: string;
    WS_ENDPOINT: string;
};

const WebsocketManagerConfiguration: WebsocketManagerSettings = MultichatCoreClientSettings;

/**
 * Used to manage streams exposed using the Django Channels websocketBridge demultiplexing functionality.
 * It also integrates with the events that the Django-Channels API provides.
 *
 * Instances are tracked per endpoint in a module-level map; see the constructor.
 */
export class WebsocketManager {
    url: string;
    base_url: string;
    private _endpoint: string;
    _webSocketBridge: DemultiplexedReconnectingWebsocket;
    _listeners: Array<StreamListener> = [];

    /**
     * Stable 'open' handler: asks every registered listener to (re)send its subscriptions.
     * It is kept as a single function reference so that connect() can remove and re-add it
     * instead of stacking a new anonymous handler on every (re)connect.
     */
    private _resubscribeOnOpen = () => {
        this._listeners.forEach((listener) => {
            listener.listen();
        });
    };

    /**
     * When no manager is tracked for `endpoint` yet, builds the URL from the settings, creates the
     * websocket bridge and connects. The constructor then returns whatever the instance map holds
     * for `endpoint`, so an already tracked manager is handed back instead of the new object.
     *
     * @param endpoint - Path appended to the base websocket URL; also the key in the instance map
     * @param settings - Connection settings; defaults to the module configuration when undefined
     * @return WebsocketManager instance
     */
    constructor(endpoint: string, settings?: WebsocketManagerSettings) {
        this._endpoint = endpoint;

        if (!(endpoint in websocketManagerInstances)) {
            // Only if a WebsocketManager does not exist already for this endpoint
            const resolved = typeof settings === 'undefined' ? WebsocketManagerConfiguration : settings;
            const scheme = resolved.WSS ? 'wss' : 'ws';

            this.base_url = scheme + '://' + resolved.WS_DOMAIN + ':' + resolved.WS_PORT + resolved.WS_ENDPOINT;
            this.url = this.base_url + endpoint;

            // Connect
            this._webSocketBridge = new DemultiplexedReconnectingWebsocket(new WebSocketOptions(resolved.DEBUG));
            this.connect();
        }

        return websocketManagerInstances[endpoint];
    }

    /**
     * Connect the wrapped webSocketBridge.
     *
     * If the bridge is currently connecting or closing, polls every 200 ms until it is either
     * closed (then connects) or open (then stops polling). Otherwise attempts to connect right away.
     */
    private connect() {
        // Assigned below only when polling is needed, hence not a const.
        let interval: ReturnType<typeof setInterval> | undefined;

        const _doConnect = () => {
            if (this._webSocketBridge.isClosed) {
                this._webSocketBridge.connect(this.url, true);
                // Add it to instance map to support singleton pattern
                websocketManagerInstances[this._endpoint] = this;
                // Remove before adding so that repeated connects never leave more than one copy
                // of the handler attached.
                this._webSocketBridge.removeEventListener('open', this._resubscribeOnOpen);
                this._webSocketBridge.addEventListener('open', this._resubscribeOnOpen);
            }
            if (typeof interval !== 'undefined' && (this._webSocketBridge.isClosed || this._webSocketBridge.isOpen)) {
                clearInterval(interval); // Stop trying to connect
            }
        };

        if (this._webSocketBridge.isConnecting || this._webSocketBridge.isClosing) {
            interval = setInterval(_doConnect, 200);
        } else {
            _doConnect();
        }
    }

    /**
     * @returns The full websocket URL (base URL + endpoint) of this manager
     */
    getUrl() {
        return this.url;
    }

    /**
     * Find listeners based on stream and message.
     *
     * With only a stream given, listeners are matched on their stream name. In every other case each
     * listener's validator decides; a validator that throws is logged and treated as "no match".
     *
     * @param stream - The stream to look for
     * @param [message] - A message received on the Websocket, can be used to choose a StreamListener
     * @return The StreamListeners that are valid for the (stream, message) combination
     * @private
     */
    _findListeners(stream: string, message?: WebsocketMessage) {
        if (stream !== undefined && message === undefined) {
            return _.filter(this._listeners, (listener: StreamListener) => listener.stream === stream);
        }

        return _.filter(this._listeners, (listener: StreamListener) => {
            try {
                return listener.validator(stream, message);
            } catch (e) {
                console.error("Unable to call validator function on listener", listener, "Error was:", e);
                return false;
            }
        });
    }

    /**
     * Demultiplex on events, not only on stream: every message received on the stream is handed to
     * all listeners selected by _findListeners(stream, message).
     * @private
     * @param demultiplexedStream - The stream to start demultiplexing
     */
    _demultiplex(demultiplexedStream: string) {
        this._webSocketBridge.demultiplex(demultiplexedStream, (response: WebsocketMessage, stream: string) => {
            const listeners = this._findListeners(stream, response);
            mc_log("Found " + listeners.length + " registered listeners for message:", response);
            _.forEach(listeners, (listener: StreamListener) => {
                listener.processResponse(response, stream);
            });
        });
    }

    /**
     * Get a stream object
     * @param stream - Name of the stream
     * @returns The stream object as returned by the wrapped webSocketBridge
     */
    stream(stream: string) {
        return this._webSocketBridge.stream(stream);
    }

    /**
     * Send rawData on the websocket. If the socket is not open, the message is sent from a one-shot
     * 'open' handler instead.
     * @param stream - Name of the stream to send on
     * @param data - The payload to send
     */
    send(stream: string, data: object) {
        if (this._webSocketBridge.isOpen) {
            this.stream(stream).send(data);
            return;
        }

        const sendmsg = () => {
            this.stream(stream).send(data);
            mc_log("Sending message onOpen callback", data);
            // One-shot: detach so the message is not re-sent on later opens.
            this._webSocketBridge.removeEventListener('open', sendmsg);
            mc_log("Removed eventlistener", sendmsg);
        };
        this._webSocketBridge.addEventListener('open', sendmsg);
    }

    /**
     * Drop all registered listeners and close the websocket with default code 1000 (normal).
     */
    close() {
        this._listeners = [];
        this._webSocketBridge.close(1000, 'From client.', {keepClosed: true, fastClose: true, delay: 0});
        mc_log("Closed WS Manager: ", this);
    }

    /**
     * Register a StreamListener. An already registered listener that is equal to it (see
     * StreamListener.isEqual) is deregistered first.
     * @param streamListener - The listener to register
     * @return The registered StreamListener
     */
    registerStreamListener(streamListener: StreamListener) {
        this.connect();

        const existing = _.find(this._listeners, (listener: StreamListener) => streamListener.isEqual(listener));
        mc_log("Looking for StreamListener.. Result:", existing);
        if (existing) {
            existing.deregister();
        }

        // Start demultiplexing this stream if there is no active listener on it
        if (this._findListeners(streamListener.stream).length === 0) {
            this._demultiplex(streamListener.stream);
        }

        this._listeners.push(streamListener);
        mc_log("Registered new StreamListener, current active stream listeners:", this._listeners);
        return streamListener;
    }

    /**
     * Remove a StreamListener from this manager.
     * @param streamListener - The listener to remove
     */
    deregisterStreamListener(streamListener: StreamListener) {
        // Compare by identity. Passing the object itself to _.remove would use lodash's `matches`
        // shorthand, i.e. a deep partial comparison that can also hit other, structurally equal listeners.
        _.remove(this._listeners, (listener: StreamListener) => listener === streamListener);
        mc_log("Removing StreamListener:", streamListener);
    }
}

/**
 * A StreamListener can listen to several events (e.g. ['update', 'create']), on one single resource under a specific
 * stream.
 *
 * For example, you can listen on the 'chats' stream to a chat identified by {id: 1}, on the 'update' and 'create'
 * events. A second one: you can listen on the 'messages' stream to a chat identified by {chat: 1} on the 'create'
 * event.
 */
export class StreamListener {
    stream: string;
    events: Array<string>;
    validator: Function;
    metaData: object;
    callback: Function;
    _socketManager: WebsocketManager;
    _onDeregistration: Function;

    /**
     * Creates the listener, registers it on the WebsocketManager for `endpoint` and sends its subscriptions.
     *
     * @param [endpoint] - The endpoint to listen on; a falsy value is treated as ''
     * @param stream - The stream to listen on
     * @param events - The events to listen on (e.g. 'create' or 'update'), within the specified stream
     * @param validator - A function that accepts stream and a message, returns a boolean indicating whether
     * the StreamListener callback should be called
     * @param metaData - Metadata identifying a StreamListener, can be used to compare StreamListeners
     * @param callback - The callback to execute when validator(stream, response) returns true
     * @param [onDeregistration] - Handle deregistration on the WebsocketManager, can be used to track/remove
     * listeners in other contexts too; defaults to a no-op
     */
    constructor(endpoint: string, stream: string, events: Array<string>, validator: Function, metaData: object,
                callback: Function, onDeregistration?: Function) {
        this.stream = stream;
        this.events = events;
        this.validator = validator;
        this.metaData = metaData;
        this.callback = callback;
        // TODO -> only run if current state is open?
        this._socketManager = new WebsocketManager(endpoint ? endpoint : '');
        this._onDeregistration = onDeregistration ? onDeregistration : function () {
        };
        this._register();
        this.listen();
    }

    /**
     * Process a response by handing it to the callback.
     * @param response - The message received on the websocket
     * @param stream - The stream the message was received on
     */
    processResponse(response: WebsocketMessage, stream: string) {
        mc_log("Processing response for stream ", this.stream, response, this.callback);
        this.callback(response, stream);
    }

    /**
     * Function that validates whether a given StreamListener is equal to this one: same stream, and deeply
     * equal metaData and events.
     * @param streamListener - The listener to compare with
     * @return Whether both listeners are considered equal
     */
    isEqual(streamListener: StreamListener) {
        return _.isEqual(streamListener.metaData, this.metaData)
            && _.isEqual(streamListener.events, this.events)
            && streamListener.stream === this.stream;
    }

    /**
     * Subscribe to each event, using the metaData passed to the StreamListener constructor to identify the
     * resource to subscribe to. One "subscribe" message is sent per event.
     */
    listen() {
        this.events.forEach((event) => {
            this._socketManager.send(this.stream, {
                action: "subscribe",
                data: _.extend({action: event}, this.metaData)
            });
        });
    }

    /**
     * Remove the StreamListener from the SocketManager, and handle deregistration.
     */
    deregister() {
        this._socketManager.deregisterStreamListener(this);
        this._onDeregistration(this);
    }

    /**
     * Register this StreamListener on the WebsocketManager
     * @private
     */
    _register() {
        this._socketManager.registerStreamListener(this);
    }

    /**
     * Send rawData to the WebsocketManager
     * @param action - The action (e.g. 'create', 'update')
     * @param rawData - The data of the resource to create. Can contain identifying rawData.
     * @param [useMetadata] - Pass false to send rawData as-is; otherwise the listener's metaData is merged in
     * (rawData wins on conflicting keys)
     */
    send(action: string, rawData: object, useMetadata?: boolean) {
        const preparedData = {
            action: action,
            data: useMetadata === false ? rawData : _.merge(_.merge({}, this.metaData), rawData)
        };
        this._socketManager.send(this.stream, preparedData);
    }
}