/**
 * @module node-opcua-client-private
 */
// tslint:disable:only-arrow-functions
import chalk from "chalk";
import { assert } from "node-opcua-assert";
import { invalidateExtraDataTypeManager } from "node-opcua-client-dynamic-extension-object";
import { checkDebugFlag, make_debugLog, make_errorLog, make_warningLog } from "node-opcua-debug";
import { TransferSubscriptionsRequest, type TransferSubscriptionsResponse } from "node-opcua-service-subscription";
import { StatusCodes } from "node-opcua-status-code";
import { CloseSessionRequest } from "node-opcua-types";
import type { ClientSessionImpl, Reconnectable } from "../client_session_impl";
import type { ClientSubscriptionImpl } from "../client_subscription_impl";
import type { IClientBase } from "../i_private_client";
import { republish } from "./client_publish_engine_reconnection";
import { recreateSubscriptionAndMonitoredItem } from "./client_subscription_reconnection";

const debugLog = make_debugLog("RECONNECTION");
const doDebug = checkDebugFlag("RECONNECTION");
const errorLog = make_errorLog("RECONNECTION");
const warningLog = make_warningLog("RECONNECTION");

/** Message carried by every "abort the repair" error produced in this module. */
const RECONNECTION_ABORTED_MESSAGE = "Failure during reconnection : client or session is not usable anymore";

/** Delay, in milliseconds, between two session repair attempts. */
const SESSION_REPAIR_RETRY_DELAY = 2000;

/**
 * Client-level abort check.
 *
 * @returns an Error when the client has no secure channel, `null` otherwise.
 */
function _shouldNotContinue3(client: IClientBase): Error | null {
    if (!client._secureChannel) {
        return new Error(RECONNECTION_ABORTED_MESSAGE);
    }
    return null;
}

/**
 * Session-level abort check.
 *
 * @returns an Error when the session has no client, has been closed, when its client
 *          has no secure channel, or when the client reports itself unusable;
 *          `null` when the repair may carry on.
 */
export function _shouldNotContinue(session: ClientSessionImpl): Error | null {
    if (!session._client || session.hasBeenClosed() || !session._client._secureChannel || session._client.isUnusable()) {
        return new Error(RECONNECTION_ABORTED_MESSAGE);
    }
    return null;
}

/**
 * Subscription-level abort check.
 *
 * @returns an Error when the subscription has no session, otherwise the result of
 *          {@link _shouldNotContinue} for the subscription's session.
 */
export function _shouldNotContinue2(subscription: ClientSubscriptionImpl): Error | null {
    if (!subscription.hasSession) {
        return new Error(RECONNECTION_ABORTED_MESSAGE);
    }
    return _shouldNotContinue(subscription.session);
}

/**
 * Throwing flavour of {@link _shouldNotContinue}.
 *
 * Only call this where a throw is converted into a promise rejection (body of an
 * `async` function, or synchronously inside a Promise executor). Inside a plain
 * node-style callback, use {@link _shouldNotContinue} and `reject` explicitly.
 *
 * @throws Error when the session can no longer be repaired.
 */
function _throwIfShouldNotContinue(session: ClientSessionImpl): void {
    const err = _shouldNotContinue(session);
    if (err) throw err;
}

//
// a new secure channel has been created, we need to reactivate the corresponding session,
// and reestablish the subscription and restart the publish engine.
//
//
// see OPC UA part 4 ( version 1.03 ) figure 34 page 106
// 6.5 Reestablishing subscription....
//
//
//
//                      +---------------------+
//                      | CreateSecureChannel |
//                      | CreateSession       |
//                      | ActivateSession     |
//                      +---------------------+
//                                 |
//                                 |
//                                 v
//                      +---------------------+
//                      | CreateSubscription  |<-------------------------------------------------------------+
//                      +---------------------+                                                              |
//                                 |                                                                        (1)
//                                 |
//                                 v
//                      +---------------------+
//     (2)------------->| StartPublishEngine  |
//                      +---------------------+
//                                 |
//                                 V
//                      +---------------------+
//             +------->| Monitor Connection  |
//             |        +---------------------+
//             |                    |
//             |                    v
//             |         Good     /   \
//             +-----------------/ SR? \______Broken_____+
//                               \     /                 |
//                                \   /                  |
//                                                       |
//                                                       v
//                                            +---------------------+
//                                            |                     |
//                                            | CreateSecureChannel |<-----+
//                                            |                     |      |
//                                            +---------------------+      |
//                                                    |                    |
//                                                    v                    |
//                                                  /   \                  |
//                                                 / SR? \______Bad________+
//                                                 \     /
//                                                  \   /
//                                                    |
//                                                    |Good
//                                                    v
//                                            +---------------------+
//                                            |                     |
//                                            | ActivateSession     |
//                                            |                     |
//                                            +---------------------+
//                                                       |
//                                +----------------------+
//                                |
//                                v                      +-------------------+       +----------------------+
//                              /   \                    | CreateSession     |       |                      |
//                             / SR? \______Bad_____>    | ActivateSession   |-----> | TransferSubscription |
//                             \     /                   |                   |       |                      |       (1)
//                              \   /                    +-------------------+       +----------------------+        ^
//                                | Good                                                        |                    |
//                                v   (for each subscription)                                   |                    |
//                      +--------------------+                                                /   \                  |
//                      |                    |                                          OK   / OK? \______Bad________+
//                      | RePublish          |<-----------------------------------------     \     /
//                  +-->|                    |                                                \   /
//                  |   +--------------------+
//                  |              |
//                  |              v
//                  | GOOD       /   \
//                  +------     / SR? \______Bad SubscriptionInvalidId______>(1)
//                 (2)          \     /
//                  ^            \   /
//                  |              |
//                  |              |
//                  |    BadMessageNotAvailable     |
//                  +-------------------------------+

/**
 * Ask the publish engine of `session` to republish.
 *
 * When `republish` reports an error, the repair falls back to recreating a whole
 * new session through {@link repair_client_session_by_recreating_a_new_session}.
 *
 * @param session  the session whose publish engine is asked to republish
 * @param callback invoked once, with an error when the session became unusable or
 *                 when the fallback repair failed
 */
function _ask_for_subscription_republish(session: ClientSessionImpl, callback: (err?: Error) => void) {
    const abortBefore = _shouldNotContinue(session);
    if (abortBefore) {
        return callback(abortBefore);
    }

    doDebug && debugLog(chalk.bgCyan.yellow.bold("_ask_for_subscription_republish "));
    // assert(session.getPublishEngine().nbPendingPublishRequests === 0,
    //   "at this time, publish request queue shall still be empty");
    const engine = session.getPublishEngine();

    republish(engine, (err?: Error) => {
        doDebug && debugLog("_ask_for_subscription_republish : republish sent");

        // the session may have been closed while republish was in flight
        const abortAfter = _shouldNotContinue(session);
        if (abortAfter) {
            return callback(abortAfter);
        }

        doDebug && debugLog(chalk.bgCyan.green.bold("_ask_for_subscription_republish done "), err ? err.message : "OK");
        if (err) {
            warningLog("republish has failed with error :", err.message);
            doDebug && debugLog("_ask_for_subscription_republish has : recreating subscription");
            // `_client` is known to be set here: _shouldNotContinue() above checked it.
            return repair_client_session_by_recreating_a_new_session(session._client!, session, callback);
        }
        callback(err);
    });
}

/**
 * Create a new server-side session by recycling the existing `session` object, so
 * that the subscription data it holds can be reused.
 *
 * @returns a promise resolved with the (recycled) session, rejected when the session
 *          became unusable or when `__createSession_step2` failed.
 */
function create_session_and_repeat_if_failed(client: IClientBase, session: ClientSessionImpl): Promise<ClientSessionImpl> {
    return new Promise<ClientSessionImpl>((resolve, reject) => {
        // synchronous throw inside the executor => promise rejection
        _throwIfShouldNotContinue(session);

        doDebug && debugLog(chalk.bgWhite.red(" => creating a new session ...."));
        // create new session, based on old session,
        // so we can reuse subscriptions data
        client.__createSession_step2(session, (err: Error | null, session1?: ClientSessionImpl) => {
            // We are inside an asynchronous callback: throwing here would bypass the
            // promise and leave it pending forever, so reject explicitly instead.
            const abortErr = _shouldNotContinue(session);
            if (abortErr) {
                reject(abortErr);
                return;
            }
            if (!err && session1) {
                assert(session === session1, "session should have been recycled");
                resolve(session);
            } else {
                doDebug && debugLog("Cannot complete subscription republish err = ", err?.message);
                reject(err || new Error("Failed to create session"));
            }
        });
    });
}

/**
 * Activate `newSession` on the server.
 *
 * When activation fails, a CloseSession request (with `deleteSubscriptions: true`)
 * is sent for the new session before the activation error is propagated.
 *
 * @param client     the client used to activate the session
 * @param session    the session being repaired (used for the abort check)
 * @param newSession the session to activate
 * @returns a promise resolved on success, rejected with the activation error otherwise.
 */
function activate_session(client: IClientBase, session: ClientSessionImpl, newSession: ClientSessionImpl): Promise<void> {
    return new Promise<void>((resolve, reject) => {
        _throwIfShouldNotContinue(session);

        doDebug && debugLog(chalk.bgWhite.red(" => activating a new session ...."));

        client._activateSession(newSession, newSession.userIdentityInfo!, (err: Error | null, _session1?: ClientSessionImpl) => {
            // c8 ignore next
            doDebug && debugLog(" => activating a new session .... Done err=", err ? err.message : "null");

            if (!err) {
                resolve();
                return;
            }

            doDebug &&
                debugLog("reactivation of the new session has failed: let be smart and close it before failing this repair attempt");

            const owningClient = newSession._client;
            if (!owningClient) {
                // Nobody left to send CloseSession through: fail right away so that the
                // promise always settles (it used to stay pending in this situation).
                reject(err);
                return;
            }

            // but just on the server side, not on the client side
            const closeSessionRequest = new CloseSessionRequest({
                requestHeader: {
                    authenticationToken: newSession.authenticationToken
                },
                deleteSubscriptions: true
            });
            owningClient.performMessageTransaction(closeSessionRequest, (err2?: Error | null) => {
                if (err2) {
                    warningLog("closing session", err2.message);
                }
                // c8 ignore next
                doDebug && debugLog("the temporary replacement session is now closed");
                // c8 ignore next
                doDebug && debugLog(" err ", err.message, "propagated upwards");
                reject(err);
            });
        });
    });
}

/**
 * Recreate, one after the other, the given subscriptions (and their monitored items).
 *
 * A subscription id unknown to the publish engine is skipped. A failure to recreate
 * one subscription is logged (debug only) and does not stop the loop: this step is
 * best-effort by design.
 *
 * @throws Error when the session becomes unusable before an iteration or after the loop.
 */
async function recreateSubscriptions(
    session: ClientSessionImpl,
    newSession: ClientSessionImpl,
    subscriptionsToRecreate: number[]
): Promise<void> {
    for (const subscriptionId of subscriptionsToRecreate) {
        _throwIfShouldNotContinue(session);

        const publishEngine = session.getPublishEngine();
        if (!publishEngine.hasSubscription(subscriptionId)) {
            doDebug && debugLog(chalk.red(" => CANNOT RECREATE SUBSCRIPTION "), subscriptionId);
            continue;
        }
        const subscription = publishEngine.getSubscription(subscriptionId);
        doDebug && debugLog(chalk.red(" => RECREATING SUBSCRIPTION "), subscriptionId);
        assert(subscription.session === newSession, "must have the new session");

        try {
            await recreateSubscriptionAndMonitoredItem(subscription);
            doDebug && debugLog(chalk.cyan(" => RECREATING SUBSCRIPTION AND MONITORED ITEM DONE subscriptionId="), subscriptionId);
        } catch (err) {
            doDebug && debugLog(`_recreateSubscription failed !${(err as Error).message}`);
        }
    }
    _throwIfShouldNotContinue(session);
}

/**
 * Ask the server to transfer the subscriptions known to `session` onto `newSession`.
 *
 * - no subscription: resolves immediately;
 * - transfer request failed (or no response): every subscription is recreated;
 * - transfer succeeded: only the subscriptions reported as
 *   `BadSubscriptionIdInvalid` are recreated.
 *
 * @returns a promise rejected when the session became unusable or when
 *          {@link recreateSubscriptions} rejected.
 */
function attempt_subscription_transfer(
    _client: IClientBase,
    session: ClientSessionImpl,
    newSession: ClientSessionImpl
): Promise<void> {
    return new Promise<void>((resolve, reject) => {
        _throwIfShouldNotContinue(session);

        // get the old subscriptions id from the old session
        const subscriptionsIds = session.getPublishEngine().getSubscriptionIds();

        doDebug && debugLog(" session subscriptionCount = ", newSession.getPublishEngine().subscriptionCount);
        if (subscriptionsIds.length === 0) {
            doDebug && debugLog(" No subscriptions => skipping transfer subscriptions");
            return resolve(); // no need to transfer subscriptions
        }
        doDebug && debugLog(" => asking server to transfer subscriptions = [", subscriptionsIds.join(", "), "]");

        // Transfer subscriptions - ask for initial values....
        const subscriptionsToTransfer = new TransferSubscriptionsRequest({
            sendInitialValues: true,
            subscriptionIds: subscriptionsIds
        });

        if (newSession.getPublishEngine().nbPendingPublishRequests !== 0) {
            warningLog("Warning : we should not be publishing here");
        }

        newSession.transferSubscriptions(
            subscriptionsToTransfer,
            (err: Error | null, transferSubscriptionsResponse?: TransferSubscriptionsResponse) => {
                // may be the connection with server has been disconnected.
                // We are in an asynchronous callback: reject rather than throw, otherwise
                // the promise would never settle.
                const abortErr = _shouldNotContinue(session);
                if (abortErr) {
                    reject(abortErr);
                    return;
                }

                if (err || !transferSubscriptionsResponse) {
                    warningLog(chalk.bgCyan("May be the server is not supporting this feature"));
                    // when transfer subscription has failed, we have no other choice but
                    // recreate the subscriptions on the server side
                    const allSubscriptions = [...(subscriptionsToTransfer.subscriptionIds || [])];
                    warningLog(chalk.bgCyan("We need to recreate entirely the subscription"));
                    recreateSubscriptions(session, newSession, allSubscriptions).then(resolve, reject);
                    return;
                }

                const results = transferSubscriptionsResponse.results || [];

                // c8 ignore next
                if (doDebug) {
                    debugLog(chalk.cyan(" => transfer subscriptions done"), results.map((x) => x.statusCode.toString()).join(" "));
                }

                // some subscriptions may be marked as invalid on the server side ..
                // those one need to be recreated and repaired ....
                const subscriptionsToRecreate: number[] = [];
                for (let i = 0; i < results.length; i++) {
                    const statusCode = results[i].statusCode;
                    if (statusCode.equals(StatusCodes.BadSubscriptionIdInvalid)) {
                        // repair subscription
                        doDebug &&
                            debugLog(chalk.red(" WARNING SUBSCRIPTION "), subscriptionsIds[i], chalk.red(" SHOULD BE RECREATED"));
                        subscriptionsToRecreate.push(subscriptionsIds[i]);
                    } else {
                        const availableSequenceNumbers = results[i].availableSequenceNumbers;
                        doDebug &&
                            debugLog(
                                chalk.green(" SUBSCRIPTION "),
                                subscriptionsIds[i],
                                chalk.green(" CAN BE REPAIRED AND AVAILABLE "),
                                availableSequenceNumbers
                            );
                        // should be Good.
                    }
                }
                doDebug && debugLog(" new session subscriptionCount = ", newSession.getPublishEngine().subscriptionCount);

                recreateSubscriptions(session, newSession, subscriptionsToRecreate).then(resolve, reject);
            }
        );
    });
}

/**
 * Repair `session` by creating and activating a new server-side session, then moving
 * (or recreating) the subscriptions onto it and restarting the publish engine.
 *
 * The abort check is repeated after every asynchronous step because the session can
 * be closed by the user at any time during the repair.
 *
 * @throws Error when the session became unusable, or when any step failed.
 */
async function repair_client_session_by_recreating_a_new_session_async(
    client: IClientBase,
    session: ClientSessionImpl
): Promise<void> {
    _throwIfShouldNotContinue(session);

    // As we don't know if server has been rebooted or not,
    // and may be upgraded in between, we have to invalidate the extra data type manager
    invalidateExtraDataTypeManager(session);

    // c8 ignore next
    if (doDebug) {
        debugLog(" repairing client session by_recreating a new session for old session ", session.sessionId.toString());
    }

    const listenerCountBefore = session.listenerCount("");

    // Step 1: suspend old session publish engine
    _throwIfShouldNotContinue(session);
    // c8 ignore next
    doDebug && debugLog(chalk.bgWhite.red(" => suspend old session publish engine...."));
    session.getPublishEngine().suspend(true);

    // Step 2: create new session
    const newSession = await create_session_and_repeat_if_failed(client, session);
    _throwIfShouldNotContinue(session);

    // Step 3: activate new session
    await activate_session(client, session, newSession);
    _throwIfShouldNotContinue(session);

    // Step 4: before subscription repair hook
    if (client.beforeSubscriptionRecreate) {
        const hookErr = await client.beforeSubscriptionRecreate(newSession);
        _throwIfShouldNotContinue(session);
        if (hookErr) {
            throw hookErr;
        }
    }

    // Step 5: attempt subscription transfer
    await attempt_subscription_transfer(client, session, newSession);
    _throwIfShouldNotContinue(session);

    // Step 6: ask for subscription republish
    await new Promise<void>((resolve, reject) => {
        _ask_for_subscription_republish(newSession, (err) => {
            if (err) {
                warningLog("warning: Subscription republished has failed ", err.message);
                reject(err);
            } else {
                resolve();
            }
        });
    });
    _throwIfShouldNotContinue(session);

    // Step 7: start publishing as normal
    newSession.getPublishEngine().suspend(false);

    const listenerCountAfter = session.listenerCount("");
    assert(newSession === session);
    doDebug && debugLog("listenerCountBefore =", listenerCountBefore, "listenerCountAfter = ", listenerCountAfter);
}

/**
 * Callback flavour of {@link repair_client_session_by_recreating_a_new_session_async}.
 *
 * @param callback invoked without argument on success, with the error on failure.
 */
function repair_client_session_by_recreating_a_new_session(
    client: IClientBase,
    session: ClientSessionImpl,
    callback: (err?: Error) => void
) {
    repair_client_session_by_recreating_a_new_session_async(client, session)
        .then(() => callback())
        .catch((err) => {
            doDebug && debugLog("repair_client_session_by_recreating_a_new_session failed with ", err.message);
            callback(err);
        });
}

/**
 * Single repair attempt: try to re-activate the existing session first; when that
 * works, republish; when it does not, recreate a new session.
 *
 * Emits `"session_repaired"` or `"session_repaired_failed"` on the session once the
 * chosen path completes. Early aborts (session unusable) call `callback` directly
 * and emit nothing.
 */
function _repair_client_session(client: IClientBase, session: ClientSessionImpl, callback: (err?: Error) => void): void {
    const onRepairCompleted = (err2?: Error) => {
        doDebug && debugLog("Session repair completed with err: ", err2 ? err2.message : "", session.sessionId.toString());
        if (!err2) {
            session.emit("session_repaired");
        } else {
            session.emit("session_repaired_failed", err2);
        }
        callback(err2);
    };

    if (doDebug) {
        debugLog(chalk.yellow(" TRYING TO REACTIVATE EXISTING SESSION"), session.sessionId.toString());
        debugLog(" SubscriptionIds :", session.getPublishEngine().getSubscriptionIds());
    }

    const abortBefore = _shouldNotContinue(session);
    if (abortBefore) {
        return callback(abortBefore);
    }

    client._activateSession(session, session.userIdentityInfo!, (err: Error | null, _session2?: ClientSessionImpl) => {
        const abortAfter = _shouldNotContinue(session);
        if (abortAfter) {
            return callback(abortAfter);
        }
        //
        // Note: current limitation :
        //  - The reconnection doesn't work yet, if connection break is caused by a server that crashes and restarts.
        //
        doDebug && debugLog(" ActivateSession : ", err ? chalk.red(err.message) : chalk.green(" SUCCESS !!! "));
        if (err) {
            // activate old session has failed => let's recreate a new Channel and transfer the subscription
            return repair_client_session_by_recreating_a_new_session(client, session, onRepairCompleted);
        } else {
            // activate old session has succeeded => let's call Republish
            return _ask_for_subscription_republish(session, onRepairCompleted);
        }
    });
}

type EmptyCallback = (err?: Error) => void;

/**
 * Repair one session after the secure channel has been re-established, retrying
 * every {@link SESSION_REPAIR_RETRY_DELAY} ms until the repair succeeds or the
 * session becomes unusable.
 *
 * Only one repair runs per session at a time: callers arriving while a repair is in
 * progress are queued and notified with the same outcome. Transactions that were
 * pending when the repair started are put aside and re-injected once it is over.
 *
 * @param client   the owning client; when falsy the function returns immediately
 * @param session  the session to repair
 * @param callback invoked once the repair is over (with an error when it was aborted)
 */
export function repair_client_session(client: IClientBase, session: ClientSessionImpl, callback: EmptyCallback): void {
    if (!client) {
        doDebug && debugLog("Aborting reactivation of old session because user requested session to be close");
        return callback();
    }

    doDebug && debugLog(chalk.yellow("Starting client session repair"));

    const privateSession = session as unknown as Reconnectable;
    // The fallback state must carry `pendingTransactions` as well: it is spliced below
    // and pushed into when the repair completes.
    privateSession._reconnecting = privateSession._reconnecting || {
        reconnecting: false,
        pendingCallbacks: [],
        pendingTransactions: []
    };

    if (session.hasBeenClosed()) {
        privateSession._reconnecting.reconnecting = false;
        doDebug && debugLog("Aborting reactivation of old session because session has been closed");
        return callback();
    }
    if (privateSession._reconnecting.reconnecting) {
        doDebug && debugLog(chalk.bgCyan("Reconnection is already happening for session"), session.sessionId.toString());
        privateSession._reconnecting.pendingCallbacks.push(callback);
        return;
    }

    privateSession._reconnecting.reconnecting = true;

    // get old transaction queue ...
    const transactionQueue = privateSession._reconnecting.pendingTransactions.splice(0);

    const repeatedAction = (done: EmptyCallback) => {
        const abortBefore = _shouldNotContinue(session);
        if (abortBefore) {
            return done(abortBefore);
        }

        _repair_client_session(client, session, (err) => {
            const abortAfter = _shouldNotContinue(session);
            if (abortAfter) {
                return done(abortAfter);
            }

            if (err) {
                errorLog(
                    chalk.red("session restoration has failed! err ="),
                    err.message,
                    session.sessionId.toString(),
                    " => Let's retry"
                );
                if (!session.hasBeenClosed()) {
                    const delay = SESSION_REPAIR_RETRY_DELAY;
                    errorLog(chalk.red(`... will retry session repair... in ${delay} ms`));
                    setTimeout(() => {
                        const abortRetry = _shouldNotContinue(session);
                        if (abortRetry) {
                            warningLog("cancelling session repair");
                            return done(abortRetry);
                        }
                        errorLog(chalk.red("Retrying session repair..."));
                        repeatedAction(done);
                    }, delay);
                    return;
                } else {
                    errorLog(chalk.red("session restoration should be interrupted because session has been closed forcefully"));
                }
                // session does not need to be repaired anymore
                done();
                return;
            }

            // c8 ignore next
            doDebug && debugLog(chalk.yellow("session has been restored"), session.sessionId.toString());
            session.emit("session_restored");
            done(err);
        });
    };

    repeatedAction((err) => {
        privateSession._reconnecting.reconnecting = false;
        const otherCallbacks = privateSession._reconnecting.pendingCallbacks.splice(0);

        // re-inject element in queue
        // c8 ignore next
        if (transactionQueue.length > 0) {
            doDebug && debugLog(chalk.yellow("re-injecting transaction queue"), transactionQueue.length);
            transactionQueue.forEach((e) => privateSession._reconnecting.pendingTransactions.push(e));
        }
        otherCallbacks.forEach((c: EmptyCallback) => c(err));
        callback(err);
    });
}

/**
 * Repair, one after the other, every session returned by `client.getSessions()`.
 *
 * @param client   the client whose sessions must be repaired
 * @param callback invoked once all sessions have been processed; receives an abort
 *                 error when the client has no secure channel anymore, otherwise the
 *                 first error reported by a session repair (or `undefined`).
 */
export async function repair_client_sessions(client: IClientBase, callback: (err?: Error) => void): Promise<void> {
    // repair session
    const sessions = client.getSessions();
    doDebug && debugLog(chalk.red.bgWhite(" Starting sessions reactivation", sessions.length));

    const allErrors: (Error | undefined)[] = [];
    for (const session of sessions) {
        // sessions are repaired sequentially, each outcome being recorded
        await new Promise<void>((resolve) => {
            repair_client_session(client, session as ClientSessionImpl, (err) => {
                allErrors.push(err);
                resolve();
            });
        });
    }

    const abortErr = _shouldNotContinue3(client);
    if (abortErr) {
        return callback(abortErr);
    }

    const firstError = allErrors.find((e) => e != null);
    callback(firstError);
}