import { Insertable, Selectable } from 'kysely' import { Cid } from '@atproto/lex' import { AtUri, normalizeDatetimeAlways } from '@atproto/syntax' import { app } from '../../../../lexicons/index.js' import { BackgroundQueue } from '../../background.js' import { DatabaseSchema, DatabaseSchemaType } from '../../db/database-schema.js' import { Database } from '../../db/index.js' import { Notification } from '../../db/tables/notification.js' import { countAll, excluded } from '../../db/util.js' import { RecordProcessor } from '../processor.js' type Notif = Insertable type IndexedRepost = Selectable const insertFn = async ( db: DatabaseSchema, uri: AtUri, cid: Cid, obj: app.bsky.feed.repost.Main, timestamp: string, ): Promise => { const repost = { uri: uri.toString(), cid: cid.toString(), creator: uri.host, subject: obj.subject.uri, subjectCid: obj.subject.cid, via: obj.via?.uri, viaCid: obj.via?.cid, createdAt: normalizeDatetimeAlways(obj.createdAt), indexedAt: timestamp, } const [inserted] = await Promise.all([ db .insertInto('repost') .values(repost) .onConflict((oc) => oc.doNothing()) .returningAll() .executeTakeFirst(), db .insertInto('feed_item') .values({ type: 'repost', uri: repost.uri, cid: repost.cid, postUri: repost.subject, originatorDid: repost.creator, sortAt: repost.indexedAt < repost.createdAt ? repost.indexedAt : repost.createdAt, }) .onConflict((oc) => oc.doNothing()) .executeTakeFirst(), ]) return inserted || null } const findDuplicate = async ( db: DatabaseSchema, uri: AtUri, obj: app.bsky.feed.repost.Main, ): Promise => { const found = await db .selectFrom('repost') .where('creator', '=', uri.host) .where('subject', '=', obj.subject.uri) .selectAll() .executeTakeFirst() return found ? new AtUri(found.uri) : null } const notifsForInsert = (obj: IndexedRepost) => { const subjectUri = new AtUri(obj.subject) // prevent self-notifications const isRepostFromSubjectUser = subjectUri.host === obj.creator if (isRepostFromSubjectUser) { return [] } const notifs: Notif[] = [ // Notification to the author of the reposted record. { did: subjectUri.host, author: obj.creator, recordUri: obj.uri, recordCid: obj.cid, reason: 'repost' as const, reasonSubject: subjectUri.toString(), sortAt: obj.sortAt, }, ] if (obj.via) { const viaUri = new AtUri(obj.via) const isRepostFromViaSubjectUser = viaUri.host === obj.creator // prevent self-notifications if (!isRepostFromViaSubjectUser) { notifs.push( // Notification to the reposter via whose repost the repost was made. { did: viaUri.host, author: obj.creator, recordUri: obj.uri, recordCid: obj.cid, reason: 'repost-via-repost' as const, reasonSubject: viaUri.toString(), sortAt: obj.sortAt, }, ) } } return notifs } const deleteFn = async ( db: DatabaseSchema, uri: AtUri, ): Promise => { const uriStr = uri.toString() const [deleted] = await Promise.all([ db .deleteFrom('repost') .where('uri', '=', uriStr) .returningAll() .executeTakeFirst(), db.deleteFrom('feed_item').where('uri', '=', uriStr).executeTakeFirst(), ]) return deleted || null } const notifsForDelete = ( deleted: IndexedRepost, replacedBy: IndexedRepost | null, ) => { const toDelete = replacedBy ? [] : [deleted.uri] return { notifs: [], toDelete } } const updateAggregates = async (db: DatabaseSchema, repost: IndexedRepost) => { const repostCountQb = db .insertInto('post_agg') .values({ uri: repost.subject, repostCount: db .selectFrom('repost') .where('repost.subject', '=', repost.subject) .select(countAll.as('count')), }) .onConflict((oc) => oc .column('uri') .doUpdateSet({ repostCount: excluded(db, 'repostCount') }), ) await repostCountQb.execute() } export type PluginType = ReturnType export const makePlugin = (db: Database, background: BackgroundQueue) => { return new RecordProcessor(db, background, { schema: app.bsky.feed.repost.main, insertFn, findDuplicate, deleteFn, notifsForInsert, notifsForDelete, updateAggregates, }) } export default makePlugin