/** * @module node-opcua-client-private */ import chalk from "chalk"; import { assert } from "node-opcua-assert"; import { getMinOPCUADate } from "node-opcua-date-time"; import { checkDebugFlag, make_debugLog, make_warningLog } from "node-opcua-debug"; import { PublishRequest, type PublishResponse } from "node-opcua-service-subscription"; import type { ClientSession, SubscriptionId } from "../client_session"; import type { ClientSubscription } from "../client_subscription"; import type { ClientSessionImpl } from "../private/client_session_impl"; import type { ClientSubscriptionImpl } from "./client_subscription_impl"; const debugLog = make_debugLog(__filename); const doDebug = checkDebugFlag(__filename); const warningLog = make_warningLog(__filename); /** * A client side implementation to deal with publish service. * * @class ClientSidePublishEngine * The ClientSidePublishEngine encapsulates the mechanism to * deal with a OPCUA Server and constantly sending PublishRequest * The ClientSidePublishEngine also performs notification acknowledgements. * Finally, ClientSidePublishEngine dispatch PublishResponse to the correct * Subscription id callback * * @private */ export class ClientSidePublishEngine { public static publishRequestCountInPipeline = 5; public timeoutHint: number; public activeSubscriptionCount: number; public nbPendingPublishRequests: number; public nbMaxPublishRequestsAcceptedByServer: number; public isSuspended: boolean; public session: ClientSession | null; private subscriptionAcknowledgements: any[]; /** * @internal * @private */ readonly subscriptionMap: { [key: number]: ClientSubscriptionImpl }; public lastRequestSentTime: Date = getMinOPCUADate(); constructor(session: ClientSession) { this.session = session; this.subscriptionAcknowledgements = []; this.subscriptionMap = {}; this.timeoutHint = 10000; // 10 s by default this.activeSubscriptionCount = 0; // number of pending Publish request sent to the server and awaited for being processed by the server this.nbPendingPublishRequests = 0; // the maximum number of publish requests we think that the server can queue. // we will adjust this value . this.nbMaxPublishRequestsAcceptedByServer = 1000; this.isSuspended = false; assert(this.session, "Session must exist"); } /** * the number of active subscriptions managed by this publish engine. * @property subscriptionCount * @type {Number} */ get subscriptionCount(): number { return Object.keys(this.subscriptionMap).length; } public suspend(suspendedState: boolean): void { if (this.isSuspended === suspendedState) { // nothing to do ... return; } this.isSuspended = suspendedState; if (!this.isSuspended) { this.replenish_publish_request_queue(); } } public acknowledge_notification(subscriptionId: SubscriptionId, sequenceNumber: number): void { this.subscriptionAcknowledgements.push({ subscriptionId, sequenceNumber }); } public cleanup_acknowledgment_for_subscription(subscriptionId: SubscriptionId): void { this.subscriptionAcknowledgements = this.subscriptionAcknowledgements.filter((a) => a.subscriptionId !== subscriptionId); } /** * @private */ public send_publish_request(): void { if (this.isSuspended) { return; } if (this.nbPendingPublishRequests >= this.nbMaxPublishRequestsAcceptedByServer) { return; } const session = this.session as ClientSessionImpl; if (session && !session.isChannelValid()) { // wait for channel to be valid setTimeout(() => { if (this.subscriptionCount) { this.send_publish_request(); } }, 100); } else { setImmediate(() => { if (!this.session || this.isSuspended) { // session has been terminated or suspended return; } this.internalSendPublishRequest(); }); } } /** * @private */ public terminate(): void { debugLog("Terminated ClientPublishEngine "); this.session = null; } /** * @private */ public registerSubscription(subscription: ClientSubscription): void { debugLog("ClientSidePublishEngine#registerSubscription ", subscription.subscriptionId); const _subscription = subscription as ClientSubscriptionImpl; assert(arguments.length === 1); assert(isFinite(subscription.subscriptionId)); assert(!Object.hasOwn(this.subscriptionMap, subscription.subscriptionId)); // already registered ? assert(typeof _subscription.onNotificationMessage === "function"); assert(isFinite(subscription.timeoutHint)); this.activeSubscriptionCount += 1; this.subscriptionMap[subscription.subscriptionId] = _subscription; this.timeoutHint = Math.min(Math.max(this.timeoutHint, subscription.timeoutHint), 0x7ffffff); debugLog(" setting timeoutHint = ", this.timeoutHint, subscription.timeoutHint); this.replenish_publish_request_queue(); } /** * @private */ public replenish_publish_request_queue(): void { // Spec 1.03 part 4 5.13.5 Publish // [..] in high latency networks, the Client may wish to pipeline Publish requests // to ensure cyclic reporting from the Server. Pipe-lining involves sending more than one Publish // request for each Subscription before receiving a response. For example, if the network introduces a // delay between the Client and the Server of 5 seconds and the publishing interval for a Subscription // is one second, then the Client will have to issue Publish requests every second instead of waiting for // a response to be received before sending the next request. this.send_publish_request(); // send more than one publish request to server to cope with latency for (let i = 0; i < ClientSidePublishEngine.publishRequestCountInPipeline - 1; i++) { this.send_publish_request(); } } /** * * @param subscriptionId * @private */ public unregisterSubscription(subscriptionId: SubscriptionId): void { debugLog("ClientSidePublishEngine#unregisterSubscription ", subscriptionId); assert(isFinite(subscriptionId) && subscriptionId > 0); this.activeSubscriptionCount -= 1; // note : it is possible that we get here while the server has already requested // a session shutdown ... in this case it is possible that subscriptionId is already // removed if (Object.hasOwn(this.subscriptionMap, subscriptionId)) { delete this.subscriptionMap[subscriptionId]; } else { debugLog("ClientSidePublishEngine#unregisterSubscription cannot find subscription ", subscriptionId); } } public getSubscriptionIds(): SubscriptionId[] { return Object.keys(this.subscriptionMap).map((a) => parseInt(a, 10)); } /*** * get the client subscription from Id */ public getSubscription(subscriptionId: SubscriptionId): ClientSubscription { assert(isFinite(subscriptionId) && subscriptionId > 0); assert(Object.hasOwn(this.subscriptionMap, subscriptionId)); return this.subscriptionMap[subscriptionId]; } public hasSubscription(subscriptionId: SubscriptionId): boolean { assert(isFinite(subscriptionId) && subscriptionId > 0); return Object.hasOwn(this.subscriptionMap, subscriptionId); } public internalSendPublishRequest(): void { assert(this.session, "ClientSidePublishEngine terminated ?"); this.nbPendingPublishRequests += 1; debugLog(chalk.yellow("sending publish request "), this.nbPendingPublishRequests); const subscriptionAcknowledgements = this.subscriptionAcknowledgements; this.subscriptionAcknowledgements = []; // as started in the spec (Spec 1.02 part 4 page 81 5.13.2.2 Function DequeuePublishReq()) // the server will dequeue the PublishRequest in first-in first-out order // and will validate if the publish request is still valid by checking the timeoutHint in the RequestHeader. // If the request timed out, the server will send a BadTimeout service result for the request and de-queue // another publish request. // // in Part 4. page 144 Request Header the timeoutHint is described this way. // timeoutHint UInt32 This timeout in milliseconds is used in the Client side Communication Stack to // set the timeout on a per-call base. // For a Server this timeout is only a hint and can be used to cancel long running // operations to free resources. If the Server detects a timeout, he can cancel the // operation by sending the Service result BadTimeout. The Server should wait // at minimum the timeout after he received the request before cancelling the operation. // The value of 0 indicates no timeout. // In issue#40 (MonitoredItem on changed not fired), we have found that some server might wrongly interpret // the timeoutHint of the request header ( and will bang a BadTimeout regardless if client send timeoutHint=0) // as a work around here , we force the timeoutHint to be set to a suitable value. // // see https://github.com/node-opcua/node-opcua/issues/141 // This suitable value shall be at least the time between two keep alive signal that the server will send. // (i.e revisedLifetimeCount * revisedPublishingInterval) // also ( part 3 - Release 1.03 page 140) // The Server shall check the timeoutHint parameter of a PublishRequest before processing a PublishResponse. // If the request timed out, a BadTimeout Service result is sent and another PublishRequest is used. // The value of 0 indicates no timeout // in our case: assert(this.nbPendingPublishRequests > 0); const calculatedTimeout = Math.min(0x7fffffff, this.nbPendingPublishRequests * this.timeoutHint); const publishRequest = new PublishRequest({ requestHeader: { timeoutHint: calculatedTimeout }, // see note subscriptionAcknowledgements }); let active = true; const session = this.session! as ClientSessionImpl; session.publish(publishRequest, (err: Error | null, response?: PublishResponse) => { this.nbPendingPublishRequests -= 1; this.lastRequestSentTime = new Date(); if (err) { debugLog( chalk.cyan("ClientSidePublishEngine.prototype.internalSendPublishRequest callback : "), chalk.yellow(err.message) ); debugLog("'" + err.message + "'"); if (err.message.match("not connected")) { debugLog(chalk.bgWhite.red(" WARNING : CLIENT IS NOT CONNECTED :" + " MAY BE RECONNECTION IS IN PROGRESS")); debugLog("this.activeSubscriptionCount =", this.activeSubscriptionCount); // the previous publish request has ended up with an error because // the connection has failed ... // There is no need to send more publish request for the time being until reconnection is completed active = false; } // c8 ignore next if (err.message.match(/BadNoSubscription/) && this.activeSubscriptionCount >= 1) { // there is something wrong happening here. // the server tells us that there is no subscription for this session // but the client have some active subscription left. // This could happen if the client has missed or not received the StatusChange Notification debugLog(chalk.bgWhite.red(" WARNING: server tells that there is no Subscription, but client disagree")); debugLog("this.activeSubscriptionCount =", this.activeSubscriptionCount); active = false; } if (err.message.match(/BadSessionClosed|BadSessionIdInvalid/)) { // // server has closed the session .... // may be the session timeout is shorted than the subscription life time // and the client does not send intermediate keepAlive request to keep the connection working. // debugLog(chalk.bgWhite.red(" WARNING : Server tells that the session has closed ...")); debugLog( " the ClientSidePublishEngine shall now be disabled," + " as server will reject any further request" ); // close all active subscription.... active = false; } if (err.message.match(/BadTooManyPublishRequests/)) { // preventing queue overflow // ------------------------- // if the client send too many publish requests that the server can queue, the server returns // a Service result of BadTooManyPublishRequests. // // let adjust the nbMaxPublishRequestsAcceptedByServer value so we never overflow the server // with extraneous publish requests in the future. // this.nbMaxPublishRequestsAcceptedByServer = Math.min( this.nbPendingPublishRequests, this.nbMaxPublishRequestsAcceptedByServer ); active = false; if (this.nbPendingPublishRequests < 10) { warningLog(chalk.bgWhite.red(" warning : server tells that too many publish request has been send ...")); warningLog(" On our side nbPendingPublishRequests = ", this.nbPendingPublishRequests); warningLog(" => nbMaxPublishRequestsAcceptedByServer =", this.nbMaxPublishRequestsAcceptedByServer); } } if (err.message.match(/BadSecureChannelIdInvalid/)) { // This can happen transiently during session transfer to a new // SecureChannel (e.g. after server certificate change). The // PublishRequest arrived on the new channel before ActivateSession // completed the session transfer. We should pause and let the // reconnection flow replenish publish requests once the session // transfer completes. debugLog( chalk.bgWhite.yellow( " WARNING: BadSecureChannelIdInvalid on PublishRequest" + " - session transfer may be in progress" ) ); active = false; } } else { // c8 ignore next if (doDebug) { debugLog(chalk.cyan("ClientSidePublishEngine.prototype.internalSendPublishRequest callback ")); } this._receive_publish_response(response!); } // feed the server with a new publish Request to the server if (!this.isSuspended && active && this.activeSubscriptionCount > 0) { if (err && err.message.match(/Connection Break/)) { // do not renew when connection is broken } else { this.send_publish_request(); } } }); } private _receive_publish_response(response: PublishResponse) { debugLog(chalk.yellow("receive publish response")); // the id of the subscription sending the notification message const subscriptionId = response.subscriptionId; // the sequence numbers available in this subscription // for retransmission and not acknowledged by the client // -- var available_seq = response.availableSequenceNumbers; // has the server more notification for us ? // -- var moreNotifications = response.moreNotifications; const notificationMessage = response.notificationMessage; // notificationMessage.sequenceNumber // notificationMessage.publishTime // notificationMessage.notificationData[] notificationMessage.notificationData = notificationMessage.notificationData || []; if (notificationMessage.notificationData.length !== 0) { this.acknowledge_notification(subscriptionId, notificationMessage.sequenceNumber); } // else { // this is a keep-alive notification // in this case , we shall not acknowledge notificationMessage.sequenceNumber // which is only an information of what will be the future sequenceNumber. // } const subscription = this.subscriptionMap[subscriptionId]; if (subscription && this.session !== null) { try { // delegate notificationData to the subscription callback subscription.onNotificationMessage(notificationMessage); } catch (err) { // c8 ignore next if (doDebug) { debugLog(err); debugLog("Exception in onNotificationMessage"); } } } else { debugLog(" ignoring notificationMessage", notificationMessage, " for subscription", subscriptionId); debugLog(" because there is no subscription."); debugLog(" or because there is no session for the subscription (session terminated ?)."); } } }