import type { IpnsString } from '@wovin/core/applog'
import type { DecodedCar } from '@wovin/core/ipfs'
import type { Thread, WriteableThread } from '@wovin/core/thread'
// import { getVM } from '../data/VMs/MappedVMbase'
// import { ProviderVM } from '../data/VMs/ProviderVM'
import type { ProviderVM } from '../data/VMs/ProviderVM'
import { asyncIterableToArray, racePromises } from '@note3/utils'
import { resolveIPNS as resolveIpnsFromGW /* retrieveCar */ } from '@wovin/connect-gateway'
import { areCidsEqual, decodePubFromBlocks, encodeApplog } from '@wovin/core/ipfs'
import { Logger } from 'besonders-logger'
import { CID } from 'multiformats/cid'
import { sanityCheckLogs } from '../data/block-utils-nowin'
import { getBlockFromStore } from './block-store'
import { getBlockStore } from './blockStore'
import { getKuboGateway, hasKubo, kuboClient } from './kubo'
import { storageState } from './storage'
import { tryIntegrateApplogs } from './store-sync'
import { getSyncWorker } from './worker-utils'

const { WARN, LOG, DEBUG, VERBOSE, ERROR } = Logger.setup(Logger.DEBUG)

/**
 * Retrieve the publication behind `ipns` and integrate its applogs into `thread`.
 *
 * Flow:
 * 1. Determine the root CID: `pinCID` when given, otherwise the result of resolving `ipns`.
 * 2. If that CID equals `lastCID`, return early with `didUpdate: false`.
 * 3. Obtain the blocks: from the local block store when the root block is found there,
 *    otherwise by fetching the snapshot chain through the sync worker, using the gateways
 *    from `storageState` (plus the kubo gateway when kubo is available).
 * 4. Decode the publication, check that the decoded CID matches the root CID, run
 *    `sanityCheckLogs` and hand the result to `tryIntegrateApplogs`.
 *
 * @param thread - Target thread, passed to `tryIntegrateApplogs`.
 * @param ipns - IPNS name of the publication; resolved only when `pinCID` is not set.
 * @param options - Optional hints:
 *   - `pinCID`: use this CID as the root instead of resolving `ipns`.
 *   - `lastCID`: CID retrieved previously; an identical root CID means "nothing to do", and it
 *     is also passed as the stop-CID to the snapshot fetch and to the decoder.
 *   - `subID`: passed through to `tryIntegrateApplogs`.
 * @param appThread - Used to look up an applog by CID when a block is missing from the block store.
 * @returns `{ cid, didUpdate: false }` when the root CID equals `lastCID`; otherwise an object
 *   with `didUpdate: true`, the start timestamp `ts`, `cid`, `encryptedFor`, `applogsCID`,
 *   `info` and the sanity-checked `applogs`.
 * @throws When IPNS resolution fails or yields nothing, when a CID string cannot be parsed,
 *   when no gateways are available, when the decoded CID differs from the root CID, or when
 *   `tryIntegrateApplogs` reports decryption errors.
 */
export const retrievePubToThread = async function (
	thread: WriteableThread,
	ipns: IpnsString,
	{ lastCID, pinCID, subID }: { pinCID?: string, lastCID?: string, subID?: string } = {},
	appThread: Thread,
) {
	DEBUG('Retrieving Applogs from', { ipns, lastCID, pinCID, thread })
	let cid: CID
	const ts = new Date() // taken before resolving, so it marks the start of the retrieval
	if (pinCID) {
		cid = CID.parse(pinCID)
	} else {
		try {
			cid = await resolveIPNS(ipns)
			if (!cid) throw ERROR(`Could not resolve ipns: ${ipns}`)
		} catch (err) {
			throw ERROR(`Failed to resolve IPNS`, ipns, err)
		}
	}
	if (lastCID && areCidsEqual(cid.toV1().toString(), lastCID)) {
		LOG(`[retrieve] IPNS value === Last CID`, { lastCID, cid })
		return { cid, didUpdate: false }
	}
	// Parsed once and shared by the incremental fetch and the decoder below.
	const stopAtCID = lastCID ? CID.parse(lastCID) : undefined

	// const currentAppThread = withoutDeleted(lastWriteWins(appThread))
	// const gatewayProviderIDs = getProviderIDs(['ipfs-gateway', 'ucan-store-proxy'], onlyFromAgent(currentAppThread, ag))
	// const gateways = gatewayProviderIDs.map(id => {
	// 	const resultArr = filterAndMap(currentAppThread, { en: id, at: 'wovin/provider/url' }, 'vl')
	// 	if (resultArr.length !== 1) throw ERROR(`provider attr get length != 1`, id, resultArr, currentAppThread)
	// 	// const url = getVM(ProviderVM, id).url
	// 	return { url: resultArr[0] } // url.includes('{CID}') ? :`${urlFromHumanInput(url)}/ipfs/{CID}/?format=car&dag-scope=all`
	// })
	const gateways = [...storageState.gateways] // copy, so the push below does not mutate storageState
	if (hasKubo()) {
		const kuboGW = getKuboGateway()
		if (!gateways.some(g => g.url === kuboGW.url)) {
			gateways.push(kuboGW as ProviderVM) // HACK kubo as GW
		}
	}

	let carBlocks: DecodedCar | undefined
	// if (isFileSystemAPIavailable()) {
	// 	const dirHandle = await agent.getDirectoryHandle()
	// 	if (dirHandle && await verifyPermission(dirHandle, true)) {
	// 		try {
	// 			const carFile = await getFile(dirHandle, `${ipns}/${cid}`)
	// 			if (carFile) car = await CarReader.fromBytes(new Uint8Array(await carFile.arrayBuffer()))
	// 			DEBUG({ dirHandle, carFile, car })
	// 		} catch (error) {
	// 			WARN('car load from local', error)
	// 		}
	// 	}
	// }
	// TODO: load from kubo? but when?

	// 1st choice: the root block is already in the local block store
	const pubFromStore = await getBlockFromStore(cid)
	if (pubFromStore) {
		DEBUG('pub from store situation')
		carBlocks = {
			rootCID: cid,
			blockStore: {
				get: async (blockCID) => {
					try {
						return await getBlockStore().get(blockCID)
					} catch (err) {
						// Not in the block store - fall back to re-encoding the applog from appThread
						const log = appThread.getApplog(blockCID)
						if (log) return encodeApplog(log).bytes // HACK: make BlockStoreish interface more flexible?
						throw err
					}
				},
			},
		}
	}

	// 2nd choice: fetch the snapshot chain from the gateways via the sync worker
	if (!carBlocks) {
		DEBUG('pub from car situation')
		if (!gateways.length) {
			// `hostname` (not `host`, which includes the port) so dev servers like localhost:5173 are skipped too
			if (window.location.hostname !== 'localhost') {
				WARN('No retrieval providers, scheduling reload...', storageState)
				setTimeout(() => {
					if (!storageState.gateways.length) { // check if still not fixed
						WARN('Still no retrieval providers, reloading...')
						window.location.reload() // HACK: to un-break UX as long as this bug is not fixed
					}
				}, 1000)
			}
			throw ERROR(`No retrieval providers (reload might fix it - or don't use private window / weird browser - in any case: tell manu)`)
		}
		// if (!gateways.length) throw ERROR(`No retrieval providers configured (Settings > Storage)`)

		const SyncWorker = getSyncWorker()
		// Use incremental fetch if we have lastCID, otherwise full CAR
		const fetchResult = await SyncWorker.fetchSnapshotChainInWorker(
			cid,
			gateways.map(gw => ({ url: gw.url })),
			stopAtCID,
		)
		DEBUG('fetchResult from SyncWorker', { snapshotCount: fetchResult.snapshotCount, blockCount: fetchResult.blocks.length })

		// Reconstruct blockStore from serializable blocks array
		const blocksMap = new Map(fetchResult.blocks)
		carBlocks = {
			rootCID: cid,
			blockStore: {
				// the lookup key is the CIDv1 string form of the requested CID
				get: async (c: CID) => blocksMap.get(c.toV1().toString()),
			},
		}
		DEBUG('carBlocks from SyncWorker', carBlocks)
	}

	const { applogs, cid: decodedCID, applogsCID, info } = await decodePubFromBlocks(carBlocks, [], stopAtCID)
	// TODO: addBlocksToStore(carBlocks.blockStore.all().filter(b => !containsCid(applogCids, b.cid)).map(block => ({ cid: block.cid, block: block.bytes })))
	DEBUG('decoded car', { applogs, cid, applogsCID, info })
	if (cid.toString() !== decodedCID.toString()) {
		throw ERROR(
			`Decoded CID does not match original CID... something is wrong as I don't think you're important enough for an adversarial MITM`,
			{ ipns, cid, decodedCID },
		)
	}

	const deduplicatedLogs = sanityCheckLogs(applogs)
	// if (deduplicated) {
	// 	WARN('encryptedFor has changed', { subToUpdate, encryptedFor: encryptedForCurrentAgent })
	// }
	const { hasDecryptionErrors, encryptedLogs, encryptedForCurrentAgent } = await tryIntegrateApplogs(
		deduplicatedLogs,
		info.logs,
		// ag,
		thread,
		subID,
		cid,
	)
	VERBOSE('tryIntegrateApplogs returned:', { deduplicatedLogs, hasDecryptionErrors, encryptedLogs, encryptedForCurrentAgent })
	if (hasDecryptionErrors) {
		throw ERROR(`Failed to decrypt (${encryptedLogs.length}/${deduplicatedLogs.length}) in publication ${ipns}`)
		// TODO: continue with the parts we got?
	}
	return {
		didUpdate: true,
		ts,
		cid,
		encryptedFor: encryptedForCurrentAgent,
		applogsCID,
		info,
		applogs: deduplicatedLogs,
	}
}

/**
 * Resolve an IPNS name by racing the local kubo node (when available) against the gateway resolver.
 *
 * The kubo branch collects every result of `kuboClient.name.resolve`, uses the first one,
 * strips the `/ipfs/` prefix and parses the rest as a CID; it yields `null` when kubo
 * returns no results.
 *
 * @param ipns - IPNS name to resolve.
 * @returns Whatever `racePromises` settles on.
 * @throws When `racePromises` rejects; the `errors` of an `AggregateError` are passed to the
 *   logger when present, otherwise the error itself.
 */
async function resolveIPNS(ipns: IpnsString) {
	try {
		// TODO: race for success - but maybe the other one has a newer record?
		// maybe we wanna give each at least Xms ?
		return await racePromises([
			// NOTE(review): this entry is `false` or an *un-invoked* async function - presumably
			// racePromises skips falsy entries and calls function entries itself; verify against its implementation.
			hasKubo() && (async () => {
				// ? simpler? - awaiting https://github.com/ipfs/js-kubo-rpc-client/issues/246
				// for await (const result of kuboClient.name.resolve(ipns, { stream: false })) {
				// 	DEBUG(`[resolveIPNS] kubo result:`, result)
				// 	return result && CID.parse(result)
				// }
				// DEBUG(`[resolveIPNS] kubo no result`)
				// return null
				const results = await asyncIterableToArray(kuboClient.name.resolve(ipns))
				DEBUG(`[resolveIPNS] kubo results:`, results)
				if (results.length === 0) return null
				if (results.length > 1) WARN(`[resolveIPNS] multiple results from kubo - precedence not implemented:`, results)
				const result = results[0]
				if (!result.startsWith('/ipfs/')) {
					// only warns - parsing is still attempted below
					WARN('IPNS value is not \'/ipfs/..\':', result)
				}
				return CID.parse(result.replace('/ipfs/', ''))
			}),
			resolveIpnsFromGW(ipns),
		])
	} catch (error) {
		throw ERROR(`Failed to resolve ${ipns}:`, (error as AggregateError).errors || error)
	}
}