/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ /** * Direct browser-to-provider streaming for BYOK (Bring Your Own Key) models. * * Anthropic: Uses the official @anthropic-ai/sdk with `dangerouslyAllowBrowser`. * OpenAI: Uses fetch against the OpenAI chat completions API (same SSE format * the proxy already returns, so SSE parsing is shared). * * Keys are stored in localStorage and sent directly to the provider. * They never pass through our server. */ import Anthropic from '@anthropic-ai/sdk'; import { readSseStream, type StreamMessage, type StreamOptions } from './stream-client.js'; import { getModelById } from './models.js'; import { buildCacheableSystem, logCacheHit } from './prompt-cache.js'; const STREAM_REQUEST_TIMEOUT_MS = 45_000; // ── Anthropic ────────────────────────────────────────────────────────────── type AnthropicMediaType = 'image/png' | 'image/jpeg' | 'image/gif' | 'image/webp'; type AnthropicContentBlock = | { type: 'text'; text: string } | { type: 'image'; source: { type: 'base64'; media_type: AnthropicMediaType; data: string } }; function toAnthropicMessages( messages: StreamMessage[], ): Array<{ role: 'user' | 'assistant'; content: string | AnthropicContentBlock[] }> { return messages .filter((m) => m.role === 'user' || m.role === 'assistant') .map((m) => { if (typeof m.content === 'string') { return { role: m.role as 'user' | 'assistant', content: m.content }; } // Multimodal content — convert OpenAI-style parts to Anthropic format const blocks: AnthropicContentBlock[] = m.content.map((part) => { if (part.type === 'text') { return { type: 'text' as const, text: part.text }; } // image_url → Anthropic image block const dataUrl = part.image_url.url; const match = dataUrl.match(/^data:(image\/[^;]+);base64,(.+)$/); if (match) { return { type: 'image' as const, source: { type: 'base64' as const, media_type: match[1] as AnthropicMediaType, data: match[2], }, }; } // Fallback: pass URL as text return { type: 'text' as const, text: `[Image: ${dataUrl.slice(0, 100)}]` }; }); return { role: m.role as 'user' | 'assistant', content: blocks }; }); } export async function streamAnthropicChat( apiKey: string, options: Omit, ): Promise { const { model, messages, system, signal, onChunk, onComplete, onError, onFinishReason } = options; const client = new Anthropic({ apiKey, dangerouslyAllowBrowser: true, }); // Opus 4.7+ returns 400 if `temperature` (or `top_p`/`top_k`) is present. // We gate on the per-model `acceptsSamplingParams` flag in models.ts so // future Claude models that adopt the same policy only need a flag bump, // and Opus 4.6 / Sonnet 4.6 / Haiku 4.5 keep their tuned temperature. const modelDef = getModelById(model); const sendSamplingParams = modelDef?.acceptsSamplingParams !== false; let fullText = ''; try { const stream = client.messages.stream({ model, max_tokens: 8192, ...(sendSamplingParams ? { temperature: 0.3 } : {}), // Wrap the system prompt in an ephemeral cache block when it's // long enough to be worth caching (Anthropic's minimum is ~1024 // tokens, ≈ 4 KiB). Authoring turns ship the manifest schema + // widget DSL contract which is well over the threshold; one-shot // turns fall under it and pass through as plain string. system: buildCacheableSystem(system), messages: toAnthropicMessages(messages), }); // Wire up abort signal if (signal) { const onAbort = () => stream.abort(); signal.addEventListener('abort', onAbort, { once: true }); stream.on('end', () => signal.removeEventListener('abort', onAbort)); } stream.on('text', (text) => { fullText += text; onChunk(text); }); const finalMessage = await stream.finalMessage(); if (signal?.aborted) return; // Surface cache hit/miss numbers in dev tools so we can see // whether the authoring contract is paying off. logCacheHit(finalMessage.usage as { cache_creation_input_tokens?: number; cache_read_input_tokens?: number }); const stopReason = finalMessage.stop_reason; onFinishReason?.(stopReason === 'end_turn' ? 'stop' : stopReason); onComplete(fullText); } catch (err) { if (signal?.aborted) return; if (err instanceof Anthropic.APIError) { const msg = err.status === 401 ? 'Invalid Anthropic API key. Check your key in Settings.' : err.status === 429 ? 'Anthropic rate limit reached. Please wait and try again.' : `Anthropic error (${err.status}): ${err.message}`; onError(new Error(msg)); } else { onError(err instanceof Error ? err : new Error(String(err))); } } } // ── OpenAI ───────────────────────────────────────────────────────────────── /** * Stream an OpenAI model. Automatically picks the right API: * - Chat Completions (`/v1/chat/completions`) for standard chat models * - Responses (`/v1/responses`) for Codex-style models */ export async function streamOpenAiChat( apiKey: string, options: Omit, ): Promise { const modelDef = getModelById(options.model); if (modelDef?.openaiApi === 'responses') { return streamOpenAiResponses(apiKey, options); } return streamOpenAiChatCompletions(apiKey, options); } /** Standard Chat Completions API (GPT-5.4, GPT-5.4 Mini, etc.) */ async function streamOpenAiChatCompletions( apiKey: string, options: Omit, ): Promise { const { model, messages, system, signal, onChunk, onComplete, onError, onFinishReason } = options; const allMessages: StreamMessage[] = system ? [{ role: 'system', content: system }, ...messages] : [...messages]; // GPT-5 reasoning models (gpt-5.5, gpt-5.5-pro) only accept the default // temperature; sending any other value returns 400. Mirror the Anthropic // path: when the model is flagged `acceptsSamplingParams: false`, omit. const modelDef = getModelById(model); const sendSamplingParams = modelDef?.acceptsSamplingParams !== false; const { response, cleanup } = await openAiFetch( 'https://api.openai.com/v1/chat/completions', { model, messages: allMessages.map((m) => ({ role: m.role, content: m.content })), stream: true, ...(sendSamplingParams ? { temperature: 0.3 } : {}), max_completion_tokens: 8192, }, apiKey, signal, onError, ); if (!response) return; if (!response.body) { cleanup(); onError(new Error('No response body')); return; } let fullText = ''; let finishReason: string | null = null; const ok = await readSseStream(response.body, signal, (data) => { const parsed = JSON.parse(data) as { choices?: Array<{ delta?: { content?: string }; finish_reason?: string | null }>; }; const content = parsed.choices?.[0]?.delta?.content; if (content) { fullText += content; onChunk(content); } const fr = parsed.choices?.[0]?.finish_reason; if (fr) finishReason = fr; }, onError); cleanup(); if (ok) { onFinishReason?.(finishReason); onComplete(fullText); } } /** Responses API for Codex-style models (GPT-5.3 Codex) */ async function streamOpenAiResponses( apiKey: string, options: Omit, ): Promise { const { model, messages, system, signal, onChunk, onComplete, onError, onFinishReason } = options; // Build the input array: system instructions + conversation const input: Array<{ role: string; content: string | unknown[] }> = []; if (system) { input.push({ role: 'developer', content: system }); } for (const m of messages) { input.push({ role: m.role, content: m.content }); } const { response, cleanup } = await openAiFetch( 'https://api.openai.com/v1/responses', { model, input, stream: true, max_output_tokens: 8192, }, apiKey, signal, onError, ); if (!response) return; if (!response.body) { cleanup(); onError(new Error('No response body')); return; } let fullText = ''; // Map Responses API terminal events → chat-style finish_reason. // `response.incomplete` is any non-completed terminal state: when the // reason is `max_output_tokens` — or simply absent — map to 'length' so // the ChatPanel "Continue" UX can resume a truncated Codex reply. Other // explicit reasons (e.g. `content_filter`) pass through unchanged. let finishReason: string | null = 'stop'; const ok = await readSseStream(response.body, signal, (data) => { const event = JSON.parse(data) as { type?: string; delta?: string; response?: { status?: string; incomplete_details?: { reason?: string } | null; }; }; if (event.type === 'response.output_text.delta' && event.delta) { fullText += event.delta; onChunk(event.delta); } else if (event.type === 'response.incomplete') { const reason = event.response?.incomplete_details?.reason; finishReason = reason == null || reason === 'max_output_tokens' ? 'length' : reason; } else if (event.type === 'response.completed') { finishReason = 'stop'; } }, onError); cleanup(); if (ok) { onFinishReason?.(finishReason); onComplete(fullText); } } // ── Shared helpers ───────────────────────────────────────────────────────── async function openAiFetch( url: string, body: Record, apiKey: string, signal: AbortSignal | undefined, onError: (err: Error) => void, ): Promise<{ response: Response | null; cleanup: () => void }> { const controller = new AbortController(); const timeoutId = setTimeout( () => controller.abort(new Error('Chat request timed out. Please try again.')), STREAM_REQUEST_TIMEOUT_MS, ); const abortFromParent = () => controller.abort(signal?.reason); if (signal) { if (signal.aborted) { clearTimeout(timeoutId); return { response: null, cleanup: () => {} }; } signal.addEventListener('abort', abortFromParent, { once: true }); } // cleanup() clears the connect timeout and removes the abort listener. // Callers must call it AFTER streaming completes, not before — otherwise // user cancellation during SSE consumption won't abort the fetch. const cleanup = () => { clearTimeout(timeoutId); signal?.removeEventListener('abort', abortFromParent); }; let response: Response; try { response = await fetch(url, { method: 'POST', headers: { Authorization: `Bearer ${apiKey}`, 'Content-Type': 'application/json', }, body: JSON.stringify(body), signal: controller.signal, }); } catch (err) { cleanup(); if (signal?.aborted) return { response: null, cleanup: () => {} }; if (controller.signal.aborted && controller.signal.reason instanceof Error) { onError(controller.signal.reason); } else { onError(err instanceof Error ? err : new Error(String(err))); } return { response: null, cleanup: () => {} }; } if (!response.ok) { cleanup(); let detail = `OpenAI error (${response.status})`; try { const errBody = (await response.json()) as { error?: { message?: string } }; if (response.status === 401) { detail = 'Invalid OpenAI API key. Check your key in the chat panel.'; } else if (response.status === 429) { detail = 'OpenAI rate limit reached. Please wait and try again.'; } else if (errBody.error?.message) { detail = `OpenAI: ${errBody.error.message}`; } } catch { /* ignore parse failure */ } onError(new Error(detail)); return { response: null, cleanup: () => {} }; } return { response, cleanup }; }