/** * 共享上游服务层 * * 从 proxy.ts 和 anthropic/handler.ts 中提取的共享逻辑, * 包括重试、SSE 过滤、讯飞字段清理、以及统一的 upstreamRequest() 函数。 */ import { FastifyInstance } from 'fastify'; import { Protocol } from './stats'; export declare const RETRYABLE_STATUS_CODES: Set; export declare const RETRYABLE_XFYUN_CODES: Set; /** * 检测响应体是否包含讯飞可重试错误码 * 讯飞的错误格式为 {"code": 10012, "msg": "..."},可能出现在 HTTP 200 的响应中 * 优先使用 JSON parse 确保准确性,失败时 fallback 到字符串匹配 */ export declare function isRetryableXfyunError(responseBody: string): boolean; /** * 从讯飞响应体中提取错误详情 * 支持多种格式: * - {"code":10012,"msg":"EngineInternalError:error"} * - {"error":{"code":"ModelArts.81001","message":"..."}} * - SSE 格式 data:{"error":{...}} */ export declare function extractXfyunError(body: string): { code?: string | number; msg?: string; sid?: string; } | null; /** * 提取 messages 中 content 类型分布,用于日志排查 * 例如: "3 text, 2 image_url" */ export declare function summarizeContentTypes(body: Record | undefined): string; export interface RequestDiagnostics { model: string; stream: boolean; messageCount: number; contentTypes: string; maxTokens: number | null; toolCount: number; requestBytes: number; } export declare function summarizeRequestDiagnostics(body: Record | undefined, model: string, isStream: boolean): RequestDiagnostics; /** * 从 SSE rawChunk 中提取 token 用量 * 支持两种格式: * 1. 标准 OpenAI usage:{"prompt_tokens":N,"completion_tokens":N}(仅当值 > 0 时返回) * 2. 讯飞 context_usage 事件:{"tokens":N}(独立 key,非 total_tokens;仅当 N > 0 时返回) * * 一个 rawChunk 可能包含多个 SSE 事件(中间事件 usage 为 0,最后事件为真实值), * 因此用全局匹配取最后一个非零结果。 */ export declare function extractStreamUsage(rawChunk: string): { promptTokens?: number; completionTokens?: number; totalTokens?: number; }; /** * 路径重写:客户端请求 /v1/* → 上游 /v2/* * 也支持 /ollama/v1/* → 上游 /v2/*(VS Code Continue.dev 等工具的 Ollama OpenAI 兼容路径) * 讯飞 Coding Plan 的 OpenAI 协议端点使用 /v2 前缀 */ export declare function rewritePath(originalPath: string): string; /** * 构建上游请求 URL * 1. 去掉 baseUrl 末尾斜杠 * 2. 将客户端路径 /v1/xxx 重写为 /xxx * 3. 拼接为 https://maas-coding-api.../v2/xxx */ export declare function buildUpstreamUrl(path: string): string; export declare const ALLOWED_SSE_EVENTS: Set; /** * 解析 SSE 行前缀,提取字段名和值 * * SSE 规范中冒号后的空格是可选的: * "data:content" 和 "data: content" 都合法 * "event:message" 和 "event: message" 都合法 * 返回 { field, value } 或 null(非 SSE 行) */ export declare function parseSSELine(line: string): { field: string; value: string; } | null; /** * 有状态的 SSE 事件过滤器(白名单策略) * * 只转发 ALLOWED_SSE_EVENTS 中的事件类型("message")和无 event: 行的默认事件。 * 任何不在白名单中的 event: 类型都会被整事件跳过(含其 data 行)。 * * 解决核心问题:TCP 流的 chunk 边界是任意的,一个 SSE 行可能被拆成多个 chunk。 * 例如 "event: progress_notice" 可能被拆成: * chunk1: "event: progress" * chunk2: "_notice\ndata: ..." * 无状态按 chunk 独立处理会漏过滤,导致 Trae IDE 收到非标准事件后报 4054。 */ export declare class SSEFilter { private pendingLine; private skipCurrentEvent; private allowedEvents; constructor(allowedEvents?: Set); /** * 过滤一个 chunk 中的 SSE 事件 * 跨 chunk 维护状态,确保 event: 行完整后再判断是否转发 */ filter(rawChunk: string, log: FastifyInstance['log']): string; } /** * 无状态 SSE 过滤便捷函数(向后兼容) * 对完整 SSE 文本做一次性过滤,不处理跨 chunk 分割 */ export declare function filterSSEEvents(rawChunk: string, log: FastifyInstance['log']): string; /** * 清理讯飞特有字段:reasoning_content 和 plugins_content * ai-sdk/openai-compatible 的 Zod schema 不认识这些字段,可能导致验证失败 * * 支持两种输入格式: * 1. SSE 格式(data: {...}\n)— 实际运行时的流式 chunk * 2. 纯 JSON 字符串({...})— 非流式场景或测试用例 * * 对每行尝试 JSON.parse → delete → JSON.stringify, * 解析失败的行保持原样(如 [DONE]),避免正则替换对转义引号和合法内容的误删 */ export declare function cleanXfyunFields(chunk: string): string; /** * 直接在对象上清理讯飞特有字段(reasoning_content、plugins_content) * 避免多余的 JSON.stringify → cleanXfyunFields → JSON.parse 往返 * @returns 对象是否被修改 */ export declare function cleanXfyunFieldsObj(obj: Record): boolean; /** * 带重试的 fetch 请求 * * @param readBody - 是否读取响应体 * true: 非流式请求,读取 body 以检测讯飞业务层错误码(如 10012) * false: 流式请求,不提前消费 body,保留 ReadableStream 供 SSE 透传; * 此时仅通过 HTTP 状态码判断是否重试 * * 重试策略:指数退避,初始延迟 delayMs,每次翻倍,最多 maxRetries 次 * 重试条件: * - HTTP 429 / 500 / 503(readBody=true 或 false 均生效) * - 响应体包含讯飞错误码 10012(仅 readBody=true 时检测) * - 网络层异常(fetch 抛错,如连接超时、DNS 失败) */ export declare function fetchWithRetry(url: string, options: RequestInit, maxRetries: number, delayMs: number, readBody: boolean, log: FastifyInstance['log']): Promise<{ response: Response; body: string | null; retries: number; }>; export interface UpstreamOptions { protocol: Protocol; upstreamUrl: string; headers: Record; body: Record | undefined; isStream: boolean; /** 流式请求确认上游 2xx 后写入的响应头,由各 handler 传入协议特定的 Content-Type 等 */ streamHeaders?: Record; allowedSSEEvents?: Set; extractStreamUsage?: (rawChunk: string) => { inputTokens?: number; outputTokens?: number; }; extractNonStreamUsage?: (body: Record) => { promptTokens?: number; completionTokens?: number; }; cleanNonStreamBody?: (body: Record) => Record; cleanStreamChunk?: (chunk: string) => string; /** 流式输出转换:将过滤+清理后的 SSE 文本转换为最终写入客户端的格式(如 NDJSON) */ streamTransform?: (cleanedChunk: string) => string[]; formatStreamErrorEvent: (errMsg: string) => string; request: { id: string; url: string; headers: Record; log: FastifyInstance['log']; }; rawReply: { write: (data: string | Buffer) => boolean; end: () => void; writeHeader: (statusCode: number, headers: Record) => void; }; diagnostics?: RequestDiagnostics; } export interface UpstreamResult { responseBody: Record | null; errorBody: string | null; status: number; retries: number; success: boolean; errorType?: 'network' | 'upstream' | 'empty_body' | 'no_stream_body' | 'stream_error'; error?: string; inputTokens: number; outputTokens: number; durationMs: number; } /** * 统一上游请求处理函数 * * 封装了 OpenAI 和 Anthropic 协议共享的上游请求逻辑: * - rolloverDailyStats + requestStarted * - fetchWithRetry 调用 * - 网络错误处理 + stats * - 非流式上游错误 + stats * - 非流式空 body 错误 + stats * - 流式无 body 错误 + stats * - 流式 SSE 循环(SSEFilter + xfyun 错误检测 + usage 提取)+ stats * - 非流式正常响应(JSON 解析 + 字段清理 + usage)+ stats * * 设计决策: * 1. 不直接发送 HTTP 响应(流式数据除外),返回结构化 UpstreamResult 供 handler 格式化 * 2. 流式数据通过 rawReply.write() 实时写入(时序要求) * 3. 流式错误也通过 rawReply.write() 写入(headers 已发送,无法更改状态码) * 4. rawReply.end() 在 finally 块中调用 */ export declare function upstreamRequest(options: UpstreamOptions): Promise; export interface ReplyLike { raw: { headersSent: boolean; write: (data: string) => void; end: () => void; }; sent: boolean; status: (code: number) => { send: (body: unknown) => void; }; } export interface ErrorFormatters { formatStreamErrorEvent: (errMsg: string) => string; formatNetworkErrorReply: (errMsg: string) => unknown; formatUpstreamErrorReply: (status: number, errorBody: string | null) => unknown; formatEmptyBodyErrorReply: (status: number) => unknown; formatNoStreamBodyErrorReply: (status: number) => unknown; formatNonStreamSuccess?: (result: UpstreamResult) => unknown; } export declare function handleUpstreamResult(result: UpstreamResult, isStream: boolean, reply: ReplyLike, formatters: ErrorFormatters): void; //# sourceMappingURL=upstream.d.ts.map