import { peerIdFromCID } from '@libp2p/peer-id'
import { uriToMultiaddr } from '@multiformats/uri-to-multiaddr'
import { base64 } from 'multiformats/bases/base64'
import { CID } from 'multiformats/cid'
import { identity } from 'multiformats/hashes/identity'
import { CustomProgressEvent } from 'progress-events'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { DEFAULT_MAX_SIZE } from './index.ts'
import { limitedResponse } from './utils.ts'
import type { BlockBrokerConnectedProgressEvent, BlockBrokerConnectProgressEvent, BlockBrokerGetBlockProgressEvents, BlockBrokerReceiveBlockProgressEvent, BlockBrokerRequestBlockProgressEvent } from '@helia/interface'
import type { ComponentLogger, Logger, PeerId } from '@libp2p/interface'
import type { ProgressOptions } from 'progress-events'

/**
 * Code used to build an identity CID from the gateway URL, from which a stable
 * pseudo peer id is derived for each gateway (reported as `provider` in
 * progress events).
 */
const TRANSPORT_IPFS_GATEWAY_HTTP_CODE = 0x0920

/**
 * Snapshot of the counters a `TrustlessGateway` keeps, as returned by
 * `TrustlessGateway.getStats()`.
 */
export interface TrustlessGatewayStats {
  attempts: number
  errors: number
  invalidBlocks: number
  successes: number
  pendingResponses?: number
}

/**
 * Hook that receives the default `RequestInit` for a block request and returns
 * (optionally asynchronously) the `RequestInit` that is actually sent.
 */
export interface TransformRequestInit {
  (defaultReqInit: RequestInit): Promise<RequestInit> | RequestInit
}

export interface TrustlessGatewayComponents {
  logger: ComponentLogger
  transformRequestInit?: TransformRequestInit
  routing: string
}

export interface GetRawBlockOptions extends ProgressOptions<BlockBrokerGetBlockProgressEvents> {
  signal?: AbortSignal

  /**
   * The maximum number of bytes to allow when fetching a raw block.
   *
   * @default 2_097_152 (2MiB)
   */
  maxSize?: number
}

/**
 * A `TrustlessGateway` keeps track of the number of attempts, errors, and
 * successes for a given gateway url so that we can prioritize gateways that
 * have been more reliable in the past, and ensure that requests are distributed
 * across all gateways within a given `TrustlessGatewayBlockBroker` instance.
 */
export class TrustlessGateway {
  public readonly url: URL
  private readonly peer: PeerId

  /**
   * The number of times this gateway has been attempted to be used to fetch a
   * block. This includes successful, errored, and aborted attempts. By counting
   * even aborted attempts, slow gateways that are out-raced by others will be
   * considered less reliable.
   */
  #attempts = 0

  /**
   * The number of times this gateway has errored while attempting to fetch a
   * block. This includes `response.ok === false` and any other errors that
   * throw while attempting to fetch a block. This does not include aborted
   * attempts. Each request is counted at most once, regardless of how many
   * callers were waiting on it.
   */
  #errors = 0

  /**
   * The number of times this gateway has returned an invalid block. A gateway
   * that returns the wrong blocks for a CID should be considered for removal
   * from the list of gateways to fetch blocks from.
   */
  #invalidBlocks = 0

  /**
   * The number of times this gateway has successfully fetched a block.
   */
  #successes = 0

  /**
   * A map of pending responses for this gateway, keyed by the base64 encoded
   * multihash of the requested CID. This is used to ensure that only one
   * request per block is made to a given gateway at a time; concurrent callers
   * for the same block await the same promise.
   */
  readonly #pendingResponses = new Map<string, Promise<Uint8Array>>()

  private readonly log: Logger
  private readonly transformRequestInit?: TransformRequestInit
  public readonly routing: string

  constructor (url: URL | string, { logger, transformRequestInit, routing }: TrustlessGatewayComponents) {
    this.url = url instanceof URL ? url : new URL(url)
    this.transformRequestInit = transformRequestInit
    this.log = logger.forComponent(`helia:trustless-gateway-block-broker:${this.url.host}`)
    this.routing = routing
    this.peer = peerIdFromCID(CID.createV1(TRANSPORT_IPFS_GATEWAY_HTTP_CODE, identity.digest(uint8ArrayFromString(this.url.toString()))))
  }

  /**
   * This function returns a unique string for the multihash.bytes of the CID.
   *
   * Some useful resources for why this is needed can be found using the links below:
   *
   * - https://github.com/ipfs/helia/pull/503#discussion_r1572451331
   * - https://github.com/ipfs/kubo/issues/6815
   * - https://www.notion.so/pl-strflt/Handling-ambiguity-around-CIDs-9d5e14f6516f438980b01ef188efe15d#d9d45cd1ed8b4d349b96285de4aed5ab
   */
  #uniqueBlockId (cid: CID): string {
    const multihashBytes = cid.multihash.bytes
    return base64.encode(multihashBytes)
  }

  /**
   * Fetch a raw block from `this.url` following the specification defined at
   * https://specs.ipfs.tech/http-gateways/trustless-gateway/
   *
   * If a request for the same block (same multihash) is already in flight to
   * this gateway, this call waits for that request instead of issuing another.
   *
   * NOTE(review): the in-flight request is tied to the abort signal of the
   * caller that started it. If that caller aborts, callers that joined the
   * request are rejected as well.
   *
   * @throws if `options.signal` is already aborted, is aborted while waiting,
   * or the gateway request fails (non-OK status, network error, oversized body)
   */
  async getRawBlock (cid: CID, options: GetRawBlockOptions = {}): Promise<Uint8Array> {
    const gwUrl = new URL(this.url.toString())
    gwUrl.pathname = `/ipfs/${cid.toString()}`
    // necessary as not every gateway supports dag-cbor, but every should support
    // sending raw block as-is
    gwUrl.search = '?format=raw'
    const maxSize = options.maxSize ?? DEFAULT_MAX_SIZE

    if (options.signal?.aborted === true) {
      throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${this.url} was aborted prior to fetch`)
    }

    const blockId = this.#uniqueBlockId(cid)

    // workaround for https://github.com/nodejs/node/issues/52635
    const innerController = new AbortController()
    const abortInnerSignal = (): void => {
      innerController.abort()
    }
    options.signal?.addEventListener('abort', abortInnerSignal)

    // Join an in-flight request for this block, or start a new one. The new
    // promise is registered synchronously (no `await` between the lookup and
    // the `set`) so concurrent callers cannot both miss the map and send
    // duplicate requests.
    let pendingResponse = this.#pendingResponses.get(blockId)

    if (pendingResponse == null) {
      pendingResponse = this.#fetchRawBlock(cid, gwUrl, maxSize, innerController.signal, options)
      this.#pendingResponses.set(blockId, pendingResponse)
    }

    try {
      return await pendingResponse
    } catch (cause: unknown) {
      // @ts-expect-error - TS thinks signal?.aborted can only be false now
      // because it was checked for true above.
      if (options.signal?.aborted === true) {
        throw new Error(`Fetching raw block for CID ${cid} from gateway ${this.url} was aborted`)
      }

      const message = cause instanceof Error ? cause.message : String(cause)
      throw new Error(`Unable to fetch raw block for CID ${cid} - ${message}`)
    } finally {
      options.signal?.removeEventListener('abort', abortInnerSignal)

      // Only remove the entry this call awaited. Once it settled, a newer
      // request for the same block may have been registered in its place.
      if (this.#pendingResponses.get(blockId) === pendingResponse) {
        this.#pendingResponses.delete(blockId)
      }
    }
  }

  /**
   * Perform a single HTTP request for a raw block and update the gateway
   * statistics. The returned promise may be shared by several `getRawBlock`
   * callers, so attempts, errors and successes are recorded here exactly once
   * per request.
   */
  async #fetchRawBlock (cid: CID, gwUrl: URL, maxSize: number, signal: AbortSignal, options: GetRawBlockOptions): Promise<Uint8Array> {
    // runs synchronously when called, before the first `await`
    this.#attempts++

    try {
      const defaultReqInit: RequestInit = {
        signal,
        headers: {
          Accept: 'application/vnd.ipld.raw'
        },
        cache: 'force-cache'
      }

      const reqInit: RequestInit = this.transformRequestInit != null ? await this.transformRequestInit(defaultReqInit) : defaultReqInit
      const reqHeaders = new Headers(reqInit.headers)

      this.log(`sending request %s %s HTTP/1.1 %s `, reqInit.method ?? 'GET', gwUrl, [...reqHeaders.entries()].map(([key, value]) => `${key}: ${value}`).join('\n'))

      options.onProgress?.(new CustomProgressEvent<BlockBrokerConnectProgressEvent>('helia:block-broker:connect', {
        broker: 'trustless-gateway',
        type: 'connect',
        provider: this.peer,
        cid
      }))

      const res = await fetch(gwUrl.toString(), reqInit)

      this.log(`received response HTTP/1.1 %d %s %s `, res.status, res.statusText, [...res.headers.entries()].map(([key, value]) => `${key}: ${value}`).join('\n'))

      if (!res.ok) {
        // counted as an error in the catch below, exactly once
        throw new Error(`Unable to fetch raw block for CID ${cid} from gateway ${this.url}, received ${res.status} ${res.statusText}`)
      }

      options.onProgress?.(new CustomProgressEvent<BlockBrokerConnectedProgressEvent>('helia:block-broker:connected', {
        broker: 'trustless-gateway',
        type: 'connected',
        provider: this.peer,
        address: uriToMultiaddr(gwUrl.toString()),
        cid
      }))

      options.onProgress?.(new CustomProgressEvent<BlockBrokerRequestBlockProgressEvent>('helia:block-broker:request-block', {
        broker: 'trustless-gateway',
        type: 'request-block',
        provider: this.peer,
        cid
      }))

      // limited Response ensures the body is less than 2MiB (or configurable maxSize)
      // see https://github.com/ipfs/helia/issues/790
      const body = await limitedResponse(res, maxSize, {
        signal,
        log: this.log
      })

      options.onProgress?.(new CustomProgressEvent<BlockBrokerReceiveBlockProgressEvent>('helia:block-broker:receive-block', {
        broker: 'trustless-gateway',
        type: 'receive-block',
        provider: this.peer,
        cid
      }))

      this.#successes++
      return body
    } catch (err) {
      // aborted attempts are not counted as errors (see `#errors`)
      if (!signal.aborted) {
        this.#errors++
      }

      throw err
    }
  }

  /**
   * Encapsulate the logic for determining whether a gateway is considered
   * reliable, for prioritization. This is based on the number of successful attempts made
   * and the number of errors encountered.
   *
   * A gateway that has ever returned an invalid block scores `-Infinity`.
   * Unused gateways score 1, the highest score a used gateway can reach, so
   * they rank at least as high as any gateway that has been tried. This
   * ensures that all gateways get attempted.
   */
  reliability (): number {
    if (this.#invalidBlocks > 0) {
      // this gateway may not be trustworthy..
      return -Infinity
    }

    /**
     * if we have never tried to use this gateway, it is considered the most
     * reliable until we determine otherwise (prioritize unused gateways)
     */
    if (this.#attempts === 0) {
      return 1
    }

    /**
     * We have attempted the gateway, so we need to calculate the reliability
     * based on the number of attempts, errors, and successes. Gateways that
     * return a single error should drop their reliability score more than a
     * single success increases it.
     *
     * Play around with the below reliability function at https://www.desmos.com/calculator/d6hfhf5ukm
     */
    return this.#successes / (this.#attempts + (this.#errors * 3))
  }

  /**
   * Increment the number of invalid blocks returned by this gateway.
   */
  incrementInvalidBlocks (): void {
    this.#invalidBlocks++
  }

  /**
   * Return a snapshot of this gateway's counters and the number of requests
   * currently in flight.
   */
  getStats (): TrustlessGatewayStats {
    return {
      attempts: this.#attempts,
      errors: this.#errors,
      invalidBlocks: this.#invalidBlocks,
      successes: this.#successes,
      pendingResponses: this.#pendingResponses.size
    }
  }

  toString (): string {
    return `TrustlessGateway(${this.url})`
  }
}