Home Manual Reference Source Test

src/lib/db/Watcher.js

import Emitter from 'events';
import {
  ClientSubscription,
  AttributeIds,
  OPCUAClient,
  subscription_service as SubscriptionService
} from 'node-opcua';
import { series, map } from 'async';
import { log, colors } from 'gulp-util';
import DBBrowser from './Browser';
import Project from '../config/ProjectConfig';

export default class DBWatcher extends Emitter {

  constructor(nodesToWatch) {
    super();

    this.browser = new DBBrowser();
    this._trackChanges = false;
    const nodes = [];

    this.browser.on('discovered', node => nodes.push(node));

    this.browser.on('complete', () => {
      const client = new OPCUAClient({
        requestedSessionTimeout: 600000,
        keepSessionAlive: true,
      });

      series([
        done => client.connect(`opc.tcp://${Project.host}:${Project.port.opc}`, done),
        done => client.createSession((err, sess) => {
          if (!err) {
            this.session = sess;
          }
          done(err);
        }),
        done => this._createSubscription(done),
        done => {
          map(nodes, (n, cb) => this._monitorNode(n, cb), err => {
            if (err) {
              done(err);
            } else {
              this.emit('ready');
              this._trackChanges = true;
            }
          });
        },
      ]);
    });

    this.browser.browse(nodesToWatch);
  }

  _createSubscription(cb) {
    this.subscription = new ClientSubscription(this.session, {
      requestedPublishingInterval: 100,
      requestedLifetimeCount: 1000,
      requestedMaxKeepAliveCount: 12,
      maxNotificationsPerPublish: 10,
      publishingEnabled: true,
      priority: 10,
    });

    this.subscription.on('started', () => cb());
  }

  _monitorNode(node, callback) {
    const nodeId = node.nodeId.toString();
    const item = this.subscription.monitor({
      nodeId,
      attributeId: AttributeIds.Value,
    }, {
      clientHandle: 13,
      samplingInterval: 250,
      queueSize: 1,
      discardOldest: true,
    }, SubscriptionService.TimestampsToReturn.Both);

    item.on('changed', dataValue => {
      if (!this._trackChanges) {
        callback(); // Ignore first notification
      } else {
        this.emit('change', { node, data: dataValue });
      }
    });

    item.on('err', err => {
      log(colors.red('Error monitoring', colors.cyan(nodeId)));
      log(colors.red(err));

      callback(err);
    });
  }

}