/** * @module node-opcua-client-private */ // tslint:disable:unified-signatures // tslint:disable:no-empty import { EventEmitter } from "node:events"; import { assert } from "node-opcua-assert"; import { AttributeIds } from "node-opcua-data-model"; import { coerceTimestampsToReturn, type DataValue } from "node-opcua-data-value"; import { checkDebugFlag, make_debugLog, make_warningLog } from "node-opcua-debug"; import type { ExtensionObject } from "node-opcua-extension-object"; import { EventFilter } from "node-opcua-service-filter"; import { ReadValueId, type ReadValueIdOptions, TimestampsToReturn } from "node-opcua-service-read"; import { type MonitoredItemCreateResult, type MonitoredItemModifyResult, MonitoringMode, MonitoringParameters, type MonitoringParametersOptions } from "node-opcua-service-subscription"; import { type Callback, type ErrorCallback, type StatusCode, StatusCodes } from "node-opcua-status-code"; import type { Variant } from "node-opcua-variant"; import { ClientMonitoredItem } from "../client_monitored_item"; import { type ClientMonitoredItemBaseEx, ClientMonitoredItemToolbox } from "../client_monitored_item_toolbox"; import type { ClientSubscription } from "../client_subscription"; import { ClientMonitoredItem_create, type ClientSubscriptionImpl } from "./client_subscription_impl"; const _debugLog = make_debugLog(__filename); const warningLog = make_warningLog(__filename); const _doDebug = checkDebugFlag(__filename); export type PrepareForMonitoringResult = | { error: string } | { error?: null; itemToMonitor: ReadValueIdOptions; monitoringMode: MonitoringMode; requestedParameters: MonitoringParameters; }; /** * ClientMonitoredItem * @class ClientMonitoredItem * @extends ClientMonitoredItemBase * * - event: * - "initialized" * - "err" * - "changed" * * note: this.monitoringMode = subscription_service.MonitoringMode.Reporting; */ export class ClientMonitoredItemImpl extends EventEmitter implements ClientMonitoredItem, ClientMonitoredItemBaseEx { public itemToMonitor: ReadValueId; public monitoringParameters: MonitoringParameters; public subscription: ClientSubscriptionImpl; public monitoringMode: MonitoringMode; public statusCode: StatusCode; public monitoredItemId?: number; public result?: MonitoredItemCreateResult; public filterResult?: ExtensionObject; public timestampsToReturn: TimestampsToReturn; #pendingDataValue?: DataValue[]; #pendingEvents?: Variant[][]; #terminated?: boolean; public internalSetMonitoringMode(monitoringMode: MonitoringMode): void { this.monitoringMode = monitoringMode; } constructor( subscription: ClientSubscription, itemToMonitor: ReadValueIdOptions, monitoringParameters: MonitoringParametersOptions, timestampsToReturn: TimestampsToReturn, monitoringMode: MonitoringMode = MonitoringMode.Reporting ) { super(); this.statusCode = StatusCodes.BadDataUnavailable; this.subscription = subscription as ClientSubscriptionImpl; this.itemToMonitor = new ReadValueId(itemToMonitor); this.monitoringParameters = new MonitoringParameters(monitoringParameters); this.monitoringMode = monitoringMode; assert(this.monitoringParameters.clientHandle === 0xffffffff, "should not have a client handle yet"); assert(subscription.session, "expecting session"); timestampsToReturn = coerceTimestampsToReturn(timestampsToReturn); this.timestampsToReturn = timestampsToReturn; } public toJSON(): Record { return { itemToMonitor: this.itemToMonitor.toString(), monitoringParameters: this.monitoringParameters.toString(), timestampsToReturn: TimestampsToReturn[this.timestampsToReturn], monitoredItemId: this.monitoredItemId, statusCode: this.statusCode?.toString(), result: this.result?.toString() }; } public toString(): string { let ret = ""; ret += `itemToMonitor: ${this.itemToMonitor.toString()}\n`; ret += `monitoringParameters: ${this.monitoringParameters.toString()}\n`; ret += `timestampsToReturn: ${TimestampsToReturn[this.timestampsToReturn]}\n`; ret += `monitoredItemId: ${this.monitoredItemId}\n`; ret += `statusCode: ${this.statusCode?.toString()}\n`; ret += `result: ${this.result?.toString()}\n`; return ret; } public [Symbol.for("nodejs.util.inspect.custom")](): string { return this.toString(); } /** * terminate the monitored item by removing the MonitoredItem from its subscription */ public async terminate(): Promise; public terminate(callback: ErrorCallback): void; public terminate(callback?: ErrorCallback): Promise | undefined { const subscription = this.subscription as ClientSubscriptionImpl; subscription._delete_monitored_items([this], (err?: Error) => { if (callback) { callback(err); } }); return undefined; } public async modify(parameters: MonitoringParametersOptions): Promise; public async modify( parameters: MonitoringParametersOptions, timestampsToReturn: TimestampsToReturn ): Promise; public modify(parameters: MonitoringParametersOptions, callback: Callback): void; public modify( parameters: MonitoringParametersOptions, timestampsToReturn: TimestampsToReturn | null, callback: Callback ): void; public modify(...args: unknown[]): undefined | Promise { // remember we are garantied to be in a callback env here. if (args.length === 2) { this.modify(args[0] as MonitoringParametersOptions, null, args[1] as Callback); return undefined; } const parameters = args[0] as MonitoringParametersOptions; const timestampsToReturn = args[1] as TimestampsToReturn; const callback = args[2] as Callback; this.timestampsToReturn = timestampsToReturn || this.timestampsToReturn; ClientMonitoredItemToolbox._toolbox_modify( this.subscription, [this], parameters, this.timestampsToReturn, (err: Error | null, results?: MonitoredItemModifyResult[]) => { if (err) { return callback(err); } if (!results) { return callback(new Error("internal error")); } assert(results?.length === 1); callback(null, results[0]); } ); return undefined; } public async setMonitoringMode(monitoringMode: MonitoringMode): Promise; public setMonitoringMode(monitoringMode: MonitoringMode, callback: Callback): void; public setMonitoringMode(monitoringMode: MonitoringMode, callback?: Callback): undefined | Promise { ClientMonitoredItemToolbox._toolbox_setMonitoringMode( this.subscription, [this], monitoringMode, (err?: Error | null, statusCodes?: StatusCode[]) => { callback?.(err ? err : null, statusCodes ? statusCodes[0] : undefined); } ); return undefined; } /** * @internal * @param value * @private */ public _notify_value_change(value: DataValue): void { // it is possible that the first notification arrives before the CreateMonitoredItemsRequest is fully proceed // in this case we need to put the dataValue aside so we can send the notification changed after // the node-opcua client had time to fully install the on("changed") event handler if (this.statusCode?.value === StatusCodes.BadDataUnavailable.value) { this.#pendingDataValue = this.#pendingDataValue || []; this.#pendingDataValue.push(value); return; } /** * Notify the observers that the MonitoredItem value has changed on the server side. * @event changed * @param value */ try { this.emit("changed", value); } catch (err) { warningLog( "[NODE-OPCUA-W28] Exception raised inside the event handler called by ClientMonitoredItem.on('change')", err ); warningLog(" Please verify the application using this node-opcua client"); } } /** * @internal * @param eventFields * @private */ public _notify_event(eventFields: Variant[]): void { if (this.statusCode?.value === StatusCodes.BadDataUnavailable.value) { this.#pendingEvents = this.#pendingEvents || []; this.#pendingEvents.push(eventFields); return; } /** * Notify the observers that the MonitoredItem value has changed on the server side. * @event changed * @param value */ try { this.emit("changed", eventFields); } catch (err) { warningLog( "[NODE-OPCUA-W29] Exception raised inside the event handler called by ClientMonitoredItem.on('change')", err ); warningLog(" Please verify the application using this node-opcua client"); } } /** * @internal * @private */ public _prepare_for_monitoring(): PrepareForMonitoringResult { assert(this.monitoringParameters.clientHandle === 4294967295, "should not have a client handle yet"); const subscription = this.subscription as ClientSubscriptionImpl; this.monitoringParameters.clientHandle = subscription._nextClientHandle(); assert(this.monitoringParameters.clientHandle > 0 && this.monitoringParameters.clientHandle !== 4294967295); // If attributeId is EventNotifier then monitoring parameters need a filter. // The filter must then either be DataChangeFilter, EventFilter or AggregateFilter. // todo can be done in another way? // todo implement AggregateFilter // todo support DataChangeFilter // todo support whereClause if (this.itemToMonitor.attributeId === AttributeIds.EventNotifier) { // // see OPCUA Spec 1.02 part 4 page 65 : 5.12.1.4 Filter // see part 4 page 130: 7.16.3 EventFilter // part 3 page 11 : 4.6 Event Model // To monitor for Events, the attributeId element of the ReadValueId structure is the // the id of the EventNotifierAttribute // OPC Unified Architecture 1.02, Part 4 5.12.1.2 Sampling interval page 64: // "A Client shall define a sampling interval of 0 if it subscribes for Events." // toDO // note : the EventFilter is used when monitoring Events. this.monitoringParameters.filter = this.monitoringParameters.filter || new EventFilter({}); const filter = this.monitoringParameters.filter; // c8 ignore next if (!filter) { return { error: "Internal Error" }; } if (filter.schema.name !== "EventFilter") { return { error: "Mismatch between attributeId and filter in monitoring parameters : " + "Got a " + filter.schema.name + " but a EventFilter object is required " + "when itemToMonitor.attributeId== AttributeIds.EventNotifier" }; } } else if (this.itemToMonitor.attributeId === AttributeIds.Value) { // the DataChangeFilter and the AggregateFilter are used when monitoring Variable Values // The Value Attribute is used when monitoring Variables. Variable values are monitored for a change // in value or a change in their status. The filters defined in this standard (see 7.16.2) and in Part 8 are // used to determine if the value change is large enough to cause a Notification to be generated for the // to do : check 'DataChangeFilter' && 'AggregateFilter' } else { if (this.monitoringParameters.filter) { return { error: "Mismatch between attributeId and filter in monitoring parameters : " + "no filter expected when attributeId is not Value or EventNotifier" }; } } return { itemToMonitor: this.itemToMonitor, monitoringMode: this.monitoringMode, requestedParameters: this.monitoringParameters }; } /** * @internal * @param monitoredItemResult * @private */ public _applyResult(monitoredItemResult: MonitoredItemCreateResult): void { this.statusCode = monitoredItemResult.statusCode; if (monitoredItemResult.statusCode.isGood()) { this.result = monitoredItemResult; this.monitoredItemId = monitoredItemResult.monitoredItemId; this.monitoringParameters.samplingInterval = monitoredItemResult.revisedSamplingInterval; this.monitoringParameters.queueSize = monitoredItemResult.revisedQueueSize; this.filterResult = monitoredItemResult.filterResult || undefined; } // some PublishRequest with DataNotificationChange might have been sent by the server, before the monitored // item has been fully initialized it is time to process now any pending notification that were put on hold. if (this.#pendingDataValue) { const dataValues = this.#pendingDataValue; this.#pendingDataValue = undefined; setImmediate(() => { dataValues.map((dataValue) => this._notify_value_change(dataValue)); }); } if (this.#pendingEvents) { const events = this.#pendingEvents; this.#pendingEvents = undefined; setImmediate(() => { events.map((event) => this._notify_event(event)); }); } } public _before_create(): void { const subscription = this.subscription as ClientSubscriptionImpl; subscription._add_monitored_item(this.monitoringParameters.clientHandle, this); } /** * @internal * @param monitoredItemResult * @private */ public _after_create(monitoredItemResult: MonitoredItemCreateResult): void { this._applyResult(monitoredItemResult); if (this.statusCode.isGood()) { /** * Notify the observers that the monitored item is now fully initialized. * @event initialized */ this.emit("initialized"); } else { /** * Notify the observers that the monitored item has failed to initialize. * @event err * @param statusCode {StatusCode} */ const err = new Error(monitoredItemResult.statusCode.toString()); this._terminate_and_emit(err); } } public _terminate_and_emit(err?: Error): void { if (this.#terminated) { return; // already terminated } if (err) { this.emit("err", err.message); } this.#terminated = true; /** * Notify the observer that this monitored item has been terminated. * @event terminated */ this.emit("terminated", err); this.removeAllListeners(); // also remove from subscription const clientHandle = this.monitoringParameters.clientHandle; this.subscription._removeMonitoredItem(clientHandle); } } // tslint:disable:no-var-requires // tslint:disable:max-line-length import { withCallback } from "thenify-ex"; const _opts = { multiArgs: false }; ClientMonitoredItemImpl.prototype.terminate = withCallback(ClientMonitoredItemImpl.prototype.terminate); ClientMonitoredItemImpl.prototype.setMonitoringMode = withCallback(ClientMonitoredItemImpl.prototype.setMonitoringMode); ClientMonitoredItemImpl.prototype.modify = withCallback(ClientMonitoredItemImpl.prototype.modify); ClientMonitoredItem.create = ( subscription: ClientSubscription, itemToMonitor: ReadValueIdOptions, monitoringParameters: MonitoringParametersOptions, timestampsToReturn: TimestampsToReturn = TimestampsToReturn.Neither, monitoringMode: MonitoringMode = MonitoringMode.Reporting ): ClientMonitoredItem => { return ClientMonitoredItem_create(subscription, itemToMonitor, monitoringParameters, timestampsToReturn, monitoringMode); };