import {phase0, Slot, allForks, RootHex} from "@lodestar/types"; import {IForkChoice} from "@lodestar/fork-choice"; import {CachedBeaconStateAllForks, computeEpochAtSlot} from "@lodestar/state-transition"; import {toHexString} from "@chainsafe/ssz"; import {CheckpointStateCache, StateContextCache, toCheckpointHex} from "../stateCache/index.js"; import {IMetrics} from "../../metrics/index.js"; import {JobItemQueue} from "../../util/queue/index.js"; import {IStateRegenerator, RegenCaller, RegenFnName} from "./interface.js"; import {StateRegenerator, RegenModules} from "./regen.js"; import {RegenError, RegenErrorCode} from "./errors.js"; const REGEN_QUEUE_MAX_LEN = 256; type QueuedStateRegeneratorModules = RegenModules & { signal: AbortSignal; }; type RegenRequestKey = keyof IStateRegenerator; type RegenRequestByKey = {[K in RegenRequestKey]: {key: K; args: Parameters}}; export type RegenRequest = RegenRequestByKey[RegenRequestKey]; /** * Regenerates states that have already been processed by the fork choice * * All requests are queued so that only a single state at a time may be regenerated at a time */ export class QueuedStateRegenerator implements IStateRegenerator { readonly jobQueue: JobItemQueue<[RegenRequest], CachedBeaconStateAllForks>; private regen: StateRegenerator; private forkChoice: IForkChoice; private stateCache: StateContextCache; private checkpointStateCache: CheckpointStateCache; private metrics: IMetrics | null; constructor(modules: QueuedStateRegeneratorModules) { this.regen = new StateRegenerator(modules); this.jobQueue = new JobItemQueue<[RegenRequest], CachedBeaconStateAllForks>( this.jobQueueProcessor, {maxLength: REGEN_QUEUE_MAX_LEN, signal: modules.signal}, modules.metrics ? modules.metrics.regenQueue : undefined ); this.forkChoice = modules.forkChoice; this.stateCache = modules.stateCache; this.checkpointStateCache = modules.checkpointStateCache; this.metrics = modules.metrics; } /** * Get the state to run with `block`. * - State after `block.parentRoot` dialed forward to block.slot */ async getPreState(block: allForks.BeaconBlock, rCaller: RegenCaller): Promise { this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getPreState}); // First attempt to fetch the state from caches before queueing const parentRoot = toHexString(block.parentRoot); const parentBlock = this.forkChoice.getBlockHex(parentRoot); if (!parentBlock) { throw new RegenError({ code: RegenErrorCode.BLOCK_NOT_IN_FORKCHOICE, blockRoot: block.parentRoot, }); } const parentEpoch = computeEpochAtSlot(parentBlock.slot); const blockEpoch = computeEpochAtSlot(block.slot); // Check the checkpoint cache (if the pre-state is a checkpoint state) if (parentEpoch < blockEpoch) { const checkpointState = this.checkpointStateCache.getLatest(parentRoot, blockEpoch); if (checkpointState && computeEpochAtSlot(checkpointState.slot) === blockEpoch) { // TODO: Miss-use of checkpointStateCache here return checkpointState; // console.error({ // "checkpointState.slot": checkpointState.slot, // "block.slot": block.slot, // blockEpoch, // blockEpochStartSlot: computeStartSlotAtEpoch(blockEpoch), // }); } } // Check the state cache, only if the state doesn't need to go through an epoch transition. // Otherwise the state transition may not be cached and wasted. Queue for regen since the // work required will still be significant. if (parentEpoch === blockEpoch) { const state = this.stateCache.get(parentBlock.stateRoot); if (state) { return state; } } // The state is not immediately available in the caches, enqueue the job this.metrics?.regenFnQueuedTotal.inc({caller: rCaller, entrypoint: RegenFnName.getPreState}); return this.jobQueue.push({key: "getPreState", args: [block, rCaller]}); } async getCheckpointState(cp: phase0.Checkpoint, rCaller: RegenCaller): Promise { this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getCheckpointState}); // First attempt to fetch the state from cache before queueing const checkpointState = this.checkpointStateCache.get(toCheckpointHex(cp)); if (checkpointState) { return checkpointState; } // The state is not immediately available in the caches, enqueue the job this.metrics?.regenFnQueuedTotal.inc({caller: rCaller, entrypoint: RegenFnName.getCheckpointState}); return this.jobQueue.push({key: "getCheckpointState", args: [cp, rCaller]}); } async getBlockSlotState(blockRoot: RootHex, slot: Slot, rCaller: RegenCaller): Promise { this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getBlockSlotState}); // The state is not immediately available in the caches, enqueue the job return this.jobQueue.push({key: "getBlockSlotState", args: [blockRoot, slot, rCaller]}); } async getState(stateRoot: RootHex, rCaller: RegenCaller): Promise { this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getState}); // First attempt to fetch the state from cache before queueing const state = this.stateCache.get(stateRoot); if (state) { return state; } // The state is not immediately available in the cache, enqueue the job this.metrics?.regenFnQueuedTotal.inc({caller: rCaller, entrypoint: RegenFnName.getState}); return this.jobQueue.push({key: "getState", args: [stateRoot, rCaller]}); } private jobQueueProcessor = async (regenRequest: RegenRequest): Promise => { const metricsLabels = { caller: regenRequest.args[regenRequest.args.length - 1] as RegenCaller, entrypoint: regenRequest.key, }; let timer; try { timer = this.metrics?.regenFnCallDuration.startTimer(metricsLabels); switch (regenRequest.key) { case "getPreState": return await this.regen.getPreState(...regenRequest.args); case "getCheckpointState": return await this.regen.getCheckpointState(...regenRequest.args); case "getBlockSlotState": return await this.regen.getBlockSlotState(...regenRequest.args); case "getState": return await this.regen.getState(...regenRequest.args); } } catch (e) { this.metrics?.regenFnTotalErrors.inc(metricsLabels); throw e; } finally { if (timer) timer(); } }; }