import Redis, { Cluster, ClusterNode, ClusterOptions, SentinelConnectionOptions, } from "ioredis"; import { v4 as uuidv4 } from "uuid"; import logger, { truncatePayloadForLogging } from "../logger"; import { eventStreamManager } from "../eventStreamManager"; import { Context } from "../../types"; import { registrar } from "../registrar"; import { MemoryCache } from "./MemoryCache"; import { CacheEntry, CacheSettings } from "./index"; import { CacheRefreshStrategy } from "../../types"; export class RedisCache { private client: Redis | Cluster | undefined; private clientUUID: string = uuidv4(); private readonly publishPayloadToChannel: boolean; private subscriberClient: Redis | Cluster | undefined; private readonly memoryCacheClient: MemoryCache | undefined; private readonly connectionUrl: string | undefined; private readonly staleTTL: number; private readonly expiresTTL: number | "never"; public readonly allowStale: boolean; public readonly cacheRefreshStrategy: CacheRefreshStrategy; private readonly useCluster: boolean; private readonly clusterRootNodesJSON?: ClusterNode[]; private readonly clusterOptions?: ClusterOptions; private readonly useSentinel: boolean; private readonly sentinelConnectionOptions?: SentinelConnectionOptions; private readonly appContext?: Context; public constructor( { staleTTL = 60, // 1 minute expiresTTL = 10 * 60, // 10 minutes allowStale = true, cacheRefreshStrategy, connectionUrl, useAdditionalMemoryCache, publishPayloadToChannel = false, useCluster = false, clusterRootNodesJSON, clusterOptionsJSON, useSentinel = false, sentinelConnectionOptionsJSON, }: CacheSettings = {}, appContext?: Context, ) { this.connectionUrl = connectionUrl; this.staleTTL = staleTTL * 1000; this.expiresTTL = expiresTTL === "never" ? "never" : expiresTTL * 1000; this.allowStale = allowStale; this.cacheRefreshStrategy = cacheRefreshStrategy!; this.publishPayloadToChannel = publishPayloadToChannel; this.useCluster = useCluster; this.clusterRootNodesJSON = clusterRootNodesJSON; this.clusterOptions = clusterOptionsJSON; this.useSentinel = useSentinel; this.sentinelConnectionOptions = sentinelConnectionOptionsJSON; this.appContext = appContext; // wrap the RedisCache in a MemoryCache to avoid hitting Redis on every request if (useAdditionalMemoryCache) { this.memoryCacheClient = new MemoryCache({ expiresTTL: 1, // 1 second, allowStale: false, cacheRefreshStrategy: this.cacheRefreshStrategy, }); } } public async connect() { if (this.useCluster) { if (this.clusterRootNodesJSON) { this.client = new Redis.Cluster( this.clusterRootNodesJSON, this.clusterOptions, ); } else { throw new Error("No cluster root nodes"); } } else if (this.useSentinel && this.sentinelConnectionOptions) { this.client = new Redis(this.sentinelConnectionOptions); } else { this.client = this.connectionUrl ? new Redis(this.connectionUrl) : new Redis(); } await this.subscribe(); } public async get(key: string): Promise { if (!this.client) { throw new Error("No redis client"); } let entry = undefined; // try fetching from MemoryCache first if (this.memoryCacheClient) { const memoryCacheEntry = await this.memoryCacheClient.get(key); if (memoryCacheEntry && (!memoryCacheEntry.expiresOn || memoryCacheEntry.expiresOn > new Date())) { entry = memoryCacheEntry.payload as CacheEntry; } } // if cache miss from MemoryCache, fetch from Redis if (!entry) { const entryRaw = await this.client.get(key); if (!entryRaw) { return undefined; } try { entry = JSON.parse(entryRaw); } catch (e) { logger.error({ err: e }, "unable to parse cache json"); return undefined; } } if (!entry) { return undefined; } entry.staleOn = new Date(entry.staleOn); if (entry.expiresOn) { entry.expiresOn = new Date(entry.expiresOn); } // With "none" strategy, never eject based on staleness or expiration if (this.cacheRefreshStrategy === "none") { // refresh MemoryCache if (this.memoryCacheClient) { await this.memoryCacheClient.set(key, entry); } return entry; } // With "stale-while-revalidate" strategy, allowStale controls whether we return stale but not-yet-expired entries if (this.cacheRefreshStrategy === "stale-while-revalidate" && !this.allowStale && entry.staleOn < new Date()) { return undefined; } if (entry.expiresOn && entry.expiresOn < new Date()) { return undefined; } // refresh MemoryCache if (this.memoryCacheClient) { await this.memoryCacheClient.set(key, entry); } return entry; } public async set(key: string, payload: unknown) { if (!this.client) { throw new Error("No redis client"); } const oldEntry = await this.get(key); const entry: CacheEntry = { payload, staleOn: new Date(Date.now() + this.staleTTL), expiresOn: this.expiresTTL === "never" ? undefined : new Date(Date.now() + this.expiresTTL), }; if (this.expiresTTL === "never") { await this.client.set(key, JSON.stringify(entry)); } else { await this.client.set( key, JSON.stringify(entry), "EX", this.expiresTTL / 1000, ); } // refresh MemoryCache if (this.memoryCacheClient) { await this.memoryCacheClient.set(key, entry); } // Publish with Redis pub/sub so that other proxy nodes can // 1. emit SSE to SDK subscribers // 2. update their MemoryCache if (this.publishPayloadToChannel) { // publish to Redis subscribers if new payload !== old payload const hasChanges = JSON.stringify(oldEntry?.payload) !== JSON.stringify(payload); if (hasChanges) { if (this.appContext?.verboseDebugging) { logger.info( { payload: truncatePayloadForLogging(payload) }, "RedisCache.set: publish to Redis subscribers", ); } this.client.publish( "set", JSON.stringify({ uuid: this.clientUUID, key, payload, }), ); return; } if (this.appContext?.verboseDebugging) { logger.info( { payload: truncatePayloadForLogging(payload), oldPayload: truncatePayloadForLogging(oldEntry?.payload) }, "RedisCache.set: do not publish to Redis subscribers (no changes)", ); } } } private async subscribe() { if (!this.publishPayloadToChannel) return; this.appContext?.verboseDebugging && logger.info("RedisCache.subscribe"); if (!this.client) { throw new Error("No redis client"); } // Redis requires that subscribers use a separate client this.subscriberClient = this.client.duplicate(); // Subscribe to Redis pub/sub so that this proxy node can: // 1. emit SSE to SDK subscribers // 2. update its MemoryCache this.subscriberClient.subscribe("set", (err) => { if (err) { logger.error({ err }, "RedisCache.subscribe: error subscribing to 'set'"); } else { this.appContext?.verboseDebugging && logger.info("RedisCache.subscribe: subscribed to 'set' channel"); } }); this.subscriberClient.on( "message", async (channel: string, message: string) => { if (channel === "set") { try { const { uuid, key, payload } = JSON.parse(message); // ignore messages published from this node (shouldn't subscribe to ourselves) if (uuid === this.clientUUID) return; this.appContext?.verboseDebugging && logger.info( { payload: truncatePayloadForLogging(payload) }, "RedisCache.subscribe: got 'set' message", ); // 1. emit SSE to SDK clients if (this.appContext?.enableEventStream && eventStreamManager) { this.appContext?.verboseDebugging && logger.info({ payload: truncatePayloadForLogging(payload) }, "RedisCache.subscribe: publish SSE"); const remoteEvalEnabled = !!registrar.getConnection(key)?.remoteEvalEnabled; eventStreamManager.publish({ apiKey: key, event: remoteEvalEnabled ? "features-updated" : "features", payload, }); } // 2. update MemoryCache if (this.memoryCacheClient) { const entry: CacheEntry = { payload, staleOn: new Date(Date.now() + this.staleTTL), expiresOn: this.expiresTTL === "never" ? undefined : new Date(Date.now() + this.expiresTTL), }; await this.memoryCacheClient.set(key, entry); } } catch (e) { logger.error({ err: e }, "Error parsing message from Redis pub/sub"); } } }, ); } public getClient() { return this.client; } public getsubscriberClient() { return this.subscriberClient; } public async getStatus() { return this.client?.status; } }