import { isWebDomainRealmFilter } from './../../api/core/webdomain/UserAgentIdentifierRealmSelectionInterface';
/* eslint-disable @typescript-eslint/no-misused-promises */
/* eslint-disable @typescript-eslint/unbound-method */
import express from 'express';
import _ from 'lodash';

import { PluginProperty, PluginPropertyResponse } from '../../';
import {
  AudienceSegmentExternalFeedResource,
  AudienceSegmentexternalResourceResponse,
  AudienceSegmentResource,
  AudienceSegmentResourceResponse,
} from '../../api/core/audiencesegment/AudienceSegmentInterface';
import {
  FeedDestinationCredentials,
  FeedDestinationCredentialsResponse,
} from '../../api/core/audiencesegment/FeedDestinationInterface';
import { BatchUpdateHandler } from '../../api/core/batchupdate/BatchUpdateHandler';
import { BatchUpdatePluginResponse, BatchUpdateRequest } from '../../api/core/batchupdate/BatchUpdateInterface';
import {
  RealmFilter,
  UserAgentIdentifierRealmSelectionResource,
  UserAgentIdentifierRealmSelectionResourcesResponse,
} from '../../api/core/webdomain/UserAgentIdentifierRealmSelectionInterface';
import {
  BatchedUserSegmentUpdatePluginResponse,
  CreateOAuthRedirectUrlPluginResponse,
  TestAuthenticationPluginResponse,
  ExternalSegmentAuthenticationResponse,
  ExternalSegmentAuthenticationStatusQueryResponse,
  ExternalSegmentConnectionPluginResponse,
  ExternalSegmentCreationPluginResponse,
  ExternalSegmentDynamicPropertyValuesQueryResponse,
  ExternalSegmentLogoutResponse,
  ExternalSegmentTroubleshootResponse,
  MissingRealmError,
  UserSegmentUpdatePluginResponse,
} from '../../api/plugin/audiencefeedconnector/AudienceFeedConnectorPluginResponseInterface';
import {
  AudienceFeedBatchContext,
  CreateOAuthRedirectUrlRequest,
  TestAuthenticationRequest,
  ExternalSegmentAuthenticationRequest,
  ExternalSegmentAuthenticationStatusQueryRequest,
  ExternalSegmentConnectionRequest,
  ExternalSegmentCreationRequest,
  ExternalSegmentDynamicPropertyValuesQueryRequest,
  ExternalSegmentLogoutRequest,
  ExternalSegmentTroubleshootActions,
  ExternalSegmentTroubleshootRequest,
  UserSegmentUpdateRequest,
} from '../../api/plugin/audiencefeedconnector/AudienceFeedConnectorRequestInterface';
import { BasePlugin, PropertiesWrapper } from '../common';

/** Per-feed data handed to every connector callback: the feed resource and its wrapped properties. */
export interface AudienceFeedConnectorBaseInstanceContext {
  feed: AudienceSegmentExternalFeedResource;
  feedProperties: PropertiesWrapper;
}

/** Plugin status -> HTTP code, for handlers that answer `ok`, `error` or `not_implemented`. */
const DEFAULT_STATUS_CODES: Readonly<Record<string, number>> = {
  ok: 200,
  error: 500,
  not_implemented: 400,
};

/** Plugin status -> HTTP code for `/v1/external_segment_connection`. */
const CONNECTION_STATUS_CODES: Readonly<Record<string, number>> = {
  external_segment_not_ready_yet: 502,
  ok: 200,
  error: 500,
};

/** Plugin status -> HTTP code for `/v1/user_segment_update`. */
const USER_SEGMENT_UPDATE_STATUS_CODES: Readonly<Record<string, number>> = {
  ok: 200,
  error: 500,
  retry: 429,
  no_eligible_identifier: 400,
};

/** Plugin status -> HTTP code for `/v1/authentication_status_queries`. */
const AUTHENTICATION_STATUS_QUERY_STATUS_CODES: Readonly<Record<string, number>> = {
  authenticated: 200,
  not_authenticated: 200,
  error: 500,
  not_implemented: 400,
};

/** Plugin status -> HTTP code for `/v1/dynamic_property_values_queries`. */
const DYNAMIC_PROPERTY_VALUES_STATUS_CODES: Readonly<Record<string, number>> = {
  ok: 200,
  empty: 200,
  error: 500,
  not_implemented: 400,
};

/**
 * Looks up the HTTP code mapped to a plugin response status.
 *
 * @param status - Status returned by the plugin callback.
 * @param codes - Status -> HTTP code table for the current endpoint.
 * @returns The mapped code, or 500 when the status is not an own key of `codes`.
 */
function httpStatusFor(status: string, codes: Readonly<Record<string, number>>): number {
  // Own-property check so that inherited keys such as "constructor" never match.
  const code = Object.prototype.hasOwnProperty.call(codes, status) ? codes[status] : undefined;
  return code ?? 500;
}

/**
 * Extracts a printable message from a value caught in a `catch` clause.
 *
 * @param error - Whatever was thrown; not necessarily an `Error`.
 * @returns `error.message` when the value carries one, otherwise the stringified value.
 */
function errorMessage(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }
  if (typeof error === 'object' && error !== null && 'message' in error) {
    return String((error as { message: unknown }).message);
  }
  return String(error);
}

/**
 * Tells whether a caught value asks to be exposed publicly.
 *
 * @param error - Whatever was thrown.
 * @returns `true` only when the value is an object whose `visibility` property is `'PUBLIC'`.
 */
function hasPublicVisibility(error: unknown): boolean {
  return typeof error === 'object' && error !== null && (error as { visibility?: unknown }).visibility === 'PUBLIC';
}

/**
 * Shared base of the audience feed connector plugins.
 *
 * The constructor registers one POST route per connector operation on `this.app`. Each route casts the
 * request body to its request type, calls the matching `on…` callback and translates the returned
 * `status` into an HTTP code.
 *
 * NOTE(review): the type arguments on `Promise`, `requestGatewayHelper`, `BasePlugin`,
 * `BatchedUserSegmentUpdatePluginResponse`, `BatchUpdateHandler` and `BatchUpdateRequest` were inferred
 * from the response types this file imports — confirm them against the API interface declarations.
 */
abstract class GenericAudienceFeedConnectorBasePlugin<
  T,
  R extends BatchedUserSegmentUpdatePluginResponse<T> | UserSegmentUpdatePluginResponse,
> extends BasePlugin<AudienceFeedConnectorBaseInstanceContext> {
  /**
   * @param enableThrottling - Forwarded unchanged to the `BasePlugin` constructor.
   */
  constructor(enableThrottling = false) {
    super(enableThrottling);

    this.initExternalSegmentCreation();
    this.initExternalSegmentConnection();
    this.initUserSegmentUpdate();
    this.initTroubleshoot();
    this.initAuthenticationStatusQuery();
    this.initAuthentication();
    this.initLogoutQuery();
    this.initDynamicPropertyValuesQuery();
    this.initTestAuthentication();
    this.initCreateOAuthRedirectUrl();
  }

  /**
   * Fetches the audience segment attached to an external feed.
   *
   * @param feedId - Id of the external feed.
   * @returns The `data` field of the gateway response.
   */
  async fetchAudienceSegment(feedId: string): Promise<AudienceSegmentResource> {
    const response = await super.requestGatewayHelper<AudienceSegmentResourceResponse>(
      'GET',
      `${this.outboundPlatformUrl}/v1/audience_segment_external_feeds/${feedId}/audience_segment`,
    );
    this.logger.debug(`Fetched External Segment: FeedId: ${feedId}`, response);
    return response.data;
  }

  /**
   * Fetches the user agent identifier realm selections of a datamart.
   *
   * @param datamartId - Id of the datamart.
   * @returns The `data` field of the gateway response.
   */
  async fetchUserAgentIdentifierRealms(datamartId: string): Promise<Array<UserAgentIdentifierRealmSelectionResource>> {
    const response = await super.requestGatewayHelper<UserAgentIdentifierRealmSelectionResourcesResponse>(
      'GET',
      `${this.outboundPlatformUrl}/v1/datamarts/${datamartId}/user_agent_identifier_realm_selections`,
    );
    this.logger.debug(`Fetched user agent identifier realms for the datamart with id: ${datamartId}`);
    return response.data;
  }

  /**
   * Verifies that a datamart has at least one realm selection matching the filter.
   *
   * For a web domain filter both the realm type and the `sld_name` must match; for any other filter
   * only the realm type is compared.
   *
   * @param datamartId - Id of the datamart whose realms are fetched.
   * @param realmFilter - Realm criteria to look for.
   * @throws MissingRealmError when no fetched realm matches.
   */
  async checkUserAgentIdentifierRealm(datamartId: string, realmFilter: RealmFilter): Promise<void> {
    const realms = await this.fetchUserAgentIdentifierRealms(datamartId);

    const hasRealm = realms.some((realm) => {
      if (isWebDomainRealmFilter(realmFilter)) {
        return realm.realm_type === realmFilter.realmType && realm.web_domain.sld_name === realmFilter.sld_name;
      }
      return realm.realm_type === realmFilter.realmType;
    });

    if (!hasRealm) {
      throw new MissingRealmError(datamartId, realmFilter);
    }
  }

  /**
   * Fetches an external feed.
   *
   * @param feedId - Id of the external feed.
   * @returns The `data` field of the gateway response.
   */
  async fetchAudienceFeed(feedId: string): Promise<AudienceSegmentExternalFeedResource> {
    const response = await super.requestGatewayHelper<AudienceSegmentexternalResourceResponse>(
      'GET',
      `${this.outboundPlatformUrl}/v1/audience_segment_external_feeds/${feedId}`,
    );
    this.logger.debug(`Fetched External Feed: ${feedId}`, { response });
    return response.data;
  }

  /**
   * Fetches the properties of an external feed.
   *
   * @param feedId - Id of the external feed.
   * @returns The `data` field of the gateway response.
   */
  async fetchAudienceFeedProperties(feedId: string): Promise<PluginProperty[]> {
    const response = await super.requestGatewayHelper<PluginPropertyResponse>(
      'GET',
      `${this.outboundPlatformUrl}/v1/audience_segment_external_feeds/${feedId}/properties`,
    );
    this.logger.debug(`Fetched External Feed Properties: ${feedId}`, { response });
    return response.data;
  }

  /**
   * Fetches the credentials stored for a feed destination.
   *
   * @param feedDestinationId - Id of the feed destination.
   * @returns The `data` field of the gateway response.
   */
  async fetchFeedDestinationCredentials(feedDestinationId: string): Promise<FeedDestinationCredentials> {
    const response = await super.requestGatewayHelper<FeedDestinationCredentialsResponse>(
      'GET',
      `${this.outboundPlatformUrl}/v1/feed_destinations/${feedDestinationId}/credentials`,
    );
    this.logger.debug(`Fetched credentials for feed destination: ${feedDestinationId}`);
    return response.data;
  }

  /**
   * POSTs the `scheme` and `credentials` of the given credentials object to a feed destination.
   *
   * @param feedDestinationId - Id of the feed destination.
   * @param credentials - Credentials to send; only `scheme` and `credentials` are forwarded.
   */
  async upsertFeedDestinationCredentials(
    feedDestinationId: string,
    credentials: FeedDestinationCredentials,
  ): Promise<void> {
    const body = { scheme: credentials.scheme, credentials: credentials.credentials };
    await super.requestGatewayHelper(
      'POST',
      `${this.outboundPlatformUrl}/v1/feed_destinations/${feedDestinationId}/credentials`,
      body,
    );
    this.logger.debug(`Upserted credentials for feed destination: ${feedDestinationId}`);
  }

  /**
   * POSTs a property to an external feed.
   *
   * @param feedId - Id of the external feed.
   * @param property - Property sent as the request body.
   * @returns The `data` field of the gateway response.
   */
  async createAudienceFeedProperties(feedId: string, property: PluginProperty): Promise<PluginProperty[]> {
    const response = await super.requestGatewayHelper<PluginPropertyResponse>(
      'POST',
      `${this.outboundPlatformUrl}/v1/audience_segment_external_feeds/${feedId}/properties`,
      property,
    );
    this.logger.debug(`Created External Feed Properties: ${feedId}`, { response });
    return response.data;
  }

  /**
   * PUTs a property of an external feed, addressed by its `technical_name`.
   *
   * @param feedId - Id of the external feed.
   * @param property - Property sent as the request body; its `technical_name` is part of the URL.
   * @returns The `data` field of the gateway response.
   */
  async updateAudienceFeedProperties(feedId: string, property: PluginProperty): Promise<PluginProperty[]> {
    const response = await super.requestGatewayHelper<PluginPropertyResponse>(
      'PUT',
      `${this.outboundPlatformUrl}/v1/audience_segment_external_feeds/${feedId}/properties/technical_name=${property.technical_name}`,
      property,
    );
    this.logger.debug(`Updated External Feed Properties: ${feedId}`, { response });
    return response.data;
  }

  /**
   * Builds the instance context of a feed. This is the default provided implementation;
   * override it to get a custom behavior.
   *
   * The feed and its properties are fetched concurrently.
   *
   * @param feedId - Id of the external feed.
   * @returns The feed plus its properties wrapped in a `PropertiesWrapper`.
   */
  protected async instanceContextBuilder(feedId: string): Promise<AudienceFeedConnectorBaseInstanceContext> {
    const [audienceFeed, audienceFeedProps] = await Promise.all([
      this.fetchAudienceFeed(feedId),
      this.fetchAudienceFeedProperties(feedId),
    ]);

    const context: AudienceFeedConnectorBaseInstanceContext = {
      feed: audienceFeed,
      feedProperties: new PropertiesWrapper(audienceFeedProps),
    };

    return context;
  }

  /** Called by `POST /v1/external_segment_creation` with a freshly rebuilt instance context. */
  protected abstract onExternalSegmentCreation(
    request: ExternalSegmentCreationRequest,
    instanceContext: AudienceFeedConnectorBaseInstanceContext,
  ): Promise<ExternalSegmentCreationPluginResponse>;

  /** Called by `POST /v1/external_segment_connection`. */
  protected abstract onExternalSegmentConnection(
    request: ExternalSegmentConnectionRequest,
    instanceContext: AudienceFeedConnectorBaseInstanceContext,
  ): Promise<ExternalSegmentConnectionPluginResponse>;

  /** Called by `POST /v1/user_segment_update`. */
  protected abstract onUserSegmentUpdate(
    request: UserSegmentUpdateRequest,
    instanceContext: AudienceFeedConnectorBaseInstanceContext,
  ): Promise<R>;

  /** Called by `POST /v1/troubleshoot`. Default: answers `not_implemented`. */
  protected onTroubleshoot(
    request: ExternalSegmentTroubleshootRequest,
    instanceContext: AudienceFeedConnectorBaseInstanceContext,
  ): Promise<ExternalSegmentTroubleshootResponse> {
    return Promise.resolve({ status: 'not_implemented' });
  }

  /** Called by `POST /v1/authentication_status_queries`. Default: answers `not_implemented`. */
  protected onAuthenticationStatusQuery(
    request: ExternalSegmentAuthenticationStatusQueryRequest,
  ): Promise<ExternalSegmentAuthenticationStatusQueryResponse> {
    return Promise.resolve({ status: 'not_implemented' });
  }

  /** Called by `POST /v1/authentication`. Default: answers `not_implemented`. */
  protected onAuthentication(
    request: ExternalSegmentAuthenticationRequest,
  ): Promise<ExternalSegmentAuthenticationResponse> {
    return Promise.resolve({ status: 'not_implemented' });
  }

  /** Called by `POST /v1/logout`. Default: answers `not_implemented`. */
  protected onLogout(request: ExternalSegmentLogoutRequest): Promise<ExternalSegmentLogoutResponse> {
    return Promise.resolve({ status: 'not_implemented' });
  }

  /** Called by `POST /v1/dynamic_property_values_queries`. Default: answers `not_implemented`. */
  protected onDynamicPropertyValuesQuery(
    request: ExternalSegmentDynamicPropertyValuesQueryRequest,
  ): Promise<ExternalSegmentDynamicPropertyValuesQueryResponse> {
    return Promise.resolve({ status: 'not_implemented' });
  }

  /** Called by `POST /v1/test_authentication` with the fetched credentials. Default: `not_implemented`. */
  protected onTestAuthentication(
    request: TestAuthenticationRequest,
    credentials: FeedDestinationCredentials,
  ): Promise<TestAuthenticationPluginResponse> {
    return Promise.resolve({ status: 'not_implemented' });
  }

  /** Called by `POST /v1/oauth_redirect_url`. Default: rejects, which the route turns into a 500. */
  protected onCreateOAuthRedirectUrl(
    request: CreateOAuthRedirectUrlRequest,
  ): Promise<CreateOAuthRedirectUrlPluginResponse> {
    return Promise.reject(new Error('onCreateOAuthRedirectUrl is not implemented'));
  }

  /**
   * Returns the cached instance context promise of a feed, building it first when needed.
   *
   * The promise itself is cached, so concurrent callers share one build. When the build rejects,
   * the error is logged, the cache entry is deleted and the rejection is propagated.
   *
   * @param feedId - Id of the external feed; also the cache key.
   * @param forceRefresh - When truthy, rebuilds even if an entry is already cached.
   * @returns The promise stored in `pluginCache` under `feedId`.
   */
  protected async getInstanceContext(
    feedId: string,
    forceRefresh?: boolean,
  ): Promise<AudienceFeedConnectorBaseInstanceContext> {
    if (forceRefresh || !this.pluginCache.get(feedId)) {
      void this.pluginCache.put(
        feedId,
        this.instanceContextBuilder(feedId).catch((error) => {
          this.logger.error(`Error while caching instance context`, error);
          this.pluginCache.del(feedId);
          throw error;
        }),
        this.getInstanceContextCacheExpiration(),
      );
    }
    return this.pluginCache.get(feedId) as Promise<AudienceFeedConnectorBaseInstanceContext>;
  }

  /**
   * Express middleware answering 500 with `{ error: 'Missing request body' }` when the body is
   * missing or empty, and calling `next()` otherwise.
   */
  protected emptyBodyFilter = (req: express.Request, res: express.Response, next: express.NextFunction) => {
    if (!req.body || _.isEmpty(req.body)) {
      const msg = {
        error: 'Missing request body',
      };
      // req.url already carries the "/v1/..." path, so it is logged as-is.
      this.logger.error(`POST ${req.url} : %s`, JSON.stringify(msg));
      res.status(500).json(msg);
    } else {
      next();
    }
  };

  /**
   * Registers `POST /v1/external_segment_creation`.
   *
   * Answers 200 when the callback status is `ok` and 500 otherwise. `visibility` defaults to
   * `'PUBLIC'` on the normal path; on a thrown error it is `'PUBLIC'` only when the error says so.
   */
  private initExternalSegmentCreation(): void {
    this.app.post(
      '/v1/external_segment_creation',
      this.emptyBodyFilter,
      async (req: express.Request, res: express.Response) => {
        try {
          this.logger.debug('POST /v1/external_segment_creation', { request: req.body });

          if (!this.httpIsReady()) {
            throw new Error('Plugin not initialized');
          }

          const request = req.body as ExternalSegmentCreationRequest;

          if (!this.onExternalSegmentCreation) {
            throw new Error('No External Segment Creation listener registered!');
          }

          // Creation always rebuilds the instance context instead of reusing a cached one.
          const instanceContext = await this.getInstanceContext(request.feed_id, true);
          const response = await this.onExternalSegmentCreation(request, instanceContext);

          const pluginResponse: ExternalSegmentCreationPluginResponse = {
            status: response.status,
            visibility: response.visibility || 'PUBLIC',
          };
          if (response.message) {
            pluginResponse.message = response.message;
          }

          const statusCode = response.status === 'ok' ? 200 : 500;

          this.logger.debug(`FeedId: ${request.feed_id} - External segment creation returning: ${statusCode}`, {
            response,
          });

          return res.status(statusCode).send(JSON.stringify(pluginResponse));
        } catch (error) {
          this.logger.error('Something bad happened on creation', error);
          const pluginResponse: ExternalSegmentCreationPluginResponse = {
            status: 'error',
            message: errorMessage(error),
            visibility: hasPublicVisibility(error) ? 'PUBLIC' : 'PRIVATE',
          };
          return res.status(500).send(pluginResponse);
        }
      },
    );
  }

  /**
   * Registers `POST /v1/external_segment_connection`.
   *
   * Status mapping: `ok` -> 200, `external_segment_not_ready_yet` -> 502, anything else -> 500.
   */
  private initExternalSegmentConnection(): void {
    this.app.post(
      '/v1/external_segment_connection',
      this.emptyBodyFilter,
      async (req: express.Request, res: express.Response) => {
        try {
          this.logger.debug('POST /v1/external_segment_connection', { request: req.body });

          if (!this.httpIsReady()) {
            throw new Error('Plugin not initialized');
          }

          const request = req.body as ExternalSegmentConnectionRequest;

          if (!this.onExternalSegmentConnection) {
            throw new Error('No External Segment Connection listener registered!');
          }

          const instanceContext = await this.getInstanceContext(request.feed_id);
          const response = await this.onExternalSegmentConnection(request, instanceContext);

          const pluginResponse: ExternalSegmentConnectionPluginResponse = {
            status: response.status,
          };
          if (response.message) {
            pluginResponse.message = response.message;
          }

          const statusCode = httpStatusFor(response.status, CONNECTION_STATUS_CODES);

          this.logger.debug(`FeedId: ${request.feed_id} - External segment connection returning: ${statusCode}`, {
            response,
          });

          return res.status(statusCode).send(JSON.stringify(pluginResponse));
        } catch (error) {
          this.logger.error('Something bad happened on connection', error);
          return res.status(500).send({ status: 'error', message: errorMessage(error) });
        }
      },
    );
  }

  /**
   * Registers `POST /v1/user_segment_update`.
   *
   * Status mapping: `ok` -> 200, `retry` -> 429, `no_eligible_identifier` -> 400, anything else -> 500.
   * A truthy `next_msg_delay_in_ms` is echoed in the `x-mics-next-msg-delay` response header.
   */
  private initUserSegmentUpdate(): void {
    this.app.post(
      '/v1/user_segment_update',
      this.emptyBodyFilter,
      async (req: express.Request, res: express.Response) => {
        try {
          this.logger.debug('POST /v1/user_segment_update', { request: req.body });

          const request = req.body as UserSegmentUpdateRequest;

          if (!this.onUserSegmentUpdate) {
            throw new Error('No User Segment Update listener registered!');
          }

          const instanceContext = await this.getInstanceContext(request.feed_id);
          const response: R = await this.onUserSegmentUpdate(request, instanceContext);

          if (response.next_msg_delay_in_ms) {
            res.set('x-mics-next-msg-delay', response.next_msg_delay_in_ms.toString());
          }

          const statusCode = httpStatusFor(response.status, USER_SEGMENT_UPDATE_STATUS_CODES);

          this.logger.debug(`FeedId: ${request.feed_id} - External segment update returning: ${statusCode}`, {
            response,
          });

          return res.status(statusCode).send(JSON.stringify(response));
        } catch (error) {
          this.logger.error('Something bad happened on update', error);
          return res.status(500).send({ status: 'error', message: errorMessage(error) });
        }
      },
    );
  }

  /**
   * Registers `POST /v1/troubleshoot`.
   *
   * An action absent from `ExternalSegmentTroubleshootActions` is answered with 400 without calling
   * the callback. Otherwise: `ok` -> 200, `not_implemented` -> 400, anything else -> 500.
   */
  private initTroubleshoot(): void {
    this.app.post('/v1/troubleshoot', this.emptyBodyFilter, async (req: express.Request, res: express.Response) => {
      try {
        this.logger.debug('POST /v1/troubleshoot', { request: req.body });

        const request = req.body as ExternalSegmentTroubleshootRequest;

        if (!ExternalSegmentTroubleshootActions.includes(request.action)) {
          const response: ExternalSegmentTroubleshootResponse = {
            status: 'not_implemented',
            message: `Action ${request.action} not supported`,
          };
          return res.status(400).send(JSON.stringify(response));
        }

        const instanceContext = await this.getInstanceContext(request.feed_id);
        const response = await this.onTroubleshoot(request, instanceContext);

        const statusCode = httpStatusFor(response.status, DEFAULT_STATUS_CODES);

        this.logger.debug(`FeedId: ${request.feed_id} - Troubleshoot returning: ${statusCode}`, { response });

        return res.status(statusCode).send(JSON.stringify(response));
      } catch (error) {
        this.logger.error('Something bad happened on troubleshoot', error);
        return res.status(500).send({ status: 'error', message: errorMessage(error) });
      }
    });
  }

  /**
   * Registers `POST /v1/authentication_status_queries`.
   *
   * Status mapping: `authenticated` / `not_authenticated` -> 200, `not_implemented` -> 400,
   * anything else -> 500.
   */
  private initAuthenticationStatusQuery(): void {
    this.app.post(
      '/v1/authentication_status_queries',
      this.emptyBodyFilter,
      async (req: express.Request, res: express.Response) => {
        try {
          const request = req.body as ExternalSegmentAuthenticationStatusQueryRequest;
          const response = await this.onAuthenticationStatusQuery(request);

          const statusCode = httpStatusFor(response.status, AUTHENTICATION_STATUS_QUERY_STATUS_CODES);

          this.logger.debug(
            `Request: ${JSON.stringify(request)} - Authentication status query returning: ${statusCode}`,
            {
              response,
            },
          );

          return res.status(statusCode).send(JSON.stringify(response));
        } catch (error) {
          this.logger.error('Something bad happened on authentication status query', error);
          return res.status(500).send({ status: 'error', message: errorMessage(error) });
        }
      },
    );
  }

  /**
   * Registers `POST /v1/authentication`.
   *
   * When the request carries a `feed_destination_id` and the callback answers `ok`, the returned
   * `refresh_token` is stored as OAUTH2 credentials of that feed destination; a missing token is an
   * error (500). The `refresh_token` is always removed from what is logged and sent back.
   * Status mapping: `ok` -> 200, `not_implemented` -> 400, anything else -> 500.
   */
  private initAuthentication(): void {
    this.app.post('/v1/authentication', this.emptyBodyFilter, async (req: express.Request, res: express.Response) => {
      try {
        const request = req.body as ExternalSegmentAuthenticationRequest;
        const response = await this.onAuthentication(request);

        if (request.feed_destination_id && response.status === 'ok') {
          if (!response.refresh_token) {
            throw new Error(`Plugin must return a refresh_token when feed_destination_id is present`);
          }
          await this.upsertFeedDestinationCredentials(request.feed_destination_id, {
            scheme: 'OAUTH2',
            credentials: { refresh_token: response.refresh_token },
          });
          this.logger.debug(
            `FeedDestinationId: ${request.feed_destination_id} - Credentials upserted after authentication`,
          );
        }

        // Strip the refresh token so it is neither logged nor returned to the caller.
        const { refresh_token: _refresh_token, ...responseWithoutCredentials } = response;

        const statusCode = httpStatusFor(response.status, DEFAULT_STATUS_CODES);

        // NOTE(review): the raw request body is logged at debug level — confirm it cannot hold secrets.
        this.logger.debug(`Request: ${JSON.stringify(req.body)} - Authentication returning: ${statusCode}`, {
          response: responseWithoutCredentials,
        });

        return res.status(statusCode).send(JSON.stringify(responseWithoutCredentials));
      } catch (error) {
        this.logger.error('Something bad happened on authentication', error);
        return res.status(500).send({ status: 'error', message: errorMessage(error) });
      }
    });
  }

  /**
   * Registers `POST /v1/logout`.
   *
   * Status mapping: `ok` -> 200, `not_implemented` -> 400, anything else -> 500.
   */
  private initLogoutQuery(): void {
    this.app.post('/v1/logout', this.emptyBodyFilter, async (req: express.Request, res: express.Response) => {
      try {
        const request = req.body as ExternalSegmentLogoutRequest;
        const response = await this.onLogout(request);

        const statusCode = httpStatusFor(response.status, DEFAULT_STATUS_CODES);

        this.logger.debug(`Request: ${JSON.stringify(request)} - Logout query returning: ${statusCode}`, {
          response,
        });

        return res.status(statusCode).send(JSON.stringify(response));
      } catch (error) {
        this.logger.error('Something bad happened on logout query', error);
        return res.status(500).send({ status: 'error', message: errorMessage(error) });
      }
    });
  }

  /**
   * Registers `POST /v1/test_authentication`.
   *
   * The credentials of `request.feed_destination_id` are fetched first; if that fails the route
   * answers 500 without calling the callback. Otherwise: `ok` -> 200, `not_implemented` -> 400,
   * anything else -> 500.
   */
  private initTestAuthentication(): void {
    this.app.post(
      '/v1/test_authentication',
      this.emptyBodyFilter,
      async (req: express.Request, res: express.Response) => {
        try {
          this.logger.debug('POST /v1/test_authentication', { request: req.body });

          if (!this.httpIsReady()) {
            throw new Error('Plugin not initialized');
          }

          const request = req.body as TestAuthenticationRequest;

          let credentials: FeedDestinationCredentials;
          try {
            credentials = await this.fetchFeedDestinationCredentials(request.feed_destination_id);
          } catch (fetchError) {
            // Keep the underlying cause in the logs; the caller only gets the generic message.
            this.logger.error('Could not fetch feed destination credentials', fetchError);
            const response: TestAuthenticationPluginResponse = {
              status: 'error',
              message: 'Could not fetch feed destination credentials',
            };
            return res.status(500).send(JSON.stringify(response));
          }

          const response = await this.onTestAuthentication(request, credentials);

          const statusCode = httpStatusFor(response.status, DEFAULT_STATUS_CODES);

          this.logger.debug(
            `FeedDestinationId: ${request.feed_destination_id} - Check destination credentials returning: ${statusCode}`,
            { response },
          );

          return res.status(statusCode).send(JSON.stringify(response));
        } catch (error) {
          this.logger.error('Something bad happened on check destination credentials', error);
          return res.status(500).send({ status: 'error', message: errorMessage(error) });
        }
      },
    );
  }

  /**
   * Registers `POST /v1/oauth_redirect_url`.
   *
   * Requires `feed_destination_id` in the request, and requires the `login_url` returned by the
   * callback to carry that same id in its `feed_destination_id` query parameter. Any violation, an
   * unparsable `login_url` or a callback failure is answered with 500; success is answered with 200.
   */
  private initCreateOAuthRedirectUrl(): void {
    this.app.post(
      '/v1/oauth_redirect_url',
      this.emptyBodyFilter,
      async (req: express.Request, res: express.Response) => {
        try {
          this.logger.debug('POST /v1/oauth_redirect_url', { request: req.body });

          if (!this.httpIsReady()) {
            throw new Error('Plugin not initialized');
          }

          const request = req.body as CreateOAuthRedirectUrlRequest;

          if (!request.feed_destination_id) {
            throw new Error('feed_destination_id is required');
          }

          const response = await this.onCreateOAuthRedirectUrl(request);

          const url = new URL(response.login_url);
          const feedDestinationIdParam = url.searchParams.get('feed_destination_id');
          if (feedDestinationIdParam !== request.feed_destination_id) {
            throw new Error(
              `login_url must contain feed_destination_id as query param: ${request.feed_destination_id}`,
            );
          }

          this.logger.debug(`FeedDestinationId: ${request.feed_destination_id} - OAuth redirect URL generated`);

          return res.status(200).send(JSON.stringify(response));
        } catch (error) {
          this.logger.error('Something bad happened on create OAuth redirect URL', error);
          return res.status(500).send({ status: 'error', message: errorMessage(error) });
        }
      },
    );
  }

  /**
   * Registers `POST /v1/dynamic_property_values_queries`.
   *
   * Status mapping: `ok` / `empty` -> 200, `not_implemented` -> 400, anything else -> 500.
   */
  private initDynamicPropertyValuesQuery(): void {
    this.app.post(
      '/v1/dynamic_property_values_queries',
      this.emptyBodyFilter,
      async (req: express.Request, res: express.Response) => {
        try {
          const request = req.body as ExternalSegmentDynamicPropertyValuesQueryRequest;
          const response = await this.onDynamicPropertyValuesQuery(request);

          const statusCode = httpStatusFor(response.status, DYNAMIC_PROPERTY_VALUES_STATUS_CODES);

          this.logger.debug(
            `Request: ${JSON.stringify(request)} - Dynamic property values query returning: ${statusCode}`,
            {
              response,
            },
          );

          return res.status(statusCode).send(JSON.stringify(response));
        } catch (error) {
          this.logger.error('Something bad happened on dynamic property values query', error);
          return res.status(500).send({ status: 'error', message: errorMessage(error) });
        }
      },
    );
  }
}

/**
 * Connector base class for plugins that support batch updates.
 *
 * On top of the generic routes, the constructor lets a `BatchUpdateHandler` register its route and
 * forwards each batch request to `onBatchUpdate` together with the instance context of
 * `request.context.feed_id`.
 */
export abstract class BatchedAudienceFeedConnectorBasePlugin<T> extends GenericAudienceFeedConnectorBasePlugin<
  T,
  BatchedUserSegmentUpdatePluginResponse<T>
> {
  /**
   * @param enableThrottling - Forwarded unchanged to the parent constructor.
   */
  constructor(enableThrottling = false) {
    super(enableThrottling);

    const batchUpdateHandler = new BatchUpdateHandler<AudienceFeedBatchContext, T>(
      this.app,
      this.emptyBodyFilter,
      this.logger,
    );

    batchUpdateHandler.registerRoute(async (request) => {
      const instanceContext = await this.getInstanceContext(request.context.feed_id);
      return this.onBatchUpdate(request, instanceContext);
    });
  }

  /** Called for each batch update request received by the `BatchUpdateHandler` route. */
  protected abstract onBatchUpdate(
    request: BatchUpdateRequest<AudienceFeedBatchContext, T>,
    instanceContext: AudienceFeedConnectorBaseInstanceContext,
  ): Promise<BatchUpdatePluginResponse>;
}

/**
 * Connector base class for plugins without batch update support: `POST /v1/batch_update` is
 * registered only to answer 500 with an explanatory message.
 */
export abstract class AudienceFeedConnectorBasePlugin extends GenericAudienceFeedConnectorBasePlugin<
  void,
  UserSegmentUpdatePluginResponse
> {
  /**
   * @param enableThrottling - Forwarded unchanged to the parent constructor.
   */
  constructor(enableThrottling = false) {
    super(enableThrottling);

    this.initBatchUpdate();
  }

  /** Registers the `/v1/batch_update` route that always rejects the request with a 500. */
  private initBatchUpdate(): void {
    this.app.post('/v1/batch_update', this.emptyBodyFilter, (_req: express.Request, res: express.Response) => {
      res.status(500).send({ status: 'error', message: "Plugin doesn't support batch update" });
    });
  }
}