Home Manual Reference Source Test

src/lib/db/SyncStream.js

import { stat as fsStat } from 'fs';
import { OPCUAClient, AttributeIds } from 'node-opcua';
import through from 'through2';
import { series } from 'async';
import merge from 'merge-stream';
import { colors, log } from 'gulp-util';
import Project from '../config/ProjectConfig';

/**
 * A Stream that divides a file Stream into a push stream and a pull stream to synchronize between
 * local files and atvise-server.
 */
export default class SyncStream {

  /**
   * Returns a new SyncStream based on a file stream.
   * @param {Stream} fileStream The file stream to use.
   * @return {SyncStream} The resulting SyncStream.
   */
  static from(fileStream) {
    return new SyncStream(fileStream);
  }

  /**
   * Creates a new SyncStream based on a file stream.
   * @param {Stream} fileStream The file stream to use.
   */
  constructor(fileStream) {
    /** @type {node-opcua.OPCUAClient} */
    this.client = new OPCUAClient();
    /** @type {node-opcua.ClientSession} */
    this.session = null;

    /**
     * The pull stream
     * @type {Stream}
     */
    this.pullStream = through.obj();
    /**
     * The push stream
     * @type {Stream}
     */
    this.pushStream = through.obj();
    /**
     * A stream created by combing pull and push stream.
     * @type {Stream}
     */
    this.merged = merge(this.pullStream, this.pushStream);

    series([
      done => this._connect(done),
      done => this._createSession(done),
      done => this._populate(fileStream, done),
      done => this._closeSession(done),
      done => this._disconnect(done),
    ], err => {
      if (err) {
        this.merged.emit('error', err);
      }
    });
  }

  /**
   * Add transforms to the pull stream.
   * @param {Function(stream: Stream)} fn Apply transforms to the pull stream.
   * @return {SyncStream} Itself, to be chainable.
   */
  pull(fn) {
    fn(this.pullStream);

    return this;
  }

  /**
   * Add transforms to the push stream.
   * @param {Function(stream: Stream)} fn Apply transforms to the push stream.
   * @return {SyncStream} Itself, to be chainable.
   */
  push(fn) {
    fn(this.pushStream);

    return this;
  }

  /**
   * Compares two dates and calculates which is newer.
   * @param {Date} a The first date to compare.
   * @param {Date} b The second date to comapre.
   * @return {Number} A positive number if `a` is newer than `b`, a negative number if `b` is newer
   * than `a`. 0 if `a` equals `b`.
   */
  static newer(a, b) {
    a.setMilliseconds(0);
    b.setMilliseconds(0);

    return a - b;
  }

  _populate(fileStream, done) {
    const self = this;

    fileStream
      .pipe(through.obj(function(file, enc, cb) {
        self.session.read([{
          nodeId: file.nodeId.toString(),
          attributeId: AttributeIds.Value,
        }], (readErr, nodes, results) => {
          if (readErr) {
            throw new Error(`Error reading: ${readErr}`);
          } else {
            const modifiedDB = results[0].sourceTimestamp;
            const diff = SyncStream.newer(file.stat.mtime, modifiedDB);

            if (diff > 0) {
              self.pushStream.push(file);
            } else if (diff < 0) {
              const readResult = results[0];
              const dbFile = file.clone({ contents: false });
              const source = readResult.value.value === null ?
                readResult.value : readResult.value.value;

              dbFile.contents = Buffer.from(source.toString());
              dbFile.stat = { mtime: modifiedDB };

              self.pullStream.push(dbFile);
            }

            cb();
          }
        });
      }, cb => {
        this.pullStream.end();
        this.pushStream.end();

        cb();
        done();
      }));
  }

  _connect(done) {
    this.client.connect(`opc.tcp://${Project.host}:${Project.port.opc}`, done);
  }

  _createSession(done) {
    this.client.createSession((err, sess) => {
      if (!err) {
        this.session = sess;
      }

      done(err);
    });
  }

  _closeSession(done) {
    this.session.close(done);
  }

  _disconnect(done) {
    this.client.disconnect(done);
  }

}