import xs, {Stream, InternalListener, OutSender, Operator, NO} from 'xstream';
import {InternalInstances} from './types';

/**
 * Listener subscribed to a single child sink stream on behalf of a
 * `PickCombine` operator. It remembers the latest value emitted by that child
 * and asks the parent operator to recompute its combined output.
 */
class PickCombineListener<Si, T>
  implements InternalListener<T>, OutSender<Array<T>> {
  private key: string;
  public out: Stream<Array<T>>;
  public p: PickCombine<Si, T>;
  public val: T;
  public ins: Stream<T>;

  /**
   * @param key instance key this listener was created for
   * @param out output stream of the parent operator (errors are forwarded here)
   * @param p parent operator to notify when a new value arrives
   * @param ins child sink stream this listener is attached to
   */
  constructor(
    key: string,
    out: Stream<Array<T>>,
    p: PickCombine<Si, T>,
    ins: Stream<T>
  ) {
    this.key = key;
    this.out = out;
    this.p = p;
    // `NO` marks "nothing received yet"; `PickCombine.up` checks for it.
    this.val = NO as any;
    this.ins = ins;
  }

  /** Records the latest child value and triggers a recombination. */
  _n(t: T): void {
    this.val = t;
    // `out` is nulled by `detach()`; a detached listener must stay silent.
    if (this.out === null) {
      return;
    }
    this.p.up();
  }

  /** Forwards a child error to the output stream, unless detached. */
  _e(err: any): void {
    const out = this.out;
    if (out === null) {
      return;
    }
    out._e(err);
  }

  /** Child completion is ignored. */
  _c(): void {}

  /**
   * Unsubscribes from the child stream and drops every reference held by this
   * listener. After this call `_n` and `_e` no longer reach the parent/output.
   */
  detach(): void {
    this.ins._remove(this);
    this.ins = null as any;
    this.out = null as any;
    this.val = null as any;
  }
}

/**
 * xstream operator that, for a stream of instance collections, subscribes to
 * the sink named `sel` of every instance and emits an array with the latest
 * value of each one, ordered like `inst.arr`.
 *
 * Nothing is emitted until every instance in `inst.arr` has a listener that
 * has received at least one value. An empty collection emits `[]`.
 */
class PickCombine<Si, T> implements Operator<InternalInstances<Si>, Array<T>> {
  public type = 'combine';
  public ins: Stream<InternalInstances<Si>>;
  public out: Stream<Array<T>>;
  public sel: string;
  public ils: Map<string, PickCombineListener<Si, T>>;
  public inst: InternalInstances<Si>;

  /**
   * @param sel name of the sink to pick from every instance
   * @param ins stream of instance collections
   */
  constructor(sel: string, ins: Stream<InternalInstances<Si>>) {
    this.ins = ins;
    this.sel = sel;
    this.out = null as any;
    this.ils = new Map();
    this.inst = null as any;
  }

  /** Stores the output stream and subscribes to the instances stream. */
  _start(out: Stream<Array<T>>): void {
    this.out = out;
    this.ins._add(this);
  }

  /** Unsubscribes from the input and from every child sink; resets state. */
  _stop(): void {
    this.ins._remove(this);
    const ils = this.ils;
    ils.forEach(il => il.detach());
    ils.clear();
    this.out = null as any;
    this.ils = new Map();
    this.inst = null as any;
  }

  /**
   * Recomputes the combined array from the latest value of every listener and
   * emits it. Returns without emitting if any instance has no listener yet or
   * its listener has not received a value.
   */
  up(): void {
    const arr = this.inst.arr;
    const n = arr.length;
    const ils = this.ils;
    const outArr: Array<T> = Array(n);
    for (let i = 0; i < n; ++i) {
      const sinks = arr[i];
      const key = sinks._key as any as string;
      const il = ils.get(key);
      if (il === undefined) {
        return;
      }
      // Typed as `any` so the sentinel comparison below type-checks.
      const val: any = il.val;
      if (val === NO) {
        return;
      }
      outArr[i] = val;
    }
    this.out._n(outArr);
  }

  /**
   * Handles a new instance collection: detaches listeners whose key is no
   * longer in `inst.dict`, attaches listeners for keys not seen before, and
   * re-emits when something was removed.
   *
   * @throws Error if an instance has no (truthy) sink under `sel`
   */
  _n(inst: InternalInstances<Si>): void {
    this.inst = inst;
    const arrSinks = inst.arr;
    const ils = this.ils;
    const out = this.out;
    const sel = this.sel;
    const dict = inst.dict;
    const n = arrSinks.length;

    // remove
    let removed = false;
    ils.forEach((il, key) => {
      if (!dict.has(key)) {
        il.detach();
        ils.delete(key);
        removed = true;
      }
    });

    if (n === 0) {
      out._n([]);
      return;
    }

    // add
    for (let i = 0; i < n; ++i) {
      const sinks = arrSinks[i];
      const key = sinks._key as any as string;
      // `sel` is only known at runtime, hence the untyped property access.
      const childSink = (sinks as any)[sel];
      if (!childSink) {
        throw new Error('pickCombine found an undefined child sink stream');
      }
      if (!ils.has(key)) {
        // Only wrap the sink when a listener is actually needed.
        const sink: Stream<any> = xs.fromObservable(childSink);
        const il = new PickCombineListener<Si, T>(key, out, this, sink);
        // Register before subscribing: `up()` looks the listener up in `ils`
        // when the child emits.
        ils.set(key, il);
        sink._add(il);
      }
    }

    // A removal shrinks the result even though no child emitted, so re-emit.
    if (removed) {
      this.up();
    }
  }

  /** Forwards an input error to the output stream, if still started. */
  _e(e: any): void {
    const out = this.out;
    if (out === null) {
      return;
    }
    out._e(e);
  }

  /** Forwards input completion to the output stream, if still started. */
  _c(): void {
    const out = this.out;
    if (out === null) {
      return;
    }
    out._c();
  }
}

/**
 * Creates an operator function that turns a stream of instance collections
 * into a stream of arrays holding the latest value of each instance's
 * `selector` sink.
 *
 * @param selector name of the sink to pick from every instance
 * @returns a function that wraps `inst$` in a new stream driven by `PickCombine`
 */
export function pickCombine(selector: string) {
  return function pickCombineOperator(
    inst$: Stream<InternalInstances<any>>
  ): Stream<Array<any>> {
    return new Stream(new PickCombine(selector, inst$));
  };
}