import { Selectable, sql } from 'kysely'
import { DAY, HOUR } from '@atproto/common'
import { IdResolver, getPds } from '@atproto/identity'
import { Cid, l, parseCid, xrpc, xrpcSafe } from '@atproto/lex'
import {
  VerifiedRepo,
  WriteOpAction,
  getAndParseRecord,
  readCarWithRoot,
  verifyRepo,
} from '@atproto/repo'
import { AtUri, DidString } from '@atproto/syntax'
import { com } from '../../../lexicons/index.js'
import { subLogger } from '../../../logger.js'
import { retryXrpc } from '../../../util/retry.js'
import { BackgroundQueue } from '../background.js'
import { Database } from '../db/index.js'
import { Actor } from '../db/tables/actor.js'
import * as Block from './plugins/block.js'
import * as ChatDeclaration from './plugins/chat-declaration.js'
import * as FeedGenerator from './plugins/feed-generator.js'
import * as Follow from './plugins/follow.js'
import * as GermDeclaration from './plugins/germ-declaration.js'
import * as Labeler from './plugins/labeler.js'
import * as Like from './plugins/like.js'
import * as ListBlock from './plugins/list-block.js'
import * as ListItem from './plugins/list-item.js'
import * as List from './plugins/list.js'
import * as NotifDeclaration from './plugins/notif-declaration.js'
import * as Postgate from './plugins/post-gate.js'
import * as Post from './plugins/post.js'
import * as Profile from './plugins/profile.js'
import * as Repost from './plugins/repost.js'
import * as StarterPack from './plugins/starter-pack.js'
import * as Status from './plugins/status.js'
import * as Threadgate from './plugins/thread-gate.js'
import * as Verification from './plugins/verification.js'

/**
 * Writes repo records, handles, and account state into the database
 * (`this.db`).
 *
 * Each supported record type is handled by a plugin in `records`;
 * `findIndexerForCollection()` dispatches to the plugin whose `collection`
 * matches an AT-URI's collection.
 */
export class IndexingService {
  /** One indexing plugin per supported record type. */
  records: {
    post: Post.PluginType
    threadGate: Threadgate.PluginType
    postGate: Postgate.PluginType
    like: Like.PluginType
    repost: Repost.PluginType
    follow: Follow.PluginType
    profile: Profile.PluginType
    list: List.PluginType
    listItem: ListItem.PluginType
    listBlock: ListBlock.PluginType
    block: Block.PluginType
    feedGenerator: FeedGenerator.PluginType
    starterPack: StarterPack.PluginType
    labeler: Labeler.PluginType
    notifDeclaration: NotifDeclaration.PluginType
    chatDeclaration: ChatDeclaration.PluginType
    germDeclaration: GermDeclaration.PluginType
    verification: Verification.PluginType
    status: Status.PluginType
  }

  constructor(
    public db: Database,
    public idResolver: IdResolver,
    public background: BackgroundQueue,
  ) {
    this.records = {
      post: Post.makePlugin(this.db, this.background),
      threadGate: Threadgate.makePlugin(this.db, this.background),
      postGate: Postgate.makePlugin(this.db, this.background),
      like: Like.makePlugin(this.db, this.background),
      repost: Repost.makePlugin(this.db, this.background),
      follow: Follow.makePlugin(this.db, this.background),
      profile: Profile.makePlugin(this.db, this.background),
      list: List.makePlugin(this.db, this.background),
      listItem: ListItem.makePlugin(this.db, this.background),
      listBlock: ListBlock.makePlugin(this.db, this.background),
      block: Block.makePlugin(this.db, this.background),
      feedGenerator: FeedGenerator.makePlugin(this.db, this.background),
      starterPack: StarterPack.makePlugin(this.db, this.background),
      labeler: Labeler.makePlugin(this.db, this.background),
      notifDeclaration: NotifDeclaration.makePlugin(this.db, this.background),
      chatDeclaration: ChatDeclaration.makePlugin(this.db, this.background),
      germDeclaration: GermDeclaration.makePlugin(this.db, this.background),
      verification: Verification.makePlugin(this.db, this.background),
      status: Status.makePlugin(this.db, this.background),
    }
  }

  /**
   * Returns a new service whose plugins are all bound to `txn`.
   *
   * @throws if `txn` is not a transaction (via `assertTransaction`).
   */
  transact(txn: Database) {
    txn.assertTransaction()
    return new IndexingService(txn, this.idResolver, this.background)
  }

  /**
   * Creates or updates the index rows for one record inside a fresh
   * transaction. Records in collections with no registered plugin are
   * silently ignored.
   *
   * @param opts forwarded to the plugin on create only; updates ignore it.
   */
  async indexRecord(
    uri: AtUri,
    cid: Cid,
    obj: unknown,
    action: WriteOpAction.Create | WriteOpAction.Update,
    timestamp: string,
    opts?: { disableNotifs?: boolean; disableLabels?: boolean },
  ) {
    this.db.assertNotTransaction()
    return this.db.transaction(async (txn) => {
      const indexingTx = this.transact(txn)
      const indexer = indexingTx.findIndexerForCollection(uri.collection)
      if (!indexer) return
      if (action === WriteOpAction.Create) {
        await indexer.insertRecord(uri, cid, obj, timestamp, opts)
      } else {
        await indexer.updateRecord(uri, cid, obj, timestamp)
      }
    })
  }

  /**
   * Removes one record's index rows inside a fresh transaction. Unknown
   * collections are a no-op; `cascading` is passed through to the plugin.
   */
  async deleteRecord(uri: AtUri, cascading = false) {
    this.db.assertNotTransaction()
    await this.db.transaction(async (txn) => {
      const indexingTx = this.transact(txn)
      const indexer = indexingTx.findIndexerForCollection(uri.collection)
      if (!indexer) return
      await indexer.deleteRecord(uri, cascading)
    })
  }

  /**
   * Re-resolves and stores the handle for `did`.
   *
   * Skipped unless `force` is set or `needsHandleReindex()` reports the stored
   * row as stale. The handle is stored (lowercased) only when it resolves back
   * to `did`; otherwise `null` is stored. If a different actor currently holds
   * that handle, its handle is cleared first.
   */
  async indexHandle(did: string, timestamp: string, force = false) {
    this.db.assertNotTransaction()
    const actor = await this.db.db
      .selectFrom('actor')
      .where('did', '=', did)
      .selectAll()
      .executeTakeFirst()
    if (!force && !needsHandleReindex(actor, timestamp)) {
      return
    }
    const atpData = await this.idResolver.did.resolveAtprotoData(did, true)
    const handleToDid = await this.idResolver.handle.resolve(atpData.handle)
    // only trust the handle if it resolves back to this DID
    const handle: string | null =
      did === handleToDid ? atpData.handle.toLowerCase() : null
    const actorWithHandle =
      handle !== null
        ? await this.db.db
            .selectFrom('actor')
            .where('handle', '=', handle)
            .selectAll()
            .executeTakeFirst()
        : null
    // handle contention: another actor still holds this handle, release it
    if (handle && actorWithHandle && did !== actorWithHandle.did) {
      await this.db.db
        .updateTable('actor')
        .where('actor.did', '=', actorWithHandle.did)
        .set({ handle: null })
        .execute()
    }
    const actorInfo = { handle, indexedAt: timestamp }
    await this.db.db
      .insertInto('actor')
      .values({ did, ...actorInfo })
      .onConflict((oc) => oc.column('did').doUpdateSet(actorInfo))
      .returning('did')
      .executeTakeFirst()
  }

  /**
   * Fetches the full repo for `did` from its PDS and verifies it against the
   * DID's signing key. It then reconciles the indexed records with the repo
   * contents: records missing from the index are created, records whose CID
   * changed are updated, and indexed records absent from the repo are deleted.
   *
   * A failure on one record is logged and skipped, so it does not abort the
   * sync. `commit` is only used as log context.
   *
   * NOTE(review): every op runs concurrently, each in its own transaction.
   * Consider bounding concurrency for very large repos.
   */
  async indexRepo(did: DidString, commit?: string) {
    this.db.assertNotTransaction()
    const now = new Date().toISOString()
    const { pds, signingKey } = await this.idResolver.did.resolveAtprotoData(
      did,
      true,
    )
    const { body: car } = await retryXrpc(() => {
      return xrpc(pds, com.atproto.sync.getRepo, { params: { did } })
    })
    const { root, blocks } = await readCarWithRoot(car)
    const verifiedRepo = await verifyRepo(blocks, root, did, signingKey)
    const currRecords = await this.getCurrentRecords(did)
    const repoRecords = formatCheckout(did, verifiedRepo)
    const diff = findDiffFromCheckout(currRecords, repoRecords)
    await Promise.all(
      diff.map(async (op) => {
        const { uri, cid } = op
        try {
          if (op.op === 'delete') {
            await this.deleteRecord(uri)
          } else {
            const parsed = await getAndParseRecord(blocks, cid)
            await this.indexRecord(
              uri,
              cid,
              parsed.record,
              op.op === 'create'
                ? WriteOpAction.Create
                : WriteOpAction.Update,
              now,
            )
          }
        } catch (err) {
          if (err instanceof l.LexValidationError) {
            subLogger.warn(
              { did, commit, uri: uri.toString(), cid: cid.toString() },
              'skipping indexing of invalid record',
            )
          } else {
            subLogger.error(
              { err, did, commit, uri: uri.toString(), cid: cid.toString() },
              'skipping indexing due to error processing record',
            )
          }
        }
      }),
    )
  }

  /** Returns the records currently indexed for `did`, keyed by AT-URI string. */
  async getCurrentRecords(did: string): Promise<Record<string, UriAndCid>> {
    const res = await this.db.db
      .selectFrom('record')
      .where('did', '=', did)
      .select(['uri', 'cid'])
      .execute()
    return res.reduce(
      (acc, cur) => {
        acc[cur.uri] = {
          uri: new AtUri(cur.uri),
          cid: parseCid(cur.cid),
        }
        return acc
      },
      {} as Record<string, UriAndCid>,
    )
  }

  /**
   * Upserts the last-seen commit CID and repo revision for `did` into
   * `actor_sync`.
   */
  async setCommitLastSeen(did: string, commit: Cid, rev: string) {
    const { ref } = this.db.db.dynamic
    await this.db.db
      .insertInto('actor_sync')
      .values({
        did,
        commitCid: commit.toString(),
        repoRev: rev ?? null,
      })
      .onConflict((oc) => {
        const excluded = (col: string) => ref(`excluded.${col}`)
        return oc.column('did').doUpdateSet({
          commitCid: sql`${excluded('commitCid')}`,
          repoRev: sql`${excluded('repoRev')}`,
        })
      })
      .execute()
  }

  /**
   * Returns the plugin registered for `collection`, or `undefined` if none
   * handles it.
   */
  findIndexerForCollection(collection: string) {
    for (const indexer of Object.values(this.records)) {
      if (indexer.collection === collection) {
        return indexer
      }
    }
    return undefined
  }

  /**
   * Stores the upstream account status for `did`. Active accounts store
   * `null`. Inactive accounts must report `deactivated`, `suspended` or
   * `takendown`.
   *
   * @throws Error for any other status on an inactive account.
   */
  async updateActorStatus(
    did: DidString,
    active: boolean,
    status: string = '',
  ) {
    let upstreamStatus: string | null
    if (active) {
      upstreamStatus = null
    } else if (['deactivated', 'suspended', 'takendown'].includes(status)) {
      upstreamStatus = status
    } else {
      throw new Error(`Unrecognized account status: ${status}`)
    }
    await this.db.db
      .updateTable('actor')
      .set({ upstreamStatus })
      .where('did', '=', did)
      .execute()
  }

  /**
   * Deletes the actor row, its indexed content and its notifications. This
   * happens only when the actor is confirmed as no longer hosted (see
   * `getActorIsHosted`); otherwise nothing changes.
   */
  async deleteActor(did: DidString) {
    this.db.assertNotTransaction()
    const actorIsHosted = await this.getActorIsHosted(did)
    if (actorIsHosted === false) {
      await this.db.db.deleteFrom('actor').where('did', '=', did).execute()
      await this.unindexActor(did)
      await this.db.db
        .deleteFrom('notification')
        .where('did', '=', did)
        .execute()
    }
  }

  /**
   * Returns `false` in two cases: the DID document lists no PDS, or the PDS
   * answers `getLatestCommit` with `RepoNotFound`. Returns `true` when the PDS
   * answers successfully. Any other error is rethrown so `retryXrpc` can
   * retry it.
   */
  private async getActorIsHosted(did: DidString): Promise<boolean> {
    const doc = await this.idResolver.did.resolve(did, true)
    const pds = doc && getPds(doc)
    if (!pds) return false
    return retryXrpc(async () => {
      const res = await xrpcSafe(pds, com.atproto.sync.getLatestCommit, {
        params: { did },
      })
      if (res.success) return true
      if (res.error === 'RepoNotFound') return false
      throw res.reason // let retryXrpc() handle retries for transient errors
    })
  }

  /**
   * Deletes the rows authored by `did` from the record-type tables listed
   * below, along with its notifications and generic record rows. Statements
   * run sequentially outside a transaction.
   */
  async unindexActor(did: DidString) {
    this.db.assertNotTransaction()
    // per-record-type indexes
    await this.db.db.deleteFrom('profile').where('creator', '=', did).execute()
    await this.db.db.deleteFrom('follow').where('creator', '=', did).execute()
    await this.db.db.deleteFrom('repost').where('creator', '=', did).execute()
    await this.db.db.deleteFrom('like').where('creator', '=', did).execute()
    await this.db.db
      .deleteFrom('feed_generator')
      .where('creator', '=', did)
      .execute()
    await this.db.db.deleteFrom('labeler').where('creator', '=', did).execute()
    // lists
    await this.db.db
      .deleteFrom('list_item')
      .where('creator', '=', did)
      .execute()
    await this.db.db.deleteFrom('list').where('creator', '=', did).execute()
    // blocks
    await this.db.db
      .deleteFrom('actor_block')
      .where('creator', '=', did)
      .execute()
    await this.db.db
      .deleteFrom('list_block')
      .where('creator', '=', did)
      .execute()
    // posts: embeds first (they reference post URIs), then the posts
    // themselves. Subqueries are inlined so `qb` is contextually typed.
    await this.db.db
      .deleteFrom('post_embed_image')
      .where('post_embed_image.postUri', 'in', (qb) =>
        qb
          .selectFrom('post')
          .where('post.creator', '=', did)
          .select('post.uri as uri'),
      )
      .execute()
    await this.db.db
      .deleteFrom('post_embed_external')
      .where('post_embed_external.postUri', 'in', (qb) =>
        qb
          .selectFrom('post')
          .where('post.creator', '=', did)
          .select('post.uri as uri'),
      )
      .execute()
    await this.db.db
      .deleteFrom('post_embed_record')
      .where('post_embed_record.postUri', 'in', (qb) =>
        qb
          .selectFrom('post')
          .where('post.creator', '=', did)
          .select('post.uri as uri'),
      )
      .execute()
    await this.db.db.deleteFrom('post').where('creator', '=', did).execute()
    await this.db.db
      .deleteFrom('thread_gate')
      .where('creator', '=', did)
      .execute()
    await this.db.db
      .deleteFrom('post_gate')
      .where('creator', '=', did)
      .execute()
    // notifications
    await this.db.db
      .deleteFrom('notification')
      .where('notification.author', '=', did)
      .execute()
    // generic record indexes
    await this.db.db
      .deleteFrom('duplicate_record')
      .where('duplicate_record.duplicateOf', 'in', (qb) =>
        qb
          .selectFrom('record')
          .where('record.did', '=', did)
          .select('record.uri as uri'),
      )
      .execute()
    await this.db.db.deleteFrom('record').where('did', '=', did).execute()
  }
}

type UriAndCid = {
  uri: AtUri
  cid: Cid
}

type IndexOp =
  | ({ op: 'create' | 'update' } & UriAndCid)
  | ({ op: 'delete' } & UriAndCid)

/**
 * Computes the ops that turn the indexed state `curr` into the repo state
 * `checkout`, both keyed by AT-URI string:
 * - `create` for URIs only in `checkout`;
 * - `update` where both have the URI but the CIDs differ;
 * - `delete` for URIs only in `curr`.
 */
const findDiffFromCheckout = (
  curr: Record<string, UriAndCid>,
  checkout: Record<string, UriAndCid>,
): IndexOp[] => {
  const ops: IndexOp[] = []
  for (const uri of Object.keys(checkout)) {
    const record = checkout[uri]
    if (!curr[uri]) {
      ops.push({ op: 'create', ...record })
    } else {
      if (curr[uri].cid.equals(record.cid)) {
        // no-op
        continue
      }
      ops.push({ op: 'update', ...record })
    }
  }
  for (const uri of Object.keys(curr)) {
    const record = curr[uri]
    if (!checkout[uri]) {
      ops.push({ op: 'delete', ...record })
    }
  }
  return ops
}

/** Keys every record in a verified repo by its AT-URI string. */
const formatCheckout = (
  did: string,
  verifiedRepo: VerifiedRepo,
): Record<string, UriAndCid> => {
  const records: Record<string, UriAndCid> = {}
  for (const create of verifiedRepo.creates) {
    const uri = AtUri.make(did, create.collection, create.rkey)
    records[uri.toString()] = {
      uri,
      cid: create.cid,
    }
  }
  return records
}

/**
 * True when there is no actor row, or its `indexedAt` is more than a day
 * before `timestamp`. Also true when the stored handle is `null` and
 * `indexedAt` is more than an hour before `timestamp`.
 */
const needsHandleReindex = (
  actor: Selectable<Actor> | undefined,
  timestamp: string,
) => {
  if (!actor) return true
  const timeDiff =
    new Date(timestamp).getTime() - new Date(actor.indexedAt).getTime()
  // revalidate daily
  if (timeDiff > DAY) return true
  // revalidate more aggressively for invalidated handles
  if (actor.handle === null && timeDiff > HOUR) return true
  return false
}