import { BaseCheckpointSaver, Checkpoint, CheckpointMetadata, CheckpointTuple } from "@langchain/langgraph";
import { RunnableConfig } from "@langchain/core/runnables";

/**
 * Redis session configuration interface
 */
export interface RedisSessionConfig {
  /** Redis host; used only when `url` is not set. */
  host?: string;
  /** Redis port; used only when `url` is not set. */
  port?: number;
  /** Redis password; used only when `url` is not set. */
  password?: string;
  /** Redis logical database index; used only when `url` is not set. */
  database?: number;
  /** Full connection URL; when present it takes precedence over host/port/password/database. */
  url?: string;
  /** Prefix prepended to every key written by the manager. */
  keyPrefix?: string;
  ttl?: number; // Time to live in seconds
  /** Per-agent session cap; creating a session at the cap evicts the least recently accessed one. */
  maxSessions?: number;
  /** NOTE(review): accepted and defaulted, but nothing in this file reads it. */
  enableCompression?: boolean;
}

/**
 * Session metadata interface
 */
export interface SessionMetadata {
  id: string;
  userId?: string;
  createdAt: Date;
  lastAccessed: Date;
  expiresAt?: Date;
  // Free-form caller data; kept permissive so existing callers keep compiling.
  metadata?: Record<string, any>;
}

/**
 * Minimal view of the ioredis client: only the commands this file issues.
 * ioredis is an optional dependency, so its own typings cannot be imported here.
 */
interface RedisClientLike {
  ping(): Promise<unknown>;
  hset(key: string, field: string, value: string): Promise<unknown>;
  hget(key: string, field: string): Promise<string | null>;
  expire(key: string, seconds: number): Promise<unknown>;
  del(...keys: string[]): Promise<unknown>;
  scan(cursor: string, ...args: Array<string | number>): Promise<[string, string[]]>;
  quit(): Promise<unknown>;
  disconnect(): void;
}

/** Separator between the agent id and the session id inside a metadata key. */
const META_MARKER = ':meta:';

/** Batch-size hint passed to SCAN. */
const SCAN_COUNT = 200;

/**
 * Escape Redis glob metacharacters so an id is matched literally inside a
 * MATCH pattern (otherwise an id such as `*` would match unrelated keys).
 */
function escapeRedisGlob(literal: string): string {
  return literal.replace(/[\\*?\[\]]/g, '\\$&');
}

/** Convert a JSON-decoded date (ISO string / epoch number / Date) to a valid Date, or undefined. */
function toValidDate(value: unknown): Date | undefined {
  if (value instanceof Date) {
    return Number.isNaN(value.getTime()) ? undefined : value;
  }
  if (typeof value === 'string' || typeof value === 'number') {
    const parsed = new Date(value);
    return Number.isNaN(parsed.getTime()) ? undefined : parsed;
  }
  return undefined;
}

/**
 * Redis-based checkpoint saver for LangGraph sessions.
 *
 * Sessions are scoped by agent. Keys written by this class:
 *   - `<prefix>:<agentId>:meta:<sessionId>`          session metadata (hash field `data`)
 *   - `<prefix>:<agentId>:<sessionId>:checkpoint`    latest checkpoint (hash field `data`)
 *   - `<prefix>:<agentId>:<sessionId>:writes:<task>` pending writes (hash field `data`)
 *
 * A per-agent in-memory index of session metadata is kept alongside Redis and
 * is rebuilt from Redis by `initialize()`.
 */
export class RedisSessionManager extends BaseCheckpointSaver {
  private redis: RedisClientLike | undefined;
  private config: RedisSessionConfig;
  private keyPrefix: string;
  // Map<agentId, Map<sessionId, SessionMetadata>>
  private activeSessions: Map<string, Map<string, SessionMetadata>> = new Map();

  /**
   * @param config Connection and session options; unspecified fields fall back to the defaults below.
   */
  constructor(config: RedisSessionConfig) {
    super();
    this.config = {
      host: 'localhost',
      port: 6379,
      database: 0,
      keyPrefix: 'oobe:session',
      ttl: 86400, // 24 hours default
      maxSessions: 100,
      enableCompression: false,
      ...config
    };
    // An explicit `keyPrefix: undefined` must not produce keys starting with "undefined:".
    this.keyPrefix = this.config.keyPrefix ?? 'oobe:session';
  }

  /**
   * Initialize Redis connection, verify it with PING and load existing sessions.
   *
   * @throws Error prefixed with "Redis connection failed:" when ioredis is
   *         missing or the server cannot be reached.
   */
  async initialize(): Promise<void> {
    try {
      // Dynamic import for Redis (optional dependency)
      let Redis: new (...args: unknown[]) => RedisClientLike;
      try {
        // eval() of a constant string hides the import from the compiler so the
        // file still builds when ioredis is not installed. No external data is evaluated.
        const ioredisModule = await eval('import("ioredis")');
        Redis = ioredisModule.default ?? ioredisModule.Redis ?? ioredisModule;
      } catch {
        throw new Error('ioredis package not found. Install it with: npm install ioredis @types/ioredis');
      }

      if (this.config.url) {
        this.redis = new Redis(this.config.url);
      } else {
        this.redis = new Redis({
          host: this.config.host,
          port: this.config.port,
          password: this.config.password,
          db: this.config.database,
        });
      }

      // Test connection
      await this.redis.ping();
      console.log('✅ Redis session manager connected successfully');

      // Load existing sessions
      await this.loadActiveSessions();
    } catch (error) {
      console.error('❌ Failed to initialize Redis session manager:', error);
      // Do not leave a half-open client reconnecting in the background.
      if (this.redis) {
        try {
          this.redis.disconnect();
        } catch {
          // Best effort: the original failure below is the one worth reporting.
        }
        this.redis = undefined;
      }
      const message = error instanceof Error ? error.message : String(error);
      throw new Error(`Redis connection failed: ${message}`);
    }
  }

  /**
   * Create a new session for an agent and persist its metadata.
   *
   * When the agent already holds `maxSessions` sessions, the least recently
   * accessed one is deleted first.
   *
   * @param agentId Agent that owns the session.
   * @param userId Optional user the session belongs to.
   * @param metadata Optional free-form data stored with the session.
   * @returns The generated session id.
   */
  async createSession(agentId: string, userId?: string, metadata?: Record<string, any>): Promise<string> {
    const client = this.requireClient();
    const sessionId = this.generateSessionId();
    const sessionMeta: SessionMetadata = {
      id: sessionId,
      createdAt: new Date(),
      lastAccessed: new Date(),
    };

    if (userId !== undefined) {
      sessionMeta.userId = userId;
    }
    if (metadata !== undefined) {
      sessionMeta.metadata = metadata;
    }
    if (this.config.ttl) {
      sessionMeta.expiresAt = new Date(Date.now() + this.config.ttl * 1000);
    }

    // Check session limit for this agent
    const agentSessions = this.activeSessions.get(agentId) || new Map<string, SessionMetadata>();
    const limit = this.config.maxSessions;
    if (limit !== undefined && agentSessions.size >= limit) {
      await this.cleanupOldestSession(agentId);
    }

    // Store session metadata
    const key = this.metaKey(agentId, sessionId);
    await client.hset(key, 'data', JSON.stringify(sessionMeta));

    // Set TTL if configured
    if (this.config.ttl) {
      await client.expire(key, this.config.ttl);
    }

    agentSessions.set(sessionId, sessionMeta);
    this.activeSessions.set(agentId, agentSessions);

    console.log(`📝 Created new session: ${sessionId} for agent: ${agentId}${userId ? ` user: ${userId}` : ''}`);
    return sessionId;
  }

  /**
   * Get session metadata from Redis and mark the session as just accessed.
   *
   * @returns The metadata (with real `Date` fields), or `null` when the
   *          session does not exist or reading it fails (the error is logged).
   */
  async getSession(agentId: string, sessionId: string): Promise<SessionMetadata | null> {
    try {
      const data = await this.requireClient().hget(this.metaKey(agentId, sessionId), 'data');
      if (!data) return null;

      const sessionMeta = this.reviveSessionMetadata(data, sessionId);

      // Update last accessed
      sessionMeta.lastAccessed = new Date();
      await this.updateSessionMetadata(agentId, sessionId, sessionMeta);

      return sessionMeta;
    } catch (error) {
      console.error(`Error getting session ${sessionId} for agent ${agentId}:`, error);
      return null;
    }
  }

  /**
   * List all active sessions of an agent from the in-memory index.
   *
   * @param userId When given, only sessions of that user are returned.
   * @returns Sessions ordered from most to least recently accessed.
   */
  async listSessions(agentId: string, userId?: string): Promise<SessionMetadata[]> {
    const agentSessions = this.activeSessions.get(agentId);
    if (!agentSessions) return [];

    const sessions: SessionMetadata[] = [];
    for (const sessionMeta of agentSessions.values()) {
      if (!userId || sessionMeta.userId === userId) {
        sessions.push(sessionMeta);
      }
    }

    return sessions.sort((a, b) => b.lastAccessed.getTime() - a.lastAccessed.getTime());
  }

  /**
   * Delete a session: its checkpoint/writes keys, its metadata key and its
   * in-memory entry.
   *
   * @returns `true` on success, `false` when a Redis call failed (the error is logged).
   */
  async deleteSession(agentId: string, sessionId: string): Promise<boolean> {
    try {
      const client = this.requireClient();

      // Delete all checkpoint data for this session. The ids are escaped so a
      // session id containing glob characters cannot match other sessions.
      const pattern =
        `${escapeRedisGlob(this.keyPrefix)}:${escapeRedisGlob(agentId)}:${escapeRedisGlob(sessionId)}:*`;
      const keys = await this.scanKeys(pattern);
      if (keys.length > 0) {
        await client.del(...keys);
      }

      // Delete session metadata
      await client.del(this.metaKey(agentId, sessionId));

      // Remove from activeSessions
      const agentSessions = this.activeSessions.get(agentId);
      if (agentSessions) {
        agentSessions.delete(sessionId);
        if (agentSessions.size === 0) {
          this.activeSessions.delete(agentId);
        }
      }

      console.log(`🗑️ Deleted session: ${sessionId} for agent: ${agentId}`);
      return true;
    } catch (error) {
      console.error(`Error deleting session ${sessionId} for agent ${agentId}:`, error);
      return false;
    }
  }

  /**
   * Clean up sessions of an agent whose `expiresAt` is in the past.
   *
   * @returns Number of sessions that were actually deleted.
   */
  async cleanupExpiredSessions(agentId: string): Promise<number> {
    const agentSessions = this.activeSessions.get(agentId);
    if (!agentSessions) return 0;

    const now = Date.now();
    let cleanedCount = 0;

    // Iterate over a snapshot: deleteSession() mutates the underlying map.
    for (const [sessionId, sessionMeta] of Array.from(agentSessions)) {
      if (sessionMeta.expiresAt && sessionMeta.expiresAt.getTime() < now) {
        // Only count sessions whose deletion succeeded.
        if (await this.deleteSession(agentId, sessionId)) {
          cleanedCount++;
        }
      }
    }

    if (cleanedCount > 0) {
      console.log(`🧹 Cleaned up ${cleanedCount} expired sessions for agent ${agentId}`);
    }

    return cleanedCount;
  }

  /**
   * Get session statistics for an agent, computed from the in-memory index.
   *
   * `averageSessionAge` is in milliseconds. `oldestSession` / `newestSession`
   * are omitted when the agent has no sessions.
   */
  async getSessionStats(agentId: string): Promise<{
    totalSessions: number;
    sessionsByUser: Record<string, number>;
    averageSessionAge: number;
    oldestSession?: SessionMetadata;
    newestSession?: SessionMetadata;
  }> {
    const agentSessions = this.activeSessions.get(agentId);
    if (!agentSessions) {
      return {
        totalSessions: 0,
        sessionsByUser: {},
        averageSessionAge: 0,
      };
    }

    const sessions = Array.from(agentSessions.values());
    const now = Date.now();
    const sessionsByUser: Record<string, number> = {};
    let totalAge = 0;
    let oldestSession: SessionMetadata | undefined;
    let newestSession: SessionMetadata | undefined;

    for (const session of sessions) {
      const userId = session.userId || 'anonymous';
      sessionsByUser[userId] = (sessionsByUser[userId] || 0) + 1;

      totalAge += now - session.createdAt.getTime();

      if (!oldestSession || session.createdAt < oldestSession.createdAt) {
        oldestSession = session;
      }
      if (!newestSession || session.createdAt > newestSession.createdAt) {
        newestSession = session;
      }
    }

    const result: {
      totalSessions: number;
      sessionsByUser: Record<string, number>;
      averageSessionAge: number;
      oldestSession?: SessionMetadata;
      newestSession?: SessionMetadata;
    } = {
      totalSessions: sessions.length,
      sessionsByUser,
      averageSessionAge: sessions.length > 0 ? totalAge / sessions.length : 0,
    };

    // Assign the optional fields only when present, never as explicit `undefined`.
    if (oldestSession) {
      result.oldestSession = oldestSession;
    }
    if (newestSession) {
      result.newestSession = newestSession;
    }

    return result;
  }

  // BaseCheckpointSaver implementation

  /**
   * Fetch the checkpoint stored for the session named in `config`.
   *
   * Reads `configurable.agent_id` and `configurable.thread_id` (or `session_id`).
   *
   * @returns The stored tuple, or `undefined` when ids are missing, nothing is
   *          stored, or the read fails (the error is logged).
   */
  async getTuple(config: RunnableConfig): Promise<CheckpointTuple | undefined> {
    const agentId = config.configurable?.agent_id;
    const sessionId = this.getSessionIdFromConfig(config);
    if (!sessionId || !agentId) return undefined;

    try {
      const data = await this.requireClient().hget(this.checkpointKey(agentId, sessionId), 'data');
      if (!data) return undefined;

      const parsed = JSON.parse(data);
      return {
        config,
        checkpoint: parsed.checkpoint,
        metadata: parsed.metadata,
      };
    } catch (error) {
      console.error(`Error getting checkpoint for session ${sessionId} (agent ${agentId}):`, error);
      return undefined;
    }
  }

  /**
   * Yield the checkpoints stored for the session named in `config`.
   *
   * `put()` stores the checkpoint at `…:checkpoint`, so that key is read
   * first; keys under `…:checkpoint:*` are also yielded in case they exist.
   * Errors are logged and end the iteration.
   */
  async *list(config: RunnableConfig): AsyncGenerator<CheckpointTuple> {
    const agentId = config.configurable?.agent_id;
    const sessionId = this.getSessionIdFromConfig(config);
    if (!sessionId || !agentId) return;

    try {
      const client = this.requireClient();
      const baseKey = this.checkpointKey(agentId, sessionId);
      const suffixedKeys = await this.scanKeys(`${escapeRedisGlob(baseKey)}:*`);

      for (const key of [baseKey, ...suffixedKeys]) {
        const data = await client.hget(key, 'data');
        if (data) {
          const parsed = JSON.parse(data);
          yield {
            config,
            checkpoint: parsed.checkpoint,
            metadata: parsed.metadata,
          };
        }
      }
    } catch (error) {
      console.error(`Error listing checkpoints for session ${sessionId} (agent ${agentId}):`, error);
    }
  }

  /**
   * Store (overwrite) the checkpoint of the session named in `config`.
   *
   * @returns The `config` that was passed in.
   * @throws Error when the agent or session id is missing from `config`;
   *         Redis errors are logged and rethrown.
   */
  async put(config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata): Promise<RunnableConfig> {
    const agentId = config.configurable?.agent_id;
    const sessionId = this.getSessionIdFromConfig(config);
    if (!sessionId || !agentId) {
      throw new Error('Session ID or agent ID not found in config');
    }

    try {
      const client = this.requireClient();
      const key = this.checkpointKey(agentId, sessionId);
      const data = JSON.stringify({
        checkpoint,
        metadata,
        timestamp: new Date().toISOString()
      });

      await client.hset(key, 'data', data);

      // Set TTL if configured
      if (this.config.ttl) {
        await client.expire(key, this.config.ttl);
      }

      // Update session last accessed
      await this.updateSessionLastAccessed(agentId, sessionId);

      return config;
    } catch (error) {
      console.error(`Error saving checkpoint for session ${sessionId} (agent ${agentId}):`, error);
      throw error;
    }
  }

  /**
   * Store the pending writes of one task for the session named in `config`.
   *
   * @throws Error when the agent or session id is missing from `config`;
   *         Redis errors are logged and rethrown.
   */
  async putWrites(config: RunnableConfig, writes: unknown[], taskId: string): Promise<void> {
    const agentId = config.configurable?.agent_id;
    const sessionId = this.getSessionIdFromConfig(config);
    if (!sessionId || !agentId) {
      throw new Error('Session ID or agent ID not found in config');
    }

    try {
      const client = this.requireClient();
      const key = `${this.keyPrefix}:${agentId}:${sessionId}:writes:${taskId}`;
      const data = JSON.stringify({
        writes,
        taskId,
        timestamp: new Date().toISOString()
      });

      await client.hset(key, 'data', data);

      // Set TTL if configured
      if (this.config.ttl) {
        await client.expire(key, this.config.ttl);
      }

      // Update session last accessed
      await this.updateSessionLastAccessed(agentId, sessionId);
    } catch (error) {
      console.error(`Error saving writes for session ${sessionId} (agent ${agentId}):`, error);
      throw error;
    }
  }

  // Private helper methods

  /** Return the connected client, or fail with a clear message when `initialize()` has not succeeded. */
  private requireClient(): RedisClientLike {
    if (!this.redis) {
      throw new Error('Redis session manager is not initialized. Call initialize() first.');
    }
    return this.redis;
  }

  /** Key holding the metadata of one session. */
  private metaKey(agentId: string, sessionId: string): string {
    return `${this.keyPrefix}:${agentId}${META_MARKER}${sessionId}`;
  }

  /** Key holding the checkpoint of one session. */
  private checkpointKey(agentId: string, sessionId: string): string {
    return `${this.keyPrefix}:${agentId}:${sessionId}:checkpoint`;
  }

  /**
   * Collect every key matching `pattern` with cursor-based SCAN instead of
   * KEYS, so a large keyspace does not block the server in one call.
   * SCAN may report a key more than once, hence the Set.
   */
  private async scanKeys(pattern: string): Promise<string[]> {
    const client = this.requireClient();
    const found = new Set<string>();
    let cursor = '0';
    do {
      const [nextCursor, batch] = await client.scan(cursor, 'MATCH', pattern, 'COUNT', SCAN_COUNT);
      for (const key of batch) {
        found.add(key);
      }
      cursor = nextCursor;
    } while (cursor !== '0');
    return Array.from(found);
  }

  /**
   * Parse stored session metadata and turn its date fields back into `Date`
   * objects (JSON stores them as ISO strings).
   *
   * @param raw JSON text read from Redis.
   * @param fallbackId Id to use when the stored object carries none.
   * @throws Error when the payload is not an object or lacks valid dates.
   */
  private reviveSessionMetadata(raw: string, fallbackId: string): SessionMetadata {
    const parsed: unknown = JSON.parse(raw);
    if (typeof parsed !== 'object' || parsed === null) {
      throw new Error('Stored session metadata is not an object');
    }
    const record = parsed as Record<string, unknown>;

    const createdAt = toValidDate(record['createdAt']);
    const lastAccessed = toValidDate(record['lastAccessed']);
    if (!createdAt || !lastAccessed) {
      throw new Error('Stored session metadata has invalid createdAt/lastAccessed');
    }

    const storedId = record['id'];
    const sessionMeta: SessionMetadata = {
      id: typeof storedId === 'string' ? storedId : fallbackId,
      createdAt,
      lastAccessed,
    };

    const userId = record['userId'];
    if (typeof userId === 'string') {
      sessionMeta.userId = userId;
    }
    const expiresAt = toValidDate(record['expiresAt']);
    if (expiresAt) {
      sessionMeta.expiresAt = expiresAt;
    }
    const metadata = record['metadata'];
    if (typeof metadata === 'object' && metadata !== null) {
      sessionMeta.metadata = metadata as Record<string, any>;
    }
    return sessionMeta;
  }

  /** Build a session id from the current time plus a random base-36 suffix. */
  private generateSessionId(): string {
    return `session_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  /** Session id taken from `configurable.thread_id`, falling back to `configurable.session_id`. */
  private getSessionIdFromConfig(config: RunnableConfig): string | undefined {
    return config.configurable?.thread_id || config.configurable?.session_id;
  }

  /**
   * Rebuild the in-memory index from the metadata keys found in Redis.
   * Failures are logged, never thrown; a single unreadable entry is skipped.
   */
  private async loadActiveSessions(): Promise<void> {
    try {
      const client = this.requireClient();
      const keys = await this.scanKeys(`${escapeRedisGlob(this.keyPrefix)}:*${META_MARKER}*`);
      const prefix = `${this.keyPrefix}:`;

      for (const key of keys) {
        // key format: <keyPrefix>:<agentId>:meta:<sessionId>
        // The prefix itself may contain ':' (the default does), so strip it
        // literally instead of guessing the boundaries with a regex.
        if (!key.startsWith(prefix)) continue;
        const rest = key.slice(prefix.length);
        const markerAt = rest.lastIndexOf(META_MARKER);
        if (markerAt <= 0) continue;
        const agentId = rest.slice(0, markerAt);
        const sessionId = rest.slice(markerAt + META_MARKER.length);
        if (!sessionId) continue;

        try {
          const data = await client.hget(key, 'data');
          if (!data) continue;

          const sessionMeta = this.reviveSessionMetadata(data, sessionId);
          let agentSessions = this.activeSessions.get(agentId);
          if (!agentSessions) {
            agentSessions = new Map();
            this.activeSessions.set(agentId, agentSessions);
          }
          agentSessions.set(sessionId, sessionMeta);
        } catch (entryError) {
          console.error(`Error loading session ${sessionId} for agent ${agentId}:`, entryError);
        }
      }

      let total = 0;
      for (const agentSessions of this.activeSessions.values()) {
        total += agentSessions.size;
      }
      console.log(`📚 Loaded ${total} active sessions (scoped by agent)`);
    } catch (error) {
      console.error('Error loading active sessions:', error);
    }
  }

  /** Write session metadata to Redis and mirror it in the in-memory index. */
  private async updateSessionMetadata(agentId: string, sessionId: string, sessionMeta: SessionMetadata): Promise<void> {
    const client = this.requireClient();
    const key = this.metaKey(agentId, sessionId);
    await client.hset(key, 'data', JSON.stringify(sessionMeta));

    // HSET keeps an existing TTL, but if the key had already expired it is
    // recreated with none and would live forever. Re-apply the time left
    // until `expiresAt` (at least 1 s so a stale session still disappears).
    if (sessionMeta.expiresAt) {
      const remainingSeconds = Math.ceil((sessionMeta.expiresAt.getTime() - Date.now()) / 1000);
      await client.expire(key, Math.max(remainingSeconds, 1));
    }

    let agentSessions = this.activeSessions.get(agentId);
    if (!agentSessions) {
      agentSessions = new Map();
      this.activeSessions.set(agentId, agentSessions);
    }
    agentSessions.set(sessionId, sessionMeta);
  }

  /** Stamp `lastAccessed` on a session known to the in-memory index; unknown sessions are ignored. */
  private async updateSessionLastAccessed(agentId: string, sessionId: string): Promise<void> {
    const sessionMeta = this.activeSessions.get(agentId)?.get(sessionId);
    if (!sessionMeta) return;

    sessionMeta.lastAccessed = new Date();
    await this.updateSessionMetadata(agentId, sessionId, sessionMeta);
  }

  /** Delete the least recently accessed session of an agent, if it has any. */
  private async cleanupOldestSession(agentId: string): Promise<void> {
    const agentSessions = this.activeSessions.get(agentId);
    if (!agentSessions) return;

    let oldestSessionId: string | undefined;
    // Start from +Infinity so a session touched in the current millisecond is still eligible.
    let oldestTime = Number.POSITIVE_INFINITY;

    for (const [sessionId, sessionMeta] of agentSessions) {
      const accessedAt = sessionMeta.lastAccessed.getTime();
      if (accessedAt < oldestTime) {
        oldestTime = accessedAt;
        oldestSessionId = sessionId;
      }
    }

    if (oldestSessionId) {
      await this.deleteSession(agentId, oldestSessionId);
    }
  }

  /**
   * Disconnect from Redis. Does nothing when no connection was established.
   */
  async disconnect(): Promise<void> {
    if (this.redis) {
      await this.redis.quit();
      console.log('👋 Redis session manager disconnected');
    }
  }
}