/* (づ ̄ ³ ̄)づ IPFS node things. */ // Considered dev dependency as these imports will not be present in the resulting lib code import type { IPFS as IPFSCore } from "ipfs-core-types" import type { IPFSRepo } from "ipfs-repo" import type { PeerId } from "@libp2p/interface-peer-id" import * as keys from "@libp2p/interface-keys" import { multiaddr, Multiaddr } from "@multiformats/multiaddr" import { peerIdFromString } from "@libp2p/peer-id" import * as Storage from "../../../../components/storage/implementation.js" import * as t from "../../../../common/type-checks.js" import { IPFSPackage, Options as IPFSOptions } from "./package.js" import * as IpfsRepo from "./node/repo.js" // GLOBAL STATE const latestPeerTimeoutIds: { [ peer: string ]: null | ReturnType } = {} const isSafari = /^((?!chrome|android).)*safari/i.test(globalThis.navigator?.userAgent || "") // TYPES export type BackOff = { retryNumber: number lastBackoff: number currentBackoff: number } export type Dependencies = { storage: Storage.Implementation } export type Status = { connected: boolean lastConnectedAt: number | null latency: number | null } export type IPFS = IPFSCore & { libp2p: { ping(peerId: PeerId | Multiaddr): Promise } } // OPTIONS /** 🎛️ Connection interval knobs * * KEEP_ALIVE_INTERVAL: Interval to keep the connection alive when online * KEEP_TRYING_INTERVAL: Interval to keep trying the connection when offline * BACKOFF_INIT: Starting intervals for fibonacci backoff used when establishing a connection */ const KEEP_ALIVE_INTERVAL = 1 * 60 * 1000 // 1 minute const KEEP_TRYING_INTERVAL = 5 * 60 * 1000 // 5 minutes const BACKOFF_INIT = { retryNumber: 0, lastBackoff: 0, currentBackoff: 1000 } /** 🎛️ IPFS Options */ export const OPTIONS: IPFSOptions = { config: { Addresses: { Delegates: [] }, Bootstrap: [], Discovery: { webRTCStar: { Enabled: false } }, Pubsub: { Enabled: false } }, preload: { enabled: false, addresses: [] }, libp2p: { peerDiscovery: [], connectionManager: { autoDial: false } }, init: { algorithm: isSafari ? keys.RSA : undefined, emptyRepo: true }, } // 🚀 export async function createAndConnect( dependencies: Dependencies, pkg: IPFSPackage, peersUrl: string, repoName: string, logging: boolean ): Promise<{ ipfs: IPFSCore; repo: IPFSRepo }> { const peers = await listPeers(dependencies.storage, peersUrl) if (peers.length === 0) { throw new Error("💥 Couldn't start IPFS node, peer list is empty") } // Start an IPFS node & connect to all the peers const repo = IpfsRepo.create(repoName) const ipfs: IPFSCore = await pkg.create({ ...OPTIONS, repo }) peers.forEach(peer => { latestPeerTimeoutIds[ peer.toString() ] = null tryConnecting(ipfs as unknown as IPFS, peer, logging) }) // Try connecting when browser comes online globalThis.addEventListener("online", async () => { (await listPeers(dependencies.storage, peersUrl)) .filter(peer => { const peerStr = peer.toString() return !peerStr.includes("/localhost/") && !peerStr.includes("/127.0.0.1/") && !peerStr.includes("/0.0.0.0/") }) .forEach(peer => { tryConnecting(ipfs as unknown as IPFS, peer, logging) }) }) // Fin if (logging) console.log("🚀 Started IPFS node") return { ipfs, repo } } // PEERS // ----- export function fetchPeers(peersUrl: string): Promise { return fetch(peersUrl) .then(r => r.json()) .then(r => Array.isArray(r) ? r : []) .then(r => r.filter(p => t.isString(p) && p.includes("/wss/"))) .catch(() => { throw new Error("💥 Couldn't start IPFS node, failed to fetch peer list") }) } export async function listPeers( storage: Storage.Implementation, peersUrl: string ): Promise { let peers const storageKey = `ipfs-peers-${peersUrl}` const maybePeers = await storage.getItem(storageKey) if (t.isString(maybePeers) && maybePeers.trim() !== "") { peers = JSON.parse(maybePeers) fetchPeers(peersUrl).then(list => storage.setItem(storageKey, JSON.stringify(list)) ).catch(err => { // don't throw console.error(err) }) } else { peers = await fetchPeers(peersUrl) await storage.setItem(storageKey, JSON.stringify(peers)) } return peers.map(multiaddr) } // CONNECTIONS // ----------- function keepAlive(ipfs: IPFS, peer: Multiaddr, backoff: BackOff, status: Status): void { let timeoutId: ReturnType | null = null if (backoff.currentBackoff < KEEP_TRYING_INTERVAL) { // Start race between reconnect and ping timeoutId = setTimeout(() => reconnect(ipfs, peer, backoff, status), backoff.currentBackoff) } else { // Disregard backoff, but keep trying timeoutId = setTimeout(() => reconnect(ipfs, peer, backoff, status), KEEP_TRYING_INTERVAL) } // Track the latest reconnect attempt latestPeerTimeoutIds[ peer.toString() ] = timeoutId ping(ipfs, peer).then(({ latency }) => { const updatedStatus = { connected: true, lastConnectedAt: Date.now(), latency } report(peer, updatedStatus) // Cancel reconnect because ping won if (timeoutId) clearTimeout(timeoutId) // Keep alive after the latest ping-reconnect race, ignore the rest if (timeoutId === latestPeerTimeoutIds[ peer.toString() ]) { setTimeout(() => keepAlive(ipfs, peer, BACKOFF_INIT, updatedStatus), KEEP_ALIVE_INTERVAL) } }).catch(() => { // ignore errors }) } async function reconnect(ipfs: IPFS, peer: Multiaddr, backoff: BackOff, status: Status): Promise { const updatedStatus = { ...status, connected: false, latency: null } report(peer, updatedStatus) try { await ipfs.swarm.disconnect(peer) await ipfs.swarm.connect(peer) } catch { // No action needed, we will retry } if (backoff.currentBackoff < KEEP_TRYING_INTERVAL) { const nextBackoff = { retryNumber: backoff.retryNumber + 1, lastBackoff: backoff.currentBackoff, currentBackoff: backoff.lastBackoff + backoff.currentBackoff } keepAlive(ipfs, peer, nextBackoff, updatedStatus) } else { keepAlive(ipfs, peer, backoff, updatedStatus) } } export function tryConnecting(ipfs: IPFS, peer: Multiaddr, logging: boolean): void { ping(ipfs, peer).then(({ latency }) => { return ipfs.swarm .connect(peer, { timeout: 60 * 1000 }) .then(() => { if (logging) console.log(`🪐 Connected to ${peer}`) const status = { connected: true, lastConnectedAt: Date.now(), latency } report(peer, status) // Ensure permanent connection to a peer // NOTE: This is a temporary solution while we wait for // https://github.com/libp2p/js-libp2p/issues/744 // (see "Keep alive" bit) setTimeout(() => keepAlive(ipfs, peer, BACKOFF_INIT, status), KEEP_ALIVE_INTERVAL) }) }).catch(() => { if (logging) console.log(`🪓 Could not connect to ${peer}`) const status = { connected: false, lastConnectedAt: null, latency: null } report(peer, status) keepAlive(ipfs, peer, BACKOFF_INIT, status) }) } export async function ping(ipfs: IPFS, peer: Multiaddr): Promise<{ latency: number }> { return ipfs.libp2p.ping(peer).then(latency => ({ latency })) } // REPORTING // --------- let peerConnections: { peer: Multiaddr; status: Status }[] = [] let monitoringPeers = false function report(peer: Multiaddr, status: Status): void { peerConnections = peerConnections .filter(connection => connection.peer !== peer) .concat({ peer, status }) const offline = peerConnections.every(connection => !connection.status.connected) const lastConnectedAt: number = peerConnections.reduce((newest, { status }) => newest >= (status.lastConnectedAt || 0) ? newest : (status.lastConnectedAt || 0), 0 ) const activeConnections = peerConnections.filter(connection => connection.status.latency !== null) const averageLatency = activeConnections.length > 0 ? peerConnections.reduce((sum, connection) => sum + (connection.status.latency || 0), 0) / activeConnections.length : null if (monitoringPeers) { console.table(peerConnections) console.log("offline", offline) console.log("last connected at", lastConnectedAt === 0 ? null : lastConnectedAt) console.log("average latency", averageLatency) } } export async function monitorPeers() { monitoringPeers = true console.log("📡 Monitoring IPFS peers") } export function stopMonitoringPeers() { monitoringPeers = false } // 🔮 let monitor: ReturnType | null = null export async function monitorBitswap(dependencies: Dependencies, ipfs: IPFS, peersUrl: string, verbose: boolean): Promise { const cidCount: { [ k: string ]: number } = {} const seen: string[] = [] const peers = await listPeers(dependencies.storage, peersUrl) verbose = verbose === undefined ? false : true console.log("🕵️‍♀️ Monitoring IPFS bitswap requests") await stopMonitoringBitswap() monitor = setInterval(async () => { const peerList = peers peerList.map(async peer => { const peerIdString = peer.getPeerId() if (!peerIdString) return const peerId = peerIdFromString(peerIdString) const wantList = await ipfs.bitswap.wantlistForPeer(peerId, { timeout: 120 * 1000 }) wantList.forEach(async cid => { const c = cid.toString() const s = peer + "-" + c if (!seen.includes(s)) { const seenCid = !!cidCount[ c ] const emoji = seenCid ? "📡" : "🔮" const msg = `${emoji} Peer ${peer} requested CID ${c}` cidCount[ c ] = (cidCount[ c ] || 0) + 1 if (seenCid) { if (verbose) console.log(msg + ` (#${cidCount[ c ]})`) return } else { console.log(msg) } const start = performance.now() seen.push(s) const dag = await ipfs.dag.get(cid) const end = performance.now() const diff = end - start const loaded = `loaded locally in ${diff.toFixed(2)} ms` if (dag.value.Links) { console.log(`🧱 ${c} is a 👉 DAG structure (${loaded})`) ; (console.table || console.log)( dag.value.Links .map((l: unknown) => { if (t.isObject(l) && t.hasProp(l, "Name") && t.hasProp(l, "Hash")) { return { name: l.Name, cid: (l.Hash as unknown as string).toString() } } else { return null } }) .filter((a: { name: string; cid: string } | null) => a) ) } else { console.log(`📦 ${c} is 👉 Data (${loaded})`) console.log(dag.value) } } }) }) }, 20) } export async function stopMonitoringBitswap() { if (monitor) clearInterval(monitor) }