'use strict'; //import Element from "ltx"; import {NameSpacesLabels} from "../../connection/XMPPService"; import {DataStoreType} from "../../config/config"; import {Deferred, getJsonFromXML, getStoreStanzaValue, stackTrace} from "../Utils"; //import {Element} from "adaptive-expressions/lib/builtinFunctions"; import {XMPPUTils} from "../XMPPUtils"; const xmppUtils = XMPPUTils.getXMPPUtils(); export {}; /* const Client = require('./lib/Client') const {xml, jid} = require('@xmpp/client-core') module.exports.Client = Client module.exports.xml = xml module.exports.jid = jid */ //let client = require("@xmpp/client").client; let client = require("./XmppClientWrapper").client; let XmppQueue = require("./XmppQueue"); let utils = require("../Utils"); const _sasl = require('@xmpp/sasl'); const _middleware = require('@xmpp/middleware'); const _streamFeatures = require('@xmpp/stream-features'); const plain = require('@xmpp/sasl-plain'); const xml = require("@xmpp/xml"); //const debug = require("@xmpp/debug"); // @ts-ignore const Element = require('ltx').Element; const parse = require('ltx').parse; let LOG_ID='XMPPCLIENT - '; class XmppClient { public options: any; public xmppOptions: any; public eventEmitter: any; public restartConnectEnabled: any; public client: any; public iqGetEventWaiting: any; // public onIqErrorReceived: any; // public onIqResultReceived: any; public logger: any; public xmppQueue: any; public timeBetweenXmppRequests: any; public maxPendingAsyncLockXmppQueue: any; public username: any; public password: any; socketClosed: boolean = false; storeMessages: any; enablesendurgentpushmessages: any; copyMessage: any = true; rateLimitPerHour: any; private nbMessagesSentThisHour: number; lastTimeReset: Date; timeBetweenReset: number; messagesDataStore: DataStoreType; //private iqSetEventRoster: any; //private iqSetEventHttp: any; public socket = undefined; public pendingRequests: Array<{ id: string, prom: Deferred }> = []; public interval = undefined; constructor(...args) { //super(...args); const {username, password} = args[0]; let that = this; this.options = [...args]; this.restartConnectEnabled = true; this.iqGetEventWaiting = {}; this.client = client(...args); //debug(this.client, true); this.socket = client.socket; this.nbMessagesSentThisHour = 0; this.timeBetweenReset = 1000 * 60 * 60; // */ //process.on('unhandledRejection', e => { console.log("(unhandledRejection) !!! CATCH Error e : ", e, ", stack : ", stackTrace()); throw e; }); process.on('unhandledRejection', e => { that.logger.log("error", LOG_ID + "(unhandledRejection) !!! CATCH Error e : ", e, ", stack : ", stackTrace()); //console.log("(unhandledRejection) !!! CATCH Error e : ", e, ", stack : ", stackTrace()); }); } fn_pong() { let that = this; that.logger.log("debug", LOG_ID + "(on) pong - pong response received!"); //console.log("Réponse pong reçue !"); } async init(_logger, _eventemitter, _timeBetweenXmppRequests, _storeMessages, _rateLimitPerHour, _messagesDataStore, _copyMessage, _enablesendurgentpushmessages, _maxPendingAsyncLockXmppQueue) { let that = this; that.client.getQuery('urn:xmpp:ping', 'ping', that.iqGetEventPing.bind(that)); that.client.setQuery('jabber:iq:roster', 'query', that.iqSetEventRoster.bind(that)); that.client.setQuery('urn:xmpp:http', 'req', that.iqSetEventHttp.bind(that)); that.client.setQuery('jabber:iq:rpc', 'query', that.iqSetEventRpc.bind(that)); that.client.getQuery('urn:xmpp:calendar:0', 'events', that.iqGetEventEventsCalendar.bind(that)); that.client.getQuery('urn:xmpp:calendar:0', 'autoreply', that.iqGetEventAutoreplyCalendar.bind(that)); that.logger = _logger; that.eventEmitter = _eventemitter; that.timeBetweenXmppRequests = _timeBetweenXmppRequests ? _timeBetweenXmppRequests:20; that.maxPendingAsyncLockXmppQueue = _maxPendingAsyncLockXmppQueue ? _maxPendingAsyncLockXmppQueue:20; that.xmppQueue = XmppQueue.getXmppQueue(_logger, that.timeBetweenXmppRequests, that.maxPendingAsyncLockXmppQueue); that.storeMessages = _storeMessages; that.rateLimitPerHour = _rateLimitPerHour; that.messagesDataStore = _messagesDataStore; that.lastTimeReset = new Date(); that.copyMessage = _copyMessage; that.enablesendurgentpushmessages = _enablesendurgentpushmessages; that.on('open', () => { that.logger.log("debug", LOG_ID + "(event) open"); that.socketClosed = false; let ws = that.client?.socket?.socket?that.client?.socket?.socket:{}; // Le Server send ping every 10 seconds. if (that.interval) { clearInterval(that.interval); ws.removeAllListeners("pong"); } that.interval = setInterval(() => { //that.logger.log("debug", LOG_ID + "(sendWebsocketPing) ping - Will send ping!"); ws.ping(); }, 10000); ws.on("pong", that.fn_pong.bind(that)); ws.on("close", () => { that.logger.log("warn", LOG_ID + "(close) close - received!"); if (that.interval) { clearInterval(that.interval); ws.removeAllListeners("pong"); } }); ws.on("error", (err) => { if (that.interval) { clearInterval(that.interval); ws.removeAllListeners("pong"); } that.logger.log("error", LOG_ID + "(event) WebSocket error : ", err); //console.error("Erreur WebSocket :", err); that.emit("error", {"condition": "rainbow-websocket-down", error: err}); ws.close(); // force la reconnexion via @xmpp/reconnect }); // */ }); /*this.client.websocket.on('message', () => { that.socketClosed = true; }); // */ that.on('error', () => { that.logger.log("debug", LOG_ID + "(event) error"); that.socketClosed = true; }); that.on('close', () => { that.logger.log("debug", LOG_ID + "(event) close"); that.socketClosed = true; }); setInterval(that.resetnbMessagesSentThisHour.bind(this), that.timeBetweenReset); } onIqErrorReceived(msg, stanzaTab) { let that = this; let stanza = stanzaTab[0]; let prettyStanza = stanzaTab[1]; let jsonStanza = stanzaTab[2]; //let children = stanza.children; let iqId = stanza.attrs.id; let errorMsg = stanza.getChild("error") ? stanza.getChild("error").getChild("text").getText() || "":""; that.logger.log("warn", LOG_ID + "(XmmpClient) onIqErrorReceived received iq result - 'stanza id '", iqId, ", msg : ", msg, ", errorMsg : ", errorMsg, ", that.iqGetEventWaiting[iqId] : ", that.iqGetEventWaiting[iqId]); // reject and delete the waiting iq. if (typeof that.iqGetEventWaiting[iqId]==="function") { that.logger.log("debug", LOG_ID + "(XmmpClient) onIqErrorReceived call iqGetEventWaiting function id : ", iqId); that.iqGetEventWaiting[iqId](stanza); } else { that.logger.log("debug", LOG_ID + "(XmmpClient) onIqErrorReceived delete iqGetEventWaiting function id : ", iqId); delete that.iqGetEventWaiting[iqId]; } }; iqGetEventPing(ctx) { let that = this; //that.logger.log("debug", LOG_ID + "(XmmpClient) iqGetEventPing ctx : ", ctx); that.logger.log("debug", LOG_ID + "(XmmpClient) iqGetEventPing ping iq request received from server."); return {}; } iqSetEventRoster(ctx) { let that = this; that.logger.log("internal", LOG_ID + "(XmmpClient) iqSetEventRoster set iq receiv - :", ctx); return {}; }; async iqSetEventHttp(ctx) { let that = this; let result = true; //that.logger.log("internal", LOG_ID + "(XmmpClient) iqSetEventHttp set iq receiv - :", ctx); // return {}; try { let stanza = ctx.stanza; //let xmlstanzaStr = stanza ? stanza.toString():""; //let reqObj = await getJsonFromXML(xmlstanzaStr); that.logger.log("debug", LOG_ID + "(XmmpClient) iqSetEventHttp ctx.stanza : ", ctx.stanza); //let eventWaited = { id : reqObj["$attrs"]["id"], prom : new Deferred()}; let eventWaited = {id: stanza.attrs.id, prom: new Deferred()}; that.pendingRequests.push(eventWaited); result = await eventWaited.prom.promise; that.logger.log("debug", LOG_ID + "(XmmpClient) iqSetEventHttp prom result : ", result); } catch (e) { that.logger.log("error", LOG_ID + "(XmmpClient) iqSetEventHttp CATCH Error !!! error : ", e); } return result; }; async iqSetEventRpc(ctx) { let that = this; let result = true; //that.logger.log("internal", LOG_ID + "(XmmpClient) iqSetEventRpc set iq receiv - :", ctx); // return {}; try { let stanza = ctx.stanza; //let xmlstanzaStr = stanza ? stanza.toString():""; //let reqObj = await getJsonFromXML(xmlstanzaStr); that.logger.log("debug", LOG_ID + "(XmmpClient) iqSetEventRpc ctx.stanza : ", ctx.stanza); //let eventWaited = { id : reqObj["$attrs"]["id"], prom : new Deferred()}; let eventWaited = {id: stanza.attrs.id, prom: new Deferred()}; that.pendingRequests.push(eventWaited); result = await eventWaited.prom.promise; that.logger.log("debug", LOG_ID + "(XmmpClient) iqSetEventRpc prom result : ", result); } catch (e) { that.logger.log("error", LOG_ID + "(XmmpClient) iqSetEventRpc CATCH Error !!! error : ", e); } return result; }; async iqGetEventEventsCalendar(ctx) { let that = this; let result = true; //that.logger.log("internal", LOG_ID + "(XmmpClient) iqGetEventEventsCalendar set iq receiv - :", ctx); // return {}; try { let stanza = ctx.stanza; //let xmlstanzaStr = stanza ? stanza.toString():""; //let reqObj = await getJsonFromXML(xmlstanzaStr); that.logger.log("debug", LOG_ID + "(XmmpClient) iqGetEventEventsCalendar ctx.stanza : ", ctx.stanza); //let eventWaited = { id : reqObj["$attrs"]["id"], prom : new Deferred()}; let eventWaited = {id: stanza.attrs.id, prom: new Deferred()}; that.pendingRequests.push(eventWaited); result = await eventWaited.prom.promise; that.logger.log("debug", LOG_ID + "(XmmpClient) iqGetEventEventsCalendar prom result : ", result); } catch (e) { that.logger.log("error", LOG_ID + "(XmmpClient) iqGetEventEventsCalendar CATCH Error !!! error : ", e); } return result; }; async iqGetEventAutoreplyCalendar(ctx) { let that = this; let result = true; //that.logger.log("internal", LOG_ID + "(XmmpClient) iqGetEventAutoreplyCalendar set iq receiv - :", ctx); // return {}; try { let stanza = ctx.stanza; //let xmlstanzaStr = stanza ? stanza.toString():""; //let reqObj = await getJsonFromXML(xmlstanzaStr); that.logger.log("debug", LOG_ID + "(XmmpClient) iqGetEventAutoreplyCalendar ctx.stanza : ", ctx.stanza); //let eventWaited = { id : reqObj["$attrs"]["id"], prom : new Deferred()}; let eventWaited = {id: stanza.attrs.id, prom: new Deferred()}; that.pendingRequests.push(eventWaited); result = await eventWaited.prom.promise; that.logger.log("debug", LOG_ID + "(XmmpClient) iqGetEventAutoreplyCalendar prom result : ", result); } catch (e) { that.logger.log("error", LOG_ID + "(XmmpClient) iqGetEventAutoreplyCalendar CATCH Error !!! error : ", e); } return result; }; onIqResultReceived(msg, stanzaTab) { let that = this; let stanza = stanzaTab[0]; let prettyStanza = stanzaTab[1]; let jsonStanza = stanzaTab[2]; //let children = stanza.children; let iqId = stanza.attrs.id; that.logger.log("debug", LOG_ID + "(XmmpClient) onIqResultReceived received iq result - 'stanza id '", iqId); if (that.iqGetEventWaiting[iqId]) { // The result iq correspond to a stored promise from our request, so resolve it to allow sendIq to get back a result. if (typeof that.iqGetEventWaiting[iqId]==="function") { that.iqGetEventWaiting[iqId](stanza); } else { delete that.iqGetEventWaiting[iqId]; } } else { } /* children.forEach((node) => { switch (node.getName()) { case "query": that._onIqGetQueryReceived(stanza, node); break; case "pbxagentstatus": // The treatment is in telephonyEventHandler //that._onIqGetPbxAgentStatusReceived(stanza, node); break; case "default": that.logger.log("warn", LOG_ID + "(handleXMPPConnection, onIqResultReceived) not managed - 'stanza'", node.getName()); break; default: that .logger .log("warn", LOG_ID + "(handleXMPPConnection, onIqResultReceived) child not managed for iq - 'stanza'", node.getName()); } }); if (stanza.attrs.id === "enable_xmpp_carbon") { that.eventEmitter.emit("rainbow_oncarbonactivated"); } */ }; async resolvPendingRequest(id, stanza) { let that = this; let found = false; for (const pendingRequest of that.pendingRequests) { if (pendingRequest && pendingRequest.id===id) { pendingRequest.prom.resolve(stanza); found = true; } } return found; } resetnbMessagesSentThisHour() { let that = this; that.logger.log("debug", LOG_ID + "(resetnbMessagesSentThisHour) _entering_"); that.logger.log("debug", LOG_ID + "(resetnbMessagesSentThisHour) before reset, that.nbMessagesSentThisHour : ", that.nbMessagesSentThisHour); that.nbMessagesSentThisHour = 0; that.lastTimeReset = new Date(); that.logger.log("debug", LOG_ID + "(resetnbMessagesSentThisHour) _exiting_"); } send(...args) { let that = this; that.logger.log("debug", LOG_ID + "(send) _entering_", ...args); return this.xmppQueue.add(async (resolve2, reject2) => { /* if (args && args[0]) { that.logger.log("internal", LOG_ID + "(send) stanza to send ", that.logger.colors.gray(args[0].toString())); } else { that.logger.log("error", LOG_ID + "(send) stanza to send is empty"); } // */ //that.logger.log("debug", LOG_ID + "(send) this.client.websocket : ", this.client.Socket); let stanza = args[0]; let p_messagesDataStore: DataStoreType = args[1]; let stanzaJson = await getJsonFromXML(stanza); that.logger.log("debug", LOG_ID + "(send) JSONstanza : ", stanzaJson); //if (that.socketClosed ) { if (that.socketClosed && !stanzaJson.close) { // The "" stanza is ignored that.logger.log("warn", LOG_ID + "(send) Error the socket is close, so do not send data on it. this.client.websocket closed. "); //return Promise.reject("Error the socket is close, so do not send data on it.") return reject2({ timestamp: (new Date()).toLocaleTimeString(), reason: "Error the socket is close, so do not send data on it." }); // */ /* return { timestamp: (new Date()).toLocaleTimeString(), reason:"Error the socket is close, so do not send data on it." }; // */ } if (that.enablesendurgentpushmessages && stanza && stanza.name=="message") { //stanzaJson = await getJsonFromXML(stanza); //that.logger.log("internal", LOG_ID + "(send) enablesendurgentpushmessages is setted, and of type message, JSONstanza is : ", stanzaJson); //if (stanzaJson && stanzaJson.message != undefined) { //that.logger.log("internal", LOG_ID + "(send) enablesendurgentpushmessages is setted, stanza of type message."); if (stanzaJson.message.body && stanzaJson.message.body!="") { //that.logger.log("debug", LOG_ID + "(send) enablesendurgentpushmessages is setted, stanza of type message with not empty body."); // let retryPush = "retry-push"; stanza.append(xml(retryPush, { "xmlns": NameSpacesLabels.HintsNameSpace })); // */ that.logger.log("debug", LOG_ID + "(send) enablesendurgentpushmessages is setted, stanza of type message with not empty body."); } //} } let storeStanzaValue = getStoreStanzaValue(that.storeMessages, that.messagesDataStore, p_messagesDataStore); if (storeStanzaValue!=DataStoreType.StoreTwinSide && stanza && typeof stanza==="object" && stanza.name=="message") { //if ((that.storeMessages==false || p_messagesDataStore) && p_messagesDataStore != DataStoreType.StoreTwinSide && stanza && typeof stanza==="object" && stanza.name=="message") { // if (that.storeMessages == false && stanza && typeof stanza === "object" && stanza.name == "message") { // that.logger.log("debug", LOG_ID + "(send) will add to stanza."); // that.logger.log("internal", LOG_ID + "(send) will add to stanza : ", stanza); //that.logger.log("debug", LOG_ID + "(send) original stanza : ", stanza); // // /* stanza.append(xml("no-copy", { "xmlns": NameSpacesLabels.HintsNameSpace })); // */ let nostoreTag = "no-store"; if (storeStanzaValue && storeStanzaValue!=DataStoreType.UsestoreMessagesField) { nostoreTag = storeStanzaValue; } if (stanzaJson.message.body) { stanza.append(xml(nostoreTag, { "xmlns": NameSpacesLabels.HintsNameSpace })); } // */ //that.logger.log("internal", LOG_ID + "(send) no-store stanza : ", stanza); } /*if (that.copyMessage == false) { stanza.append(xml("no-copy", { "xmlns": NameSpacesLabels.HintsNameSpace })); }//*/ // test the rate-limit if (this.nbMessagesSentThisHour > that.rateLimitPerHour) { let timeWhenRateLimitPerHourHappens = new Date().getTime(); let timeToWaitBeforeNextMessageAvabilityMs = that.timeBetweenReset - (timeWhenRateLimitPerHourHappens - that.lastTimeReset.getTime()); let error = { "errorCode": -1, "timeWhenRateLimitPerHourHappens": timeWhenRateLimitPerHourHappens, "nbMessagesSentThisHour": this.nbMessagesSentThisHour, "rateLimitPerHour": that.rateLimitPerHour, "timeToWaitBeforeNextMessageAvabilityMs": timeToWaitBeforeNextMessageAvabilityMs, "label": "error number of sent messages is over the rate limit.", "sendArgs": args }; that.logger.log("error", LOG_ID + "(send) error number of sent messages is over the rate limit : ", error); that.logger.log("internalerror", LOG_ID + "(send) error number of sent messages is over the rate limit : ", error); return reject2(error); } /* const check = xmppUtils.willExceedStanzaLimit(typeof stanza === "string" ? stanza : (stanza?.toString ? stanza.toString() : String(stanza)), 1500, 16384); that.logger.log("debug", LOG_ID + "(send) check byte size : ", check); if (check.exceeds) { // Plan B: héberger le JSON, envoyer un lien, ou simplifier la card throw new Error(`Card trop volumineuse (~${check.estimated} octets).`); return reject( new Error(`Card trop volumineuse (~${check.estimated} octets).`)); } // */ try { let stanzaSize = xmppUtils.estimateStanzaByteSize(stanza); that.logger.log("debug", LOG_ID + "(send) estimateStanzaByteSize : ", stanzaSize); if (stanzaSize > that.xmppOptions.stanzaMaxLength) { return reject2 (new Error(`Stanza size ${stanzaSize} Bytes is over the Max size allowed by transport layer ${that.xmppOptions.stanzaMaxLength}`)); } // */ // that.logger.log("info", LOG_ID + "(send) wire stanza : ", stanza.toString()); await this.client.send(...args).then(() => { if (stanzaJson?.message?.body || stanzaJson?.message?.content) { that.nbMessagesSentThisHour++; } resolve2({"code": 1, "label": "OK"}); }); } catch (err) { that.logger.log("error", LOG_ID + "(send) _catch error_ at super.send", err, ", stanzaJson : ", stanzaJson); //that.logger.log("debug", LOG_ID + "(send) restart the xmpp client"); return reject2(err); } /* return this.client.send(...args).then(() => { that.nbMessagesSentThisHour++; resolve2({"code": 1, "label":"OK"}); }).catch(async (err) => { that.logger.log("error", LOG_ID + "(send) _catch error_ at super.send", err); //that.logger.log("debug", LOG_ID + "(send) restart the xmpp client"); return reject2(err); }); // */ }); } // send_orig(...args) { // let that = this; // that.logger.log("debug", LOG_ID + "(send) _entering_"); // return new Promise((resolve, reject) => { // let prom = this.xmppQueue.addPromise( // new Promise(async (resolve2, reject2) => { // /* // if (args && args[0]) { // that.logger.log("internal", LOG_ID + "(send) stanza to send ", that.logger.colors.gray(args[0].toString())); // } else { // that.logger.log("error", LOG_ID + "(send) stanza to send is empty"); // } // */ // // //that.logger.log("debug", LOG_ID + "(send) this.client.websocket : ", this.client.Socket); // // if (that.socketClosed) { // that.logger.log("warn", LOG_ID + "(send) Error the socket is close, so do not send data on it."); // //return Promise.reject("Error the socket is close, so do not send data on it.") // return reject2({ // timestamp: (new Date()).toLocaleTimeString(), // reason:"Error the socket is close, so do not send data on it." // }); // // */ // /* return { // timestamp: (new Date()).toLocaleTimeString(), // reason:"Error the socket is close, so do not send data on it." // }; // // */ // } // // let stanza = args[0]; // // if (that.enablesendurgentpushmessages && stanza && stanza.name == "message") { // let stanzaJson = await getJsonFromXML(stanza); // that.logger.log("internal", LOG_ID + "(send) enablesendurgentpushmessages is setted, and of type message, JSONstanza is : ", stanzaJson); // //if (stanzaJson && stanzaJson.message != undefined) { // //that.logger.log("internal", LOG_ID + "(send) enablesendurgentpushmessages is setted, stanza of type message."); // if (stanzaJson.message.body && stanzaJson.message.body != "") { // that.logger.log("internal", LOG_ID + "(send) enablesendurgentpushmessages is setted, stanza of type message with not empty body."); // // // let retryPush = "retry-push"; // stanza.append(xml(retryPush, { // "xmlns": NameSpacesLabels.HintsNameSpace // })); // that.logger.log("internal", LOG_ID + "(send) enablesendurgentpushmessages is setted, stanza of type message with not empty body."); // } // //} // } // // if (that.storeMessages == false && stanza && typeof stanza === "object" && stanza.name == "message") { // // if (that.storeMessages == false && stanza && typeof stanza === "object" && stanza.name == "message") { // // that.logger.log("debug", LOG_ID + "(send) will add to stanza."); // // that.logger.log("internal", LOG_ID + "(send) will add to stanza : ", stanza); // //that.logger.log("debug", LOG_ID + "(send) original stanza : ", stanza); // // // // // /* stanza.append(xml("no-copy", { // "xmlns": NameSpacesLabels.HintsNameSpace // })); // // */ // // //let nostoreTag="no-store"; // let nostoreTag=that.messagesDataStore; // stanza.append(xml(nostoreTag, { // "xmlns": NameSpacesLabels.HintsNameSpace // })); // // */ // //that.logger.log("internal", LOG_ID + "(send) no-store stanza : ", stanza); // } // // /*if (that.copyMessage == false) { // stanza.append(xml("no-copy", { // "xmlns": NameSpacesLabels.HintsNameSpace // })); // }//*/ // // // test the rate-limit // if (this.nbMessagesSentThisHour > that.rateLimitPerHour) { // let timeWhenRateLimitPerHourHappens = new Date().getTime(); // let timeToWaitBeforeNextMessageAvabilityMs = that.timeBetweenReset - (timeWhenRateLimitPerHourHappens - that.lastTimeReset.getTime()); // let error = { // "errorCode": -1, // "timeWhenRateLimitPerHourHappens": timeWhenRateLimitPerHourHappens, // "nbMessagesSentThisHour" : this.nbMessagesSentThisHour, // "rateLimitPerHour": that.rateLimitPerHour, // "timeToWaitBeforeNextMessageAvabilityMs": timeToWaitBeforeNextMessageAvabilityMs, // "label": "error number of sent messages is over the rate limit.", // "sendArgs": args // }; // that.logger.log("error", LOG_ID + "(send) error number of sent messages is over the rate limit : ", error); // that.logger.log("internalerror", LOG_ID + "(send) error number of sent messages is over the rate limit : ", error); // return reject2(error); // } // // try { // await this.client.send(...args).then(() => { // that.nbMessagesSentThisHour++; // resolve2({"code": 1, "label": "OK"}); // }); // } catch(err){ // that.logger.log("error", LOG_ID + "(send) _catch error_ at super.send", err); // //that.logger.log("debug", LOG_ID + "(send) restart the xmpp client"); // return reject2(err); // } // /* return this.client.send(...args).then(() => { // that.nbMessagesSentThisHour++; // resolve2({"code": 1, "label":"OK"}); // }).catch(async (err) => { // that.logger.log("error", LOG_ID + "(send) _catch error_ at super.send", err); // //that.logger.log("debug", LOG_ID + "(send) restart the xmpp client"); // return reject2(err); // }); // */ // }) // ).then((result) => { // that.logger.log("debug", LOG_ID + "(send) sent"); // return (result); // }).catch((errr) => { // that.logger.log("error", LOG_ID + "(send) error in send promise : ", errr); // that.logger.log("internalerror", LOG_ID + "(send) error in send promise : ", errr); // if (errr && errr.reason && errr.reason.indexOf("the socket is close") != -1 ){ // that.logger.log("error", LOG_ID + "(send) error in send, the socket is closed, so set socketClosed to true.", errr); // that.socketClosed = true; // } // throw errr; // }); // // // Wait a few time between requests to avoid burst with lot of it. // utils.setTimeoutPromised(that.timeBetweenXmppRequests).then(() => { // //that.logger.log("debug", LOG_ID + "(send) setTimeout resolve"); // resolve(prom); // }); // // /* // // Wait a few time between requests to avoid burst with lot of it. // setTimeout(() => { // //that.logger.log("debug", LOG_ID + "(send) setTimeout resolve"); // resolve(prom); // }, that.timeBetweenXmppRequests); // // */ // }).then((promiseToreturn) => { // that.logger.log("debug", LOG_ID + "(send) _exiting_ return promise"); // return promiseToreturn; // }).catch(async(err) => { // that.logger.log("error", LOG_ID + "(send) catch an error during sending! ", err); // // // if the error is the exceed of maximum message by a time laps then do not reconnecte // if (err && err.errorCode === -1 ) { // //return Promise.resolve(undefined); // throw err; // //return ; // } // // that.logger.log("debug", LOG_ID + "(send) restart the xmpp client"); // await that.restartConnect().then((res) => { // that.logger.log("debug", LOG_ID + "(send) restartConnect result : ", res); // }).catch((errr) => { // that.logger.log("error", LOG_ID + "(send) restartConnect catch : ", errr); // }); // /* // .then(() => { // that.logger.log("debug", LOG_ID + "(send) _exiting_ return promise with a throw error : ", err); // throw err; // }); // // */ // /* // this.client.restart().finally(() => { // that.logger.log("debug", LOG_ID + "(send) _exiting_ return promise with a throw error"); // throw err; // }); */ // }); // } sendIq(...args) { let that = this; that.logger.log("debug", LOG_ID + "(sendIq) _entering_"); return new Promise((resolve, reject) => { if (args.length > 0) { let idId = (args[0] && args[0].attrs) ? args[0].attrs.id:undefined; let prom = this.xmppQueue.add(async (resolve2, reject2, id) => { // return ; // To do failed the lock acquire. if (that.socketClosed) { that.logger.log("error", LOG_ID + "(send) - id : ", id, " - Error the socket is close, so do not send data on it."); //return Promise.reject("Error the socket is close, so do not send data on it.") return reject2({ timestamp: (new Date()).toLocaleTimeString(), reason: "Error the socket is close, so do not send data on it. - id : " + id + " -" }); } try { await that.client.send(...args).then(() => { that.nbMessagesSentThisHour++; resolve2({"code": 1, "label": "OK"}); }); } catch (err) { that.logger.log("debug", LOG_ID + "(sendIq) - id : ", id, " - _catch error_ at idId : ", idId, ", super.send : ", err); //that.logger.log("debug", LOG_ID + "(send) restart the xmpp client"); return reject2(err); } /* return this.client.send(...args).catch((err) => { that.logger.log("debug", LOG_ID + "(sendIq) _catch error_ at idId : " , idId, ", super.send : ", err); }) // */ }); /*.then((res) => { that.logger.log("debug", LOG_ID + "(sendIq) sent idId : ", idId, ""); }); // */ // callback to be called when the IQ Get result event is received from server. function cb(result) { // Wait a few time between requests to avoid burst with lot of it. setTimeout(() => { that.logger.log("debug", LOG_ID + "(send) - idId : ", idId, ", setTimeout resolve"); that.logger.log("internal", LOG_ID + "(send) - idId : ", idId, ", setTimeout resolve : ", result); resolve(prom.then(() => { return result; }).catch(() => { reject(result); })); }, that.timeBetweenXmppRequests); } // Store the promise to be resolved this.iqGetEventWaiting[idId] = cb; /* // Wait a few time between requests to avoid burst with lot of it. setTimeout(()=> { //that.logger.log("debug", LOG_ID + "(send) setTimeout resolve"); resolve(prom); }, that.timeBetweenXmppRequests); // */ } else { resolve(Promise.resolve()); } }).then((promiseToreturn) => { that.logger.log("debug", LOG_ID + "(sendIq) _exiting_ return promise"); return promiseToreturn; }); } sendIq_orig(...args) { let that = this; that.logger.log("debug", LOG_ID + "(sendIq) _entering_"); return new Promise((resolve, reject) => { if (args.length > 0) { let prom = this.xmppQueue.addPromise(this.client.send(...args).catch((err) => { that.logger.log("debug", LOG_ID + "(sendIq) _catch error_ at super.send", err); })).then(() => { that.logger.log("debug", LOG_ID + "(sendIq) sent"); }); // callback to be called when the IQ Get result event is received from server. function cb(result) { // Wait a few time between requests to avoid burst with lot of it. setTimeout(() => { //that.logger.log("debug", LOG_ID + "(send) setTimeout resolve"); resolve(prom.then(() => { return result; }).catch(() => { reject(result); })); }, that.timeBetweenXmppRequests); } let idId = args[0].attrs.id; // Store the promise to be resolved this.iqGetEventWaiting[idId] = cb; /* // Wait a few time between requests to avoid burst with lot of it. setTimeout(()=> { //that.logger.log("debug", LOG_ID + "(send) setTimeout resolve"); resolve(prom); }, that.timeBetweenXmppRequests); // */ } else { resolve(Promise.resolve()); } }).then((promiseToreturn) => { that.logger.log("debug", LOG_ID + "(sendIq) _exiting_ return promise"); return promiseToreturn; }); } /*handle(evt, cb) { this.client.entity.handle(evt, cb); } // */ emit(evtname, stanza) { let that = this; //let stanzaElmt : Element = parse(stanza); let stanzaElmt: any = parse(stanza); // stanzaElmt.find("to") = that.fullJid; this.client.entity.emit(evtname, stanzaElmt); } on(evt, cb) { this.client.entity.on(evt, cb); } get sasl() { return this.client.sasl; } setgetMechanism(cb) { //this.client.sasl.findMechanism = cb; //this.client.mechanisms = ["PLAIN"]; } get reconnect() { return this.client.reconnect; } /** * @description * Do not use this method to reconnect. Use the @xmpp/reconnect pluging else (with the method XmppClient::reconnect). * * @returns {Promise} */ async restartConnect() { let that = this; that.logger.log("debug", LOG_ID + "(restartConnect) _entering_"); if (this.restartConnectEnabled) { //let result = await that.client.disconnect(5000); //that.logger.log("debug", LOG_ID + "(restartConnect) disconnect result : ", result); //return that.client.open(that.options); try { await that.client._reset(); await that.client.start(); } catch (err) { that.logger.log("debug", LOG_ID + "(restartConnect) error while reconnect : ", err); } } else { return Promise.resolve("restartReconnect is disabled"); } } start(...args) { this.restartConnectEnabled = true; return this.client.start(...args); } stop(...args) { this.restartConnectEnabled = false; return this.client.stop(...args); } } function getXmppClient(...args) { let xmppClient = new XmppClient(...args); Object.assign(xmppClient, client()); } // ************************************* // Increase Element Behaviour // ************************************* // Find elements of child by name // If none is found then an empty Element is return (to allow call of methods like text...) // If only one is found then return it, but with a length value to 1 // If severals are found then return an Array with them Element.prototype.find = function (name) { // Warning do not put an Array function because the "this" will be lost let result = new Element(); result.length = 0; if ( this instanceof Element && (this.getName ? this.getName () : this.name) == name) { result = this; result.length = 1; return result; } let children = this.getChildrenByFilter((element) => { let isInstanceOfElement = element instanceof Element; let elmtName = element.getName ? element.getName () : element.name; let isTheNameSearched = name === elmtName; return isInstanceOfElement && isTheNameSearched; }, true); if (children.length === 1) { result = children[0]; result.length = 1; } else if (children.length > 1) { // Fake the get of an attribute if the result is a tab. children.attr = () => {return undefined;}; children.attrs = {}; result = children; } return result; }; // Shortcut to getText Element.prototype.text = function () { return this.getText(); }; // Shortcut to attrs Element.prototype.attr = function (attrName) { return this.attrs[attrName]; }; module.exports.getXmppClient = getXmppClient; module.exports.XmppClient = XmppClient; export {getXmppClient, XmppClient};