'use client'; import { useCallback, useEffect, useReducer, useRef } from 'react'; import type { ChatAttachment, ChatMessage, ChatPersona, ChatStreamEvent, ChatTransport, ChatToolCall, } from '../types'; import { LIMITS } from '../constants'; import { type ChatState, initialState, reducer, type ChatAction, } from '../core/reducer'; import { createId } from '../core/ids'; import { getChatLogger } from '../core/logger'; import { createTokenBuffer } from '../core/markdown'; import { resolveSendMetadata } from '../core/metadata'; export interface UseChatConfig { transport: ChatTransport; initialSessionId?: string; autoCreateSession?: boolean; streaming?: boolean; pageSize?: number; onError?: (err: Error) => void; /** Fires once an assistant message finishes streaming (or buffered send returns). */ onMessageEnd?: (msg: ChatMessage) => void; /** Fires after a user message is added to the state (right before streaming starts). */ onMessageSent?: (msg: ChatMessage) => void; /** Fires when the assistant placeholder is created (first byte / pre-stream). */ onStreamStart?: (assistantMessageId: string) => void; metadata?: Record; /** Stamped on outgoing user messages as `message.sender`. */ userPersona?: ChatPersona; /** * Rewrite the outgoing message content right before it hits the * transport — runs after the user bubble is added (so history shows * the original) but before `transport.stream/send`. Sync or async. * Return the original to opt out for that call. * * Use case: strip rich-display chips (e.g. mention links) so the LLM * sees plain text, while the bubble keeps the chip rendering. Plan64. */ onBeforeSend?: (content: string) => string | Promise; /** * Contribute extra transport metadata, computed fresh at send time. * Invoked synchronously right before each `transport.stream/send`, * and the result is merged over the static `metadata`. Returning * `undefined` adds nothing. * * Use case: the page-context snapshot — captured per message * (capture-on-submit) and carried as a separate `metadata` field, * never mixed into the message content. */ getDynamicMetadata?: () => Record | undefined; /** * Enable verbose dev-mode logging (consola, namespace `chat:*`). * Defaults to `isDev` from `@djangocfg/ui-core/lib`. Pass `false` to silence * even in development; `true` to force on in production. */ debug?: boolean; } export interface UseChatReturn extends ChatState { sendMessage: (content: string, attachments?: ChatAttachment[]) => Promise; cancelStream: () => void; regenerate: (messageId?: string) => Promise; editMessage: (id: string, content: string) => Promise; deleteMessage: (id: string) => void; clearMessages: () => void; loadMore: () => Promise; newSession: () => Promise; lastError: Error | null; /** * Inject a complete message from outside (push notification, admin * takeover, system notice). De-duped by id. Position defaults to * `append` — pass `prepend` for retroactive inserts. */ injectMessage: (message: ChatMessage, position?: 'append' | 'prepend') => void; /** * Patch fields of an existing message in place (e.g. live-edit the * admin's last reply, mark a message as resolved). No-op if the id * doesn't exist. */ updateMessage: (id: string, patch: Partial) => void; } export function useChat(config: UseChatConfig): UseChatReturn { const [state, dispatch] = useReducer(reducer, initialState); const stateRef = useRef(state); stateRef.current = state; const abortRef = useRef(null); const lastErrorRef = useRef(null); const streamingMsgIdRef = useRef(null); // Promise resolved once the initial session is available (or `null` when the // bootstrap finished without producing one — e.g. autoCreateSession=false). // Action methods (sendMessage, regenerate, …) await this so users who type // before the first network round-trip resolves don't hit "No active session". const bootstrapRef = useRef | null>(null); const { transport, autoCreateSession = true, streaming = true, pageSize = LIMITS.pageSize } = config; const log = getChatLogger(config.debug); // Initial session bootstrap. // // Strict Mode quirk: this effect runs twice in dev (mount → unmount → mount). // Previous design used `initRef` to skip the second run, but the first run's // cleanup sets `cancelled = true`, so its dispatch never lands — and the // second run was blocked. Result: bootstrap silently completed the network // call but never wrote sessionId to state. // // Fix: drop `initRef`. On the second mount we DO re-run, but `bootstrapRef` // is preserved across renders so we don't re-fetch if a previous run already // succeeded — we just resolve from existing state. useEffect(() => { let cancelled = false; // If a prior run already produced a sessionId, skip. if (stateRef.current.sessionId) { return; } // Show "loading" state immediately so the UI doesn't look idle while we // wait for createSession / loadHistory to come back. if (config.initialSessionId || autoCreateSession) { dispatch({ type: 'HISTORY_LOAD_START' }); } log.bootstrap.info('start', { mode: config.initialSessionId ? 'resume' : autoCreateSession ? 'create' : 'idle', initialSessionId: config.initialSessionId, }); const run = async (): Promise => { const t0 = performance.now(); try { if (config.initialSessionId) { dispatch({ type: 'SESSION_SET', sessionId: config.initialSessionId, }); const page = await transport.loadHistory(config.initialSessionId, null, pageSize); // Commit the loaded history even if THIS effect run was cancelled. // Under React StrictMode the first mount's effect is always // cancelled (mount → unmount → mount), and the second run skips the // fetch because `sessionId` is already in the persisted reducer // state (the early-return at the top of the effect). If we bailed // here on `cancelled`, `HISTORY_LOAD_DONE` would never dispatch on // either run and the resumed history would silently never reach the // reducer — the read succeeds but the messages are dropped, so the // chat shows the empty-state until a remount re-bootstraps it. The // reducer state survives the StrictMode remount, so dispatching the // load result regardless is correct (mirrors the autoCreateSession // branch below, which already commits post-cancel for the same reason). dispatch({ type: 'HISTORY_LOAD_DONE', messages: page.messages, hasMore: page.hasMore, cursor: page.nextCursor, }); log.bootstrap.success(cancelled ? 'resumed (post-cancel)' : 'resumed', { sessionId: config.initialSessionId, messages: page.messages.length, hasMore: page.hasMore, cancelled, elapsedMs: Math.round(performance.now() - t0), }); return config.initialSessionId; } if (autoCreateSession) { const info = await transport.createSession({ metadata: config.metadata }); // We always commit the session to state even if this effect was // cancelled — the network call already succeeded, throwing away the // sessionId would just trigger a duplicate createSession on remount. dispatch({ type: 'SESSION_SET', sessionId: info.sessionId, messages: info.messages ?? [], hasMore: info.hasMore ?? false, cursor: info.cursor ?? null, }); dispatch({ type: 'HISTORY_LOAD_DONE', messages: info.messages ?? [], hasMore: info.hasMore ?? false, cursor: info.cursor ?? null, }); log.bootstrap.success(cancelled ? 'created (post-cancel)' : 'created', { sessionId: info.sessionId, resumed: info.resumed ?? false, cancelled, elapsedMs: Math.round(performance.now() - t0), }); return info.sessionId; } log.bootstrap.debug('idle (no initialSessionId, autoCreateSession=false)'); return null; } catch (err) { const e = err instanceof Error ? err : new Error(String(err)); if (cancelled) { log.bootstrap.debug('cancelled (in catch)', { message: e.message }); return null; } lastErrorRef.current = e; dispatch({ type: 'ERROR_SET', error: e.message }); config.onError?.(e); log.error.error('bootstrap failed', { message: e.message, elapsedMs: Math.round(performance.now() - t0) }); return null; } }; bootstrapRef.current = run(); return () => { cancelled = true; }; // eslint-disable-next-line react-hooks/exhaustive-deps }, []); /** Wait for the initial session bootstrap to settle, then return whatever * sessionId is now in state. Safe to call multiple times. */ const awaitSession = useCallback(async (): Promise => { if (stateRef.current.sessionId) return stateRef.current.sessionId; if (bootstrapRef.current) { const id = await bootstrapRef.current; if (id) return id; } return stateRef.current.sessionId; }, []); const consumeStream = useCallback( async ( sessionId: string, content: string, attachments?: ChatAttachment[], ): Promise => { const ctrl = new AbortController(); abortRef.current = ctrl; const assistantId = createId('a'); streamingMsgIdRef.current = assistantId; // The message id that subsequent events (chunks, tool calls, done) // target. Starts as the freshly-created placeholder; on a // `resume_start` it is repointed to the resumed assistant message // so STREAM_DONE / TOOL_CALL_* don't land on a removed placeholder. let targetId = assistantId; const iterator = transport.stream(sessionId, content, { signal: ctrl.signal, attachments, metadata: resolveSendMetadata(config.metadata, config.getDynamicMetadata), }); // Peek at the first event — if it's `resume_start` we reuse the last assistant // message instead of creating a new empty placeholder. The peek happens inside // the try/catch so any transport error (network, 401, etc.) is handled normally. let peekedEvent: ChatStreamEvent | null = null; dispatch({ type: 'STREAM_START', id: assistantId }); config.onStreamStart?.(assistantId); log.stream.info('start', { sessionId, assistantId, chars: content.length }); const tokenBuffer = createTokenBuffer((delta) => dispatch({ type: 'STREAM_CHUNK', delta }), ); let serverMessageId: string | null = null; let chunkCount = 0; let charsReceived = 0; const t0 = performance.now(); try { // Peek first event to detect resume_start — must be inside try/catch so // transport errors (network down, 401) are caught and shown as error banners. const firstResult = await iterator.next(); if (!firstResult.done) { const ev = firstResult.value as ChatStreamEvent; if (ev.type === 'resume_start') { // Switch existing placeholder to continueExisting mode: remove the // just-created empty placeholder and mark the last assistant msg streaming. dispatch({ type: 'STREAM_CANCEL_PLACEHOLDER', id: assistantId }); dispatch({ type: 'STREAM_RESUME_EXISTING' }); // Repoint the event target at the resumed message. Without // this, every subsequent chunk / tool call / message_end // dispatches against the removed placeholder id and is lost // — the resumed bubble would stream forever (isStreaming // never cleared) and tool panels would never appear. for (let i = stateRef.current.messages.length - 1; i >= 0; i -= 1) { const m = stateRef.current.messages[i]; if (m.role === 'assistant' && m.isStreaming) { targetId = m.id; streamingMsgIdRef.current = m.id; break; } } } else { peekedEvent = ev; } } if (peekedEvent) handleEvent(peekedEvent); for await (const ev of iterator) { if (ctrl.signal.aborted) break; handleEvent(ev); } tokenBuffer.flush(); // If transport never emitted message_end, finalize manually. if (stateRef.current.isStreaming) { dispatch({ type: 'STREAM_DONE', id: targetId }); } const finalMsg = stateRef.current.messages.find((m) => m.id === targetId); if (finalMsg) config.onMessageEnd?.(finalMsg); log.stream.success('done', { assistantId, chunks: chunkCount, chars: charsReceived, elapsedMs: Math.round(performance.now() - t0), }); } catch (err) { tokenBuffer.close(); if (ctrl.signal.aborted) { const partial = stateRef.current.messages.find((m) => m.id === targetId)?.content ?? ''; dispatch({ type: 'STREAM_CANCELLED', id: targetId, partialText: partial }); log.stream.warn('cancelled', { assistantId: targetId, partialChars: partial.length }); return; } const e = err instanceof Error ? err : new Error(String(err)); lastErrorRef.current = e; dispatch({ type: 'STREAM_ERROR', id: targetId, message: e.message }); config.onError?.(e); log.error.error('stream failed', { assistantId, message: e.message }); } finally { tokenBuffer.close(); if (abortRef.current === ctrl) abortRef.current = null; streamingMsgIdRef.current = null; } function handleEvent(ev: ChatStreamEvent) { switch (ev.type) { case 'message_start': serverMessageId = ev.messageId; log.stream.debug('message_start', { messageId: ev.messageId }); return; case 'chunk': tokenBuffer.push(ev.delta); chunkCount += 1; charsReceived += ev.delta.length; return; case 'tool_activity': tokenBuffer.flush(); dispatch({ type: 'STREAM_TOOL_ACTIVITY', tool: ev.tool }); log.tools.debug('activity', { tool: ev.tool, status: ev.status }); return; case 'tool_call_start': { tokenBuffer.flush(); const toolCall: ChatToolCall = { id: ev.toolId, name: ev.name, input: ev.input, status: 'running', startedAt: Date.now(), sourceHostname: ev.sourceHostname, }; dispatch({ type: 'TOOL_CALL_START', messageId: targetId, toolCall, }); log.tools.info('call_start', { toolId: ev.toolId, name: ev.name }); return; } case 'tool_call_delta': dispatch({ type: 'TOOL_CALL_DELTA', messageId: targetId, toolId: ev.toolId, delta: ev.delta, }); return; case 'tool_call_end': dispatch({ type: 'TOOL_CALL_END', messageId: targetId, toolId: ev.toolId, output: ev.output, status: ev.status, }); log.tools.info('call_end', { toolId: ev.toolId, status: ev.status }); return; case 'message_end': tokenBuffer.flush(); dispatch({ type: 'STREAM_DONE', id: targetId, patch: ev.patch, }); log.stream.debug('message_end', { patchKeys: ev.patch ? Object.keys(ev.patch) : [], }); return; case 'message_metrics': // Non-terminal: attach per-turn metrics to the streaming // message. The stream stays open until message_end / error. dispatch({ type: 'MESSAGE_PATCH', id: targetId, patch: { metrics: ev.metrics, ...(ev.metrics.resolvedModel ? { resolvedModel: ev.metrics.resolvedModel } : {}), }, }); log.stream.debug('message_metrics', { turns: ev.metrics.turns, toolCallCount: ev.metrics.toolCallCount, resolvedModel: ev.metrics.resolvedModel, }); return; case 'resolved_model': // Non-terminal: model alias was resolved mid-run. dispatch({ type: 'MESSAGE_PATCH', id: targetId, patch: { resolvedModel: ev.resolvedModel }, }); log.stream.debug('resolved_model', { originalAlias: ev.originalAlias, resolvedModel: ev.resolvedModel, upgraded: ev.upgraded, }); return; case 'error': tokenBuffer.flush(); dispatch({ type: 'STREAM_ERROR', id: targetId, message: ev.message, }); log.error.error('stream event error', { code: ev.code, message: ev.message }); return; } // unreachable; prevents unused-var on serverMessageId void serverMessageId; } }, [transport, config], ); const consumeBuffered = useCallback( async (sessionId: string, content: string, attachments?: ChatAttachment[]): Promise => { const ctrl = new AbortController(); abortRef.current = ctrl; try { const reply = await transport.send(sessionId, content, { signal: ctrl.signal, attachments, metadata: resolveSendMetadata(config.metadata, config.getDynamicMetadata), }); const placeholderId = createId('a'); dispatch({ type: 'STREAM_START', id: placeholderId }); config.onStreamStart?.(placeholderId); dispatch({ type: 'STREAM_CHUNK', delta: reply.content }); dispatch({ type: 'STREAM_DONE', id: placeholderId }); config.onMessageEnd?.(reply); } catch (err) { // A user-initiated cancel (cancelStream / newSession) aborts the // controller — that's not an error, so don't raise the banner. if (ctrl.signal.aborted) return; const e = err instanceof Error ? err : new Error(String(err)); lastErrorRef.current = e; dispatch({ type: 'STREAM_ERROR', message: e.message }); config.onError?.(e); } finally { if (abortRef.current === ctrl) abortRef.current = null; } }, [transport, config], ); const sendMessage = useCallback( async (content: string, attachments?: ChatAttachment[]) => { // Wait for the initial session bootstrap if it's still in flight. // Without this, fast typers hit "No active session" before // transport.createSession resolves. log.lifecycle.info('sendMessage', { chars: content.length, attachments: attachments?.length ?? 0, hasSession: !!stateRef.current.sessionId, }); const sessionId = await awaitSession(); if (!sessionId) { const e = new Error('No active session'); lastErrorRef.current = e; dispatch({ type: 'ERROR_SET', error: e.message }); config.onError?.(e); log.error.error('sendMessage aborted: no session'); return; } if (!content.trim() && !(attachments && attachments.length > 0)) { log.lifecycle.debug('sendMessage skipped (empty)'); return; } if (stateRef.current.isStreaming) { log.lifecycle.debug('sendMessage skipped (already streaming)'); return; } const userMsg: ChatMessage = { id: createId('u'), role: 'user', content, createdAt: Date.now(), attachments, sender: config.userPersona, }; dispatch({ type: 'MESSAGE_USER_ADD', message: userMsg }); config.onMessageSent?.(userMsg); // History bubble shows the original; transport sees the rewrite. // Use case: strip rich-display chips so the LLM sees plain text. let outbound = content; if (config.onBeforeSend) { try { outbound = await config.onBeforeSend(content); } catch (err) { log.error.error('onBeforeSend threw — falling back to original content', err); } } if (streaming) { await consumeStream(sessionId, outbound, attachments); } else { await consumeBuffered(sessionId, outbound, attachments); } }, [streaming, consumeStream, consumeBuffered, config, awaitSession, log], ); const cancelStream = useCallback(() => { abortRef.current?.abort(); }, []); const regenerate = useCallback( async (messageId?: string) => { log.lifecycle.info('regenerate', { messageId: messageId ?? '(last)' }); const messages = stateRef.current.messages; let targetUserIdx = -1; if (messageId) { const idx = messages.findIndex((m) => m.id === messageId); if (idx !== -1) { targetUserIdx = messages[idx].role === 'user' ? idx : findPreviousUserIndex(messages, idx); } } else { targetUserIdx = findLastUserIndex(messages); } if (targetUserIdx === -1) return; const userMsg = messages[targetUserIdx]; // Drop everything after this user message. for (let i = messages.length - 1; i > targetUserIdx; i -= 1) { dispatch({ type: 'MESSAGE_DELETE', id: messages[i].id }); } const sessionId = await awaitSession(); if (!sessionId) return; if (streaming) { await consumeStream(sessionId, userMsg.content, userMsg.attachments); } else { await consumeBuffered(sessionId, userMsg.content, userMsg.attachments); } }, [streaming, consumeStream, consumeBuffered, awaitSession], ); const editMessage = useCallback( async (id: string, content: string) => { dispatch({ type: 'MESSAGE_EDIT', id, content }); const msg = stateRef.current.messages.find((m) => m.id === id); if (msg?.role === 'user') { await regenerate(id); } }, [regenerate], ); const deleteMessage = useCallback((id: string) => { dispatch({ type: 'MESSAGE_DELETE', id }); }, []); const injectMessage = useCallback( (message: ChatMessage, position?: 'append' | 'prepend') => { dispatch({ type: 'MESSAGE_INJECT', message, position }); }, [], ); const updateMessage = useCallback( (id: string, patch: Partial) => { dispatch({ type: 'MESSAGE_PATCH', id, patch }); }, [], ); const clearMessages = useCallback(() => { abortRef.current?.abort(); dispatch({ type: 'MESSAGES_CLEAR' }); }, []); const loadMore = useCallback(async () => { const sessionId = stateRef.current.sessionId; if (!sessionId) return; if (stateRef.current.isLoadingMore || !stateRef.current.hasMore) return; dispatch({ type: 'HISTORY_MORE_START' }); try { const page = await transport.loadHistory( sessionId, stateRef.current.oldestCursor, pageSize, ); dispatch({ type: 'HISTORY_MORE_DONE', messages: page.messages, hasMore: page.hasMore, cursor: page.nextCursor, }); } catch (err) { const e = err instanceof Error ? err : new Error(String(err)); lastErrorRef.current = e; dispatch({ type: 'ERROR_SET', error: e.message }); config.onError?.(e); } }, [transport, pageSize, config]); const newSession = useCallback(async () => { log.lifecycle.info('newSession', { previous: stateRef.current.sessionId }); abortRef.current?.abort(); const previous = stateRef.current.sessionId; if (previous) { try { await transport.closeSession(previous); } catch { /* ignore */ } } dispatch({ type: 'MESSAGES_CLEAR' }); try { const info = await transport.createSession({ metadata: config.metadata }); dispatch({ type: 'SESSION_SET', sessionId: info.sessionId, messages: info.messages ?? [], hasMore: info.hasMore ?? false, cursor: info.cursor ?? null, }); log.lifecycle.success('newSession ok', { sessionId: info.sessionId }); } catch (err) { const e = err instanceof Error ? err : new Error(String(err)); lastErrorRef.current = e; dispatch({ type: 'ERROR_SET', error: e.message }); config.onError?.(e); log.error.error('newSession failed', { message: e.message }); } }, [transport, config]); return { ...state, sendMessage, cancelStream, regenerate, editMessage, deleteMessage, clearMessages, loadMore, newSession, lastError: lastErrorRef.current, injectMessage, updateMessage, }; } function findLastUserIndex(messages: ChatMessage[]): number { for (let i = messages.length - 1; i >= 0; i -= 1) { if (messages[i].role === 'user') return i; } return -1; } function findPreviousUserIndex(messages: ChatMessage[], from: number): number { for (let i = from - 1; i >= 0; i -= 1) { if (messages[i].role === 'user') return i; } return -1; } // Suppress unused-action warnings if the action union grows. type _Used = ChatAction;