import * as utils from '../../utils'; import Datastore from '../Datastore'; import Aggregator from './Aggregator'; describe('sdk/Aggregator', () => { const DateNow = Date.now; let client; let aggregator; beforeEach(() => { Date.now = jest .fn() .mockImplementation(() => new Date('2021-01-01T00:00:00.000Z').getTime()); client = new Datastore(); client.maxEventsVersion = jest.fn().mockResolvedValue(1); client.axios = { request: jest.fn(), }; const datastores = new Map([['datastore', client]]); aggregator = new Aggregator(datastores); }); afterEach(() => { Date.now = DateNow; jest.restoreAllMocks(); }); describe('constructor', () => { it('creates a new aggregator with datastores', () => { const datastores = new Map([['datastore', client]]); aggregator = new Aggregator(datastores); expect(aggregator).toBeInstanceOf(Aggregator); expect(aggregator.datastores).toEqual(datastores); }); it('creates a new aggregator with configuration', () => { const datastores = new Map([['datastore', client]]); aggregator = new Aggregator(datastores, { max_retry: 1, }); expect(aggregator.config).toEqual({ max_retry: 1, }); }); }); describe('ok', () => { it('performs a noop if the condition is `true`', () => { let error = null; try { Aggregator.ok(true, 'My message'); } catch (err) { error = err; } expect(error).toEqual(null); }); it('throws an exception if the condition is `false`', () => { let error = null; try { Aggregator.ok(false, 'My message'); } catch (err) { error = err; } expect(error).toEqual(new Error('My message')); }); }); describe('#validate', () => { it('validates pipeline contract', () => { const datastores = new Map([['datastore', client]]); aggregator = new Aggregator(datastores, { max_retry: 1, }); let error; try { aggregator.validate([ { type: 'fetch', model: 'accounts', map: [ { from: 'test', to: 'test', default: {}, }, ], }, ]); } catch (err) { error = err; } expect(error).toEqual(undefined); }); it('throws an error if the pipeline is invalid', () => { const datastores = new Map([['datastore', client]]); aggregator = new Aggregator(datastores, { max_retry: 1, }); let error; try { aggregator.validate([ { type: 'invalid', model: 'accounts', map: [ { from: 'test', to: 'test', default: {}, }, ], }, ]); } catch (err) { error = err; } expect(error).toEqual(Aggregator.ERROR_INVALID_PIPELINE_DEFINITION); }); }); describe('#addStepType', () => { it('adds a new step to the aggregator', () => { const stepDef = { handler: jest.fn(), }; const datastores = new Map([['datastore', client]]); aggregator = new Aggregator(datastores); aggregator.addStepType('my_step', stepDef); expect(aggregator.steps.get('my_step')).toEqual(stepDef); }); it('throws an error in case of step type prior existence', () => { const stepFn = jest.fn(); const datastores = new Map([['datastore', client]]); aggregator = new Aggregator(datastores); aggregator.addStepType('my_step', stepFn); let error; try { aggregator.addStepType('my_step', stepFn); } catch (err) { error = err; } expect(error).toEqual(Aggregator.ERROR_CONFLICT_STEP_TYPE); }); }); describe('#removeStepType', () => { it('removes a step type from the list', () => { const stepFn = jest.fn(); const datastores = new Map([['datastore', client]]); aggregator = new Aggregator(datastores); aggregator.addStepType('my_step', stepFn); aggregator.removeStepType('my_step'); expect(aggregator.steps.has('my_step')).toEqual(false); }); it('noop if the step type does not exist', () => { const datastores = new Map([['datastore', client]]); aggregator = new Aggregator(datastores); aggregator.removeStepType('my_step'); expect(aggregator.steps.has('my_step')).toEqual(false); }); }); describe('#applyMap', () => { it('returns a default empty query if none is defined in the step', () => { const query = aggregator.applyMap({}); expect(query).toEqual({}); }); it('returns the query defined in the step if none is defined', () => { const query = aggregator.applyMap({ firstname: 'john', }); expect(query).toEqual({ firstname: 'john', }); }); it('returns the query built with previous data if `map` is defined', () => { const query = aggregator.applyMap( {}, [ { from: 'profile.firstname', to: 'firstname', }, ], { profile: { firstname: 'john', }, }, ); expect(query).toEqual({ firstname: 'john', }); }); it('returns the query built with a relative timing with `relative_date_in_seconds`', () => { const query = aggregator.applyMap( {}, [ { from: 'profile.created_at', to: ['created_at', 'date($gt)'], relative_date_in_seconds: -3600, }, ], { profile: { created_at: '2021-02-01', }, }, ); expect(query).toEqual({ created_at: { 'date($gt)': new Date('2021-01-31T23:00:00.000Z'), }, }); }); it('returns the query built with a relative timing with `relative_date_in_seconds` without date', () => { const query = aggregator.applyMap( {}, [ { from: '_', to: ['created_at', 'date($gt)'], relative_date_in_seconds: -3600, }, ], { profile: { created_at: '2021-02-01', }, }, ); expect(query).toEqual({ created_at: { 'date($gt)': new Date('2020-12-31T23:00:00.000Z'), }, }); }); it('returns the hash value of the string value', () => { const query = aggregator.applyMap( {}, [ { from: 'profile.firstname', to: 'firstname', must_hash: true, }, ], { profile: { firstname: 'john', }, }, ); expect(query).toEqual({ firstname: 'b7fcc6e612145267d2ffea04be754a34128c1ed8133a09bfbbabd6afe6327688aa71d47343dd36e719f35f30fa79aec540e91b81c214fddfe0bedd53370df46d', }); }); it('returns the hash value of the stringified value', () => { const query = aggregator.applyMap( {}, [ { from: 'profile', to: 'profile', must_hash: true, json_stringify: true, }, ], { profile: { firstname: 'john', }, }, ); expect(query).toEqual({ profile: '79d299cb21cc50cbbd4abb18885ccc602e88cc3993faf160b68c5bdb2b941794063b31e2b1c5fda1787118dfae75afea4f604ad3f5ad7fc7dd7b7a946ce5274a', }); }); it('returns the query built with default value if `map` and `default` are defined and no value is found', () => { const query = aggregator.applyMap( {}, [ { from: 'profile.firstname', to: 'firstname', default: 'unknown', }, ], { profile: { // firstname: 'john', // <- missing value }, }, ); expect(query).toEqual({ firstname: 'unknown', }); }); it('returns undefined on unitialized data', () => { const query = aggregator.applyMap({}, [ { from: 'profile.firstname', to: 'firstname', default: 'unknown', }, ]); expect(query).toEqual({ firstname: 'unknown', }); }); it('replaces the object with the targeted value if `to` is `.`', () => { const query = aggregator.applyMap( {}, [ { from: 'profile.firstname', to: '.', default: 'unknown', }, ], { profile: { firstname: 'john', }, }, ); expect(query).toEqual('john'); }); it('copy the data to the result if from is equal to `.`', () => { const query = aggregator.applyMap( {}, [ { from: '.', to: '.', }, ], { profile: { firstname: 'john', }, }, ); expect(query).toEqual({ profile: { firstname: 'john', }, }); }); it('applies the default value to the result if from is equal to `.` and no data is provided', () => { const query = aggregator.applyMap({}, [ { from: '.', to: '.', default: { profile: { firstname: 'john', }, }, }, ]); expect(query).toEqual({ profile: { firstname: 'john', }, }); }); }); describe('#fetch', () => { it('queries the first available datastore if none is defined', async () => { jest.spyOn(client, 'firstEventVersion').mockImplementation(() => 0); jest.spyOn(client, 'walkNext').mockImplementation(() => ({ data: [], headers: { page: 1, 'page-size': 10, }, })); const results = await aggregator.fetch({ model: 'profiles' }); expect(client.walkNext).toHaveBeenCalledWith( 'profiles', {}, 'entities', 0, 20, { current_version: -1, cursor_last_id: '', cursor_last_correlation_id: '', headers: undefined, version_ordered: false, }, ); }); it('returns all results associated to a query', async () => { jest.spyOn(client, 'firstEventVersion').mockImplementation(() => 0); jest .spyOn(client, 'walkNext') .mockImplementationOnce(() => ({ data: [ { a: 1, created_at: '1', }, { a: 2, created_at: '2', }, { a: 3, created_at: '3', }, ], headers: { page: 0, 'page-size': 10, }, })) .mockImplementation(() => ({ data: [], headers: { page: 1, 'page-size': 10, }, })); const results = await aggregator.fetch({ datastore: 'datastore', model: 'profiles', query: { is_enabled: true, }, }); expect(results).toEqual([ { a: 1, created_at: '1', }, { a: 2, created_at: '2', }, { a: 3, created_at: '3', }, ]); }); it('returns second page of results associated to a query if `page` is defined', async () => { jest .spyOn(client, 'find') .mockImplementationOnce(() => ({ data: [ { a: 1, }, ], headers: { page: 1, 'page-size': 10, }, })) .mockImplementation(() => ({ data: [], headers: { page: 1, 'page-size': 10, }, })); const results = await aggregator.fetch({ datastore: 'datastore', model: 'profiles', page: 1, query: { is_enabled: true, }, }); expect(results).toEqual([ { a: 1, }, ]); expect(client.find).toHaveBeenCalledWith( 'profiles', { is_enabled: true }, 1, undefined, undefined, ); }); it('returns first page with a page size of results associated to a query if `page_size` is defined', async () => { client.find = jest .fn() .mockImplementationOnce((model, query, page, pageSize) => ({ data: [ { a: 1, }, ], headers: { page, 'page-size': pageSize, }, })) .mockImplementation(() => ({ data: [], headers: { page: 1, 'page-size': 10, }, })); const results = await aggregator.fetch({ datastore: 'datastore', model: 'profiles', page_size: 10, query: { is_enabled: true, }, }); expect(results).toEqual([ { a: 1, }, ]); expect(client.find).toHaveBeenCalledWith( 'profiles', { is_enabled: true }, undefined, 10, undefined, ); }); it('returns second page of events of results associated to a query if `page` is defined', async () => { client.allEvents = jest .fn() .mockImplementationOnce(() => ({ data: [ { type: 'CREATED', a: 1, }, ], headers: { page: 1, 'page-size': 10, }, })) .mockImplementation(() => ({ data: [], headers: { page: 1, 'page-size': 10, }, })); const results = await aggregator.fetch({ datastore: 'datastore', model: 'profiles', source: 'events', page: 1, query: { is_enabled: true, }, }); expect(results).toEqual([ { type: 'CREATED', a: 1, }, ]); expect(client.allEvents).toHaveBeenCalledWith( 'profiles', { is_enabled: true }, 1, undefined, undefined, ); }); it('returns first page of events with a page size of results associated to a query if `page_size` is defined', async () => { client.allEvents = jest .fn() .mockImplementationOnce(() => ({ data: [ { type: 'CREATED', a: 1, }, ], headers: { page: 1, 'page-size': 10, }, })) .mockImplementation(() => ({ data: [], headers: { page: 1, 'page-size': 10, }, })); const results = await aggregator.fetch({ datastore: 'datastore', model: 'profiles', source: 'events', page_size: 10, query: { is_enabled: true, }, }); expect(results).toEqual([ { type: 'CREATED', a: 1, }, ]); expect(client.allEvents).toHaveBeenCalledWith( 'profiles', { is_enabled: true }, undefined, 10, undefined, ); }); it('returns the decrypted entities if requested', async () => { client.decrypt = jest.fn().mockImplementation(() => ({ data: [ { firstname: 'Alice', }, ], })); client.find = jest .fn() .mockImplementationOnce(() => ({ data: [{ firstname: 'encrypted' }], headers: { page: 0, 'page-size': 10, }, })) .mockImplementation(() => ({ data: [], headers: { page: 1, 'page-size': 10, }, })); const results = await aggregator.fetch({ datastore: 'datastore', model: 'profiles', must_decrypt: true, page: 0, query: { is_enabled: true, }, }); expect(results).toEqual([ { firstname: 'Alice', }, ]); }); it('returns the entity from timetravel if requested (walk)', async () => { client.at = jest.fn().mockImplementation(() => ({ data: { profile_id: 'alice', firstname: 'Alice 0', version: 0 }, })); client.find = jest .fn() .mockImplementationOnce((model, query, page, pageSize) => ({ data: [ { profile_id: 'alice', firstname: 'Alice 1', version: 1, }, ], headers: { page, 'page-size': pageSize, }, })); const results = await aggregator.fetch( { datastore: 'datastore', model: 'profiles', correlation_field: 'profile_id', timetravel: 'entity.created_at', page: 0, query: { is_enabled: true, }, }, { entity: { created_at: new Date('2021-01-01T00:00:00.000Z').toISOString(), }, }, ); expect(results).toEqual([ { firstname: 'Alice 0', profile_id: 'alice', version: 0, }, ]); expect(client.at).toHaveBeenCalledWith( 'profiles', 'alice', '2021-01-01T00:00:00.000Z', ); }); it('throws a timetravel exception if no `correlation_field` is specified', async () => { client.at = jest.fn().mockImplementation(() => ({ data: { profile_id: 'alice', firstname: 'Alice 0', version: 0 }, })); client.find = jest .fn() .mockImplementationOnce((model, query, page, pageSize) => ({ data: [ { profile_id: 'alice', firstname: 'Alice 1', version: 1, }, ], headers: { page, 'page-size': pageSize, }, })); let error; try { const results = await aggregator.fetch( { datastore: 'datastore', model: 'profiles', timetravel: 'entity.created_at', page: 0, query: { is_enabled: true, }, }, { entity: { created_at: new Date(2021, 0, 1).toISOString(), }, }, ); } catch (err) { error = err; } expect(error).toEqual(new Error('Invalid timetravel condition')); }); it('throws an error if no timetravel date can be found from state', async () => { client.at = jest.fn().mockImplementation(() => ({ data: { profile_id: 'alice', firstname: 'Alice 0', version: 0 }, })); client.find = jest .fn() .mockImplementationOnce((model, query, page, pageSize) => ({ data: [ { profile_id: 'alice', firstname: 'Alice 1', version: 1, }, ], headers: { page, 'page-size': pageSize, }, })); let error; try { const results = await aggregator.fetch( { datastore: 'datastore', model: 'profiles', correlation_field: 'profile_id', timetravel: 'entity.invalid', page: 0, query: { is_enabled: true, }, }, { entity: { created_at: new Date(2021, 0, 1).toISOString(), }, }, ); } catch (err) { error = err; } expect(error).toEqual(new Error('Invalid timetravel condition')); }); }); describe('#persist', () => { beforeEach(() => { client.create = jest.fn().mockImplementation(() => ({ data: { created: true }, })); client.update = jest.fn().mockImplementation(() => ({ data: { updated: true }, })); }); it('persists an empty fragment into a datastore', async () => { const result = await aggregator.persist({ model: 'profiles', }); expect(client.create).toHaveBeenCalledWith('profiles', {}, {}); }); it('persists an empty fragment into a specific datastore', async () => { const result = await aggregator.persist({ model: 'profiles', datastore: 'datastore', }); expect(client.create).toHaveBeenCalledWith('profiles', {}, {}); expect(client.update).not.toHaveBeenCalled(); }); it('returns the created entity', async () => { const result = await aggregator.persist({ model: 'profiles', destination: 'entity', }); expect(result).toEqual({ created: true }); }); it('persists a fragment with defined payload into a datastore', async () => { const result = await aggregator.persist({ model: 'profiles', payload: { firstname: 'John', }, }); expect(client.update).not.toHaveBeenCalled(); expect(client.create).toHaveBeenCalledWith( 'profiles', { firstname: 'John', }, {}, ); }); it('persists a fragment with headers into a datastore', async () => { const result = await aggregator.persist({ model: 'profiles', payload: { firstname: 'John', }, headers: { upsert: true, }, }); expect(client.update).not.toHaveBeenCalled(); expect(client.create).toHaveBeenCalledWith( 'profiles', { firstname: 'John', }, { upsert: true, }, ); }); it('persists a fragment with `created_at` header if provided into the payload', async () => { const result = await aggregator.persist({ model: 'profiles', payload: { firstname: 'John', created_at: '2020-01-01T00:00:00.000Z', }, headers: { upsert: true, }, }); expect(client.update).not.toHaveBeenCalled(); expect(client.create).toHaveBeenCalledWith( 'profiles', { firstname: 'John', }, { upsert: true, 'created-at': '2020-01-01T00:00:00.000Z', }, ); }); it('persists a fragment with a mapping payload logic', async () => { const result = await aggregator.persist( { model: 'profiles', map: [ { from: 'profile.firstname', to: 'firstname', }, ], }, { profile: { firstname: 'John', }, }, ); expect(client.create).toHaveBeenCalledWith( 'profiles', { firstname: 'John', }, {}, ); }); it('updates en entity if a `correlation_field` is defined and available', async () => { const result = await aggregator.persist( { model: 'profiles', correlation_field: 'profile_id', map: [ { from: 'profile.profile_id', to: 'profile_id', }, { from: 'profile.firstname', to: 'firstname', }, ], }, { profile: { profile_id: 'profile_id', firstname: 'John', }, }, ); expect(client.update).toHaveBeenCalledWith( 'profiles', 'profile_id', { profile_id: 'profile_id', firstname: 'John', }, {}, ); }); it('updates en entity preserving the `updated_at` value', async () => { const result = await aggregator.persist( { model: 'profiles', correlation_field: 'profile_id', map: [ { from: 'profile.updated_at', to: 'updated_at', }, { from: 'profile.profile_id', to: 'profile_id', }, { from: 'profile.firstname', to: 'firstname', }, ], }, { profile: { profile_id: 'profile_id', firstname: 'John', updated_at: '2020-01-01T00:00:00.000Z', }, }, ); expect(client.update).toHaveBeenCalledWith( 'profiles', 'profile_id', { profile_id: 'profile_id', firstname: 'John', }, { 'created-at': '2020-01-01T00:00:00.000Z', }, ); }); it('updates en entity preserving the `created_at` value of the event', async () => { const result = await aggregator.persist( { model: 'profiles', source: 'events', correlation_field: 'profile_id', map: [ { from: 'event.created_at', to: 'created_at', }, { from: 'event.profile_id', to: 'profile_id', }, { from: 'event.firstname', to: 'firstname', }, ], }, { event: { profile_id: 'profile_id', firstname: 'John', type: 'CREATED', created_at: '2020-01-01T00:00:00.000Z', }, }, ); expect(client.update).toHaveBeenCalledWith( 'profiles', 'profile_id', { profile_id: 'profile_id', firstname: 'John', }, { 'created-at': '2020-01-01T00:00:00.000Z', }, ); }); it('returns the updated entity', async () => { const result = await aggregator.persist( { model: 'profiles', correlation_field: 'profile_id', map: [ { from: 'profile.profile_id', to: 'profile_id', }, { from: 'profile.firstname', to: 'firstname', }, ], }, { profile: { profile_id: 'profile_id', firstname: 'John', }, }, ); expect(result).toEqual({ updated: true }); }); it('updates en entity if a `correlation_field` is defined, available and with headers', async () => { const result = await aggregator.persist( { model: 'profiles', correlation_field: 'profile_id', headers: { upsert: true, }, map: [ { from: 'profile.profile_id', to: 'profile_id', }, { from: 'profile.firstname', to: 'firstname', }, ], }, { profile: { profile_id: 'profile_id', firstname: 'John', }, }, ); expect(client.update).toHaveBeenCalledWith( 'profiles', 'profile_id', { profile_id: 'profile_id', firstname: 'John', }, { upsert: true, }, ); }); it('updates an entity with an imperative version if defined', async () => { const result = await aggregator.persist( { model: 'profiles', correlation_field: 'profile_id', imperative_version_next: 'profile.version', headers: { upsert: true, }, map: [ { from: 'profile.profile_id', to: 'profile_id', }, { from: 'profile.firstname', to: 'firstname', }, ], }, { profile: { profile_id: 'profile_id', firstname: 'John', version: 1, }, }, ); expect(client.update).toHaveBeenCalledWith( 'profiles', 'profile_id', { profile_id: 'profile_id', firstname: 'John', }, { upsert: true, version: 2, }, ); }); it('updates an entity with an imperative version to 0 if path not found', async () => { const result = await aggregator.persist( { model: 'profiles', correlation_field: 'profile_id', imperative_version_next: 'path.not.found', headers: { upsert: true, }, map: [ { from: 'profile.profile_id', to: 'profile_id', }, { from: 'profile.firstname', to: 'firstname', }, ], }, { profile: { profile_id: 'profile_id', firstname: 'John', version: 1, }, }, ); expect(client.update).toHaveBeenCalledWith( 'profiles', 'profile_id', { profile_id: 'profile_id', firstname: 'John', }, { upsert: true, version: 0, }, ); }); it('throws an exception if a `correlation_field` is defined but not found', async () => { let error; try { const result = await aggregator.persist( { model: 'profiles', correlation_field: 'profile_id', map: [ { from: 'profile.profile_id', to: 'profile_id', }, { from: 'profile.firstname', to: 'firstname', }, ], }, { profile: { firstname: 'John', }, }, ); } catch (err) { error = err; } expect(error).toEqual(new Error('Correlation ID must be null or exist')); }); }); describe('#mergeData', () => { it('returns the data initialized with an empty object', () => { const data = aggregator.mergeData({ destination: 'counts', as_entity: false, default: [ { count: 0, }, ], }); expect(data).toEqual({ counts: [ { count: 0, }, ], }); }); it('returns the default objects if no results are provided and defaults are defined', () => { const data = aggregator.mergeData( { destination: 'counts', as_entity: false, default: [ { count: 0, }, ], }, { firstname: 'John', }, ); expect(data).toEqual({ firstname: 'John', counts: [ { count: 0, }, ], }); }); it('returns an empty array if no results are provided not default value', () => { const data = aggregator.mergeData( { destination: 'profiles', as_entity: false, }, { firstname: 'John', }, ); expect(data).toEqual({ firstname: 'John', profiles: [], }); }); it('returns the default object if no result found and a default value is defined', () => { const data = aggregator.mergeData( { destination: 'count', as_entity: true, default: 0, }, { firstname: 'John', }, ); expect(data).toEqual({ firstname: 'John', count: 0, }); }); it('returns a default object with results linked', () => { const data = aggregator.mergeData( { destination: 'profile', as_entity: true, }, {}, [ { firstname: 'John', }, ], ); expect(data).toEqual({ profile: { firstname: 'John', }, }); }); it('returns the data with all results stored in the destination', () => { const data = aggregator.mergeData( { destination: 'profiles', }, {}, [ { firstname: 'Alice', }, { firstname: 'Bernard', }, ], ); expect(data).toEqual({ profiles: [ { firstname: 'Alice', }, { firstname: 'Bernard', }, ], }); }); it('adds entries in the data object if an array already exists', () => { const data = aggregator.mergeData( { destination: 'profiles', }, { profiles: [ { firstname: 'Alice', }, ], }, [ { firstname: 'Bernard', }, ], ); expect(data).toEqual({ profiles: [ { firstname: 'Alice', }, { firstname: 'Bernard', }, ], }); }); it('adds an object at the root level', () => { const data = aggregator.mergeData( { destination: '.', as_entity: true, }, {}, [ { firstname: 'Alice', }, ], ); expect(data).toEqual({ firstname: 'Alice', }); }); it('throws a not found exception if `as_entity=true` and no results exist', () => { let error; try { const data = aggregator.mergeData( { destination: '.', as_entity: true, }, {}, [], ); } catch (err) { error = err; } expect(error).toEqual(Aggregator.ERROR_ENTITY_NOT_FOUND); }); it('throws an error if the destination is not defined', () => { let error; try { const data = aggregator.mergeData({ as_entity: true, }); } catch (err) { error = err; } expect(error).toBeInstanceOf(Error); expect(error).toEqual(Aggregator.ERROR_DESTINATION_UNDEFINED); }); }); describe('#runStepFetch', () => { it('fetches data and updates the aggregation object', async () => { aggregator.fetch = jest.fn().mockImplementation(() => [ { firstname: 'Alice', }, { firstname: 'Bernard', }, ]); const data = await aggregator.runStepFetch({ type: 'fetch', datastore: 'datastore', model: 'profiles', destination: 'profiles', query: {}, }); expect(data).toEqual({ profiles: [ { firstname: 'Alice', }, { firstname: 'Bernard', }, ], }); }); }); describe('#runStepPersist', () => { it('persists data and updates the aggregation object', async () => { aggregator.persist = jest.fn().mockImplementation(() => ({ firstname: 'Alice', })); const data = await aggregator.runStepPersist({ type: 'fetch', datastore: 'datastore', model: 'profiles', destination: 'profile', payload: {}, }); expect(data).toEqual({ profile: { firstname: 'Alice', }, }); }); it('persists data into a default `persist` location', async () => { aggregator.persist = jest.fn().mockImplementation(() => ({ firstname: 'Alice', })); const data = await aggregator.runStepPersist({ type: 'fetch', datastore: 'datastore', model: 'profiles', // destination: 'profile', // <- missing payload: {}, }); expect(data).toEqual({ persist: { firstname: 'Alice', }, }); }); }); describe('#runStepJsonPatch', () => { it('returns the data with JSON Patch applied', async () => { const data = await aggregator.runStepJsonPatch({ type: 'json_patch', patch: [{ op: 'add', path: '/hello', value: 'world' }], }); expect(data).toEqual({ hello: 'world', }); }); }); describe('#runStepMap', () => { it('returns an empty object if not initiliazed', async () => { const data = await aggregator.runStepMap({ type: 'map', map: [ { from: 'profile.firstname', to: 'firstname', }, ], }); expect(data).toEqual({}); }); it('maps data inside the object', async () => { const data = await aggregator.runStepMap( { type: 'map', map: [ { from: 'profile.firstname', to: 'firstname', }, ], }, { profile: { firstname: 'Alice', }, }, ); expect(data).toEqual({ profile: { firstname: 'Alice', }, firstname: 'Alice', }); }); }); describe('#runStepUnset', () => { it('returns empty object', async () => { const data = await aggregator.runStepUnset({ type: 'unset', path: 'profile.lastname', }); expect(data).toEqual({}); }); it('removes a single key from the aggregation', async () => { const data = await aggregator.runStepUnset( { type: 'unset', path: 'profile.lastname', }, { profile: { firstname: 'Alice', lastname: 'Cooper', }, }, ); expect(data).toEqual({ profile: { firstname: 'Alice', }, }); }); }); describe('#runStepValidate', () => { it('validate default empty `data` if not defined', async () => { let error; try { const data = await aggregator.runStepValidate({ type: 'validate', schema: { type: 'string', }, must_throw: true, }); } catch (err) { error = err; } expect(error.message).toEqual( Aggregator.ERROR_VALIDATE_STEP_FAILED.message, ); }); it('validate a sub path if defined', async () => { const data = await aggregator.runStepValidate( { type: 'validate', path: 'sub.count', schema: { type: 'number', enum: [0], }, }, { sub: { count: 0, }, }, ); expect(data).toMatchObject({ validation: { is_valid: true, }, }); }); it('throws an exception if requested', async () => { let error; try { const data = await aggregator.runStepValidate( { type: 'validate', schema: { type: 'string', }, must_throw: true, }, { invalid: true, }, ); } catch (err) { error = err; } expect(error.message).toEqual( Aggregator.ERROR_VALIDATE_STEP_FAILED.message, ); }); it('stores the validation error in the default destination `validation`', async () => { const data = await aggregator.runStepValidate( { type: 'validate', schema: { type: 'string', }, }, { invalid: true, }, ); expect(data).toEqual({ invalid: true, validation: { is_valid: false, errors: [ { instancePath: '', keyword: 'type', message: 'must be string', params: { type: 'string', }, schemaPath: '#/type', }, ], }, }); }); it('stores the validation error in the requested destination', async () => { const data = await aggregator.runStepValidate( { type: 'validate', schema: { type: 'string', }, destination: 'my_destination', }, { invalid: true, }, ); expect(data).toEqual({ invalid: true, my_destination: { is_valid: false, errors: [ { instancePath: '', keyword: 'type', message: 'must be string', params: { type: 'string', }, schemaPath: '#/type', }, ], }, }); }); }); describe('#runStepEach', () => { it('throws an exception on uninitialized data', async () => { let error; try { const data = await aggregator.runStepEach({ type: 'each', path: 'items', destination: 'results', pipeline: [ { type: 'map', map: [ { from: 'id', to: 'new_id', }, ], }, { type: 'unset', path: 'id', }, ], }); } catch (err) { error = err; } expect(error).toEqual(Aggregator.ERROR_ITERATE_STEP_IS_NOT_ARRAY); }); it('performs the defined step on each element of an array', async () => { const data = await aggregator.runStepEach( { type: 'each', path: 'items', destination: 'results', pipeline: [ { type: 'map', map: [ { from: 'id', to: 'new_id', }, ], }, { type: 'unset', path: 'id', }, ], }, { items: [ { id: 1, }, { id: 2, }, ], }, ); expect(data).toMatchObject({ results: [ { new_id: 1, }, { new_id: 2, }, ], }); }); it('performs the defined step on the same array', async () => { const data = await aggregator.runStepEach( { type: 'each', path: 'items', pipeline: [ { type: 'map', map: [ { from: 'id', to: 'new_id', }, ], }, { type: 'unset', path: 'id', }, ], }, { items: [ { id: 1, }, { id: 2, }, ], }, ); expect(data).toEqual({ items: [ { new_id: 1, }, { new_id: 2, }, ], }); }); it('throws an exception if the value to iterate on is not an array', async () => { let error; try { const data = await aggregator.runStepEach( { type: 'each', path: 'invalid', destination: 'results', pipeline: [], }, { invalid: {}, }, ); } catch (err) { error = err; } expect(error).toEqual(Aggregator.ERROR_ITERATE_STEP_IS_NOT_ARRAY); }); it('throws an exception if no value is found', async () => { let error; try { const data = await aggregator.runStepEach( { type: 'each', path: 'invalid', destination: 'results', pipeline: [], }, { target: [], }, ); } catch (err) { error = err; } expect(error).toEqual(Aggregator.ERROR_ITERATE_STEP_IS_NOT_ARRAY); }); }); describe('#runStepIf', () => { it('returns an empty object if the state is noot initialized yet', async () => { const data = await aggregator.runStepIf({ type: 'if', path: 'firstname', schema: { type: 'object', }, }); expect(data).toEqual({}); }); it('returns the current data if the validation part is false', async () => { const data = await aggregator.runStepIf( { type: 'if', path: 'firstname', schema: { type: 'object', }, }, { firstname: 'John', }, ); expect(data).toEqual({ firstname: 'John', }); }); it('returns the current data if the validation part is false without path definition', async () => { const data = await aggregator.runStepIf( { type: 'if', schema: { type: 'object', properties: { firstname: { type: 'object', }, }, }, }, { firstname: 'John', }, ); expect(data).toEqual({ firstname: 'John', }); }); it('invokes the pipeline if the validation part is true', async () => { const data = await aggregator.runStepIf( { type: 'if', path: 'firstname', schema: { type: 'string', }, pipeline: [ { type: 'map', map: [ { from: '_', to: 'lastname', default: 'Doe', }, ], }, ], }, { firstname: 'John', lastname: 'Doe', }, ); expect(data).toEqual({ firstname: 'John', lastname: 'Doe', }); }); it('invokes the pipeline several time if repeat_while_true is true', async () => { const data = await aggregator.runStepIf( { type: 'if', path: 'count', repeat_while_true: true, schema: { type: 'number', maximum: 2, }, pipeline: [ { type: 'op', func: 'add', destination: 'count', args: [ { path: 'count', }, { default: 1, }, ], }, ], }, { count: 0, }, ); expect(data).toEqual({ count: 3, }); }); it('stops the step after 100 iterations if not defined', async () => { const data = await aggregator.runStepIf( { type: 'if', path: 'count', repeat_while_true: true, schema: { type: 'number', }, pipeline: [ { type: 'op', func: 'add', destination: 'count', args: [ { path: 'count', }, { default: 1, }, ], }, ], }, { count: 0, }, ); expect(data).toEqual({ count: 100, }); }); it('stops the step after `max_iteration_count` iterations if defined', async () => { const data = await aggregator.runStepIf( { type: 'if', path: 'count', repeat_while_true: true, max_iteration_count: 2, schema: { type: 'number', }, pipeline: [ { type: 'op', func: 'add', destination: 'count', args: [ { path: 'count', }, { default: 1, }, ], }, ], }, { count: 0, }, ); expect(data).toEqual({ count: 2, }); }); }); describe('#runStepFilter', () => { it('throws an exception on uninitialized data', async () => { let error; try { const data = await aggregator.runStepFilter({ type: 'filter', path: 'items', destination: 'filtered', schema: { type: 'object', properties: { id: { type: 'number', enum: [1], }, }, }, }); } catch (err) { error = err; } expect(error).toEqual(Aggregator.ERROR_ITERATE_STEP_IS_NOT_ARRAY); }); it('filter an array based on a JSON Schema', async () => { const data = await aggregator.runStepFilter( { type: 'filter', path: 'items', destination: 'filtered', schema: { type: 'object', properties: { id: { type: 'number', enum: [1], }, }, }, }, { items: [ { id: 1, }, { id: 2, }, ], }, ); expect(data).toMatchObject({ filtered: [ { id: 1, }, ], }); }); it('filter an array based on a JSON Schema mapped with data value', async () => { const data = await aggregator.runStepFilter( { type: 'filter', path: 'items', destination: 'filtered', schema: { type: 'object', properties: { id: { type: 'number', enum: [], }, }, }, map: [ { to: 'properties.id.enum.0', default: 1, }, ], }, { items: [ { id: 1, }, { id: 2, }, ], }, ); expect(data).toMatchObject({ filtered: [ { id: 1, }, ], }); }); it('filters items on the same path', async () => { const data = await aggregator.runStepFilter( { type: 'filter', path: 'items', schema: { type: 'object', properties: { id: { type: 'number', enum: [1], }, }, }, }, { items: [ { id: 1, }, { id: 2, }, ], }, ); expect(data).toEqual({ items: [ { id: 1, }, ], }); }); it('finds an item in an array', async () => { const data = await aggregator.runStepFilter( { type: 'filter', path: 'items', destination: 'result', as_entity: true, schema: { type: 'object', properties: { id: { type: 'number', enum: [1], }, }, }, }, { items: [ { id: 1, }, { id: 2, }, ], }, ); expect(data).toMatchObject({ result: { id: 1, }, }); }); it('throws an exception if the filter must find an entity but failed', async () => { let error; try { const data = await aggregator.runStepFilter( { type: 'filter', path: 'items', destination: 'result', as_entity: true, schema: { type: 'object', properties: { id: { type: 'number', enum: [3], }, }, }, }, { items: [ { id: 1, }, { id: 2, }, ], }, ); } catch (err) { error = err; } expect(error).toEqual(Aggregator.ERROR_ENTITY_NOT_FOUND); }); it('throws an exception if the value to iterate on is not an array', async () => { let error; try { const data = await aggregator.runStepFilter( { type: 'filter', path: 'invalid', pipeline: [], }, { invalid: {}, }, ); } catch (err) { error = err; } expect(error).toEqual(Aggregator.ERROR_ITERATE_STEP_IS_NOT_ARRAY); }); it('throws an exception if no value is found', async () => { let error; try { const data = await aggregator.runStepFilter( { type: 'filter', path: 'invalid', destination: 'results', pipeline: [], }, { target: [], }, ); } catch (err) { error = err; } expect(error).toEqual(Aggregator.ERROR_ITERATE_STEP_IS_NOT_ARRAY); }); }); describe('#runStepOp', () => { afterEach(() => { jest.restoreAllMocks(); }); it('returns the length of an array', async () => { const data = await aggregator.runStepOp( { type: 'op', func: 'length', path: 'items', destination: 'count_items', }, { items: [1, 2], }, ); expect(data).toEqual({ items: [1, 2], count_items: 2, }); }); it('returns the current ISO String', async () => { jest .spyOn(utils, 'getDate') .mockReturnValue(new Date('2021-01-01T00:00:00.000Z')); const data = await aggregator.runStepOp( { type: 'op', func: 'date', destination: 'date', }, {}, ); expect(data.date).toEqual( new Date('2021-01-01T00:00:00.000Z').toISOString(), ); }); it('returns the ISO Date String', async () => { const data = await aggregator.runStepOp( { type: 'op', func: 'date', path: 'timestamp', destination: 'date', }, { timestamp: 0, }, ); expect(data).toEqual({ timestamp: 0, date: '1970-01-01T00:00:00.000Z', }); }); it('returns the current timestamp', async () => { jest .spyOn(utils, 'getDate') .mockReturnValue(new Date('2021-01-01T00:00:00.000Z')); const data = await aggregator.runStepOp( { type: 'op', func: 'timestamp', destination: 'timestamp', }, {}, ); expect(data.timestamp).toBeGreaterThanOrEqual( new Date('2021-01-01T00:00:00.000Z').getTime(), ); }); it('returns the timestamp from a Date string', async () => { const data = await aggregator.runStepOp( { type: 'op', func: 'timestamp', path: 'date', destination: 'timestamp', }, { date: '1970-01-01T00:00:00.000Z', }, ); expect(data).toEqual({ timestamp: 0, date: '1970-01-01T00:00:00.000Z', }); }); it('sums 2 values', async () => { const data = await aggregator.runStepOp( { type: 'op', func: 'sum', args: [{ path: 'a' }, { path: 'b' }], args_as_array: true, }, { a: 1, b: 2, }, ); expect(data).toEqual({ a: 1, b: 2, sum: 3, }); }); it('adds 1 value to another', async () => { const data = await aggregator.runStepOp( { type: 'op', func: 'add', args: [{ path: 'a' }, { path: 'b' }], args_as_array: false, }, { a: 1, b: 2, }, ); expect(data).toEqual({ a: 1, b: 2, add: 3, }); }); it('adds 1 value to another default one', async () => { const data = await aggregator.runStepOp( { type: 'op', func: 'add', args: [{ path: 'b' }, 1], args_as_array: false, }, { b: 2, }, ); expect(data).toEqual({ b: 2, add: 3, }); }); it('adds 1 value to another default one without matching data', async () => { const data = await aggregator.runStepOp( { type: 'op', func: 'add', path: 'invalid', args: [{ path: 'a', default: 0 }, { path: 'b' }], args_as_array: false, default: { a: 0, b: 0, }, }, {}, ); expect(data).toEqual({ add: 0, }); }); it('adds 1 value to another default one without any data', async () => { const data = await aggregator.runStepOp({ type: 'op', func: 'add', args: [{ path: 'a', default: 0 }, { path: 'b' }], args_as_array: false, default: { a: 0, b: 0, }, }); expect(data).toEqual({ add: 0, }); }); it('adds 1 value to path with `.`', async () => { const data = await aggregator.runStepOp( { type: 'op', func: 'add', path: 'a', args: [{ path: '.' }, 1], }, { a: 0, }, ); expect(data).toEqual({ a: 0, add: 1, }); }); it('pick some fields only on data example', async () => { const data = await aggregator.runStepOp( { type: 'op', func: 'pick', destination: '.', args: [{ path: '.' }, 'a', 'c'], }, { a: 0, b: 0, c: 0, }, ); expect(data).toEqual({ a: 0, c: 0, }); }); it('invokes `func` iteratee on complex uniquBy', async () => { const data = await aggregator.runStepOp({ type: 'op', func: 'uniqBy', args: [{ path: 'items' }, { func: 'isEqual' }], args_as_array: false, default: { items: [ { a: 1, }, { a: 1, }, ], }, }); expect(data).toEqual({ uniqBy: [ { a: 1, }, ], }); }); }); describe('#runStepFrom', () => { it('performs a noop if no steps are found', async () => { const data = await aggregator.runStepFrom( { type: 'from', path: 'steps', }, { firstname: 'Alice', }, ); expect(data).toMatchObject({ firstname: 'Alice', }); }); it('initializes the state with an object if not available yet', async () => { const data = await aggregator.runStepFrom({ type: 'from', path: 'steps', }); expect(data).toEqual({}); }); it('performs the aggregation with steps available on path', async () => { const data = await aggregator.runStepFrom( { type: 'from', path: 'steps', }, { steps: [ { type: 'map', map: [ { from: '_', to: 'firstname', default: 'John', }, ], }, ], }, ); expect(data).toMatchObject({ firstname: 'John', }); }); }); describe('#runStep', () => { it('throws an exception if the step type is not supported', async () => { let error; try { await aggregator.runStep({ type: 'invalid', }); } catch (err) { error = err; } expect(error).toEqual(Aggregator.ERROR_INVALID_STEP_TYPE); }); it('invokes the #runStepFetch with dynamic templating paths', async () => { aggregator.runStepFetch = jest.fn(); const data = aggregator.runStep( { type: 'fetch', datastore: '{ds}', model: 'profiles', query: {}, }, { ds: 'datastore', }, ); expect(aggregator.runStepFetch).toHaveBeenCalledWith( { datastore: 'datastore', model: 'profiles', query: {}, type: 'fetch' }, { ds: 'datastore', }, ); }); it('invokes the #runStepFetch if the step type is `fetch`', async () => { aggregator.runStepFetch = jest.fn(); const data = aggregator.runStep({ type: 'fetch', datastore: 'datastore', model: 'profiles', query: {}, }); expect(aggregator.runStepFetch).toHaveBeenCalledWith( { datastore: 'datastore', model: 'profiles', query: {}, type: 'fetch' }, {}, ); }); it('invokes the #runStepMap if the step type is `map`', async () => { aggregator.runStepJsonPatch = jest.fn(); const data = aggregator.runStep({ type: 'json_patch', }); expect(aggregator.runStepJsonPatch).toHaveBeenCalledWith( { type: 'json_patch' }, {}, ); }); it('invokes the #runStepMap if the step type is `map`', async () => { aggregator.runStepMap = jest.fn(); const data = aggregator.runStep({ type: 'map', }); expect(aggregator.runStepMap).toHaveBeenCalledWith({ type: 'map' }, {}); }); it('invokes the #runStepUnset if the step type is `unset`', async () => { aggregator.runStepUnset = jest.fn(); const data = aggregator.runStep({ type: 'unset', }); expect(aggregator.runStepUnset).toHaveBeenCalledWith( { type: 'unset' }, {}, ); }); it('invokes the #runStepValidate if the step type is `validate`', async () => { aggregator.runStepValidate = jest.fn(); const data = aggregator.runStep({ type: 'validate', }); expect(aggregator.runStepValidate).toHaveBeenCalledWith( { type: 'validate' }, {}, ); }); it('invokes the #runStepEach if the step type is `each`', async () => { aggregator.runStepEach = jest.fn(); const data = aggregator.runStep({ type: 'each', }); expect(aggregator.runStepEach).toHaveBeenCalledWith({ type: 'each' }, {}); }); it('invokes the #runStepIf if the step type is `if`', async () => { aggregator.runStepIf = jest.fn(); const data = aggregator.runStep({ type: 'if', }); expect(aggregator.runStepIf).toHaveBeenCalledWith({ type: 'if' }, {}); }); it('invokes the #runStepFilter if the step type is `filter`', async () => { aggregator.runStepFilter = jest.fn(); const data = aggregator.runStep({ type: 'filter', }); expect(aggregator.runStepFilter).toHaveBeenCalledWith( { type: 'filter' }, {}, ); }); it('invokes the #runStepOp if the step type is `op`', async () => { aggregator.runStepOp = jest.fn(); const data = aggregator.runStep({ type: 'op', }); expect(aggregator.runStepOp).toHaveBeenCalledWith({ type: 'op' }, {}); }); it('invokes the #runStepFrom if the step type is `from`', async () => { aggregator.runStepFrom = jest.fn(); const data = aggregator.runStep({ type: 'from', }); expect(aggregator.runStepFrom).toHaveBeenCalledWith({ type: 'from' }, {}); }); it('invokes the registered step if available in the list', async () => { const myStep = jest.fn().mockImplementation(() => ({})); const myStepDefinition = { handler: myStep, }; aggregator.addStepType('my_step', myStepDefinition); const data = await aggregator.runStep({ type: 'my_step', }); expect(myStep).toHaveBeenCalledWith({ type: 'my_step' }, {}); }); it('invokes the #runStepPersist if the step type is `persist`', async () => { aggregator.runStepPersist = jest.fn(); const data = aggregator.runStep({ type: 'persist', datastore: 'datastore', model: 'profiles', payload: {}, }); expect(aggregator.runStepPersist).toHaveBeenCalledWith( { type: 'persist', datastore: 'datastore', model: 'profiles', payload: {}, }, {}, ); }); it('throws a contract error on step type of the configuration does not respect the definition schema', async () => { const myStep = jest.fn().mockImplementation(() => ({})); const myStepDefinition = { handler: myStep, schema: { type: 'object', required: ['hello'], properties: { hello: { type: 'string', }, }, }, }; aggregator.addStepType('my_step', myStepDefinition); let error; try { const data = await aggregator.runStep({ type: 'my_step', }); } catch (err) { error = err; } expect(error).toEqual(Aggregator.ERROR_CONTRACT_ERROR_STEP_TYPE); }); }); describe('#aggregate', () => { it('runs all steps in the aggregation pipeline', async () => { aggregator.fetch = jest .fn() .mockImplementationOnce(() => [ { count: 2, }, { count: 3, }, ]) .mockImplementation(() => [ { firstname: 'Alice', }, { firstname: 'Bernard', }, ]); const data = await aggregator.aggregate([ { type: 'fetch', datastore: 'datastore', model: 'counts', query: {}, }, { type: 'fetch', datastore: 'datastore', model: 'profiles', destination: 'profiles', query: {}, }, ]); expect(data).toEqual({ counts: [ { count: 2, }, { count: 3, }, ], profiles: [ { firstname: 'Alice', }, { firstname: 'Bernard', }, ], }); }); it('retries the pipeline the number of max retries', async () => { aggregator.config.max_retry = 1; const runStepMock = jest .spyOn(aggregator, 'runStep') .mockImplementationOnce(() => { throw new Error('Ooops'); }); aggregator.fetch = jest .fn() .mockImplementationOnce(() => [ { count: 2, }, { count: 3, }, ]) .mockImplementation(() => [ { firstname: 'Alice', }, { firstname: 'Bernard', }, ]); const data = await aggregator.aggregate([ { type: 'fetch', datastore: 'datastore', model: 'counts', query: {}, }, { type: 'fetch', datastore: 'datastore', model: 'profiles', destination: 'profiles', query: {}, }, ]); expect(data).toEqual({ counts: [ { count: 2, }, { count: 3, }, ], profiles: [ { firstname: 'Alice', }, { firstname: 'Bernard', }, ], }); expect(runStepMock).toHaveBeenCalledTimes(3); // 1 with exception, 2 for the steps }); it('throws an error in case of pipeline max retries reached', async () => { aggregator.config.max_retry = 0; const _error = new Error('Ooops'); const runStepMock = jest .spyOn(aggregator, 'runStep') .mockImplementation(() => { throw _error; }); let error; try { const data = await aggregator.aggregate([ { type: 'fetch', datastore: 'datastore', model: 'counts', query: {}, }, { type: 'fetch', datastore: 'datastore', model: 'profiles', destination: 'profiles', query: {}, }, ]); } catch (err) { error = err; } expect(error).toEqual(_error); }); }); describe('#getDatastore', () => { it('returns step datastore', () => { const datastore = aggregator.getDatastore('datastore'); const _datastore = aggregator.datastores.get('datastore'); expect(datastore).toEqual(_datastore); }); it('returns first datastore of the datastores map if the parameter is undefined', () => { const datastore = aggregator.getDatastore(undefined); const [_datastore] = aggregator.datastores.values(); expect(datastore).toEqual(_datastore); }); it('returns first datastore of the datastores map if the datastore name passed as a param is not valid', () => { const datastore = aggregator.getDatastore('invalid_datastore'); const [_datastore] = aggregator.datastores.values(); expect(datastore).toEqual(_datastore); }); }); });