// tslint:disable variable-name only-arrow-functions import { Observable } from 'rxjs/Observable'; import { Subscription } from 'rxjs/Subscription'; import 'rxjs/add/observable/fromPromise'; import 'rxjs/add/observable/of'; import 'rxjs/add/operator/distinctUntilChanged'; import 'rxjs/add/operator/first'; import 'rxjs/add/operator/mergeMap'; import 'rxjs/add/operator/skip'; import 'rxjs/add/operator/toPromise'; import { ArrayEmitter } from './array-emitter'; import { Chain } from './chain'; import { isEqual } from 'lodash'; import { isString } from 'lodash'; import { throttle } from 'lodash'; import { Collection as ICollection, CollectionSearcher, CollectionStatic, Model, Query, QueryFunction, Resource, Seeds } from './types'; // Recursively follows a property that links to the same model. // For now rather obtusively just redraw from the seeds when anything changes. export function CollectionFactory() { let _collections: Collection[] = []; class Collection extends ArrayEmitter implements ICollection { public static check = throttle(() => { _collections.forEach(coll => coll.check()); }, 100); public $promise: Promise>; public $resolved: boolean; public chain: {(Model: any, qryFn: QueryFunction): Query}; public update: {(): void}; public check: {(): void}; constructor(Resource: Resource, seeds: Model | Seeds, searcher: CollectionSearcher) { super(); let nodes: Collection = this; let watchers: Subscription[] = []; let collecting = false; let recollect = false; _collections.push(nodes); let resolve: Function; nodes.$promise = new Promise(_resolve => resolve = _resolve); nodes.$resolved = false; nodes.chain = chain; nodes.update = update; function chain(Model: Resource, qryFn: QueryFunction) { return Chain(nodes, Model, qryFn); } function clear() { nodes.length = 0; watchers.forEach(sub => sub.unsubscribe()); } function runCollection() { if (!collecting) { recollect = false; collecting = true; clear(); collectRecursive(seeds).then(function() { collecting = false; if (recollect) { runCollection(); } else { // Notify that we've updated and settled nodes.$emitter.emit('update', nodes); if (!nodes.$resolved) { nodes.$resolved = true; resolve(nodes); } } }); } else { // We're running - rerun once we are done recollect = true; } } function collectRecursive(start: (Model | Model[])): Promise { let starters: Model[]; if (Array.isArray(start)) { starters = start as Model[]; } else { starters = [start as Model]; } let unseen = starters.filter(model => nodes.indexOf(model) === -1); if (unseen.length === 0) { return Promise.resolve(); } for (let model of unseen) { nodes.push(model); } let searcherObs = searcher(unseen); // Check for changes and rerun collection (the first is just the // initial value) let watcher = searcherObs.distinctUntilChanged((newVal, oldVal) => { return isEqual(oldVal, newVal); }).skip(1).subscribe(() => { runCollection(); }); watchers.push(watcher); return searcherObs.mergeMap(function (result) { // Result could either be an array of ids, or the objects themselves if (result && isString(result[0])) { // Ids return Observable.fromPromise(Resource.get(result as string[]).$promise as Promise); } else { return Observable.of(result); } }).first().toPromise().then(function(results: Seeds) { return collectRecursive(results); }); } function update() { runCollection(); } // Do an initial collection run runCollection(); } } return Collection as CollectionStatic; }