/** * Remote compaction utilities. * * Provider-side conversation summarization endpoints. Two flavors: * * - **OpenAI remote compaction** (`/responses/compact`): preserves encrypted * reasoning across compactions by submitting the full responses-API native * history and storing the returned `compaction` / `compaction_summary` * item in `preserveData` so future turns can replay the encrypted state. * - **Generic remote compaction**: a thin POST helper for self-hosted * summarization endpoints that accept `{ systemPrompt, prompt }` and reply * with `{ summary, shortSummary? }`. */ import { parseTextSignature } from "@oh-my-pi/pi-ai/providers/openai-responses-shared"; import { transformMessages } from "@oh-my-pi/pi-ai/providers/transform-messages"; import type { AssistantMessage, FetchImpl, Message, Model } from "@oh-my-pi/pi-ai/types"; import { getOpenAIResponsesHistoryItems, getOpenAIResponsesHistoryPayload, normalizeResponsesToolCallId, } from "@oh-my-pi/pi-ai/utils"; import { CODEX_BASE_URL, getCodexAccountId, OPENAI_HEADER_VALUES, OPENAI_HEADERS, } from "@oh-my-pi/pi-catalog/wire/codex"; import { logger } from "@oh-my-pi/pi-utils"; // ============================================================================ // Public types // ============================================================================ export const OPENAI_REMOTE_COMPACTION_PRESERVE_KEY = "openaiRemoteCompaction"; export type OpenAiRemoteCompactionItem = { type: "compaction" | "compaction_summary"; encrypted_content?: string; summary?: string; }; export interface OpenAiRemoteCompactionPreserveData { provider?: string; replacementHistory: Array>; compactionItem: OpenAiRemoteCompactionItem; } export interface OpenAiRemoteCompactionRequest { model: string; input: Array>; instructions: string; } export interface OpenAiRemoteCompactionResponse extends OpenAiRemoteCompactionPreserveData {} export interface RemoteCompactionRequest { systemPrompt: string; prompt: string; } export interface RemoteCompactionResponse { summary: string; shortSummary?: string; } // ============================================================================ // OpenAI provider gating + endpoint resolution // ============================================================================ export function shouldUseOpenAiRemoteCompaction(model: Model): boolean { return model.provider === "openai" || model.provider === "openai-codex"; } function resolveOpenAiCompactEndpoint(model: Model): string { if (model.provider === "openai-codex") { return resolveOpenAiCodexCompactEndpoint(model.baseUrl); } const defaultBase = "https://api.openai.com/v1"; const rawBase = model.baseUrl && model.baseUrl.length > 0 ? model.baseUrl : defaultBase; const normalizedBase = rawBase.endsWith("/") ? rawBase.slice(0, -1) : rawBase; if (normalizedBase.endsWith("/v1")) return `${normalizedBase}/responses/compact`; return `${normalizedBase}/v1/responses/compact`; } function resolveOpenAiCodexCompactEndpoint(baseUrl: string | undefined): string { const rawBase = baseUrl && baseUrl.length > 0 ? baseUrl : CODEX_BASE_URL; const normalizedBase = rawBase.endsWith("/") ? rawBase.slice(0, -1) : rawBase; if (/\/codex(?:\/v\d+)?$/.test(normalizedBase)) return `${normalizedBase}/responses/compact`; return `${normalizedBase}/codex/responses/compact`; } function normalizeOpenAiCompactionToolCallId(id: string): string { const normalized = normalizeResponsesToolCallId(id); return `${normalized.callId}|${normalized.itemId ?? normalized.callId}`; } // ============================================================================ // Preserve-data helpers // ============================================================================ export function getPreservedOpenAiRemoteCompactionData( preserveData: Record | undefined, ): OpenAiRemoteCompactionPreserveData | undefined { const candidate = preserveData?.[OPENAI_REMOTE_COMPACTION_PRESERVE_KEY]; if (!candidate || typeof candidate !== "object") return undefined; const maybeData = candidate as { provider?: unknown; replacementHistory?: unknown; compactionItem?: unknown }; if (!Array.isArray(maybeData.replacementHistory)) return undefined; const maybeItem = maybeData.compactionItem; if (!maybeItem || typeof maybeItem !== "object") return undefined; const compactionItem = maybeItem as { type?: unknown; encrypted_content?: unknown; summary?: unknown }; const isClassicCompaction = compactionItem.type === "compaction" && typeof compactionItem.encrypted_content === "string"; const isSummaryCompaction = compactionItem.type === "compaction_summary"; if (!isClassicCompaction && !isSummaryCompaction) { return undefined; } return { provider: typeof maybeData.provider === "string" ? maybeData.provider : undefined, replacementHistory: maybeData.replacementHistory as Array>, compactionItem: compactionItem as unknown as OpenAiRemoteCompactionItem, }; } export function withOpenAiRemoteCompactionPreserveData( preserveData: Record | undefined, remoteCompaction: OpenAiRemoteCompactionPreserveData | undefined, ): Record | undefined { if (remoteCompaction) { return { ...(preserveData ?? {}), [OPENAI_REMOTE_COMPACTION_PRESERVE_KEY]: remoteCompaction, }; } if (!preserveData || !(OPENAI_REMOTE_COMPACTION_PRESERVE_KEY in preserveData)) { return preserveData; } const { [OPENAI_REMOTE_COMPACTION_PRESERVE_KEY]: _removed, ...rest } = preserveData; return Object.keys(rest).length > 0 ? rest : undefined; } // ============================================================================ // Input/output filtering for OpenAI compact endpoint // ============================================================================ function estimateOpenAiCompactInputTokens(input: Array>, instructions: string): number { let chars = instructions.length; for (const item of input) { chars += JSON.stringify(item).length; } return Math.ceil(chars / 4); } function shouldTrimOpenAiCompactInputItem(item: Record): boolean { return item.type === "function_call_output" || (item.type === "message" && item.role === "developer"); } function shouldKeepOpenAiCompactOutputItem(item: Record): boolean { if (item.type === "compaction" || item.type === "compaction_summary") return true; if (item.type !== "message") return false; return item.role === "assistant" || item.role === "user"; } function trimOpenAiCompactInput( input: Array>, contextWindow: number, instructions: string, ): Array> { const trimmed = [...input]; while (trimmed.length > 0 && estimateOpenAiCompactInputTokens(trimmed, instructions) > contextWindow) { const last = trimmed[trimmed.length - 1]; if (last?.type === "function_call_output" || last?.type === "custom_tool_call_output") { const callId = typeof last.call_id === "string" ? last.call_id : undefined; const callType = last.type === "custom_tool_call_output" ? "custom_tool_call" : "function_call"; trimmed.pop(); if (callId) { const matchingCallIndex = trimmed.findLastIndex(item => item.type === callType && item.call_id === callId); if (matchingCallIndex >= 0) { trimmed.splice(matchingCallIndex, 1); } } continue; } if (!last || !shouldTrimOpenAiCompactInputItem(last)) { break; } trimmed.pop(); } return trimmed; } // Register every tool-call id in `items` (and the subset using the custom-tool // wire shape) into the running sets. The history builder maintains both sets // incrementally as native history is appended, so this only scans the // newly-added items (or, after a full-snapshot replace, the fresh input) rather // than re-scanning the whole growing history per message — the latter was // O(N²) and blocked the event loop for seconds while compacting large codex // contexts (frozen spinner until the next forced render). function addOpenAiCallIds( items: Array>, knownCallIds: Set, customCallIds: Set, ): void { for (const item of items) { if (typeof item.call_id !== "string") continue; if (item.type === "function_call") { knownCallIds.add(item.call_id); } else if (item.type === "custom_tool_call") { knownCallIds.add(item.call_id); customCallIds.add(item.call_id); } } } // ============================================================================ // Native history construction (responses-API shape) // ============================================================================ /** * Build the OpenAI Responses-API native history array from LLM messages. * * Caller is responsible for converting any custom message types to * `Message[]` first (e.g. via the agent's `convertToLlm`); this function * operates purely on the LLM-domain shape. * * @param messages - LLM messages to encode. * @param model - Target model (used for provider gating + tool-call id rules). * @param previousReplacementHistory - History from a prior compaction whose * encrypted reasoning we want to preserve. */ export function buildOpenAiNativeHistory( messages: Message[], model: Model, previousReplacementHistory?: Array>, ): Array> { const input: Array> = previousReplacementHistory ? [...previousReplacementHistory] : []; const transformedMessages = transformMessages(messages, model, id => normalizeOpenAiCompactionToolCallId(id)); let msgIndex = 0; const knownCallIds = new Set(); const customCallIds = new Set(); addOpenAiCallIds(input, knownCallIds, customCallIds); for (const message of transformedMessages) { if (message.role === "user" || message.role === "developer") { const providerPayload = (message as { providerPayload?: AssistantMessage["providerPayload"] }).providerPayload; const historyItems = getOpenAIResponsesHistoryItems(providerPayload, model.provider); if (historyItems) { input.push(...historyItems); addOpenAiCallIds(historyItems, knownCallIds, customCallIds); msgIndex++; continue; } const contentBlocks: Array> = []; if (typeof message.content === "string") { if (message.content.trim().length > 0) { contentBlocks.push({ type: "input_text", text: message.content.toWellFormed() }); } } else { for (const block of message.content) { if (block.type === "text") { if (!block.text || block.text.trim().length === 0) continue; contentBlocks.push({ type: "input_text", text: block.text.toWellFormed() }); continue; } if (block.type === "image") { contentBlocks.push({ type: "input_image", detail: "auto", image_url: `data:${block.mimeType};base64,${block.data}`, }); } } } if (contentBlocks.length > 0) { input.push({ type: "message", role: message.role, content: contentBlocks }); } msgIndex++; continue; } if (message.role === "assistant") { const assistant = message as AssistantMessage; const providerPayload = getOpenAIResponsesHistoryPayload( assistant.providerPayload, model.provider, assistant.provider, ); if (providerPayload) { if (providerPayload.dt) { input.push(...providerPayload.items); addOpenAiCallIds(providerPayload.items, knownCallIds, customCallIds); } else { input.splice(0, input.length, ...providerPayload.items); knownCallIds.clear(); customCallIds.clear(); addOpenAiCallIds(input, knownCallIds, customCallIds); } msgIndex++; continue; } const isDifferentModel = assistant.model !== model.id && assistant.provider === model.provider && assistant.api === model.api; for (const block of assistant.content) { if (block.type === "thinking" && assistant.stopReason !== "error" && block.thinkingSignature) { try { const reasoningItem = JSON.parse(block.thinkingSignature) as Record; if (reasoningItem && typeof reasoningItem === "object") { input.push(reasoningItem); } } catch { logger.warn("Failed to parse assistant reasoning for remote compaction", { model: assistant.model, provider: assistant.provider, }); } continue; } if (block.type === "text") { if (!block.text || block.text.trim().length === 0) continue; const parsedSignature = parseTextSignature(block.textSignature); let msgId = parsedSignature?.id; if (!msgId) { msgId = `msg_${msgIndex}`; } else if (msgId.length > 64) { msgId = `msg_${Bun.hash(msgId).toString(36)}`; } input.push({ type: "message", role: "assistant", content: [{ type: "output_text", text: block.text.toWellFormed(), annotations: [] }], status: "completed", id: msgId, phase: parsedSignature?.phase, }); continue; } if (block.type === "toolCall") { const normalized = normalizeResponsesToolCallId(block.id, block.customWireName ? "ctc" : "fc"); let itemId: string | undefined = normalized.itemId; if ( isDifferentModel && (itemId?.startsWith("fc_") || itemId?.startsWith("fcr_") || itemId?.startsWith("ctc_")) ) { itemId = undefined; } knownCallIds.add(normalized.callId); if (block.customWireName) { const rawInput = typeof block.arguments?.input === "string" ? block.arguments.input : ""; customCallIds.add(normalized.callId); input.push({ type: "custom_tool_call", id: itemId, call_id: normalized.callId, name: block.customWireName, input: rawInput, }); continue; } input.push({ type: "function_call", id: itemId, call_id: normalized.callId, name: block.name, arguments: JSON.stringify(block.arguments), }); } } msgIndex++; continue; } if (message.role === "toolResult") { const normalized = normalizeResponsesToolCallId(message.toolCallId); if (!knownCallIds.has(normalized.callId)) { msgIndex++; continue; } const textOutput = message.content .filter(block => block.type === "text") .map(block => block.text) .join("\n"); const hasImages = message.content.some(block => block.type === "image"); const outputText = textOutput.length > 0 ? textOutput : hasImages ? "(see attached image)" : ""; input.push({ type: customCallIds.has(normalized.callId) ? "custom_tool_call_output" : "function_call_output", call_id: normalized.callId, output: outputText.toWellFormed(), }); if (hasImages && model.input.includes("image")) { const contentBlocks: Array> = [ { type: "input_text", text: "Attached image(s) from tool result:" }, ]; for (const block of message.content) { if (block.type !== "image") continue; contentBlocks.push({ type: "input_image", detail: "auto", image_url: `data:${block.mimeType};base64,${block.data}`, }); } input.push({ type: "message", role: "user", content: contentBlocks }); } } msgIndex++; } return input; } // ============================================================================ // Endpoint requests // ============================================================================ export async function requestOpenAiRemoteCompaction( model: Model, apiKey: string, compactInput: Array>, instructions: string, signal?: AbortSignal, opts?: { fetch?: FetchImpl }, ): Promise { const endpoint = resolveOpenAiCompactEndpoint(model); const request: OpenAiRemoteCompactionRequest = { model: model.id, input: trimOpenAiCompactInput(compactInput, model.contextWindow, instructions), instructions, }; const headers: Record = { "content-type": "application/json", Authorization: `Bearer ${apiKey}`, ...(model.headers ?? {}), }; // Codex endpoints require additional auth headers if (model.provider === "openai-codex") { const accountId = getCodexAccountId(apiKey); if (accountId) { headers[OPENAI_HEADERS.ACCOUNT_ID] = accountId; } headers[OPENAI_HEADERS.BETA] = OPENAI_HEADER_VALUES.BETA_RESPONSES; headers[OPENAI_HEADERS.ORIGINATOR] = OPENAI_HEADER_VALUES.ORIGINATOR_CODEX; } const response = await (opts?.fetch ?? fetch)(endpoint, { method: "POST", headers, body: JSON.stringify(request), signal, }); if (!response.ok) { const errorText = await response.text().catch(() => ""); logger.warn("OpenAI remote compaction failed", { endpoint, status: response.status, statusText: response.statusText, errorText, }); throw new Error(`Remote compaction failed (${response.status} ${response.statusText})`); } const data = (await response.json()) as { output?: unknown[] } | undefined; const rawOutput = data?.output ?? []; const replacementHistory = rawOutput.filter( (item): item is Record => !!item && typeof item === "object" && shouldKeepOpenAiCompactOutputItem(item as Record), ); const compactionItem = replacementHistory.findLast((item): item is OpenAiRemoteCompactionItem => { if (item.type === "compaction" && typeof item.encrypted_content === "string") return true; if (item.type === "compaction_summary") return true; return false; }); if (!compactionItem) { const outputTypes = rawOutput.map(item => typeof item === "object" && item !== null ? (item as Record).type : typeof item, ); logger.warn("Remote compaction response missing compaction item", { endpoint, model: model.id, provider: model.provider, rawOutputLength: rawOutput.length, outputTypes, replacementHistoryLength: replacementHistory.length, }); throw new Error("Remote compaction response missing compaction item"); } return { provider: model.provider, replacementHistory, compactionItem }; } export async function requestRemoteCompaction( endpoint: string, request: RemoteCompactionRequest, signal?: AbortSignal, opts?: { fetch?: FetchImpl }, ): Promise { const response = await (opts?.fetch ?? fetch)(endpoint, { method: "POST", headers: { "content-type": "application/json" }, body: JSON.stringify(request), signal, }); if (!response.ok) { const errorText = await response.text().catch(() => ""); logger.warn("Remote compaction failed", { endpoint, status: response.status, statusText: response.statusText, errorText, }); throw new Error(`Remote compaction failed (${response.status} ${response.statusText})`); } const data = (await response.json()) as RemoteCompactionResponse | undefined; if (!data || typeof data.summary !== "string") { throw new Error("Remote compaction response missing summary"); } return data; }