import {StreamListener} from "../core/websockets/websocketManager"; import {find, includes, map, reduce, remove, isNil, get} from "lodash"; import {HttpStatusHelper} from "../helpers/httpStatus.helper"; import {BaseHttpResource} from "./baseHttp.resource"; import {ChatMessage, MultichatMessage} from "../core/entities/message"; import {mc_log} from '../settings'; export type WebsocketMessage = { action: string, response_status: number, data: MultichatMessage } /** * A class that can be subclassed to create resources managed by both websockets and HTTP. * Can be configured to use HTTP API calls and Websockets to sync states and rawData. */ export class BaseDemultiplexedHttpResource extends BaseHttpResource { resourceIdentifyingMetadata: { [key: string]: any }; _callbacks: { [key: string]: any } = {}; _errorCallbacks: { [key: string]: any } = {}; _listeners: StreamListener[] = []; data: { [key: string]: any } = {}; stream_endpoint: string; actions = ['create', 'update', 'received']; streams: string[] = []; constructor(streams?: string[], data?: { [key: string]: any }) { super(); let that = this; streams = streams ? streams : []; this.streams = streams; streams.forEach(function (stream: string) { that._errorCallbacks[stream] = []; that._callbacks[stream] = []; }); } /** * @param stream */ getStreamListener(stream: string): StreamListener | undefined { return find(this._listeners, (listener: StreamListener) => { return listener.stream === stream }); }; getIncomingHandler() { let that = this; /** * @param {WebsocketMessage} response * @returns {boolean} - True if this resource is the one the message (response) is for */ function isForThisResource(response: WebsocketMessage): boolean { return reduce(map(that.channelIdentifyingMetadata, (v: any, k: string) => { return !isNil(response.data) && get(response, 'data.' + k, false) === v }), function (curr: boolean, v: any) { return curr && v }) === true } /** * On every message in the chat (only executed when listening for messages). * @param response * @param stream * @protected */ function _handleIncoming(response: WebsocketMessage, stream: string) { let action = response.action; if (isForThisResource(response)) { // We skip messages that are responses. We could handle 'sent', 'received' and 'read' statuses later. if (!('response_status' in response)) { if (includes(that.actions, action)) { if (action === 'create') { // Call all message callbacks that._callbacks[stream].forEach(function (cb: Function) { cb(response, stream); }); } } else { mc_log("Invalid action received for message", response); } } else { // It's a response for something if (HttpStatusHelper.isFailure(response.response_status)) { that._errorCallbacks[stream].forEach(function (cb: Function) { cb(response, stream); }); mc_log("Got bad response: ", response); } else { // Handle OK response (message has been sent) } } } } return _handleIncoming; } callStreamCallbacksForMessage(stream: string, message: ChatMessage) { this._callbacks[stream].forEach(function (cb: Function) { cb(message, stream); }); } _onUpdate(response: WebsocketMessage, stream: string) { this._callbacks[stream].forEach(function (cb: Function) { cb(response, stream); }) }; deregisterStreamListener(streamListener: StreamListener) { remove(this._listeners, streamListener); } /** * Return a function that can be used to handle the deregistration of a StreamListener. * @returns {(streamListener: ) => any} */ getDeregistrationHandler() { let that = this; function _onDeregistration(streamListener: StreamListener) { that.deregisterStreamListener(streamListener); } return _onDeregistration; } /** * Basically cache keys * @return {{chat, pk}} */ get channelIdentifyingMetadata(): { [key: string]: any } { return this.resourceIdentifyingMetadata; } /** * @param actions * @param stream * @return {Function} - Returns a closure that enables any external entity to validate whether a stream/response * combination matches this chat. Returns true when a response should be demultiplexed in the context of this * Chat. */ getResponseMatcher(actions: Array, stream: string): Function { let that = this; return function (_stream: string, response: WebsocketMessage) { if (response.action && includes(actions, response.action)) { return _stream === stream && includes(that.actions, response.action); } else { return false; } } } /** * Send rawData on a stream * @param data * @param stream */ sendDataOnStream(data: object, stream: string): void { let listener = this.getStreamListener(stream); if (listener) { listener.send('create', data); } } /** * Make sure we're listening on the passed stream. Don't create another StreamListener if we are. * @param {string} stream * @param {Array} actions */ ensureListeningOnStream(stream: string, actions: Array): void { mc_log("Trying to register", !this.isListening(stream)); if (!this.isListening(stream)) { mc_log("Creating StreamListener for stream '" + stream + "'"); this._listeners.push(new StreamListener(this.stream_endpoint, stream, actions, this.getResponseMatcher(actions, 'messages'), this.channelIdentifyingMetadata, this.getIncomingHandler(), this.getDeregistrationHandler())); } } stopListeningOnStream(stream: string): void { mc_log("Stopping StreamListener for stream '" + stream + "'"); const streamListener = this.getStreamListener(stream); this.deregisterStreamListener(streamListener); streamListener._socketManager.close(); } /** * @param stream * @returns {boolean} - true if this chat has listeners on the message stream */ isListening(stream: string): boolean { return this.getStreamListener(stream) !== undefined } /** * * @param callback - Function that gets executed on every received message * @param stream */ onStatusUpdate(callback: Function, stream: string) { let that = this; this._callbacks[stream].push(callback); return function () { remove(that._callbacks[stream], function (c: Function) { return callback === c }); }; }; /** * Execute callback on every message. * Note: Remember to remove handlers returned by this function when not needed by executing them * @param stream * @param {Function} callback - Function to execute on a message * @param {Function} [errorCallback] - Function to execute on a !201 response * @return {Function} - Closure that can be executed to deregister the callback */ onAction(stream: string, callback: Function, errorCallback?: Function) { this._callbacks[stream].push(callback); if (typeof errorCallback !== 'undefined') { this._errorCallbacks[stream].push(errorCallback); } let that = this; return function () { remove(that._callbacks[stream], function (c: Function) { return callback === c }); remove(that._errorCallbacks[stream], function (c: Function) { return errorCallback === c }); }; } }