'use strict'; import randomstring from 'randomstring'; import TimeoutError from '../timeoutError'; import {ValidationError, NotFoundError, InternalError, UnauthorizedError, TooManyRequestsError, ForbiddenError} from '../errorHandler'; import OptionsValidator from '../optionsValidator'; import NotSynchronizedError from './notSynchronizedError'; import NotConnectedError from './notConnectedError'; import TradeError from './tradeError'; import PacketOrderer from './packetOrderer'; import SynchronizationThrottler, {SynchronizationThrottlerOpts} from './synchronizationThrottler'; import SubscriptionManager from './subscriptionManager'; import LoggerManager from '../../logger'; import any from 'promise.any'; import LatencyService from './latencyService'; import MetatraderAccount from '../../metaApi/metatraderAccount'; import DomainClient from '../domain.client'; import _ from 'lodash'; import ClientStickySocket from '../../packages/sticky-sockets/clientStickySocket'; import {RetryOpts} from '../../metaApi/metaApi'; import {PacketLoggerOpts} from './packetLogger'; import StickySocketConnection from '../../packages/sticky-sockets/stickySocketConnection'; import {Logger} from '../../logger'; import * as helpers from '../../helpers/helpers'; export * from './metaApiWebsocket.client.schemas'; let PacketLogger; if (typeof window === 'undefined') { // don't import PacketLogger for browser version PacketLogger = require('./packetLogger').default; } /** * MetaApi websocket API client (see https://metaapi.cloud/docs/client/websocket/overview/) */ class MetaApiWebsocketClient { private _domainClient: DomainClient; private _application: any; private _region: any; private _hostname: string; private _metaApi: any; private _url: null; private _requestTimeout: number; private _connectTimeout: number; private _retries: any; private _minRetryDelayInSeconds: number; private _maxRetryDelayInSeconds: number; private _maxAccountsPerInstance: number; private _subscribeCooldownInSeconds: any; private _sequentialEventProcessing: boolean; private _useSharedClientApi: any; private _unsubscribeThrottlingInterval: number; private _socketMinimumReconnectTimeout: number; private _latencyService: LatencyService; private _token: any; private _synchronizationListeners: {}; private _latencyListeners: any[]; private _reconnectListeners: any[]; private _connectedHosts: {}; private _socketInstances: SocketInstances = {}; private _socketInstancesByAccounts: {[instanceNumber: number]: {[accountId: string]: number}} = {}; private _regionsByAccounts: RegionsByAccounts = {}; private _accountsByReplicaId: MetaApiWebsocketClient.AccountsByReplica = {}; private _accountReplicas: MetaApiWebsocketClient.AccountReplicas = {}; private _synchronizationThrottlerOpts: any; private _subscriptionManager: any; private _statusTimers: {}; private _eventQueues: {[accountId: string]: Array<() => Promise>} = {}; private _synchronizationFlags: {[synchronizationId: string]: SynchronizationFlag} = {}; private _synchronizationIdByInstance: {}; private _subscribeLock: any; private _firstConnect: boolean; private _lastRequestsTime: {}; private _packetOrderer: PacketOrderer; private _synchronizationHashes: {}; private _updateEvents: {[instanceId: string]: Packet[]} = {}; private _packetLogger: any; private _logger: Logger; private _clearAccountCacheInterval: NodeJS.Timeout; private _clearInactiveSyncDataInterval: NodeJS.Timeout; private _useNativeSocketIoServer: boolean; /** * Constructs MetaApi websocket API client instance * @param {MetaApi} metaApi metaApi instance * @param {DomainClient} domainClient domain client * @param {String} token authorization token * @param {MetaApiWebsocketClientOptions} opts websocket client options */ // eslint-disable-next-line complexity,max-statements constructor(metaApi, domainClient, token, opts: MetaApiWebsocketClient.Options = {}) { const validator = new OptionsValidator(); opts = opts || {}; opts.packetOrderingTimeout = validator.validateNonZero(opts.packetOrderingTimeout, 60, 'packetOrderingTimeout'); opts.synchronizationThrottler = opts.synchronizationThrottler || {}; this._domainClient = domainClient; this._application = opts.application || 'MetaApi'; this._region = opts.region; this._hostname = 'mt-client-api-v1'; this._metaApi = metaApi; this._url = null; this._requestTimeout = validator.validateNonZero(opts.requestTimeout, 60, 'requestTimeout') * 1000; this._connectTimeout = validator.validateNonZero(opts.connectTimeout, 60, 'connectTimeout') * 1000; const retryOpts = opts.retryOpts || {}; this._retries = validator.validateNumber(retryOpts.retries, 5, 'retryOpts.retries'); this._minRetryDelayInSeconds = validator.validateNonZero(retryOpts.minDelayInSeconds, 1, 'retryOpts.minDelayInSeconds'); this._maxRetryDelayInSeconds = validator.validateNonZero(retryOpts.maxDelayInSeconds, 30, 'retryOpts.maxDelayInSeconds'); this._maxAccountsPerInstance = 100; this._subscribeCooldownInSeconds = validator.validateNonZero(retryOpts.subscribeCooldownInSeconds, 600, 'retryOpts.subscribeCooldownInSeconds'); this._sequentialEventProcessing = true; this._useSharedClientApi = validator.validateBoolean(opts.useSharedClientApi, false, 'useSharedClientApi'); this._unsubscribeThrottlingInterval = validator.validateNonZero(opts.unsubscribeThrottlingIntervalInSeconds, 10, 'unsubscribeThrottlingIntervalInSeconds') * 1000; this._useNativeSocketIoServer = validator.validateBoolean(opts.useNativeSocketIoServer, false, 'useNativeSocketIoServer'); this._socketMinimumReconnectTimeout = validator.validateNumber(opts.minReconnectTimeoutInMs, 500, 'minReconnectTimeoutInMs'); this._latencyService = new LatencyService(this, token, this._connectTimeout); this._token = token; this._synchronizationListeners = {}; this._latencyListeners = []; this._reconnectListeners = []; this._connectedHosts = {}; this._synchronizationThrottlerOpts = opts.synchronizationThrottler; this._subscriptionManager = new SubscriptionManager(this, metaApi); this._statusTimers = {}; this._synchronizationIdByInstance = {}; this._subscribeLock = null; this._firstConnect = true; this._lastRequestsTime = {}; this._packetOrderer = new PacketOrderer(this, opts.packetOrderingTimeout); this._packetOrderer.start(); this._synchronizationHashes = {}; if (opts.packetLogger?.enabled) { this._packetLogger = new PacketLogger(opts.packetLogger); this._packetLogger.start(); } this._logger = LoggerManager.getLogger('MetaApiWebsocketClient'); if (!opts.disableInternalJobs) { this._clearAccountCacheInterval = setInterval(this._clearAccountCacheJob.bind(this), 30 * 60 * 1000); this._clearInactiveSyncDataInterval = setInterval(this._clearInactiveSyncDataJob.bind(this), 5 * 60 * 1000); } } /** * Restarts the account synchronization process on an out of order packet * @param {String} accountId account id * @param {Number} instanceIndex instance index * @param {Number} expectedSequenceNumber expected s/n * @param {Number} actualSequenceNumber actual s/n * @param {Object} packet packet data * @param {Date} receivedAt time the packet was received at */ onOutOfOrderPacket(accountId, instanceIndex, expectedSequenceNumber, actualSequenceNumber, packet, receivedAt) { const primaryAccountId = this._accountsByReplicaId[accountId]; if (this._subscriptionManager.isSubscriptionActive(accountId)) { const level = this._latencyService.getSynchronizedAccountInstances(primaryAccountId).length ? 'debug' : 'error'; this._logger[level]('MetaApi websocket client received an out of order ' + `packet type ${packet.type} for account id ${accountId}:${instanceIndex}. Expected s/n ` + `${expectedSequenceNumber} does not match the actual of ${actualSequenceNumber}`); this.ensureSubscribe(accountId, instanceIndex); } } /** * Patch server URL for use in unit tests * @param {String} url patched server URL */ set url(url) { this._url = url; } /** * Websocket client predefined region * @returns {String} predefined region */ get region() { return this._region; } /** * Returns the list of socket instance dictionaries * @return {Object[]} list of socket instance dictionaries */ get socketInstances() { return this._socketInstances; } /** * Returns the dictionary of socket instances by account ids * @return {Object} dictionary of socket instances by account ids */ get socketInstancesByAccounts() { return this._socketInstancesByAccounts; } /** * Returns the dictionary of account replicas by region * @return {Object} dictionary of account replicas by region */ get accountReplicas(): MetaApiWebsocketClient.AccountReplicas { return this._accountReplicas; } /** * Returns the dictionary of primary account ids by replica ids * @return {Object} dictionary of primary account ids by replica ids */ get accountsByReplicaId(): MetaApiWebsocketClient.AccountsByReplica { return this._accountsByReplicaId; } /** * Returns clear account cache job. Used for tests * @return {Function} clear account cache job */ get clearAccountCacheJob() { return this._clearAccountCacheJob.bind(this); } /** * Returns latency service * @returns {LatencyService} latency service */ get latencyService() { return this._latencyService; } /** * Returns the list of subscribed account ids * @param {Number} instanceNumber instance index number * @param {String} socketInstanceIndex socket instance index * @param {String} region server region * @return {string[]} list of subscribed account ids */ subscribedAccountIds(instanceNumber, socketInstanceIndex, region) { const connectedIds = []; if (this._socketInstancesByAccounts[instanceNumber]) { Object.keys(this._connectedHosts).forEach(instanceId => { const accountId = instanceId.split(':')[0]; const accountRegion = this.getAccountRegion(accountId); if (!connectedIds.includes(accountId) && this._socketInstancesByAccounts[instanceNumber][accountId] !== undefined && ( this._socketInstancesByAccounts[instanceNumber][accountId] === socketInstanceIndex || socketInstanceIndex === undefined) && accountRegion === region) { connectedIds.push(accountId); } }); } return connectedIds; } /** * Returns websocket client connection status * @param {Number} instanceNumber instance index number * @param {Number} socketInstanceIndex socket instance index * @param {String} region server region * @returns {Boolean} websocket client connection status */ connected(instanceNumber, socketInstanceIndex, region) { const instance = this._socketInstances[region] && this._socketInstances[region][instanceNumber].length > socketInstanceIndex ? this._socketInstances[region][instanceNumber][socketInstanceIndex] : null; return (instance && instance.socket && instance.socket.connected) || false; } /** * Returns list of accounts assigned to instance * @param {Number} instanceNumber instance index number * @param {String} socketInstanceIndex socket instance index * @param {String} region server region * @returns */ private _getAssignedAccounts(instanceNumber, socketInstanceIndex, region) { const accountIds = []; Object.keys(this._socketInstancesByAccounts[instanceNumber]).forEach(key => { const accountRegion = this.getAccountRegion(key); if (accountRegion === region && this._socketInstancesByAccounts[instanceNumber][key] === socketInstanceIndex) { accountIds.push(key); } }); return accountIds; } /** * Returns account region by id * @param {String} accountId account id * @returns {String} account region */ getAccountRegion(accountId) { return this._regionsByAccounts[accountId] && this._regionsByAccounts[accountId].region; } /** * Adds account cache info * @param {String} accountId account id * @param {Object} replicas account replicas, including primary replica */ addAccountCache(accountId: string, replicas: MetatraderAccount.AccountsByRegion) { this._accountReplicas[accountId] = replicas; Object.keys(replicas).forEach(region => { const replicaId = replicas[region]; if (!this._regionsByAccounts[replicaId]) { this._regionsByAccounts[replicaId] = { region, connections: 1, lastUsed: Date.now() }; } else { this._regionsByAccounts[replicaId].connections++; } this._accountsByReplicaId[replicaId] = accountId; }); this._logger.debug(`${accountId}: added account cache`); } /** * Updates account cache info * @param {String} accountId account id * @param {Object} replicas account replicas */ updateAccountCache(accountId: string, replicas: MetatraderAccount.AccountsByRegion) { const oldReplicas = this._accountReplicas[accountId]; if (oldReplicas) { const connectionCount = this._regionsByAccounts[accountId].connections; Object.keys(oldReplicas).forEach(region => { const replicaId = replicas[region]; delete this._accountsByReplicaId[replicaId]; delete this._regionsByAccounts[replicaId]; }); this._accountReplicas[accountId] = replicas; Object.keys(replicas).forEach(region => { const replicaId = replicas[region]; this._regionsByAccounts[replicaId] = { region, connections: connectionCount, lastUsed: Date.now() }; this._accountsByReplicaId[replicaId] = accountId; }); this._logger.debug(`${accountId}: updated account cache`); } } /** * Removes account region info * @param {String} accountId account id */ removeAccountCache(accountId: string) { if (this._regionsByAccounts[accountId]?.connections > 0) { this._regionsByAccounts[accountId].connections--; } } /** * Locks subscription for a socket instance based on TooManyRequestsError metadata * @param {Number} instanceNumber instance index number * @param {String} socketInstanceIndex socket instance index * @param {String} region server region * @param {Object} metadata TooManyRequestsError metadata */ async lockSocketInstance(instanceNumber, socketInstanceIndex, region, metadata) { if (metadata.type === 'LIMIT_ACCOUNT_SUBSCRIPTIONS_PER_USER') { this._subscribeLock = { recommendedRetryTime: metadata.recommendedRetryTime, lockedAtAccounts: this.subscribedAccountIds(instanceNumber, undefined, region).length, lockedAtTime: Date.now() }; } else { const subscribedAccounts = this.subscribedAccountIds(instanceNumber, socketInstanceIndex, region); if (subscribedAccounts.length === 0) { const socketInstance = this.socketInstances[region][instanceNumber][socketInstanceIndex]; await socketInstance.socket.disconnect(); await this._reconnect(instanceNumber, socketInstanceIndex, region); } else { const instance = this.socketInstances[region][instanceNumber][socketInstanceIndex]; instance.subscribeLock = { recommendedRetryTime: metadata.recommendedRetryTime, type: metadata.type, lockedAtAccounts: subscribedAccounts.length }; } } } /** * Connects to MetaApi server via socket.io protocol * @param {Number} instanceNumber instance index number * @param {String} region server region * @returns {Promise} promise which resolves when connection is established */ async connect(instanceNumber, region) { if (this._region && region !== this._region) { throw new ValidationError(`Trying to connect to ${region} region, but configured with ${this._region}`); } let clientId = Math.random(); this._socketInstances[region] ||= {}; this._socketInstances[region][instanceNumber] ||= []; const socketInstanceIndex = this._socketInstances[region][instanceNumber].length; const instance: MetaApiWebsocketClient.SocketInstance = { id: socketInstanceIndex, reconnectWaitTime: this._socketMinimumReconnectTimeout, connected: true, requestResolves: {}, resolved: false, connectResult: helpers.createHandlePromise(), sessionId: randomstring.generate(32), isReconnecting: false, socket: null, synchronizationThrottler: new SynchronizationThrottler(this, socketInstanceIndex, instanceNumber, region, this._synchronizationThrottlerOpts), subscribeLock: null, instanceNumber, region }; this._socketInstances[region][instanceNumber].push(instance); instance.synchronizationThrottler.start(); instance.socket = this._createSocket( await this._getServerUrl(instanceNumber, socketInstanceIndex, region), clientId, instance ); return instance.connectResult; } /** * Closes connection to MetaApi server */ close() { Object.keys(this._socketInstances).forEach(region => { Object.keys(this._socketInstances[region]).forEach(instanceNumber => { this._socketInstances[region][instanceNumber].forEach(async (instance) => { if (instance.connected) { instance.connected = false; await instance.socket.disconnect(); for (let requestResolve of Object.values(instance.requestResolves)) { requestResolve.reject(new Error('MetaApi connection closed')); } instance.requestResolves = {}; } }); this._socketInstancesByAccounts[instanceNumber] = {}; this._socketInstances[region][instanceNumber] = []; }); }); this._synchronizationListeners = {}; this._latencyListeners = []; this._packetOrderer.stop(); } /** * Stops the client */ stop() { clearInterval(this._clearAccountCacheInterval); clearInterval(this._clearInactiveSyncDataInterval); this._latencyService.stop(); } /** * Returns account information for a specified MetaTrader account. * @param {String} accountId id of the MetaTrader account to return information for * @param {GetAccountInformationOptions} [options] additional request options * @returns {Promise} promise resolving with account information */ async getAccountInformation(accountId, options?) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getAccountInformation', ...options}); return response.accountInformation; } /** * Returns positions for a specified MetaTrader account. * @param {String} accountId id of the MetaTrader account to return information for * @param {GetPositionsOptions} [options] additional request options * @returns {Promise} promise resolving with array of open positions */ async getPositions(accountId, options?) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getPositions', ...options}); return response.positions; } /** * Returns specific position for a MetaTrader account. * @param {String} accountId id of the MetaTrader account to return information for * @param {String} positionId position id * @param {GetPositionOptions} [options] additional request options * @return {Promise} promise resolving with MetaTrader position found */ async getPosition(accountId, positionId, options?) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getPosition', positionId, ...options}); return response.position; } /** * Returns open orders for a specified MetaTrader account. * @param {String} accountId id of the MetaTrader account to return information for * @param {GetOrdersOptions} [options] additional request options * @return {Promise>} promise resolving with open MetaTrader orders */ async getOrders(accountId, options?) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getOrders', ...options}); return response.orders; } /** * Returns specific open order for a MetaTrader account. * @param {String} accountId id of the MetaTrader account to return information for * @param {String} orderId order id (ticket number) * @param {GetOrderOptions} [options] additional request options * @return {Promise} promise resolving with metatrader order found */ async getOrder(accountId, orderId, options?) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getOrder', orderId, ...options}); return response.order; } /** * MetaTrader history orders search query response * @typedef {Object} MetatraderHistoryOrders * @property {Array} historyOrders array of history orders returned * @property {Boolean} synchronizing flag indicating that history order initial synchronization is still in progress * and thus search results may be incomplete */ /** * Returns the history of completed orders for a specific ticket number. * @param {String} accountId id of the MetaTrader account to return information for * @param {String} ticket ticket number (order id) * @returns {Promise} promise resolving with request results containing history orders found */ async getHistoryOrdersByTicket(accountId, ticket) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getHistoryOrdersByTicket', ticket}); return { historyOrders: response.historyOrders, synchronizing: response.synchronizing }; } /** * Returns the history of completed orders for a specific position id * @param {String} accountId id of the MetaTrader account to return information for * @param {String} positionId position id * @returns {Promise} promise resolving with request results containing history orders found */ async getHistoryOrdersByPosition(accountId, positionId) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getHistoryOrdersByPosition', positionId}); return { historyOrders: response.historyOrders, synchronizing: response.synchronizing }; } /** * Returns the history of completed orders for a specific time range * @param {String} accountId id of the MetaTrader account to return information for * @param {Date} startTime start of time range, inclusive * @param {Date} endTime end of time range, exclusive * @param {Number} offset pagination offset, default is 0 * @param {Number} limit pagination limit, default is 1000 * @returns {Promise} promise resolving with request results containing history orders found */ async getHistoryOrdersByTimeRange(accountId, startTime, endTime, offset = 0, limit = 1000) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getHistoryOrdersByTimeRange', startTime, endTime, offset, limit}); return { historyOrders: response.historyOrders, synchronizing: response.synchronizing }; } /** * MetaTrader history deals search query response * @typedef {Object} MetatraderDeals * @property {Array} deals array of history deals returned * @property {Boolean} synchronizing flag indicating that deal initial synchronization is still in progress * and thus search results may be incomplete */ /** * MetaTrader deal * @typedef {Object} MetatraderDeal * @property {String} id deal id (ticket number) * @property {String} type deal type (one of DEAL_TYPE_BUY, DEAL_TYPE_SELL, DEAL_TYPE_BALANCE, DEAL_TYPE_CREDIT, * DEAL_TYPE_CHARGE, DEAL_TYPE_CORRECTION, DEAL_TYPE_BONUS, DEAL_TYPE_COMMISSION, DEAL_TYPE_COMMISSION_DAILY, * DEAL_TYPE_COMMISSION_MONTHLY, DEAL_TYPE_COMMISSION_AGENT_DAILY, DEAL_TYPE_COMMISSION_AGENT_MONTHLY, * DEAL_TYPE_INTEREST, DEAL_TYPE_BUY_CANCELED, DEAL_TYPE_SELL_CANCELED, DEAL_DIVIDEND, DEAL_DIVIDEND_FRANKED, * DEAL_TAX). See https://www.mql5.com/en/docs/constants/tradingconstants/dealproperties#enum_deal_type * @property {String} entryType deal entry type (one of DEAL_ENTRY_IN, DEAL_ENTRY_OUT, DEAL_ENTRY_INOUT, * DEAL_ENTRY_OUT_BY). See https://www.mql5.com/en/docs/constants/tradingconstants/dealproperties#enum_deal_entry * @property {String} [symbol] symbol deal relates to * @property {Number} [magic] deal magic number, identifies the EA which initiated the deal * @property {Date} time time the deal was conducted at * @property {String} brokerTime time time the deal was conducted at, in broker timezone, YYYY-MM-DD HH:mm:ss.SSS format * @property {Number} [volume] deal volume * @property {Number} [price] the price the deal was conducted at * @property {Number} [commission] deal commission * @property {Number} [swap] deal swap * @property {Number} profit deal profit * @property {String} [positionId] id of position the deal relates to * @property {String} [orderId] id of order the deal relates to * @property {String} [comment] deal comment. The sum of the line lengths of the comment and the clientId * must be less than or equal to 26. For more information see https://metaapi.cloud/docs/client/clientIdUsage/ * @property {String} [brokerComment] current comment value on broker side (possibly overriden by the broker) * @property {String} [clientId] client-assigned id. The id value can be assigned when submitting a trade and * will be present on position, history orders and history deals related to the trade. You can use this field to bind * your trades to objects in your application and then track trade progress. The sum of the line lengths of the * comment and the clientId must be less than or equal to 26. For more information see * https://metaapi.cloud/docs/client/clientIdUsage/ * @property {String} platform platform id (mt4 or mt5) * @property {String} [reason] optional deal execution reason. One of DEAL_REASON_CLIENT, DEAL_REASON_MOBILE, * DEAL_REASON_WEB, DEAL_REASON_EXPERT, DEAL_REASON_SL, DEAL_REASON_TP, DEAL_REASON_SO, DEAL_REASON_ROLLOVER, * DEAL_REASON_VMARGIN, DEAL_REASON_SPLIT, DEAL_REASON_UNKNOWN. See * https://www.mql5.com/en/docs/constants/tradingconstants/dealproperties#enum_deal_reason. * @property {Number} [accountCurrencyExchangeRate] current exchange rate of account currency into account base * currency (USD if you did not override it) * @property {number} [stopLoss] deal stop loss. For MT5 opening deal this is the SL of the order opening the * position. For MT4 deals or MT5 closing deal this is the last known position SL. * @property {number} [takeProfit] deal take profit. For MT5 opening deal this is the TP of the order opening the * position. For MT4 deals or MT5 closing deal this is the last known position TP. */ /** * Returns history deals with a specific ticket number * @param {String} accountId id of the MetaTrader account to return information for * @param {String} ticket ticket number (deal id for MT5 or order id for MT4) * @returns {Promise} promise resolving with request results containing deals found */ async getDealsByTicket(accountId, ticket) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getDealsByTicket', ticket}); return { deals: response.deals, synchronizing: response.synchronizing }; } /** * Returns history deals for a specific position id * @param {String} accountId id of the MetaTrader account to return information for * @param {String} positionId position id * @returns {Promise} promise resolving with request results containing deals found */ async getDealsByPosition(accountId, positionId) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getDealsByPosition', positionId}); return { deals: response.deals, synchronizing: response.synchronizing }; } /** * Returns history deals with for a specific time range * @param {String} accountId id of the MetaTrader account to return information for * @param {Date} startTime start of time range, inclusive * @param {Date} endTime end of time range, exclusive * @param {Number} offset pagination offset, default is 0 * @param {Number} limit pagination limit, default is 1000 * @returns {Promise} promise resolving with request results containing deals found */ async getDealsByTimeRange(accountId, startTime, endTime, offset = 0, limit = 1000) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getDealsByTimeRange', startTime, endTime, offset, limit}); return { deals: response.deals, synchronizing: response.synchronizing }; } /** * Clears the order and transaction history of a specified application and removes the application * @param {String} accountId id of the MetaTrader account to remove history and application for * @return {Promise} promise resolving when the history is cleared */ removeApplication(accountId) { return this.rpcRequest(accountId, {type: 'removeApplication'}); } /** * MetaTrader trade response * @typedef {Object} MetatraderTradeResponse * @property {Number} numericCode numeric response code, see * https://www.mql5.com/en/docs/constants/errorswarnings/enum_trade_return_codes and * https://book.mql4.com/appendix/errors. Response codes which indicate success are 0, 10008-10010, 10025. The rest * codes are errors * @property {String} stringCode string response code, see * https://www.mql5.com/en/docs/constants/errorswarnings/enum_trade_return_codes and * https://book.mql4.com/appendix/errors. Response codes which indicate success are ERR_NO_ERROR, * TRADE_RETCODE_PLACED, TRADE_RETCODE_DONE, TRADE_RETCODE_DONE_PARTIAL, TRADE_RETCODE_NO_CHANGES. The rest codes are * errors. * @property {String} message human-readable response message * @property {String} orderId order id which was created/modified during the trade * @property {String} positionId position id which was modified during the trade */ /** * Execute a trade on a connected MetaTrader account * @param {String} accountId id of the MetaTrader account to execute trade for * @param {MetatraderTrade} trade trade to execute (see docs for possible trade types) * @param {String} [application] application to use * @param {String} [reliability] account reliability * @returns {Promise} promise resolving with trade result * @throws {TradeError} on trade error, check error properties for error code details */ // eslint-disable-next-line complexity async trade(accountId, trade, application?, reliability?) { let response; if (application === 'RPC') { response = await this.rpcRequest(accountId, {type: 'trade', trade, application}); } else { response = await this.rpcRequestAllInstances(accountId, {type: 'trade', trade, application: application || this._application, requestId: randomstring.generate(32)}, reliability); } response.response = response.response || {}; response.response.stringCode = response.response.stringCode || response.response.description; response.response.numericCode = response.response.numericCode !== undefined ? response.response.numericCode : response.response.error; if (['ERR_NO_ERROR', 'TRADE_RETCODE_PLACED', 'TRADE_RETCODE_DONE', 'TRADE_RETCODE_DONE_PARTIAL', 'TRADE_RETCODE_NO_CHANGES'].includes(response.response.stringCode || response.response.description)) { return response.response; } else { throw new TradeError(response.response.message, response.response.numericCode, response.response.stringCode); } } /** * Creates a task that ensures the account gets subscribed to the server * @param {String} accountId account id to subscribe * @param {Number} instanceNumber instance index number */ ensureSubscribe(accountId, instanceNumber) { this._subscriptionManager.scheduleSubscribe(accountId, instanceNumber); } /** * Subscribes to the Metatrader terminal events * @param {String} accountId id of the MetaTrader account to subscribe to * @param {Number} instanceNumber instance index number * @returns {Promise} promise which resolves when subscription started */ subscribe(accountId, instanceNumber) { return this._subscriptionManager.subscribe(accountId, instanceNumber); } /** * Requests the terminal to start synchronization process * @param {String} accountId id of the MetaTrader account to synchronize * @param {Number} instanceIndex instance index * @param {String} host name of host to synchronize with * @param {String} synchronizationId synchronization request id * @param {Date} startingHistoryOrderTime from what date to start synchronizing history orders from. If not specified, * the entire order history will be downloaded. * @param {Date} startingDealTime from what date to start deal synchronization from. If not specified, then all * history deals will be downloaded. * @param {Function} getHashes function to get terminal state hashes * @returns {Promise} promise which resolves when synchronization started */ async synchronize( accountId, instanceIndex, host, synchronizationId, startingHistoryOrderTime, startingDealTime, hashes ) { if (this._getSocketInstanceByAccount(accountId, instanceIndex) === undefined) { this._logger.debug(`${accountId}:${instanceIndex}: creating socket instance on synchronize`); await this._createSocketInstanceByAccount(accountId, instanceIndex); } const syncThrottler = this._getSocketInstanceByAccount(accountId, instanceIndex).synchronizationThrottler; this._synchronizationHashes[synchronizationId] = hashes; this._synchronizationHashes[synchronizationId].lastUpdated = Date.now(); return syncThrottler.scheduleSynchronize(accountId, {requestId: synchronizationId, version: 2, type: 'synchronize', startingHistoryOrderTime, startingDealTime, instanceIndex, host}, hashes); } /** * Waits for server-side terminal state synchronization to complete * @param {String} accountId id of the MetaTrader account to synchronize * @param {Number} [instanceNumber] instance index number * @param {String} applicationPattern MetaApi application regular expression pattern, default is .* * @param {Number} timeoutInSeconds timeout in seconds, default is 300 seconds * @param {String} [application] application to synchronize with * @returns {Promise} promise which resolves when synchronization started */ waitSynchronized(accountId, instanceNumber, applicationPattern, timeoutInSeconds, application?) { return this.rpcRequest(accountId, {type: 'waitSynchronized', applicationPattern, timeoutInSeconds, instanceIndex: instanceNumber, application: application || this._application}, timeoutInSeconds + 1); } /** * Market data subscription * @typedef {Object} MarketDataSubscription * @property {string} type subscription type, one of quotes, candles, ticks, or marketDepth * @property {string} [timeframe] when subscription type is candles, defines the timeframe according to which the * candles must be generated. Allowed values for MT5 are 1m, 2m, 3m, 4m, 5m, 6m, 10m, 12m, 15m, 20m, 30m, 1h, 2h, 3h, * 4h, 6h, 8h, 12h, 1d, 1w, 1mn. Allowed values for MT4 are 1m, 5m, 15m 30m, 1h, 4h, 1d, 1w, 1mn * @property {number} [intervalInMilliseconds] defines how frequently the terminal will stream data to client. If not * set, then the value configured in account will be used */ /** * Subscribes on market data of specified symbol * @param {String} accountId id of the MetaTrader account * @param {String} symbol symbol (e.g. currency pair or an index) * @param {Array} subscriptions array of market data subscription to create or update * @param {String} [reliability] account reliability * @returns {Promise} promise which resolves when subscription request was processed */ subscribeToMarketData(accountId, symbol, subscriptions, reliability?) { return this.rpcRequestAllInstances(accountId, {type: 'subscribeToMarketData', symbol, subscriptions}, reliability); } /** * Refreshes market data subscriptions on the server to prevent them from expiring * @param {String} accountId id of the MetaTrader account * @param {Number} instanceNumber instance index number * @param {Array} subscriptions array of subscriptions to refresh */ refreshMarketDataSubscriptions(accountId, instanceNumber, subscriptions) { return this.rpcRequest(accountId, {type: 'refreshMarketDataSubscriptions', subscriptions, instanceIndex: instanceNumber}); } /** * Market data unsubscription * @typedef {Object} MarketDataUnsubscription * @property {string} type subscription type, one of quotes, candles, ticks, or marketDepth */ /** * Unsubscribes from market data of specified symbol * @param {String} accountId id of the MetaTrader account * @param {String} symbol symbol (e.g. currency pair or an index) * @param {Array} subscriptions array of subscriptions to cancel * @param {String} [reliability] account reliability * @returns {Promise} promise which resolves when unsubscription request was processed */ unsubscribeFromMarketData(accountId, symbol, subscriptions, reliability) { return this.rpcRequestAllInstances(accountId, {type: 'unsubscribeFromMarketData', symbol, subscriptions}, reliability); } /** * Retrieves symbols available on an account * @param {String} accountId id of the MetaTrader account to retrieve symbols for * @returns {Promise>} promise which resolves when symbols are retrieved */ async getSymbols(accountId) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getSymbols'}); return response.symbols; } /** * Retrieves specification for a symbol * @param {String} accountId id of the MetaTrader account to retrieve symbol specification for * @param {String} symbol symbol to retrieve specification for * @returns {Promise} promise which resolves when specification is retrieved */ async getSymbolSpecification(accountId, symbol) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getSymbolSpecification', symbol}); return response.specification; } /** * Retrieves price for a symbol * @param {String} accountId id of the MetaTrader account to retrieve symbol price for * @param {String} symbol symbol to retrieve price for * @param {boolean} keepSubscription if set to true, the account will get a long-term subscription to symbol market * data. Long-term subscription means that on subsequent calls you will get updated value faster. If set to false or * not set, the subscription will be set to expire in 12 minutes. * @returns {Promise} promise which resolves when price is retrieved */ async getSymbolPrice(accountId, symbol, keepSubscription = false) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getSymbolPrice', symbol, keepSubscription}); return response.price; } /** * Retrieves price for a symbol * @param {string} accountId id of the MetaTrader account to retrieve candle for * @param {string} symbol symbol to retrieve candle for * @param {string} timeframe defines the timeframe according to which the candle must be generated. Allowed values for * MT5 are 1m, 2m, 3m, 4m, 5m, 6m, 10m, 12m, 15m, 20m, 30m, 1h, 2h, 3h, 4h, 6h, 8h, 12h, 1d, 1w, 1mn. Allowed values * for MT4 are 1m, 5m, 15m 30m, 1h, 4h, 1d, 1w, 1mn * @param {boolean} keepSubscription if set to true, the account will get a long-term subscription to symbol market * data. Long-term subscription means that on subsequent calls you will get updated value faster. If set to false or * not set, the subscription will be set to expire in 12 minutes. * @returns {Promise} promise which resolves when candle is retrieved */ async getCandle(accountId, symbol, timeframe, keepSubscription = false) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getCandle', symbol, timeframe, keepSubscription}); return response.candle; } /** * Retrieves latest tick for a symbol. MT4 G1 accounts do not support this API * @param {string} accountId id of the MetaTrader account to retrieve symbol tick for * @param {string} symbol symbol to retrieve tick for * @param {boolean} keepSubscription if set to true, the account will get a long-term subscription to symbol market * data. Long-term subscription means that on subsequent calls you will get updated value faster. If set to false or * not set, the subscription will be set to expire in 12 minutes. * @returns {Promise} promise which resolves when tick is retrieved */ async getTick(accountId, symbol, keepSubscription = false) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getTick', symbol, keepSubscription}); return response.tick; } /** * Retrieves latest order book for a symbol. MT4 accounts do not support this API * @param {string} accountId id of the MetaTrader account to retrieve symbol order book for * @param {string} symbol symbol to retrieve order book for * @param {boolean} keepSubscription if set to true, the account will get a long-term subscription to symbol market * data. Long-term subscription means that on subsequent calls you will get updated value faster. If set to false or * not set, the subscription will be set to expire in 12 minutes. * @returns {Promise} promise which resolves when order book is retrieved */ async getBook(accountId, symbol, keepSubscription = false) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getBook', symbol, keepSubscription}); return response.book; } /** * Forces refresh of most recent quote updates for symbols subscribed to by the terminal * @param {string} accountId id of the MetaTrader account * @returns {Promise} promise which resolves with recent quote symbols that was initiated to process */ async refreshTerminalState(accountId) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'refreshTerminalState'}); return response.symbols; } /** * Forces refresh and retrieves latest quotes for a subset of symbols the terminal is subscribed to * @param {string} accountId id of the MetaTrader account * @param {string[]} symbols quote symbols to refresh * @returns {Promise} refreshed quotes and basic account information info */ async refreshSymbolQuotes(accountId, symbols) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'refreshSymbolQuotes', symbols}); return response.refreshedQuotes; } /** * Sends client uptime stats to the server. * @param {String} accountId id of the MetaTrader account to save uptime * @param {Object} uptime uptime statistics to send to the server * @returns {Promise} promise which resolves when uptime statistics is submitted */ saveUptime(accountId, uptime) { return this.rpcRequest(accountId, {type: 'saveUptime', uptime}); } /** * Unsubscribe from account * @param {String} accountId id of the MetaTrader account to unsubscribe * @returns {Promise} promise which resolves when socket unsubscribed */ async unsubscribe(accountId) { const region = this.getAccountRegion(accountId); this._latencyService.onUnsubscribe(accountId); Object.keys(this._updateEvents) .filter(key => key.startsWith(accountId)) .forEach(key => delete this._updateEvents[key]); if (this._socketInstances[region]) { await Promise.all(Object.keys(this._socketInstances[region]).map(async instanceNumber => { try { await this._subscriptionManager.unsubscribe(accountId, Number(instanceNumber)); delete this._socketInstancesByAccounts[instanceNumber][accountId]; } catch (err) { if (!(['TimeoutError', 'NotFoundError'].includes(err.name))) { this._logger.warn(`${accountId}:${instanceNumber}: failed to unsubscribe`, err); } } })); } } /** * Current server time (see https://metaapi.cloud/docs/client/models/serverTime/) * @typedef {Object} ServerTime * @property {Date} time current server time * @property {String} brokerTime current broker time, in broker timezone, YYYY-MM-DD HH:mm:ss.SSS format * @property {Date} [lastQuoteTime] last quote time * @property {String} [lastQuoteBrokerTime] last quote time, in broker timezone, YYYY-MM-DD HH:mm:ss.SSS format */ /** * Returns server time for a specified MetaTrader account * @param {string} accountId id of the MetaTrader account to return server time for * @returns {Promise} promise resolving with server time */ async getServerTime(accountId) { let response = await this.rpcRequest(accountId, {application: 'RPC', type: 'getServerTime'}); return response.serverTime; } /** * Margin required to open a trade (see https://metaapi.cloud/docs/client/models/margin/) * @typedef {Object} Margin * @property {number} [margin] margin required to open a trade. If margin can not be calculated, then this field is * not defined */ /** * Contains order to calculate margin for (see https://metaapi.cloud/docs/client/models/marginOrder/) * @typedef {Object} MarginOrder * @property {string} symbol order symbol * @property {string} type order type, one of ORDER_TYPE_BUY or ORDER_TYPE_SELL * @property {number} volume order volume, must be greater than 0 * @property {number} openPrice order open price, must be greater than 0 */ /** * Calculates margin required to open a trade on the specified trading account * @param {string} accountId id of the trading account to calculate margin for * @param {string} application application to send the request to * @param {string} reliability account reliability * @param {MarginOrder} order order to calculate margin for * @returns {Promise} promise resolving with margin calculation result */ async calculateMargin(accountId, application, reliability, order) { let response; if (application === 'RPC') { response = await this.rpcRequest(accountId, {application, type: 'calculateMargin', order}); } else { response = await this.rpcRequestAllInstances(accountId, {application, type: 'calculateMargin', order}, reliability); } return response.margin; } /** * Calls onUnsubscribeRegion listener event * @param {String} accountId account id * @param {String} region account region to unsubscribe */ async unsubscribeAccountRegion(accountId, region) { const unsubscribePromises = []; for (let listener of this._synchronizationListeners[accountId] || []) { unsubscribePromises.push(this ._processEvent(() => listener.onUnsubscribeRegion(region), `${accountId}:${region}:onUnsubscribeRegion`, true) .catch(err => this._logger.error( `${accountId}:${region}: Failed to notify listener about onUnsubscribeRegion event`, err )) ); } await Promise.all(unsubscribePromises); } /** * Adds synchronization listener for specific account * @param {String} accountId account id * @param {SynchronizationListener} listener synchronization listener to add */ addSynchronizationListener(accountId, listener) { this._logger.trace(`${accountId}: Added synchronization listener`); let listeners = this._synchronizationListeners[accountId]; if (!listeners) { listeners = []; this._synchronizationListeners[accountId] = listeners; } listeners.push(listener); } /** * Removes synchronization listener for specific account * @param {String} accountId account id * @param {SynchronizationListener} listener synchronization listener to remove */ removeSynchronizationListener(accountId, listener) { this._logger.trace(`${accountId}: Removed synchronization listener`); let listeners = this._synchronizationListeners[accountId]; if (!listeners) { listeners = []; } listeners = listeners.filter(l => l !== listener); this._synchronizationListeners[accountId] = listeners; } /** * Adds latency listener * @param {LatencyListener} listener latency listener to add */ addLatencyListener(listener) { this._latencyListeners.push(listener); } /** * Removes latency listener * @param {LatencyListener} listener latency listener to remove */ removeLatencyListener(listener) { this._latencyListeners = this._latencyListeners.filter(l => l !== listener); } /** * Adds reconnect listener * @param {ReconnectListener} listener reconnect listener to add * @param {String} accountId account id of listener */ addReconnectListener(listener, accountId) { this._reconnectListeners.push({accountId, listener}); } /** * Removes reconnect listener * @param {ReconnectListener} listener listener to remove */ removeReconnectListener(listener) { this._reconnectListeners = this._reconnectListeners.filter(l => l.listener !== listener); } /** * Removes all listeners. Intended for use in unit tests. */ removeAllListeners() { this._synchronizationListeners = {}; this._reconnectListeners = []; } /** * Clears account or replica data from client records and unsubscribes * @param {string} accountId account id to process the removal of */ onAccountDeleted(accountId) { this._subscriptionManager.cancelAccount(accountId); this._latencyService.onUnsubscribe(accountId); const masterAccountId = this._accountsByReplicaId[accountId]; if (masterAccountId) { if (masterAccountId === accountId) { const regionData = this._accountReplicas[masterAccountId]; const replicas = Object.values(regionData); Object.keys(this._synchronizationIdByInstance) .filter(instance => instance.startsWith(`${masterAccountId}`)) .forEach(instance => delete this._synchronizationIdByInstance[instance]); replicas.forEach(replica => { Object.values(this._socketInstancesByAccounts).forEach(instance => delete instance[replica]); delete this._accountsByReplicaId[replica]; delete this._regionsByAccounts[replica]; }); delete this._accountReplicas[masterAccountId]; this._logger.debug(`${masterAccountId}: processed primary account removal`); } else { Object.values(this._socketInstancesByAccounts).forEach(instance => delete instance[accountId]); const regionData = this._regionsByAccounts[accountId]; if (regionData) { const region = regionData.region; Object.keys(this._synchronizationIdByInstance) .filter(instance => instance.startsWith(`${masterAccountId}:${region}`)) .forEach(instance => delete this._synchronizationIdByInstance[instance]); delete this._accountReplicas[masterAccountId][region]; this._logger.debug(`${masterAccountId}: processed removal of replica ${accountId}`); } delete this._accountsByReplicaId[accountId]; delete this._regionsByAccounts[accountId]; } } } /** * Queues an account packet for processing * @param {Object} packet packet to process */ queuePacket(socket: MetaApiWebsocketClient.SocketInstance, packet) { const accountId = packet.accountId; const packets = this._packetOrderer.restoreOrder({ accountId: packet.accountId, host: packet.host, instanceIndex: packet.instanceIndex, sequenceNumber: packet.sequenceNumber, sequenceTimestamp: packet.sequenceTimestamp, synchronizationId: packet.synchronizationId, type: packet.type, socket, packet }).filter(p => p.type !== 'noop'); if (this._sequentialEventProcessing && packet.sequenceNumber !== undefined) { const events = packets.map((item: RestoredOrderPacket) => () => ( this._processSynchronizationPacket(item.socket, item.packet) )); if (!this._eventQueues[accountId]) { this._eventQueues[accountId] = events; this._callAccountEvents(accountId); } else { this._eventQueues[accountId] = this._eventQueues[accountId].concat(events); } } else { packets.forEach((item: RestoredOrderPacket) => this._processSynchronizationPacket(item.socket, item.packet)); } } /** * Queues account event for processing * @param {String} accountId account id * @param {String} name event label name * @param {Function} callable async or regular function to execute */ queueEvent(accountId, name, callable) { let event = () => this._processEvent(callable, `${accountId}:${name}`); if (this._sequentialEventProcessing) { if (!this._eventQueues[accountId]) { this._eventQueues[accountId] = [event]; this._callAccountEvents(accountId); } else { this._eventQueues[accountId].push(event); } } else { event(); } } async _callAccountEvents(accountId) { if (this._eventQueues[accountId]) { while(this._eventQueues[accountId].length) { await this._eventQueues[accountId][0](); this._eventQueues[accountId].shift(); } delete this._eventQueues[accountId]; } } async _reconnect(instanceNumber, socketInstanceIndex, region) { const instance = this.socketInstances[region][instanceNumber][socketInstanceIndex]; if (instance) { while (!instance.socket.connected && !instance.isReconnecting && instance.connected) { await this._tryReconnect(instanceNumber, socketInstanceIndex, region); } } } private _tryReconnect(instanceNumber, socketInstanceIndex, region: string) { const instance = this.socketInstances[region][instanceNumber][socketInstanceIndex]; instance.reconnectWaitTime = Math.min(instance.reconnectWaitTime * 2, 30000); return new Promise(resolve => setTimeout(async () => { if (!instance.socket.connected && !instance.isReconnecting && instance.connected) { try { this._logger.info(`${region}:${instanceNumber}: reconnecting socket`); instance.sessionId = randomstring.generate(32); const clientId = Math.random(); instance.socket.disconnect(); instance.isReconnecting = true; instance.socket = this._createSocket( await this._getServerUrl(instanceNumber, socketInstanceIndex, region), clientId, instance ); } catch (error) { instance.isReconnecting = false; } } resolve(); }, instance.reconnectWaitTime)); } /** * Simulataneously sends RPC requests to all synchronized instances * @param {String} accountId metatrader account id * @param {Object} request base request data * @param {String} [reliability] account reliability * @param {Number} [timeoutInSeconds] request timeout in seconds */ async rpcRequestAllInstances(accountId, request, reliability?, timeoutInSeconds?) { if (reliability === 'high') { try { return await any([0, 1].map(instanceNumber => { return this.rpcRequest(accountId, Object.assign({}, request, {instanceIndex: instanceNumber}), timeoutInSeconds); })); } catch (error) { throw error.errors[0]; } } else { return await this.rpcRequest(accountId, request, timeoutInSeconds); } } /** * Makes a RPC request * @param {String} accountId replica account id * @param {Object} request base request data * @param {Number} [timeoutInSeconds] request timeout in seconds */ //eslint-disable-next-line complexity, max-statements async rpcRequest(accountId: string, request, timeoutInSeconds?): Promise { const ignoredRequestTypes = ['subscribe', 'synchronize', 'refreshMarketDataSubscriptions', 'unsubscribe']; const primaryAccountId = this._accountsByReplicaId[accountId]; let connectedInstance = this._latencyService.getActiveAccountInstances(primaryAccountId)[0]; if (!ignoredRequestTypes.includes(request.type)) { connectedInstance ||= await this._latencyService.waitConnectedInstance(accountId); const activeRegion = connectedInstance.split(':')[1]; accountId = this._accountReplicas[primaryAccountId][activeRegion]; } let socketInstanceIndex = null; let instanceNumber = 0; const region = this.getAccountRegion(accountId); this._refreshAccountRegion(accountId); if (request.instanceIndex !== undefined) { instanceNumber = request.instanceIndex; } else { if (connectedInstance) { instanceNumber = Number(connectedInstance.split(':')[2]); } if (request.application !== 'RPC') { request = Object.assign({}, request, {instanceIndex: instanceNumber}); } } this._socketInstancesByAccounts[instanceNumber] ||= {}; this._socketInstances[region] ||= {}; this._socketInstances[region][instanceNumber] ||= []; if (this._socketInstancesByAccounts[instanceNumber][accountId] !== undefined) { socketInstanceIndex = this._socketInstancesByAccounts[instanceNumber][accountId]; } else { this._logger.debug(`${accountId}:${instanceNumber}: creating socket instance on RPC request`); await this._createSocketInstanceByAccount(accountId, instanceNumber); socketInstanceIndex = this._socketInstancesByAccounts[instanceNumber][accountId]; } const instance = this._socketInstances[region][instanceNumber][socketInstanceIndex]; if (!instance.connected) { this._logger.debug(`${accountId}:${instanceNumber}: connecting socket instance on RPC request`); await this.connect(instanceNumber, region); } else if (!this.connected(instanceNumber, socketInstanceIndex, region)) { await instance.connectResult; } if (request.type === 'subscribe') { request.sessionId = instance.sessionId; } if (['trade', 'subscribe'].includes(request.type)) { return this._makeRequest(accountId, instanceNumber, request, timeoutInSeconds); } let retryCounter = 0; while (true) { //eslint-disable-line no-constant-condition try { return await this._makeRequest(accountId, instanceNumber, request, timeoutInSeconds); } catch (err) { if (err.name === 'TooManyRequestsError') { let calcRetryCounter = retryCounter; let calcRequestTime = 0; while (calcRetryCounter < this._retries) { calcRetryCounter++; calcRequestTime += Math.min(Math.pow(2, calcRetryCounter) * this._minRetryDelayInSeconds, this._maxRetryDelayInSeconds) * 1000; } const retryTime = new Date(err.metadata.recommendedRetryTime).getTime(); if (Date.now() + calcRequestTime > retryTime && retryCounter < this._retries) { if (Date.now() < retryTime) { await new Promise(res => setTimeout(res, retryTime - Date.now())); } retryCounter++; } else { throw err; } } else if (['NotSynchronizedError', 'TimeoutError', 'NotAuthenticatedError', 'InternalError'].includes(err.name) && retryCounter < this._retries) { await new Promise(res => setTimeout(res, Math.min(Math.pow(2, retryCounter) * this._minRetryDelayInSeconds, this._maxRetryDelayInSeconds) * 1000)); retryCounter++; } else { throw err; } if (this._socketInstancesByAccounts[instanceNumber][accountId] === undefined) { throw err; } } } } private _makeRequest(accountId, instanceNumber, request, timeoutInSeconds) { const socketInstance = this._getSocketInstanceByAccount(accountId, instanceNumber); let requestId = request.requestId || randomstring.generate(32); request.timestamps = {clientProcessingStarted: new Date()}; let result = Promise.race([ new Promise((resolve, reject) => socketInstance.requestResolves[requestId] = {resolve, reject, type: request.type}), new Promise((resolve, reject) => setTimeout(() => { reject(new TimeoutError(`MetaApi websocket client request ${request.requestId} of type ${request.type} ` + 'timed out. Please make sure your account is connected to broker before retrying your request.')); delete socketInstance.requestResolves[requestId]; }, (timeoutInSeconds * 1000) || this._requestTimeout)) ]); request.accountId = accountId; request.application = request.application || this._application; if (!request.requestId) { request.requestId = requestId; } if (request.type === 'unsubscribe' || request.application === 'RPC' || request.instanceIndex === socketInstance.instanceNumber) { this._logger.debug(() => `${accountId}: Sending request: ${JSON.stringify(request)}`); socketInstance.socket.send('request', request); return result; } else { this._logger.trace(() => `${accountId}:${request.instanceIndex}: skipping request because it is being sent to ` + `the socket of the wrong instance index, request=${JSON.stringify(request)}`); return result; } } // eslint-disable-next-line complexity _convertError(data) { if (data.error === 'ValidationError') { return new ValidationError(data.message, data.details); } else if (data.error === 'NotFoundError') { return new NotFoundError(data.message); } else if (data.error === 'NotSynchronizedError') { return new NotSynchronizedError(data.message); } else if (data.error === 'TimeoutError') { return new TimeoutError(data.message); } else if (data.error === 'NotAuthenticatedError') { return new NotConnectedError(data.message); } else if (data.error === 'ForbiddenError') { return new ForbiddenError(data.message); } else if (data.error === 'TradeError') { return new TradeError(data.message, data.numericCode, data.stringCode); } else if (data.error === 'UnauthorizedError') { this.close(); return new UnauthorizedError(data.message); } else if (data.error === 'TooManyRequestsError') { return new TooManyRequestsError(data.message, data.metadata); } else { return new InternalError(data.message); } } // eslint-disable-next-line complexity _convertIsoTimeToDate(packet) { // eslint-disable-next-line guard-for-in for (let field in packet) { let value = packet[field]; if (typeof value === 'string' && field.match(/time$|Time$/) && !field.match(/brokerTime$|BrokerTime$|timeframe$/)) { packet[field] = new Date(value); } if (Array.isArray(value)) { for (let item of value) { this._convertIsoTimeToDate(item); } } if (typeof value === 'object') { this._convertIsoTimeToDate(value); } } if (packet && packet.timestamps) { // eslint-disable-next-line guard-for-in for (let field in packet.timestamps) { packet.timestamps[field] = new Date(packet.timestamps[field]); } } if (packet && packet.type === 'prices') { for (let price of packet.prices || []) { if (price.timestamps) { // eslint-disable-next-line guard-for-in for (let field in price.timestamps) { price.timestamps[field] = new Date(price.timestamps[field]); } } } } } /** * MetaTrader symbol price. Contains current price for a symbol (see * https://metaapi.cloud/docs/client/models/metatraderSymbolPrice/) * @typedef {Object} MetatraderSymbolPrice * @property {String} symbol symbol (e.g. a currency pair or an index) * @property {Number} bid bid price * @property {Number} ask ask price * @property {Number} profitTickValue tick value for a profitable position * @property {Number} lossTickValue tick value for a losing position * @property {Number} [accountCurrencyExchangeRate] current exchange rate of account currency into account base * currency (USD if you did not override it) * @property {Date} time quote time, in ISO format * @property {String} brokerTime time quote time, in broker timezone, YYYY-MM-DD HH:mm:ss.SSS format */ /** * MetaTrader candle * @typedef {Object} MetatraderCandle * @property {string} symbol symbol (e.g. currency pair or an index) * @property {string} timeframe timeframe candle was generated for, e.g. 1h. One of 1m, 2m, 3m, 4m, 5m, 6m, 10m, 12m, * 15m, 20m, 30m, 1h, 2h, 3h, 4h, 6h, 8h, 12h, 1d, 1w, 1mn * @property {Date} time candle opening time * @property {string} brokerTime candle opening time, in broker timezone, YYYY-MM-DD HH:mm:ss.SSS format * @property {number} open open price * @property {number} high high price * @property {number} low low price * @property {number} close close price * @property {number} tickVolume tick volume, i.e. number of ticks inside the candle * @property {number} spread spread in points * @property {number} volume trade volume */ /** * MetaTrader tick data * @typedef {Object} MetatraderTick * @property {string} symbol symbol (e.g. a currency pair or an index) * @property {Date} time time * @property {string} brokerTime time, in broker timezone, YYYY-MM-DD HH:mm:ss.SSS format * @property {number} [bid] bid price * @property {number} [ask] ask price * @property {number} [last] last deal price * @property {number} [volume] volume for the current last deal price * @property {string} side is tick a result of buy or sell deal, one of buy or sell */ /** * MetaTrader order book * @typedef {Object} MetatraderBook * @property {string} symbol symbol (e.g. a currency pair or an index) * @property {Date} time time * @property {string} brokerTime time, in broker timezone, YYYY-MM-DD HH:mm:ss.SSS format * @property {Array} book list of order book entries */ /** * MetaTrader order book entry * @typedef {Object} MetatraderBookEntry * @property {string} type entry type, one of BOOK_TYPE_SELL, BOOK_TYPE_BUY, BOOK_TYPE_SELL_MARKET, * BOOK_TYPE_BUY_MARKET * @property {number} price price * @property {number} volume volume */ // eslint-disable-next-line complexity,max-statements private async _processSynchronizationPacket(socket: MetaApiWebsocketClient.SocketInstance, data) { try { const instanceNumber = data.instanceIndex || 0; const socketInstance = this._getSocketInstanceByAccount(data.accountId, instanceNumber); if (data.synchronizationId && socketInstance) { socketInstance.synchronizationThrottler.updateSynchronizationId(data.synchronizationId); } const region = this.getAccountRegion(data.accountId); const primaryAccountId = this._accountsByReplicaId[data.accountId]; let instanceId = primaryAccountId + ':' + region + ':' + instanceNumber + ':' + (data.host || 0); let instanceIndex = region + ':' + instanceNumber + ':' + (data.host || 0); let sourceLabel = _.compact([instanceIndex, socket.clientApiHostname]).join(':'); const isOnlyActiveInstance = () => { const activeInstanceIds = Object.keys(this._connectedHosts).filter(instance => instance.startsWith(primaryAccountId + ':' + region + ':' + instanceNumber)); return !activeInstanceIds.length || activeInstanceIds.length === 1 && activeInstanceIds[0] === instanceId; }; const cancelDisconnectTimer = () => { if (this._statusTimers[instanceId]) { clearTimeout(this._statusTimers[instanceId]); } }; const resetDisconnectTimer = () => { cancelDisconnectTimer(); this._statusTimers[instanceId] = setTimeout(() => { this._logger.warn(`${data.accountId}:${sourceLabel}: timed out waiting for connection status`); if (isOnlyActiveInstance()) { this._subscriptionManager.onTimeout(data.accountId, 0); this._subscriptionManager.onTimeout(data.accountId, 1); } this.queueEvent(primaryAccountId, `${instanceIndex}:onDisconnected`, () => onDisconnected(true)); clearTimeout(this._statusTimers[instanceId]); }, 60000); }; // eslint-disable-next-line complexity const onDisconnected = async (isTimeout = false) => { if (this._connectedHosts[instanceId]) { this._latencyService.onDisconnected(instanceId); if (isOnlyActiveInstance()) { for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onDisconnected(instanceIndex), `${primaryAccountId}:${sourceLabel}:onDisconnected`); } } this._packetOrderer.onStreamClosed(instanceId); socketInstance?.synchronizationThrottler.removeIdByParameters(data.accountId, instanceNumber, data.host); for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onStreamClosed(instanceIndex), `${primaryAccountId}:${sourceLabel}:onStreamClosed`); } delete this._connectedHosts[instanceId]; if (isOnlyActiveInstance() && !isTimeout) { await this._subscriptionManager.onDisconnected(data.accountId, 0); await this._subscriptionManager.onDisconnected(data.accountId, 1); } } }; if (data.type === 'authenticated') { resetDisconnectTimer(); if (!data.sessionId || socketInstance && data.sessionId === socketInstance.sessionId) { this._latencyService.onConnected(instanceId); this._connectedHosts[instanceId] = data.host; for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onConnected(instanceIndex, data.replicas), `${primaryAccountId}:${sourceLabel}:onConnected`); } this._subscriptionManager.cancelSubscribe(data.accountId + ':' + instanceNumber); if (data.replicas === 1) { this._subscriptionManager.cancelAccount(data.accountId); } else { this._subscriptionManager.cancelSubscribe(data.accountId + ':' + instanceNumber); } } } else if (data.type === 'disconnected') { cancelDisconnectTimer(); await onDisconnected(); } else if (data.type === 'synchronizationStarted') { this._updateEvents[instanceId] = []; this._synchronizationFlags[data.synchronizationId] = { accountId: data.accountId, instanceNumber, specificationsUpdated: data.specificationsHashIndex === undefined, positionsUpdated: data.positionsHashIndex === undefined, ordersUpdated: data.ordersHashIndex === undefined }; this._synchronizationIdByInstance[instanceId] = data.synchronizationId; const specificationsHash = (data.specificationsHashIndex !== undefined) ? this._synchronizationHashes[data.synchronizationId] && this._synchronizationHashes[data.synchronizationId] .specificationsHashes[data.specificationsHashIndex] : undefined; const positionsHash = (data.positionsHashIndex !== undefined) ? this._synchronizationHashes[data.synchronizationId] && this._synchronizationHashes[data.synchronizationId].positionsHashes[data.positionsHashIndex] : undefined; const ordersHash = (data.ordersHashIndex !== undefined) ? this._synchronizationHashes[data.synchronizationId] && this._synchronizationHashes[data.synchronizationId].ordersHashes[data.ordersHashIndex] : undefined; delete this._synchronizationHashes[data.synchronizationId]; for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onSynchronizationStarted(instanceIndex, specificationsHash, positionsHash, ordersHash, data.synchronizationId), `${primaryAccountId}:${sourceLabel}:onSynchronizationStarted`); } } else if (data.type === 'accountInformation') { if (data.synchronizationId && data.synchronizationId !== this._synchronizationIdByInstance[instanceId]) { return; } if (data.accountInformation) { for (let listener of this._synchronizationListeners[primaryAccountId] || []) { try { await this._processEvent( () => listener.onAccountInformationUpdated(instanceIndex, data.accountInformation), `${primaryAccountId}:${sourceLabel}:onAccountInformationUpdated`, true); // eslint-disable-next-line max-depth if (this._synchronizationFlags[data.synchronizationId] && !this._synchronizationFlags[data.synchronizationId].positionsUpdated) { await this._processEvent( () => listener.onPositionsSynchronized(instanceIndex, data.synchronizationId), `${primaryAccountId}:${sourceLabel}:onPositionsSynchronized`, true); // eslint-disable-next-line max-depth if (!this._synchronizationFlags[data.synchronizationId].ordersUpdated) { await this._processEvent( () => listener.onPendingOrdersSynchronized(instanceIndex, data.synchronizationId), `${primaryAccountId}:${sourceLabel}:onPendingOrdersSynchronized`, true); } } } catch (err) { this._logger.error(`${primaryAccountId}:${sourceLabel}: Failed to notify listener ` + 'about accountInformation event', err); } } if (this._synchronizationFlags[data.synchronizationId] && !this._synchronizationFlags[data.synchronizationId].positionsUpdated && !this._synchronizationFlags[data.synchronizationId].ordersUpdated) { delete this._synchronizationFlags[data.synchronizationId]; } } } else if (data.type === 'deals') { if (data.synchronizationId && data.synchronizationId !== this._synchronizationIdByInstance[instanceId]) { return; } for (let deal of (data.deals || [])) { for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onDealAdded(instanceIndex, deal), `${primaryAccountId}:${sourceLabel}:onDealAdded`); } } } else if (data.type === 'orders') { if (data.synchronizationId && data.synchronizationId !== this._synchronizationIdByInstance[instanceId]) { return; } for (let listener of this._synchronizationListeners[primaryAccountId] || []) { try { await this._processEvent( () => listener.onPendingOrdersReplaced(instanceIndex, data.orders || []), `${primaryAccountId}:${sourceLabel}:onPendingOrdersReplaced`, true); await this._processEvent( () => listener.onPendingOrdersSynchronized(instanceIndex, data.synchronizationId), `${primaryAccountId}:${sourceLabel}:onPendingOrdersSynchronized`, true); } catch (err) { this._logger.error(`${primaryAccountId}:${sourceLabel}: Failed to notify listener ` + 'about orders event', err); } } if (this._synchronizationFlags[data.synchronizationId]) { delete this._synchronizationFlags[data.synchronizationId]; } } else if (data.type === 'historyOrders') { if (data.synchronizationId && data.synchronizationId !== this._synchronizationIdByInstance[instanceId]) { return; } for (let historyOrder of (data.historyOrders || [])) { for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onHistoryOrderAdded(instanceIndex, historyOrder), `${primaryAccountId}:${sourceLabel}:onHistoryOrderAdded`); } } } else if (data.type === 'positions') { if (data.synchronizationId && data.synchronizationId !== this._synchronizationIdByInstance[instanceId]) { return; } for (let listener of this._synchronizationListeners[primaryAccountId] || []) { try { await this._processEvent( () => listener.onPositionsReplaced(instanceIndex, data.positions || []), `${primaryAccountId}:${sourceLabel}:onPositionsReplaced`, true); await this._processEvent( () => listener.onPositionsSynchronized(instanceIndex, data.synchronizationId), `${primaryAccountId}:${sourceLabel}:onPositionsSynchronized`, true); if (this._synchronizationFlags[data.synchronizationId] && !this._synchronizationFlags[data.synchronizationId].ordersUpdated) { await this._processEvent( () => listener.onPendingOrdersSynchronized(instanceIndex, data.synchronizationId), `${primaryAccountId}:${sourceLabel}:onPendingOrdersSynchronized`, true); } } catch (err) { this._logger.error(`${primaryAccountId}:${sourceLabel}: Failed to notify listener ` + 'about positions event', err); } } if (this._synchronizationFlags[data.synchronizationId] && !this._synchronizationFlags[data.synchronizationId].ordersUpdated) { delete this._synchronizationFlags[data.synchronizationId]; } } else if (data.type === 'update') { this._updateEvents[instanceId]?.push({socket, data}); if (data.accountInformation) { for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onAccountInformationUpdated(instanceIndex, data.accountInformation), `${primaryAccountId}:${sourceLabel}:onAccountInformationUpdated`); } } const updatedPositions = data.updatedPositions || []; const removedPositionIds = data.removedPositionIds || []; if (updatedPositions.length || removedPositionIds.length) { for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onPositionsUpdated(instanceIndex, updatedPositions, removedPositionIds), `${primaryAccountId}:${sourceLabel}:onPositionsUpdated`); } } for (let position of updatedPositions) { for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onPositionUpdated(instanceIndex, position), `${primaryAccountId}:${sourceLabel}:onPositionUpdated`); } } for (let positionId of removedPositionIds) { for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onPositionRemoved(instanceIndex, positionId), `${primaryAccountId}:${sourceLabel}:onPositionRemoved`); } } const updatedOrders = data.updatedOrders || []; const completedOrderIds = data.completedOrderIds || []; if (updatedOrders.length || completedOrderIds.length) { for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onPendingOrdersUpdated(instanceIndex, updatedOrders, completedOrderIds), `${primaryAccountId}:${sourceLabel}:onPendingOrdersUpdated`); } } for (let order of updatedOrders) { for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onPendingOrderUpdated(instanceIndex, order), `${primaryAccountId}:${sourceLabel}:onPendingOrderUpdated`); } } for (let orderId of completedOrderIds) { for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onPendingOrderCompleted(instanceIndex, orderId), `${primaryAccountId}:${sourceLabel}:onPendingOrderCompleted`); } } for (let historyOrder of (data.historyOrders || [])) { for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onHistoryOrderAdded(instanceIndex, historyOrder), `${primaryAccountId}:${sourceLabel}:onHistoryOrderAdded`); } } for (let deal of (data.deals || [])) { for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onDealAdded(instanceIndex, deal), `${primaryAccountId}:${sourceLabel}:onDealAdded`); } } if (data.timestamps) { data.timestamps.clientProcessingFinished = new Date(); // eslint-disable-next-line max-depth for (let listener of this._latencyListeners || []) { await this._processEvent( () => listener.onUpdate(data.accountId, data.timestamps), `${primaryAccountId}:${sourceLabel}:onUpdate`); } } } else if (data.type === 'dealSynchronizationFinished') { if (data.synchronizationId && data.synchronizationId !== this._synchronizationIdByInstance[instanceId]) { delete this._synchronizationIdByInstance[instanceId]; return; } this._latencyService.onDealsSynchronized(instanceId); for (let listener of this._synchronizationListeners[primaryAccountId] || []) { socketInstance?.synchronizationThrottler.removeSynchronizationId(data.synchronizationId); await this._processEvent( () => listener.onDealsSynchronized(instanceIndex, data.synchronizationId), `${primaryAccountId}:${sourceLabel}:onDealsSynchronized`); } if (this._updateEvents[instanceId]) { let eventCalls = this._updateEvents[instanceId].map(event => ( () => this._processSynchronizationPacket(event.socket, event.data) )); if (this._eventQueues[primaryAccountId]) { this._eventQueues[primaryAccountId] = eventCalls.concat(this._eventQueues[primaryAccountId]); delete this._updateEvents[instanceId]; } else { this._eventQueues[primaryAccountId] = eventCalls; delete this._updateEvents[instanceId]; this._callAccountEvents(primaryAccountId); } } } else if (data.type === 'orderSynchronizationFinished') { if (data.synchronizationId && data.synchronizationId !== this._synchronizationIdByInstance[instanceId]) { return; } for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onHistoryOrdersSynchronized(instanceIndex, data.synchronizationId), `${primaryAccountId}:${sourceLabel}:onHistoryOrdersSynchronized`); } } else if (data.type === 'status') { if (!this._connectedHosts[instanceId]) { if (this._statusTimers[instanceId] && data.authenticated && (this._subscriptionManager.isDisconnectedRetryMode(data.accountId, instanceNumber) || !this._subscriptionManager.isAccountSubscribing(data.accountId, instanceNumber))) { this._subscriptionManager.cancelSubscribe(data.accountId + ':' + instanceNumber); await new Promise(res => setTimeout(res, 10)); // eslint-disable-next-line no-console this._logger.info( 'it seems like we are not connected to a running API server yet, retrying subscription for account' + _.compact([instanceId, socket.clientApiHostname]).join(':') ); this.ensureSubscribe(data.accountId, instanceNumber); } } else { resetDisconnectTimer(); for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onBrokerConnectionStatusChanged(instanceIndex, !!data.connected), `${primaryAccountId}:${sourceLabel}:onBrokerConnectionStatusChanged`); } if (data.healthStatus) { // eslint-disable-next-line max-depth for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onHealthStatus(instanceIndex, data.healthStatus), `${primaryAccountId}:${sourceLabel}:onHealthStatus`); } } } } else if (data.type === 'downgradeSubscription') { this._logger.info(`${primaryAccountId}:${sourceLabel}: Market data subscriptions for symbol ` + `${data.symbol} were downgraded by the server due to rate limits. Updated subscriptions: ` + `${JSON.stringify(data.updates)}, removed subscriptions: ${JSON.stringify(data.unsubscriptions)}. ` + 'Please read https://metaapi.cloud/docs/client/rateLimiting/ for more details.'); for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onSubscriptionDowngraded(instanceIndex, data.symbol, data.updates, data.unsubscriptions), `${primaryAccountId}:${sourceLabel}:onSubscriptionDowngraded`); } } else if (data.type === 'specifications') { if (data.synchronizationId && data.synchronizationId !== this._synchronizationIdByInstance[instanceId]) { return; } for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onSymbolSpecificationsUpdated(instanceIndex, data.specifications || [], data.removedSymbols || []), `${primaryAccountId}:${sourceLabel}:onSymbolSpecificationsUpdated`); } for (let specification of (data.specifications || [])) { for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onSymbolSpecificationUpdated(instanceIndex, specification), `${primaryAccountId}:${sourceLabel}:onSymbolSpecificationUpdated`); } } for (let removedSymbol of (data.removedSymbols || [])) { for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onSymbolSpecificationRemoved(instanceIndex, removedSymbol), `${primaryAccountId}:${sourceLabel}:onSymbolSpecificationRemoved`); } } } else if (data.type === 'prices') { if (data.synchronizationId && data.synchronizationId !== this._synchronizationIdByInstance[instanceId]) { return; } let prices = data.prices || []; let candles = data.candles || []; let ticks = data.ticks || []; let books = data.books || []; for (let listener of this._synchronizationListeners[primaryAccountId] || []) { if (prices.length) { await this._processEvent( () => listener.onSymbolPricesUpdated(instanceIndex, prices, data.equity, data.margin, data.freeMargin, data.marginLevel, data.accountCurrencyExchangeRate), `${primaryAccountId}:${sourceLabel}:onSymbolPricesUpdated`); } if (candles.length) { await this._processEvent( () => listener.onCandlesUpdated(instanceIndex, candles, data.equity, data.margin, data.freeMargin, data.marginLevel, data.accountCurrencyExchangeRate), `${primaryAccountId}:${sourceLabel}:onCandlesUpdated`); } if (ticks.length) { await this._processEvent( () => listener.onTicksUpdated(instanceIndex, ticks, data.equity, data.margin, data.freeMargin, data.marginLevel, data.accountCurrencyExchangeRate), `${primaryAccountId}:${sourceLabel}:onTicksUpdated`); } if (books.length) { await this._processEvent( () => listener.onBooksUpdated(instanceIndex, books, data.equity, data.margin, data.freeMargin, data.marginLevel, data.accountCurrencyExchangeRate), `${primaryAccountId}:${sourceLabel}:onBooksUpdated`); } } for (let price of prices) { for (let listener of this._synchronizationListeners[primaryAccountId] || []) { await this._processEvent( () => listener.onSymbolPriceUpdated(instanceIndex, price), `${primaryAccountId}:${sourceLabel}:onSymbolPriceUpdated`); } } for (let price of prices) { if (price.timestamps) { price.timestamps.clientProcessingFinished = new Date(); // eslint-disable-next-line max-depth for (let listener of this._latencyListeners || []) { await this._processEvent( () => listener.onSymbolPrice(data.accountId, price.symbol, price.timestamps), `${primaryAccountId}:${sourceLabel}:onSymbolPrice`); } } } } } catch (err) { this._logger.error('Failed to process incoming synchronization packet', err); } } async _processEvent(callable, label, throwError?) { const startTime = Date.now(); let isLongEvent = false; let isEventDone = false; const checkLongEvent = async () => { await new Promise(res => setTimeout(res, 1000)); if (!isEventDone) { isLongEvent = true; this._logger.warn(`${label}: event is taking more than 1 second to process`); } }; checkLongEvent(); try { await callable(); } catch (err) { if (throwError) { throw err; } this._logger.error(`${label}: event failed with error`, err); } isEventDone = true; if (isLongEvent) { this._logger.warn(`${label}: finished in ${Math.floor((Date.now() - startTime) / 1000)} seconds`); } } private _fireReconnected(instanceNumber, socketInstanceIndex, region) { try { const reconnectListeners = []; for (let listener of this._reconnectListeners) { if (this._socketInstancesByAccounts[instanceNumber][listener.accountId] === socketInstanceIndex && this.getAccountRegion(listener.accountId) === region) { reconnectListeners.push(listener); } } Object.keys(this._synchronizationFlags).forEach(synchronizationId => { const accountId = this._synchronizationFlags[synchronizationId].accountId; if (this._socketInstancesByAccounts[instanceNumber][accountId] === socketInstanceIndex && this._synchronizationFlags[synchronizationId].instanceNumber === instanceNumber && this._regionsByAccounts[accountId] && this._regionsByAccounts[accountId].region === region) { delete this._synchronizationFlags[synchronizationId]; } }); const reconnectAccountIds = reconnectListeners.map(listener => listener.accountId); this._subscriptionManager.onReconnected(instanceNumber, socketInstanceIndex, reconnectAccountIds); this._packetOrderer.onReconnected(reconnectAccountIds); for (let listener of reconnectListeners) { Promise.resolve(listener.listener.onReconnected(region, instanceNumber)) .catch(err => this._logger.error('Failed to notify reconnect listener', err)); } } catch (err) { this._logger.error('Failed to process reconnected event', err); } } _getSocketInstanceByAccount(accountId, instanceNumber) { const region = this.getAccountRegion(accountId); return this._socketInstances[region][instanceNumber][this._socketInstancesByAccounts[instanceNumber][accountId]]; } async getUrlSettings(instanceNumber, region) { if (this._url) { return {url: this._url, isSharedClientApi: true}; } const urlSettings = await this._domainClient.getSettings(); const getUrl = (hostname) => `https://${hostname}.${region}-${String.fromCharCode(97 + Number(instanceNumber))}.${urlSettings.domain}`; let url; if (this._useSharedClientApi) { url = getUrl(this._hostname); } else { url = getUrl(urlSettings.hostname); } const isSharedClientApi = url === getUrl(this._hostname); return {url, isSharedClientApi}; } // eslint-disable-next-line complexity private async _getServerUrl(instanceNumber: number, socketInstanceIndex, region) { if (this._url) { return this._url; } while (this.socketInstances[region][instanceNumber][socketInstanceIndex].connected) { try { const urlSettings = await this.getUrlSettings(instanceNumber, region); const url = urlSettings.url; const isSharedClientApi = urlSettings.isSharedClientApi; let logMessage = 'Connecting MetaApi websocket client to the MetaApi server ' + `via ${url} ${isSharedClientApi ? 'shared' : 'dedicated'} server.`; if (this._firstConnect && !isSharedClientApi) { logMessage += ' Please note that it can take up to 3 minutes for your dedicated server to start for the ' + 'first time. During this time it is OK if you see some connection errors.'; this._firstConnect = false; } this._logger.info(logMessage); return url; } catch (err) { this._logger.error('Failed to retrieve server URL', err); await new Promise(res => setTimeout(res, 1000)); } } } _throttleRequest(type, accountId, instanceNumber, timeInMs) { this._lastRequestsTime[instanceNumber] = this._lastRequestsTime[instanceNumber] || {}; this._lastRequestsTime[instanceNumber][type] = this._lastRequestsTime[instanceNumber][type] || {}; let lastTime = this._lastRequestsTime[instanceNumber][type][accountId]; if (!lastTime || (lastTime < Date.now() - timeInMs)) { this._lastRequestsTime[instanceNumber][type][accountId] = Date.now(); return !!lastTime; } return false; } _refreshAccountRegion(accountId) { if (this._regionsByAccounts[accountId]) { this._regionsByAccounts[accountId].lastUsed = Date.now(); } } //eslint-disable-next-line complexity async _createSocketInstanceByAccount(accountId, instanceNumber) { const region = this.getAccountRegion(accountId); if (this._socketInstancesByAccounts[instanceNumber][accountId] === undefined) { let socketInstanceIndex = null; while (this._subscribeLock && ((new Date(this._subscribeLock.recommendedRetryTime).getTime() > Date.now() && this.subscribedAccountIds(instanceNumber, undefined, region).length < this._subscribeLock.lockedAtAccounts) || (new Date(this._subscribeLock.lockedAtTime).getTime() + this._subscribeCooldownInSeconds * 1000 > Date.now() && this.subscribedAccountIds(instanceNumber, undefined, region).length >= this._subscribeLock.lockedAtAccounts))) { await new Promise(res => setTimeout(res, 1000)); } for (let index = 0; index < this._socketInstances[region][instanceNumber].length; index++) { const accountCounter = this._getAssignedAccounts(instanceNumber, index, region).length; const instance = this.socketInstances[region][instanceNumber][index]; if (instance.subscribeLock) { if (instance.subscribeLock.type === 'LIMIT_ACCOUNT_SUBSCRIPTIONS_PER_USER_PER_SERVER' && (new Date(instance.subscribeLock.recommendedRetryTime).getTime() > Date.now() || this.subscribedAccountIds(instanceNumber, index, region).length >= instance.subscribeLock.lockedAtAccounts)) { continue; } if (instance.subscribeLock.type === 'LIMIT_ACCOUNT_SUBSCRIPTIONS_PER_SERVER' && new Date(instance.subscribeLock.recommendedRetryTime).getTime() > Date.now() && this.subscribedAccountIds(instanceNumber, index, region).length >= instance.subscribeLock.lockedAtAccounts) { continue; } } if (accountCounter < this._maxAccountsPerInstance) { socketInstanceIndex = index; break; } } if (socketInstanceIndex === null) { socketInstanceIndex = this._socketInstances[region][instanceNumber].length; await this.connect(instanceNumber, region); } this._socketInstancesByAccounts[instanceNumber][accountId] = socketInstanceIndex; } } private _createSocket(url: string, clientId: number, instance: MetaApiWebsocketClient.SocketInstance) { let label = `${instance.region}:${instance.instanceNumber}`; let socket = new ClientStickySocket(url, { connection: { path: '/ws', timeout: this._connectTimeout, extraHeaders: {'Client-Id': clientId.toString()}, query: { 'auth-token': this._token, clientId: clientId, protocol: 3 } }, label, reconnectionDelayInMs: 1000, reconnectionDelayMaxInMs: 5000, useNativeSocketIoServer: this._useNativeSocketIoServer }); socket.on('connect', () => { this._logger.info(`${label}: MetaApi websocket client connected to the MetaApi server`); instance.reconnectWaitTime = this._socketMinimumReconnectTimeout; instance.isReconnecting = false; if (!instance.resolved) { instance.resolved = true; instance.connectResult.resolve(); } else { this._fireReconnected(instance.instanceNumber, instance.id, instance.region); } if (!instance.connected) { instance.socket.disconnect(); } }); socket.on('disconnect', err => { instance.synchronizationThrottler.onDisconnect(); err ? this._logger.error(`${label}: MetaApi websocket client closed`, err) : this._logger.info(`${label}: MetaApi websocket client closed`); instance.isReconnecting = false; this._reconnect(instance.instanceNumber, instance.id, instance.region); }); socket.on('response', data => { if (typeof data === 'string') { data = JSON.parse(data); } this._logger.debug(() => `${data.accountId}: Response received: ${JSON.stringify({ requestId: data.requestId, timestamps: data.timestamps})}`); let requestResolve = (instance.requestResolves[data.requestId] || {resolve: () => {}, reject: () => {}}); delete instance.requestResolves[data.requestId]; this._convertIsoTimeToDate(data); requestResolve.resolve(data); if (data.timestamps && requestResolve.type) { data.timestamps.clientProcessingFinished = new Date(); for (let listener of this._latencyListeners) { Promise.resolve() .then(() => requestResolve.type === 'trade' ? listener.onTrade(data.accountId, data.timestamps) : listener.onResponse(data.accountId, requestResolve.type, data.timestamps)) .catch(error => this._logger.error('Failed to process onResponse event for account ' + data.accountId + ', request type ' + requestResolve.type, error)); } } }); socket.on('processingError', data => { let requestResolve = (instance.requestResolves[data.requestId] || {resolve: () => {}, reject: () => {}}); delete instance.requestResolves[data.requestId]; requestResolve.reject(this._convertError(data)); }); // eslint-disable-next-line complexity socket.on('synchronization', async data => { if (typeof data === 'string') { data = JSON.parse(data); } if (data.instanceIndex && data.instanceIndex !== instance.instanceNumber) { this._logger.trace(() => `${data.accountId}:${data.instanceNumber}: received packet with wrong instance ` + `index via a socket with instance number of ${instance.instanceNumber}, data=${JSON.stringify({ type: data.type, sequenceNumber: data.sequenceNumber, sequenceTimestamp: data.sequenceTimestamp, synchronizationId: data.synchronizationId, application: data.application, host: data.host, specificationsUpdated: data.specificationsUpdated, positionsUpdated: data.positionsUpdated, ordersUpdated: data.ordersUpdated, specifications: data.specifications ? (data.specifications || []).length : undefined})}`); return; } this._regionsByAccounts[data.accountId] ||= {region: instance.region, connections: 0, lastUsed: Date.now()}; this._logger.trace(() => `${data.accountId}:${data.instanceIndex}: Sync packet received: ${JSON.stringify({ type: data.type, sequenceNumber: data.sequenceNumber, sequenceTimestamp: data.sequenceTimestamp, synchronizationId: data.synchronizationId, application: data.application, host: data.host, specificationsUpdated: data.specificationsUpdated, positionsUpdated: data.positionsUpdated, ordersUpdated: data.ordersUpdated, specifications: data.specifications ? (data.specifications || []).length : undefined})}, ` + `active listeners: ${(this._synchronizationListeners[data.accountId] || []).length}`); let activeSynchronizationIds = instance.synchronizationThrottler.activeSynchronizationIds; if (!data.synchronizationId || activeSynchronizationIds.includes(data.synchronizationId)) { if (this._packetLogger) { await this._packetLogger.logPacket(data); } const ignoredPacketTypes = ['disconnected', 'status', 'keepalive']; if (!this._subscriptionManager.isSubscriptionActive(data.accountId) && !ignoredPacketTypes.includes(data.type)) { this._logger.debug(`${data.accountId}: Packet arrived to inactive connection, attempting` + ` unsubscribe, packet: ${data.type}`); if (this._throttleRequest('unsubscribe', data.accountId, data.instanceIndex, this._unsubscribeThrottlingInterval)) { this.unsubscribe(data.accountId).catch(err => { this._logger.warn(`${data.accountId}:${data.instanceIndex || 0}: failed to unsubscribe`, err); }); } return; } this._convertIsoTimeToDate(data); } else { data.type = 'noop'; } this.queuePacket(instance, data); }); socket.on('metadata', (data: MetadataPacket) => { instance.clientApiHostname = data.clientApiHostname; }); return socket; } private _clearAccountCacheJob() { const date = Date.now(); Object.keys(this._regionsByAccounts).forEach(replicaId => { const data = this._regionsByAccounts[replicaId]; if (data && data.connections === 0 && date - data.lastUsed > 2 * 60 * 60 * 1000) { const primaryAccountId = this._accountsByReplicaId[replicaId]; const replicas = Object.values(this._accountReplicas[primaryAccountId] || {}); replicas.forEach(replica => { delete this._accountsByReplicaId[replica]; delete this._regionsByAccounts[replica]; }); delete this._accountReplicas[primaryAccountId]; this._logger.debug(`${primaryAccountId}: removed expired account replicas data`); } }); } private _clearInactiveSyncDataJob() { const date = Date.now(); Object.keys(this._synchronizationHashes).forEach(synchronizationId => { if (this._synchronizationHashes[synchronizationId].lastUpdated < date - 30 * 60 * 1000) { delete this._synchronizationHashes[synchronizationId]; } }); } } namespace MetaApiWebsocketClient { /** Options */ export type Options = { /** Region to connect */ region?: string; /** Whether to not run internal interval jobs. Used for tests only */ disableInternalJobs?: boolean; /** Packet ordering timeout in seconds. Defaults to `60` */ packetOrderingTimeout?: number; /** Options for synchronization throttler */ synchronizationThrottler?: SynchronizationThrottlerOpts; /** Application ID. Defaults to `MetaApi` */ application?: string; /** Domain to connect to. Defaults to `agiliumtrade.agiliumtrade.ai` */ domain?: string; /** Timeout for socket requests in seconds. Defaults to `60` */ requestTimeout?: number; /** Timeout for connecting to server in seconds. Defaults to `60` */ connectTimeout?: number; /** Minimum reconnect timeout. Defaults to `500` */ minReconnectTimeoutInMs?: number; /** Options for request retries */ retryOpts?: RetryOpts & { /** Subscribe cooldown in seconds. Defaults to `600` */ subscribeCooldownInSeconds?: number; }; /** Option to use a shared server */ useSharedClientApi?: boolean; /** * A timeout in seconds for throttling repeat unsubscribe requests when synchronization packets still arrive after * unsubscription. Defaults to 10 seconds */ unsubscribeThrottlingIntervalInSeconds?: number; /** Packet logger options */ packetLogger?: PacketLoggerOpts; } & Pick; /** Account replicas by region (including primary replica) by primary ID */ export type AccountReplicas = {[primaryId: string]: MetatraderAccount.AccountsByRegion}; /** * Account IDs by replica * - keys are replica or primary account IDs * - values are primary account IDs */ export type AccountsByReplica = {[replicaId: string]: string}; /** * Socket instance * @todo describe fields */ export type SocketInstance = { id: number; reconnectWaitTime: number; connected: boolean; requestResolves: any; resolved: boolean; connectResult: helpers.HandlePromise; sessionId: string; isReconnecting: boolean; socket?: ClientStickySocket; /** The final client API hostname the SDK has connected to behind load balancer */ clientApiHostname?: string; synchronizationThrottler: SynchronizationThrottler; subscribeLock?: any; instanceNumber: number; /** Region */ region: string; }; } export default MetaApiWebsocketClient; /** * Regions by accounts * - keys are replica or primary account ID */ type RegionsByAccounts = {[accountId: string]: { region: string; connections: number; lastUsed: number; }}; type SynchronizationFlag = { accountId: string, instanceNumber: number; specificationsUpdated: boolean; positionsUpdated: boolean; ordersUpdated: boolean; }; type SocketInstances = {[region: string]: {[instanceNumber: string | number]: MetaApiWebsocketClient.SocketInstance[]}}; type Packet = { socket: MetaApiWebsocketClient.SocketInstance; data: any; }; type RestoredOrderPacket = PacketOrderer.Packet & { socket: MetaApiWebsocketClient.SocketInstance; packet: any; }; type MetadataPacket = { clientApiHostname: string; };