import Executor from "./executor";
import Message from "./message";
import crypto from "crypto";

/** Construction options for a {@link Vertex}. */
export interface IVertexOptions {
  /** Full vertex id; stored verbatim on the vertex. */
  id: string;
  /** Executor the vertex uses to send messages. */
  executor: Executor;
}

/**
 * Policy for handling duplicate messages.
 * NOTE(review): nothing in this file reads the policy; the semantics are
 * presumably implemented by the executor — confirm there.
 */
export enum DuplicateMessagePolicy {
  ProcessAll = 1,
  ProcessLatestFromSender,
}

/** Controls how {@link Vertex.needsCommit} decides whether a commit is due. */
export enum CommitPolicy {
  /** Commit only when the state hash differs from the last committed hash. */
  ShouldCommit = 1,
  /** Never report that a commit is needed. */
  NoCommit,
  /** Always report that a commit is needed. */
  AlwaysCommit,
}

/**
 * Splits a full vertex id into its `[type, id]` parts.
 *
 * Returns `null` when the id is empty or does not contain exactly one
 * separator, so ids with extra separators are treated as invalid.
 */
function splitVertexId(id: string): string[] | null {
  if (!id) {
    return null;
  }
  const parts = id.split(Vertex.VERTEX_ID_SEPARATOR);
  return parts.length === 2 ? parts : null;
}

/**
 * Base vertex: holds an id, a state of type `S`, commit bookkeeping and a
 * reference to the executor used for sending messages. The lifecycle hooks
 * (`activate`, `commit`, `deactivate`, `process`, `emit`) are no-ops here.
 *
 * `S` defaults to `any` so that existing references to the bare `Vertex`
 * type keep compiling unchanged.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export default class Vertex<S = any> {
  /** Default value of {@link Vertex.lifetime} (presumably milliseconds, i.e. one hour — confirm). */
  public static DEFAULT_LIFETIME = 60 * 60 * 1000;
  public static VERTEX_TYPE = "vertex";
  /** Separator between the type and id parts of a full vertex id. */
  public static VERTEX_ID_SEPARATOR = "|";
  public static duplicateMessagePolicy: DuplicateMessagePolicy =
    DuplicateMessagePolicy.ProcessAll;

  /** Builds a full vertex id of the form `<vertexType>|<id>`. */
  public static makeId(vertexType: string, id: string): string {
    return `${vertexType}${Vertex.VERTEX_ID_SEPARATOR}${id}`;
  }

  /**
   * Returns the type part of a full vertex id, or `""` when the id is empty
   * or does not consist of exactly two separator-delimited parts.
   */
  public static typeFromId(id: string) {
    const parts = splitVertexId(id);
    return parts ? parts[0] : "";
  }

  /**
   * Returns the id part of a full vertex id, or `""` when the id is empty
   * or does not consist of exactly two separator-delimited parts.
   */
  public static idFromId(id: string) {
    const parts = splitVertexId(id);
    return parts ? parts[1] : "";
  }

  public id: string;
  /** Timestamp compared against `Date.now()` by {@link Vertex.expired}. */
  public expiration: number;
  public commitPolicy: CommitPolicy = CommitPolicy.ShouldCommit;
  public lastCommit: number;
  public lifetime: number = Vertex.DEFAULT_LIFETIME;
  /** Hash of the state as of the last commit; compared in {@link Vertex.needsCommit}. */
  public committedHash: string;
  public state: S;
  public executor: Executor;

  constructor(options: IVertexOptions) {
    this.id = options.id;
    this.executor = options.executor;
  }

  /** Activation hook; does nothing in the base class. */
  public async activate() {
    return;
  }

  /** Commit hook; the base class performs no work and resolves to the vertex. */
  public async commit() {
    return this;
  }

  /**
   * Returns the hex MD5 digest of the JSON-serialized state.
   *
   * `JSON.stringify` yields `undefined` for values with no JSON form (for
   * example an unset state); passing that to `Hash.update` would throw, so
   * such states are hashed as the empty string, which no real JSON
   * serialization can produce.
   */
  public hashState(): string {
    const serialized: string | undefined = JSON.stringify(this.state);
    const stateString = serialized === undefined ? "" : serialized;
    return crypto.createHash("md5").update(stateString).digest("hex");
  }

  /**
   * Reports whether the vertex should be committed under its commit policy.
   *
   * The policy is consulted before hashing, so `NoCommit` and `AlwaysCommit`
   * never pay for (or fail on) serializing the state.
   */
  public needsCommit(): boolean {
    if (this.commitPolicy === CommitPolicy.NoCommit) {
      return false;
    }
    if (this.commitPolicy === CommitPolicy.AlwaysCommit) {
      return true;
    }
    // ShouldCommit: commit when nothing was committed yet or the state changed.
    return !this.committedHash || this.hashState() !== this.committedHash;
  }

  /** Deactivation hook; the base class performs no work and resolves to the vertex. */
  public async deactivate() {
    return this;
  }

  /**
   * True once `Date.now()` is past `expiration`. While `expiration` is unset
   * the comparison is false, so the vertex is reported as not expired.
   */
  public expired(): boolean {
    return Date.now() > this.expiration;
  }

  /** Processes an incoming message and builds new state. No-op in the base class. */
  public async process(message: Message) {
    return this;
  }

  /**
   * Takes current state and emits views of it over messages to other vertexes.
   * Called after activation and after every process of incoming message by executor.
   * The base class emits nothing.
   */
  public async emit(message?: Message): Promise<Message[]> {
    return [];
  }

  /** Sends messages via the associated executor. */
  public async send(messages: Message[]) {
    return this.executor.send(messages);
  }
}