import * as crypto from "node:crypto"; import * as fsSync from "node:fs"; import * as fs from "node:fs/promises"; import * as path from "node:path"; import { scheduler } from "node:timers/promises"; import type { Effort } from "@oh-my-pi/pi-catalog/effort"; import { isVertexExpressOpenAIUrl, isVertexRawPredictUrl } from "@oh-my-pi/pi-catalog/hosts"; import { mapEffortToAnthropicAdaptiveEffort, mapEffortToGoogleThinkingLevel, minimumSupportedEffort, requireSupportedEffort, resolveWireModelId, } from "@oh-my-pi/pi-catalog/model-thinking"; import { CATALOG_PROVIDERS, type ProviderCatalogEntry } from "@oh-my-pi/pi-catalog/provider-models"; import { $env, $pickenv, getConfigRootDir, isEnoent, logger } from "@oh-my-pi/pi-utils"; import { getCustomApi } from "./api-registry"; import { AUTH_RETRY_STEPS, isApiKeyResolver, resolveRetryKey } from "./auth-retry"; import * as AIError from "./error"; import { ProviderHttpError } from "./error"; import { isUsageLimitOutcome } from "./error/rate-limit"; import type { BedrockOptions } from "./providers/amazon-bedrock"; import type { AnthropicOptions } from "./providers/anthropic"; import type { CursorOptions } from "./providers/cursor"; import type { DevinOptions } from "./providers/devin"; import { isGitLabDuoModel, streamGitLabDuo } from "./providers/gitlab-duo"; import { type GitLabDuoWorkflowOptions, streamGitLabDuoWorkflow } from "./providers/gitlab-duo-workflow"; import type { GoogleOptions } from "./providers/google"; import { getVertexAccessToken } from "./providers/google-auth"; import type { GoogleGeminiCliOptions } from "./providers/google-gemini-cli"; import type { GoogleVertexOptions } from "./providers/google-vertex"; import { isKimiModel, streamKimi } from "./providers/kimi"; import type { OllamaChatOptions } from "./providers/ollama"; import type { OpenAICompletionsOptions } from "./providers/openai-completions"; import { streamPiNative } from "./providers/pi-native-client"; // Heavy provider stream functions are imported lazily via register-builtins, // which wraps each provider module in a dynamic import. This keeps the // AWS SDK, google-auth-library, @google/genai, @bufbuild/protobuf, and // other provider SDKs out of the CLI startup parse graph. The // gitlab-duo / kimi / synthetic providers stay eager because their modules // export routing predicates (isGitLabDuoModel, isKimiModel, isSyntheticModel) // that must be callable synchronously before streaming begins, and their // modules are thin wrappers with no heavy SDK dependencies. import { streamAnthropic, streamAzureOpenAIResponses, streamBedrock, streamCursor, streamDevin, streamGoogle, streamGoogleGeminiCli, streamGoogleVertex, streamOllama, streamOpenAICodexResponses, streamOpenAICompletions, streamOpenAIResponses, } from "./providers/register-builtins"; import { isSyntheticModel, streamSynthetic } from "./providers/synthetic"; import { PROVIDER_REGISTRY } from "./registry"; import type { Api, AssistantMessage, AssistantMessageEvent, Context, FetchImpl, Model, OptionsForApi, SimpleStreamOptions, StreamOptions, ThinkingBudgets, ToolChoice, } from "./types"; import { AssistantMessageEventStream } from "./utils/event-stream"; import { wrapLeakedThinkingStream } from "./utils/leaked-thinking-stream"; import { wrapFetchForProxy } from "./utils/proxy"; import { withRequestDebugFetch } from "./utils/request-debug"; import { withGeminiThinkingLoopGuard } from "./utils/thinking-loop"; function isGoogleVertexAuthenticatedModel(model: Model): boolean { return ( model.provider === "google-vertex" && ((model.api === "openai-completions" && isVertexExpressOpenAIUrl(model.baseUrl)) || (model.api === "anthropic-messages" && isVertexRawPredictUrl(model.baseUrl))) ); } type ProviderInFlightLease = { path: string; heartbeat: NodeJS.Timeout; flushHeartbeat: () => Promise; }; type ProviderInFlightLeaseInfo = { pid: number; timestamp: number; token: string; }; type ProviderInFlightStaleLock = { token: string } | { mtimeMs: number }; type ProviderInFlightLockIdentity = { dev: number; ino: number; birthtimeMs: number }; const PROVIDER_INFLIGHT_LOCK_STALE_MS = 10_000; const PROVIDER_INFLIGHT_LEASE_STALE_MS = 30_000; const PROVIDER_INFLIGHT_HEARTBEAT_MS = 5_000; const PROVIDER_INFLIGHT_SIGNAL_FALLBACK_MS = 250; let configuredProviderMaxInFlightRequests: Record = {}; let providerInFlightRootOverride: string | undefined; export function configureProviderMaxInFlightRequests(limits: Record | undefined): void { configuredProviderMaxInFlightRequests = limits ?? {}; } function resolveProviderInFlightLimit( provider: string, options?: Pick, ): number | undefined { const limits = options?.maxInFlightRequests ?? configuredProviderMaxInFlightRequests; const value = limits[provider]; if (typeof value !== "number" || !Number.isFinite(value) || value <= 0) return undefined; return Math.max(1, Math.floor(value)); } function providerInFlightRoot(): string { if (providerInFlightRootOverride) return providerInFlightRootOverride; return path.join(getConfigRootDir(), "run", "provider-inflight"); } function providerInFlightSegment(provider: string): string { return crypto.createHash("sha256").update(provider).digest("base64url"); } function providerInFlightDir(provider: string): string { return path.join(providerInFlightRoot(), providerInFlightSegment(provider)); } function providerInFlightSignalPath(provider: string): string { return path.join(providerInFlightDir(provider), ".wakeup"); } function providerInFlightLockDir(provider: string): string { return `${providerInFlightDir(provider)}.lock`; } // `process.kill(pid, 0)` may throw for permission/sandbox reasons even when a // process exists. Treat non-ESRCH failures as alive; timestamp expiry still // reaps leases whose heartbeat stopped. function isProcessAlive(pid: number): boolean { try { process.kill(pid, 0); return true; } catch (error) { return (error as NodeJS.ErrnoException).code !== "ESRCH"; } } async function readProviderInFlightInfo(infoPath: string): Promise { try { const content = await fs.readFile(infoPath, "utf-8"); const parsed = JSON.parse(content) as Partial; if (typeof parsed.pid !== "number" || typeof parsed.timestamp !== "number" || typeof parsed.token !== "string") { return null; } return { pid: parsed.pid, timestamp: parsed.timestamp, token: parsed.token }; } catch { return null; } } async function writeProviderInFlightInfo(dir: string, token: string): Promise { const info: ProviderInFlightLeaseInfo = { pid: process.pid, timestamp: Date.now(), token }; const infoPath = path.join(dir, "info.json"); const tempPath = path.join(dir, `.info-${process.pid}-${crypto.randomUUID()}.tmp`); try { await Bun.write(tempPath, JSON.stringify(info)); await fs.rename(tempPath, infoPath); } catch (error) { await fs.rm(tempPath, { force: true }).catch(() => {}); throw error; } } async function isProviderInFlightDirStale(dir: string, staleMs: number): Promise { const info = await readProviderInFlightInfo(path.join(dir, "info.json")); if (info) { if (!isProcessAlive(info.pid)) return true; return Date.now() - info.timestamp > staleMs; } try { const stat = await fs.stat(path.join(dir, "info.json")); return Date.now() - stat.mtimeMs > staleMs; } catch (error) { if (!isEnoent(error)) throw error; } try { const stat = await fs.stat(dir); return Date.now() - stat.mtimeMs > staleMs; } catch (error) { if (isEnoent(error)) return false; throw error; } } async function readProviderInFlightStaleLock(lockDir: string): Promise { const infoPath = path.join(lockDir, "info.json"); const info = await readProviderInFlightInfo(infoPath); if (info) return isProcessAlive(info.pid) ? null : { token: info.token }; try { const stat = await fs.stat(lockDir); return Date.now() - stat.mtimeMs > PROVIDER_INFLIGHT_LOCK_STALE_MS ? { mtimeMs: stat.mtimeMs } : null; } catch (error) { if (isEnoent(error)) return null; throw error; } } async function readProviderInFlightLockIdentity(lockDir: string): Promise { const stat = await fs.stat(lockDir); return { dev: stat.dev, ino: stat.ino, birthtimeMs: stat.birthtimeMs }; } function isSameProviderInFlightLock( current: ProviderInFlightLockIdentity, expected: ProviderInFlightLockIdentity, ): boolean { if (current.dev !== expected.dev) return false; if (current.ino !== 0 || expected.ino !== 0) return current.ino === expected.ino; return current.birthtimeMs === expected.birthtimeMs; } async function releaseProviderInFlightStaleLock(lockDir: string, stale: ProviderInFlightStaleLock): Promise { if ("token" in stale) { await releaseProviderInFlightLock(lockDir, stale.token); return; } const infoPath = path.join(lockDir, "info.json"); if (await readProviderInFlightInfo(infoPath)) return; try { const stat = await fs.stat(lockDir); if (stat.mtimeMs !== stale.mtimeMs || Date.now() - stat.mtimeMs <= PROVIDER_INFLIGHT_LOCK_STALE_MS) return; await fs.rm(lockDir, { recursive: true, force: true }); } catch {} } // Best-effort token-checked release. A token mismatch means another process has // already replaced the lock, so the fresh lock must be left intact. async function releaseProviderInFlightLock(lockDir: string, token: string): Promise { try { const info = await readProviderInFlightInfo(path.join(lockDir, "info.json")); if (!info || info.token !== token) return; await fs.rm(lockDir, { recursive: true, force: true }); } catch {} } async function releaseProviderInFlightLockDirIfSame( lockDir: string, identity: ProviderInFlightLockIdentity, ): Promise { try { if (await readProviderInFlightInfo(path.join(lockDir, "info.json"))) return; const current = await readProviderInFlightLockIdentity(lockDir); if (!isSameProviderInFlightLock(current, identity)) return; await fs.rm(lockDir, { recursive: true, force: true }); } catch {} } async function acquireProviderInFlightLock(provider: string, signal?: AbortSignal): Promise<() => Promise> { const lockDir = providerInFlightLockDir(provider); await fs.mkdir(path.dirname(lockDir), { recursive: true }); while (true) { if (signal?.aborted) throw signal.reason ?? new AIError.AbortError("Provider request aborted before dispatch"); try { await fs.mkdir(lockDir); const lockIdentity = await readProviderInFlightLockIdentity(lockDir); const token = crypto.randomUUID(); try { await writeProviderInFlightInfo(lockDir, token); } catch (error) { await releaseProviderInFlightLockDirIfSame(lockDir, lockIdentity); throw error; } return async () => { await releaseProviderInFlightLock(lockDir, token); }; } catch (error) { if ((error as NodeJS.ErrnoException).code !== "EEXIST") throw error; } const staleLock = await readProviderInFlightStaleLock(lockDir); if (staleLock) { await releaseProviderInFlightStaleLock(lockDir, staleLock); await signalProviderInFlightWaiters(provider); continue; } await waitForProviderInFlightSignal(provider, signal); } } async function cleanupProviderInFlightLeases(providerDir: string): Promise { let active = 0; let entries: string[]; try { entries = await fs.readdir(providerDir); } catch (error) { if (isEnoent(error)) return 0; throw error; } for (const entry of entries) { const leaseDir = path.join(providerDir, entry); let isDirectory = false; try { isDirectory = (await fs.stat(leaseDir)).isDirectory(); } catch (error) { if (isEnoent(error)) continue; throw error; } if (!isDirectory) continue; if (await isProviderInFlightDirStale(leaseDir, PROVIDER_INFLIGHT_LEASE_STALE_MS)) { await fs.rm(leaseDir, { recursive: true, force: true }); continue; } active++; } return active; } async function tryAcquireProviderInFlightLease( provider: string, limit: number, signal?: AbortSignal, ): Promise { const releaseLock = await acquireProviderInFlightLock(provider, signal); try { const dir = providerInFlightDir(provider); await fs.mkdir(dir, { recursive: true }); const active = await cleanupProviderInFlightLeases(dir); if (active >= limit) return null; const leaseDir = path.join(dir, `${process.pid}-${Date.now()}-${crypto.randomUUID()}`); const token = crypto.randomUUID(); try { await fs.mkdir(leaseDir); await writeProviderInFlightInfo(leaseDir, token); } catch (error) { await removeProviderInFlightLeaseDir(leaseDir).catch(() => {}); throw error; } let heartbeatFlush = Promise.resolve(); const touchHeartbeat = () => { heartbeatFlush = heartbeatFlush .then( () => writeProviderInFlightInfo(leaseDir, token), () => writeProviderInFlightInfo(leaseDir, token), ) .catch(() => {}); }; const heartbeat = setInterval(touchHeartbeat, PROVIDER_INFLIGHT_HEARTBEAT_MS); heartbeat.unref?.(); return { path: leaseDir, heartbeat, flushHeartbeat: () => heartbeatFlush }; } finally { await releaseLock(); } } async function signalProviderInFlightWaitersInDir(dir: string): Promise { try { await fs.mkdir(dir, { recursive: true }); await Bun.write(path.join(dir, ".wakeup"), String(Date.now())); } catch {} } async function signalProviderInFlightWaiters(provider: string): Promise { await signalProviderInFlightWaitersInDir(providerInFlightDir(provider)); } function waitForProviderInFlightSignal(provider: string, signal?: AbortSignal): Promise { if (signal?.aborted) return Promise.reject(signal.reason ?? new AIError.AbortError("Provider request aborted before dispatch")); const signalPath = providerInFlightSignalPath(provider); const waitStarted = Date.now(); const { promise, resolve, reject } = Promise.withResolvers(); let settled = false; let watcher: fsSync.FSWatcher | undefined; const timer = setTimeout(() => finish(resolve), PROVIDER_INFLIGHT_SIGNAL_FALLBACK_MS); const finish = (settle: () => void) => { if (settled) return; settled = true; clearTimeout(timer); watcher?.close(); signal?.removeEventListener("abort", onAbort); settle(); }; const onAbort = () => { finish(() => reject(signal?.reason ?? new AIError.AbortError("Provider request aborted before dispatch"))); }; signal?.addEventListener("abort", onAbort, { once: true }); try { watcher = fsSync.watch(providerInFlightDir(provider), (_event, filename) => { if (filename === ".wakeup" || filename === null) { finish(resolve); } }); void fs.stat(signalPath).then( stat => { if (stat.mtimeMs >= waitStarted) finish(resolve); }, error => { if (!isEnoent(error)) finish(resolve); }, ); } catch { // Filesystem notifications are best-effort across platforms; the fallback // timer keeps stale-lock/lease cleanup progressing if an event is dropped. } return promise; } async function removeProviderInFlightLeaseDir(leasePath: string): Promise { for (let attempt = 0; attempt < 3; attempt++) { try { await fs.rm(leasePath, { recursive: true, force: true }); return; } catch (error) { if (isEnoent(error)) return; const code = (error as NodeJS.ErrnoException).code; if (attempt < 2 && (code === "EBUSY" || code === "ENOTEMPTY" || code === "EPERM")) { await Bun.sleep(25); continue; } throw error; } } } // Signal into the lease's OWN provider directory (derived from `lease.path`) // rather than recomputing it from the current root. A release that lands after // the in-flight root has been repointed (only the test seam does that) must not // write `.wakeup` into an unrelated provider directory. async function releaseProviderInFlightLease(lease: ProviderInFlightLease): Promise { clearInterval(lease.heartbeat); await lease.flushHeartbeat(); await removeProviderInFlightLeaseDir(lease.path); await signalProviderInFlightWaitersInDir(path.dirname(lease.path)); } async function acquireProviderInFlightSlot( provider: string, limit: number | undefined, signal?: AbortSignal, ): Promise<() => Promise> { if (limit === undefined) return async () => {}; let loggedWait = false; while (true) { if (signal?.aborted) throw signal.reason ?? new AIError.AbortError("Provider request aborted before dispatch"); const lease = await tryAcquireProviderInFlightLease(provider, limit, signal); if (lease) return () => releaseProviderInFlightLease(lease); if (!loggedWait) { loggedWait = true; logger.debug("Provider in-flight limit blocked request", { provider, limit }); } await waitForProviderInFlightSignal(provider, signal); } } export const __providerInFlightForTesting = { setRoot(root: string | undefined): void { providerInFlightRootOverride = root; }, providerDir(provider: string): string { return providerInFlightDir(provider); }, lockDir(provider: string): string { return providerInFlightLockDir(provider); }, async captureStaleLockRelease(provider: string): Promise<(() => Promise) | null> { const lockDir = providerInFlightLockDir(provider); const stale = await readProviderInFlightStaleLock(lockDir); if (!stale) return null; return () => releaseProviderInFlightStaleLock(lockDir, stale); }, async captureLockDirRelease(provider: string): Promise<(() => Promise) | null> { const lockDir = providerInFlightLockDir(provider); try { const identity = await readProviderInFlightLockIdentity(lockDir); return () => releaseProviderInFlightLockDirIfSame(lockDir, identity); } catch { return null; } }, }; function withProviderInFlightLimit>( model: Model, options: TOptions | undefined, dispatch: () => AssistantMessageEventStream, ): AssistantMessageEventStream { // Leaked-thinking healing folds in here — the one shared provider-dispatch // chokepoint — so the loop guard (which wraps this) sees healed events and all // six provider exits are covered by one wrap. Healing is idempotent. const limit = resolveProviderInFlightLimit(model.provider, options); if (limit === undefined) return wrapLeakedThinkingStream(dispatch()); const outer = new AssistantMessageEventStream(); void (async () => { let release: (() => Promise) | undefined; let released = false; const releaseOnce = async () => { if (!release || released) return; released = true; await release(); }; try { const startedWaitingAt = Date.now(); release = await acquireProviderInFlightSlot(model.provider, limit, options?.signal); if (Date.now() - startedWaitingAt >= PROVIDER_INFLIGHT_SIGNAL_FALLBACK_MS) { logger.debug("Provider in-flight limit wait completed", { provider: model.provider, limit }); } if (options?.signal?.aborted) { throw options.signal.reason ?? new AIError.AbortError("Provider request aborted before dispatch"); } const inner = wrapLeakedThinkingStream(dispatch()); try { for await (const event of inner) { outer.push(event); if (outer.done) return; } if (!outer.done) outer.end(await inner.result()); } finally { await releaseOnce(); } } catch (error) { await releaseOnce(); if (!outer.done) outer.fail(error); } })(); return outer; } function createVertexAuthenticatedFetch(options: StreamOptions | undefined): FetchImpl { const baseFetch = options?.fetch ?? fetch; const vertexFetch = async (input: string | URL | Request, init?: RequestInit): Promise => { const token = await getVertexAccessToken({ signal: options?.signal, fetch: baseFetch }); const headers = new Headers(init?.headers); headers.set("Authorization", `Bearer ${token}`); const rewritten = resolveVertexRequest(input); const url = rewritten instanceof Request ? rewritten.url : rewritten.toString(); if (isVertexRawPredictUrl(url)) { const bodyText = await readVertexRequestBody(rewritten, init); const transformed = transformVertexAnthropicBody(bodyText); return baseFetch(url, { ...init, method: init?.method ?? (rewritten instanceof Request ? rewritten.method : "POST"), headers, body: transformed, }); } return baseFetch(rewritten, { ...init, headers }); }; return Object.assign(vertexFetch, baseFetch.preconnect ? { preconnect: baseFetch.preconnect } : {}); } async function readVertexRequestBody(input: string | URL | Request, init: RequestInit | undefined): Promise { if (input instanceof Request) return input.clone().text(); const body = init?.body; if (typeof body === "string") return body; if (body instanceof Uint8Array) return new TextDecoder().decode(body); if (body instanceof ArrayBuffer) return new TextDecoder().decode(body); return ""; } // Vertex Claude rejects the standard Anthropic body shape: the `model` field // is encoded in the URL path and `anthropic_version: "vertex-2023-10-16"` is // required in the JSON body instead of the `anthropic-version` HTTP header. function transformVertexAnthropicBody(bodyText: string): string { if (!bodyText) return bodyText; try { const payload = JSON.parse(bodyText) as Record; delete payload.model; payload.anthropic_version = "vertex-2023-10-16"; return JSON.stringify(payload); } catch { return bodyText; } } function resolveVertexRequest(input: string | URL | Request): string | URL | Request { const project = $env.GOOGLE_CLOUD_PROJECT || $env.GCP_PROJECT || $env.GCLOUD_PROJECT; const location = $env.GOOGLE_VERTEX_LOCATION || $env.GOOGLE_CLOUD_LOCATION || $env.VERTEX_LOCATION; if (!project || !location) return input; const rewriteUrl = (url: string): string => { const hasPlaceholder = url.includes("{project}") || url.includes("{location}") || url.includes("%7Bproject%7D") || url.includes("%7Blocation%7D"); const host = location === "global" ? "aiplatform.googleapis.com" : `${location}-aiplatform.googleapis.com`; const rewritten = hasPlaceholder ? url .replace("https://{location}-aiplatform.googleapis.com", `https://${host}`) .replace("https://%7Blocation%7D-aiplatform.googleapis.com", `https://${host}`) .replaceAll("{project}", encodeURIComponent(project)) .replaceAll("%7Bproject%7D", encodeURIComponent(project)) .replaceAll("{location}", encodeURIComponent(location)) .replaceAll("%7Blocation%7D", encodeURIComponent(location)) : url; return rewritten.replace(":streamRawPredict/v1/messages", ":streamRawPredict"); }; if (input instanceof Request) { const rewrittenUrl = rewriteUrl(input.url); return rewrittenUrl === input.url ? input : new Request(rewrittenUrl, input); } if (input instanceof URL) { const rewrittenUrl = rewriteUrl(input.toString()); return rewrittenUrl === input.toString() ? input : new URL(rewrittenUrl); } return rewriteUrl(input); } type KeyResolver = string | (() => string | undefined); const LEGACY_ENV_KEYS: Record = { // Non-provider / search-tool keys and API-name keys not modeled as registry provider defs. "azure-openai-responses": "AZURE_OPENAI_API_KEY", exa: "EXA_API_KEY", jina: "JINA_API_KEY", brave: "BRAVE_API_KEY", tinyfish: "TINYFISH_API_KEY", firecrawl: "FIRECRAWL_API_KEY", }; /** * Env fallbacks derived from the catalog table — the single source for plain * provider env-var names. Registry defs override with computed resolvers * (Foundry/ADC/Bedrock probes); legacy non-provider keys merge last. */ const CATALOG_ENTRY_ENV_KEYS = (CATALOG_PROVIDERS as readonly ProviderCatalogEntry[]).flatMap(provider => { const envVars = provider.envVars; if (!envVars || envVars.length === 0) return []; const resolver: KeyResolver = envVars.length === 1 ? envVars[0] : () => $pickenv(...envVars); return [[provider.id, resolver] as [string, KeyResolver]]; }); const serviceProviderMap: Record = { ...Object.fromEntries(CATALOG_ENTRY_ENV_KEYS), ...Object.fromEntries( PROVIDER_REGISTRY.flatMap(provider => provider.envKeys != null ? [[provider.id, provider.envKeys] as [string, KeyResolver]] : [], ), ), ...LEGACY_ENV_KEYS, }; /** * Get API key for provider from known environment variables, e.g. OPENAI_API_KEY. * * Will not return API keys for providers that require OAuth tokens. * Checks Bun.env, then cwd/.env, then ~/.env. */ export function getEnvApiKey(provider: string): string | undefined { const resolver = serviceProviderMap[provider]; if (typeof resolver === "string") { return $env[resolver]; } return resolver?.(); } /** * Name of the environment variable that backs `getEnvApiKey` for a provider, * when that provider maps to a single named variable (e.g. `github-copilot` → * `COPILOT_GITHUB_TOKEN`). Returns undefined for providers whose env fallback * is computed (multi-var pickers, Vertex ADC / Bedrock probes, …) since no * single variable name describes the source. */ export function getEnvApiKeyName(provider: string): string | undefined { const resolver = serviceProviderMap[provider]; return typeof resolver === "string" ? resolver : undefined; } /** * Enumerate every provider that has an env-var fallback for `getEnvApiKey`. * Used by `omp auth-broker migrate --include-env` to discover env-sourced keys * that should be uploaded to the broker. */ export function listProvidersWithEnvKey(): string[] { return Object.keys(serviceProviderMap); } export function stream( model: Model, context: Context, options?: OptionsForApi, ): AssistantMessageEventStream { return withGeminiThinkingLoopGuard(model, options, opts => withProviderInFlightLimit(model, opts, () => streamDispatch(model, context, opts)), ); } function streamDispatch( model: Model, context: Context, options?: OptionsForApi, ): AssistantMessageEventStream { const baseOptions = (options || {}) as StreamOptions; const debugOptions = withRequestDebugFetch(baseOptions); const requestOptions = { ...debugOptions, fetch: wrapFetchForProxy(debugOptions.fetch ?? (globalThis.fetch as FetchImpl), model.provider), } as OptionsForApi; // Check custom API registry first (extension-provided APIs like "vertex-claude-api") const customApiProvider = getCustomApi(model.api); if (customApiProvider) { return customApiProvider.stream(model, context, requestOptions as StreamOptions); } if (isGitLabDuoModel(model)) { const apiKey = requestOptions.apiKey || getEnvApiKey(model.provider); if (!apiKey) { throw new AIError.MissingApiKeyError(model.provider); } return streamGitLabDuo(model, context, { ...(requestOptions as SimpleStreamOptions), apiKey, }); } if (model.api === "gitlab-duo-agent") { const apiKey = (requestOptions as StreamOptions | undefined)?.apiKey || getEnvApiKey(model.provider); if (!apiKey) { throw new AIError.MissingApiKeyError(model.provider); } return streamGitLabDuoWorkflow(model as Model<"gitlab-duo-agent">, context, { ...(requestOptions as StreamOptions | undefined), apiKey, } as GitLabDuoWorkflowOptions); } // Vertex AI uses Application Default Credentials, not API keys if (model.api === "google-vertex") { return streamGoogleVertex(model as Model<"google-vertex">, context, requestOptions as GoogleVertexOptions); } else if (model.api === "bedrock-converse-stream") { // Bedrock doesn't have any API keys instead it sources credentials from standard AWS env variables or from given AWS profile. return streamBedrock(model as Model<"bedrock-converse-stream">, context, requestOptions as BedrockOptions); } const apiKey = requestOptions.apiKey || getEnvApiKey(model.provider); if (!apiKey) { throw new AIError.MissingApiKeyError(model.provider); } const providerOptions = isGoogleVertexAuthenticatedModel(model) ? { ...requestOptions, apiKey: "vertex-adc", fetch: createVertexAuthenticatedFetch(requestOptions), } : { ...requestOptions, apiKey }; const api: Api = model.api; switch (api) { case "anthropic-messages": { const anthropicOptions = providerOptions as AnthropicOptions; return streamAnthropic(model as Model<"anthropic-messages">, context, { ...anthropicOptions, isOAuth: anthropicOptions.isOAuth ?? model.isOAuth, }); } case "openrouter": { const useResponses = $env.PI_OPENROUTER_RESPONSES !== "0"; if (useResponses) { return streamOpenAIResponses( model as Model<"openai-responses">, context, providerOptions as OptionsForApi<"openai-responses">, ); } return streamOpenAICompletions( model as Model<"openai-completions">, context, providerOptions as OptionsForApi<"openai-completions">, ); } case "openai-completions": return streamOpenAICompletions( model as Model<"openai-completions">, context, providerOptions as OptionsForApi<"openai-completions">, ); case "openai-responses": return streamOpenAIResponses( model as Model<"openai-responses">, context, providerOptions as OptionsForApi<"openai-responses">, ); case "azure-openai-responses": return streamAzureOpenAIResponses( model as Model<"azure-openai-responses">, context, providerOptions as OptionsForApi<"azure-openai-responses">, ); case "openai-codex-responses": return streamOpenAICodexResponses( model as Model<"openai-codex-responses">, context, providerOptions as OptionsForApi<"openai-codex-responses">, ); case "google-generative-ai": return streamGoogle(model as Model<"google-generative-ai">, context, providerOptions); case "google-gemini-cli": return streamGoogleGeminiCli( model as Model<"google-gemini-cli">, context, providerOptions as GoogleGeminiCliOptions, ); case "ollama-chat": return streamOllama(model as Model<"ollama-chat">, context, providerOptions as OllamaChatOptions); case "cursor-agent": return streamCursor(model as Model<"cursor-agent">, context, providerOptions as CursorOptions); case "devin-agent": return streamDevin(model as Model<"devin-agent">, context, providerOptions as DevinOptions); default: throw new AIError.ConfigurationError(`Unhandled API: ${api}`); } } /** Thinking-loop re-samples spent before {@link resolveWithThinkingLoopCook} cooks. */ const THINKING_LOOP_MAX_ABORTS = 3; const THINKING_LOOP_RETRY_BASE_DELAY_MS = 500; const THINKING_LOOP_RETRY_MAX_DELAY_MS = 8_000; /** * Resolve a completion, re-sampling a thinking-loop stall up to * {@link THINKING_LOOP_MAX_ABORTS} times before letting it cook. The loop guard * raises an empty `stopReason: "error"` stall on each guarded attempt; this * result-path consumer re-dispatches a fresh request per stall and, once the abort * budget is spent, runs one final pass with the guard disabled so a stubborn loop * returns the model's raw output instead of a fatal stall. Non-stall results — * including genuine errors — return immediately; a caller abort during backoff * propagates so cancellation surfaces as an abort, never a stale stall result. */ async function resolveWithThinkingLoopCook( signal: AbortSignal | undefined, dispatch: () => AssistantMessageEventStream, cook: () => AssistantMessageEventStream, ): Promise { let message = await dispatch().result(); let thinkingLoopRetry = AIError.is(message.errorId, AIError.Flag.ThinkingLoop); for (let attempt = 0; thinkingLoopRetry && attempt < THINKING_LOOP_MAX_ABORTS - 1; attempt += 1) { // A caller abort surfaces as a thrown abort (never the stall, which would // misclassify as a 502): throwIfAborted before backoff, and scheduler.wait // rejects if the abort lands mid-delay. signal?.throwIfAborted(); const delay = Math.min(THINKING_LOOP_RETRY_BASE_DELAY_MS * 2 ** attempt, THINKING_LOOP_RETRY_MAX_DELAY_MS); await scheduler.wait(delay, { signal }); message = await dispatch().result(); thinkingLoopRetry = message.stopReason === "error" && message.content.length === 0 && AIError.is(message.errorId, AIError.Flag.ThinkingLoop); } if (!thinkingLoopRetry) return message; signal?.throwIfAborted(); // Abort budget spent and still looping: let it cook with the guard disabled. return cook().result(); } export async function complete( model: Model, context: Context, options?: OptionsForApi, ): Promise { return resolveWithThinkingLoopCook( options?.signal, () => stream(model, context, options), () => stream(model, context, { ...options, loopGuard: { ...options?.loopGuard, enabled: false } }), ); } type AuthRetryFailure = { error: unknown; bufferedEvents: AssistantMessageEvent[]; terminalEvent?: Extract; }; function extractStatusFromAssistantError(message: AssistantMessage): number | undefined { if (message.errorStatus !== undefined) return message.errorStatus; if (!message.errorMessage) return undefined; return AIError.status({ message: message.errorMessage }); } function isRetryableUpstreamError(error: unknown, status: number | undefined, message: string | undefined): boolean { // 401 means the credential is bad. Usage-limit phrasing (Codex's // "You have hit your ChatGPT usage limit", Anthropic's "usage_limit_reached", // Google's "resource_exhausted", OpenAI's "insufficient_quota") and 429s // without transient rate-limit wording mean this account is parked but a // sibling credential can usually pick the request up. Both are rotatable // via `onAuthError` — the auth-gateway maps the former to // `invalidateCredentialMatching` and the latter to // `markUsageLimitReached`. Transient 429s ("Too many requests", // per-minute caps) classify as RATE_LIMIT_EXCEEDED in // `parseRateLimitReason` and stay in the provider's own backoff layer // instead of burning siblings. if (status === 401) return true; void error; return isUsageLimitOutcome(status, message); } function createAssistantAuthError(message: AssistantMessage): Error { const text = message.errorMessage ?? "Provider authentication failed"; const status = extractStatusFromAssistantError(message); return status === undefined ? new AIError.ProviderResponseError(text, { kind: "runtime" }) : new ProviderHttpError(text, status); } function emitBufferedEvents(stream: AssistantMessageEventStream, events: AssistantMessageEvent[]): void { for (const event of events) { stream.push(event); } } export function streamSimple( model: Model, context: Context, options?: SimpleStreamOptions, ): AssistantMessageEventStream { const baseOptions = (options || {}) as SimpleStreamOptions; const debugOptions = withRequestDebugFetch(baseOptions); const requestOptions = { ...debugOptions, fetch: wrapFetchForProxy(debugOptions.fetch ?? (globalThis.fetch as FetchImpl), model.provider), } as SimpleStreamOptions; const apiKeyResolver = isApiKeyResolver(requestOptions?.apiKey) ? requestOptions.apiKey : undefined; if (apiKeyResolver) { const outer = new AssistantMessageEventStream(); const signal = requestOptions?.signal; // One inner attempt against a resolved string key. When // `captureAuthFailure` is set, a retryable auth error that arrives before // any replay-unsafe event is buffered and returned (so the caller can // retry with a fresh key) instead of surfaced. The terminal attempt // clears the flag and emits whatever it gets. const runAttempt = async (apiKey: string, captureAuthFailure: boolean): Promise => { const bufferedEvents: AssistantMessageEvent[] = []; let emittedReplayUnsafeEvent = false; const flushBuffered = (): void => { emitBufferedEvents(outer, bufferedEvents); bufferedEvents.length = 0; }; try { const inner = streamSimple(model, context, { ...requestOptions, apiKey }); for await (const event of inner) { if (!emittedReplayUnsafeEvent && event.type === "start") { bufferedEvents.push(event); continue; } if ( !emittedReplayUnsafeEvent && captureAuthFailure && event.type === "error" && isRetryableUpstreamError( event.error, extractStatusFromAssistantError(event.error), event.error.errorMessage, ) ) { return { error: createAssistantAuthError(event.error), bufferedEvents, terminalEvent: event }; } flushBuffered(); emittedReplayUnsafeEvent = true; outer.push(event); if (outer.done) return undefined; } flushBuffered(); if (!outer.done) outer.end(await inner.result()); } catch (error) { if ( !emittedReplayUnsafeEvent && captureAuthFailure && isRetryableUpstreamError( error, AIError.status(error), error instanceof Error ? error.message : undefined, ) ) { return { error, bufferedEvents }; } flushBuffered(); outer.fail(error); } return undefined; }; const emitFailure = (failure: AuthRetryFailure): void => { emitBufferedEvents(outer, failure.bufferedEvents); if (failure.terminalEvent) { outer.push(failure.terminalEvent); } else { outer.fail(failure.error); } }; void (async () => { let lastKey: string | undefined; try { lastKey = (await apiKeyResolver({ lastChance: false, error: undefined, signal })) || undefined; } catch (error) { // A thrown resolver is a broker/OAuth/network failure, not a missing // key — surface the cause instead of masking it as "No API key". outer.fail( new AIError.ConfigurationError( `Failed to resolve API key for provider ${model.provider}: ${error instanceof Error ? error.message : String(error)}`, { cause: error }, ), ); return; } if (lastKey === undefined) { outer.fail(new AIError.MissingApiKeyError(model.provider)); return; } let failure = await runAttempt(lastKey, true); if (!failure) return; // a/b/c policy: refresh the same account (lastChance=false), then // switch to a sibling (lastChance=true). A step is skipped when the // resolver yields the same key it just tried or `undefined`; the // final step's attempt clears the capture flag so it emits directly. for (let step = 0; step < AUTH_RETRY_STEPS.length; step++) { // Caller aborted between attempts: don't mint a fresh token or fire // another doomed request — emit the captured failure instead. if (signal?.aborted) break; const nextKey = await resolveRetryKey(apiKeyResolver, AUTH_RETRY_STEPS[step]!, failure.error, signal); if (nextKey === undefined || nextKey === lastKey) continue; lastKey = nextKey; const isLastStep = step === AUTH_RETRY_STEPS.length - 1; const next = await runAttempt(nextKey, !isLastStep); if (!next) return; failure = next; } emitFailure(failure); })(); return outer; } // Pi-native transport short-circuits the per-provider dispatch entirely: // the gateway resolves provider + credential server-side, so we don't // need an `apiKey` from `getEnvApiKey` here — `options.apiKey` carries // the gateway bearer instead. Comes BEFORE the custom-API check so // extension-registered APIs can't accidentally override a configured // pi-native transport. if (model.transport === "pi-native") { return withGeminiThinkingLoopGuard(model, requestOptions, opts => withProviderInFlightLimit(model, opts, () => streamPiNative(model, context, opts)), ); } // Check custom API registry (extension-provided APIs) const customApiProvider = getCustomApi(model.api); if (customApiProvider) { return withGeminiThinkingLoopGuard(model, requestOptions, opts => withProviderInFlightLimit(model, opts, () => customApiProvider.streamSimple(model, context, opts)), ); } // Vertex AI uses Application Default Credentials, not API keys if (model.api === "google-vertex") { const providerOptions = mapOptionsForApi(model, requestOptions, undefined); return stream(model, context, providerOptions); } else if (model.api === "bedrock-converse-stream") { // Bedrock doesn't have any API keys instead it sources credentials from standard AWS env variables or from given AWS profile. const providerOptions = mapOptionsForApi(model, requestOptions, undefined); return stream(model, context, providerOptions); } // The resolver form is handled by the wrapper above; only a static string // key reaches this point. const apiKey = (typeof requestOptions?.apiKey === "string" ? requestOptions.apiKey : undefined) || getEnvApiKey(model.provider); if (!apiKey) { throw new AIError.MissingApiKeyError(model.provider); } // GitLab Duo - wraps Anthropic/OpenAI behind GitLab AI Gateway direct access tokens if (isGitLabDuoModel(model)) { return withProviderInFlightLimit(model, requestOptions, () => streamGitLabDuo(model, context, { ...requestOptions, apiKey, }), ); } // GitLab Duo Workflow - IDE workflow protocol + WebSocket action bridge if (model.api === "gitlab-duo-agent") { // Does not route through withProviderInFlightLimit, so heal explicitly. return wrapLeakedThinkingStream( streamGitLabDuoWorkflow(model as Model<"gitlab-duo-agent">, context, { ...requestOptions, apiKey, }), ); } // Kimi Code - route to dedicated handler that wraps OpenAI or Anthropic API if (isKimiModel(model)) { // Pass raw SimpleStreamOptions - streamKimi handles mapping internally return withProviderInFlightLimit(model, requestOptions, () => streamKimi(model as Model<"openai-completions">, context, { ...requestOptions, apiKey, format: requestOptions?.kimiApiFormat ?? "anthropic", }), ); } // Synthetic - route to dedicated handler that wraps OpenAI or Anthropic API if (isSyntheticModel(model)) { // Pass raw SimpleStreamOptions - streamSynthetic handles mapping internally return withProviderInFlightLimit(model, requestOptions, () => streamSynthetic(model as Model<"openai-completions">, context, { ...requestOptions, apiKey, format: requestOptions?.syntheticApiFormat ?? "openai", // Default to OpenAI format }), ); } const providerOptions = mapOptionsForApi(model, requestOptions, apiKey); return stream(model, context, providerOptions); } export async function completeSimple( model: Model, context: Context, options?: SimpleStreamOptions, ): Promise { return resolveWithThinkingLoopCook( options?.signal, () => streamSimple(model, context, options), () => streamSimple(model, context, { ...options, loopGuard: { ...options?.loopGuard, enabled: false } }), ); } const MIN_OUTPUT_TOKENS = 1024; // Fallback total output cap for models whose catalog entry has no maxTokens. const OUTPUT_CAP_WHEN_UNKNOWN = 64_000; function maxTokensWithThinkingBudget( baseMaxTokens: number | undefined, modelMaxTokens: number | null, thinkingBudget: number, ): number { const uncappedMaxTokens = baseMaxTokens === undefined ? OUTPUT_CAP_WHEN_UNKNOWN : baseMaxTokens + thinkingBudget; return Math.min(uncappedMaxTokens, modelMaxTokens ?? Number.POSITIVE_INFINITY); } export const OUTPUT_FALLBACK_BUFFER = 4000; const ANTHROPIC_USE_INTERLEAVED_THINKING = Bun.env.PI_NO_INTERLEAVED_THINKING !== "1"; export const ANTHROPIC_THINKING: Record = { minimal: 1024, low: 4096, medium: 8192, high: 16384, xhigh: 32768, }; const GOOGLE_THINKING: Record = { minimal: 1024, low: 4096, medium: 8192, high: 16384, xhigh: 24575, }; const BEDROCK_CLAUDE_THINKING: Record = { minimal: 1024, low: 2048, medium: 8192, high: 16384, xhigh: 16384, }; function resolveBedrockThinkingBudget( model: Model<"bedrock-converse-stream">, options?: SimpleStreamOptions, ): { budget: number; level: Effort } | null { if (!options?.reasoning || !model.reasoning) return null; const level = requireSupportedEffort(model, options.reasoning); const budget = options.thinkingBudgets?.[level] ?? BEDROCK_CLAUDE_THINKING[level]; return { budget, level }; } export function mapAnthropicToolChoice(choice?: ToolChoice): AnthropicOptions["toolChoice"] { if (!choice) return undefined; if (typeof choice === "string") { if (choice === "required") return "any"; if (choice === "auto" || choice === "none" || choice === "any") return choice; return undefined; } if (choice.type === "tool") { return choice.name ? { type: "tool", name: choice.name } : undefined; } if (choice.type === "function") { const name = "function" in choice ? choice.function?.name : choice.name; return name ? { type: "tool", name } : undefined; } return undefined; } export function mapGoogleToolChoice( choice?: ToolChoice, ): GoogleOptions["toolChoice"] | GoogleGeminiCliOptions["toolChoice"] | GoogleVertexOptions["toolChoice"] { if (!choice) return undefined; if (typeof choice === "string") { if (choice === "required") return "any"; if (choice === "auto" || choice === "none" || choice === "any") return choice; return undefined; } // Named-tool routing on Google: emit an `ANY`-mode allow-list of one entry, // mirroring the Anthropic mapper that returns `{type: "tool", name}`. if (choice.type === "tool") { return choice.name ? { mode: "ANY", allowedFunctionNames: [choice.name] } : undefined; } if (choice.type === "function") { const name = "function" in choice ? choice.function?.name : choice.name; return name ? { mode: "ANY", allowedFunctionNames: [name] } : undefined; } return undefined; } function mapOpenAiToolChoice(choice?: ToolChoice): OpenAICompletionsOptions["toolChoice"] { if (!choice) return undefined; if (typeof choice === "string") { if (choice === "any") return "required"; if (choice === "auto" || choice === "none" || choice === "required") return choice; return undefined; } if (choice.type === "tool") { return choice.name ? { type: "function", function: { name: choice.name } } : undefined; } if (choice.type === "function") { const name = "function" in choice ? choice.function?.name : choice.name; return name ? { type: "function", function: { name } } : undefined; } return undefined; } type ReasoningEffortMapCompat = { reasoningEffortMap?: Partial>; }; function getCompatReasoningEffortMap( model: Model, ): Partial> | undefined { const compat = model.compat; if (compat === undefined || typeof compat !== "object" || !("reasoningEffortMap" in compat)) { return undefined; } return (compat as ReasoningEffortMapCompat).reasoningEffortMap; } function resolveSupportedMappedReasoningEffort( model: Model, reasoning: Effort, ): Effort | undefined { const mapped = getCompatReasoningEffortMap(model)?.[reasoning]; if (!mapped) return undefined; const mappedEffort = mapped as Effort; return model.thinking?.efforts.includes(mappedEffort) ? mappedEffort : undefined; } function resolveOpenAiReasoningEffort( model: Model, options?: SimpleStreamOptions, ): Effort | undefined { const reasoning = options?.reasoning; if (!reasoning || !model.reasoning) return undefined; // Models that reason natively but expose no effort dial carry // `thinking: undefined` (baked at build time from // `compat.supportsReasoningEffort: false` on openai-responses*). The // wire-side omitReasoningEffort gate (stream.ts) is the actual strip; returning // undefined here avoids a redundant requireSupportedEffort throw that would // defeat the gate and surface a confusing "Compaction failed: Thinking effort // high is not supported by..." to the user. if (!model.thinking) return undefined; if (model.thinking.efforts.includes(reasoning)) return reasoning; const mappedReasoning = resolveSupportedMappedReasoningEffort(model, reasoning); if (mappedReasoning) return mappedReasoning; if (getCompatReasoningEffortMap(model)?.[reasoning] !== undefined) return reasoning; if (model.thinking.effortMap?.[reasoning] !== undefined) return reasoning; return requireSupportedEffort(model, reasoning); } const castApi = (api: OptionsForApi): OptionsForApi => api as OptionsForApi; /** * Mandatory-reasoning endpoints (`thinking.requiresEffort`) reject disabled * or omitted thinking ("Reasoning is mandatory for this endpoint and cannot * be disabled") — clamp to the lowest supported effort instead. * `suppressWhenOff` models handle off provider-side via explicit wire * suppression. Collapsed pairs interplay: pair derivation strips member * flags (off routes to a bare SKU that CAN disable), while identity backfill * re-flags pairs whose logical id is itself mandatory (Gemini 3.x) — there * the clamp wins and the floored effort routes to the thinking SKU. */ function normalizeMandatoryReasoningOptions( model: Model, options?: SimpleStreamOptions, ): SimpleStreamOptions | undefined { if ( !model.reasoning || !model.thinking?.requiresEffort || model.thinking.suppressWhenOff || (options?.reasoning !== undefined && !options.disableReasoning) ) { return options; } const floor = minimumSupportedEffort(model); if (floor === undefined) return options; return { ...options, reasoning: floor, disableReasoning: undefined }; } function mapOptionsForApi( model: Model, rawOptions?: SimpleStreamOptions, apiKey?: string, ): OptionsForApi { const options = normalizeMandatoryReasoningOptions(model, rawOptions); const base = { temperature: options?.temperature, topP: options?.topP, topK: options?.topK, minP: options?.minP, presencePenalty: options?.presencePenalty, repetitionPenalty: options?.repetitionPenalty, maxTokens: options?.maxTokens ?? model.maxTokens ?? undefined, signal: options?.signal, apiKey: apiKey ?? (typeof options?.apiKey === "string" ? options.apiKey : undefined), cacheRetention: options?.cacheRetention, headers: options?.headers, initiatorOverride: options?.initiatorOverride, maxRetryDelayMs: options?.maxRetryDelayMs, metadata: options?.metadata, taskBudget: options?.taskBudget, sessionId: options?.sessionId, promptCacheKey: options?.promptCacheKey, streamFirstEventTimeoutMs: options?.streamFirstEventTimeoutMs, streamIdleTimeoutMs: options?.streamIdleTimeoutMs, providerSessionState: options?.providerSessionState, useInteractionsApi: options?.useInteractionsApi, storeInteraction: options?.storeInteraction, previousInteractionId: options?.previousInteractionId, maxInFlightRequests: options?.maxInFlightRequests, onPayload: options?.onPayload, onResponse: options?.onResponse, onSseEvent: options?.onSseEvent, execHandlers: options?.execHandlers, fetch: options?.fetch, }; switch (model.api) { case "anthropic-messages": { // Explicitly disable thinking when reasoning is not specified or model doesn't support it const reasoning = options?.reasoning; if (!reasoning || !model.reasoning) { return castApi<"anthropic-messages">({ ...base, requestModelId: resolveWireModelId(model, undefined), thinkingEnabled: false, toolChoice: mapAnthropicToolChoice(options?.toolChoice), thinkingDisplay: options?.hideThinkingSummary ? "omitted" : undefined, serviceTier: options?.serviceTier, }); } let thinkingBudget = options.thinkingBudgets?.[reasoning] ?? ANTHROPIC_THINKING[reasoning]; if (thinkingBudget <= 0) { return castApi<"anthropic-messages">({ ...base, requestModelId: resolveWireModelId(model, undefined), thinkingEnabled: false, toolChoice: mapAnthropicToolChoice(options?.toolChoice), thinkingDisplay: options?.hideThinkingSummary ? "omitted" : undefined, serviceTier: options?.serviceTier, }); } const thinkingMode = model.thinking?.mode; const effort = thinkingMode === "anthropic-adaptive" || thinkingMode === "anthropic-budget-effort" ? mapEffortToAnthropicAdaptiveEffort(model, reasoning) : undefined; // For Opus 4.6+ and Sonnet 4.6+: use adaptive thinking with effort level // For older models: use budget-based thinking if (thinkingMode === "anthropic-adaptive") { return castApi<"anthropic-messages">({ ...base, requestModelId: resolveWireModelId(model, reasoning), thinkingEnabled: true, effort, toolChoice: mapAnthropicToolChoice(options?.toolChoice), thinkingDisplay: options?.hideThinkingSummary ? "omitted" : undefined, serviceTier: options?.serviceTier, }); } if (ANTHROPIC_USE_INTERLEAVED_THINKING) { return castApi<"anthropic-messages">({ ...base, requestModelId: resolveWireModelId(model, reasoning), thinkingEnabled: true, thinkingBudgetTokens: thinkingBudget, effort, toolChoice: mapAnthropicToolChoice(options?.toolChoice), thinkingDisplay: options?.hideThinkingSummary ? "omitted" : undefined, serviceTier: options?.serviceTier, }); } // Caller's maxTokens is desired output, so add thinking budget on top. With no caller/model cap, use a finite total fallback. const maxTokens = maxTokensWithThinkingBudget(base.maxTokens, model.maxTokens, thinkingBudget); // If not enough room for thinking + output, reduce thinking budget if (maxTokens <= thinkingBudget) { thinkingBudget = maxTokens - MIN_OUTPUT_TOKENS; } // If thinking budget is too low, disable thinking if (thinkingBudget <= 0) { return castApi<"anthropic-messages">({ ...base, requestModelId: resolveWireModelId(model, undefined), thinkingEnabled: false, toolChoice: mapAnthropicToolChoice(options?.toolChoice), thinkingDisplay: options?.hideThinkingSummary ? "omitted" : undefined, serviceTier: options?.serviceTier, }); } else { return castApi<"anthropic-messages">({ ...base, maxTokens, requestModelId: resolveWireModelId(model, reasoning), thinkingEnabled: true, thinkingBudgetTokens: thinkingBudget, effort, toolChoice: mapAnthropicToolChoice(options?.toolChoice), thinkingDisplay: options?.hideThinkingSummary ? "omitted" : undefined, serviceTier: options?.serviceTier, }); } } case "bedrock-converse-stream": { const bedrockBase: BedrockOptions = { ...base, reasoning: options?.reasoning, thinkingBudgets: options?.thinkingBudgets, toolChoice: mapAnthropicToolChoice(options?.toolChoice), thinkingDisplay: options?.hideThinkingSummary ? "omitted" : undefined, }; // Adaptive mode sends effort directly, no budget_tokens — skip budget inflation. if (model.thinking?.mode === "anthropic-adaptive") { return castApi<"bedrock-converse-stream">(bedrockBase); } const budgetInfo = resolveBedrockThinkingBudget(model as Model<"bedrock-converse-stream">, options); if (!budgetInfo) return bedrockBase as OptionsForApi; let maxTokens = bedrockBase.maxTokens ?? model.maxTokens ?? OUTPUT_CAP_WHEN_UNKNOWN; let thinkingBudgets = bedrockBase.thinkingBudgets; if (maxTokens <= budgetInfo.budget) { const desiredMaxTokens = Math.min( model.maxTokens ?? Number.POSITIVE_INFINITY, budgetInfo.budget + MIN_OUTPUT_TOKENS, ); if (desiredMaxTokens > maxTokens) { maxTokens = desiredMaxTokens; } } if (maxTokens <= budgetInfo.budget) { const adjustedBudget = Math.max(0, maxTokens - MIN_OUTPUT_TOKENS); thinkingBudgets = { ...(thinkingBudgets ?? {}), [budgetInfo.level]: adjustedBudget }; } return castApi<"bedrock-converse-stream">({ ...bedrockBase, maxTokens, thinkingBudgets }); } case "openrouter": { const useResponses = $env.PI_OPENROUTER_RESPONSES !== "0"; if (useResponses) { return castApi<"openai-responses">({ ...base, reasoning: resolveOpenAiReasoningEffort(model, options), toolChoice: mapOpenAiToolChoice(options?.toolChoice), serviceTier: options?.serviceTier, reasoningSummary: options?.hideThinkingSummary ? null : undefined, openrouterVariant: options?.openrouterVariant, maxTokensExplicit: rawOptions?.maxTokens !== undefined, disableReasoning: options?.disableReasoning, textVerbosity: options?.textVerbosity, }); } return castApi<"openai-completions">({ ...base, reasoning: resolveOpenAiReasoningEffort(model, options), disableReasoning: options?.disableReasoning, toolChoice: mapOpenAiToolChoice(options?.toolChoice), serviceTier: options?.serviceTier, openrouterVariant: options?.openrouterVariant, maxTokensExplicit: rawOptions?.maxTokens !== undefined, }); } case "openai-completions": return castApi<"openai-completions">({ ...base, reasoning: resolveOpenAiReasoningEffort(model, options), disableReasoning: options?.disableReasoning, toolChoice: mapOpenAiToolChoice(options?.toolChoice), serviceTier: options?.serviceTier, openrouterVariant: options?.openrouterVariant, maxTokensExplicit: rawOptions?.maxTokens !== undefined, }); case "openai-responses": return castApi<"openai-responses">({ ...base, reasoning: resolveOpenAiReasoningEffort(model, options), toolChoice: mapOpenAiToolChoice(options?.toolChoice), serviceTier: options?.serviceTier, reasoningSummary: options?.hideThinkingSummary ? null : undefined, openrouterVariant: options?.openrouterVariant, maxTokensExplicit: rawOptions?.maxTokens !== undefined, disableReasoning: options?.disableReasoning, textVerbosity: options?.textVerbosity, }); case "azure-openai-responses": return castApi<"azure-openai-responses">({ ...base, reasoning: resolveOpenAiReasoningEffort(model, options), toolChoice: mapOpenAiToolChoice(options?.toolChoice), serviceTier: options?.serviceTier, reasoningSummary: options?.hideThinkingSummary ? null : undefined, }); case "openai-codex-responses": return castApi<"openai-codex-responses">({ ...base, reasoning: resolveOpenAiReasoningEffort(model, options), toolChoice: mapOpenAiToolChoice(options?.toolChoice), serviceTier: options?.serviceTier, preferWebsockets: options?.preferWebsockets, reasoningSummary: options?.hideThinkingSummary ? null : "detailed", textVerbosity: options?.textVerbosity, }); case "google-generative-ai": { // Explicitly disable thinking when reasoning is not specified or model doesn't support it // This is needed because Gemini has "dynamic thinking" enabled by default const reasoning = options?.reasoning; if (!reasoning || !model.reasoning) { return castApi<"google-generative-ai">({ ...base, serviceTier: options?.serviceTier, thinking: { enabled: false }, toolChoice: mapGoogleToolChoice(options?.toolChoice), }); } const googleModel = model as Model<"google-generative-ai">; const effort = requireSupportedEffort(googleModel, reasoning); // Gemini 3+ models use thinkingLevel exclusively instead of thinkingBudget. // https://ai.google.dev/gemini-api/docs/thinking#set-budget if (googleModel.thinking?.mode === "google-level") { return castApi<"google-generative-ai">({ ...base, serviceTier: options?.serviceTier, thinking: { enabled: true, level: mapEffortToGoogleThinkingLevel(effort), }, toolChoice: mapGoogleToolChoice(options?.toolChoice), }); } return castApi<"google-gemini-cli">({ ...base, thinking: { enabled: true, budgetTokens: getGoogleBudget(googleModel, effort, options?.thinkingBudgets), }, toolChoice: mapGoogleToolChoice(options?.toolChoice), }); } case "google-gemini-cli": { const reasoning = options?.reasoning; const toolChoice = mapGoogleToolChoice(options?.toolChoice); if (reasoning && model.reasoning) { const effort = requireSupportedEffort(model, reasoning); // Gemini 3+ models use thinkingLevel instead of thinkingBudget if (model.thinking?.mode === "google-level") { return castApi<"google-gemini-cli">({ ...base, requestModelId: resolveWireModelId(model, effort), thinking: { enabled: true, level: mapEffortToGoogleThinkingLevel(effort), }, toolChoice, antigravityEndpointMode: options?.antigravityEndpointMode, }); } let thinkingBudget = options.thinkingBudgets?.[effort] ?? model.thinking?.effortBudgets?.[effort] ?? GOOGLE_THINKING[effort]; // Caller's maxTokens is desired output, so add thinking budget on top. With no caller/model cap, use a finite total fallback. const maxTokens = maxTokensWithThinkingBudget(base.maxTokens, model.maxTokens, thinkingBudget); // If not enough room for thinking + output, reduce thinking budget if (maxTokens <= thinkingBudget) { thinkingBudget = Math.max(0, maxTokens - MIN_OUTPUT_TOKENS); } if (thinkingBudget > 0) { return castApi<"google-gemini-cli">({ ...base, maxTokens, requestModelId: resolveWireModelId(model, effort), thinking: { enabled: true, budgetTokens: thinkingBudget }, toolChoice, antigravityEndpointMode: options?.antigravityEndpointMode, }); } // Budget clamped to zero — fall through to the thinking-off path. } const thinking: GoogleGeminiCliOptions["thinking"] = { enabled: false }; if (model.reasoning && model.thinking?.suppressWhenOff) { // CCA re-applies the per-id baked server default when the config // is omitted; suppression must be explicit on the wire. thinking.suppress = model.thinking.mode === "google-level" ? { level: "MINIMAL" } : { budget: 0 }; } return castApi<"google-gemini-cli">({ ...base, requestModelId: resolveWireModelId(model, undefined), thinking, toolChoice, antigravityEndpointMode: options?.antigravityEndpointMode, }); } case "google-vertex": { // Explicitly disable thinking when reasoning is not specified or model doesn't support it const reasoning = options?.reasoning; if (!reasoning || !model.reasoning) { return castApi<"google-vertex">({ ...base, serviceTier: options?.serviceTier, thinking: { enabled: false }, toolChoice: mapGoogleToolChoice(options?.toolChoice), }); } const vertexModel = model as Model<"google-vertex">; const effort = requireSupportedEffort(vertexModel, reasoning); const geminiModel = vertexModel as unknown as Model<"google-generative-ai">; if (geminiModel.thinking?.mode === "google-level") { return castApi<"google-vertex">({ ...base, serviceTier: options?.serviceTier, thinking: { enabled: true, level: mapEffortToGoogleThinkingLevel(effort), }, toolChoice: mapGoogleToolChoice(options?.toolChoice), }); } return castApi<"google-vertex">({ ...base, serviceTier: options?.serviceTier, thinking: { enabled: true, budgetTokens: getGoogleBudget(geminiModel, effort, options?.thinkingBudgets), }, toolChoice: mapGoogleToolChoice(options?.toolChoice), }); } case "ollama-chat": return castApi<"ollama-chat">({ ...base, reasoning: resolveOpenAiReasoningEffort(model, options), disableReasoning: options?.disableReasoning, toolChoice: options?.toolChoice, }); case "cursor-agent": { const execHandlers = options?.cursorExecHandlers ?? options?.execHandlers; const onToolResult = options?.cursorOnToolResult ?? execHandlers?.onToolResult; return castApi<"cursor-agent">({ ...base, execHandlers, onToolResult, }); } case "gitlab-duo-agent": return castApi<"gitlab-duo-agent">({ ...base, cwd: options?.cwd, toolChoice: options?.toolChoice, }); case "devin-agent": { const devinModel = model as Model<"devin-agent">; const effort = options?.reasoning && !options.disableReasoning ? requireSupportedEffort(devinModel, options.reasoning) : undefined; return castApi<"devin-agent">({ ...base, chatModelUid: resolveWireModelId(devinModel, effort), }); } default: throw new AIError.ConfigurationError(`Unhandled API in mapOptionsForApi: ${model.api}`); } } function getGoogleBudget( model: Model<"google-generative-ai">, effort: Effort, customBudgets?: ThinkingBudgets, ): number { requireSupportedEffort(model, effort); // Custom budgets take precedence if provided for this level if (customBudgets?.[effort] !== undefined) { return customBudgets[effort]!; } // See https://ai.google.dev/gemini-api/docs/thinking#set-budget if (model.id.includes("2.5-")) { switch (effort) { case "minimal": return 128; case "low": return 2048; case "medium": return 8192; default: return model.id.includes("2.5-flash") ? 24576 : 32768; } } // Unknown model - use dynamic return -1; }