import { mapDefined, noUndefinedVals } from '@atproto/common'
import { Client, DidString } from '@atproto/lex'
import { MethodNotImplementedError, Server } from '@atproto/xrpc-server'
import { AppContext } from '../../../../context.js'
import {
  HydrateCtx,
  Hydrator,
  mergeManyStates,
} from '../../../../hydration/hydrator.js'
import { app } from '../../../../lexicons/index.js'
import {
  HydrationFn,
  PresentationFn,
  RulesFn,
  SkeletonFn,
  createPipeline,
} from '../../../../pipeline.js'
import { Views } from '../../../../views/index.js'

/**
 * Registers the `app.bsky.unspecced.getTrends` handler on the given server.
 *
 * The handler runs a four-stage pipeline (skeleton -> hydration -> rules ->
 * presentation) and returns its result as an `application/json` body.
 *
 * @param server - XRPC server the method is added to.
 * @param ctx - Application context; also passed to the pipeline as its context.
 */
export default function (server: Server, ctx: AppContext) {
  const getTrends = createPipeline(skeleton, hydration, noBlocks, presentation)

  server.add(app.bsky.unspecced.getTrends, {
    auth: ctx.authVerifier.standardOptional,
    handler: async ({ auth, params, req }) => {
      const viewer = auth.credentials.iss
      const labelers = ctx.reqLabelers(req)
      const hydrateCtx = await ctx.hydrator.createContext({ labelers, viewer })

      // A header sent several times arrives as an array; collapse it into a
      // single comma-separated value before forwarding it.
      const topicsHeader = req.headers['x-bsky-topics']
      const headers = noUndefinedVals({
        'accept-language': req.headers['accept-language'],
        'x-bsky-topics': Array.isArray(topicsHeader)
          ? topicsHeader.join(',')
          : topicsHeader,
      })

      const result = await getTrends(
        {
          ...params,
          hydrateCtx,
          headers,
        },
        ctx,
      )

      return {
        encoding: 'application/json',
        body: result,
      }
    },
  })
}

/**
 * Skeleton stage: fetches the trends skeleton from the topics client,
 * forwarding the requested limit, the viewer (if any) and the selected
 * request headers.
 *
 * @throws MethodNotImplementedError when no topics client is configured.
 */
const skeleton: SkeletonFn<Context, Params, SkeletonState> = async (input) => {
  const { params, ctx } = input

  if (!ctx.topicsClient) {
    // Use 501 instead of 500 as these are not considered retry-able by clients
    throw new MethodNotImplementedError('Topics agent not available')
  }

  const skeleton = await ctx.topicsClient.call(
    app.bsky.unspecced.getTrendsSkeleton,
    {
      limit: params.limit,
      viewer: params.hydrateCtx.viewer ?? undefined,
    },
    {
      headers: params.headers,
    },
  )

  // @TODO Make sure upstream always provides this
  for (const trend of skeleton.trends) trend.dids ??= []

  return skeleton
}

/**
 * Hydration stage: loads basic profiles for every DID referenced by the
 * trends and, when there is a viewer, the bidirectional blocks between the
 * viewer and those DIDs. Both lookups run concurrently.
 */
const hydration: HydrationFn<Context, Params, SkeletonState> = async (
  input,
) => {
  const { ctx, params, skeleton } = input

  const dids = getUniqueDidsFromTrends(skeleton.trends)

  // Block lookups are keyed by viewer; without a viewer the map stays empty.
  const pairs = new Map<DidString, DidString[]>()
  const viewer = params.hydrateCtx.viewer
  if (viewer) pairs.set(viewer, dids)

  const [profileState, bidirectionalBlocks] = await Promise.all([
    ctx.hydrator.hydrateProfilesBasic(dids, params.hydrateCtx),
    ctx.hydrator.hydrateBidirectionalBlocks(pairs, params.hydrateCtx),
  ])

  return mergeManyStates(profileState, { bidirectionalBlocks })
}

/**
 * Rules stage: for an authenticated viewer, removes from each trend the DIDs
 * that have a truthy entry in the viewer's hydrated block map. Without a
 * viewer the skeleton is returned unchanged.
 */
const noBlocks: RulesFn<Context, Params, SkeletonState> = (input) => {
  const { skeleton, params, hydration } = input

  const viewer = params.hydrateCtx.viewer
  if (!viewer) {
    return skeleton
  }

  const blocks = hydration.bidirectionalBlocks?.get(viewer)

  return {
    trends: skeleton.trends.map((t) => ({
      ...t,
      dids: t.dids.filter((did) => !blocks?.get(did)),
    })),
  }
}

/**
 * Presentation stage: maps every skeleton trend to its output shape, turning
 * the trend's DIDs into basic profile views.
 */
const presentation: PresentationFn<
  Context,
  Params,
  SkeletonState,
  app.bsky.unspecced.getTrends.$OutputBody
> = (input) => {
  const { ctx, skeleton, hydration } = input

  return {
    trends: skeleton.trends.map((t) => ({
      topic: t.topic,
      displayName: t.displayName,
      link: t.link,
      startedAt: t.startedAt,
      postCount: t.postCount,
      status: t.status,
      category: t.category,
      // NOTE(review): mapDefined presumably skips DIDs for which no profile
      // view could be built - confirm against @atproto/common.
      actors: mapDefined(t.dids, (did) =>
        ctx.views.profileBasic(did, hydration),
      ),
    })),
  }
}

/** Dependencies the pipeline stages read from the application context. */
type Context = {
  hydrator: Hydrator
  views: Views
  topicsClient: Client | undefined
}

/**
 * Pipeline input: the request params plus the hydration context and the
 * headers forwarded to the topics service.
 *
 * NOTE(review): this is based on `getTrendingTopics.$Params` although the
 * endpoint is `getTrends` - confirm whether `getTrends.$Params` was intended.
 */
type Params = app.bsky.unspecced.getTrendingTopics.$Params & {
  hydrateCtx: HydrateCtx & { viewer: string | null }
  headers: Record<string, string>
}

/** Skeleton produced by the topics service and consumed by later stages. */
type SkeletonState = app.bsky.unspecced.getTrendsSkeleton.$OutputBody

/**
 * Collects the distinct DIDs referenced by the given trends, in order of
 * first appearance.
 *
 * @param trends - Skeleton trends; may be omitted.
 * @returns The de-duplicated DIDs, or an empty array when `trends` is missing.
 */
function getUniqueDidsFromTrends(
  trends?: app.bsky.unspecced.defs.SkeletonTrend[],
): DidString[] {
  if (!trends) return []

  // Tolerate trends without `dids`, matching the defensive default applied
  // in the skeleton stage.
  const dids = new Set<DidString>(trends.flatMap((trend) => trend.dids ?? []))

  return Array.from(dids)
}