import { Dictionary, Set } from 'typescript-collections'; import { Stream, StreamWithSend } from './Stream'; import { Vertex, Source } from './Vertex'; export class Router { private _inStream: Stream; private _table: Dictionary[]>; private _vertex: Vertex; public constructor(inStream: Stream, selector: (a: A) => K[], keyToStr?: (k: K)=>string) { this._inStream = inStream; this._table = new Dictionary(keyToStr); this._vertex = new Vertex( "Router", this._inStream.getVertex__().rank + 1, // <-- estimated rank only, may be adjusted by ensureBiggerThan [] ); this._vertex.addSource( new Source( this._inStream.getVertex__(), () => this._inStream.listen_( this._vertex, (a: A) => { let ks = selector(a); let visited = new Set(keyToStr); let outs: StreamWithSend[] = []; for (let i = 0; i < ks.length; ++i) { let k = ks[i]; if (visited.contains(k)) { continue; } visited.add(k); let outs2 = this._table.getValue(k); if (outs2 != undefined) { for (let j = 0; j < outs2.length; ++j) { outs.push(outs2[j]); } } } for (let i = 0; i < outs.length; ++i) { outs[i].send_(a); } }, true ) ) ); } public filterMatches(k: K): Stream { let out = new StreamWithSend(); let vertex = new Vertex( "Router::filterMatches", this._vertex.rank + 1, // <-- estimated rank only, may be adjusted by ensureBiggerThan [ new Source( this._vertex, () => { this._vertex.increment(out.getVertex__()); let outs: StreamWithSend[] = this._table.getValue(k); if (outs == undefined) { outs = []; this._table.setValue(k, outs); } outs.push(out); return () => { this._vertex.decrement(out.getVertex__()); let outs2 = this._table.getValue(k); for (let i = outs2.length-1; i >= 0; --i) { if (outs2[i] == out) { outs2.splice(i, 1); break; } } if (outs2.length == 0) { this._table.remove(k); } }; } ) ] ); out.setVertex__(vertex); return out; } }