/*! * Copyright (c) Microsoft Corporation and contributors. All rights reserved. * Licensed under the MIT License. */ import { TypedEventEmitter } from "@fluid-internal/client-utils"; import type { IDeltaManager } from "@fluidframework/container-definitions/internal"; import type { IDisposable, IEvent, ITelemetryBaseLogger, } from "@fluidframework/core-interfaces"; import { assert, Deferred } from "@fluidframework/core-utils/internal"; import { type IDocumentMessage, type ISummaryAck, type ISummaryContent, type ISummaryNack, MessageType, type ISequencedDocumentMessage, } from "@fluidframework/driver-definitions/internal"; import { createChildLogger, type ITelemetryLoggerExt, } from "@fluidframework/telemetry-utils/internal"; /** * Interface for summary op messages with typed contents. * @legacy @beta */ export interface ISummaryOpMessage extends ISequencedDocumentMessage { type: MessageType.Summarize; contents: ISummaryContent; } /** * Interface for summary ack messages with typed contents. * @legacy @beta */ export interface ISummaryAckMessage extends ISequencedDocumentMessage { type: MessageType.SummaryAck; contents: ISummaryAck; } /** * Interface for summary nack messages with typed contents. * @legacy @beta */ export interface ISummaryNackMessage extends ISequencedDocumentMessage { type: MessageType.SummaryNack; contents: ISummaryNack; } /** * A single summary which can be tracked as it goes through its life cycle. * The life cycle is: Local to Broadcast to Acked/Nacked. * @legacy @beta */ export interface ISummary { readonly clientId: string; readonly clientSequenceNumber: number; waitBroadcast(): Promise; waitAckNack(): Promise; } /** * A single summary which has already been acked by the server. * @legacy @beta */ export interface IAckedSummary { readonly summaryOp: ISummaryOpMessage; readonly summaryAck: ISummaryAckMessage; } enum SummaryState { Local = 0, Broadcast = 1, Acked = 2, Nacked = -1, } class Summary implements ISummary { public static createLocal(clientId: string, clientSequenceNumber: number): Summary { return new Summary(clientId, clientSequenceNumber); } public static createFromOp(op: ISummaryOpMessage): Summary { // TODO: Verify whether this should be able to handle server-generated ops (with null clientId) const summary = new Summary(op.clientId as string, op.clientSequenceNumber); summary.broadcast(op); return summary; } private state = SummaryState.Local; private _summaryOp?: ISummaryOpMessage; private _summaryAckNack?: ISummaryAckMessage | ISummaryNackMessage; private readonly defSummaryOp = new Deferred(); private readonly defSummaryAck = new Deferred(); public get summaryOp(): ISummaryOpMessage | undefined { return this._summaryOp; } public get summaryAckNack(): ISummaryAckMessage | ISummaryNackMessage | undefined { return this._summaryAckNack; } private constructor( public readonly clientId: string, public readonly clientSequenceNumber: number, ) {} public hasBeenAcked(): this is IAckedSummary { return this.state === SummaryState.Acked; } public broadcast(op: ISummaryOpMessage): boolean { assert( this.state === SummaryState.Local, 0x175 /* "Can only broadcast if summarizer starts in local state" */, ); this._summaryOp = op; this.defSummaryOp.resolve(); this.state = SummaryState.Broadcast; return true; } public ackNack(op: ISummaryAckMessage | ISummaryNackMessage): boolean { assert( this.state === SummaryState.Broadcast, 0x176 /* "Can only ack/nack if summarizer is in broadcasting state" */, ); this._summaryAckNack = op; this.defSummaryAck.resolve(); this.state = op.type === MessageType.SummaryAck ? SummaryState.Acked : SummaryState.Nacked; return true; } public async waitBroadcast(): Promise { await this.defSummaryOp.promise; // eslint-disable-next-line @typescript-eslint/no-non-null-assertion return this._summaryOp!; } public async waitAckNack(): Promise { await this.defSummaryAck.promise; // eslint-disable-next-line @typescript-eslint/no-non-null-assertion return this._summaryAckNack!; } } /** * Watches summaries created by a specific client. * @legacy @beta */ export interface IClientSummaryWatcher extends IDisposable { watchSummary(clientSequenceNumber: number): ISummary; waitFlushed(): Promise; } /** * This class watches summaries created by a specific client. * It should be created and managed from a SummaryCollection. */ class ClientSummaryWatcher implements IClientSummaryWatcher { // key: clientSeqNum private readonly localSummaries = new Map(); private _disposed = false; public get disposed(): boolean { return this._disposed; } public constructor( public readonly clientId: string, private readonly summaryCollection: SummaryCollection, ) {} /** * Watches for a specific sent summary op. * @param clientSequenceNumber - client sequence number of sent summary op */ public watchSummary(clientSequenceNumber: number): ISummary { let summary = this.localSummaries.get(clientSequenceNumber); if (!summary) { summary = Summary.createLocal(this.clientId, clientSequenceNumber); this.localSummaries.set(summary.clientSequenceNumber, summary); } return summary; } /** * Waits until all of the pending summaries in the underlying SummaryCollection * are acked/nacked. */ public async waitFlushed(): Promise { return this.summaryCollection.waitFlushed(); } /** * Gets a watched summary or returns undefined if not watched. * @param clientSequenceNumber - client sequence number of sent summary op */ public tryGetSummary(clientSequenceNumber: number): Summary | undefined { return this.localSummaries.get(clientSequenceNumber); } /** * Starts watching a summary made by this client. * @param summary - summary to start watching */ public setSummary(summary: Summary): void { this.localSummaries.set(summary.clientSequenceNumber, summary); } public dispose(): void { this.summaryCollection.removeWatcher(this.clientId); this._disposed = true; } } /** * @legacy @beta */ export type OpActionEventName = | MessageType.Summarize | MessageType.SummaryAck | MessageType.SummaryNack | "default"; /** * @legacy @beta */ export type OpActionEventListener = (op: ISequencedDocumentMessage) => void; /** * @legacy @beta */ export interface ISummaryCollectionOpEvents extends IEvent { (event: OpActionEventName, listener: OpActionEventListener); } /** * Data structure that looks at the op stream to track summaries as they * are broadcast, acked and nacked. * It provides functionality for watching specific summaries. * @legacy @beta */ export class SummaryCollection extends TypedEventEmitter { // key: clientId private readonly summaryWatchers = new Map(); // key: summarySeqNum private readonly pendingSummaries = new Map(); private refreshWaitNextAck = new Deferred(); private lastSummaryTimestamp: number | undefined; private maxAckWaitTime: number | undefined; private pendingAckTimerTimeoutCallback: (() => void) | undefined; private lastAck: IAckedSummary | undefined; public get latestAck(): IAckedSummary | undefined { return this.lastAck; } public emit(event: OpActionEventName, ...args: Parameters): boolean { return super.emit(event, ...args); } public get opsSinceLastAck(): number { return ( this.deltaManager.lastSequenceNumber - (this.lastAck?.summaryAck.sequenceNumber ?? this.deltaManager.initialSequenceNumber) ); } public addOpListener(listener: () => void): void { this.deltaManager.on("op", listener); } public removeOpListener(listener: () => void): void { this.deltaManager.off("op", listener); } private readonly logger: ITelemetryLoggerExt; public constructor( private readonly deltaManager: IDeltaManager, logger: ITelemetryBaseLogger, ) { super(); this.deltaManager.on("op", (op) => this.handleOp(op)); this.logger = createChildLogger({ logger }); } /** * Creates and returns a summary watcher for a specific client. * This will allow for local sent summaries to be better tracked. * @param clientId - client id for watcher */ public createWatcher(clientId: string): IClientSummaryWatcher { const watcher = new ClientSummaryWatcher(clientId, this); this.summaryWatchers.set(clientId, watcher); return watcher; } public removeWatcher(clientId: string): void { this.summaryWatchers.delete(clientId); } public setPendingAckTimerTimeoutCallback( maxAckWaitTime: number, timeoutCallback: () => void, ): void { this.maxAckWaitTime = maxAckWaitTime; this.pendingAckTimerTimeoutCallback = timeoutCallback; } public unsetPendingAckTimerTimeoutCallback(): void { this.maxAckWaitTime = undefined; this.pendingAckTimerTimeoutCallback = undefined; } /** * Returns a promise that resolves once all pending summary ops * have been acked or nacked. */ public async waitFlushed(): Promise { while (this.pendingSummaries.size > 0) { // eslint-disable-next-line @typescript-eslint/promise-function-async const promises = Array.from(this.pendingSummaries, ([, summary]) => summary.waitAckNack(), ); await Promise.all(promises); } return this.lastAck; } /** * Returns a promise that resolves once a summary is acked that has a reference * sequence number greater than or equal to the passed in sequence number. * @param referenceSequenceNumber - reference sequence number to wait for * @returns The latest acked summary */ public async waitSummaryAck(referenceSequenceNumber: number): Promise { while ( !this.lastAck || this.lastAck.summaryOp.referenceSequenceNumber < referenceSequenceNumber ) { await this.refreshWaitNextAck.promise; } return this.lastAck; } private parseContent(op: ISequencedDocumentMessage): void { // This should become unconditional once (Loader LTS) reaches 2.4 or later // There will be a long time of needing both cases, until LTS catches up to the change. // That said, we may instead move to listen for "op" events from ContainerRuntime, // and parsing may not be required at all if ContainerRuntime.process() continues to parse it for all types of ops. if (typeof op.contents === "string") { op.contents = JSON.parse(op.contents); } } /** * Handler for ops; only handles ops relating to summaries. * @param op - op message to handle */ private handleOp(opArg: ISequencedDocumentMessage): void { const op = { ...opArg }; switch (op.type) { case MessageType.Summarize: { this.parseContent(op); return this.handleSummaryOp(op as ISummaryOpMessage); } case MessageType.SummaryAck: case MessageType.SummaryNack: { // Old files (prior to PR #10077) may not contain this info if (op.data === undefined) { this.parseContent(op); } else { op.contents = JSON.parse(op.data); } return op.type === MessageType.SummaryAck ? this.handleSummaryAck(op as ISummaryAckMessage) : this.handleSummaryNack(op as ISummaryNackMessage); } default: { // If the difference between timestamp of current op and last summary op is greater than // the maxAckWaitTime, then we need to inform summarizer to not wait and summarize // immediately as we have already waited for maxAckWaitTime. const lastOpTimestamp = op.timestamp; if ( this.lastSummaryTimestamp !== undefined && this.maxAckWaitTime !== undefined && lastOpTimestamp - this.lastSummaryTimestamp >= this.maxAckWaitTime ) { this.pendingAckTimerTimeoutCallback?.(); } this.emit("default", op); return; } } } private handleSummaryOp(op: ISummaryOpMessage): void { let summary: Summary | undefined; // Check if summary already being watched, broadcast if so // TODO: Verify whether this should be able to handle server-generated ops (with null clientId) const watcher = this.summaryWatchers.get(op.clientId as string); if (watcher) { summary = watcher.tryGetSummary(op.clientSequenceNumber); if (summary) { summary.broadcast(op); } } // If not watched, create from op if (!summary) { summary = Summary.createFromOp(op); if (watcher) { watcher.setSummary(summary); } } this.pendingSummaries.set(op.sequenceNumber, summary); this.lastSummaryTimestamp = op.timestamp; this.emit(MessageType.Summarize, op); } private handleSummaryAck(op: ISummaryAckMessage): void { const seq = op.contents.summaryProposal.summarySequenceNumber; const summary = this.pendingSummaries.get(seq); // eslint-disable-next-line @typescript-eslint/prefer-optional-chain -- optional chain is not logically equivalent if (!summary || summary.summaryOp === undefined) { // Summary ack without an op should be rare. We could fetch the // reference sequence number from the snapshot, but instead we // will not emit this ack. It should be the case that the summary // op that this ack is for is earlier than this file was loaded // from. i.e. initialSequenceNumber > summarySequenceNumber. // We really don't care about it for now, since it is older than // the one we loaded from. if (seq > this.deltaManager.initialSequenceNumber) { // Potential causes for it to be later than our initialSequenceNumber // are that the summaryOp was nacked then acked, double-acked, or // the summarySequenceNumber is incorrect. this.logger.sendTelemetryEvent({ eventName: "SummaryAckWithoutOp", sequenceNumber: op.sequenceNumber, // summary ack seq # summarySequenceNumber: seq, // missing summary seq # initialSequenceNumber: this.deltaManager.initialSequenceNumber, }); } return; } summary.ackNack(op); this.pendingSummaries.delete(seq); // Track latest ack if ( !this.lastAck || seq > this.lastAck.summaryAck.contents.summaryProposal.summarySequenceNumber ) { this.lastAck = { summaryOp: summary.summaryOp, summaryAck: op, }; this.refreshWaitNextAck.resolve(); this.refreshWaitNextAck = new Deferred(); this.emit(MessageType.SummaryAck, op); } } private handleSummaryNack(op: ISummaryNackMessage): void { const seq = op.contents.summaryProposal.summarySequenceNumber; const summary = this.pendingSummaries.get(seq); if (summary) { summary.ackNack(op); this.pendingSummaries.delete(seq); this.emit(MessageType.SummaryNack, op); } } }