/**
 * Pure chat state machine. Zero React. Zero I/O.
 *
 * Invariants:
 * - At most one message has `isStreaming === true` at any time. It is always
 *   the last message in the array.
 * - Messages are immutable; updates produce new objects.
 * - `version` bumps on edit so memo keys invalidate.
 */

import type { ChatAttachment, ChatMessage, ChatToolCall } from '../types';

/** Complete state owned by the chat reducer. */
export interface ChatState {
  sessionId: string | null;
  messages: ChatMessage[];
  /** Initial history load in flight. */
  isLoading: boolean;
  /**
   * True once the initial history load has settled (done OR errored). False
   * from mount until then. Lets the UI distinguish "history still loading"
   * from "genuinely empty conversation": render a spinner while
   * `!historyLoaded`, the empty-state only once `historyLoaded` is true and
   * `messages` is empty. `isLoading` alone can't carry this — it is `false`
   * on the very first synchronous render (before the bootstrap effect runs),
   * which would flash the empty-state for a session that does have history.
   */
  historyLoaded: boolean;
  /** Assistant is generating a reply. */
  isStreaming: boolean;
  /** Older history page in flight. */
  isLoadingMore: boolean;
  hasMore: boolean;
  oldestCursor: string | null;
  error: string | null;
}

/** State before any session has been set or any history has been loaded. */
export const initialState: ChatState = {
  sessionId: null,
  messages: [],
  isLoading: false,
  historyLoaded: false,
  isStreaming: false,
  isLoadingMore: false,
  hasMore: true,
  oldestCursor: null,
  error: null,
};

/** Every action the reducer understands, discriminated by `type`. */
export type ChatAction =
  | {
      type: 'SESSION_SET';
      sessionId: string;
      messages?: ChatMessage[];
      hasMore?: boolean;
      /** `undefined` keeps the current cursor; an explicit `null` clears it. */
      cursor?: string | null;
    }
  | { type: 'HISTORY_LOAD_START' }
  | {
      type: 'HISTORY_LOAD_DONE';
      messages: ChatMessage[];
      hasMore: boolean;
      cursor: string | null;
    }
  | { type: 'HISTORY_MORE_START' }
  | {
      type: 'HISTORY_MORE_DONE';
      messages: ChatMessage[];
      hasMore: boolean;
      cursor: string | null;
    }
  | { type: 'MESSAGE_USER_ADD'; message: ChatMessage }
  | { type: 'STREAM_START'; id: string; createdAt?: number }
  | { type: 'STREAM_CHUNK'; delta: string }
  | { type: 'STREAM_TOOL_ACTIVITY'; tool: string }
  | { type: 'TOOL_CALL_START'; messageId: string; toolCall: ChatToolCall }
  | { type: 'TOOL_CALL_DELTA'; messageId: string; toolId: string; delta: string }
  | {
      type: 'TOOL_CALL_END';
      messageId: string;
      toolId: string;
      output: unknown;
      status: 'success' | 'error';
    }
  | {
      type: 'STREAM_DONE';
      id: string;
      /**
       * Generic finalize merge (content/blocks/sources/tokens/attachments/
       * metadata). The single slot the terminal event carries — applied as
       * a `Partial` merge. No-op when absent.
       */
      patch?: Partial<ChatMessage>;
    }
  | { type: 'STREAM_CANCELLED'; id: string; partialText: string; label?: string }
  | { type: 'STREAM_ERROR'; id?: string; message: string }
  | { type: 'STREAM_CANCEL_PLACEHOLDER'; id: string }
  | { type: 'STREAM_RESUME_EXISTING' }
  | { type: 'MESSAGE_EDIT'; id: string; content: string }
  | { type: 'MESSAGE_DELETE'; id: string }
  | { type: 'MESSAGE_INJECT'; message: ChatMessage; position?: 'append' | 'prepend' }
  | { type: 'MESSAGE_PATCH'; id: string; patch: Partial<ChatMessage> }
  | { type: 'MESSAGES_CLEAR' }
  | { type: 'ERROR_SET'; error: string | null }
  | {
      type: 'ATTACHMENT_PROGRESS';
      messageId: string;
      attachmentId: string;
      progress?: number;
      status?: ChatAttachment['status'];
    };

/** Index of the last message satisfying `predicate`, or -1 when none does. */
function findLastIndexWhere(
  messages: ChatMessage[],
  predicate: (m: ChatMessage) => boolean,
): number {
  for (let i = messages.length - 1; i >= 0; i -= 1) {
    const candidate = messages[i];
    if (candidate !== undefined && predicate(candidate)) return i;
  }
  return -1;
}

/** Index of the last message flagged `isStreaming`, or -1 when none is. */
function findLastStreamingIndex(messages: ChatMessage[]): number {
  return findLastIndexWhere(messages, (m) => Boolean(m.isStreaming));
}

/**
 * Applies `patch` to the last streaming message.
 *
 * @returns A new array with the patched message, or the SAME array instance
 *   when no message is streaming.
 */
function updateLastStreaming(
  messages: ChatMessage[],
  patch: (m: ChatMessage) => ChatMessage,
): ChatMessage[] {
  const idx = findLastStreamingIndex(messages);
  if (idx === -1) return messages;
  return messages.map((m, i) => (i === idx ? patch(m) : m));
}

/**
 * Applies `patch` to the first message whose id equals `id`.
 *
 * @returns A new array with the patched message, or the SAME array instance
 *   when no message has that id.
 */
function patchMessageById(
  messages: ChatMessage[],
  id: string,
  patch: (m: ChatMessage) => ChatMessage,
): ChatMessage[] {
  const idx = messages.findIndex((m) => m.id === id);
  if (idx === -1) return messages;
  return messages.map((m, i) => (i === idx ? patch(m) : m));
}

/**
 * Pure transition function: computes the next chat state for `action`.
 * Never mutates `state` or any message; unknown actions return `state` as-is.
 *
 * @param state Current chat state.
 * @param action Event to apply.
 * @returns The next state.
 */
export function reducer(state: ChatState, action: ChatAction): ChatState {
  switch (action.type) {
    case 'SESSION_SET':
      return {
        ...state,
        sessionId: action.sessionId,
        messages: action.messages ?? state.messages,
        hasMore: action.hasMore ?? state.hasMore,
        // `cursor` is `string | null`: an explicit `null` means "no older
        // page" and must be honoured. Only an omitted cursor keeps the
        // previous value — `??` would leak a stale cursor across sessions.
        oldestCursor: action.cursor !== undefined ? action.cursor : state.oldestCursor,
        // When the session carries its messages (the autoCreateSession /
        // newSession path commits them here), the initial load is settled.
        // A bare `SESSION_SET` (resume path sets the id, then awaits
        // loadHistory → HISTORY_LOAD_DONE) leaves `historyLoaded` untouched.
        historyLoaded: action.messages !== undefined ? true : state.historyLoaded,
        error: null,
      };

    case 'HISTORY_LOAD_START':
      return { ...state, isLoading: true, error: null };

    case 'HISTORY_LOAD_DONE':
      return {
        ...state,
        isLoading: false,
        historyLoaded: true,
        messages: action.messages,
        hasMore: action.hasMore,
        oldestCursor: action.cursor,
      };

    case 'HISTORY_MORE_START':
      return { ...state, isLoadingMore: true };

    case 'HISTORY_MORE_DONE':
      return {
        ...state,
        isLoadingMore: false,
        // Older messages prepend to the top.
        messages: [...action.messages, ...state.messages],
        hasMore: action.hasMore,
        oldestCursor: action.cursor,
      };

    case 'MESSAGE_USER_ADD':
      return {
        ...state,
        messages: [...state.messages, action.message],
        error: null,
      };

    case 'STREAM_START': {
      if (state.isStreaming) {
        if (typeof console !== 'undefined') {
          // eslint-disable-next-line no-console
          console.warn('[chat] STREAM_START while already streaming, ignoring');
        }
        return state;
      }
      const placeholder: ChatMessage = {
        id: action.id,
        role: 'assistant',
        content: '',
        createdAt: action.createdAt ?? Date.now(),
        isStreaming: true,
      };
      return {
        ...state,
        isStreaming: true,
        messages: [...state.messages, placeholder],
      };
    }

    case 'STREAM_CHUNK': {
      const messages = updateLastStreaming(state.messages, (m) => ({
        ...m,
        content: m.content + action.delta,
      }));
      // Content is now streaming in for the current turn (STREAM_START opened a
      // fresh placeholder; chunks only land on it). If an error from a prior
      // turn is still showing, retract it the moment a good answer begins so
      // the banner can't sit above live content. Only touch `error` when set.
      return state.error ? { ...state, error: null, messages } : { ...state, messages };
    }

    case 'STREAM_TOOL_ACTIVITY': {
      const messages = updateLastStreaming(state.messages, (m) => ({
        ...m,
        toolActivity: action.tool,
      }));
      return { ...state, messages };
    }

    case 'TOOL_CALL_START': {
      const messages = patchMessageById(state.messages, action.messageId, (m) => ({
        ...m,
        toolCalls: [...(m.toolCalls ?? []), action.toolCall],
      }));
      return { ...state, messages };
    }

    case 'TOOL_CALL_DELTA': {
      const messages = patchMessageById(state.messages, action.messageId, (m) => {
        if (!m.toolCalls) return m;
        return {
          ...m,
          toolCalls: m.toolCalls.map((tc) =>
            tc.id === action.toolId
              ? { ...tc, streamingText: (tc.streamingText ?? '') + action.delta }
              : tc,
          ),
        };
      });
      return { ...state, messages };
    }

    case 'TOOL_CALL_END': {
      const messages = patchMessageById(state.messages, action.messageId, (m) => {
        if (!m.toolCalls) return m;
        return {
          ...m,
          toolCalls: m.toolCalls.map((tc) =>
            tc.id === action.toolId
              ? {
                  ...tc,
                  output: action.output,
                  status: action.status,
                  streamingText: undefined,
                  endedAt: Date.now(),
                }
              : tc,
          ),
        };
      });
      return { ...state, messages };
    }

    case 'STREAM_DONE': {
      const patched = patchMessageById(state.messages, action.id, (m) => ({
        ...m,
        // Generic finalize merge (content/blocks/sources/tokens/attachments/
        // metadata). No-op when `patch` is absent — `message_end` is then a
        // pure terminal that only clears `isStreaming`.
        ...(action.patch ?? {}),
        isStreaming: false,
      }));
      // Drop an assistant message that ended up completely empty (no text, no
      // tool calls). This happens in the HITL approval flow: the first stream
      // produces only tool calls + approval_required and no text body, so the
      // placeholder should not leave a blank bubble in the chat.
      const finalized = patched.find((m) => m.id === action.id);
      const isBlank =
        finalized !== undefined && !finalized.content && !finalized.toolCalls?.length;
      const messages = isBlank ? patched.filter((m) => m.id !== action.id) : patched;
      // A turn that reaches message_end completed successfully (error is the
      // other terminal event — the two are mutually exclusive per turn). Any
      // `error` still in state therefore belongs to a prior turn and is now
      // stale, so clear it: otherwise the banner lingers above a good answer.
      return { ...state, isStreaming: false, error: null, messages };
    }

    case 'STREAM_CANCELLED': {
      const suffix = action.label ?? '[cancelled]';
      const messages = patchMessageById(state.messages, action.id, (m) => ({
        ...m,
        isStreaming: false,
        content: action.partialText + (action.partialText ? `\n\n_${suffix}_` : `_${suffix}_`),
      }));
      return { ...state, isStreaming: false, messages };
    }

    case 'STREAM_ERROR': {
      const messages = action.id
        ? patchMessageById(state.messages, action.id, (m) => ({
            ...m,
            isStreaming: false,
            isError: true,
          }))
        : // No id to attribute the error to: still settle whichever message is
          // in flight, otherwise it keeps `isStreaming: true` after the state
          // flag is cleared and the single-streaming-message invariant breaks
          // on the next STREAM_START. `isError` is left alone because the
          // error was not tied to a specific message.
          updateLastStreaming(state.messages, (m) => ({ ...m, isStreaming: false }));
      return { ...state, isStreaming: false, error: action.message, messages };
    }

    case 'STREAM_CANCEL_PLACEHOLDER':
      // Remove the freshly-created empty assistant placeholder (resume path).
      return {
        ...state,
        isStreaming: false,
        messages: state.messages.filter((m) => m.id !== action.id),
      };

    case 'STREAM_RESUME_EXISTING': {
      // Mark the last assistant message as streaming so resume chunks append to it.
      const lastAssistantIdx = findLastIndexWhere(
        state.messages,
        (m) => m.role === 'assistant',
      );
      if (lastAssistantIdx === -1) return { ...state, isStreaming: true };
      const messages = state.messages.map((m, i) =>
        i === lastAssistantIdx ? { ...m, isStreaming: true } : m,
      );
      return { ...state, isStreaming: true, messages };
    }

    case 'MESSAGE_EDIT': {
      const messages = patchMessageById(state.messages, action.id, (m) => ({
        ...m,
        content: action.content,
        version: (m.version ?? 0) + 1,
      }));
      return { ...state, messages };
    }

    case 'MESSAGE_DELETE':
      return {
        ...state,
        messages: state.messages.filter((m) => m.id !== action.id),
      };

    case 'MESSAGE_INJECT': {
      // De-dupe by id: if a message with this id already exists, merge
      // instead of duplicating. Avoids double-render when external code
      // re-emits the same payload (Centrifugo replay, SWR retry, …).
      const incoming = action.message;
      if (state.messages.some((m) => m.id === incoming.id)) {
        const messages = patchMessageById(state.messages, incoming.id, (prev) => ({
          ...prev,
          ...incoming,
          version: (prev.version ?? 0) + 1,
        }));
        return { ...state, messages };
      }
      const messages =
        action.position === 'prepend'
          ? [incoming, ...state.messages]
          : [...state.messages, incoming];
      return { ...state, messages };
    }

    case 'MESSAGE_PATCH': {
      const messages = patchMessageById(state.messages, action.id, (m) => ({
        ...m,
        ...action.patch,
        version: (m.version ?? 0) + 1,
      }));
      return { ...state, messages };
    }

    case 'MESSAGES_CLEAR':
      return {
        ...state,
        messages: [],
        isStreaming: false,
        error: null,
      };

    case 'ERROR_SET':
      // An error settles the initial load — a failed bootstrap must not leave
      // a stuck spinner; clear `isLoading` and mark the load resolved so the
      // host can fall through to its error/empty surface.
      return { ...state, error: action.error, isLoading: false, historyLoaded: true };

    case 'ATTACHMENT_PROGRESS': {
      const messages = patchMessageById(state.messages, action.messageId, (m) => {
        if (!m.attachments) return m;
        return {
          ...m,
          attachments: m.attachments.map((a) =>
            a.id === action.attachmentId
              ? {
                  ...a,
                  progress: action.progress ?? a.progress,
                  status: action.status ?? a.status,
                }
              : a,
          ),
        };
      });
      return { ...state, messages };
    }

    default: {
      // Exhaustiveness check.
      const _exhaustive: never = action;
      void _exhaustive;
      return state;
    }
  }
}