import { mapDefined, noUndefinedVals } from '@atproto/common'
import {
  XrpcInvalidResponseError,
  XrpcResponseError,
  xrpcSafe,
} from '@atproto/lex'
import {
  Headers as HeadersMap,
  InvalidRequestError,
  Server,
  ServerTimer,
  UpstreamFailureError,
  XRPCError,
  serverTimingHeader,
} from '@atproto/xrpc-server'
import { AppContext } from '../../../../context.js'
import {
  Code,
  getServiceEndpoint,
  isDataplaneError,
  unpackIdentityServices,
} from '../../../../data-plane/index.js'
import { FeedItem } from '../../../../hydration/feed.js'
import { HydrateCtx } from '../../../../hydration/hydrator.js'
import { app } from '../../../../lexicons/index.js'
import {
  HydrationFnInput,
  PresentationFnInput,
  RulesFnInput,
  SkeletonFnInput,
  createPipeline,
} from '../../../../pipeline.js'
import { GetIdentityByDidResponse } from '../../../../proto/bsky_pb.js'
import { BSKY_USER_AGENT, resHeaders } from '../../../util.js'

/**
 * Registers the `app.bsky.feed.getFeed` handler on the given server.
 *
 * The handler runs a four-stage pipeline (skeleton -> hydration -> rules ->
 * presentation) and returns the resulting feed as JSON, together with any
 * headers produced by the pipeline, the labeler headers, and a `server-timing`
 * header built from the skeleton and hydration timers.
 *
 * @param server - XRPC server the route is added to.
 * @param ctx - Application context providing auth, hydration and views.
 */
export default function (server: Server, ctx: AppContext) {
  const getFeed = createPipeline(
    skeleton,
    hydration,
    noBlocksOrMutes,
    presentation,
  )
  server.add(app.bsky.feed.getFeed, {
    auth: ctx.authVerifier.standardOptionalParameterized({
      // Accept credentials scoped to either getFeedSkeleton or getFeed.
      lxmCheck: (method) => {
        return (
          method === app.bsky.feed.getFeedSkeleton.$lxm ||
          method === app.bsky.feed.getFeed.$lxm
        )
      },
      skipAudCheck: true,
    }),
    handler: async ({ params, auth, req }) => {
      const viewer = auth.credentials.iss
      const labelers = ctx.reqLabelers(req)
      const hydrateCtx = await ctx.hydrator.createContext({ labelers, viewer })

      // Headers forwarded to the feed generator; entries whose value is
      // undefined are dropped by noUndefinedVals.
      const headers = noUndefinedVals({
        'user-agent': BSKY_USER_AGENT,
        authorization: req.headers['authorization'],
        'accept-language': req.headers['accept-language'],
        // A repeated header arrives as an array; collapse it to one value.
        'x-bsky-topics': Array.isArray(req.headers['x-bsky-topics'])
          ? req.headers['x-bsky-topics'].join(',')
          : req.headers['x-bsky-topics'],
      })

      // @NOTE feed cursors should not be affected by appview swap
      const {
        timerSkele,
        timerHydr,
        resHeaders: feedResHeaders,
        ...result
      } = await getFeed({ ...params, hydrateCtx, headers }, ctx)

      return {
        encoding: 'application/json',
        body: result,
        headers: {
          ...feedResHeaders,
          ...resHeaders({ labelers: hydrateCtx.labelers }),
          'server-timing': serverTimingHeader([timerSkele, timerHydr]),
        },
      }
    },
  })
}

/**
 * Skeleton stage: fetches the feed skeleton from the feed generator and times
 * the call.
 *
 * `timerHydr` is initialised here only so the skeleton is fully populated; the
 * hydration stage overwrites it with its own measured timer.
 */
const skeleton = async (
  inputs: SkeletonFnInput<Context, Params>,
): Promise<Skeleton> => {
  const { ctx, params } = inputs
  const timerSkele = new ServerTimer('skele').start()
  const {
    feedItems: algoItems,
    reqId,
    cursor,
    resHeaders,
    ...passthrough
  } = await skeletonFromFeedGen(ctx, params)
  return {
    cursor,
    items: algoItems,
    reqId,
    timerSkele: timerSkele.stop(),
    timerHydr: new ServerTimer('hydr').start(),
    resHeaders,
    passthrough,
  }
}

/**
 * Hydration stage: hydrates the skeleton's feed items using the request's
 * hydration context and records how long it took on `skeleton.timerHydr`.
 */
const hydration = async (
  inputs: HydrationFnInput<Context, Params, Skeleton>,
) => {
  const { ctx, params, skeleton } = inputs
  const timerHydr = new ServerTimer('hydr').start()
  const hydration = await ctx.hydrator.hydrateFeedItems(
    skeleton.items,
    params.hydrateCtx,
  )
  // Replaces the placeholder timer set by the skeleton stage.
  skeleton.timerHydr = timerHydr.stop()
  return hydration
}

/**
 * Rules stage: removes, in place, every item flagged as having a blocked or
 * muted author or originator, or a blocked ancestor author. Returns the same
 * skeleton object.
 */
const noBlocksOrMutes = (inputs: RulesFnInput<Context, Params, Skeleton>) => {
  const { ctx, skeleton, hydration } = inputs
  skeleton.items = skeleton.items.filter((item) => {
    const bam = ctx.views.feedItemBlocksAndMutes(item, hydration)
    return (
      !bam.authorBlocked &&
      !bam.authorMuted &&
      !bam.originatorBlocked &&
      !bam.originatorMuted &&
      !bam.ancestorAuthorBlocked
    )
  })
  return skeleton
}

/**
 * Presentation stage: turns the remaining skeleton items into feed view posts,
 * skipping items for which no view can be built, and tags each one with the
 * item's `feedContext` and the skeleton's `reqId`.
 *
 * The timers and response headers are returned alongside the body so the
 * handler can lift them into HTTP headers; any extra fields returned by the
 * feed generator are spread into the result.
 */
const presentation = (
  inputs: PresentationFnInput<Context, Params, Skeleton>,
) => {
  const { ctx, skeleton, hydration } = inputs
  const feed = mapDefined(skeleton.items, (item) => {
    const post = ctx.views.feedViewPost(item, hydration)
    if (!post) return
    return {
      ...post,
      feedContext: item.feedContext,
      reqId: skeleton.reqId,
    }
  })
  return {
    feed,
    cursor: skeleton.cursor,
    timerSkele: skeleton.timerSkele,
    timerHydr: skeleton.timerHydr,
    resHeaders: skeleton.resHeaders,
    ...skeleton.passthrough,
  }
}

type Context = AppContext

type Params = app.bsky.feed.getFeed.$Params & {
  hydrateCtx: HydrateCtx
  headers: HeadersMap
}

type Skeleton = {
  items: AlgoResponseItem[]
  reqId?: string
  passthrough: Record<string, unknown> // pass through additional items in feedgen response
  resHeaders?: HeadersMap
  cursor?: string
  timerSkele: ServerTimer
  timerHydr: ServerTimer
}

/**
 * Resolves the feed generator behind `params.feed` and requests a feed
 * skeleton from it.
 *
 * @param ctx - Application context (hydrator and dataplane access).
 * @param params - Request params plus the headers to forward to the feedgen.
 * @returns The feed items (at most `params.limit`), the next cursor, an
 *   optional `content-language` response header, and any additional fields
 *   present in the feed generator's response body.
 * @throws InvalidRequestError when the feed record is not found, its identity
 *   cannot be resolved, or the DID document has no `bsky_fg` service endpoint.
 * @throws XRPCError when the feed generator answers with an XRPC error
 *   response, which is relayed downstream.
 * @throws UpstreamFailureError when the response is invalid or the request
 *   fails for any other reason.
 */
const skeletonFromFeedGen = async (
  ctx: Context,
  params: Params,
): Promise<AlgoResponse> => {
  const { feed, headers } = params
  const found = await ctx.hydrator.feed.getFeedGens([feed], true)
  const feedDid = found.get(feed)?.record.did
  if (!feedDid) {
    throw new InvalidRequestError('could not find feed')
  }

  let identity: GetIdentityByDidResponse
  try {
    identity = await ctx.dataplane.getIdentityByDid({ did: feedDid })
  } catch (err) {
    if (isDataplaneError(err, Code.NotFound)) {
      throw new InvalidRequestError(`could not resolve identity: ${feedDid}`)
    }
    throw err
  }

  const services = unpackIdentityServices(identity.services)
  const fgEndpoint = getServiceEndpoint(services, {
    id: 'bsky_fg',
    type: 'BskyFeedGenerator',
  })
  if (!fgEndpoint) {
    throw new InvalidRequestError(
      `invalid feed generator service details in did document: ${feedDid}`,
    )
  }

  // @TODO currently passthrough auth headers from pds
  const result = await xrpcSafe(fgEndpoint, app.bsky.feed.getFeedSkeleton, {
    strictResponseProcessing: false,
    headers,
    params: {
      feed: params.feed,
      // The feedgen is not guaranteed to honor the limit, but we try it.
      limit: params.limit,
      cursor: params.cursor,
    },
  })

  if (!result.success) {
    const cause = result.reason

    // Pass through structurally valid XRPC error response (4xx/5xx), such as
    // auth errors
    if (cause instanceof XrpcResponseError) {
      const { status, body } = cause.toDownstreamError()
      throw new XRPCError(status, body.message, body.error, { cause })
    }

    // The response does not match the schema
    if (cause instanceof XrpcInvalidResponseError) {
      throw new UpstreamFailureError(
        'feed provided an invalid response',
        'InvalidFeedResponse',
        { cause },
      )
    }

    // Typically a network error.
    throw new UpstreamFailureError('feed unavailable', undefined, { cause })
  }

  const { feed: feedSkele, cursor, ...skele } = result.body

  // Enforce the limit locally, since the feedgen may have returned more.
  const feedItems = feedSkele.slice(0, params.limit).map((item) => ({
    post: { uri: item.post },
    repost:
      item.reason != null &&
      app.bsky.feed.defs.skeletonReasonRepost.$isTypeOf(item.reason)
        ? { uri: item.reason.repost }
        : undefined,
    feedContext: item.feedContext,
  }))

  const contentLang = result.headers.get('content-language')

  return {
    ...skele,
    resHeaders: contentLang ? { 'content-language': contentLang } : undefined,
    feedItems,
    // Prevents loops if the custom feed echoes the input cursor back.
    cursor: cursor === params.cursor ? undefined : cursor,
  }
}

/** Normalised result of a feed generator skeleton request. */
export type AlgoResponse = {
  feedItems: AlgoResponseItem[]
  resHeaders?: HeadersMap
  cursor?: string
  reqId?: string
}

/** A feed item plus the optional `feedContext` supplied by the feed generator. */
export type AlgoResponseItem = FeedItem & {
  feedContext?: string
}