src/lib/db/SyncStream.js
import { stat as fsStat } from 'fs';
import { OPCUAClient, AttributeIds } from 'node-opcua';
import through from 'through2';
import { series } from 'async';
import merge from 'merge-stream';
import { colors, log } from 'gulp-util';
import Project from '../config/ProjectConfig';
/**
 * A Stream that divides a file Stream into a push stream and a pull stream to synchronize between
 * local files and atvise-server.
 */
export default class SyncStream {

  /**
   * Returns a new SyncStream based on a file stream.
   * @param {Stream} fileStream The file stream to use.
   * @return {SyncStream} The resulting SyncStream.
   */
  static from(fileStream) {
    return new SyncStream(fileStream);
  }

  /**
   * Creates a new SyncStream based on a file stream and immediately starts synchronizing:
   * connect, create a session, compare every file against the server, then release the session
   * and the connection again. Any error is emitted as an `error` event on {@link merged}.
   * @param {Stream} fileStream The file stream to use.
   */
  constructor(fileStream) {
    /** @type {node-opcua.OPCUAClient} */
    this.client = new OPCUAClient();

    /** @type {node-opcua.ClientSession} */
    this.session = null;

    /**
     * The pull stream
     * @type {Stream}
     */
    this.pullStream = through.obj();

    /**
     * The push stream
     * @type {Stream}
     */
    this.pushStream = through.obj();

    /**
     * A stream created by combining pull and push stream.
     * @type {Stream}
     */
    this.merged = merge(this.pullStream, this.pushStream);

    // Remember whether the connection was established so teardown only releases what was set up.
    let connected = false;

    series([
      done => this._connect(err => {
        connected = !err;
        done(err);
      }),
      done => this._createSession(done),
      done => this._populate(fileStream, done),
    ], syncErr => {
      // Teardown runs outside of the series above: a failing step must not skip closing the
      // session / disconnecting the client, otherwise both would leak.
      this._release(connected, releaseErr => {
        const err = syncErr || releaseErr;

        if (err) {
          this.merged.emit('error', err);
        }
      });
    });
  }

  /**
   * Add transforms to the pull stream.
   * @param {Function(stream: Stream)} fn Apply transforms to the pull stream.
   * @return {SyncStream} Itself, to be chainable.
   */
  pull(fn) {
    fn(this.pullStream);

    return this;
  }

  /**
   * Add transforms to the push stream.
   * @param {Function(stream: Stream)} fn Apply transforms to the push stream.
   * @return {SyncStream} Itself, to be chainable.
   */
  push(fn) {
    fn(this.pushStream);

    return this;
  }

  /**
   * Compares two dates at whole-second resolution (milliseconds are ignored) and calculates which
   * is newer. Neither argument is modified.
   * @param {Date} a The first date to compare.
   * @param {Date} b The second date to compare.
   * @return {Number} A positive number if `a` is newer than `b`, a negative number if `b` is newer
   * than `a`. 0 if `a` equals `b`. The magnitude is the difference in milliseconds between the
   * two dates after dropping their millisecond parts.
   */
  static newer(a, b) {
    // Flooring matches what `setMilliseconds(0)` would produce, without touching the caller's
    // Date objects.
    const secondsA = Math.floor(a.getTime() / 1000);
    const secondsB = Math.floor(b.getTime() / 1000);

    return (secondsA - secondsB) * 1000;
  }

  /**
   * Reads the server value for every file in `fileStream` and routes the file depending on which
   * side was modified more recently: locally newer files go to the push stream, files that are
   * newer on the server are replaced by the server's contents and go to the pull stream. Files
   * with equal timestamps are dropped. Both streams are ended once `fileStream` is exhausted.
   * @param {Stream} fileStream Stream of files; each file must provide `nodeId` and `stat.mtime`.
   * @param {Function(err: ?Error)} done Called exactly once, when all files were handled or on
   * the first error.
   */
  _populate(fileStream, done) {
    let finished = false;
    const finish = err => {
      if (!finished) {
        finished = true;
        done(err);
      }
    };

    const compare = through.obj((file, enc, cb) => {
      this.session.read([{
        nodeId: file.nodeId.toString(),
        attributeId: AttributeIds.Value,
      }], (readErr, nodes, results) => {
        if (readErr) {
          // Report through the stream callback: throwing here would escape as an uncaught
          // exception because this code runs asynchronously.
          cb(new Error(`Error reading: ${readErr}`));
          return;
        }

        try {
          const readResult = results[0];
          const modifiedDB = readResult.sourceTimestamp;
          const diff = SyncStream.newer(file.stat.mtime, modifiedDB);

          if (diff > 0) {
            this.pushStream.push(file);
          } else if (diff < 0) {
            const dbFile = file.clone({ contents: false });
            // Fall back to the outer value object when the inner value is null.
            const source = readResult.value.value === null ?
              readResult.value : readResult.value.value;

            dbFile.contents = Buffer.from(source.toString());
            dbFile.stat = { mtime: modifiedDB };
            this.pullStream.push(dbFile);
          }
        } catch (err) {
          // E.g. a read result without timestamp or value: fail the stream instead of crashing.
          cb(err);
          return;
        }

        cb();
      });
    }, cb => {
      this.pullStream.end();
      this.pushStream.end();
      cb();
      finish();
    });

    // Errors passed to the transform callback surface as `error` events on the transform.
    compare.on('error', finish);
    fileStream.pipe(compare);
  }

  /**
   * Connects the client to `opc.tcp://{Project.host}:{Project.port.opc}`.
   * @param {Function(err: ?Error)} done Called once the connection attempt finished.
   */
  _connect(done) {
    this.client.connect(`opc.tcp://${Project.host}:${Project.port.opc}`, done);
  }

  /**
   * Creates a session on the connected client and stores it in {@link session} on success.
   * @param {Function(err: ?Error)} done Called with the error, if any.
   */
  _createSession(done) {
    this.client.createSession((err, sess) => {
      if (!err) {
        this.session = sess;
      }

      done(err);
    });
  }

  /**
   * Closes the current session.
   * @param {Function(err: ?Error)} done Called once the session is closed.
   */
  _closeSession(done) {
    this.session.close(done);
  }

  /**
   * Disconnects the client.
   * @param {Function(err: ?Error)} done Called once the client is disconnected.
   */
  _disconnect(done) {
    this.client.disconnect(done);
  }

  /**
   * Releases whatever was set up: closes the session if one was created, then disconnects the
   * client if it was connected. Disconnecting is attempted even if closing the session failed.
   * @param {Boolean} connected Whether the client connection was established.
   * @param {Function(err: ?Error)} done Called with the first error that occurred, if any.
   */
  _release(connected, done) {
    const closeSession = next => {
      if (this.session) {
        this._closeSession(next);
      } else {
        next();
      }
    };

    closeSession(sessionErr => {
      if (!connected) {
        done(sessionErr);
        return;
      }

      this._disconnect(disconnectErr => done(sessionErr || disconnectErr));
    });
  }

}