import type { Applog, CidString, IShare, SnapRootBlock } from '@wovin/core' import type { BlockStoreForFetch } from '@wovin/core/ipfs' import type { AppAgentForWorker, SnapBlockLogs } from '@wovin/core/pubsub' import * as dagJson from '@ipld/dag-json' // import { retrievePubToThread } from './retrievePubToThread' import { retrieveCar, retrieveCarBlock } from '@wovin/connect-gateway' import { containsCid, prepareSnapshotForPush } from '@wovin/core' import { fetchSnapshotChainUntil } from '@wovin/core/ipfs' // import { getApplogCIDsFromPubInBlockStore, getBlockFromStore } from './block-store' import { toJS } from '@wovin/core/mobx' import { Logger } from 'besonders-logger' import { EdKeypairInstanceFromSecretKeyBytes } from 'mnemkey' import { CID } from 'multiformats/cid' import { getShareThread } from '../data/getShareThread' import { loadAppThread } from '../data/loadAppThread' import { stateDB } from '../data/local-state' import { getApplogCIDsFromPubInBlockStore, getBlockFromStore } from './block-store' import { getBlockStore } from './blockStore' import './worker-transferHandlers' const { WARN, LOG, DEBUG, VERBOSE, ERROR } = Logger.setup(Logger.DEBUG) // eslint-disable-line unused-imports/no-unused-vars export function addTest(a, b) { DEBUG('addTestWorkerSide', { a, b, sum: a + b }) return a + b } export async function retrieveCarInWorker(...params: Parameters) { const [cidObj, gateways] = params const cid = CID.decode(cidObj.bytes) DEBUG('retrievingCarInWorker', { cidObj, cid, gateways }) try { const carReaderInWorker = await retrieveCar(cidObj, gateways) DEBUG('retrieved', { carReaderInWorker }) return carReaderInWorker } catch (error) { throw ERROR(error) } } /** * Quick fetch for UI - root + info only (2 requests) * Returns snapshot metadata without fetching all applogs */ export async function fetchSnapshotInfoInWorker( cidObj: CID, gateways: { url: string }[], ): Promise<{ root: SnapRootBlock, infoLogs: Applog[] }> { const cid = CID.decode(cidObj.bytes) DEBUG('[fetchSnapshotInfo] fetching root + info', cid.toString()) const blocks = new Map() const blockStore: BlockStoreForFetch = { async get(c: CID) { return blocks.get(c.toV1().toString()) }, async put(c: CID, bytes: Uint8Array) { blocks.set(c.toV1().toString(), bytes) }, } // Fetch root block only const rootCar = await retrieveCarBlock(cid, gateways) for await (const { cid: blockCid, bytes } of rootCar.blocks()) { const validCid = typeof blockCid.toV1 === 'function' ? blockCid : CID.decode(blockCid.bytes) await blockStore.put(validCid, bytes) } const rootBytes = await blockStore.get(cid) if (!rootBytes) throw ERROR('[fetchSnapshotInfo] root block not found after fetch') const root = dagJson.decode(rootBytes) as SnapRootBlock // Fetch info const infoCar = await retrieveCar(root.info, gateways) for await (const { cid: blockCid, bytes } of infoCar.blocks()) { const validCid = typeof blockCid.toV1 === 'function' ? blockCid : CID.decode(blockCid.bytes) await blockStore.put(validCid, bytes) } // Decode info logs const infoBlockBytes = await blockStore.get(root.info) if (!infoBlockBytes) throw ERROR('[fetchSnapshotInfo] info block not found after fetch') const infoBlock = dagJson.decode(infoBlockBytes) as SnapBlockLogs const infoLogs = await Promise.all(infoBlock.logs.map(async (logCid) => { const logBytes = await blockStore.get(logCid) if (!logBytes) throw ERROR('[fetchSnapshotInfo] info log not found', logCid.toString()) return dagJson.decode(logBytes) as Applog })) // TODO: Consider storing blocks to IDB for caching (needs cache eviction strategy) DEBUG('[fetchSnapshotInfo] done', { root, infoLogs: infoLogs.length }) return { root, infoLogs } } /** Serializable result that can cross worker boundary */ export interface FetchChainResultSerialized { rootCID: CID blocks: [string, Uint8Array][] // Array of [cidString, bytes] pairs - serializable snapshotCount: number } /** * Full chain fetch - iterates until stopAtCID * If no stopAtCID (first pull), uses full CAR for efficiency * If stopAtCID exists (subsequent pull), uses incremental mode (3 requests per snapshot) * * Returns serializable data (blocks as array) that can cross worker boundary */ export async function fetchSnapshotChainInWorker( cidObj: CID, gateways: { url: string }[], stopAtCIDObj?: CID, ): Promise { const rootCID = CID.decode(cidObj.bytes) const stopAtCID = stopAtCIDObj ? CID.decode(stopAtCIDObj.bytes) : undefined // If no stopAtCID (first pull), use full CAR for efficiency if (!stopAtCID) { DEBUG('[fetchSnapshotChain] full CAR mode (no lastCID)', rootCID.toString()) const car = await retrieveCar(rootCID, gateways) const blocks: [string, Uint8Array][] = [] for await (const { cid, bytes } of car.blocks()) { const validCid = typeof cid.toV1 === 'function' ? cid : CID.decode(cid.bytes) blocks.push([validCid.toV1().toString(), bytes]) } // TODO: Consider storing blocks to IDB for caching (needs cache eviction strategy) DEBUG('[fetchSnapshotChain] full CAR done', { blockCount: blocks.length }) return { rootCID, blocks, snapshotCount: -1 } // -1 = unknown (full CAR) } // Incremental mode - 3 requests per snapshot DEBUG('[fetchSnapshotChain] incremental mode, stopping at:', stopAtCID.toString()) const result = await fetchSnapshotChainUntil({ rootCID, stopAtCID, fetchBlock: cid => retrieveCarBlock(cid, gateways), fetchAll: cid => retrieveCar(cid, gateways), maxDepth: 100, }) // TODO: Consider storing blocks to IDB for caching (needs cache eviction strategy) DEBUG('[fetchSnapshotChain] incremental done', { snapshotCount: result.snapshotCount, blockCount: result.blocks.length }) return { rootCID, blocks: result.blocks, snapshotCount: result.snapshotCount } } const getShareIDB = shareID => stateDB.shares.get(shareID) export async function getNewLogsForShareInWorker(shareID, ag) { let newLogs: readonly Applog[] let prevSnapCID: CID = null const share = await getShareIDB(shareID) const shareLastCID = share.lastCID const appThread = await loadAppThread() const shareThread = getShareThread(appThread, share) DEBUG('getNewLogsForShareInWorker', { shareThread, share }) if (shareLastCID) { await getBlockStore().open() prevSnapCID = CID.parse(shareLastCID) const alreadyPushed: Set = new Set() const getLogsFromPrevSnaps = async (snapCID: CID, trace: CID[] = []) => { VERBOSE(`[getLogsFromPrevSnaps] loading`, snapCID.toString(), { shareLastCID, trace }) const lastSnap = (await getBlockFromStore(snapCID)) as SnapRootBlock // TODO: why does `blockStore.has(snapCID)` not work? if (lastSnap) { const lastSnapLogCIDs = await getApplogCIDsFromPubInBlockStore(snapCID, lastSnap) for (const cid of lastSnapLogCIDs) alreadyPushed.add(cid.toV1().toString()) VERBOSE(`[getLogsFromPrevSnaps] loaded`, lastSnapLogCIDs.length, 'log CIDs', { shareLastCID, trace }) if (lastSnap.prev) { if (containsCid(trace, lastSnap.prev)) throw ERROR(`snapshot chain has a loop`, { trace, lastSnap, shareLastCID }) await getLogsFromPrevSnaps(lastSnap.prev, [...trace, snapCID]) } } else { throw ERROR(`TODO worker pull missing snapshots for share`, snapCID.toString()) // const thread = ThreadInMemory.empty(`pub-${pubCID}`) // const { applogs, info } = await retrievePubToThread(thread, null, { pinCID: pubCID.toString() }, ag) // VERBOSE('no last pub retrievePubToThread returned:', { applogs, info }) // for (let { cid } of applogs) alreadyPublished.add(cid) } } await getLogsFromPrevSnaps(CID.parse(shareLastCID)) // aka shareAtoms const shareLogs = shareThread.applogs // HACK: newLogs = shareLogs.filter(shareLog => !containsCid(alreadyPushed, shareLog.cid)) DEBUG(`Skipping ${shareThread.size - newLogs.length} out of ${shareThread.size} which were already pushed`, { share, shareThread, alreadyPushed, }) } else { newLogs = shareThread.applogs } return toJS(newLogs) } // export async function doPull(thread: ThreadIDB, subscriptions: ISubscription[], onlyIfAuto: boolean, syncState, agent) { // // const agent = useAgent() // for (const sub of subscriptions) { // if (onlyIfAuto && !sub.autopull) continue // DEBUG('[Sync] Pulling', sub) // runInAction(() => syncState.syncing = { sub: sub.id }) // await agent.updateSub(sub.id, { lastPullAttempt: dateNowIso() }) // try { // // Pull // const pullData = await retrievePubToThread(thread, sub.id, { lastCID: sub.lastCID, subID: sub.id }) // await updateSubFromPullData(pullData, sub.id) // } catch (error) { // runInAction(() => { // ERROR('[Sync] failed to retrieve', sub, error) // syncState.errors.push({ sub, error }) // }) // } finally { // runInAction(() => syncState.syncing = true) // } // } // } // export async function doPush(thread: Thread, publications: IPublication[], onlyIfAuto: boolean, syncState) { // for (const pub of publications) { // if (onlyIfAuto && !pub.autopush) continue // DEBUG('[Sync] Pushing', pub) // runInAction(() => syncState.syncing = { pub: pub.id }) // try { // await pushPublication(thread, pub) // } catch (error) { // runInAction(() => { // ERROR('[Sync] failed to publish', pub, error) // syncState.errors.push({ pub, error }) // }) // } finally { // runInAction(() => syncState.syncing = true) // } // } // } export async function prepareSnapshotForPushInsideWorker( agent: AppAgentForWorker, logsToPublish: readonly Applog[], share: IShare, prevSnapCID: CID | null, ) { // const share = await getShareIDB(shareID) const appThread = await loadAppThread() // HACK create signer on worker thread const agentWithSign = Object.assign(agent, { sign: async (message: Uint8Array) => { const kp = await EdKeypairInstanceFromSecretKeyBytes(agent.signerSecret) return kp.sign(message) }, }) const preparedSnapshot = await prepareSnapshotForPush(agentWithSign, appThread, logsToPublish, share, prevSnapCID) DEBUG('returning from prepareSnapshotForPushInsideWorker', { preparedSnapshot }) return preparedSnapshot }