import { z } from "zod/v4"; import { throwIfNoProjectAccess } from "@/src/features/rbac/utils/checkProjectAccess"; import { auditLog } from "@/src/features/audit-logs/auditLog"; import { composeAggregateScoreKey } from "@/src/features/scores/lib/aggregateScores"; import { getDateFromOption, SelectedTimeOptionSchema, } from "@/src/utils/date-range-utils"; import { createTRPCRouter, protectedProjectProcedure, } from "@/src/server/api/trpc"; import { orderBy, paginationZod, singleFilter, timeFilter, UpdateAnnotationScoreData, validateDbScore, ScoreSource, LangfuseNotFoundError, InternalServerError, BatchActionQuerySchema, BatchActionType, BatchExportTableName, type ScoreDomain, CreateAnnotationScoreData, } from "@langfuse/shared"; import { getScoresGroupedByNameSourceType, getScoresUiCount, getScoresUiTable, getScoreNames, getTracesGroupedByTags, upsertScore, logger, getTraceById, getScoreById, convertDateToClickhouseDateTime, searchExistingAnnotationScore, hasAnyScore, ScoreDeleteQueue, QueueJobs, getScoreMetadataById, deleteScores, getTracesIdentifierForSession, } from "@langfuse/shared/src/server"; import { v4 } from "uuid"; import { throwIfNoEntitlement } from "@/src/features/entitlements/server/hasEntitlement"; import { createBatchActionJob } from "@/src/features/table/server/createBatchActionJob"; import { TRPCError } from "@trpc/server"; import { randomUUID } from "crypto"; import { isTraceScore } from "@/src/features/scores/lib/helpers"; const ScoreFilterOptions = z.object({ projectId: z.string(), // Required for protectedProjectProcedure filter: z.array(singleFilter), orderBy: orderBy, }); const ScoreAllOptions = ScoreFilterOptions.extend({ ...paginationZod, }); type AllScoresReturnType = Omit & { traceName: string | null; traceUserId: string | null; traceTags: Array | null; jobConfigurationId: string | null; authorUserImage: string | null; authorUserName: string | null; hasMetadata: boolean; }; export const scoresRouter = createTRPCRouter({ /** * Get all scores for a project, meant for internal use and *excludes metadata of scores* */ all: protectedProjectProcedure .input(ScoreAllOptions) .query(async ({ input, ctx }) => { const clickhouseScoreData = await getScoresUiTable({ projectId: input.projectId, filter: input.filter ?? [], orderBy: input.orderBy, limit: input.limit, offset: input.page * input.limit, excludeMetadata: true, includeHasMetadataFlag: true, }); const [jobExecutions, users] = await Promise.all([ ctx.prisma.jobExecution.findMany({ where: { projectId: input.projectId, jobOutputScoreId: { in: clickhouseScoreData.map((score) => score.id), }, }, select: { id: true, jobConfigurationId: true, jobOutputScoreId: true, }, }), ctx.prisma.user.findMany({ where: { id: { in: clickhouseScoreData .map((score) => score.authorUserId) .filter((s): s is string => Boolean(s)), }, }, select: { id: true, name: true, image: true, }, }), ]); return { scores: clickhouseScoreData.map((score) => { const jobExecution = jobExecutions.find( (je) => je.jobOutputScoreId === score.id, ); const user = users.find((u) => u.id === score.authorUserId); return { ...score, jobConfigurationId: jobExecution?.jobConfigurationId ?? null, authorUserImage: user?.image ?? null, authorUserName: user?.name ?? null, }; }), }; }), byId: protectedProjectProcedure .input( z.object({ scoreId: z.string(), // used for matching projectId: z.string(), // used for security check }), ) .query(async ({ input }) => { const score = await getScoreById({ projectId: input.projectId, scoreId: input.scoreId, }); if (!score) { throw new TRPCError({ code: "NOT_FOUND", message: `No score with id ${input.scoreId} in project ${input.projectId} in Clickhouse`, }); } return { ...score, metadata: score.metadata ? JSON.stringify(score.metadata) : null, }; }), countAll: protectedProjectProcedure .input(ScoreAllOptions) .query(async ({ input }) => { const clickhouseScoreData = await getScoresUiCount({ projectId: input.projectId, filter: input.filter ?? [], orderBy: input.orderBy, limit: 1, offset: 0, }); return { totalCount: clickhouseScoreData, }; }), filterOptions: protectedProjectProcedure .input( z.object({ projectId: z.string(), timestampFilter: timeFilter.optional(), }), ) .query(async ({ input }) => { const { timestampFilter } = input; const [names, tags] = await Promise.all([ getScoreNames( input.projectId, timestampFilter ? [timestampFilter] : [], ), getTracesGroupedByTags({ projectId: input.projectId, filter: timestampFilter ? [timestampFilter] : [], }), ]); return { name: names.map((i) => ({ value: i.name, count: i.count })), tags: tags, }; }), deleteMany: protectedProjectProcedure .input( z.object({ scoreIds: z .array(z.string()) .min(1, "Minimum 1 scoreId is required.") .nullable(), projectId: z.string(), query: BatchActionQuerySchema.optional(), isBatchAction: z.boolean().default(false), }), ) .mutation(async ({ input, ctx }) => { // We reuse the trace-deletion entitlement here as this is a very similar and destructive operation. throwIfNoProjectAccess({ session: ctx.session, projectId: input.projectId, scope: "traces:delete", }); throwIfNoEntitlement({ entitlement: "trace-deletion", projectId: input.projectId, sessionUser: ctx.session.user, }); if (input.isBatchAction && input.query) { return createBatchActionJob({ projectId: input.projectId, actionId: "score-delete", actionType: BatchActionType.Delete, tableName: BatchExportTableName.Scores, session: ctx.session, query: input.query, }); } if (input.scoreIds) { const scoreDeleteQueue = ScoreDeleteQueue.getInstance(); if (!scoreDeleteQueue) { throw new TRPCError({ code: "INTERNAL_SERVER_ERROR", message: "ScoreDeleteQueue not initialized", }); } await Promise.all( input.scoreIds.map((scoreId) => auditLog({ resourceType: "score", resourceId: scoreId, action: "delete", session: ctx.session, }), ), ); return scoreDeleteQueue.add(QueueJobs.ScoreDelete, { timestamp: new Date(), id: randomUUID(), payload: { projectId: input.projectId, scoreIds: input.scoreIds, }, name: QueueJobs.ScoreDelete, }); } throw new TRPCError({ message: "Either batchAction or scoreIds must be provided to delete scores.", code: "BAD_REQUEST", }); }), createAnnotationScore: protectedProjectProcedure .input(CreateAnnotationScoreData) .mutation(async ({ input, ctx }) => { throwIfNoProjectAccess({ session: ctx.session, projectId: input.projectId, scope: "scores:CUD", }); const inflatedParams = isTraceScore(input.scoreTarget) ? { observationId: input.scoreTarget.observationId ?? null, traceId: input.scoreTarget.traceId, sessionId: null, } : { observationId: null, traceId: null, sessionId: input.scoreTarget.sessionId, }; if (inflatedParams.traceId) { const clickhouseTrace = await getTraceById({ traceId: inflatedParams.traceId, projectId: input.projectId, clickhouseFeatureTag: "annotations-trpc", }); if (!clickhouseTrace) { logger.error( `No trace with id ${inflatedParams.traceId} in project ${input.projectId} in Clickhouse`, ); throw new LangfuseNotFoundError( `No trace with id ${inflatedParams.traceId} in project ${input.projectId} in Clickhouse`, ); } } else if (inflatedParams.sessionId) { // We consider no longer writing all sessions into postgres, hence we should search for traces with the session id const traceIdentifiers = await getTracesIdentifierForSession( input.projectId, inflatedParams.sessionId, ); if (traceIdentifiers.length === 0) { logger.error( `No trace referencing session with id ${inflatedParams.sessionId} in project ${input.projectId} in Clickhouse`, ); throw new LangfuseNotFoundError( `No trace referencing session with id ${inflatedParams.sessionId} in project ${input.projectId} in Clickhouse`, ); } } const clickhouseScore = await searchExistingAnnotationScore( input.projectId, inflatedParams.observationId, inflatedParams.traceId, inflatedParams.sessionId, input.name, input.configId, input.dataType, ); const score = !!clickhouseScore ? { ...clickhouseScore, value: input.value ?? null, stringValue: input.stringValue ?? null, comment: input.comment ?? null, metadata: {}, authorUserId: ctx.session.user.id, queueId: input.queueId ?? null, timestamp: new Date(), } : { id: v4(), projectId: input.projectId, environment: input.environment ?? "default", ...inflatedParams, // only trace and session scores are supported for annotation datasetRunId: null, value: input.value ?? null, stringValue: input.stringValue ?? null, dataType: input.dataType ?? null, configId: input.configId ?? null, name: input.name, comment: input.comment ?? null, metadata: {}, authorUserId: ctx.session.user.id, source: ScoreSource.ANNOTATION, queueId: input.queueId ?? null, createdAt: new Date(), updatedAt: new Date(), timestamp: new Date(), }; await upsertScore({ id: score.id, // Reuse ID that was generated by Prisma timestamp: convertDateToClickhouseDateTime(new Date()), project_id: input.projectId, environment: input.environment ?? "default", trace_id: inflatedParams.traceId, observation_id: inflatedParams.observationId, session_id: inflatedParams.sessionId, name: input.name, value: input.value !== null ? input.value : undefined, source: ScoreSource.ANNOTATION, comment: input.comment, author_user_id: ctx.session.user.id, config_id: input.configId, data_type: input.dataType, string_value: input.stringValue, queue_id: input.queueId, }); await auditLog({ session: ctx.session, resourceType: "score", resourceId: score.id, action: "create", after: score, }); return validateDbScore(score); }), updateAnnotationScore: protectedProjectProcedure .input(UpdateAnnotationScoreData) .mutation(async ({ input, ctx }) => { throwIfNoProjectAccess({ session: ctx.session, projectId: input.projectId, scope: "scores:CUD", }); let updatedScore: ScoreDomain | null | undefined = null; // Fetch the current score from Clickhouse const score = await getScoreById({ projectId: input.projectId, scoreId: input.id, source: ScoreSource.ANNOTATION, }); if (!score) { logger.warn( `No annotation score with id ${input.id} in project ${input.projectId} in Clickhouse`, ); throw new LangfuseNotFoundError( `No annotation score with id ${input.id} in project ${input.projectId} in Clickhouse`, ); } else { await upsertScore({ id: input.id, project_id: input.projectId, timestamp: convertDateToClickhouseDateTime(score.timestamp), value: input.value !== null ? input.value : undefined, string_value: input.stringValue, comment: input.comment, author_user_id: ctx.session.user.id, queue_id: input.queueId, source: ScoreSource.ANNOTATION, name: score.name, data_type: score.dataType, config_id: score.configId, trace_id: score.traceId, observation_id: score.observationId, session_id: score.sessionId, environment: score.environment, }); updatedScore = { ...score, value: input.value ?? null, stringValue: input.stringValue ?? null, comment: input.comment ?? null, authorUserId: ctx.session.user.id, queueId: input.queueId ?? null, timestamp: new Date(), }; await auditLog({ session: ctx.session, resourceType: "score", resourceId: input.id, action: "update", before: score, after: updatedScore, }); } if (!updatedScore) { throw new InternalServerError( `Annotation score could not be updated in project ${input.projectId}`, ); } return validateDbScore(updatedScore); }), deleteAnnotationScore: protectedProjectProcedure .input(z.object({ projectId: z.string(), id: z.string() })) .mutation(async ({ input, ctx }) => { throwIfNoProjectAccess({ session: ctx.session, projectId: input.projectId, scope: "scores:CUD", }); // Fetch the current score from Clickhouse const clickhouseScore = await getScoreById({ projectId: input.projectId, scoreId: input.id, source: ScoreSource.ANNOTATION, }); if (!clickhouseScore) { logger.warn( `No annotation score with id ${input.id} in project ${input.projectId} in Clickhouse`, ); throw new LangfuseNotFoundError( `No annotation score with id ${input.id} in project ${input.projectId} in Clickhouse`, ); } await auditLog({ session: ctx.session, resourceType: "score", resourceId: input.id, action: "delete", before: clickhouseScore, }); await deleteScores(input.projectId, [clickhouseScore.id]); return validateDbScore(clickhouseScore); }), /** * @deprecated, use getScoreColumns instead */ getScoreKeysAndProps: protectedProjectProcedure .input( z.object({ projectId: z.string(), selectedTimeOption: SelectedTimeOptionSchema, }), ) .query(async ({ input }) => { const date = getDateFromOption(input.selectedTimeOption); const res = await getScoresGroupedByNameSourceType({ projectId: input.projectId, fromTimestamp: date, filter: [], }); return res.map(({ name, source, dataType }) => ({ key: composeAggregateScoreKey({ name, source, dataType }), name: name, source: source, dataType: dataType, })); }), getScoreColumns: protectedProjectProcedure .input( z.object({ projectId: z.string(), filter: z.array(singleFilter).optional(), fromTimestamp: z.date().optional(), toTimestamp: z.date().optional(), }), ) .query(async ({ input }) => { const { projectId, filter, fromTimestamp, toTimestamp } = input; const groupedScores = await getScoresGroupedByNameSourceType({ projectId, filter: filter || [], fromTimestamp, toTimestamp, }); const scoreColumns = groupedScores.map(({ name, source, dataType }) => ({ key: composeAggregateScoreKey({ name, source, dataType }), name, source, dataType, })); return { scoreColumns }; }), hasAny: protectedProjectProcedure .input( z.object({ projectId: z.string(), }), ) .query(async ({ input }) => { return await hasAnyScore(input.projectId); }), getScoreMetadataById: protectedProjectProcedure .input(z.object({ projectId: z.string(), id: z.string() })) .query(async ({ input }) => { return (await getScoreMetadataById(input.projectId, input.id)) ?? null; }), });