/*!
 * Copyright (c) Microsoft Corporation and contributors. All rights reserved.
 * Licensed under the MIT License.
 */
import type { IDisposable, ITelemetryBaseLogger } from "@fluidframework/core-interfaces";
import type { InboundSequencedContainerRuntimeMessage, LocalContainerRuntimeMessage } from "./messageTypes.js";
import { type EmptyGroupedBatch, type LocalBatchMessage, type InboundMessageResult, type LocalEmptyBatchPlaceholder, type BatchResubmitInfo } from "./opLifecycle/index.js";
/**
 * This represents a message that has been submitted and is added to the pending queue when `submit` is called on the
 * ContainerRuntime. This message has either not been ack'd by the server or has not been submitted to the server yet.
 *
 * @remarks This is the current serialization format for pending local state when a Container is serialized.
 */
export interface IPendingMessage {
    type: "message";
    referenceSequenceNumber: number;
    /**
     * Serialized copy of runtimeOp
     */
    content: string;
    /**
     * The original runtime op that was submitted to the ContainerRuntime.
     * Unless this pending message came from stashed content, in which case this is undefined at first
     * and then deserialized from the contents string.
     */
    runtimeOp: LocalContainerRuntimeMessage | EmptyGroupedBatch | undefined;
    /**
     * Local Op Metadata that was passed to the ContainerRuntime when the op was submitted.
     * This contains state needed when processing the ack, or to resubmit or rollback the op.
     */
    localOpMetadata: unknown;
    /**
     * Metadata that was passed to the ContainerRuntime when the op was submitted.
     * This is rarely used, and may be inspected by the service (as opposed to op contents which is opaque)
     */
    opMetadata: Record<string, unknown> | undefined;
    /**
     * Populated upon processing the op's ack, before moving the pending message to savedOps.
     */
    sequenceNumber?: number;
    /**
     * Info about the batch this pending message belongs to, for validation and for computing the batchId on reconnect.
     * We don't include batchId itself to avoid redundancy, because that's stamped on opMetadata above.
     */
    batchInfo: {
        /**
         * The Batch's original clientId, from when it was first flushed to be submitted.
         * Or, a random uuid if it was never submitted (and batchStartCsn will be -1)
         */
        clientId: string;
        /**
         * The Batch's original clientSequenceNumber, from when it was first flushed to be submitted.
         * Or, -1 if it was never submitted (and clientId will be a random uuid)
         */
        batchStartCsn: number;
        /**
         * Length of the batch (how many runtime messages here)
         */
        length: number;
        /**
         * If true, this batch is staged and should not actually be submitted on replayPendingStates.
         */
        staged: boolean;
    };
}
/**
 * Serializable container for the pending messages of a Container.
 */
export interface IPendingLocalState {
    /**
     * List of pending states, including ops and batch information
     */
    pendingStates: IPendingMessage[];
}
/**
 * Info needed to replay/resubmit a pending message.
 *
 * `runtimeOp` is narrowed here to always be a `LocalContainerRuntimeMessage`.
 */
// NOTE(review): the type arguments of `Pick` were missing from the declaration as reviewed;
// the picked keys below are a best guess and must be confirmed against the implementation file.
export type PendingMessageResubmitData = Pick<IPendingMessage, "localOpMetadata" | "opMetadata"> & {
    runtimeOp: LocalContainerRuntimeMessage;
};
/**
 * Batch-level metadata handed to `IRuntimeStateHandler.reSubmitBatch`.
 */
export interface PendingBatchResubmitMetadata extends BatchResubmitInfo {
    /**
     * Whether changes in this batch should be squashed when resubmitting.
     */
    squash: boolean;
}
/**
 * Handler passed to the PendingStateManager constructor, giving it access to runtime state and operations.
 */
export interface IRuntimeStateHandler {
    connected(): boolean;
    clientId(): string | undefined;
    // NOTE(review): the Promise payload type was missing from the declaration as reviewed;
    // `unknown` is the widest safe choice — confirm against the implementation file.
    applyStashedOp(serializedOp: string): Promise<unknown>;
    reSubmitBatch(batch: PendingMessageResubmitData[], metadata: PendingBatchResubmitMetadata): void;
    isActiveConnection: () => boolean;
    isAttached: () => boolean;
}
/**
 * Finds and returns the index where the strings diverge, and the character at that index in each string
 * (or undefined if not applicable).
 * It scrubs non-ASCII characters since they convey more meaning (privacy consideration).
 */
export declare function findFirstCharacterMismatched(a: string, b: string): [index: number, charA?: string, charB?: string];
/**
 * Options accepted by `PendingStateManager.replayPendingStates`.
 */
interface ReplayPendingStateOptions {
    /**
     * If true, only replay staged batches, clearing the "staged" flag.
     * This is used when we are exiting staging mode and want to rebase and submit the staged batches
     * without resubmitting pre-staged messages.
     * Default: false
     */
    committingStagedBatches: boolean;
    /**
     * If true, edits should be squashed when resubmitting.
     * Default: false
     */
    squash: boolean;
}
/**
 * PendingStateManager is responsible for maintaining the messages that have not been sent or have not yet been
 * acknowledged by the server. It also maintains the batch information for both automatically and manually flushed
 * batches along with the messages.
 * When the Container reconnects, it replays the pending states, which includes manual flushing
 * of messages and triggering resubmission of unacked ops.
 *
 * It verifies that all the ops are acked, are received in the right order and batch information is correct.
 */
export declare class PendingStateManager implements IDisposable {
    private readonly stateHandler;
    /**
     * Messages that will need to be resubmitted if not ack'd before the next reconnection
     */
    private readonly pendingMessages;
    /**
     * Messages stashed from a previous container, now being rehydrated. Need to be resubmitted.
     */
    private readonly initialMessages;
    /**
     * Sequenced local ops that are saved when stashing since pending ops may depend on them
     */
    private savedOps;
    private readonly disposeOnce;
    /**
     * Used to ensure we don't replay ops on the same connection twice
     */
    private clientIdFromLastReplay;
    /**
     * The pending messages count. Includes `pendingMessages` and `initialMessages` to keep in sync with
     * 'hasPendingMessages'.
     */
    get pendingMessagesCount(): number;
    /**
     * Checks the pending messages to see if any of them represent user changes (aka "dirtyable" messages)
     */
    hasPendingUserChanges(): boolean;
    /**
     * The minimumPendingMessageSequenceNumber is the minimum of the first pending message and the first initial message.
     *
     * We need this so that we can properly keep local data and maintain the correct sequence window.
     */
    get minimumPendingMessageSequenceNumber(): number | undefined;
    /**
     * Called to check if there are any pending messages in the pending message queue.
     * @returns A boolean indicating whether there are messages or not.
     */
    hasPendingMessages(): boolean;
    /**
     * Gets the pending local state, wrapped in an object under the `pending` key.
     * NOTE(review): how `snapshotSequenceNumber` affects the result is not visible in this declaration — see the implementation.
     */
    getLocalState(snapshotSequenceNumber?: number): {
        pending: IPendingLocalState;
    };
    private readonly logger;
    constructor(stateHandler: IRuntimeStateHandler, stashedLocalState: IPendingLocalState | undefined, logger: ITelemetryBaseLogger);
    get disposed(): boolean;
    readonly dispose: () => void;
    /**
     * We've flushed an empty batch, and need to track it locally until the corresponding
     * ack is processed, to properly track batch IDs
     */
    onFlushEmptyBatch(placeholder: LocalEmptyBatchPlaceholder, clientSequenceNumber: number | undefined, staged: boolean): void;
    /**
     * The given batch has been flushed, and needs to be tracked locally until the corresponding
     * acks are processed, to ensure it is successfully sent.
     * @param batch - The batch that was flushed
     * @param clientSequenceNumber - The CSN of the first message in the batch,
     * or undefined if the batch was not yet sent (e.g. by the time we flushed we lost the connection)
     * @param staged - Indicates whether batch is staged (not to be submitted while runtime is in Staging Mode)
     */
    onFlushBatch(batch: LocalBatchMessage[] | [LocalEmptyBatchPlaceholder], clientSequenceNumber: number | undefined, staged: boolean): void;
    /**
     * Applies stashed ops at their reference sequence number so they are ready to be ACKed or resubmitted
     * @param seqNum - Sequence number at which to apply ops. Will apply all ops if seqNum is undefined.
     */
    applyStashedOpsAt(seqNum?: number): Promise<void>;
    /**
     * Compares the batch ID of the incoming batch with the pending batch ID for this client.
     * They should not match, as that would indicate a forked container.
     * @param remoteBatchStart - BatchStartInfo for an incoming batch *NOT* submitted by this client
     * @returns whether the batch IDs match
     */
    private remoteBatchMatchesPendingBatch;
    /**
     * Processes an inbound message or batch of messages - May be local or remote.
     *
     * @param inbound - The inbound message(s) to process, with extra info (e.g. about the start of a batch). Could be local or remote.
     * @param local - true if we submitted these messages and expect corresponding pending messages
     * @returns The inbound messages with localOpMetadata "zipped" in.
     *
     * @throws a DataProcessingError in either of these cases:
     * - The pending message content doesn't match the incoming message content for any message here
     * - The batch IDs *do match* but it's not local (indicates Container forking).
     */
    processInboundMessages(inbound: InboundMessageResult, local: boolean): {
        message: InboundSequencedContainerRuntimeMessage;
        localOpMetadata?: unknown;
    }[];
    /**
     * Processes the incoming message(s) from the server that were submitted by this client.
     * It verifies that messages are received in the right order and that any batch information is correct.
     * @param inbound - The inbound message(s) (originating from this client) to correlate with the pending local state
     * @throws DataProcessingError if the pending message content doesn't match the incoming message content for any message here
     * @returns The inbound messages with localOpMetadata "zipped" in.
     */
    private processPendingLocalMessages;
    /**
     * Processes the pending local copy of message that's been ack'd by the server.
     * @param sequenceNumber - The sequenceNumber from the server corresponding to the next pending message.
     * @param message - [optional] The entire incoming message, for comparing contents with the pending message for extra validation.
     * @throws DataProcessingError if the pending message content doesn't match the incoming message content.
     * @returns The localOpMetadata of the next pending message, to be sent to whoever submitted the original message.
     */
    private processNextPendingMessage;
    /**
     * Check if the incoming batch matches the batch info for the next pending message.
     */
    private onLocalBatchBegin;
    /**
     * Called when the Container's connection state changes. If the Container gets connected, it replays all the pending
     * states in its queue. This includes triggering resubmission of unacked ops.
     * ! Note: successfully resubmitting an op that has been successfully sequenced is not possible due to checks in the ConnectionStateHandler (Loader layer)
     *
     * @returns The unique batch infos for all batches that were replayed.
     */
    replayPendingStates(options?: ReplayPendingStateOptions): IPendingMessage["batchInfo"][];
    /**
     * Pops all staged batches, invoking the callback on each constituent op in order (LIFO)
     */
    popStagedBatches(callback: (stagedMessage: IPendingMessage & {
        runtimeOp: LocalContainerRuntimeMessage;
    }) => void): IPendingMessage["batchInfo"][];
}
export {};
//# sourceMappingURL=pendingStateManager.d.ts.map