/*!
 * Copyright (c) Microsoft Corporation and contributors. All rights reserved.
 * Licensed under the MIT License.
 */

import assert from "assert";
import { EventEmitter } from "events";
import {
	IContext,
	IQueuedMessage,
	ILogger,
	IContextErrorData,
	IRoutingKey,
} from "@fluidframework/server-services-core";
import { Lumberjack } from "@fluidframework/server-services-telemetry";

/**
 * Sentinel "no message" value (offset -1). Used as the tail when the real checkpoint
 * position cannot be trusted yet.
 */
const InitialOffset: IQueuedMessage = {
	offset: -1,
	partition: -1,
	topic: "",
	value: undefined,
};

/**
 * Tracks head/tail offsets and pause state for a single document (identified by a routing key),
 * and re-emits checkpoint/error/pause/resume events to its listeners.
 *
 * @internal
 */
export class DocumentContext extends EventEmitter implements IContext {
	// We track two offsets - head and tail. Head represents the largest offset related to this document we
	// have seen. Tail represents the last checkpointed offset. When head and tail match we have fully checkpointed
	// the document.
	private headInternal: IQueuedMessage;
	private tailInternal: IQueuedMessage;
	private lastSuccessfulOffsetInternal: number;
	private closed = false;
	private contextError = undefined;

	// Tracks whether head has been updated since a pause/resume event.
	// This allows head to move backwards (out of order) once during resume.
	// true = paused and waiting for the next setHead() call to move head.
	private headPaused = false;

	/**
	 * @param routingKey - tenant/document identifiers; used in log properties and assertion messages.
	 * @param head - initial head message for this document.
	 * @param log - optional logger, exposed through the public `log` property.
	 * @param getLatestTail - returns the context manager's current tail; used to seed this context's tail
	 * here and again in setHead() when there is no pending work.
	 * @param getContextManagerPauseState - returns the context manager's current head/tail pause flags.
	 */
	constructor(
		private readonly routingKey: IRoutingKey,
		head: IQueuedMessage,
		public readonly log: ILogger | undefined,
		private readonly getLatestTail: () => IQueuedMessage,
		private readonly getContextManagerPauseState: () => {
			headPaused: boolean;
			tailPaused: boolean;
		},
	) {
		super();

		// Head represents the largest offset related to the document that is not checkpointed.
		// Tail will be set to the checkpoint offset of the previous head.
		this.headInternal = head;
		this.tailInternal = this.getLatestTail();
		// Mirrors whatever offset getLatestTail() returned (-1 if it returned an InitialOffset-like value).
		this.lastSuccessfulOffsetInternal = this.tailInternal.offset;

		// If this context is created while the contextManager's tail is still paused but its head is not
		// (i.e. the manager's tail has not been updated after resume), getLatestTail() returns the old tail,
		// which is inaccurate and can be higher than the head offset. In that case reset tail to
		// InitialOffset and set lastSuccessfulOffset to head.offset - 1.
		// The pause state is read once so both flags come from the same snapshot (as in checkpoint()).
		const pauseState = this.getContextManagerPauseState();
		if (!pauseState.headPaused && pauseState.tailPaused) {
			this.tailInternal = InitialOffset;
			this.lastSuccessfulOffsetInternal = this.head.offset - 1;
			Lumberjack.info(
				"Resetting documentContext's tail and lastSuccessfulOffset since the contextManager's tail is not yet updated after resume",
				{
					headOffset: this.head.offset,
					tailOffset: this.tail.offset,
					lastSuccessfulOffset: this.lastSuccessfulOffset,
					documentId: this.routingKey.documentId,
					tenantId: this.routingKey.tenantId,
				},
			);
		}
	}

	public get head(): IQueuedMessage {
		return this.headInternal;
	}

	public get tail(): IQueuedMessage {
		return this.tailInternal;
	}

	public get lastSuccessfulOffset(): number {
		return this.lastSuccessfulOffsetInternal;
	}

	public get documentId(): string {
		return this.routingKey.documentId;
	}

	/**
	 * Returns whether or not there is pending work in flight - i.e. the head and tail are not equal.
	 * Note: this is a reference comparison of the message objects, not an offset comparison;
	 * it becomes false once checkpoint() is called with the current head message itself.
	 */
	public hasPendingWork(): boolean {
		return this.headInternal !== this.tailInternal;
	}

	/**
	 * Sets the last successfully processed offset.
	 */
	public setLastSuccessfulOffset(offset: number): void {
		this.lastSuccessfulOffsetInternal = offset;
	}

	/**
	 * Sets the state to paused (headPaused = true) without emitting the "pause" event.
	 * Unlike pause(), which also emits "pause", this is used when another document in the same
	 * Kafka partition triggered the pause and all documents in that partition must be paused.
	 */
	public setStateToPause(): void {
		this.headPaused = true;
	}

	/**
	 * Updates the head offset for the context.
	 *
	 * Head must normally move forward; it may move backwards only while headPaused is true
	 * (i.e. once after a pause/resume), and never to or below the current tail.
	 *
	 * @returns false if the new head is at or below the tail and was ignored; true if head was updated.
	 * @throws AssertionError if head does not move forward and the context is not paused.
	 */
	public setHead(head: IQueuedMessage): boolean {
		assert(
			head.offset > this.head.offset || this.headPaused,
			`Head offset ${head.offset} must be greater than the current head offset ${this.head.offset} or headPaused should be true (${this.headPaused}). Topic ${head.topic}, partition ${head.partition}, tenantId ${this.routingKey.tenantId}, documentId ${this.routingKey.documentId}.`,
		);

		// If head is moving backwards
		if (head.offset <= this.head.offset) {
			// Messages at or below the last checkpoint are ignored; headPaused is intentionally left
			// set so a later message above the tail can still move head backwards.
			if (head.offset <= this.tail.offset) {
				Lumberjack.verbose(
					"Not updating documentContext head since new head's offset is <= last checkpoint offset (tail), returning early",
					{
						newHeadOffset: head.offset,
						currentHeadOffset: this.head.offset,
						currentTailOffset: this.tail.offset,
						lastSuccessfulOffset: this.lastSuccessfulOffset,
						documentId: this.routingKey.documentId,
					},
				);
				return false;
			}

			// allow moving backwards
			Lumberjack.info(
				"Allowing the document context head to move backwards to the specified offset",
				{
					newHeadOffset: head.offset,
					currentHeadOffset: this.head.offset,
					currentTailOffset: this.tail.offset,
					lastSuccessfulOffset: this.lastSuccessfulOffset,
					headPaused: this.headPaused,
					documentId: this.routingKey.documentId,
				},
			);
		}

		// When moving back to a state where head and tail differ we set the tail to be the old head, as in the
		// constructor, to make tail represent the inclusive top end of the checkpoint range.
		if (!this.hasPendingWork()) {
			this.tailInternal = this.getLatestTail();
		}

		if (this.headPaused) {
			Lumberjack.info("Setting headPaused to false", {
				newHeadOffset: head.offset,
				currentHeadOffset: this.head.offset,
				currentTailOffset: this.tail.offset,
				lastSuccessfulOffset: this.lastSuccessfulOffset,
				documentId: this.routingKey.documentId,
			});
			this.headPaused = false;
		}

		this.headInternal = head;
		return true;
	}

	/**
	 * Records `message` as the new tail and emits "checkpoint".
	 *
	 * Does nothing once closed. Skipped (with a log) while this context is paused, or while the
	 * contextManager's head is resumed but its tail is still paused.
	 *
	 * @throws AssertionError if message.offset is not in (tail.offset, head.offset].
	 */
	public checkpoint(message: IQueuedMessage, restartOnCheckpointFailure?: boolean): void {
		if (this.closed) {
			return;
		}

		// skip checkpoint in paused state
		const contextManagerPauseState = this.getContextManagerPauseState();
		if (
			this.headPaused ||
			(!contextManagerPauseState.headPaused && contextManagerPauseState.tailPaused)
		) {
			const telemetryProperties = {
				documentId: this.routingKey.documentId,
				tenantId: this.routingKey.tenantId,
				headOffset: this.head.offset,
				tailOffset: this.tail.offset,
				checkpointOffset: message.offset,
				headPaused: this.headPaused,
			};
			if (this.headPaused) {
				Lumberjack.info(
					"Skipping doc checkpoint since the documentContext is currently in paused state",
					telemetryProperties,
				);
			} else {
				// The contextManager's tail is resumed after its head, so this state is possible (the reverse is not).
				// A document shouldn't be checkpointing at this time, so log a warning to monitor whether it happens.
				Lumberjack.warning(
					"Skipping doc checkpoint since contextManager's tail is not yet updated after resume",
					telemetryProperties,
				);
			}
			return;
		}

		// Assert offset is between the current tail and head
		const offset = message.offset;
		assert(
			offset > this.tail.offset && offset <= this.head.offset,
			`Checkpoint offset ${offset} must be greater than the current tail offset ${this.tail.offset} and less than or equal to the head offset ${this.head.offset}. Topic ${message.topic}, partition ${message.partition}, tenantId ${this.routingKey.tenantId}, documentId ${this.routingKey.documentId}.`,
		);

		// Update the tail and broadcast the checkpoint
		this.tailInternal = message;
		this.emit("checkpoint", restartOnCheckpointFailure);
	}

	/**
	 * Stores the error (retrievable via getContextError()) and emits "error".
	 * Ignored, with an info log, once the context is closed.
	 */
	public error(error: any, errorData: IContextErrorData): void {
		if (this.closed) {
			// don't emit errors after closing
			Lumberjack.info("Skipping emitting error since the documentContext is already closed", {
				documentId: this.routingKey.documentId,
				tenantId: this.routingKey.tenantId,
			});
			return;
		}

		this.contextError = error;
		Lumberjack.verbose("Emitting error from documentContext", {
			documentId: this.routingKey.documentId,
			tenantId: this.routingKey.tenantId,
		});
		this.emit("error", error, errorData);
	}

	/**
	 * Marks the context closed (checkpoint() and error() become no-ops) and detaches all listeners.
	 */
	public close(): void {
		this.closed = true;

		this.removeAllListeners();
	}

	/**
	 * Returns the last error passed to error() while open, or undefined if none.
	 */
	public getContextError() {
		return this.contextError;
	}

	/**
	 * Marks head as paused and emits "pause" with the given offset and reason.
	 */
	public pause(offset?: number, reason?: any): void {
		this.headPaused = true;
		this.emit("pause", offset, reason);
	}

	/**
	 * Emits "resume". headPaused is not cleared here; the next successful setHead() clears it.
	 */
	public resume(): void {
		this.emit("resume");
	}
}