// tslint:disable max-classes-per-file interface-name import { BehaviorSubject } from 'rxjs/BehaviorSubject'; import { Subject } from 'rxjs/Subject'; import { cloneDeep } from 'lodash'; import { clone } from 'lodash'; import { map } from 'lodash'; import { union } from 'lodash'; import { isFunction } from 'lodash'; import { isEmpty } from 'lodash'; import { isUndefined } from 'lodash'; import { ArrayEmitter } from './array-emitter.js'; import { Chain } from './chain'; import { Database, Model, Query as IQuery, QueryFunction, QueryList, Resource, SocketFactory } from './types'; import { convertToResolvedSubject, Deferred } from './utils'; export class Query extends ArrayEmitter implements IQuery { public $refresh: (force?: boolean) => Promise>; public $promise: Promise>; public $subject: BehaviorSubject>; public $resolvedSubject: Subject>; public $resolved: boolean; public $Model: any; public $serverResults?: Query; public $skip?: number; public $hasNext: boolean; public $hasPrev: boolean; public extend: (obj: any, noSanitize?: boolean, retServer?: boolean) => Promise>; public replace: (qry: any, retServer?: boolean) => Promise>; public get: (...args: any[]) => any; public next: (num?: number) => Promise>; public prev: (num?: number) => Promise>; public chain: (Model: Resource | QueryFunction, qryFn?: QueryFunction) => IQuery; public transform: (tmpResults: any[]) => Promise | any[]; public currentLimit: number; public currentSkip: number; public loading: boolean; public hasNext: boolean; public hasPrev: boolean; } export class LocalQuery extends Query {} export class ServerQuery extends Query { public $newData?: (qryId: string, response: any, force?: boolean) => void; } function normalizeQuery(qry: any) { if (!qry) { return; } if (qry._q) { return qry._q; } else if (qry.find) { return qry; } else { return {find: qry}; } } function extendQuery(qry1: any, qry2: any) { // Calc the new query that we want const _qry = cloneDeep(qry1); ['limit', 'skip', 'sort'].forEach(prop => { if (qry2[prop]) { _qry[prop] = qry2[prop]; } }); if (!isEmpty(qry2.find)) { // Want to or together - but is the toplevel already an or? (and only an or) let existingOr = false; if (_qry.find.$or) { let valid = true; for (const key in _qry.find) { if (key !== '$or') { valid = false; break; } } existingOr = valid; } if (existingOr) { _qry.find = clone(_qry.find); } else { _qry.find = {$or: [_qry.find]}; } _qry.find.$or.push(qry2.find); } return _qry; } export function createQueryFactory(socket: SocketFactory) { function QueryFactory(url: string, rootKey: string, rootKeyPlural: string, db?: Database): QueryList { const sock = socket(url, rootKey, rootKeyPlural); // Object holding queries (keys are the query ids, values are arrays of arrays) const _localQueries: Array> = []; /** * Local Query List * @param {[type]} qry [description] * @param {[type]} limit [description] * @param {[type]} Resource [description] * @param {[type]} toRes [description] * * Strategy - we run the query on the server. However if we can deal with it locally (via * nedb) then do so. If the query at any time becomes more complicated we just fall through * to the server version */ const LocalQueryList: QueryList = (qry: any, limit: number, Resource: Resource, toRes: (id: string) => Model): LocalQuery => { // Allow qry to be a promise, and we return the raw (non _q bit) qry = Promise.resolve(qry).then(normalizeQuery).then(_qry => { if (limit) { _qry.limit = limit; } return _qry; }); // Generate the ServerQuery. We do this so we have something to fall back on // and also so that we will (or should!) get notified of changes from the server if // someone else creates or removes something that should go in these results const serverResults = ServerQueryList(qry, limit, Resource); const results = new LocalQuery(); _localQueries.push(results); let currentLimit = 0; let currentSkip = 0; let lastBatchSize = 0; let fallback = false; results.loading = true; // When the server results are updated we want to check paging options and apply // them to our results serverResults.$emitter.on('update', syncFromServer); function syncFromServer() { results.hasNext = serverResults.hasNext; results.hasPrev = serverResults.hasPrev; // If we are falling back then set up and copy our letious properties across if (fallback) { results.$hasNext = false; results.$hasPrev = false; results.length = 0; serverResults.forEach(res => { results.push(res); }); currentLimit = results.length; currentSkip = results.$skip; lastBatchSize = lastBatchSize || results.length; } } function get() { return Resource.get.apply(Resource, arguments); } function chain(Model: Resource | QueryFunction, qryFn?: QueryFunction) { return Chain(results, Model, qryFn); } function query(_qry: any) { // Store off the qry making sure its a promise qry = Promise.resolve(_qry); fallback = !db.qryIsSimple(_qry); // If we are fallingback then just resolve with our results. The server should // do the rest. if (fallback) { // We want to return the server's promise here so that people will be notified // when the results are actually loaded let prom; if (!serverResults.$resolved) { prom = serverResults.$promise.then(() => { return results; }); } else { prom = Promise.resolve(results); } return prom; } return db.query(_qry).then((ids: string[]) => { // We can set the hasNext and hasPrev values to true here if we know there // are some. However only the server has the definitive ability to say // there arent any results.$hasNext = false; results.$hasPrev = false; if (limit && ids.length > limit) { // We have more results to fetch results.hasNext = true; results.$hasNext = true; // Trim the results down to size ids.length -= 1; } const skip = _qry.skip; if (skip) { results.hasPrev = true; results.$hasPrev = true; } // Calculate paging options currentLimit = ids.length; currentSkip = skip || 0; lastBatchSize = lastBatchSize || ids.length; // Go to resource types const tmpResults = ids.map(toRes); // Do we need to do a transform? let rprom; if (isFunction(results.transform)) { rprom = Promise.resolve(results.transform(tmpResults)); } else { rprom = Promise.resolve(tmpResults); } return rprom.then(transformed => { // Put the resources into the list const old = results.slice(); results.length = 0; transformed.forEach((res: Model) => { results.push(res); }); results.$emitter.emit('update', results, old); return results; }); }); } function refreshQuery(forceServer?: boolean): Promise> { // Perform our query let prom = qry.then(query).then((res: any[]) => { // If we don't have any results then maybe wait for the server to return if ((res.length === 0 && !serverResults.$resolved)) { return serverResults.$promise.then(() => { return refreshQuery(); }); } else { return res; } }); if (forceServer) { prom = serverResults.$refresh(true); } return prom.then((res: any) => { results.loading = false; return res; }); } function extendResults(obj: any, noSanitize?: boolean, retServer?: boolean) { // Sanitize the object if (!noSanitize) { if (obj._q) { obj = obj._q; } else { obj = {find: obj}; } } return qry.then((resolved: any) => { const _qry = extendQuery(resolved, obj); return replace(_qry, retServer); }); } function replace(_qry: any, retServer?: boolean) { // Sync down to the serverquery const serverProm = serverResults.replace(_qry); // Do the query but replace the existing query const localProm = Promise.resolve(_qry).then(normalizeQuery).then(normQry => { // We allow a query to resolve to something falsy - in which case we just // drop it if (!normQry) { return; } return qry.then((oldqry: any) => { if (JSON.stringify(oldqry) !== JSON.stringify(normQry)) { // query is different - continue return query(normQry); } }); }); return retServer ? serverProm : localProm; } function next(num?: number) { let promise; let extendSize; if (results.hasNext) { lastBatchSize = extendSize = num || lastBatchSize; const extendObj = { limit: currentLimit + extendSize, }; promise = extendResults(extendObj, true, !results.$hasNext); } else { const deferred = new Deferred(); deferred.resolve(); promise = deferred.promise; } return promise; } function prev(num?: number) { let promise; let extendSize; if (results.hasPrev) { lastBatchSize = extendSize = num || lastBatchSize; const extendObj = { limit: currentLimit + extendSize, skip: Math.max(currentSkip - extendSize, 0), }; promise = extendResults(extendObj, true, !results.$hasPrev); } else { const deferred = new Deferred(); deferred.resolve(); promise = deferred.promise; } return promise; } results.$refresh = refreshQuery; results.extend = extendResults; results.replace = replace; results.get = get; results.next = next; results.prev = prev; results.$promise = refreshQuery(); results.$resolvedSubject = convertToResolvedSubject(results.$subject, results.$promise); results.chain = chain; results.$Model = Resource; results.$serverResults = serverResults; // Put currentSkip and currentLimit onto the results Object.defineProperty(results, 'currentLimit', { enumerable: true, get: () => { return currentLimit; }, }); Object.defineProperty(results, 'currentSkip', { enumerable: true, get: () => { return currentSkip; }, }); return results; }; let refreshTimer: number; LocalQueryList.refresh = () => { // If we have a timer outstanding then just return - a refresh will happen soon. if (!refreshTimer) { refreshTimer = setTimeout(doRefresh, 100); } return refreshTimer; function doRefresh() { _localQueries.forEach(res => { res.$refresh(); }); refreshTimer = null; } }; /** * Server query list. The server makes sure we keep in sync * @param {[type]} qry [description] * @param {[type]} limit [description] * @param {[type]} Resource [description] */ function ServerQueryList(qry: any, limit: number, Resource: any): ServerQuery { // Allow qry to be a promise qry = Promise.resolve(qry).then(normalizeQuery).then(_qry => { if (limit) { _qry.limit = limit; } return _qry; }); let emitPromise: Promise = null; const results = new ServerQuery(); let currentLimit = 0; let currentSkip = 0; let lastBatchSize = 0; let _pagingOpts: any = {}; results.loading = true; function query(data: any) { // We only want to do one emit at a time (otherwise we could get into a bad state) if (isFunction(data)) { data = data(); } // Store off the query - make sure its a promise qry = Promise.resolve(data); return qry.then((_data: any) => { // If we have an existing emitPromise then wait for it to resolve before we run let promise; if (emitPromise) { promise = emitPromise.then(() => { return sock.query(_data); }); } else { emitPromise = promise = sock.query(_data); } return promise; }); } function newData(_qryId: string, response: any, force?: boolean) { if (!response) { return; } const deferred = new Deferred(); const ids: string[] = response.ids; const pagingOpts = response.pagingOpts; // So far we've only got the ids of the qry result - go and fetch the actual objects. // This mechanism saves bandwidth by only getting the object data once then listening // for changes to it // // If we are forcing we want to get both the old and new ids to check for any changes // deletions etc.. let getIds = ids; if (force) { const oldIds = map(results, '_id'); getIds = union(ids, oldIds); } Resource.get(getIds, force).$promise.then((ress: any[]) => { const ressmap: any = {}; ress.forEach(res => { ressmap[res._id] = res; }); // We don't allow repeated ids so just iterate over the results const tmpResults: any[] = []; ids.forEach(id => { tmpResults.push(ressmap[id]); }); // Do we need to do a transform? let rprom; if (isFunction(results.transform)) { rprom = Promise.resolve(results.transform(tmpResults)); } else { rprom = Promise.resolve(tmpResults); } return rprom.then(transformed => { results.length = 0; transformed.forEach((res: Model) => { results.push(res); }); // Since we now have data in our array store off limit data currentLimit = tmpResults.length; results.$skip = currentSkip = (qry && qry.skip) ? qry.skip : 0; lastBatchSize = lastBatchSize || tmpResults.length; _pagingOpts = pagingOpts; results.hasNext = (pagingOpts.next != null); results.hasPrev = (pagingOpts.prev != null); // Data has come back results.loading = false; results.$emitter.emit('update', results); deferred.resolve(results); }); }); return deferred.promise; } function refreshQuery(force?: boolean): Promise> { const req = query(qry); const promise = req.then((res: any) => { // If we get no response (the app could be offline) then just resolve with // the existing results if (isUndefined(res)) { results.loading = false; return results; } const _qryId = res.qryId; const response = res.data; return newData(_qryId, response, force); }); return promise; } function replace(_qry: any) { // Do the query but replace the existing query return Promise.resolve(_qry).then(normalizeQuery).then(normQry => { // We allow a query to resolve to something falsy - in which case we just // drop it if (!normQry) { return; } return qry.then((oldqry: any) => { if (JSON.stringify(oldqry) !== JSON.stringify(normQry)) { // query is different - continue const req = query(normQry); const promise = req.then((res: any) => { // If we get no response (the app could be offline) then just resolve with // the existing results if (isUndefined(res)) { results.loading = false; return results; } // We do have a response. Continue const _qryId = res.qryId; const data = res.data; return newData(_qryId, data); }); return promise; } }); }); } function extendResults(obj: any, noSanitize?: boolean) { // Sanitize the object if (!noSanitize) { if (obj._q) { obj = obj._q; } else { obj = {find: obj}; } } return qry.then((resolved: any) => { const _qry = extendQuery(resolved, obj); return replace(_qry); }); } function next(num: number) { let promise; let extendSize; if (_pagingOpts.next) { lastBatchSize = extendSize = num || lastBatchSize; const extendObj = { limit: currentLimit + extendSize, }; promise = extendResults(extendObj, true); } else { const deferred = new Deferred(); deferred.resolve(); promise = deferred.promise; } return promise; } function prev(num: number) { let promise; let extendSize; if (_pagingOpts.prev) { lastBatchSize = extendSize = num || lastBatchSize; const extendObj = { limit: currentLimit + extendSize, skip: Math.max(currentSkip - extendSize, 0), }; promise = extendResults(extendObj, true); } else { const deferred = new Deferred(); deferred.resolve(); promise = deferred.promise; } return promise; } function get() { return Resource.get.apply(Resource, arguments); } function chain(Model: any, qryFn: QueryFunction) { return Chain(results, Model, qryFn); } results.$newData = newData; results.$refresh = refreshQuery; results.extend = extendResults; results.replace = replace; results.get = get; results.next = next; results.prev = prev; results.$promise = refreshQuery().then(res => { results.$resolved = true; return res; }); results.$resolvedSubject = convertToResolvedSubject(results.$subject, results.$promise); results.$resolved = false; results.chain = chain; results.$Model = Resource; results.$skip = 0; // Put currentSkip and currentLimit onto the results Object.defineProperty(results, 'currentLimit', { enumerable: true, get: () => { return currentLimit; }, }); Object.defineProperty(results, 'currentSkip', { enumerable: true, get: () => { return currentSkip; }, }); return results; } return db ? LocalQueryList : ServerQueryList; } return QueryFactory; }