// import * as Comlink from 'comlink' import type { CarReader } from '@ipld/car/indexed-reader' import type { Applog, ApplogEnc, CidString } from '@wovin/core/applog' import type { AppAgent, IShare } from '@wovin/core/pubsub' import type { Thread } from '@wovin/core/thread' import type { PromiseType } from '@wovin/core/types' import type { AgentStateClass } from '../data/agent/AgentState' import { getGatewayUrl } from '@wovin/connect-gateway' import { dateNowIso, getApplogNoCidTypeErrors, getApplogTypeErrors, isEncryptedApplog, isValidApplog, removeDuplicateAppLogs, } from '@wovin/core/applog' import { areCidsEqual, cidToString, decodePubFromCar, encodeApplogAndGetCid, ensureValidCIDinstance, toIpnsString, tryParseCID } from '@wovin/core/ipfs' import { flow } from '@wovin/core/mobx' import { integratePub } from '@wovin/core/pubsub' import { agentsOfThread, entityOverlapCount, entityOverlapMap, threadFromMaybeArray } from '@wovin/core/query' import { ThreadInMemory } from '@wovin/core/thread' import { sleep, unwrapIfSingle, urlFromHumanInput } from '@wovin/utils' import { Logger } from 'besonders-logger' import * as Comlink from 'comlink' import { uniqBy } from 'lodash-es' import partition from 'lodash-es/partition' import { CID } from 'multiformats/cid' import stringify from 'safe-stable-stringify' import * as W3Name from 'w3name' import { decryptWithAesSharedKey, getAESkeyForEncryptedApplogs, getDecryptedSecretFromIDB } from '../data/agent/AgentCrypto' import { getAgentString, useAgent } from '../data/agent/AgentState' import { insertCompleteApplogsInAppDBifMissing } from '../data/ApplogDB' import { isFileSystemAPIavailable, verifyPermission, writeFile } from '../data/filesystem-utils' import { getVM } from '../data/VMs/MappedVMbase' import { ProviderVM } from '../data/VMs/ProviderVM' import { useProviderIDs } from '../ui/reactive' import { notifyToast } from '../ui/utils-ui' import { addBlocksToStore } from './block-store' import { uploadCarToKubo } from './kubo' import { retrievePubToThread } from './retrievePubToThread' import { publishIPNS, storageState } from './storage' import { getSyncWorker } from './worker-utils' const { WARN, LOG, DEBUG, VERBOSE, ERROR } = Logger.setup(Logger.INFO) // eslint-disable-line unused-imports/no-unused-vars // im port * as Wasm from 'note3-rs' // let_wasm: any // async function initWasm() { // if (!_wasm) { // _wasm = await Wasm.default() // DEBUG('WASM', _wasm, Wasm) // await Wasm.init_rs() // } // } export async function pushShare(thread: Thread, share: IShare) { DEBUG('[publish]', share, { thread, storageState }) const agent = useAgent() if (!storageState.ucanUploads?.length) throw ERROR(`No upload provider configured (Settings > Storage)`) // const pubSigningKeyBytes = await state.crypto?.keystore?.publicWriteKey() let shareNameString: string let writableNameFromPriKey: W3Name.WritableName DEBUG('Publishing datalog snapshot via share:', { ipns: share.id, share }) writableNameFromPriKey = await W3Name.from(share.pk) DEBUG('using:', { writableNameFromPriKey }) shareNameString = share.id // const publicationThread = getPublicationThread(thread, publication) // if (!publicationThread) { // ERROR(`Empty publication data`, { publication, thread }) // throw ERROR(`Empty publication data`, publication.id) // } const SyncWorker = getSyncWorker() DEBUG('SyncWorker', SyncWorker) const newLogs = await SyncWorker.getNewLogsForShareInWorker(share.id, agent.ag) if (!newLogs.length) { return } DEBUG('newLogs from Worker', newLogs) const prevSnapCID: CID = (share.lastCID) ? CID.parse(share.lastCID) : null // ? agent needs to have ag, did for preparePubForPush and agent.sign for encodePubAsCar const { cid, blob, blocks, applogCids } = await prepareForPushInWorker(agent, newLogs, share, prevSnapCID) const mapCIDsByString = (eachCID) => { const validCID = ensureValidCIDinstance(eachCID) return { [validCID.toString()]: validCID } } const mappedCIDs = applogCids.map(mapCIDsByString) DEBUG('adding prepared blocks to store', { cid, blob, blocks, mappedCIDs }) addBlocksToStore( blocks // .filter(b => !containsCid(applogCids, b.cid)) // ? why exclude them .map(block => ({ cid: block.cid, block: block.bytes })), ) await pushToFileSystem(agent, share, cid, blob) await pushToKubo(blob) await pushToRemotes(blob, cid) await publishIPNS(cid.toString(), writableNameFromPriKey) agent.updateShare(share.id, { lastPush: dateNowIso(), lastCID: cid.toString() }) queueMicrotask(() => void pullCarToEdge(cid)) return shareNameString } async function pushToRemotes(blob: Blob, cid: CID) { // TODO: check if remote already has car.cid ? // or even better: car-mirror const storeResults = await Promise.allSettled(storageState.ucanUploads.map(async (provider) => { return await provider.storeCar(blob) })) DEBUG('[pushToRemotes] results:', storeResults) const [succeeded, failed] = partition(storeResults, r => r.status === 'fulfilled') if (failed.length > 0) { const error = ERROR( `[publish] failed to store (${failed.length}/${storageState.ucanUploads.length} providers)`, failed.map(({ reason }) => reason), ) if (!succeeded.length) { throw error } // TODO: show in UI } const storedCids = uniqBy( succeeded.map(({ value }) => value), cid => cid.toString(), ) if (storedCids.length !== 1) throw ERROR('Invalid amount of stored CIDs:', storedCids) const storedCID = storedCids[0] if (!areCidsEqual(cid, storedCID)) { throw ERROR(`[publish] resulting CID is different than what we thought we sent`, { ourCID: cid.toString(), storedCID: storedCID.toString(), }) } } async function pushToKubo(blob: Blob) { uploadCarToKubo(blob) } async function pushToFileSystem(agent: AgentStateClass, share: IShare, cid, blob: Blob) { try { if (isFileSystemAPIavailable()) { const dirHandle = await agent.getDirectoryHandle() as FileSystemDirectoryHandle if (dirHandle) { const isReadWrite = await verifyPermission(dirHandle) DEBUG({ dirHandle, isReadWrite }) if (isReadWrite) { const ipnsSubfolder = await dirHandle.getDirectoryHandle(share.id, { create: true }) const carFileHandle = await ipnsSubfolder.getFileHandle(`${cid.toString()}.car`, { create: true }) DEBUG({ ipnsSubfolder, carFileHandle }) writeFile(carFileHandle, blob) } } } } catch (err) { ERROR(`[publish] filesystem write error`, err) notifyToast((err as any).message ?? stringify(err, undefined, 2), 'danger') } } async function prepareForPushInWorker( agent: AgentStateClass, /* AppAgent */ // appThread: Thread, logsToPublish: readonly Applog[], share: IShare, prevSnapCID: CID | null, ) { const signerSecret = await getDecryptedSecretFromIDB(agent) const appAgentToPass = { ag: agent.ag, did: agent.did, signerSecret: Comlink.transfer(signerSecret, [signerSecret.buffer]), /* sign: Comlink.proxy(await agent.getProxyfiableSignFx()) */ } as unknown as AppAgent DEBUG('calling prepareSnapshotForPushInsideWorker with:', { appAgentToPass, logsToPublish, share, prevSnapCID }) const SyncWorker = getSyncWorker() const preparedByWorker = await SyncWorker.prepareSnapshotForPushInsideWorker( appAgentToPass, // Comlink.proxy(agent), logsToPublish, JSON.parse(stringify(share)), prevSnapCID, ) DEBUG({ preparedByWorker }) preparedByWorker.cid = CID.decode(preparedByWorker.cid.bytes) // HACK somehow the worker transfer should be able to CIDS also deeply return preparedByWorker } // export const pullCarToEdge = debounce( // probably should debounce and or reconsider auto push when no changes export async function pullCarToEdge(cid: CID) { await sleep(1500) // const p = performance.now() // await retrieveCar(cid) // DEBUG('pullCar took:', performance.now() - p, 'ms') // const cidStr = cid.toString() // // const dwebURL = `https://${cid}.ipfs.dweb.link/?format=car` // // const nftsURL = `https://${cid}.ipfs.nftstorage.link/?format=car` // const ipfsURL = `https://ipfs.io/ipfs/${cidStr}/?format=car` // const cfURL = `https://cloudflare-ipfs.com/ipfs/${cidStr}/?format=car` // const evaURL = `https://ipfs.4everland.io/ipfs/${cidStr}/?format=car` // const URLs = [/* dwebURL, nftsURL, */ cfURL, ipfsURL, evaURL] const providers = useProviderIDs() .map(id => getVM(ProviderVM, id)) .filter(provider => provider.type === 'ipfs-gateway') for (const provider of providers) { // await sleep(150) queueMicrotask(async () => { const gatewayUrl = getGatewayUrl({ url: urlFromHumanInput(provider.url) }, cid) // await sleep(250) void fetch(gatewayUrl) }) } /** * some edge URL info; * https://w3s.link/ipfs/baguqeerajc3qwxrddm7tdsmnrh4grktyv4s2oljhidxqtqoqm7d33neqbj7a/?format=car * https://dweb.link/ipfs/baguqeerajc3qwxrddm7tdsmnrh4grktyv4s2oljhidxqtqoqm7d33neqbj7a/?format=car // redirects to => * https://baguqeerajc3qwxrddm7tdsmnrh4grktyv4s2oljhidxqtqoqm7d33neqbj7a.ipfs.dweb.link/?format=car * https://ipfs.io/ipfs/baguqeerajc3qwxrddm7tdsmnrh4grktyv4s2oljhidxqtqoqm7d33neqbj7a/?format=car * https://nftstorage.link/ipfs/baguqeerajc3qwxrddm7tdsmnrh4grktyv4s2oljhidxqtqoqm7d33neqbj7a/?format=car // redirects to => * https://baguqeerajc3qwxrddm7tdsmnrh4grktyv4s2oljhidxqtqoqm7d33neqbj7a.ipfs.nftstorage.link/?format=car * not found: * https://ipfs.runfission.com/ipfs/baguqeerajc3qwxrddm7tdsmnrh4grktyv4s2oljhidxqtqoqm7d33neqbj7a/?format=car * no car support: * https://cloudflare-ipfs.com/ipfs/baguqeerajc3qwxrddm7tdsmnrh4grktyv4s2oljhidxqtqoqm7d33neqbj7a/?format=car * https://ipfs.4everland.io/ipfs/baguqeerajc3qwxrddm7tdsmnrh4grktyv4s2oljhidxqtqoqm7d33neqbj7a/?format=car */ } export const updateSubFromPullData = async function ( pullData: PromiseType>, // ? ts shenanigans extras, pubToThread, pubAsThread etc... subToUpdate: string, ) { if (!pullData.didUpdate) return const agent = useAgent() await agent.updateSub(subToUpdate, { lastPull: pullData.ts.toISOString(), // lastPullAttempt: should happen earlier lastCID: pullData.cid.toString(), lastApplogCID: pullData.applogsCID.toString(), }) } export const tryIntegrateApplogs = flow( function* tryIntegrateApplogsFlow( applogs: Array, infoLogs, // ag: string, thread: Thread, subID: string, cid: CID, ) { const [encryptedLogs, nonEncryptedLogs] = partition(applogs, isEncryptedApplog) as [ApplogEnc[], Applog[]] if (DEBUG.isEnabled) DEBUG('Encrypted?:', { encryptedLogs, nonEncryptedLogs }) const agent = useAgent() const typeboxErrors = nonEncryptedLogs.map(log => [log, getApplogNoCidTypeErrors(log)]).filter(([, errs]) => errs.length) if (typeboxErrors.length) { ERROR(`[tryIntegrateApplogs] typebox check failed:`, typeboxErrors) throw new Error( `${typeboxErrors.length}/${nonEncryptedLogs.length} applogs have an invalid format, e.g. ${stringify(typeboxErrors[0])}`, ) } let encryptedForCurrentAgent = false if (encryptedLogs.length) { // TODO consider what pubAtoms we want to use // const publicationAtoms = nonEncryptedLogs.filter(eachLog => (eachLog.at.includes && eachLog.at.includes('pub/') && eachLog.en === ipnsNameString)) var { aesKey, publisherAg } = yield getAESkeyForEncryptedApplogs(infoLogs) encryptedForCurrentAgent = !!aesKey } DEBUG('[decrypt] isEncrypted?:', !!encryptedLogs.length, { aesKey, applogs /* , subToUpdate */ }) const maybeDecryptedApplogs = [] as Applog[] let hasDecryptionErrors = false let isRetry = false const tryIntegrate = flow(function* tryIntegrateFlow() { for (const eachLog of encryptedLogs) { if (!aesKey) { hasDecryptionErrors || ERROR('encrypted log(s) found but no pubSub Key found', { aesKey, applogs /* , subToUpdate */ }) hasDecryptionErrors = true continue } VERBOSE('[decrypt] attempting decryption:', eachLog) // if(subToUpdate.encryptedWith) yield doTestEncryptDecrypt(subToUpdate) const decrypted: Applog = yield decryptWithAesSharedKey(eachLog.enc, aesKey, 'parsed') const pv = decrypted.pv ? decrypted.pv.toString() : null VERBOSE('[decrypt] decrypted:', { decrypted }) maybeDecryptedApplogs.push({ ...decrypted, pv, cid: encodeApplogAndGetCid(decrypted).toString() }) // locally we want pv as a string } for (const eachLog of nonEncryptedLogs) { if (isValidApplog(eachLog)) { const pv = eachLog.pv ? eachLog.pv.toString() : null maybeDecryptedApplogs.push({ ...eachLog, pv }) // locally we want pv as a string } else { const errors = getApplogTypeErrors(eachLog) WARN('[pull] unknown log problem', eachLog, unwrapIfSingle(errors)) } } const newApplogs = maybeDecryptedApplogs.filter((newLog: any) => !thread.hasApplog(newLog as Applog, false)) const deduplicated = removeDuplicateAppLogs(newApplogs) if (deduplicated.length != newApplogs.length) { WARN(`Removed ${newApplogs.length - deduplicated.length} duplicate applogs`, { newApplogs, deduplicated }) } DEBUG(`[tryIntegrate->${thread.nameAndSizeUntracked}] New Applogs:`, deduplicated.length, { deduplicated, maybeDecryptedApplogs, thread, }) if (deduplicated.length) { // thread.insertRaw(deduplicated) integratePub({ targetThread: thread, agentHash: agent.ag, subID, pubData: { cid, thread: threadFromMaybeArray(deduplicated) } }) } }) yield tryIntegrate() // first try if (hasDecryptionErrors) { WARN('trying to solve encryption errore', { aesKey, nonEncryptedLogs, applogs /* , subToUpdate , ipnsNameString */ }) if (isRetry) { ERROR('second round of errors') } else { isRetry = true let derivationJWKstring const isForMe = false // try to infer encryption and setup subscription before retrying if (!encryptedForCurrentAgent) { notifyToast( `Found Encrypted Applogs that are not shared with us (author: ${getAgentString(publisherAg)} [${publisherAg}])).`, 'warning', Number.POSITIVE_INFINITY, ) } DEBUG('[decrypt] Errors found retrying now', { isForMe, publisherAg, derivationJWKstring /* , subToUpdate */ }) if (isForMe && derivationJWKstring) yield tryIntegrate() // ? only if its for me and i have the remote jwk, i think ? } } DEBUG(`[tryIntegrate->${thread.nameAndSizeUntracked}] Successfully integrated`) return { hasDecryptionErrors, encryptedLogs, encryptedForCurrentAgent } }, ) export async function retrievePubAsThread(appThread: Thread, pubID: CidString, pinCID?: string) { const writableThread = ThreadInMemory.empty(`pub-${pubID}`) const { cid: parsedPubID, errors, isIpns } = tryParseCID(pubID) if (!parsedPubID) throw ERROR(`Failed to parse pubID`, pubID, errors) const { cid, applogsCID, applogs: maybeEncryptedApplogs, info } = await retrievePubToThread( writableThread, isIpns ? toIpnsString(parsedPubID) : null, { pinCID: isIpns ? pinCID : cidToString(parsedPubID) }, appThread, ) VERBOSE('retrievePubToThread returned:', { cid, applogsCID, maybeEncryptedApplogs, info }) insertCompleteApplogsInAppDBifMissing(info.logs) const thread = ThreadInMemory.fromArray([...writableThread.applogs], `preview-${pubID}`, true) // HACK: bc. readonly - better would be a readOnly(thread) wrapper return { thread, cid, applogsCID, maybeEncryptedApplogs, info } } export async function retrievePubDataWithExtras(appThread: Thread, pubID: CidString, pinCID?: string) { const stuff = await retrievePubAsThread(appThread, pubID, pinCID) return { id: pubID, ...stuff, ...getPubExtraInfo(appThread, stuff.thread), encryptedCount: stuff.maybeEncryptedApplogs.filter(isEncryptedApplog).length, // HACK refactor } } export async function decodePubCarWithExtras(appThread: Thread, car: CarReader) { const { cid, applogs: applogsMaybeEnc, info } = await decodePubFromCar(car) const [encryptedLogs, applogs] = partition(applogsMaybeEnc, isEncryptedApplog) const thread = ThreadInMemory.fromArray(applogs, `preview-${cid.toString()}`, true) // HACK: bc. readonly - better would be a readOnly(thread) wrapper return { id: cid.toString(), cid, info, applogs, thread, ...getPubExtraInfo(appThread, thread), encryptedCount: encryptedLogs.length, } } export function getPubExtraInfo(appThread: Thread, thread: Thread) { const entityOverlappingVMmap = entityOverlapMap(appThread, thread) DEBUG({ entityOverlappingVMmap }) return { entityOverlapCount: entityOverlapCount(appThread, thread), agents: Array.from(agentsOfThread(thread).keys()), newLogs: thread.applogs.filter(log => !appThread.hasApplog(log, false)).length, } }