import { useCallback, useEffect, useRef, useState } from 'react';
import { VertesiaClient } from '@vertesia/client';
import {
    AgentMessage,
    AgentMessageType,
    ConversationFile,
    FileProcessingDetails,
    StreamingChunkDetails,
} from '@vertesia/common';
import { insertMessageInTimeline, isInProgress } from '../ModernAgentOutput/utils';

/** Delay used instead of requestAnimationFrame while the document is hidden. */
const HIDDEN_TAB_FLUSH_DELAY_MS = 16;

/** How long `debugChunkFlash` stays true after a flush. */
const CHUNK_FLASH_DURATION_MS = 50;

/** Message types that are kept in the timeline even when they carry no text. */
const STATE_MESSAGE_TYPES: readonly AgentMessageType[] = [
    AgentMessageType.COMPLETE,
    AgentMessageType.IDLE,
    AgentMessageType.TERMINATED,
    AgentMessageType.REQUEST_INPUT,
];

/** Normalizes a message timestamp to epoch milliseconds for sorting. */
function toEpochMillis(timestamp: AgentMessage['timestamp']): number {
    return typeof timestamp === 'number' ? timestamp : new Date(timestamp).getTime();
}

/** Streaming data for a single active stream, keyed by streaming/activity ID */
export interface StreamingData {
    text: string;
    workstreamId?: string;
    isComplete?: boolean;
    startTimestamp: number;
    activityId?: string;
}

export interface UseAgentStreamResult {
    messages: AgentMessage[];
    /** In-flight streamed text, keyed by activity_id (or streaming_id when no activity_id is present) */
    streamingMessages: Map<string, StreamingData>;
    isCompleted: boolean;
    /** Whether we are receiving chunks right now (for visual indicators) */
    debugChunkFlash: boolean;
    /** Add an optimistic message (for user input) */
    addOptimisticMessage: (msg: AgentMessage) => void;
    /** Remove optimistic messages matching a predicate */
    removeOptimisticMessages: (predicate: (msg: AgentMessage) => boolean) => void;
    /** AgentRun status fetched from API (RUNNING, COMPLETED, FAILED, etc.) */
    agentRunStatus: string | null;
    /** Temporal workflow run ID (first_workflow_run_id from AgentRun) */
    workflowRunId: string | null;
    /** Server-side file processing status updates (from SYSTEM messages), keyed by file id */
    serverFileUpdates: Map<string, ConversationFile>;
}

/**
 * Hook that manages the SSE message stream for an agent conversation.
 *
 * Encapsulates:
 * - `client.agents.streamMessages()` call and cleanup (via AbortController)
 * - `client.agents.getInternals()` lookup for run status and workflow run ID
 * - STREAMING_CHUNK accumulation with RAF-batched flushing
 * - Streaming→persisted message dedup (activity_id matching on THOUGHT/ANSWER arrival)
 * - COMPLETE/IDLE streaming flush
 * - Timestamp-based message dedup and optimistic QUESTION replacement
 * - `isCompleted` derived state
 * - State reset when `agentRunId` changes
 *
 * File-processing SYSTEM messages are NOT added to the messages array; they
 * are collected into `serverFileUpdates` so downstream hooks can react to
 * them. Other SYSTEM messages go through normal message handling.
 *
 * @param client - Vertesia client used to query the run and open the stream.
 * @param agentRunId - Agent run to follow; changing it resets all state.
 * @returns Timeline messages, live streaming text, run metadata and helpers
 *          for optimistic message management.
 */
export function useAgentStream(
    client: VertesiaClient,
    agentRunId: string,
): UseAgentStreamResult {
    const [messages, setMessages] = useState<AgentMessage[]>([]);
    const [isCompleted, setIsCompleted] = useState(false);
    const [agentRunStatus, setAgentRunStatus] = useState<string | null>(null);
    const [workflowRunId, setWorkflowRunId] = useState<string | null>(null);

    // Server-side file processing status updates
    const [serverFileUpdates, setServerFileUpdates] = useState<Map<string, ConversationFile>>(new Map());

    // Streaming messages by streaming_id for real-time chunk aggregation
    const [streamingMessages, setStreamingMessages] = useState<Map<string, StreamingData>>(new Map());

    // RAF-batched streaming updates: chunks accumulate in the ref and are
    // copied into state at most once per scheduled flush.
    const pendingStreamingChunks = useRef<Map<string, StreamingData>>(new Map());
    const streamingFlushScheduled = useRef<{ mode: 'raf' | 'timeout'; id: number } | null>(null);

    // Debug: visual flash indicator for incoming chunks
    const [debugChunkFlash, setDebugChunkFlash] = useState(false);
    const debugFlashTimeout = useRef<ReturnType<typeof setTimeout> | null>(null);

    /** Cancels the pending RAF/timeout flush, if any. */
    const cancelScheduledStreamingFlush = useCallback(() => {
        const scheduled = streamingFlushScheduled.current;
        if (!scheduled) return;
        if (scheduled.mode === 'raf') {
            cancelAnimationFrame(scheduled.id);
        } else {
            clearTimeout(scheduled.id);
        }
        streamingFlushScheduled.current = null;
    }, []);

    /** Publishes accumulated chunks to state and pulses the debug flash. */
    const flushStreamingChunks = useCallback(() => {
        if (pendingStreamingChunks.current.size > 0) {
            setStreamingMessages(new Map(pendingStreamingChunks.current));
            setDebugChunkFlash(true);
            if (debugFlashTimeout.current) clearTimeout(debugFlashTimeout.current);
            debugFlashTimeout.current = setTimeout(() => setDebugChunkFlash(false), CHUNK_FLASH_DURATION_MS);
        }
        streamingFlushScheduled.current = null;
    }, []);

    // Update isCompleted when messages change
    useEffect(() => {
        setIsCompleted(!isInProgress(messages));
    }, [messages]);

    // Stream messages from the agent
    useEffect(() => {
        // Reset all state when agentRunId changes (new agent)
        console.debug('[useAgentStream] effect:start', { agentRunId });
        setMessages([]);
        setAgentRunStatus(null);
        setWorkflowRunId(null);
        setStreamingMessages(new Map());
        setServerFileUpdates(new Map());
        // The previous run's cleanup clears the flash timeout, so the flag
        // would otherwise stay true until the next chunk flush.
        setDebugChunkFlash(false);

        const abortController = new AbortController();

        // Check agent run status
        client.agents.getInternals(agentRunId)
            .then((agentRun) => {
                if (!abortController.signal.aborted) {
                    setAgentRunStatus(agentRun.status?.toUpperCase() ?? null);
                    setWorkflowRunId(agentRun.first_workflow_run_id ?? null);
                }
            })
            .catch((error) => {
                if (!abortController.signal.aborted) {
                    console.error('Failed to check agent run status:', error);
                }
            });

        client.agents.streamMessages(agentRunId, (message) => {
            if (abortController.signal.aborted) return;

            // Handle streaming chunks separately for real-time aggregation
            // PERFORMANCE: Batch updates using RAF instead of immediate state updates
            if (message.type === AgentMessageType.STREAMING_CHUNK) {
                const details = message.details as StreamingChunkDetails;
                const streamKey = details?.activity_id || details?.streaming_id;
                if (!streamKey) return;

                // Accumulate chunks in the ref (no state update yet)
                const existing = pendingStreamingChunks.current.get(streamKey);
                pendingStreamingChunks.current.set(streamKey, {
                    text: (existing?.text ?? '') + (message.message ?? ''),
                    workstreamId: message.workstream_id,
                    isComplete: details.is_final,
                    startTimestamp: existing?.startTimestamp ?? Date.now(),
                    activityId: details?.activity_id,
                });

                // Schedule a flush if not already scheduled (~60 updates/sec max).
                // A timer is used instead of RAF while the document is hidden.
                if (streamingFlushScheduled.current === null) {
                    if (document.hidden) {
                        streamingFlushScheduled.current = {
                            mode: 'timeout',
                            id: window.setTimeout(flushStreamingChunks, HIDDEN_TAB_FLUSH_DELAY_MS),
                        };
                    } else {
                        streamingFlushScheduled.current = {
                            mode: 'raf',
                            id: requestAnimationFrame(flushStreamingChunks),
                        };
                    }
                }
                return;
            }

            // Handle file processing SYSTEM messages — update serverFileUpdates
            // for downstream useFileProcessing hook, don't add to messages array
            if (message.type === AgentMessageType.SYSTEM) {
                const details = message.details as FileProcessingDetails | undefined;
                const files = details?.system_type === 'file_processing' ? details.files : undefined;
                if (files) {
                    setServerFileUpdates((prev) => {
                        const next = new Map(prev);
                        for (const file of files) {
                            next.set(file.id, file);
                        }
                        return next;
                    });
                    return;
                }
                // Other SYSTEM messages fall through to normal handling
            }

            // When THOUGHT or ANSWER arrives with activity_id, remove matching streaming message
            if (
                (message.type === AgentMessageType.THOUGHT || message.type === AgentMessageType.ANSWER) &&
                message.details?.activity_id
            ) {
                const activityId = message.details.activity_id as string;
                pendingStreamingChunks.current.delete(activityId);
                setStreamingMessages((prev) => {
                    if (!prev.has(activityId)) return prev;
                    const next = new Map(prev);
                    next.delete(activityId);
                    return next;
                });
            }

            // On COMPLETE or IDLE, flush any pending chunks
            if (message.type === AgentMessageType.COMPLETE || message.type === AgentMessageType.IDLE) {
                if (pendingStreamingChunks.current.size > 0) {
                    // Cancel the scheduled flush first so it does not fire a
                    // second time after this immediate one.
                    cancelScheduledStreamingFlush();
                    flushStreamingChunks();
                }
            }

            const hasContent = !!message.message;
            const isStateMessage = STATE_MESSAGE_TYPES.includes(message.type);
            if (!hasContent && !isStateMessage) return;

            setMessages((prev) => {
                // Check for duplicate by timestamp
                if (prev.some((m) => m.timestamp === message.timestamp)) {
                    return prev;
                }

                // For QUESTION messages from server, replace any optimistic version.
                // Otherwise insert into a copy: the updater must not mutate the
                // previous state array.
                const isServerQuestion =
                    message.type === AgentMessageType.QUESTION && !message.details?._optimistic;
                const next = isServerQuestion
                    ? prev.filter((m) => !(m.type === AgentMessageType.QUESTION && m.details?._optimistic))
                    : [...prev];
                insertMessageInTimeline(next, message);
                return next;
            });
        }, undefined, abortController.signal)
            .catch((error) => {
                if (!abortController.signal.aborted) {
                    console.error('Failed to stream agent messages:', error);
                }
            });

        return () => {
            console.debug('[useAgentStream] effect:cleanup', { agentRunId });
            abortController.abort();
            setMessages([]);
            cancelScheduledStreamingFlush();
            pendingStreamingChunks.current.clear();
            if (debugFlashTimeout.current) {
                clearTimeout(debugFlashTimeout.current);
                debugFlashTimeout.current = null;
            }
        };
    }, [agentRunId, client.agents, flushStreamingChunks, cancelScheduledStreamingFlush]);

    // Flush pending streaming chunks when tab becomes visible.
    useEffect(() => {
        const handleVisibilityChange = () => {
            if (!document.hidden && pendingStreamingChunks.current.size > 0) {
                cancelScheduledStreamingFlush();
                flushStreamingChunks();
            }
        };

        document.addEventListener('visibilitychange', handleVisibilityChange);
        return () => {
            document.removeEventListener('visibilitychange', handleVisibilityChange);
        };
    }, [flushStreamingChunks, cancelScheduledStreamingFlush]);

    // Add an optimistic message to the timeline, keeping it sorted by timestamp
    const addOptimisticMessage = useCallback((msg: AgentMessage) => {
        setMessages((prev) =>
            [...prev, msg].sort((a, b) => toEpochMillis(a.timestamp) - toEpochMillis(b.timestamp)),
        );
    }, []);

    // Remove optimistic messages matching a predicate
    const removeOptimisticMessages = useCallback((predicate: (msg: AgentMessage) => boolean) => {
        setMessages((prev) => prev.filter((m) => !predicate(m)));
    }, []);

    return {
        messages,
        streamingMessages,
        isCompleted,
        debugChunkFlash,
        addOptimisticMessage,
        removeOptimisticMessages,
        agentRunStatus,
        workflowRunId,
        serverFileUpdates,
    };
}