import { TypedEventEmitter, StrictSign, StrictNoSign, TopicValidatorResult, serviceCapabilities, serviceDependencies } from '@libp2p/interface'; import { type DecodeRPCLimits } from './message/decodeRpc.js'; import { RPC } from './message/rpc.js'; import { MessageCache } from './message-cache.js'; import { type MetricsRegister, type TopicStrToLabel } from './metrics.js'; import { PeerScore, type PeerScoreParams, type PeerScoreThresholds, type PeerScoreStatsDump } from './score/index.js'; import { InboundStream, OutboundStream } from './stream.js'; import { IWantTracer } from './tracer.js'; import { type MsgIdFn, type TopicStr, type MsgIdStr, type PeerIdStr, type FastMsgIdFn, type AddrInfo, type DataTransform, type MsgIdToStrFn, type PublishOpts } from './types.js'; import type { GossipsubOptsSpec } from './config.js'; import type { Direction, PeerId, PeerStore, Message, PublishResult, PubSub, PubSubEvents, PubSubInit, TopicValidatorFn, Logger, ComponentLogger, PrivateKey } from '@libp2p/interface'; import type { ConnectionManager, Registrar } from '@libp2p/interface-internal'; export declare const multicodec: string; export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit { /** if dial should fallback to floodsub */ fallbackToFloodsub: boolean; /** if self-published messages should be sent to all peers */ floodPublish: boolean; /** serialize message once and send to all peers without control messages */ batchPublish: boolean; /** whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted nodes. */ doPX: boolean; /** peers with which we will maintain direct connections */ directPeers: AddrInfo[]; /** * If true will not forward messages to mesh peers until reportMessageValidationResult() is called. * Messages will be cached in mcache for some time after which they are evicted. Calling * reportMessageValidationResult() after the message is dropped from mcache won't forward the message. */ asyncValidation: boolean; /** * Do not throw `PublishError.NoPeersSubscribedToTopic` error if there are no * peers listening on the topic. * * N.B. if you sent this option to true, and you publish a message on a topic * with no peers listening on that topic, no other network node will ever * receive the message. */ allowPublishToZeroTopicPeers: boolean; /** Do not throw `PublishError.Duplicate` if publishing duplicate messages */ ignoreDuplicatePublishError: boolean; /** For a single stream, await processing each RPC before processing the next */ awaitRpcHandler: boolean; /** For a single RPC, await processing each message before processing the next */ awaitRpcMessageHandler: boolean; /** message id function */ msgIdFn: MsgIdFn; /** fast message id function */ fastMsgIdFn: FastMsgIdFn; /** Uint8Array message id to string function */ msgIdToStrFn: MsgIdToStrFn; /** override the default MessageCache */ messageCache: MessageCache; /** peer score parameters */ scoreParams: Partial; /** peer score thresholds */ scoreThresholds: Partial; /** customize GossipsubIWantFollowupTime in order not to apply IWANT penalties */ gossipsubIWantFollowupMs: number; /** override constants for fine tuning */ prunePeers?: number; pruneBackoff?: number; unsubcribeBackoff?: number; graftFloodThreshold?: number; opportunisticGraftPeers?: number; opportunisticGraftTicks?: number; directConnectTicks?: number; dataTransform?: DataTransform; metricsRegister?: MetricsRegister | null; metricsTopicStrToLabel?: TopicStrToLabel; /** Prefix tag for debug logs */ debugName?: string; /** * Specify the maximum number of inbound gossipsub protocol * streams that are allowed to be open concurrently */ maxInboundStreams?: number; /** * Specify the maximum number of outbound gossipsub protocol * streams that are allowed to be open concurrently */ maxOutboundStreams?: number; /** * Pass true to run on limited connections - data or time-limited * connections that may be closed at any time such as circuit relay * connections. * * @default false */ runOnLimitedConnection?: boolean; /** * Specify max buffer size in bytes for OutboundStream. * If full it will throw and reject sending any more data. */ maxOutboundBufferSize?: number; /** * Specify max size to skip decoding messages whose data * section exceeds this size. * */ maxInboundDataLength?: number; /** * If provided, only allow topics in this list */ allowedTopics?: string[] | Set; /** * Limits to bound protobuf decoding */ decodeRpcLimits?: DecodeRPCLimits; /** * If true, will utilize the libp2p connection manager tagging system to prune/graft connections to peers, defaults to true */ tagMeshPeers: boolean; /** * Specify what percent of peers to send gossip to. If the percent results in * a number smaller than `Dlazy`, `Dlazy` will be used instead. * * It should be a number between 0 and 1, with a reasonable default of 0.25 */ gossipFactor: number; /** * The minimum message size in bytes to be considered for sending IDONTWANT messages * * @default 512 */ idontwantMinDataSize?: number; /** * The maximum number of IDONTWANT messages per heartbeat per peer * * @default 512 */ idontwantMaxMessages?: number; } export interface GossipsubMessage { propagationSource: PeerId; msgId: MsgIdStr; msg: Message; } export interface MeshPeer { peerId: string; topic: string; direction: Direction; } export interface GossipsubEvents extends PubSubEvents { 'gossipsub:heartbeat': CustomEvent; 'gossipsub:message': CustomEvent; 'gossipsub:graft': CustomEvent; 'gossipsub:prune': CustomEvent; } interface GossipOptions extends GossipsubOpts { scoreParams: PeerScoreParams; scoreThresholds: PeerScoreThresholds; } export interface GossipSubComponents { privateKey: PrivateKey; peerId: PeerId; peerStore: PeerStore; registrar: Registrar; connectionManager: ConnectionManager; logger: ComponentLogger; } export declare class GossipSub extends TypedEventEmitter implements PubSub { /** * The signature policy to follow by default */ readonly globalSignaturePolicy: typeof StrictSign | typeof StrictNoSign; multicodecs: string[]; private publishConfig; private readonly dataTransform; readonly peers: Map; readonly streamsInbound: Map; readonly streamsOutbound: Map; /** Ensures outbound streams are created sequentially */ private outboundInflightQueue; /** Direct peers */ readonly direct: Set; /** Floodsub peers */ private readonly floodsubPeers; /** Cache of seen messages */ private readonly seenCache; /** * Map of peer id and AcceptRequestWhileListEntry */ private readonly acceptFromWhitelist; /** * Map of topics to which peers are subscribed to */ private readonly topics; /** * List of our subscriptions */ private readonly subscriptions; /** * Map of topic meshes * topic => peer id set */ readonly mesh: Map>; /** * Map of topics to set of peers. These mesh peers are the ones to which we are publishing without a topic membership * topic => peer id set */ readonly fanout: Map>; /** * Map of last publish time for fanout topics * topic => last publish time */ private readonly fanoutLastpub; /** * Map of pending messages to gossip * peer id => control messages */ readonly gossip: Map; /** * Map of control messages * peer id => control message */ readonly control: Map; /** * Number of IHAVEs received from peer in the last heartbeat */ private readonly peerhave; /** Number of messages we have asked from peer in the last heartbeat */ private readonly iasked; /** Prune backoff map */ private readonly backoff; /** * Connection direction cache, marks peers with outbound connections * peer id => direction */ private readonly outbound; private readonly msgIdFn; /** * A fast message id function used for internal message de-duplication */ private readonly fastMsgIdFn; private readonly msgIdToStrFn; /** Maps fast message-id to canonical message-id */ private readonly fastMsgIdCache; /** * Short term cache for published message ids. This is used for penalizing peers sending * our own messages back if the messages are anonymous or use a random author. */ private readonly publishedMessageIds; /** * A message cache that contains the messages for last few heartbeat ticks */ private readonly mcache; /** Peer score tracking */ readonly score: PeerScore; /** * Custom validator function per topic. * Must return or resolve quickly (< 100ms) to prevent causing penalties for late messages. * If you need to apply validation that may require longer times use `asyncValidation` option and callback the * validation result through `Gossipsub.reportValidationResult` */ readonly topicValidators: Map; /** * Make this protected so child class may want to redirect to its own log. */ protected readonly log: Logger; /** * Number of heartbeats since the beginning of time * This allows us to amortize some resource cleanup -- eg: backoff cleanup */ private heartbeatTicks; /** * Tracks IHAVE/IWANT promises broken by peers */ readonly gossipTracer: IWantTracer; /** * Tracks IDONTWANT messages received by peers in the current heartbeat */ private readonly idontwantCounts; /** * Tracks IDONTWANT messages received by peers and the heartbeat they were received in * * idontwants are stored for `mcacheLength` heartbeats before being pruned, * so this map is bounded by peerCount * idontwantMaxMessages * mcacheLength */ private readonly idontwants; private readonly components; private directPeerInitial; static multicodec: string; readonly opts: Required; private readonly decodeRpcLimits; private readonly metrics; private status; private readonly maxInboundStreams?; private readonly maxOutboundStreams?; private readonly runOnLimitedConnection?; private readonly allowedTopics; private heartbeatTimer; constructor(components: GossipSubComponents, options?: Partial); readonly [Symbol.toStringTag] = "@chainsafe/libp2p-gossipsub"; readonly [serviceCapabilities]: string[]; readonly [serviceDependencies]: string[]; getPeers(): PeerId[]; isStarted(): boolean; /** * Mounts the gossipsub protocol onto the libp2p node and sends our * our subscriptions to every peer connected */ start(): Promise; /** * Unmounts the gossipsub protocol and shuts down every connection */ stop(): Promise; /** FOR DEBUG ONLY - Dump peer stats for all peers. Data is cloned, safe to mutate */ dumpPeerScoreStats(): PeerScoreStatsDump; /** * On an inbound stream opened */ private onIncomingStream; /** * Registrar notifies an established connection with pubsub protocol */ private onPeerConnected; /** * Registrar notifies a closing connection with pubsub protocol */ private onPeerDisconnected; private createOutboundStream; private createInboundStream; /** * Add a peer to the router */ private addPeer; /** * Removes a peer from the router */ private removePeer; get started(): boolean; /** * Get a the peer-ids in a topic mesh */ getMeshPeers(topic: TopicStr): PeerIdStr[]; /** * Get a list of the peer-ids that are subscribed to one topic. */ getSubscribers(topic: TopicStr): PeerId[]; /** * Get the list of topics which the peer is subscribed to. */ getTopics(): TopicStr[]; /** * Responsible for processing each RPC message received by other peers. */ private pipePeerReadStream; /** * Handle error when read stream pipe throws, less of the functional use but more * to for testing purposes to spy on the error handling * */ private handlePeerReadStreamError; /** * Handles an rpc request from a peer */ handleReceivedRpc(from: PeerId, rpc: RPC): Promise; /** * Handles a subscription change from a peer */ private handleReceivedSubscription; /** * Handles a newly received message from an RPC. * May forward to all peers in the mesh. */ private handleReceivedMessage; /** * Handles a newly received message from an RPC. * May forward to all peers in the mesh. */ private validateReceivedMessage; /** * Return score of a peer. */ getScore(peerId: PeerIdStr): number; /** * Send an rpc object to a peer with subscriptions */ private sendSubscriptions; /** * Handles an rpc control message from a peer */ private handleControlMessage; /** * Whether to accept a message from a peer */ acceptFrom(id: PeerIdStr): boolean; /** * Handles IHAVE messages */ private handleIHave; /** * Handles IWANT messages * Returns messages to send back to peer */ private handleIWant; /** * Handles Graft messages */ private handleGraft; /** * Handles Prune messages */ private handlePrune; private handleIdontwant; /** * Add standard backoff log for a peer in a topic */ private addBackoff; /** * Add backoff expiry interval for a peer in a topic * * @param id * @param topic * @param intervalMs - backoff duration in milliseconds */ private doAddBackoff; /** * Apply penalties from broken IHAVE/IWANT promises */ private applyIwantPenalties; /** * Clear expired backoff expiries */ private clearBackoff; /** * Maybe reconnect to direct peers */ private directConnect; /** * Maybe attempt connection given signed peer records */ private pxConnect; /** * Connect to a peer using the gossipsub protocol */ private connect; /** * Subscribes to a topic */ subscribe(topic: TopicStr): void; /** * Unsubscribe to a topic */ unsubscribe(topic: TopicStr): void; /** * Join topic */ private join; /** * Leave topic */ private leave; private selectPeersToForward; private selectPeersToPublish; /** * Forwards a message from our peers. * * For messages published by us (the app layer), this class uses `publish` */ private forwardMessage; /** * App layer publishes a message to peers, return number of peers this message is published to * Note: `async` due to crypto only if `StrictSign`, otherwise it's a sync fn. * * For messages not from us, this class uses `forwardMessage`. */ publish(topic: TopicStr, data: Uint8Array, opts?: PublishOpts): Promise; /** * Send the same data in batch to tosend list without considering cached control messages * This is not only faster but also avoid allocating memory for each peer * see https://github.com/ChainSafe/js-libp2p-gossipsub/issues/344 */ private sendRpcInBatch; /** * This function should be called when `asyncValidation` is `true` after * the message got validated by the caller. Messages are stored in the `mcache` and * validation is expected to be fast enough that the messages should still exist in the cache. * There are three possible validation outcomes and the outcome is given in acceptance. * * If acceptance = `MessageAcceptance.Accept` the message will get propagated to the * network. The `propagation_source` parameter indicates who the message was received by and * will not be forwarded back to that peer. * * If acceptance = `MessageAcceptance.Reject` the message will be deleted from the memcache * and the P₄ penalty will be applied to the `propagationSource`. * * If acceptance = `MessageAcceptance.Ignore` the message will be deleted from the memcache * but no P₄ penalty will be applied. * * This function will return true if the message was found in the cache and false if was not * in the cache anymore. * * This should only be called once per message. */ reportMessageValidationResult(msgId: MsgIdStr, propagationSource: PeerIdStr, acceptance: TopicValidatorResult): void; /** * Sends a GRAFT message to a peer */ private sendGraft; /** * Sends a PRUNE message to a peer */ private sendPrune; private sendIDontWants; /** * Send an rpc object to a peer */ private sendRpc; /** Mutates `outRpc` adding graft and prune control messages */ piggybackControl(id: PeerIdStr, outRpc: RPC, ctrl: RPC.ControlMessage): void; /** Mutates `outRpc` adding ihave control messages */ private piggybackGossip; /** * Send graft and prune messages * * @param tograft - peer id => topic[] * @param toprune - peer id => topic[] */ private sendGraftPrune; /** * Emits gossip - Send IHAVE messages to a random set of gossip peers */ private emitGossip; /** * Send gossip messages to GossipFactor peers above threshold with a minimum of D_lazy * Peers are randomly selected from the heartbeat which exclude mesh + fanout peers * We also exclude direct peers, as there is no reason to emit gossip to them * * @param topic * @param candidateToGossip - peers to gossip * @param messageIDs - message ids to gossip */ private doEmitGossip; /** * Flush gossip and control messages */ private flush; /** * Adds new IHAVE messages to pending gossip */ private pushGossip; /** * Make a PRUNE control message for a peer in a topic */ private makePrune; private readonly runHeartbeat; /** * Maintains the mesh and fanout maps in gossipsub. */ heartbeat(): Promise; /** * Given a topic, returns up to count peers subscribed to that topic * that pass an optional filter function * * @param topic * @param count * @param filter - a function to filter acceptable peers */ private getRandomGossipPeers; private onScrapeMetrics; private readonly tagMeshPeer; private readonly untagMeshPeer; } export declare function gossipsub(init?: Partial): (components: GossipSubComponents) => PubSub; export {}; //# sourceMappingURL=index.d.ts.map