import { Namespace, Server, Socket } from 'socket.io';
import { Identity } from 'ts-eventsourcing/ValueObject/Identity';
import { ServerSocketIOGateway } from './ServerSocketIOGateway';
import { SerializerInterface } from '../../Serializer/SerializerInterface';
import { StoreRepositoryInterface } from '../../ReadModel/StoreRepositoryInterface';
import { SimpleProjectorGateway } from '../../ReadModel/Projector/SimpleProjectorGateway';
import { ReadModelAction, ReadModelMetadata } from '../../ReadModel/ReadModelAction';
import { ProjectorGatewayFactory } from '../Projector/ProjectorGatewayFactory';
import { ProjectorGatewayInterface } from '../../ReadModel/ProjectorGatewayInterface';
import { SocketConnection } from './ValueObject/SocketConnection';
import { SocketIoGatewayFactoryError } from './Error/SocketIoGatewayFactoryError';

/**
 * Options selecting the socket.io namespace a gateway is bound to.
 */
export interface SocketIoGatewayOptions {
  /** Name of the socket.io namespace, as passed to `Server.of()`. */
  nsp: string;
}

/**
 * Namespace based gateway factory for projectors.
 *
 * Every opened namespace gets its own socket.io namespace and projector
 * gateway; both are tracked under the `nsp` value supplied by the caller until
 * the namespace is closed again.
 */
export class SocketIoGatewayFactory<
  State,
  Id extends Identity = Identity,
  Metadata extends ReadModelMetadata = ReadModelMetadata,
  Action extends ReadModelAction = ReadModelAction> implements ProjectorGatewayFactory {

  /** Currently open namespaces, keyed by the caller supplied `options.nsp`. */
  private openNamespace: {
    [namespace: string]: {
      socketNamespace: Namespace,
      gateway: ProjectorGatewayInterface,
    },
  } = {};

  /**
   * @param socketServer socket.io server on which the namespaces are created.
   * @param serializer Handed to every ServerSocketIOGateway that is created.
   * @param repository Handed to every SimpleProjectorGateway that is created.
   * @param onConnection Called from the namespace middleware for every
   *   incoming socket. Throw an error to prevent connection to the namespace.
   */
  constructor(private socketServer: Server,
              private serializer: SerializerInterface,
              private repository: StoreRepositoryInterface,
              /**
               * Throw an error to prevent connection to the namespace.
               */
              private onConnection: (connection: SocketConnection) => void) {
  }

  /**
   * Creates the socket.io namespace and the projector gateway for
   * `options.nsp` and registers the connection middleware on it.
   *
   * @param options Identifies the namespace to open.
   * @returns The gateway created for the namespace.
   * @throws The error built by `SocketIoGatewayFactoryError.alreadyOpen` when
   *   the namespace is already open.
   */
  public open(options: SocketIoGatewayOptions): ProjectorGatewayInterface {
    const namespace = options.nsp;
    if (this.openNamespace[namespace]) {
      throw SocketIoGatewayFactoryError.alreadyOpen(namespace);
    }
    const socketNamespace = this.socketServer.of(namespace);
    const socketGateway = new ServerSocketIOGateway(socketNamespace, this.serializer);
    socketNamespace.use((socket: Socket, next) => {
      try {
        this.onConnection(new SocketConnection(socketGateway, options, socket));
        next();
      } catch (e) {
        // Whatever onConnection threw is wrapped in a new Error and handed to
        // the middleware chain instead of calling next() without arguments.
        next(new Error(e));
      }
    });
    const gateway = new SimpleProjectorGateway(this.repository, socketGateway);
    this.openNamespace[namespace] = {
      gateway,
      socketNamespace,
    };
    return gateway;
  }

  /**
   * Disconnects every connected client of the namespace, removes its
   * listeners and unregisters it from both the socket.io server and this
   * factory.
   *
   * @param options Identifies the namespace to close.
   * @throws The error built by `SocketIoGatewayFactoryError.notOpen` when the
   *   namespace is not open.
   */
  public close(options: SocketIoGatewayOptions): void {
    const namespace = options.nsp;
    const entry = this.openNamespace[namespace];
    if (!entry) {
      throw SocketIoGatewayFactoryError.notOpen(namespace);
    }
    const socketNamespace = entry.socketNamespace;
    const connected = socketNamespace.connected;
    Object.keys(connected).forEach((client) => {
      connected[client].disconnect();
    });
    socketNamespace.removeAllListeners();
    // The server stores namespaces under their normalised name (socket.io
    // prepends a '/' when it is missing), which can differ from the raw
    // options.nsp. Deleting by the namespace's own name makes sure the entry
    // is really removed, so a later open() starts from a fresh namespace.
    delete this.socketServer.nsps[socketNamespace.name];
    delete this.openNamespace[namespace];
  }

  /**
   * Looks up the gateway of an already opened namespace.
   *
   * @param options Identifies the namespace.
   * @returns The gateway that was created by `open()` for this namespace.
   * @throws The error built by `SocketIoGatewayFactoryError.notOpen` when the
   *   namespace is not open.
   */
  public get(options: SocketIoGatewayOptions): ProjectorGatewayInterface {
    const namespace = options.nsp;
    const entry = this.openNamespace[namespace];
    if (!entry) {
      throw SocketIoGatewayFactoryError.notOpen(namespace);
    }
    return entry.gateway;
  }

}