import Directory from "./directory"; import Message from "./message"; import Vertex, { CommitPolicy, DuplicateMessagePolicy } from "./vertex"; export interface IExecutorOptions { maxParallelism?: number; maxMessageScanDepth?: number; // maximum time vertex can be uncommitted maxUncommittedAgeMS?: number; } export default class Executor { public static readonly MAX_UNCOMMITTED_AGE_DEFAULT = 60 * 1000; // 1 minute public static readonly SCHEDULE_MIN_INTERVAL = 1000; // 1 second public static startTime = Date.now(); public id: string; public directory: Directory = new Directory(); protected vertexTypes: { [id: string]: typeof Vertex } = {}; public messages: { [vertexType: string]: { [vertexId: string]: Message[] }; } = {}; public messagesQueued: number = 0; public vertices: { [vertexId: string]: Vertex } = {}; public uncommittedVertices: { [vertexId: string]: Vertex } = {}; protected scheduleQueued: boolean = false; // message being processed by a vertex or it is being committed. protected vertexBusy: { [vertexId: string]: boolean } = {}; protected maxParallelism: number | undefined; protected maxMessageScanDepth: number | undefined; protected maxUncommittedAgeMS: number; public lastStatsDisplay: number = 0; public statistics: any = { activations: 0, commits: 0, messagesSent: 0, messagesScanned: 0, messagesProcessed: 0, messagesReplaced: 0, highwaterParallelism: 0, busyVertices: 0, totalSchedules: 0, }; constructor(options: IExecutorOptions) { this.maxParallelism = options.maxParallelism; this.maxMessageScanDepth = options.maxMessageScanDepth; this.maxUncommittedAgeMS = options.maxUncommittedAgeMS || Executor.MAX_UNCOMMITTED_AGE_DEFAULT; // set up infrequent schedule to handle committing vertexes. setInterval(() => { this.triggerSchedule(); }, Executor.SCHEDULE_MIN_INTERVAL); } public async activate(vertexId: string) { const type = Vertex.typeFromId(vertexId); const VertexClass = this.vertexTypes[type]; if (!VertexClass) { // not being able to activate a vertex is a fatal error that should only happen in dev throw new Error( `can't activate vertex ${vertexId}: unknown type '${type}'` ); } const vertex = new VertexClass({ id: vertexId, executor: this, }); try { await vertex.activate(); if (vertex.state) { vertex.committedHash = vertex.hashState(); } else { vertex.committedHash = ""; } vertex.lastCommit = Date.now(); vertex.expiration = Date.now() + vertex.lifetime; this.vertices[vertexId] = vertex; this.uncommittedVertices[vertexId] = vertex; const messages = await vertex.emit(); await this.send(messages); } catch (ex) { delete this.vertices[vertexId]; throw new Error( `activating vertex ${vertexId} failed with error: ${ex}` ); } this.statistics.activations += 1; return vertex; } public async get(vertexId: string) { if (this.vertices[vertexId]) { return this.vertices[vertexId]; } return this.activate(vertexId); } public async commit(vertex: Vertex) { delete this.uncommittedVertices[vertex.id]; if (vertex.needsCommit()) { this.statistics.commits += 1; await vertex.commit(); vertex.lastCommit = Date.now(); vertex.committedHash = vertex.hashState(); } return vertex; } public async deactivate(vertexId: string) { const vertex = this.vertices[vertexId]; delete this.vertices[vertexId]; await vertex.deactivate(); return this.commit(vertex); } public register(vertexClass: any) { this.vertexTypes[vertexClass.VERTEX_TYPE] = vertexClass; } public async send(messages: Message[]) { for (const message of messages) { const vertexType = Vertex.typeFromId(message.to); if (!this.messages[vertexType]) { this.messages[vertexType] = {}; } const typeMessages = this.messages[vertexType]; if (!typeMessages[message.to]) { typeMessages[message.to] = []; } const vertexClass = this.vertexTypes[vertexType]; let completed = false; // if we are using a replacement with latest policy, first scan to see if we have a message from this sender. // if we find one, replace it with this later message. if ( vertexClass.duplicateMessagePolicy === DuplicateMessagePolicy.ProcessLatestFromSender ) { for (const [index, existingMessage] of typeMessages[ message.to ].entries()) { if (existingMessage.from === message.from) { // replace with latest message this.statistics.messagesReplaced += 1; typeMessages[message.to][index] = message; completed = true; } } } if (!completed) { typeMessages[message.to].push(message); this.messagesQueued += 1; this.statistics.messagesSent += 1; } } this.triggerSchedule(); } protected async triggerSchedule() { if (!this.scheduleQueued) { this.scheduleQueued = true; setImmediate(() => this.schedule()); } } protected atMaxParallelism() { // Are we already at max parallelism? const currentParallelism = Object.keys(this.vertexBusy).length; this.statistics.busyVertices = currentParallelism; if (this.statistics.highwaterParallelism < currentParallelism) { this.statistics.highwaterParallelism = currentParallelism; } if (this.maxParallelism && currentParallelism >= this.maxParallelism) { return true; } return false; } protected async processAndEmit(vertex: Vertex, message: Message) { await vertex.process(message); const messages = await vertex.emit(message); await this.send(messages); return vertex; } protected async scheduleMessages() { const messagePromises: { [id: string]: Promise> } = {}; for (const vertexType in this.messages) { if (vertexType) { const typeMessages = this.messages[vertexType]; for (const vertexId in typeMessages) { if (vertexId) { if (this.atMaxParallelism()) { break; } this.statistics.messagesScanned += 1; // peek at message const message = typeMessages[vertexId][0]; if (!this.vertexBusy[message.to]) { typeMessages[vertexId].shift(); this.messagesQueued -= 1; if (typeMessages[vertexId].length === 0) { delete typeMessages[vertexId]; } this.vertexBusy[message.to] = true; const vertex = await this.get(message.to); if (vertex) { messagePromises[ message.to ] = this.processAndEmit(vertex, message); if ( vertex.commitPolicy === CommitPolicy.ShouldCommit ) { this.uncommittedVertices[ vertex.id ] = vertex; } this.statistics.messagesProcessed += 1; } } } } } } return messagePromises; } protected async scheduleCommits() { const commitPromises: { [id: string]: Promise> } = {}; // schedule commits for (const vertexId in this.uncommittedVertices) { if (vertexId) { if (this.atMaxParallelism()) { break; } if (this.vertexBusy[vertexId]) { continue; } const vertex = this.uncommittedVertices[vertexId]; // if this commit has not reached its max committed age then // none of the ones behind it will have, so break. if (Date.now() - vertex.lastCommit < this.maxUncommittedAgeMS) { break; } this.vertexBusy[vertexId] = true; commitPromises[vertexId] = this.commit(vertex); } } return commitPromises; } protected async scheduleDeactivations() { const deactivationPromises: { [id: string]: Promise> } = {}; for (const vertexId in this.vertices) { if (vertexId) { if (this.atMaxParallelism()) { break; } const vertex = this.vertices[vertexId]; if (vertex.expired()) { this.vertexBusy[vertexId] = true; deactivationPromises[vertexId] = this.deactivate(vertexId); } } } return deactivationPromises; } protected displayStats() { console.log( `total messages: ${this.messagesQueued} vertices: ${ Object.keys(this.vertices).length } messages sent: ${this.statistics.messagesSent} processed: ${ this.statistics.messagesProcessed } replaced: ${this.statistics.messagesReplaced} schedules: ${ this.statistics.totalSchedules } memory usage: ${Math.round( process.memoryUsage().heapUsed / 1024 / 1024 )} MB, busy vertices: ${ Object.keys(this.vertexBusy).length } commits: ${this.statistics.commits} uncommitted: ${ Object.keys(this.uncommittedVertices).length }` ); } protected async retirePromises(promises: { [id: string]: Promise>; }) { // retire promises while (Object.keys(promises).length > 0) { const vertex = await Promise.race(Object.values(promises)); delete promises[vertex.id]; delete this.vertexBusy[vertex.id]; this.triggerSchedule(); } } // scans uncommitted vertices to see if we need to commit. // scans message queue to see if there are messages we can process. protected async schedule() { this.scheduleQueued = false; this.statistics.totalSchedules += 1; const deactivationPromises: { [id: string]: Promise>; } = await this.scheduleDeactivations(); const commitPromises: { [id: string]: Promise>; } = await this.scheduleCommits(); const messagePromises: { [id: string]: Promise>; } = await this.scheduleMessages(); if (Date.now() - this.lastStatsDisplay > 1000) { this.displayStats(); this.lastStatsDisplay = Date.now(); } return Promise.all([ this.retirePromises(deactivationPromises), this.retirePromises(messagePromises), this.retirePromises(commitPromises), ]); } }