import sinon from "sinon";
import {expect} from "chai";
import {createIBeaconConfig, createIChainForkConfig, defaultChainConfig} from "@lodestar/config";
import {capella, altair, phase0, ssz} from "@lodestar/types";
import {sleep} from "@lodestar/utils";
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";

import {getReqRespHandlers, Network} from "../../../src/network/index.js";
import {defaultNetworkOptions, INetworkOptions} from "../../../src/network/options.js";
import {GossipType, GossipHandlers} from "../../../src/network/gossip/index.js";

import {MockBeaconChain, zeroProtoBlock} from "../../utils/mocks/chain/chain.js";
import {createNode} from "../../utils/network.js";
import {generateState} from "../../utils/state.js";
import {StubbedBeaconDb} from "../../utils/stub/index.js";
import {connect, onPeerConnect} from "../../utils/network.js";
import {testLogger} from "../../utils/logger.js";

/** Listen address handed to `createNode` for both test nodes. */
const multiaddr = "/ip4/127.0.0.1/tcp/0";

/** Network options shared by both nodes: a single peer slot, no boot nodes and discv5 disabled. */
const opts: INetworkOptions = {
  ...defaultNetworkOptions,
  maxPeers: 1,
  targetPeers: 1,
  bootMultiaddrs: [],
  localMultiaddrs: [],
  discv5FirstQueryDelayMs: 0,
  discv5: null,
};

// Schedule all forks at ALTAIR_FORK_EPOCH to avoid generating the pubkeys cache
/* eslint-disable @typescript-eslint/naming-convention */
const config = createIChainForkConfig({
  ...defaultChainConfig,
  ALTAIR_FORK_EPOCH: 1,
  BELLATRIX_FORK_EPOCH: 1,
  CAPELLA_FORK_EPOCH: 1,
});

/** First slot of the epoch at which all configured forks activate; used as the mocked head slot. */
const START_SLOT = computeStartSlotAtEpoch(config.ALTAIR_FORK_EPOCH);

/** Delay between two polls of a condition the tests are waiting on. */
const POLL_INTERVAL_MS = 500;

describe("gossipsub", function () {
  if (this.timeout() < 15 * 1000) this.timeout(15 * 1000);
  this.retries(2); // This test fails sometimes, at roughly a 5% rate.

  const logger = testLogger();

  // Cleanup callbacks registered by `mockModules`; run in LIFO order after each test.
  const afterEachCallbacks: (() => Promise<void> | void)[] = [];
  afterEach(async () => {
    while (afterEachCallbacks.length > 0) {
      const callback = afterEachCallbacks.pop();
      if (callback) await callback();
    }
  });

  /**
   * Build two started `Network` instances (A and B) that share one mocked chain, db and
   * set of gossip handlers, and register their teardown in `afterEachCallbacks`.
   *
   * @param gossipHandlersPartial - Handlers for the gossip topics the test cares about.
   * @returns Both networks, the mocked chain and the `AbortController` whose signal is
   *   passed to the networks and aborted during teardown.
   */
  // eslint-disable-next-line @typescript-eslint/explicit-function-return-type
  async function mockModules(gossipHandlersPartial?: Partial<GossipHandlers>) {
    const controller = new AbortController();

    const block = ssz.phase0.SignedBeaconBlock.defaultValue();
    const state = generateState({
      finalizedCheckpoint: {
        epoch: 0,
        root: ssz.phase0.BeaconBlock.hashTreeRoot(block.message),
      },
    });

    const beaconConfig = createIBeaconConfig(config, state.genesisValidatorsRoot);
    const chain = new MockBeaconChain({
      genesisTime: 0,
      chainId: 0,
      networkId: BigInt(0),
      state,
      config: beaconConfig,
    });

    // Report a head at START_SLOT.
    // NOTE(review): presumably so the nodes pick the post-fork gossip topics — confirm.
    chain.forkChoice.getHead = () => {
      return {
        ...zeroProtoBlock,
        slot: START_SLOT,
      };
    };

    const db = new StubbedBeaconDb(config);
    const reqRespHandlers = getReqRespHandlers({db, chain});
    // Only the handlers supplied by the test exist at runtime; the cast satisfies the Network modules type.
    const gossipHandlers = gossipHandlersPartial as GossipHandlers;

    const [libp2pA, libp2pB] = await Promise.all([createNode(multiaddr), createNode(multiaddr)]);
    const loggerA = testLogger("A");
    const loggerB = testLogger("B");

    const modules = {
      config: beaconConfig,
      chain,
      db,
      reqRespHandlers,
      gossipHandlers,
      signal: controller.signal,
      metrics: null,
    };
    const netA = new Network(opts, {...modules, libp2p: libp2pA, logger: loggerA});
    const netB = new Network(opts, {...modules, libp2p: libp2pB, logger: loggerB});

    await Promise.all([netA.start(), netB.start()]);

    afterEachCallbacks.push(async () => {
      await chain.close();
      controller.abort();
      await Promise.all([netA.stop(), netB.stop()]);
      sinon.restore();
    });

    return {netA, netB, chain, controller};
  }

  /**
   * Connect `netA` to `netB`, assert each side sees exactly one connected peer, subscribe
   * both to the core gossip topics and poll until `netA` has at least one mesh peer on its
   * first subscribed topic.
   *
   * The polling loop exits without error if `signal` is aborted first.
   *
   * @param netA - Network that dials and whose mesh is polled.
   * @param netB - Network being dialed.
   * @param signal - Abort signal that stops the polling loop.
   */
  async function connectAndAwaitMesh(netA: Network, netB: Network, signal: AbortSignal): Promise<void> {
    await Promise.all([onPeerConnect(netA), onPeerConnect(netB), connect(netA, netB.peerId, netB.localMultiaddrs)]);
    expect(Array.from(netA.getConnectionsByPeer().values()).length).to.equal(1);
    expect(Array.from(netB.getConnectionsByPeer().values()).length).to.equal(1);

    netA.subscribeGossipCoreTopics();
    netB.subscribeGossipCoreTopics();

    // Wait to have a peer connected to a topic
    while (!signal.aborted) {
      await sleep(POLL_INTERVAL_MS);
      const topicStr = netA.gossip.getTopics()[0];
      if (topicStr && netA.gossip.getMeshPeers(topicStr).length > 0) {
        break;
      }
    }
  }

  it("Publish and receive a voluntaryExit", async function () {
    let onVoluntaryExit: (ve: phase0.SignedVoluntaryExit) => void;
    const onVoluntaryExitPromise = new Promise<phase0.SignedVoluntaryExit>((resolve) => (onVoluntaryExit = resolve));

    const {netA, netB, controller} = await mockModules({
      [GossipType.voluntary_exit]: async (voluntaryExit) => {
        onVoluntaryExit(voluntaryExit);
      },
    });

    await connectAndAwaitMesh(netA, netB, controller.signal);

    const voluntaryExit = ssz.phase0.SignedVoluntaryExit.defaultValue();
    await netA.gossip.publishVoluntaryExit(voluntaryExit);

    const receivedVoluntaryExit = await onVoluntaryExitPromise;
    expect(receivedVoluntaryExit).to.deep.equal(voluntaryExit);
  });

  it("Publish and receive 1000 voluntaryExits", async function () {
    const receivedVoluntaryExits: phase0.SignedVoluntaryExit[] = [];

    const {netA, netB, controller} = await mockModules({
      [GossipType.voluntary_exit]: async (voluntaryExit) => {
        receivedVoluntaryExits.push(voluntaryExit);
      },
    });

    await connectAndAwaitMesh(netA, netB, controller.signal);

    const msgCount = 1000;

    for (let i = 0; i < msgCount; i++) {
      const voluntaryExit = ssz.phase0.SignedVoluntaryExit.defaultValue();
      // Give every message a distinct epoch so the published payloads differ from each other.
      voluntaryExit.message.epoch = i;
      // Fire-and-forget: publish failures are logged rather than failing the loop.
      netA.gossip.publishVoluntaryExit(voluntaryExit).catch((e: Error) => {
        logger.error("Error on publishVoluntaryExit", {}, e);
      });
    }

    // Wait to receive all the messages. A timeout error will happen otherwise
    while (!controller.signal.aborted) {
      await sleep(POLL_INTERVAL_MS);
      if (receivedVoluntaryExits.length >= msgCount) {
        break;
      }
    }
  });

  it("Publish and receive a blsToExecutionChange", async function () {
    let onBlsToExecutionChange: (blsToExec: capella.SignedBLSToExecutionChange) => void;
    const onBlsToExecutionChangePromise = new Promise<capella.SignedBLSToExecutionChange>(
      (resolve) => (onBlsToExecutionChange = resolve)
    );

    const {netA, netB, controller} = await mockModules({
      [GossipType.bls_to_execution_change]: async (blsToExec) => {
        onBlsToExecutionChange(blsToExec);
      },
    });

    await connectAndAwaitMesh(netA, netB, controller.signal);

    const blsToExec = ssz.capella.SignedBLSToExecutionChange.defaultValue();
    await netA.gossip.publishBlsToExecutionChange(blsToExec);

    const receivedblsToExec = await onBlsToExecutionChangePromise;
    expect(receivedblsToExec).to.deep.equal(blsToExec);
  });

  it("Publish and receive a LightClientOptimisticUpdate", async function () {
    let onLightClientOptimisticUpdate: (ou: altair.LightClientOptimisticUpdate) => void;
    const onLightClientOptimisticUpdatePromise = new Promise<altair.LightClientOptimisticUpdate>(
      (resolve) => (onLightClientOptimisticUpdate = resolve)
    );

    const {netA, netB, controller} = await mockModules({
      [GossipType.light_client_optimistic_update]: async (lightClientOptimisticUpdate) => {
        onLightClientOptimisticUpdate(lightClientOptimisticUpdate);
      },
    });

    await connectAndAwaitMesh(netA, netB, controller.signal);

    const lightClientOptimisticUpdate = ssz.altair.LightClientOptimisticUpdate.defaultValue();
    lightClientOptimisticUpdate.signatureSlot = START_SLOT;
    await netA.gossip.publishLightClientOptimisticUpdate(lightClientOptimisticUpdate);

    const optimisticUpdate = await onLightClientOptimisticUpdatePromise;
    expect(optimisticUpdate).to.deep.equal(lightClientOptimisticUpdate);
  });

  it("Publish and receive a LightClientFinalityUpdate", async function () {
    let onLightClientFinalityUpdate: (fu: altair.LightClientFinalityUpdate) => void;
    const onLightClientFinalityUpdatePromise = new Promise<altair.LightClientFinalityUpdate>(
      (resolve) => (onLightClientFinalityUpdate = resolve)
    );

    const {netA, netB, controller} = await mockModules({
      [GossipType.light_client_finality_update]: async (lightClientFinalityUpdate) => {
        onLightClientFinalityUpdate(lightClientFinalityUpdate);
      },
    });

    await connectAndAwaitMesh(netA, netB, controller.signal);

    const lightClientFinalityUpdate = ssz.altair.LightClientFinalityUpdate.defaultValue();
    lightClientFinalityUpdate.signatureSlot = START_SLOT;
    await netA.gossip.publishLightClientFinalityUpdate(lightClientFinalityUpdate);

    const finalityUpdate = await onLightClientFinalityUpdatePromise;
    expect(finalityUpdate).to.deep.equal(lightClientFinalityUpdate);
  });
});