import { Model, Prisma } from "../../";
import {
  instrumentAsync,
  logger,
  recordIncrement,
  redis,
  safeMultiDel,
} from "../";
import { type Cluster } from "ioredis";
import { env } from "../../env";
import { Decimal } from "decimal.js";
import { prisma } from "../../db";

/** Lookup key for a model match: the project and the user-provided model name. */
export type ModelMatchProps = {
  projectId: string;
  model: string;
};

/** Redis key whose existence signals that a full cache clear is in progress. */
const MODEL_MATCH_CACHE_LOCKED_KEY = "LOCK:model-match-clear";

/**
 * Sentinel stored in Redis when no model matched, so that repeated lookups of
 * the same unknown model are answered from the cache instead of Postgres.
 */
const NOT_FOUND_TOKEN = "LANGFUSE_MODEL_MATCH_NOT_FOUND" as const;

/**
 * Resolves the model definition matching `p.model` for `p.projectId`.
 *
 * Redis is consulted first; on a miss the model is looked up in Postgres and,
 * when `LANGFUSE_CACHE_MODEL_MATCH_ENABLED` is "true", the result (or a
 * not-found token) is written back to Redis.
 *
 * @param p - Project id and model name to match.
 * @returns The matched model, or `null` when nothing matches.
 */
export async function findModel(p: ModelMatchProps): Promise<Model | null> {
  return instrumentAsync(
    {
      name: "model-match",
      traceScope: "model-match",
    },
    async (span) => {
      logger.debug(`Finding model for ${JSON.stringify(p)}`);

      const redisModel = await getModelFromRedis(p);
      if (redisModel) {
        span.setAttribute("model_match_source", "redis");
        // A cached "not found" is a definitive answer; do not query Postgres.
        if (redisModel === NOT_FOUND_TOKEN) {
          return null;
        }
        logger.debug(
          `Found model name ${redisModel.modelName} (id: ${redisModel.id}) for project ${p.projectId} and model ${p.model}`,
        );
        span.setAttribute("matched_model_id", redisModel.id);
        return redisModel;
      }

      // try to find model in Postgres
      const postgresModel = await findModelInPostgres(p);
      const cacheEnabled = env.LANGFUSE_CACHE_MODEL_MATCH_ENABLED === "true";

      if (postgresModel && cacheEnabled) {
        await addModelToRedis(p, postgresModel);
        span.setAttribute("matched_model_id", postgresModel.id);
        span.setAttribute("model_match_source", "postgres");
        span.setAttribute("model_cache_set", "true");
      } else if (postgresModel) {
        span.setAttribute("matched_model_id", postgresModel.id);
        span.setAttribute("model_match_source", "postgres");
        span.setAttribute("model_cache_set", "false");
      } else {
        span.setAttribute("model_match_source", "none");
        if (cacheEnabled) {
          await addModelNotFoundTokenToRedis(p);
          span.setAttribute("model_cache_set", "true");
        }
      }

      logger.debug(
        `Found model name ${postgresModel?.modelName} (id: ${postgresModel?.id}) for project ${p.projectId} and model ${p.model}`,
      );
      return postgresModel;
    },
  );
}

/**
 * Reads a cached match from Redis.
 *
 * @returns The cached model, `NOT_FOUND_TOKEN` when a negative result is
 * cached, or `null` when the cache is disabled, locked, empty for this key,
 * or the read failed (errors are logged, never thrown).
 */
const getModelFromRedis = async (
  p: ModelMatchProps,
): Promise<Model | typeof NOT_FOUND_TOKEN | null> => {
  if (env.LANGFUSE_CACHE_MODEL_MATCH_ENABLED === "false") {
    return null;
  }

  try {
    if (await isModelMatchCacheLocked()) {
      logger.info(
        "Model match cache is locked. Skipping model lookup from Redis.",
      );
      return null;
    }

    const key = getRedisModelKey(p);
    const redisModel = await redis?.get(key);
    if (redisModel) {
      recordIncrement("langfuse.model_match.cache_hit", 1);
      if (redisModel === NOT_FOUND_TOKEN) {
        return NOT_FOUND_TOKEN;
      }
      return redisModelToPrismaModel(redisModel);
    }

    recordIncrement("langfuse.model_match.cache_miss", 1);
    return null;
  } catch (error) {
    logger.error(
      `Error getting model for ${JSON.stringify(p)} from Redis`,
      error,
    );
    return null;
  }
};

/**
 * Finds the best matching model directly in Postgres.
 *
 * A row matches when the provided model name satisfies the row's
 * `match_pattern` regex and the row belongs to the project or has no project.
 * Ordering is `project_id ASC, start_date DESC NULLS LAST`; the first row wins.
 *
 * @returns The first matching model, or `null` when `p.model` is empty or no
 * row matches.
 */
export async function findModelInPostgres(
  p: ModelMatchProps,
): Promise<Model | null> {
  const { projectId, model } = p;

  // Without a model name there is nothing to match patterns against.
  if (!model) return null;

  // match pattern on the user provided model name
  const modelCondition = Prisma.sql`AND ${model} ~ match_pattern`;

  const sql = Prisma.sql`
    SELECT
      id,
      created_at AS "createdAt",
      updated_at AS "updatedAt",
      project_id AS "projectId",
      model_name AS "modelName",
      match_pattern AS "matchPattern",
      start_date AS "startDate",
      input_price AS "inputPrice",
      output_price AS "outputPrice",
      total_price AS "totalPrice",
      unit,
      tokenizer_id AS "tokenizerId",
      tokenizer_config AS "tokenizerConfig"
    FROM
      models
    WHERE (project_id = ${projectId} OR project_id IS NULL)
    ${modelCondition}
    ORDER BY
      project_id ASC,
      start_date DESC NULLS LAST
    LIMIT 1
  `;

  const foundModels = await prisma.$queryRaw<Array<Model>>(sql);
  return foundModels[0] ?? null;
}

/** Caches a negative lookup result with the configured TTL; failures are logged only. */
const addModelNotFoundTokenToRedis = async (p: ModelMatchProps) => {
  try {
    const key = getRedisModelKey(p);
    await redis?.set(
      key,
      NOT_FOUND_TOKEN,
      "EX",
      env.LANGFUSE_CACHE_MODEL_MATCH_TTL_SECONDS,
    );
  } catch (error) {
    logger.error(
      `Error adding model not found token for ${JSON.stringify(p)} to Redis`,
      error,
    );
  }
};

/** Caches a matched model as JSON with the configured TTL; failures are logged only. */
const addModelToRedis = async (p: ModelMatchProps, model: Model) => {
  try {
    const key = getRedisModelKey(p);
    await redis?.set(
      key,
      JSON.stringify(model),
      "EX",
      env.LANGFUSE_CACHE_MODEL_MATCH_TTL_SECONDS,
    );
  } catch (error) {
    logger.error(`Error adding model for ${JSON.stringify(p)} to Redis`, error);
  }
};

/**
 * Builds the Redis key for a lookup: `<prefix>:<projectId>:<uri-encoded model>`.
 */
export const getRedisModelKey = (p: ModelMatchProps) => {
  const uriEncodedModel = encodeURIComponent(p.model);
  return `${getModelMatchKeyPrefix()}:${p.projectId}:${uriEncodedModel}`;
};

/** Returns the key prefix shared by all model-match cache entries. */
const getModelMatchKeyPrefix = () => {
  if (env.REDIS_CLUSTER_ENABLED === "true") {
    // Use hash tags for Redis cluster compatibility
    // This ensures all model cache keys are placed on the same hash slot
    return "{model-match}";
  }
  return "model-match";
};

/** Revives a JSON-serialized date; `null`/`undefined` become `null`. */
const reviveDate = (value: Date | string | null | undefined): Date | null =>
  value !== null && value !== undefined ? new Date(value) : null;

/** Revives a JSON-serialized decimal; `null`/`undefined` become `null`. */
const reviveDecimal = (value: Decimal.Value | null | undefined): Decimal | null =>
  value !== null && value !== undefined ? new Decimal(value) : null;

/**
 * Parses a cached JSON string back into a `Model`, restoring the `Date` and
 * `Decimal` fields that `JSON.stringify` flattened.
 *
 * @param redisModel - JSON previously written by the cache.
 * @throws SyntaxError when `redisModel` is not valid JSON.
 */
export const redisModelToPrismaModel = (redisModel: string): Model => {
  const parsed: Model = JSON.parse(redisModel);
  return {
    ...parsed,
    createdAt: new Date(parsed.createdAt),
    updatedAt: new Date(parsed.updatedAt),
    inputPrice: reviveDecimal(parsed.inputPrice),
    outputPrice: reviveDecimal(parsed.outputPrice),
    totalPrice: reviveDecimal(parsed.totalPrice),
    startDate: reviveDate(parsed.startDate),
  };
};

/**
 * Lists all keys matching `pattern`. In cluster mode every master node is
 * queried and the per-node results are concatenated.
 */
const findKeysMatching = async (
  client: NonNullable<typeof redis>,
  pattern: string,
): Promise<string[]> => {
  if (env.REDIS_CLUSTER_ENABLED === "true") {
    const keysPerNode = await Promise.all(
      (client as Cluster).nodes("master").map((node) => node.keys(pattern)),
    );
    return keysPerNode.flat();
  }
  return client.keys(pattern);
};

/**
 * Deletes every cached model match belonging to `projectId`.
 * No-op when the cache is disabled or Redis is unavailable; errors are logged.
 */
export async function clearModelCacheForProject(
  projectId: string,
): Promise<void> {
  if (env.LANGFUSE_CACHE_MODEL_MATCH_ENABLED === "false" || !redis) {
    return;
  }

  try {
    const pattern = `${getModelMatchKeyPrefix()}:${projectId}:*`;
    const keys = await findKeysMatching(redis, pattern);

    if (keys.length > 0) {
      await safeMultiDel(redis, keys);
      logger.info(
        `Cleared ${keys.length} model cache entries for project ${projectId}`,
      );
    }
  } catch (error) {
    logger.error(
      `Error clearing model cache for project ${projectId}: ${error}`,
    );
  }
}

/**
 * Reports whether the full-cache-clear lock key currently exists.
 * Returns `false` when Redis is unavailable or the check fails.
 */
export async function isModelMatchCacheLocked() {
  try {
    return Boolean(await redis?.exists(MODEL_MATCH_CACHE_LOCKED_KEY));
  } catch (err) {
    logger.error("Failed to check whether model match is locked", err);
    return false;
  }
}

/**
 * Deletes every model-match cache entry.
 *
 * A Redis lock (10 minute TTL) guards against concurrent executions. The lock
 * is taken atomically and is released only by the caller that acquired it, so
 * a call that is skipped because another clear is running never removes that
 * other caller's lock.
 */
export async function clearFullModelCache() {
  if (env.LANGFUSE_CACHE_MODEL_MATCH_ENABLED === "false" || !redis) {
    return;
  }

  let lockAcquired = false;
  try {
    // Use lock to protect for concurrent executions
    // This function is called on worker startup, so we want to avoid all workers triggering this delete
    const tenMinutesInSeconds = 60 * 10;
    // SET ... NX is atomic: exactly one concurrent caller receives "OK".
    const acquired = await redis.set(
      MODEL_MATCH_CACHE_LOCKED_KEY,
      "locked",
      "EX",
      tenMinutesInSeconds,
      "NX",
    );
    if (acquired !== "OK") {
      logger.info("Model cache clearing already in progress; skipping.");
      return;
    }
    lockAcquired = true;

    const startTime = Date.now();
    logger.info("Clearing full model cache...");

    const pattern = getModelMatchKeyPrefix() + "*";
    const keys = await findKeysMatching(redis, pattern);

    if (keys.length > 0) {
      await safeMultiDel(redis, keys);
      logger.info(
        `Cleared full model cache with ${keys.length} keys in ${Date.now() - startTime}ms.`,
      );
    } else {
      logger.info(`No keys found for match pattern '${pattern}'`);
    }
  } catch (error) {
    logger.error(`Error clearing full model cache: ${error}`);
  } finally {
    // Only release a lock this call owns; the TTL covers a failed release.
    if (lockAcquired) {
      try {
        await redis?.del(MODEL_MATCH_CACHE_LOCKED_KEY);
      } catch (err) {
        logger.error("Failed to release model match cache lock", err);
      }
    }
  }
}