src/lib/db/NodeStream.js
import through from 'through2';
import { Buffer } from 'buffer';
import { Job } from 'atvise-dbworker';
import DBBrowser from './Browser';
import NodeAddress from '../NodeAddress';
import AtviseFile from '../AtviseFile';
export class Node extends AtviseFile {
constructor(referenceDescription, readResult) {
let contents = Buffer.from([]);
if (readResult.value.value !== null) {
const source = readResult.value.value;
if (readResult.value.arrayType.value === 1) {
// FIXME: Probably need special handling for non-buffer array
const res = [];
if (!(source.length === 1 && source[0] === null)) {
source.forEach(s => res.push(s.toString()));
}
contents = Buffer.from(JSON.stringify(res));
} else if (!(source instanceof Buffer)) {
contents = Buffer.from(source.toString());
} else {
contents = source;
}
}
const path = NodeAddress.toFilePath(referenceDescription, readResult);
super({
path,
contents,
rc: {
typeDefinition: referenceDescription.typeDefinition.value,
dataType: readResult.value.dataType.key,
array: readResult.value.arrayType.value === 1,
namespace: referenceDescription.nodeId.namespace,
},
});
this.stat = {
mtime: readResult.sourceTimestamp,
};
}
}
/**
* A transform stream handling {@link AtviseFile}s from atvise-server
*/
export default class NodeStream {
/**
* Creates a new NodeStream based on some root nodes.
* @param {NodeId[]|String[]} startNodes The nodes to start with. Directly passed to
* {@link DBBrowser}.
* @return {NodeStream}
*/
static forNodes(startNodes) {
const stream = through.obj();
const browser = new DBBrowser();
browser.on('discoveredvariables', nodes => {
browser.worker.addJob(
new Job((session, cb) => {
session.read(nodes, function(err, nodesToRead, results) {
if (!err) {
results.forEach((res, i) => {
if (res.value !== null) {
stream.push(new Node(nodesToRead[i], res));
}
});
}
cb(err);
});
})
);
});
browser.on('complete', () => {
stream.end();
});
browser.browse(startNodes);
return stream;
}
}