import xs, {Stream, InternalListener, OutSender, Operator} from 'xstream'; import {InternalInstances} from './types'; class PickMergeListener implements InternalListener, OutSender { public ins: Stream; public out: Stream; public p: PickMerge; constructor(out: Stream, p: PickMerge, ins: Stream) { this.ins = ins; this.out = out; this.p = p; } _n(t: T): void { const p = this.p, out = this.out; if (out === null) { return; } out._n(t); } _e(err: any): void { const out = this.out; if (out === null) { return; } out._e(err); } _c(): void { } } class PickMerge implements Operator, T> { public type = 'pickMerge'; public ins: Stream>; public out: Stream; public sel: string; public ils: Map>; public inst: InternalInstances; constructor(sel: string, ins: Stream>) { this.ins = ins; this.out = null as any; this.sel = sel; this.ils = new Map(); this.inst = null as any; } _start(out: Stream): void { this.out = out; this.ins._add(this); } _stop(): void { this.ins._remove(this); const ils = this.ils; ils.forEach((il, key) => { il.ins._remove(il); il.ins = null as any; il.out = null as any; ils.delete(key); }); ils.clear(); this.out = null as any; this.ils = new Map(); this.inst = null as any; } _n(inst: InternalInstances): void { this.inst = inst; const arrSinks = inst.arr; const ils = this.ils; const out = this.out; const sel = this.sel; const n = arrSinks.length; // add for (let i = 0; i < n; ++i) { const sinks = arrSinks[i]; const key = sinks._key as any as string; const sink: Stream = xs.fromObservable(sinks[sel] || xs.never()); if (!ils.has(key)) { ils.set(key, new PickMergeListener(out, this, sink)); sink._add(ils.get(key) as PickMergeListener); } } // remove ils.forEach((il, key) => { if (!inst.dict.has(key) || !inst.dict.get(key)) { il.ins._remove(il); il.ins = null as any; il.out = null as any; ils.delete(key); } }); } _e(err: any) { const u = this.out; if (u === null) return; u._e(err); } _c() { const u = this.out; if (u === null) return; u._c(); } } export function pickMerge(selector: string) { return function pickMergeOperator(inst$: Stream>): Stream { return new Stream(new PickMerge(selector, inst$)); }; }