src/lib/db/Watcher.js
import Emitter from 'events';
import {
ClientSubscription,
AttributeIds,
OPCUAClient,
subscription_service as SubscriptionService
} from 'node-opcua';
import { series, map } from 'async';
import { log, colors } from 'gulp-util';
import DBBrowser from './Browser';
import Project from '../config/ProjectConfig';
export default class DBWatcher extends Emitter {
constructor(nodesToWatch) {
super();
this.browser = new DBBrowser();
this._trackChanges = false;
const nodes = [];
this.browser.on('discovered', node => nodes.push(node));
this.browser.on('complete', () => {
const client = new OPCUAClient({
requestedSessionTimeout: 600000,
keepSessionAlive: true,
});
series([
done => client.connect(`opc.tcp://${Project.host}:${Project.port.opc}`, done),
done => client.createSession((err, sess) => {
if (!err) {
this.session = sess;
}
done(err);
}),
done => this._createSubscription(done),
done => {
map(nodes, (n, cb) => this._monitorNode(n, cb), err => {
if (err) {
done(err);
} else {
this.emit('ready');
this._trackChanges = true;
}
});
},
]);
});
this.browser.browse(nodesToWatch);
}
_createSubscription(cb) {
this.subscription = new ClientSubscription(this.session, {
requestedPublishingInterval: 100,
requestedLifetimeCount: 1000,
requestedMaxKeepAliveCount: 12,
maxNotificationsPerPublish: 10,
publishingEnabled: true,
priority: 10,
});
this.subscription.on('started', () => cb());
}
_monitorNode(node, callback) {
const nodeId = node.nodeId.toString();
const item = this.subscription.monitor({
nodeId,
attributeId: AttributeIds.Value,
}, {
clientHandle: 13,
samplingInterval: 250,
queueSize: 1,
discardOldest: true,
}, SubscriptionService.TimestampsToReturn.Both);
item.on('changed', dataValue => {
if (!this._trackChanges) {
callback(); // Ignore first notification
} else {
this.emit('change', { node, data: dataValue });
}
});
item.on('err', err => {
log(colors.red('Error monitoring', colors.cyan(nodeId)));
log(colors.red(err));
callback(err);
});
}
}