Home Manual Reference Source Test

src/lib/db/NodeStream.js

import through from 'through2';
import { Buffer } from 'buffer';
import { Job } from 'atvise-dbworker';
import DBBrowser from './Browser';
import NodeAddress from '../NodeAddress';
import AtviseFile from '../AtviseFile';

export class Node extends AtviseFile {
  constructor(referenceDescription, readResult) {
    let contents = Buffer.from([]);

    if (readResult.value.value !== null) {
      const source = readResult.value.value;

      if (readResult.value.arrayType.value === 1) {
        // FIXME: Probably need special handling for non-buffer array

        const res = [];
        if (!(source.length === 1 && source[0] === null)) {
          source.forEach(s => res.push(s.toString()));
        }

        contents = Buffer.from(JSON.stringify(res));
      } else if (!(source instanceof Buffer)) {
        contents = Buffer.from(source.toString());
      } else {
        contents = source;
      }
    }

    const path = NodeAddress.toFilePath(referenceDescription, readResult);

    super({
      path,
      contents,
      rc: {
        typeDefinition: referenceDescription.typeDefinition.value,
        dataType: readResult.value.dataType.key,
        array: readResult.value.arrayType.value === 1,
        namespace: referenceDescription.nodeId.namespace,
      },
    });

    this.stat = {
      mtime: readResult.sourceTimestamp,
    };
  }
}

/**
 * A transform stream handling {@link AtviseFile}s from atvise-server
 */
export default class NodeStream {

  /**
   * Creates a new NodeStream based on some root nodes.
   * @param {NodeId[]|String[]} startNodes The nodes to start with. Directly passed to
   * {@link DBBrowser}.
   * @return {NodeStream}
   */
  static forNodes(startNodes) {
    const stream = through.obj();
    const browser = new DBBrowser();

    browser.on('discoveredvariables', nodes => {
      browser.worker.addJob(
        new Job((session, cb) => {
          session.read(nodes, function(err, nodesToRead, results) {
            if (!err) {
              results.forEach((res, i) => {
                if (res.value !== null) {
                  stream.push(new Node(nodesToRead[i], res));
                }
              });
            }

            cb(err);
          });
        })
      );
    });

    browser.on('complete', () => {
      stream.end();
    });

    browser.browse(startNodes);

    return stream;
  }

}