import { P2P as P2PTypes } from '@shardeum-foundation/lib-types'; import StateManager from '.'; import Crypto from '../crypto'; import Logger from '../logger'; import { P2PModuleContext as P2P } from '../p2p/Context'; import * as Shardus from '../shardus/shardus-types'; import Storage from '../storage'; import * as utils from '../utils'; import Profiler from '../utils/profiler'; import { AcceptedTx, CommitConsensedTransactionResult, PreApplyAcceptedTransactionResult, ProcessQueueStats, QueueCountsResult, QueueEntry, SeenAccounts, SimpleNumberStats, TxDebug, WrappedResponses, ArchiverReceipt, NonceQueueItem, RequestFinalDataResp } from './state-manager-types'; import { Logger as L4jsLogger } from 'log4js'; export declare enum DebugComplete { Incomplete = 0, Completed = 1 } declare class TransactionQueue { app: Shardus.App; crypto: Crypto; config: Shardus.StrictServerConfiguration; profiler: Profiler; logger: Logger; p2p: P2P; storage: Storage; stateManager: StateManager; mainLogger: L4jsLogger; seqLogger: L4jsLogger; fatalLogger: L4jsLogger; shardLogger: L4jsLogger; statsLogger: L4jsLogger; statemanager_fatal: (key: string, log: string) => void; _transactionQueue: QueueEntry[]; pendingTransactionQueue: QueueEntry[]; archivedQueueEntries: QueueEntry[]; txDebugStatList: utils.FIFOCache; _transactionQueueByID: Map; pendingTransactionQueueByID: Map; archivedQueueEntriesByID: Map; receiptsToForward: ArchiverReceipt[]; forwardedReceiptsByTimestamp: Map; receiptsBundleByInterval: Map; receiptsForwardedTimestamp: number; queueStopped: boolean; queueEntryCounter: number; queueRestartCounter: number; archivedQueueEntryMaxCount: number; transactionProcessingQueueRunning: boolean; processingLastRunTime: number; processingMinRunBreak: number; transactionQueueHasRemainingWork: boolean; executeInOneShard: boolean; useNewPOQ: boolean; usePOQo: boolean; txCoverageMap: { [key: symbol]: unknown; }; /** This is a set of updates to rework how TXs can time out in the queue. After a enough testing this should become the default and we can remove the old code */ queueTimingFixes: boolean; /** process loop stats. This map contains the latest and the last of each time overage category */ lastProcessStats: { [limitName: string]: ProcessQueueStats; }; largePendingQueueReported: boolean; queueReads: Set; queueWrites: Set; queueReadWritesOld: Set; /** is the processing queue currently considered stuck */ isStuckProcessing: boolean; /** this s how many times the processing queue has transitioned from unstuck to stuck */ stuckProcessingCount: number; /** this is how many cycles processing is stuck becuase it has not run recently */ stuckProcessingCyclesCount: number; /** this is how many cycles processing is stuck and we can confirm the queue did not finish */ stuckProcessingQueueLockedCyclesCount: number; /** these three strings help us have a trail if the processing queue becomes stuck */ debugLastAwaitedCall: string; debugLastAwaitedCallInner: string; debugLastAwaitedAppCall: string; debugLastAwaitedCallInnerStack: { [key: string]: number; }; debugLastAwaitedAppCallStack: { [key: string]: number; }; debugLastProcessingQueueStartTime: number; debugRecentQueueEntry: QueueEntry; nonceQueue: Map; constructor(stateManager: StateManager, profiler: Profiler, app: Shardus.App, logger: Logger, storage: Storage, p2p: P2P, crypto: Crypto, config: Shardus.StrictServerConfiguration); /*** * ######## ## ## ######## ######## ####### #### ## ## ######## ###### * ## ### ## ## ## ## ## ## ## ## ### ## ## ## ## * ## #### ## ## ## ## ## ## ## ## #### ## ## ## * ###### ## ## ## ## ## ######## ## ## ## ## ## ## ## ###### * ## ## #### ## ## ## ## ## ## ## #### ## ## * ## ## ### ## ## ## ## ## ## ## ### ## ## ## * ######## ## ## ######## ## ####### #### ## ## ## ###### */ setupHandlers(): void; isTxInPendingNonceQueue(accountId: string, txId: string): boolean; getPendingCountInNonceQueue(): { totalQueued: number; totalAccounts: number; avgQueueLength: number; }; addTransactionToNonceQueue(nonceQueueEntry: NonceQueueItem): { success: boolean; reason?: string; alreadyAdded?: boolean; }; processNonceQueue(accounts: Shardus.WrappedData[]): Promise; handleSharedTX(tx: Shardus.TimestampedTx, appData: unknown, sender: Shardus.Node): QueueEntry; /*** * ### ######## ######## ###### ######## ### ######## ######## * ## ## ## ## ## ## ## ## ## ## ## ## ## * ## ## ## ## ## ## ## ## ## ## ## ## * ## ## ######## ######## ###### ## ## ## ## ###### * ######### ## ## ## ## ######### ## ## * ## ## ## ## ## ## ## ## ## ## ## * ## ## ## ## ###### ## ## ## ## ######## */ /** * getAccountsStateHash * DEPRICATED in current sync algorithm. This is very slow when we have many accounts or TXs * @param accountStart * @param accountEnd * @param tsStart * @param tsEnd */ getAccountsStateHash(accountStart?: string, accountEnd?: string, tsStart?: number, tsEnd?: number): Promise; /** * preApplyTransaction * call into the app code to apply a transaction on our in memory copies of data. * the results will be used for voting/consensus (if non-global) * when a receipt is formed commitConsensedTransaction will actually commit a the data * @param queueEntry */ preApplyTransaction(queueEntry: QueueEntry): Promise; configUpdated(): void; resetTxCoverageMap(): void; /** * commitConsensedTransaction * This works with our in memory copies of data that have had a TX applied. * This calls into the app to save data and also updates: * acceptedTransactions, stateTableData and accountsCopies DB tables * accountCache and accountStats * @param queueEntry */ commitConsensedTransaction(queueEntry: QueueEntry): Promise; updateHomeInformation(txQueueEntry: QueueEntry): void; tryInvloveAccount(txId: string, address: string, isRead: boolean): boolean; /*** * ######## ## ## ####### ## ## ######## ## ## ######## * ## ### ## ## ## ## ## ## ## ## ## * ## #### ## ## ## ## ## ## ## ## ## * ###### ## ## ## ## ## ## ## ###### ## ## ###### * ## ## #### ## ## ## ## ## ## ## ## ## * ## ## ### ## ## ## ## ## ## ## ## * ######## ## ## ##### ## ####### ######## ####### ######## */ routeAndQueueAcceptedTransaction(acceptedTx: AcceptedTx, sendGossip: boolean, sender: Shardus.Node | null, globalModification: boolean, noConsensus: boolean): string | boolean; queueEntryPrePush(txQueueEntry: QueueEntry): Promise; /*** * ####### ### ###### ###### ######## ###### ###### * ## ## ## ## ## ## ## ## ## ## ## ## ## * ## ## ## ## ## ## ## ## ## * ## ## ## ## ## ## ###### ###### ###### * ## ## ## ######### ## ## ## ## ## * ## ## ## ## ## ## ## ## ## ## ## ## ## * ##### ## ## ## ###### ###### ######## ###### ###### */ /** * getQueueEntry * get a queue entry from the current queue * @param txid */ getQueueEntry(txid: string): QueueEntry | null; /** * getQueueEntrySafe * get a queue entry from the queue or the pending queue (but not archive queue) * @param txid */ getQueueEntrySafe(txid: string): QueueEntry | null; /** * getQueueEntryArchived * get a queue entry from the archive queue only * @param txid * @param msg */ getQueueEntryArchived(txid: string, msg: string): QueueEntry | null; getArchivedQueueEntryByAccountIdAndHash(accountId: string, hash: string, msg: string): QueueEntry | null; /** * getQueueEntryArchived * get a queue entry from the archive queue only * @param txid * @param msg */ getQueueEntryArchivedByTimestamp(timestamp: number, msg: string): QueueEntry | null; /** * queueEntryAddData * add data to a queue entry * // TODO CODEREVIEW. need to look at the use of local cache. also is the early out ok? * @param queueEntry * @param data */ queueEntryAddData(queueEntry: QueueEntry, data: Shardus.WrappedResponse, signatureCheck?: boolean): void; shareCompleteDataToNeighbours(queueEntry: QueueEntry): Promise; gossipCompleteData(queueEntry: QueueEntry): Promise; /** * queueEntryHasAllData * Test if the queueEntry has all the data it needs. * TODO could be slightly more if it only recalculated when dirty.. but that would add more state and complexity, * so wait for this to show up in the profiler before fixing * @param queueEntry */ queueEntryHasAllData(queueEntry: QueueEntry): boolean; queueEntryListMissingData(queueEntry: QueueEntry): string[]; /** * queueEntryRequestMissingData * ask other nodes for data that is missing for this TX. * normally other nodes in the network should foward data to us at the correct time. * This is only for the case that a TX has waited too long and not received the data it needs. * @param queueEntry */ queueEntryRequestMissingData(queueEntry: QueueEntry): Promise; /** * queueEntryRequestMissingReceipt * Ask other nodes for a receipt to go with this TX * @param queueEntry */ queueEntryRequestMissingReceipt(queueEntry: QueueEntry): Promise; computeNodeRank(nodeId: string, txId: string, txTimestamp: number): bigint; orderNodesByRank(nodeList: Shardus.Node[], queueEntry: QueueEntry): Shardus.NodeWithRank[]; /** * queueEntryGetTransactionGroup * @param {QueueEntry} queueEntry * @returns {Node[]} */ queueEntryGetTransactionGroup(queueEntry: QueueEntry, tryUpdate?: boolean): Shardus.Node[]; /** * queueEntryGetConsensusGroup * Gets a merged results of all the consensus nodes for all of the accounts involved in the transaction * Ignores global accounts if globalModification == false and the account is global * @param {QueueEntry} queueEntry * @returns {Node[]} */ queueEntryGetConsensusGroup(queueEntry: QueueEntry): Shardus.Node[]; /** * queueEntryGetConsensusGroupForAccount * Gets a merged results of all the consensus nodes for a specific account involved in the transaction * Ignores global accounts if globalModification == false and the account is global * @param {QueueEntry} queueEntry * @returns {Node[]} */ queueEntryGetConsensusGroupForAccount(queueEntry: QueueEntry, accountId: string): Shardus.Node[]; /** * tellCorrespondingNodes * @param queueEntry * -sends account data to the correct involved nodees * -loads locally available data into the queue entry */ broadcastState(nodes: Shardus.Node[], message: { stateList: Shardus.WrappedResponse[]; txid: string; }, context: string): Promise; /** * tellCorrespondingNodes * @param queueEntry * -sends account data to the correct involved nodees * -loads locally available data into the queue entry */ tellCorrespondingNodes(queueEntry: QueueEntry): Promise; /** * Calculate FACT v2 sender and receiver groups * @param transactionGroup Full transaction group * @param executionGroup Execution group subset * @returns Reduced sender and receiver groups for FACT communication */ calculateFactSenderGroup(queueEntry: QueueEntry): [string[], P2PTypes.NodeListTypes.Node[]]; factTellCorrespondingNodes(queueEntry: QueueEntry): Promise; validateCorrespondingTellSender(queueEntry: QueueEntry, dataKey: string, senderNodeId: string): boolean; factValidateCorrespondingTellSender(queueEntry: QueueEntry, dataKey: string, senderNodeId: string): boolean; getStartAndEndIndexOfTargetGroup(targetGroup: string[], transactionGroup: (Shardus.NodeWithRank | P2PTypes.NodeListTypes.Node)[]): { startIndex: number; endIndex: number; }; factTellCorrespondingNodesFinalData(queueEntry: QueueEntry): void; factValidateCorrespondingTellFinalDataSender(queueEntry: QueueEntry, senderNodeId: string): boolean; dumpTxDebugToStatList(queueEntry: QueueEntry): void; clearTxDebugStatList(): void; printTxDebugByTxId(txId: string): string; printTxDebug(): string; /** * removeFromQueue remove an item from the queue and place it in the archivedQueueEntries list for awhile in case we have to access it again * @param {QueueEntry} queueEntry * @param {number} currentIndex */ removeFromQueue(queueEntry: QueueEntry, currentIndex: number, archive?: boolean): void; /*** * ######## ######## ####### ###### ######## ###### ###### * ## ## ## ## ## ## ## ## ## ## ## ## ## * ## ## ## ## ## ## ## ## ## ## * ######## ######## ## ## ## ###### ###### ###### * ## ## ## ## ## ## ## ## ## * ## ## ## ## ## ## ## ## ## ## ## ## * ## ## ## ####### ###### ######## ###### ###### */ /** * Run our main processing queue untill there is nothing that we can do * old name: processAcceptedTxQueue * @param firstTime * @returns */ processTransactions(firstTime?: boolean): Promise; private setTXExpired; private setTxAlmostExpired; getArchiverReceiptFromQueueEntry(queueEntry: QueueEntry): Promise; addOriginalTxDataToForward(queueEntry: QueueEntry): void; addReceiptToForward(queueEntry: QueueEntry, debugString?: string): Promise; getReceiptsToForward(): ArchiverReceipt[]; requestFinalData(queueEntry: QueueEntry, accountIds: string[], nodesToAskKeys?: string[] | null, includeAppReceiptData?: boolean): Promise; requestInitialData(queueEntry: QueueEntry, accountIds: string[]): Promise; resetReceiptsToForward(): void; /** * processQueue_accountSeen * Helper for processQueue to detect if this queueEntry has any accounts that are already blocked because they were seen upstream * a seen account is a an account that is involved in a TX that is upstream(older) in the queue * @param seenAccounts * @param queueEntry */ processQueue_accountSeen(seenAccounts: SeenAccounts, queueEntry: QueueEntry): boolean; processQueue_getUpstreamTx(seenAccounts: SeenAccounts, queueEntry: QueueEntry): QueueEntry | null; /** * processQueue_markAccountsSeen * Helper for processQueue to mark accounts as seen. * note only operates on writeable accounts. a read only account should not block downstream operations * a seen account is a an account that is involved in a TX that is upstream(older) in the queue * @param seenAccounts * @param queueEntry */ processQueue_markAccountsSeen(seenAccounts: SeenAccounts, queueEntry: QueueEntry): void; processQueue_accountSeen2(seenAccounts: SeenAccounts, queueEntry: QueueEntry): boolean; processQueue_markAccountsSeen2(seenAccounts: SeenAccounts, queueEntry: QueueEntry): void; /** * processQueue_clearAccountsSeen * Helper for processQueue to clear accounts that were marked as seen. * a seen account is a an account that is involved in a TX that is upstream(older) in the queue * @param seenAccounts * @param queueEntry */ processQueue_clearAccountsSeen(seenAccounts: SeenAccounts, queueEntry: QueueEntry): void; /** * Helper for processQueue to dump debug info * @param queueEntry * @param app */ processQueue_debugAccountData(queueEntry: QueueEntry, app: Shardus.App): string; /** * txWillChangeLocalData * This is a just in time check to see if a TX will modify any local accounts managed by this node. * Not longer used. candidate for deprecation, but this may be useful in some logging/analysis later * * @param queueEntry */ txWillChangeLocalData(queueEntry: QueueEntry): boolean; /** * This is a new function. It must be called before calling dapp.apply(). * The purpose is to do a last minute test to make sure that no involved accounts have a * timestamp newer than our transaction timestamp. * If they do have a newer timestamp we must fail the TX and vote for a TX fail receipt. */ checkAccountTimestamps(queueEntry: QueueEntry): boolean; /** * Computes a sieve time for a TX. This is a deterministic time bonus that is given so that * when TXs older than time M2 are getting culled due waiting on an older upstream TX * that we thin the list between time M2 and M2.5. The idea is that this thinning * makes next in line TXs that are close to M2.5 more rare. Node processing loops are not time synced with * each other at a real time level so different nodes may resolve TX as slightly different times. * This method hopes to improve the probablity nodes choose the same TX to work on after a TX has timed out. * TXs may time out if they are prepetually too old. Simply cutting off the younger TXs as an earlier time when blocked * such as time = M2 is not good enough because there is too much time jitter between nodes are working on that part of the list * this was causing nodes to all pick different TXs to work on. When nodes pick differnt TXs the are all likely to just * age out to time M3 without ever getting enough votes. The could happen at even 5tps for the same contract. * The time sieve helps this situation. * * At high TPS per single problems there are still issues recovering. extraRare is feature designed to help * give scores in the top 1% an extra second of queue life. hopefully if the list is thrahsing badly due to too many TXs * and nodes having bad luck picking the next one to work on that this extra second will give a better chance that everything syncs * back up. This may not bee enough yet. but probably good for 1.2 refresh 1. The downside to extra rare is that it makes the * tx only about 2 seconds away from the hard M3 timeout. But this is a rare event also, so if it backfires then the impact should also be * low. There may even be a chance that this would still help nodes sync up a bit. * @param queueEntry */ computeTxSieveTime(queueEntry: QueueEntry): void; updateSimpleStatsObject(statsObj: { [statName: string]: SimpleNumberStats; }, statName: string, duration: number): void; finalizeSimpleStatsObject(statsObj: { [statName: string]: SimpleNumberStats; }): void; getConsenusGroupForAccount(accountID: string): Shardus.Node[]; getRandomConsensusNodeForAccount(accountID: string, excludeNodeIds?: string[]): Shardus.Node; getStorageGroupForAccount(accountID: string): Shardus.Node[]; isAccountRemote(accountID: string): boolean; /** count the number of queue entries that will potentially execute on this node */ getExecuteQueueLength(): number; getAccountQueueCount(accountID: string, remote?: boolean): QueueCountsResult; isAccountInQueue(accountID: string, remote?: boolean): boolean; /** * call this to test if the processing queue is stuck. * currently this may return false positives if the queue is not stuck but is just slow * autoUnstickProcessing will attempt to fix the stuck processing if set to true */ checkForStuckProcessing(): void; /** * This is called when we detect that the processing queue is stuck */ onProcesssingQueueStuck(): void; getDebugStuckTxs(opts: any): unknown; getDebugProccessingStatus(): unknown; clearStuckProcessingDebugVars(): void; /** * Used to unblock and restart the processing queue if it gets stuck * @param clearPendingTransactions if true, will clear the pending transaction queue. if false, will leave it alone */ fixStuckProcessing(clearPendingTransactions: boolean): void; setDebugLastAwaitedCall(label: string, complete?: DebugComplete): void; setDebugLastAwaitedCallInner(label: string, complete?: DebugComplete): void; setDebugSetLastAppAwait(label: string, complete?: DebugComplete): void; addressCountInQueue(address: string, limit: number): number; clearQueueItems(minAge: number): number; getQueueItems(): any[]; getQueueItemById(txId: string): any; getDebugQueueInfo(queueEntry: QueueEntry): any; removeTxFromArchivedQueue(txId: string): void; updateTxState(queueEntry: QueueEntry, nextState: string, context?: string): void; txDebugMarkStartTime(queueEntry: QueueEntry, state: string): void; txDebugMarkEndTime(queueEntry: QueueEntry, state: string): void; clearDebugAwaitStrings(): void; getQueueLengthBuckets(): any; } export default TransactionQueue;