// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

import { DateTime } from 'luxon';
import { Logger } from 'winston';
import { StringValue } from 'google-protobuf/google/protobuf/wrappers_pb';

import { IServiceProvider } from '@dolittle/sdk.dependencyinversion';
import { EventContext, EventSourceId, EventType, IEventTypes } from '@dolittle/sdk.events';
import { ExecutionContext } from '@dolittle/sdk.execution';
import { Internal, MissingEventInformation } from '@dolittle/sdk.events.processing';
import { Artifacts, ExecutionContexts, Guids } from '@dolittle/sdk.protobuf';
import { Cancellation } from '@dolittle/sdk.resilience';
import { IReverseCallClient, reactiveDuplex, ReverseCallClient } from '@dolittle/sdk.services';

import { Failure } from '@dolittle/contracts/Protobuf/Failure_pb';
import { ProcessorFailure, RetryProcessingState } from '@dolittle/contracts/Runtime/Events.Processing/Processors_pb';
import { ProjectionsClient } from '@dolittle/contracts/Runtime/Events.Processing/Projections_grpc_pb';
import { EventPropertyKeySelector as ProtobufEventPropertyKeySelector, EventSourceIdKeySelector as ProtobufEventSourceIdKeySelector, PartitionIdKeySelector as ProtobufPartitionIdKeySelector, ProjectionClientToRuntimeMessage, ProjectionCopies, ProjectionCopyToMongoDB, ProjectionDeleteResponse, ProjectionEventSelector, ProjectionRegistrationRequest, StaticKeySelector as ProtobufStaticKeySelector, EventOccurredKeySelector as ProtobufEventOccurredKeySelector, ProjectionRegistrationResponse, ProjectionReplaceResponse, ProjectionRequest, ProjectionResponse, ProjectionRuntimeToClientMessage } from '@dolittle/contracts/Runtime/Events.Processing/Projections_pb';
import { ProjectionCurrentStateType } from '@dolittle/contracts/Runtime/Projections/State_pb';

import { DeleteReadModelInstance } from '../DeleteReadModelInstance';
import { EventPropertyKeySelector } from '../EventPropertyKeySelector';
import { EventSourceIdKeySelector } from '../EventSourceIdKeySelector';
import { IProjection } from '../IProjection';
import { Key } from '../Key';
import { KeySelector } from '../KeySelector';
import { PartitionIdKeySelector } from '../PartitionIdKeySelector';
import { ProjectionContext } from '../ProjectionContext';
import { ProjectionId } from '../ProjectionId';
import { UnknownKeySelectorType } from '../UnknownKeySelectorType';
import { Conversion } from '../Copies/MongoDB/Conversion';
import { UnknownMongoDBConversion } from '../Copies/MongoDB/UnknownMongoDBConversion';
import { PropertyConversion } from '../Copies/MongoDB/PropertyConversion';
import { StaticKeySelector } from '../StaticKeySelector';
import { EventOccurredKeySelector } from '../EventOccurredKeySelector';

/**
 * Maps an SDK {@link Conversion} to the matching protobuf BSON type.
 * @param {Conversion} convertTo - The SDK conversion to translate.
 * @returns The corresponding member of ProjectionCopyToMongoDB.BSONType.
 * @throws {UnknownMongoDBConversion} When the conversion has no protobuf counterpart listed here.
 */
function toProtobufBSONType(convertTo: Conversion) {
    switch (convertTo) {
        case Conversion.None:
            return ProjectionCopyToMongoDB.BSONType.NONE;
        case Conversion.Date:
            return ProjectionCopyToMongoDB.BSONType.DATEASDATE;
        case Conversion.DateAsArray:
            return ProjectionCopyToMongoDB.BSONType.DATEASARRAY;
        case Conversion.DateAsDocument:
            return ProjectionCopyToMongoDB.BSONType.DATEASDOCUMENT;
        case Conversion.DateAsString:
            return ProjectionCopyToMongoDB.BSONType.DATEASSTRING;
        case Conversion.DateAsInt64:
            return ProjectionCopyToMongoDB.BSONType.DATEASINT64;
        case Conversion.Guid:
            return ProjectionCopyToMongoDB.BSONType.GUIDASSTANDARDBINARY;
        case Conversion.GuidAsCSharpLegacy:
            return ProjectionCopyToMongoDB.BSONType.GUIDASCSHARPLEGACYBINARY;
        case Conversion.GuidAsString:
            return ProjectionCopyToMongoDB.BSONType.GUIDASSTRING;
        default:
            throw new UnknownMongoDBConversion(convertTo);
    }
}

/**
 * Represents an implementation of {@link Internal.EventProcessor} for {@link Projection}.
 *
 * NOTE(review): the previous documentation declared a `@template T` (the type of the projection read model),
 * but no type parameter is visible on this declaration - confirm whether generic arguments are expected here.
 */
export class ProjectionProcessor extends Internal.EventProcessor {

    /**
     * Initializes a new instance of {@link ProjectionProcessor}.
     * @param {IProjection} _projection - The projection.
     * @param {IEventTypes} _eventTypes - The registered event types for this projection.
     */
    constructor(
        private readonly _projection: IProjection,
        private readonly _eventTypes: IEventTypes
    ) {
        super('Projection', _projection.projectionId);
    }

    /**
     * Builds the registration request for the projection: projection id, scope id, the initial state
     * serialised as JSON, the copies specification, the alias (only when the projection has one) and
     * one event selector per event the projection declares.
     * @inheritdoc
     */
    protected get registerArguments(): ProjectionRegistrationRequest {
        const registerArguments = new ProjectionRegistrationRequest();
        registerArguments.setProjectionid(Guids.toProtobuf(this._projection.projectionId.value));
        registerArguments.setScopeid(Guids.toProtobuf(this._projection.scopeId.value));
        registerArguments.setInitialstate(JSON.stringify(this._projection.initialState));
        registerArguments.setCopies(this.createCopiesSpecification());

        if (this._projection.hasAlias) {
            registerArguments.setAlias(this._projection.alias!.value);
        }

        const events: ProjectionEventSelector[] = [];
        for (const eventSelector of this._projection.events) {
            const selector = new ProjectionEventSelector();
            selector.setEventtype(Artifacts.toProtobuf(eventSelector.eventType));
            this.setKeySelector(selector, eventSelector.keySelector);
            events.push(selector);
        }
        registerArguments.setEventsList(events);

        return registerArguments;
    }

    /**
     * Sets the protobuf key selector on the given event selector that corresponds to the SDK key selector.
     * @param {ProjectionEventSelector} protobufSelector - The protobuf event selector to configure.
     * @param {KeySelector} selector - The SDK key selector to translate.
     * @throws {UnknownKeySelectorType} When the key selector is not an instance of any of the handled types.
     */
    private setKeySelector(protobufSelector: ProjectionEventSelector, selector: KeySelector) {
        if (selector instanceof EventPropertyKeySelector) {
            const propertyKeySelector = new ProtobufEventPropertyKeySelector();
            propertyKeySelector.setPropertyname(selector.propertyName.value);
            protobufSelector.setEventpropertykeyselector(propertyKeySelector);
        } else if (selector instanceof EventSourceIdKeySelector) {
            protobufSelector.setEventsourcekeyselector(new ProtobufEventSourceIdKeySelector());
        } else if (selector instanceof PartitionIdKeySelector) {
            protobufSelector.setPartitionkeyselector(new ProtobufPartitionIdKeySelector());
        } else if (selector instanceof StaticKeySelector) {
            const staticKeySelector = new ProtobufStaticKeySelector();
            staticKeySelector.setStatickey(selector.staticKey.value);
            protobufSelector.setStatickeyselector(staticKeySelector);
        } else if (selector instanceof EventOccurredKeySelector) {
            const eventOccurredKeySelector = new ProtobufEventOccurredKeySelector();
            eventOccurredKeySelector.setFormat(selector.occurredFormat.value);
            protobufSelector.setEventoccurredkeyselector(eventOccurredKeySelector);
        } else {
            throw new UnknownKeySelectorType(selector);
        }
    }

    /**
     * Creates the copies specification for the projection. The MongoDB part is only populated when the
     * projection is configured to copy to MongoDB; otherwise an empty specification is returned.
     * @returns {ProjectionCopies} The protobuf copies specification.
     */
    private createCopiesSpecification(): ProjectionCopies {
        const copies = new ProjectionCopies();
        const mongoDBCopy = this._projection.copies.mongoDB;

        if (mongoDBCopy.shouldCopyToMongoDB) {
            const mongoDB = new ProjectionCopyToMongoDB();
            mongoDB.setCollection(mongoDBCopy.collectionName.value);
            mongoDB.setConversionsList(this.createMongoDBPropertyConversions(mongoDBCopy.conversions));
            copies.setMongodb(mongoDB);
        }

        return copies;
    }

    /**
     * Translates SDK property conversions to their protobuf representation, recursing into the children
     * of each conversion.
     * @param {PropertyConversion[]} conversions - The SDK property conversions.
     * @returns {ProjectionCopyToMongoDB.PropertyConversion[]} The protobuf property conversions, in the same order.
     * @throws {UnknownMongoDBConversion} When a conversion has no protobuf BSON type counterpart.
     */
    private createMongoDBPropertyConversions(conversions: PropertyConversion[]): ProjectionCopyToMongoDB.PropertyConversion[] {
        return conversions.map(conversion => {
            const pbConversion = new ProjectionCopyToMongoDB.PropertyConversion();
            pbConversion.setPropertyname(conversion.property.value);
            pbConversion.setConvertto(toProtobufBSONType(conversion.convertTo));

            if (conversion.shouldRename) {
                const renameTo = new StringValue();
                renameTo.setValue(conversion.renameTo.value);
                pbConversion.setRenameto(renameTo);
            }

            pbConversion.setChildrenList(this.createMongoDBPropertyConversions(conversion.children));
            return pbConversion;
        });
    }

    /**
     * Creates the reverse call client, wiring the accessors of the projection protobuf messages
     * (registration request/response, handle request/result, call contexts and ping/pong) to it.
     * @inheritdoc
     */
    protected createClient(
        client: ProjectionsClient,
        registerArguments: ProjectionRegistrationRequest,
        callback: (request: ProjectionRequest, executionContext: ExecutionContext) => Promise<ProjectionResponse>,
        executionContext: ExecutionContext,
        pingInterval: number,
        logger: Logger,
        cancellation: Cancellation): IReverseCallClient {
        return new ReverseCallClient(
            (requests, cancellation) => reactiveDuplex(client, client.connect, requests, cancellation),
            ProjectionClientToRuntimeMessage,
            (message, connectArguments) => message.setRegistrationrequest(connectArguments),
            (message) => message.getRegistrationresponse(),
            (message) => message.getHandlerequest(),
            (message, response) => message.setHandleresult(response),
            (connectArguments, context) => connectArguments.setCallcontext(context),
            (request) => request.getCallcontext(),
            (response, context) => response.setCallcontext(context),
            (message) => message.getPing(),
            (message, pong) => message.setPong(pong),
            executionContext,
            registerArguments,
            pingInterval,
            callback,
            cancellation,
            logger
        );
    }

    /** @inheritdoc */
    protected getFailureFromRegisterResponse(response: ProjectionRegistrationResponse): Failure | undefined {
        return response.getFailure();
    }

    /** @inheritdoc */
    protected getRetryProcessingStateFromRequest(request: ProjectionRequest): RetryProcessingState | undefined {
        return request.getRetryprocessingstate();
    }

    /** @inheritdoc */
    protected createResponseFromFailure(failure: ProcessorFailure): ProjectionResponse {
        const response = new ProjectionResponse();
        response.setFailure(failure);
        return response;
    }

    /**
     * Handles a projection request: validates the event and current state carried by the request,
     * deserialises both from JSON, invokes the projection and wraps the outcome in a response that
     * either deletes the read model instance or replaces its state.
     *
     * The `services` and `logger` parameters are not used by this implementation.
     * @inheritdoc
     * @throws {MissingEventInformation} When the request lacks the event, any required piece of event
     * information, or the current state.
     */
    protected async handle(request: ProjectionRequest, executionContext: ExecutionContext, services: IServiceProvider, logger: Logger): Promise<ProjectionResponse> {
        // Read the nested event once; a missing outer or inner event both yield undefined here.
        const pbEvent = request.getEvent()?.getEvent();
        if (!pbEvent) {
            throw new MissingEventInformation('No event in ProjectionRequest');
        }

        const pbSequenceNumber = pbEvent.getEventlogsequencenumber();
        // Compared against undefined explicitly so that a falsy sequence number such as 0 is accepted.
        if (pbSequenceNumber === undefined) throw new MissingEventInformation('Sequence Number');

        const pbEventSourceId = pbEvent.getEventsourceid();
        if (!pbEventSourceId) throw new MissingEventInformation('EventSourceId');

        const pbExecutionContext = pbEvent.getExecutioncontext();
        if (!pbExecutionContext) throw new MissingEventInformation('Execution context');

        const pbOccurred = pbEvent.getOccurred();
        if (!pbOccurred) throw new MissingEventInformation('Occurred');

        const pbEventType = pbEvent.getEventtype();
        if (!pbEventType) throw new MissingEventInformation('Event Type');

        const eventContext = new EventContext(
            pbSequenceNumber,
            EventSourceId.from(pbEventSourceId),
            DateTime.fromJSDate(pbOccurred.toDate()),
            ExecutionContexts.toSDK(pbExecutionContext),
            executionContext);

        // The state is validated only after the event context has been built, so event-related
        // failures are reported before state-related ones.
        const pbCurrentState = request.getCurrentstate();
        if (!pbCurrentState || !pbCurrentState.getState()) {
            throw new MissingEventInformation('No state in ProjectionRequest');
        }

        const projectionContext = new ProjectionContext(
            pbCurrentState.getType() === ProjectionCurrentStateType.CREATED_FROM_INITIAL_STATE,
            Key.from(pbCurrentState.getKey()),
            eventContext);

        let event = JSON.parse(pbEvent.getContent());

        const eventType = Artifacts.toSDK(pbEventType, EventType.from);
        if (this._eventTypes.hasTypeFor(eventType)) {
            // Copy the parsed content onto an instance of the registered type so the projection
            // receives an object with the expected prototype.
            const typeOfEvent = this._eventTypes.getTypeFor(eventType);
            event = Object.assign(new typeOfEvent(), event);
        }

        let state = JSON.parse(pbCurrentState.getState());
        if (this._projection.readModelType !== undefined) {
            state = Object.assign(new this._projection.readModelType(), state);
        }

        const nextStateOrDelete = await this._projection.on(state, event, eventType, projectionContext);

        const response = new ProjectionResponse();
        if (nextStateOrDelete instanceof DeleteReadModelInstance) {
            response.setDelete(new ProjectionDeleteResponse());
        } else {
            const replace = new ProjectionReplaceResponse();
            replace.setState(JSON.stringify(nextStateOrDelete));
            response.setReplace(replace);
        }

        return response;
    }
}