import { InvalidParametersError, NotStartedError, TimeoutError, TypedEventEmitter, UnsupportedProtocolError, setMaxListeners } from '@libp2p/interface'
import { PeerQueue } from '@libp2p/utils'
import drain from 'it-drain'
import * as lp from 'it-length-prefixed'
import map from 'it-map'
import { pushable } from 'it-pushable'
import take from 'it-take'
import { CID } from 'multiformats/cid'
import { CustomProgressEvent } from 'progress-events'
import { raceEvent } from 'race-event'
import { BITSWAP_120, DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_INCOMING_MESSAGE_SIZE, DEFAULT_MAX_OUTBOUND_STREAMS, DEFAULT_MAX_OUTGOING_MESSAGE_SIZE, DEFAULT_MAX_PROVIDERS_PER_REQUEST, DEFAULT_MESSAGE_RECEIVE_TIMEOUT, DEFAULT_MESSAGE_SEND_CONCURRENCY, DEFAULT_MESSAGE_SEND_TIMEOUT, DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS } from './constants.ts'
import { BitswapMessage } from './pb/message.ts'
import { mergeMessages } from './utils/merge-messages.ts'
import { splitMessage } from './utils/split-message.ts'
import type { WantOptions } from './bitswap.ts'
import type { Block } from './pb/message.ts'
import type { QueuedBitswapMessage } from './utils/bitswap-message.ts'
import type { BlockBrokerGetBlockProgressEvents } from '@helia/interface'
import type { Provider, Routing, RoutingFindProvidersProgressEvents } from '@helia/interface/routing'
import type { Libp2p, AbortOptions, Connection, PeerId, Topology, ComponentLogger, IdentifyResult, Counter, Metrics, Stream, OpenConnectionProgressEvents, NewStreamProgressEvents } from '@libp2p/interface'
import type { Logger } from '@libp2p/logger'
import type { PeerQueueJobOptions } from '@libp2p/utils'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { ProgressEvent, ProgressOptions } from 'progress-events'
import type { Uint8ArrayList } from 'uint8arraylist'

export interface BitswapProvider {
  /**
   * The type of provider
   */
  type: 'bitswap'

  /**
   * The CID the provider can supply the block for
   */
  cid: CID

  /**
   * The provider info
   */
  provider: Provider

  /**
   * Which routing subsystem found the provider
   */
  routing: string
}

export type BitswapNetworkProgressEvents =
  ProgressEvent<'bitswap:dial', PeerId | Multiaddr | Multiaddr[]> |
  OpenConnectionProgressEvents |
  NewStreamProgressEvents

export type BitswapNetworkWantProgressEvents =
  ProgressEvent<'bitswap:send-wantlist', PeerId> |
  ProgressEvent<'bitswap:send-wantlist:error', { peer: PeerId, error: Error }> |
  ProgressEvent<'bitswap:find-providers', CID> |
  ProgressEvent<'bitswap:found-provider', BitswapProvider> |
  BitswapNetworkProgressEvents |
  RoutingFindProvidersProgressEvents |
  BlockBrokerGetBlockProgressEvents

export type BitswapNetworkNotifyProgressEvents =
  BitswapNetworkProgressEvents |
  ProgressEvent<'bitswap:send-block', PeerId>

/**
 * Optional settings for the `Network` class. Every omitted field falls back
 * to the matching `DEFAULT_*` constant (or `[BITSWAP_120]` for `protocols`).
 */
export interface NetworkInit {
  maxInboundStreams?: number
  maxOutboundStreams?: number
  messageReceiveTimeout?: number
  messageSendTimeout?: number
  messageSendConcurrency?: number
  protocols?: string[]
  runOnLimitedConnections?: boolean
  maxOutgoingMessageSize?: number
  maxIncomingMessageSize?: number
}

export interface NetworkComponents {
  routing: Routing
  logger: ComponentLogger
  libp2p: Libp2p
  metrics?: Metrics
}

export interface BitswapMessageEventDetail {
  /**
   * @deprecated access the peer via connection.remotePeer instead
   */
  peer: PeerId
  message: BitswapMessage
  connection: Connection
}

export interface NetworkEvents {
  'bitswap:message': CustomEvent<BitswapMessageEventDetail>
  'peer:connected': CustomEvent<PeerId>
  'peer:disconnected': CustomEvent<PeerId>
}

interface SendMessageJobOptions extends AbortOptions, ProgressOptions, PeerQueueJobOptions {
  message: QueuedBitswapMessage
}

/**
 * Registers the bitswap protocol(s) with libp2p, decodes incoming
 * length-prefixed bitswap messages and re-emits them as `bitswap:message`
 * events, and sends outgoing messages through a per-peer queue.
 */
export class Network extends TypedEventEmitter<NetworkEvents> {
  private readonly log: Logger
  private readonly libp2p: Libp2p
  private readonly routing: Routing
  private readonly protocols: string[]
  private running: boolean
  private readonly maxInboundStreams: number
  private readonly maxOutboundStreams: number
  private readonly messageReceiveTimeout: number
  private readonly messageSendTimeout: number
  private registrarIds: string[]
  private readonly metrics: { blocksSent?: Counter, dataSent?: Counter }
  private readonly sendQueue: PeerQueue<void, SendMessageJobOptions>
  private readonly runOnLimitedConnections: boolean
  private readonly maxOutgoingMessageSize: number
  private readonly maxIncomingMessageSize: number

  constructor (components: NetworkComponents, init: NetworkInit = {}) {
    super()

    this.log = components.logger.forComponent('helia:bitswap:network')
    this.libp2p = components.libp2p
    this.routing = components.routing
    this.protocols = init.protocols ?? [BITSWAP_120]
    this.registrarIds = []
    this.running = false

    // bind event listeners
    this._onStream = this._onStream.bind(this)

    this.maxInboundStreams = init.maxInboundStreams ?? DEFAULT_MAX_INBOUND_STREAMS
    this.maxOutboundStreams = init.maxOutboundStreams ?? DEFAULT_MAX_OUTBOUND_STREAMS
    this.messageReceiveTimeout = init.messageReceiveTimeout ?? DEFAULT_MESSAGE_RECEIVE_TIMEOUT
    this.messageSendTimeout = init.messageSendTimeout ?? DEFAULT_MESSAGE_SEND_TIMEOUT
    this.runOnLimitedConnections = init.runOnLimitedConnections ?? DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS

    // each direction falls back to its own default constant; the outgoing
    // limit additionally inherits a configured incoming limit when it has no
    // explicit value of its own
    this.maxIncomingMessageSize = init.maxIncomingMessageSize ?? DEFAULT_MAX_INCOMING_MESSAGE_SIZE
    this.maxOutgoingMessageSize = init.maxOutgoingMessageSize ?? init.maxIncomingMessageSize ?? DEFAULT_MAX_OUTGOING_MESSAGE_SIZE

    this.metrics = {
      blocksSent: components.metrics?.registerCounter('helia_bitswap_sent_blocks_total'),
      dataSent: components.metrics?.registerCounter('helia_bitswap_sent_data_bytes_total')
    }

    this.sendQueue = new PeerQueue({
      concurrency: init.messageSendConcurrency ?? DEFAULT_MESSAGE_SEND_CONCURRENCY,
      metrics: components.metrics,
      metricName: 'helia_bitswap_message_send_queue'
    })
  }

  /**
   * Registers the stream handler and a topology for every configured
   * protocol, then dispatches `peer:connected` for each connection libp2p
   * already holds. Does nothing if already running.
   */
  async start (): Promise<void> {
    if (this.running) {
      return
    }

    this.running = true

    await this.libp2p.handle(this.protocols, this._onStream, {
      maxInboundStreams: this.maxInboundStreams,
      maxOutboundStreams: this.maxOutboundStreams,
      runOnLimitedConnection: this.runOnLimitedConnections
    })

    // register protocol with topology
    const topology: Topology = {
      onConnect: (peerId: PeerId) => {
        this.safeDispatchEvent('peer:connected', {
          detail: peerId
        })
      },
      onDisconnect: (peerId: PeerId) => {
        this.safeDispatchEvent('peer:disconnected', {
          detail: peerId
        })
      }
    }

    this.registrarIds = []

    for (const protocol of this.protocols) {
      this.registrarIds.push(await this.libp2p.register(protocol, topology))
    }

    // All existing connections are like new ones for us
    this.libp2p.getConnections().forEach(conn => {
      this.safeDispatchEvent('peer:connected', {
        detail: conn.remotePeer
      })
    })
  }

  /**
   * Removes the stream handler and unregisters every topology that `start`
   * registered.
   */
  async stop (): Promise<void> {
    this.running = false

    // Unhandle both, libp2p doesn't care if it's not already handled
    await this.libp2p.unhandle(this.protocols)

    // unregister protocol and handlers
    for (const id of this.registrarIds) {
      this.libp2p.unregister(id)
    }

    this.registrarIds = []
  }

  /**
   * Handles incoming bitswap messages.
   *
   * Each decoded message is dispatched as a `bitswap:message` event. The
   * stream is aborted with a `TimeoutError` if no message arrives within
   * `messageReceiveTimeout` ms; the timer restarts after every message.
   */
  _onStream (stream: Stream, connection: Connection): void {
    if (!this.running) {
      return
    }

    Promise.resolve().then(async () => {
      this.log('incoming new bitswap %s stream from %p', stream.protocol, connection.remotePeer)

      const abortListener = (): void => {
        if (stream.status === 'open') {
          stream.abort(new TimeoutError(`Incoming Bitswap stream timed out after ${this.messageReceiveTimeout}ms`))
        } else {
          this.log('stream aborted with status %s', stream.status)
        }
      }

      let signal = AbortSignal.timeout(this.messageReceiveTimeout)
      setMaxListeners(Infinity, signal)
      signal.addEventListener('abort', abortListener)

      try {
        await stream.close({
          signal
        })

        // bridge the stream's event-based API to an async iterable so it can
        // be fed through the length-prefixed decoder
        const input = pushable<Uint8Array | Uint8ArrayList>()

        stream.addEventListener('message', (evt) => {
          input.push(evt.data)
        })
        stream.addEventListener('remoteCloseWrite', () => {
          input.end()
        })
        stream.addEventListener('close', (evt) => {
          if (evt.error != null) {
            input.end(evt.error)
          }
        })

        for await (const data of lp.decode(input, { maxDataLength: this.maxIncomingMessageSize })) {
          try {
            const message = BitswapMessage.decode(data)
            this.log('incoming new bitswap %s message from %p on stream %s', stream.protocol, connection.remotePeer, stream.id)

            this.safeDispatchEvent('bitswap:message', {
              detail: {
                peer: connection.remotePeer,
                message,
                connection
              }
            })

            // we have received some data so reset the timeout controller
            signal.removeEventListener('abort', abortListener)
            signal = AbortSignal.timeout(this.messageReceiveTimeout)
            setMaxListeners(Infinity, signal)
            signal.addEventListener('abort', abortListener)
          } catch (err: any) {
            this.log.error('error reading incoming bitswap message from %p on stream %s - %e', connection.remotePeer, stream.id, err)
            stream.abort(err)
            break
          }
        }
      } finally {
        // stream handling is over, so stop the pending timeout from firing
        // the listener against a stream we are no longer reading
        signal.removeEventListener('abort', abortListener)
      }
    })
      .catch(err => {
        this.log.error('error handling incoming stream from %p - %e', connection.remotePeer, err)
        stream.abort(err)
      })
  }

  /**
   * Find bitswap providers for a given `cid`.
   *
   * Providers whose multiaddrs libp2p reports as not dialable are skipped.
   * Emits `bitswap:find-providers` once and `bitswap:found-provider` for
   * every yielded provider.
   */
  async * findProviders (cid: CID, options?: AbortOptions & ProgressOptions<BitswapNetworkWantProgressEvents>): AsyncIterable<Provider> {
    options?.onProgress?.(new CustomProgressEvent<CID>('bitswap:find-providers', cid))

    for await (const provider of this.routing.findProviders(cid, options)) {
      // make sure we can dial the provider
      const dialable = await this.libp2p.isDialable(provider.multiaddrs, {
        runOnLimitedConnection: this.runOnLimitedConnections
      })

      if (!dialable) {
        this.log('skipping peer %p as they are not dialable - %a', provider.id, provider.multiaddrs)
        continue
      }

      options?.onProgress?.(new CustomProgressEvent<BitswapProvider>('bitswap:found-provider', {
        type: 'bitswap',
        cid,
        provider,
        routing: provider.routing
      }))

      yield provider
    }
  }

  /**
   * Find the providers of a given `cid` and connect to them.
   *
   * Connection failures are logged rather than thrown.
   */
  async findAndConnect (cid: CID, options?: WantOptions): Promise<void> {
    // connect to initial session providers if supplied
    if (options?.providers != null) {
      await Promise.all(
        // pass options through so the caller's abort signal and progress
        // handler also apply to supplied providers
        options.providers.map(async prov => this.connectTo(prov, options)
          .catch(err => {
            this.log.error('could not connect to supplied provider - %e', err)
          }))
      )
    }

    // make a routing query to find additional providers
    await drain(
      map(
        take(this.findProviders(cid, options), options?.maxProviders ?? DEFAULT_MAX_PROVIDERS_PER_REQUEST),
        async provider => this.connectTo(provider.id, options)
      )
    )
      .catch(err => {
        this.log.error(err)
      })
  }

  /**
   * Connect to the specified peer and send the given message.
   *
   * If a job for the same peer is still queued, the message is merged into
   * it instead of adding a second job. Send failures are logged and reported
   * as a `bitswap:send-wantlist:error` progress event rather than thrown.
   *
   * @throws {NotStartedError} if the network is not running
   */
  async sendMessage (peerId: PeerId, message: QueuedBitswapMessage, options?: AbortOptions & ProgressOptions<BitswapNetworkWantProgressEvents>): Promise<void> {
    if (!this.running) {
      throw new NotStartedError('network isn\'t running')
    }

    const existingJob = this.sendQueue.queue.find(job => {
      return peerId.equals(job.options.peerId) && job.status === 'queued'
    })

    if (existingJob != null) {
      existingJob.options.message = mergeMessages(existingJob.options.message, message)

      await existingJob.join(options)

      return
    }

    await this.sendQueue.add(async (jobOptions) => {
      // read the message from the job options because it may have been
      // replaced by a merged message while the job was queued
      const queuedMessage = jobOptions.message

      if (queuedMessage == null) {
        throw new InvalidParametersError('No message to send')
      }

      this.log('sendMessage to %p', peerId)

      jobOptions.onProgress?.(new CustomProgressEvent<PeerId>('bitswap:send-wantlist', peerId))

      // NOTE(review): this dials BITSWAP_120 rather than the configured
      // `this.protocols` - confirm that is intended when custom protocols
      // are supplied
      const stream = await this.libp2p.dialProtocol(peerId, BITSWAP_120, jobOptions)
      await stream.closeRead(jobOptions)

      try {
        for (const buf of splitMessage(queuedMessage, this.maxOutgoingMessageSize)) {
          // send returned false, wait for the stream to drain before continuing
          if (!stream.send(lp.encode.single(buf))) {
            await stream.onDrain(jobOptions)
          }
        }

        await stream.close(jobOptions)

        // only count blocks and bytes once the whole message was written
        this._updateSentStats(queuedMessage.blocks)
      } catch (err: any) {
        this.log.error('error sending message to %p - %e', peerId, err)
        jobOptions.onProgress?.(new CustomProgressEvent<{ peer: PeerId, error: Error }>('bitswap:send-wantlist:error', { peer: peerId, error: err }))
        stream.abort(err)
      }
    }, {
      onProgress: options?.onProgress,
      peerId,
      signal: options?.signal ?? AbortSignal.timeout(this.messageSendTimeout),
      message
    })
  }

  /**
   * Connects to another peer.
   *
   * Resolves once the dial has completed and a `peer:identify` event for the
   * peer lists `BITSWAP_120`.
   *
   * @throws {NotStartedError} if the network is not running
   * @throws {UnsupportedProtocolError} if the identified peer does not list `BITSWAP_120`
   */
  async connectTo (peer: PeerId | Multiaddr | Multiaddr[], options?: AbortOptions & ProgressOptions<BitswapNetworkProgressEvents>): Promise<Connection> {
    if (!this.running) {
      throw new NotStartedError('Network isn\'t running')
    }

    options?.onProgress?.(new CustomProgressEvent<PeerId | Multiaddr | Multiaddr[]>('bitswap:dial', peer))

    // dial and wait for identify - this is to avoid opening a protocol stream
    // that we are not going to use but depends on the remote node running the
    // identify protocol
    //
    // NOTE(review): the filter compares the identified peer id with `peer`
    // directly - when `peer` is a multiaddr (or a list of them) this
    // presumably never matches, so the wait would only end via
    // `options.signal` - confirm
    const [
      connection
    ] = await Promise.all([
      this.libp2p.dial(peer, options),
      raceEvent(this.libp2p, 'peer:identify', options?.signal, {
        filter: (evt: CustomEvent<IdentifyResult>): boolean => {
          if (!evt.detail.peerId.equals(peer)) {
            return false
          }

          if (evt.detail.protocols.includes(BITSWAP_120)) {
            return true
          }

          throw new UnsupportedProtocolError(`${peer} did not support ${BITSWAP_120}`)
        }
      })
    ])

    return connection
  }

  /**
   * Adds the number of blocks and their combined byte length to the sent
   * metrics, when metrics are configured.
   */
  _updateSentStats (blocks: Map<string, Block>): void {
    let bytes = 0

    for (const block of blocks.values()) {
      bytes += block.data.byteLength
    }

    this.metrics.dataSent?.increment(bytes)
    this.metrics.blocksSent?.increment(blocks.size)
  }
}