import { v4 as uuidv4 } from "uuid"; import { HistoryManager } from "./base"; interface HistoryEntry { id: string; memory_id: string; previous_value: string | null; new_value: string | null; action: string; created_at: string; updated_at: string | null; is_deleted: number; user_id?: string; run_id?: string; message_id?: string; happened_at: string; } interface Entity { entity_id: string; label: string; type: string; created_at: string; updated_at: string; user_id?: string; run_id?: string; } interface MemoryEntity { memory_id: string; entity_id: string; created_at: string; } interface RawMessage { id: string; message_id: string; content: string; role: string; user_id?: string; run_id?: string; batch_info?: string; created_at: string; happened_at: string; message_index: number; } export class MemoryHistoryManager implements HistoryManager { private memoryStore: Map = new Map(); private entityStore: Map = new Map(); private memoryEntityStore: Map = new Map(); // memory_id -> entity_ids[] private rawMessageStore: Map = new Map(); // id -> RawMessage async addHistory( memoryId: string, previousValue: string | null, newValue: string | null, action: string, createdAt?: string, updatedAt?: string, isDeleted: number = 0, userId?: string, runId?: string, messageId?: string, happenedAt?: string, ): Promise { const timestamp = new Date().toISOString(); const historyEntry: HistoryEntry = { id: uuidv4(), memory_id: memoryId, previous_value: previousValue, new_value: newValue, action: action, created_at: createdAt || timestamp, updated_at: updatedAt || null, is_deleted: isDeleted, user_id: userId, run_id: runId, message_id: messageId, happened_at: happenedAt || timestamp, }; this.memoryStore.set(historyEntry.id, historyEntry); } async getHistory(memoryId: string, options?: { userId?: string, runId?: string }): Promise { return Array.from(this.memoryStore.values()) .filter((entry) => { let match = entry.memory_id === memoryId; if (match && options?.userId) { match = match && entry.user_id === options.userId; } if (match && options?.runId) { match = match && entry.run_id === options.runId; } return match; }) .sort( (a, b) => new Date(b.created_at).getTime() - new Date(a.created_at).getTime(), ) .slice(0, 100); } async getAllMessages(options?: { userId?: string, runId?: string, limit?: number }): Promise { const limit = options?.limit || 100; return Array.from(this.memoryStore.values()) .filter((entry) => { let match = true; if (options?.userId) { match = match && entry.user_id === options.userId; } if (options?.runId) { match = match && entry.run_id === options.runId; } return match; }) .sort( (a, b) => new Date(b.happened_at).getTime() - new Date(a.happened_at).getTime(), ) .slice(0, limit); } async getMessageContext( messageId: string, options: { beforeCount?: number; afterCount?: number; userId?: string; runId?: string; } ): Promise { const beforeCount = options.beforeCount || 10; const afterCount = options.afterCount || 10; // Get all messages that match userId/runId criteria const allEntries = Array.from(this.memoryStore.values()) .filter((entry) => { let match = true; if (options.userId) { match = match && entry.user_id === options.userId; } if (options.runId) { match = match && entry.run_id === options.runId; } return match; }) .sort((a, b) => new Date(a.happened_at).getTime() - new Date(b.happened_at).getTime() ); // Find the reference message const refIndex = allEntries.findIndex(msg => msg.message_id === messageId); if (refIndex === -1) { return []; // Reference message not found } // Calculate the start and end indices for the context window const startIndex = Math.max(0, refIndex - beforeCount); const endIndex = Math.min(allEntries.length - 1, refIndex + afterCount); // Extract the context messages const before = allEntries .slice(startIndex, refIndex) .map(msg => ({ ...msg, type: 'before' })); const match = { ...allEntries[refIndex], type: 'match' }; const after = allEntries .slice(refIndex + 1, endIndex + 1) .map(msg => ({ ...msg, type: 'after' })); // Combine and return all context messages return [...before, match, ...after]; } async reset(): Promise { this.memoryStore.clear(); this.entityStore.clear(); this.memoryEntityStore.clear(); this.rawMessageStore.clear(); } // Entity management methods async addEntity( entityId: string, label: string, type: string, userId?: string, runId?: string, ): Promise { const currentTime = new Date().toISOString(); const entity: Entity = { entity_id: entityId, label, type, created_at: currentTime, updated_at: currentTime, user_id: userId, run_id: runId, }; this.entityStore.set(entityId, entity); } async updateEntity( entityId: string, label: string, type: string, userId?: string, runId?: string, ): Promise { const existingEntity = this.entityStore.get(entityId); if (existingEntity) { const updatedEntity: Entity = { ...existingEntity, label, type, updated_at: new Date().toISOString(), user_id: userId, run_id: runId, }; this.entityStore.set(entityId, updatedEntity); } } async deleteEntity(entityId: string): Promise { this.entityStore.delete(entityId); // Remove entity associations from memories for (const [memoryId, entityIds] of this.memoryEntityStore.entries()) { const filteredIds = entityIds.filter(id => id !== entityId); if (filteredIds.length !== entityIds.length) { this.memoryEntityStore.set(memoryId, filteredIds); } } } async getEntity(entityId: string): Promise { return this.entityStore.get(entityId) || null; } async getAllEntities(options?: { userId?: string, runId?: string }): Promise { return Array.from(this.entityStore.values()) .filter((entity) => { let match = true; if (options?.userId) { match = match && entity.user_id === options.userId; } if (options?.runId) { match = match && entity.run_id === options.runId; } return match; }) .sort((a, b) => new Date(a.created_at).getTime() - new Date(b.created_at).getTime()); } async associateMemoryWithEntities(memoryId: string, entityIds: string[]): Promise { this.memoryEntityStore.set(memoryId, [...entityIds]); } async getMemoryEntities(memoryId: string): Promise { return this.memoryEntityStore.get(memoryId) || []; } async getMessageDetails(memoryId: string): Promise { const memoryEntries = Array.from(this.memoryStore.values()) .filter(entry => entry.memory_id === memoryId) .sort((a, b) => new Date(b.happened_at).getTime() - new Date(a.happened_at).getTime()); return memoryEntries.map(entry => ({ type: 'message_details', memoryId, messageId: entry.message_id, content: entry.new_value || entry.previous_value, action: entry.action, createdAt: entry.created_at, updatedAt: entry.updated_at, userId: entry.user_id, runId: entry.run_id })); } async getMessageConversationContext(memoryId: string): Promise { // First get the memory entry to find the message_id const memoryEntry = Array.from(this.memoryStore.values()) .find(entry => entry.memory_id === memoryId); if (!memoryEntry || !memoryEntry.message_id) { return []; } // Get conversation context around this message const contextResult = await this.getMessageContext(memoryEntry.message_id, { beforeCount: 5, afterCount: 5, userId: memoryEntry.user_id, runId: memoryEntry.run_id }); return contextResult.map(msg => ({ type: 'conversation_context', memoryId, messageId: msg.message_id, content: msg.new_value || msg.previous_value, contextType: msg.type, happenedAt: msg.happened_at, userId: msg.user_id, runId: msg.run_id })); } async storeRawMessages( messages: any[], metadata: { userId?: string; runId?: string; messageId: string; batchInfo?: any; } ): Promise { const timestamp = new Date().toISOString(); const batchInfoStr = metadata.batchInfo ? JSON.stringify(metadata.batchInfo) : undefined; for (let i = 0; i < messages.length; i++) { const message = messages[i]; const content = typeof message.content === 'string' ? message.content : JSON.stringify(message.content); const rawMessage: RawMessage = { id: uuidv4(), message_id: metadata.messageId, content, role: message.role || 'user', user_id: metadata.userId, run_id: metadata.runId, batch_info: batchInfoStr, created_at: timestamp, happened_at: timestamp, message_index: i }; this.rawMessageStore.set(rawMessage.id, rawMessage); } } async searchMessages( query: string, options?: { userId?: string; runId?: string; limit?: number; timeRange?: { start?: string; end?: string }; } ): Promise { const results = Array.from(this.rawMessageStore.values()) .filter((msg) => { // Text search if (!msg.content.toLowerCase().includes(query.toLowerCase())) { return false; } // User filter if (options?.userId && msg.user_id !== options.userId) { return false; } // Run filter if (options?.runId && msg.run_id !== options.runId) { return false; } // Time range filter if (options?.timeRange?.start && msg.happened_at < options.timeRange.start) { return false; } if (options?.timeRange?.end && msg.happened_at > options.timeRange.end) { return false; } return true; }) .sort((a, b) => new Date(b.happened_at).getTime() - new Date(a.happened_at).getTime()) .slice(0, options?.limit || 100); return results; } async getMessagesByPattern( messageIdPattern: string, options?: { userId?: string; runId?: string; exact?: boolean; } ): Promise { const results = Array.from(this.rawMessageStore.values()) .filter((msg) => { // Message ID pattern match const idMatch = options?.exact ? msg.message_id === messageIdPattern : msg.message_id.includes(messageIdPattern); if (!idMatch) return false; // User filter if (options?.userId && msg.user_id !== options.userId) { return false; } // Run filter if (options?.runId && msg.run_id !== options.runId) { return false; } return true; }) .sort((a, b) => { // Sort by happened_at, then by message_index const timeCompare = new Date(a.happened_at).getTime() - new Date(b.happened_at).getTime(); if (timeCompare !== 0) return timeCompare; return a.message_index - b.message_index; }); return results; } close(): void { // No need to close anything for in-memory storage return; } }