/** * Agent loop that works with AgentMessage throughout. * Transforms to Message[] only at the LLM call boundary. */ import { type ApiKeyResolveContext, type AssistantMessage, type AssistantMessageEvent, type Context, EventStream, isZodSchema, streamSimple, type ToolResultMessage, type TSchema, validateToolArguments, zodToWireSchema, } from "@oh-my-pi/pi-ai"; import { sanitizeText } from "@oh-my-pi/pi-utils"; import { createHarmonyAuditEvent, detectHarmonyLeakInAssistantMessage, extractHarmonyRemoved, type HarmonyDetection, type HarmonyRecoveredToolCall, isHarmonyLeakMitigationTarget, recoverHarmonyToolCall, signalListLabel, } from "./harmony-leak"; import { type AgentRunCoverage, type AgentRunSummary, ToolCallBlockedError } from "./run-collector"; import { type AgentTelemetry, failChatSpan, finishChatSpan, finishExecuteToolSpan, finishInvokeAgentSpan, fireOnRunEnd, PiGenAIAttr, recordSkippedTool, resolveTelemetry, runInActiveSpan, type Span, startChatSpan, startExecuteToolSpan, startInvokeAgentSpan, } from "./telemetry"; import type { AgentContext, AgentEvent, AgentLoopConfig, AgentMessage, AgentTool, AgentToolResult, AsideMessage, StreamFn, } from "./types"; import { yieldIfDue } from "./utils/yield"; /** Sentinel returned by the abort race in `streamAssistantResponse`. 
*/
const ABORTED: unique symbol = Symbol("agent-loop-aborted");

/**
 * Error used to interrupt a turn when Harmony protocol leakage is detected in an
 * assistant response. The main loop catches it and inspects `recovered` to pick a
 * recovery path: present → truncate-and-resume, absent → abort-and-retry.
 */
class HarmonyLeakInterruption extends Error {
	constructor(
		/** Detection result; its `signals` are rendered into the error message. */
		readonly detection: HarmonyDetection,
		/** Removed text, forwarded to the Harmony audit event. */
		readonly removed: string,
		/** Recovered tool call, when recovery produced one. */
		readonly recovered?: HarmonyRecoveredToolCall,
	) {
		super(`Detected GPT-5 Harmony protocol leakage (${signalListLabel(detection.signals)})`);
		this.name = "HarmonyLeakInterruption";
	}
}

type AssistantContentBlock = AssistantMessage["content"][number];
type AssistantToolCallBlock = Extract<AssistantContentBlock, { type: "toolCall" }>;
type CloneableRecord = Record<string, unknown>;

/**
 * Recursively copy arrays and objects; primitives, `null`, and functions are
 * returned as-is. Objects are rebuilt from their own enumerable string-keyed
 * properties, so non-plain objects come back as plain records.
 */
function cloneUnknown(value: unknown): unknown {
	if (Array.isArray(value)) return value.map(item => cloneUnknown(item));
	if (!value || typeof value !== "object") return value;
	const source = value as CloneableRecord;
	// Object.fromEntries defines data properties, so a JSON-supplied "__proto__"
	// key is copied as an ordinary own property. Plain `out[key] = …` assignment
	// would hit the `__proto__` setter and swap the clone's prototype instead.
	return Object.fromEntries(Object.entries(source).map(([key, child]) => [key, cloneUnknown(child)]));
}

/** Deep-copy a tool call's arguments so later mutation cannot reach the snapshot. */
function cloneToolArguments(args: AssistantToolCallBlock["arguments"]): AssistantToolCallBlock["arguments"] {
	return cloneUnknown(args) as AssistantToolCallBlock["arguments"];
}

/**
 * Copy one assistant content block. Tool calls additionally get their
 * `arguments` deep-copied; every other block type is shallow-copied.
 */
function snapshotAssistantContentBlock(block: AssistantContentBlock): AssistantContentBlock {
	if (block.type === "toolCall") {
		return { ...block, arguments: cloneToolArguments(block.arguments) };
	}
	return { ...block };
}

/**
 * Copy an assistant message so the copy shares no mutable state with the
 * original for `content`, `usage`, `usage.cost`, and `disabledFeatures`.
 * All other fields are carried over by reference via the spread.
 */
function snapshotAssistantMessage(message: AssistantMessage): AssistantMessage {
	return {
		...message,
		content: message.content.map(block => snapshotAssistantContentBlock(block)),
		usage: {
			...message.usage,
			cost: { ...message.usage.cost },
		},
		disabledFeatures: message.disabledFeatures ? [...message.disabledFeatures] : undefined,
	};
}

/**
 * Copy a streaming event together with the assistant message it carries
 * (`partial`, `message`, or `error`, depending on the event type).
 */
function snapshotAssistantMessageEvent(event: AssistantMessageEvent): AssistantMessageEvent {
	switch (event.type) {
		case "start":
		case "text_start":
		case "text_delta":
		case "text_end":
		case "thinking_start":
		case "thinking_delta":
		case "thinking_end":
		case "toolcall_start":
		case "toolcall_delta":
			return { ...event, partial: snapshotAssistantMessage(event.partial) };
		case "toolcall_end":
			return {
				...event,
				toolCall: snapshotAssistantContentBlock(event.toolCall) as AssistantToolCallBlock,
				partial: snapshotAssistantMessage(event.partial),
			};
		case "done":
			return { ...event, message: snapshotAssistantMessage(event.message) };
		case "error":
			return { ...event, error: snapshotAssistantMessage(event.error) };
	}
}

/** Placeholder text used when an error result would otherwise have no substantive content. */
const EMPTY_ERROR_TOOL_RESULT_TEXT = "Tool failed with no output.";

/** True when `content` holds at least one image block or one text block that is not blank. */
function hasSubstantiveToolResultContent(content: AgentToolResult["content"]): boolean {
	for (const block of content) {
		if (block.type === "image") return true;
		if (block.type === "text" && block.text.trim().length > 0) return true;
	}
	return false;
}

/**
 * Normalize a value coming back from `tool.execute()` (or its streaming partial-update callback)
 * into a structurally valid {@link AgentToolResult}.
 *
 * The tool interface is typed, but third-party tools (MCP, extensions, user-authored AgentTools)
 * can violate the contract at runtime. Persisting a malformed result corrupts the session file
 * (missing `content` array → crash on reload). We coerce at the single boundary where untyped
 * results enter the agent loop, so every downstream consumer can rely on the type.
 *
 * @param raw Whatever the tool returned; may be any value.
 * @returns `result` — the coerced result — and `malformed`, which is true when
 *   `content` was not an array or at least one content block was dropped.
 */
function coerceToolResult(raw: unknown): { result: AgentToolResult; malformed: boolean } {
	const rawObj = raw && typeof raw === "object" ? (raw as Record<string, unknown>) : null;
	const rawContent = rawObj?.content;
	const details = rawObj && "details" in rawObj ? rawObj.details : {};
	// Tools may flag a non-throwing failure on the result itself (e.g. an
	// aggregator that catches per-entry errors and synthesizes a combined
	// result). Preserve the flag so agent-loop can surface it on the wire.
	const explicitError = Boolean(rawObj && "isError" in rawObj && rawObj.isError);

	if (!Array.isArray(rawContent)) {
		return {
			result: {
				content: [{ type: "text", text: "Tool returned an invalid result: missing content array." }],
				details,
				isError: true,
			},
			malformed: true,
		};
	}

	const content: AgentToolResult["content"] = [];
	let invalidBlocks = 0;
	for (const block of rawContent) {
		if (!block || typeof block !== "object" || !("type" in block)) {
			invalidBlocks++;
			continue;
		}
		if (block.type === "text" && typeof (block as { text?: unknown }).text === "string") {
			content.push({ type: "text", text: sanitizeText((block as { text: string }).text) });
		} else if (
			block.type === "image" &&
			typeof (block as { data?: unknown }).data === "string" &&
			typeof (block as { mimeType?: unknown }).mimeType === "string"
		) {
			content.push(block as { type: "image"; data: string; mimeType: string });
		} else {
			invalidBlocks++;
		}
	}

	if (invalidBlocks > 0) {
		content.push({
			type: "text",
			text: `Tool returned an invalid result: ${invalidBlocks} content block${invalidBlocks === 1 ? "" : "s"} had an unsupported shape.`,
		});
	}

	const isError = explicitError || invalidBlocks > 0;
	// Anthropic rejects tool_result blocks with is_error: true and empty content.
	if (isError && !hasSubstantiveToolResultContent(content)) {
		content.length = 0;
		content.push({ type: "text", text: EMPTY_ERROR_TOOL_RESULT_TEXT });
	}

	return {
		result: { content, details, ...(isError ? { isError: true } : {}) },
		malformed: invalidBlocks > 0,
	};
}

/**
 * Start an agent loop with a new prompt message.
 * The prompt is added to the context and events are emitted for it.
*
 * Returns synchronously; the loop itself runs in a detached async task and
 * reports through the returned stream.
 *
 * @param prompts Messages appended after `context.messages`; each is emitted as a
 *   `message_start`/`message_end` pair before the loop runs.
 * @param context Starting context. It is shallow-copied with a fresh `messages`
 *   array, so the caller's array is not appended to.
 * @param config Loop configuration, forwarded to `runLoop`.
 * @param signal Optional abort signal, forwarded to `runLoop`.
 * @param streamFn Optional stream function, forwarded to `runLoop`.
 * @returns The event stream for this run. Anything thrown while setting up or
 *   driving the run is delivered through `stream.fail`.
 */
export function agentLoop(
	prompts: AgentMessage[],
	context: AgentContext,
	config: AgentLoopConfig,
	signal?: AbortSignal,
	streamFn?: StreamFn,
): EventStream<AgentEvent, AgentMessage[]> {
	const stream = createAgentStream();

	// Detached on purpose: callers consume progress from `stream`, not from this
	// promise. Setup lives inside the `try` so a failure there fails the stream
	// instead of becoming an unhandled rejection with a stream that never ends.
	void (async () => {
		try {
			const newMessages: AgentMessage[] = [...prompts];
			const currentContext: AgentContext = {
				...context,
				messages: [...context.messages, ...prompts],
			};

			stream.push({ type: "agent_start" });
			stream.push({ type: "turn_start" });
			for (const prompt of prompts) {
				stream.push({ type: "message_start", message: prompt });
				stream.push({ type: "message_end", message: prompt });
			}

			await runLoop(currentContext, newMessages, config, signal, stream, streamFn);
		} catch (err) {
			stream.fail(err);
		}
	})();

	return stream;
}

/**
 * Continue an agent loop from the current context without adding a new message.
 * Used for retries - context already has user message or tool results.
 *
 * **Important:** The last message in context must convert to a `user` or `toolResult` message
 * via `convertToLlm`. If it doesn't, the LLM provider will reject the request.
 * This cannot be validated here since `convertToLlm` is only called once per turn.
*
 * @param context Context to resume from; shallow-copied with a fresh `messages` array.
 * @param config Loop configuration, forwarded to `runLoop`.
 * @param signal Optional abort signal, forwarded to `runLoop`.
 * @param streamFn Optional stream function, forwarded to `runLoop`.
 * @returns The event stream for this run. Anything thrown while setting up or
 *   driving the run is delivered through `stream.fail`.
 * @throws {Error} Synchronously, when `context.messages` is empty or its last
 *   message has role `assistant`.
 */
export function agentLoopContinue(
	context: AgentContext,
	config: AgentLoopConfig,
	signal?: AbortSignal,
	streamFn?: StreamFn,
): EventStream<AgentEvent, AgentMessage[]> {
	if (context.messages.length === 0) {
		throw new Error("Cannot continue: no messages in context");
	}
	const lastMessage = context.messages[context.messages.length - 1];
	if (lastMessage.role === "assistant") {
		throw new Error("Cannot continue from message role: assistant");
	}

	const stream = createAgentStream();

	// Detached on purpose: callers consume progress from `stream`. Setup lives
	// inside the `try` so a failure there fails the stream instead of becoming an
	// unhandled rejection with a stream that never ends.
	void (async () => {
		try {
			const newMessages: AgentMessage[] = [];
			const currentContext: AgentContext = { ...context, messages: [...context.messages] };

			stream.push({ type: "agent_start" });
			stream.push({ type: "turn_start" });

			await runLoop(currentContext, newMessages, config, signal, stream, streamFn);
		} catch (err) {
			stream.fail(err);
		}
	})();

	return stream;
}

/**
 * Create the event stream shared by both entry points: it treats `agent_end`
 * as the terminal event and takes that event's `messages` as the stream result.
 */
function createAgentStream(): EventStream<AgentEvent, AgentMessage[]> {
	return new EventStream<AgentEvent, AgentMessage[]>(
		(event: AgentEvent) => event.type === "agent_end",
		(event: AgentEvent) => (event.type === "agent_end" ? event.messages : []),
	);
}

/**
 * Build the `agent_end` event payload. When telemetry is enabled, snapshots
 * the run collector so consumers receive {@link AgentRunSummary} +
 * {@link AgentRunCoverage} alongside the messages without parsing OTEL spans.
 * When telemetry is unset, returns the bare event for backwards compatibility.
 *
 * `fireOnRunEnd` is invoked only when `collector.markRunEnded()` returns true.
 *
 * @param messages Messages produced by this run.
 * @param telemetry Resolved telemetry, or `undefined` when disabled.
 * @param stepCount Step count passed to the collector snapshot.
 */
function buildAgentEndEvent(
	messages: AgentMessage[],
	telemetry: AgentTelemetry | undefined,
	stepCount: number,
): Extract<AgentEvent, { type: "agent_end" }> {
	if (!telemetry) return { type: "agent_end", messages };

	const snapshot = telemetry.collector.snapshot({ stepCount });
	if (telemetry.collector.markRunEnded()) {
		fireOnRunEnd(telemetry, snapshot.summary, snapshot.coverage);
	}
	return { type: "agent_end", messages, telemetry: snapshot.summary, coverage: snapshot.coverage };
}

/**
 * Detailed-result handle returned by {@link agentLoopDetailed}. Adds the
 * run-level telemetry/coverage rollup to the existing `AgentMessage[]`
 * payload without changing the resolved type of `stream.result()`.
*/
export interface AgentLoopDetailedResult {
	/** The messages the stream resolved with (the value of `stream.result()`). */
	readonly messages: AgentMessage[];
	/** Run summary recorded by the `onRunEnd` hook; `undefined` if the hook never fired. */
	readonly telemetry: AgentRunSummary | undefined;
	/** Run coverage recorded by the `onRunEnd` hook; `undefined` if the hook never fired. */
	readonly coverage: AgentRunCoverage | undefined;
}

/**
 * Convenience wrapper over {@link agentLoop} that exposes the run-level
 * summary + coverage alongside the messages. The returned `stream` is the
 * same `EventStream` callers already consume; `detailed()` awaits the
 * stream's result and returns the additive fields.
 *
 * Existing `stream.result()` semantics are preserved — it still resolves to
 * `AgentMessage[]`. Use {@link agentLoopDetailed} when you need the rollup;
 * use {@link agentLoop} when you do not.
 *
 * The loop runs with a copy of `config` whose `telemetry.onRunEnd` is wrapped
 * by `createDetailedCapture`; the caller's `config` object is not modified.
 */
export function agentLoopDetailed(
	prompts: AgentMessage[],
	context: AgentContext,
	config: AgentLoopConfig,
	signal?: AbortSignal,
	streamFn?: StreamFn,
): {
	readonly stream: EventStream<AgentEvent, AgentMessage[]>;
	readonly detailed: () => Promise<AgentLoopDetailedResult>;
} {
	const capture = createDetailedCapture(config);
	const stream = agentLoop(prompts, context, capture.config, signal, streamFn);
	return {
		stream,
		detailed: () => capture.detailed(stream),
	};
}

/**
 * Like {@link agentLoopDetailed} but built on top of
 * {@link agentLoopContinue}.
 */
export function agentLoopContinueDetailed(
	context: AgentContext,
	config: AgentLoopConfig,
	signal?: AbortSignal,
	streamFn?: StreamFn,
): {
	readonly stream: EventStream<AgentEvent, AgentMessage[]>;
	readonly detailed: () => Promise<AgentLoopDetailedResult>;
} {
	const capture = createDetailedCapture(config);
	const stream = agentLoopContinue(context, capture.config, signal, streamFn);
	return {
		stream,
		detailed: () => capture.detailed(stream),
	};
}

/**
 * Wire an `onRunEnd` telemetry hook onto `config` so the detailed helper can
 * capture the run summary without consuming the event stream. Preserves any
 * existing `onRunEnd` the caller had set.
*
 * @param config Caller's loop config; it is copied, not mutated.
 * @returns `config` — a copy whose `telemetry.onRunEnd` records the summary and
 *   coverage and then calls the caller's hook — and `detailed`, which awaits
 *   `stream.result()` and pairs the messages with whatever was recorded.
 */
function createDetailedCapture(config: AgentLoopConfig): {
	readonly config: AgentLoopConfig;
	readonly detailed: (stream: EventStream<AgentEvent, AgentMessage[]>) => Promise<AgentLoopDetailedResult>;
} {
	let captured: { summary: AgentRunSummary; coverage: AgentRunCoverage } | undefined;
	const userHook = config.telemetry?.onRunEnd;

	const wired: AgentLoopConfig = {
		...config,
		telemetry: {
			...(config.telemetry ?? {}),
			onRunEnd: (summary, coverage) => {
				// Record first so the values are kept even if the caller's hook throws.
				captured = { summary, coverage };
				userHook?.(summary, coverage);
			},
		},
	};

	return {
		config: wired,
		detailed: async stream => {
			const messages = await stream.result();
			return {
				messages,
				telemetry: captured?.summary,
				coverage: captured?.coverage,
			};
		},
	};
}

/**
 * Provider-specific message cleanup applied just before the LLM call.
 *
 * For the `cerebras` provider, `thinking` blocks are removed from assistant
 * messages that have array content. For every other provider, or when no such
 * block exists, the input array is returned unchanged (same reference).
 * Messages that lose nothing are reused by reference.
 */
function normalizeMessagesForProvider(
	messages: Context["messages"],
	model: AgentLoopConfig["model"],
): Context["messages"] {
	if (model.provider !== "cerebras") {
		return messages;
	}

	const hasThinking = messages.some(
		message =>
			message.role === "assistant" &&
			Array.isArray(message.content) &&
			message.content.some(block => block.type === "thinking"),
	);
	if (!hasThinking) return messages;

	return messages.map(message => {
		if (message.role !== "assistant" || !Array.isArray(message.content)) {
			return message;
		}
		const filtered = message.content.filter(block => block.type !== "thinking");
		return filtered.length === message.content.length ? message : { ...message, content: filtered };
	});
}

/** Name of the intent property injected into tool parameter schemas. */
export const INTENT_FIELD = "_i";

/**
 * Return a schema whose `properties` start with {@link INTENT_FIELD} and, in
 * `"require"` mode, whose `required` list contains it exactly once.
 *
 * Non-object schemas (including arrays and `null`) are returned untouched, and a
 * schema that already satisfies the rules is returned by reference. The input is
 * never mutated.
 *
 * @param schema Wire schema to augment.
 * @param mode `"require"` also adds the field to `required`; `"optional"` only
 *   adds/reorders the property.
 */
function injectIntentIntoSchema(schema: unknown, mode: "require" | "optional" = "require"): unknown {
	if (!schema || typeof schema !== "object" || Array.isArray(schema)) return schema;

	const schemaRecord = schema as Record<string, unknown>;
	const propertiesValue = schemaRecord.properties;
	const properties =
		propertiesValue && typeof propertiesValue === "object" && !Array.isArray(propertiesValue)
			? (propertiesValue as Record<string, unknown>)
			: {};
	const requiredValue = schemaRecord.required;
	const required: string[] = Array.isArray(requiredValue)
		? requiredValue.filter((item): item is string => typeof item === "string")
		: [];
	const alreadyRequired = required.includes(INTENT_FIELD);

	if (INTENT_FIELD in properties) {
		const { [INTENT_FIELD]: intentProp, ...rest } = properties;
		const needsReorder = Object.keys(properties)[0] !== INTENT_FIELD;
		const needsRequired = mode === "require" && !alreadyRequired;
		if (!needsReorder && !needsRequired) return schema;
		return {
			...schemaRecord,
			...(needsReorder ? { properties: { [INTENT_FIELD]: intentProp, ...rest } } : {}),
			...(needsRequired ? { required: [...required, INTENT_FIELD] } : {}),
		};
	}

	return {
		...schemaRecord,
		properties: {
			[INTENT_FIELD]: {
				type: "string",
			},
			...properties,
		},
		// A schema can list the field as required without declaring the property;
		// don't append a second copy, since `required` entries must be unique.
		...(mode === "require" ? { required: alreadyRequired ? required : [...required, INTENT_FIELD] } : {}),
	};
}

/**
 * Convert agent tools into the tool list sent to the provider.
 *
 * Each tool gets a `description` (defaulting to `""`). When `injectIntent` is
 * true, `PI_NO_INTENT` is not `"1"`, and the tool's intent mode is not `"omit"`,
 * the intent field is injected into its parameter schema; Zod schemas are
 * converted to wire schemas first.
 *
 * @returns `undefined` when `tools` is `undefined`.
 */
export function normalizeTools(tools: AgentContext["tools"], injectIntent: boolean): Context["tools"] {
	const shouldInject = injectIntent && Bun.env.PI_NO_INTENT !== "1";
	return tools?.map(tool => {
		const intentMode = resolveIntentMode(tool.intent);
		let parameters: TSchema = tool.parameters;
		if (shouldInject && intentMode !== "omit") {
			const wireSchema = isZodSchema(parameters) ? zodToWireSchema(parameters) : parameters;
			parameters = injectIntentIntoSchema(wireSchema, intentMode) as TSchema;
		}
		const description = tool.description ?? "";
		return { ...tool, parameters, description };
	});
}

/**
 * Map a tool's `intent` setting to an injection mode: a function means `"omit"`,
 * `"optional"`/`"omit"` pass through, and anything else means `"require"`.
 */
function resolveIntentMode(intent: AgentTool["intent"]): "require" | "optional" | "omit" {
	if (typeof intent === "function") return "omit";
	if (intent === "optional" || intent === "omit") return intent;
	return "require";
}

/**
 * Split the intent field off a tool call's arguments.
 *
 * @returns `strippedArgs` without {@link INTENT_FIELD}. `intent` is the trimmed
 *   string, `undefined` when it trims to empty, and absent when the field was not
 *   a string.
 */
function extractIntent(args: Record<string, unknown>): { intent?: string; strippedArgs: Record<string, unknown> } {
	const { [INTENT_FIELD]: intent, ...strippedArgs } = args;
	if (typeof intent !== "string") {
		return { strippedArgs };
	}
	const trimmed = intent.trim();
	return { intent: trimmed.length > 0 ? trimmed : undefined, strippedArgs };
}

/**
 * Main loop logic shared by agentLoop and agentLoopContinue.
*
 * Wraps `runLoopBody` in the invoke-agent telemetry span. The span is finished in
 * a `finally`, so it is closed with the step count (and the thrown error, if any)
 * whether the body resolves or rejects; errors are rethrown unchanged.
 */
async function runLoop(
	currentContext: AgentContext,
	newMessages: AgentMessage[],
	config: AgentLoopConfig,
	signal: AbortSignal | undefined,
	stream: EventStream<AgentEvent, AgentMessage[]>,
	streamFn?: StreamFn,
): Promise<void> {
	const telemetry = resolveTelemetry(config.telemetry, config.sessionId);
	const invokeAgentSpan = startInvokeAgentSpan(telemetry, config.model);
	const stepCounter: StepCounter = { count: 0 };
	let caughtError: unknown;
	try {
		await runInActiveSpan(invokeAgentSpan, () =>
			runLoopBody(
				currentContext,
				newMessages,
				config,
				signal,
				stream,
				telemetry,
				invokeAgentSpan,
				stepCounter,
				streamFn,
			),
		);
	} catch (err) {
		caughtError = err;
		throw err;
	} finally {
		finishInvokeAgentSpan(telemetry, invokeAgentSpan, {
			stepCount: stepCounter.count,
			errorObject: caughtError,
		});
	}
}

/** Mutable step counter shared between the loop and the per-call streaming code. */
interface StepCounter {
	count: number;
}

/**
 * Resolve aside entries at the moment the loop is about to inject them. Each entry
 * is either a ready {@link AgentMessage} or a sync thunk evaluated here so the
 * producer can make the final inject-or-drop decision (return null) against
 * up-to-the-injection state — e.g. dropping late diagnostics a newer edit
 * superseded. Kept sync so it can never stall the loop.
 */
function resolveAsides(entries: AsideMessage[] | undefined): AgentMessage[] {
	if (!entries || entries.length === 0) return [];
	const out: AgentMessage[] = [];
	for (const entry of entries) {
		const message = typeof entry === "function" ? entry() : entry;
		if (message) out.push(message);
	}
	return out;
}

/**
 * The agent loop proper.
 *
 * Inner loop: one iteration per model turn — inject pending messages, stream an
 * assistant response, run (or placeholder) its tool calls, then collect steering
 * and asides for the next turn. Outer loop: when the inner loop would stop, drain
 * asides and follow-ups and re-enter if there are any.
 *
 * Ends the stream itself: on an `error`/`aborted` turn it pushes `agent_end` and
 * returns early, otherwise it pushes `agent_end` after the outer loop breaks.
 * Mutates `currentContext.messages` and `newMessages` in place.
 *
 * @throws Rethrows anything `streamAssistantResponse` throws other than
 *   `HarmonyLeakInterruption`, and throws an `Error` once Harmony recovery has
 *   been attempted twice without a clean turn.
 */
async function runLoopBody(
	currentContext: AgentContext,
	newMessages: AgentMessage[],
	config: AgentLoopConfig,
	signal: AbortSignal | undefined,
	stream: EventStream<AgentEvent, AgentMessage[]>,
	telemetry: AgentTelemetry | undefined,
	invokeAgentSpan: Span | undefined,
	stepCounter: StepCounter,
	streamFn?: StreamFn,
): Promise<void> {
	type ToolCallContent = Extract<AssistantMessage["content"][number], { type: "toolCall" }>;

	let firstTurn = true;
	// Check for steering messages at start (user may have typed while waiting).
	// Skip when the run is already externally aborted — dequeuing would strand
	// the messages in a run that is about to die.
	let pendingMessages: AgentMessage[] = signal?.aborted ? [] : (await config.getSteeringMessages?.()) || [];
	let harmonyRetryAttempt = 0;
	let harmonyTruncateResumeCount = 0;

	// Outer loop: continues when queued follow-up messages arrive after agent would stop
	while (true) {
		let hasMoreToolCalls = true;

		// Inner loop: process tool calls and steering messages
		while (hasMoreToolCalls || pendingMessages.length > 0) {
			// Yield at the top of each iteration to prevent busy-wait when
			// the agent loop is executing tool calls back-to-back.
			await yieldIfDue();

			// The entry points already emitted the first `turn_start`.
			if (!firstTurn) {
				stream.push({ type: "turn_start" });
			} else {
				firstTurn = false;
			}

			// Process pending messages (inject before next assistant response)
			if (pendingMessages.length > 0) {
				for (const message of pendingMessages) {
					stream.push({ type: "message_start", message });
					stream.push({ type: "message_end", message });
					currentContext.messages.push(message);
					newMessages.push(message);
				}
				pendingMessages = [];
			}

			// Refresh prompt/tool context from live state before each model call
			if (config.syncContextBeforeModelCall) {
				await config.syncContextBeforeModelCall(currentContext);
			}

			// Stream assistant response
			let recovered: HarmonyRecoveredToolCall | undefined;
			let message: AssistantMessage;
			try {
				message = await streamAssistantResponse(
					currentContext,
					config,
					signal,
					stream,
					telemetry,
					invokeAgentSpan,
					stepCounter,
					streamFn,
					harmonyRetryAttempt,
				);
				// A clean turn resets both Harmony recovery budgets.
				harmonyRetryAttempt = 0;
				harmonyTruncateResumeCount = 0;
			} catch (err) {
				if (!(err instanceof HarmonyLeakInterruption)) throw err;
				if (err.recovered) {
					// Truncate-and-resume: continue with the recovered message.
					if (harmonyTruncateResumeCount >= 2) {
						await emitHarmonyAudit(config, err, "escalated", harmonyRetryAttempt);
						throw new Error(
							`GPT-5 Harmony leak recurred after truncate-and-resume recovery (${signalListLabel(err.detection.signals)}).`,
						);
					}
					harmonyTruncateResumeCount++;
					recovered = err.recovered;
					message = recovered.message;
					await emitHarmonyAudit(config, err, "truncate_resume", harmonyRetryAttempt);
				} else {
					// Abort-and-retry: nothing salvageable, re-run the same turn.
					if (harmonyRetryAttempt >= 2) {
						await emitHarmonyAudit(config, err, "escalated", harmonyRetryAttempt);
						throw new Error(
							`GPT-5 Harmony leak persisted after ${harmonyRetryAttempt} retries (${signalListLabel(err.detection.signals)}).`,
						);
					}
					await emitHarmonyAudit(config, err, "abort_retry", harmonyRetryAttempt);
					harmonyRetryAttempt++;
					// `continue` re-evaluates the inner loop condition. This turn may have
					// been entered only because of pending messages (already drained above)
					// with `hasMoreToolCalls` false — force re-entry so the retry actually
					// runs instead of silently falling out to the stop boundary.
					hasMoreToolCalls = true;
					continue;
				}
			}

			if (recovered) {
				// The recovered message did not come out of a completed stream call, so
				// commit it to the context and announce it here. Snapshots keep the
				// context entry and each emitted event from sharing mutable state.
				message = snapshotAssistantMessage(message);
				currentContext.messages.push(message);
				stream.push({ type: "message_start", message: snapshotAssistantMessage(message) });
				stream.push({ type: "message_end", message: snapshotAssistantMessage(message) });
			}
			newMessages.push(message);

			const toolCalls = message.content.filter((c): c is ToolCallContent => c.type === "toolCall");
			const toolResults: ToolResultMessage[] = [];
			let steeringMessagesFromExecution: AgentMessage[] | undefined;

			if (message.stopReason === "error" || message.stopReason === "aborted") {
				// Create placeholder tool results for any tool calls in the aborted message
				// This maintains the tool_use/tool_result pairing that the API requires
				for (const toolCall of toolCalls) {
					const result = createAbortedToolResult(toolCall, stream, message.stopReason, message.errorMessage);
					currentContext.messages.push(result);
					newMessages.push(result);
					toolResults.push(result);
					// The placeholder result above keeps the API's tool_use/tool_result
					// pairing intact, but no execute_tool span is started for these
					// calls. Mirror the run-collector entry directly so the run
					// summary's tool counters and `coverage.toolsInvoked` reflect
					// what the user actually saw on the wire.
					recordSkippedTool(telemetry, {
						toolCallId: toolCall.id,
						toolName: toolCall.name,
						status: message.stopReason === "aborted" ? "aborted" : "error",
					});
				}
				stream.push({ type: "turn_end", message, toolResults });
				stream.push(buildAgentEndEvent(newMessages, telemetry, stepCounter.count));
				stream.end(newMessages);
				return;
			}

			// Run tools whenever the turn carries tool_use blocks AND was not truncated.
			// `stop_reason` is provider metadata that never goes back on the wire, so it
			// does not gate continuation validity: replaying a tool_use turn with the
			// tool_results appended is accepted whether the turn ended on `tool_use` or
			// `end_turn` (adaptive/interleaved-thinking Opus routinely emits tool calls
			// under `end_turn`; verified against the live Anthropic API). The only
			// continuation hazard is a thinking block carrying a stale/invalid signature,
			// which `transformMessages` already neutralizes — it strips the signature on
			// non-`toolUse` turns and the encoder downgrades the unsigned block to text,
			// which the API accepts. So treat `stop` (end_turn/pause_turn) the same as
			// `toolUse`. `length` (max_tokens) is the one reason we must NOT run: the
			// trailing tool_use may be truncated with incomplete arguments — those calls
			// are abandoned below. (`error`/`aborted` already returned above.)
			const runnableStop = message.stopReason === "toolUse" || message.stopReason === "stop";
			hasMoreToolCalls = runnableStop && toolCalls.length > 0;

			if (hasMoreToolCalls) {
				const executionResult = await executeToolCalls(
					currentContext,
					message,
					signal,
					stream,
					config,
					telemetry,
					invokeAgentSpan,
				);
				toolResults.push(...executionResult.toolResults);
				steeringMessagesFromExecution = executionResult.steeringMessages;

				for (const result of toolResults) {
					currentContext.messages.push(result);
					newMessages.push(result);
				}
			} else if (toolCalls.length > 0) {
				// Turn ended on a non-runnable reason (`length` truncation) but left
				// toolCall blocks behind. The trailing call's arguments may be incomplete,
				// so don't execute them — pair each with a placeholder result to keep
				// the tool_use/tool_result contract valid for any later request that
				// replays this turn. When the truncation was `length`, surface an actionable
				// hint so the model doesn't loop by re-emitting the same oversized payload
				// (e.g. 1000+ line `write` content blowing past the model's output cap).
				const skipReason = message.stopReason === "length" ? "length" : "skipped";
				for (const toolCall of toolCalls) {
					const result = createAbortedToolResult(toolCall, stream, skipReason);
					currentContext.messages.push(result);
					newMessages.push(result);
					toolResults.push(result);
					recordSkippedTool(telemetry, {
						toolCallId: toolCall.id,
						toolName: toolCall.name,
						status: "skipped",
					});
				}
				// On `length`, take one more turn so the model sees the placeholder results.
				if (message.stopReason === "length" && toolResults.length > 0) {
					hasMoreToolCalls = true;
				}
			}

			stream.push({ type: "turn_end", message, toolResults });

			// On external abort (user interrupt), leave the steering queue intact: the
			// session aborts then continues, delivering the queue into a fresh run.
			// Draining it here would inject the messages right before a model call that
			// instantly aborts — message lands in history, agent never responds.
			const steering =
				steeringMessagesFromExecution ??
				(signal?.aborted ? [] : (await config.getSteeringMessages?.()) || []);

			if (hasMoreToolCalls && !signal?.aborted) {
				// Mid-work: fold any non-interrupting asides into the next turn alongside steering.
				// Skipped on external abort for the same reason the steering and
				// stop-boundary drains are: the next model call would abort immediately
				// and the dequeued asides would be stranded in this run.
				const asides = resolveAsides(await config.getAsideMessages?.());
				pendingMessages = asides.length > 0 ? [...steering, ...asides] : steering;
			} else {
				// Stop boundary: only steering (live user input) forces another turn here. Leave
				// asides for the outer drain below so a passive aside can't trigger an extra model
				// turn ahead of a queued follow-up — the outer drain batches asides + follow-ups together.
				pendingMessages = steering;
			}
		}

		// Agent would stop here. Drain non-interrupting asides + follow-up messages.
		await config.onBeforeYield?.();
		// Skip queue drains when externally aborted (same stranding hazard as above).
		const asideMessages = signal?.aborted ? [] : resolveAsides(await config.getAsideMessages?.());
		const followUpMessages = signal?.aborted ? [] : (await config.getFollowUpMessages?.()) || [];
		if (asideMessages.length > 0 || followUpMessages.length > 0) {
			// Set as pending so the inner loop processes them before stopping.
			pendingMessages = [...asideMessages, ...followUpMessages];
			continue;
		}

		// No more messages, exit
		break;
	}

	stream.push(buildAgentEndEvent(newMessages, telemetry, stepCounter.count));
	stream.end(newMessages);
}

/**
 * Report a Harmony leak to `config.onHarmonyLeak`, if that hook is set.
 *
 * @param action Recovery path the loop took for this interruption.
 * @param retryN Value of the loop's Harmony retry counter when the event is emitted.
 */
async function emitHarmonyAudit(
	config: AgentLoopConfig,
	interruption: HarmonyLeakInterruption,
	action: "truncate_resume" | "abort_retry" | "escalated",
	retryN: number,
): Promise<void> {
	await config.onHarmonyLeak?.(
		createHarmonyAuditEvent({
			action,
			detection: interruption.detection,
			model: config.model,
			retryN,
			removed: interruption.removed,
		}),
	);
}

/**
 * Stream an assistant response from the LLM.
* This is where AgentMessage[] gets transformed to Message[] for the LLM.
 *
 * Steps, in order: apply `config.transformContext` (if set), convert with
 * `config.convertToLlm`, normalize for the provider, build the provider
 * context (append-only or fresh), resolve the API key and metadata, then
 * stream the response. Each partial message is mirrored into
 * `context.messages` and pushed to `stream` as `message_start` /
 * `message_update` / `message_end` events.
 *
 * @param harmonyRetryAttempt - When greater than zero and a temperature is
 *   configured, the request temperature is raised by 0.05.
 * @returns The final assistant message, or a synthesized message with
 *   `stopReason: "aborted"` when the request signal fires.
 * @throws HarmonyLeakInterruption when mitigation is enabled for the model and
 *   leakage is detected in the final message. Any error escaping the stream is
 *   reported through `failChatSpan` and rethrown.
 */
async function streamAssistantResponse(
	context: AgentContext,
	config: AgentLoopConfig,
	signal: AbortSignal | undefined,
	stream: EventStream<AgentEvent, AgentMessage[]>,
	telemetry: AgentTelemetry | undefined,
	invokeAgentSpan: Span | undefined,
	stepCounter: StepCounter,
	streamFn?: StreamFn,
	harmonyRetryAttempt = 0,
): Promise<AssistantMessage> {
	// Apply context transform if configured (AgentMessage[] → AgentMessage[])
	let messages = context.messages;
	if (config.transformContext) {
		messages = await config.transformContext(messages, signal);
	}

	// Convert to LLM-compatible messages (AgentMessage[] → Message[])
	const llmMessages = await config.convertToLlm(messages);
	const normalizedMessages = normalizeMessagesForProvider(llmMessages, config.model);

	// Build LLM context — append-only mode caches system prompt + tools
	// AND keeps an append-only message log so prior-turn bytes are stable.
	let llmContext: Context;
	if (config.appendOnlyContext) {
		config.appendOnlyContext.syncMessages(normalizedMessages);
		llmContext = config.appendOnlyContext.build(context, { intentTracing: !!config.intentTracing });
	} else {
		llmContext = {
			systemPrompt: context.systemPrompt,
			messages: normalizedMessages,
			tools: normalizeTools(context.tools, !!config.intentTracing),
		};
	}
	if (config.transformProviderContext) {
		llmContext = config.transformProviderContext(llmContext);
	}

	const streamFunction = streamFn || streamSimple;

	// Resolve API key (important for expiring tokens) — do this before resolving
	// metadata so that the session-sticky credential recorded by getApiKey is
	// visible to metadataResolver (e.g. for the correct account_uuid in metadata.user_id).
	const staticApiKey = typeof config.apiKey === "string" ? config.apiKey : undefined;
	const resolvedApiKey =
		(config.getApiKey ? await config.getApiKey(config.model.provider) : undefined) || staticApiKey;
	// Re-resolve metadata after credential selection so the per-request value
	// reflects the credential actually used, not the snapshot from AgentLoopConfig construction.
	const resolvedMetadata = config.metadataResolver ? config.metadataResolver(config.model.provider) : config.metadata;
	const dynamicToolChoice = config.getToolChoice?.();
	const dynamicReasoning = config.getReasoning?.();
	const dynamicDisableReasoning = config.getDisableReasoning?.();
	const harmonyMitigationEnabled = isHarmonyLeakMitigationTarget(config.model);
	// A private controller is combined with the caller's signal only when
	// mitigation is enabled; otherwise the caller's signal is used as-is.
	const harmonyAbortController = harmonyMitigationEnabled ? new AbortController() : undefined;
	const requestSignal = harmonyAbortController
		? signal
			? AbortSignal.any([signal, harmonyAbortController.signal])
			: harmonyAbortController.signal
		: signal;
	const effectiveTemperature =
		harmonyRetryAttempt > 0 && config.temperature !== undefined ? config.temperature + 0.05 : config.temperature;
	const effectiveToolChoice = dynamicToolChoice ?? config.toolChoice;
	const effectiveReasoning = dynamicReasoning ?? config.reasoning;
	const effectiveDisableReasoning = dynamicDisableReasoning ?? config.disableReasoning;

	const chatStepNumber = stepCounter.count;
	stepCounter.count += 1;
	const chatSpan = startChatSpan(telemetry, config.model, {
		parent: invokeAgentSpan,
		stepNumber: chatStepNumber,
		request: {
			maxTokens: config.maxTokens,
			temperature: effectiveTemperature,
			topP: config.topP,
			topK: config.topK,
			presencePenalty: config.presencePenalty,
			serviceTier: config.serviceTier,
			reasoningEffort: typeof effectiveReasoning === "string" ? effectiveReasoning : undefined,
			toolChoice: effectiveToolChoice,
			tools: llmContext.tools,
			systemPrompt: llmContext.systemPrompt,
			messages: llmContext.messages,
		},
	});

	// Wrap the user-supplied onResponse so we always observe response headers
	// for telemetry (`ChatUsageEvent.headers`, gateway auto-detection) without
	// stealing them from the configured hook.
	let capturedHeaders: Readonly<Record<string, string>> | undefined;
	const userOnResponse = config.onResponse;
	const captureOnResponse: AgentLoopConfig["onResponse"] = (response, modelInfo) => {
		capturedHeaders = response.headers;
		return userOnResponse?.(response, modelInfo);
	};
	const finishChat = async (message: AssistantMessage): Promise<void> => {
		await finishChatSpan(telemetry, chatSpan, message, {
			stepNumber: chatStepNumber,
			serviceTier: config.serviceTier,
			responseHeaders: capturedHeaders,
			baseUrl: config.model.baseUrl,
		});
	};

	try {
		return await runInActiveSpan(chatSpan, async () => {
			const response = await streamFunction(config.model, llmContext, {
				...config,
				// Hand streamSimple a resolver so its central auth-retry policy can
				// re-resolve on 401 / usage-limit: the initial step reuses the key
				// already resolved above (which set the session-sticky credential
				// feeding metadataResolver), and retry steps forward the a/b/c ctx
				// to config.getApiKey (force-refresh, then rotate). With no
				// getApiKey hook the caller's own apiKey (string or resolver) flows
				// through unchanged.
				apiKey: config.getApiKey
					? (ctx: ApiKeyResolveContext) =>
							ctx.error === undefined
								? resolvedApiKey
								: Promise.resolve(config.getApiKey!(config.model.provider, ctx))
					: config.apiKey,
				metadata: resolvedMetadata,
				toolChoice: effectiveToolChoice,
				reasoning: effectiveReasoning,
				disableReasoning: effectiveDisableReasoning,
				temperature: effectiveTemperature,
				signal: requestSignal,
				onResponse: captureOnResponse,
			});

			let partialMessage: AssistantMessage | null = null;
			let addedPartial = false;
			const completedToolCallIds = new Set<string>();
			const responseIterator = response[Symbol.asyncIterator]();

			const finishAbortedStream = async (): Promise<AssistantMessage> => {
				try {
					const cleanup = responseIterator.return?.();
					if (cleanup) void cleanup.catch(() => {});
				} catch {
					// Provider cancellation failures cannot change the committed aborted message.
				}
				const aborted = emitAbortedAssistantMessage(
					partialMessage,
					addedPartial,
					completedToolCallIds,
					context,
					config,
					stream,
					requestSignal,
				);
				await finishChat(aborted);
				return aborted;
			};

			// Set up a single abort race: register the abort listener once for the whole
			// stream and reuse the same race promise for every iterator.next() instead of
			// allocating Promise.withResolvers and add/removeEventListener per event.
			let abortRacePromise: Promise<typeof ABORTED> | undefined;
			let detachAbortListener: (() => void) | undefined;
			if (requestSignal) {
				if (requestSignal.aborted) {
					return await finishAbortedStream();
				}
				const { promise, resolve } = Promise.withResolvers<typeof ABORTED>();
				const onAbort = () => resolve(ABORTED);
				requestSignal.addEventListener("abort", onAbort, { once: true });
				abortRacePromise = promise;
				detachAbortListener = () => requestSignal.removeEventListener("abort", onAbort);
			}

			try {
				while (true) {
					let next: IteratorResult<AssistantMessageEvent>;
					if (abortRacePromise) {
						const result = await Promise.race([responseIterator.next(), abortRacePromise]);
						if (result === ABORTED) {
							return await finishAbortedStream();
						}
						next = result;
					} else {
						next = await responseIterator.next();
					}
					if (next.done) break;
					const event = next.value;

					if (event.type === "done" || event.type === "error") {
						let finalMessage = retainCompletedToolCalls(await response.result(), completedToolCallIds);
						if (harmonyMitigationEnabled) {
							const detection = detectHarmonyLeakInAssistantMessage(finalMessage);
							if (detection) {
								const recovered = recoverHarmonyToolCall(finalMessage, detection);
								const removed = recovered?.removed ?? extractHarmonyRemoved(finalMessage, detection);
								if (addedPartial) {
									emitDiscardedHarmonyPartial(
										partialMessage,
										stream,
										`Discarded after GPT-5 Harmony protocol leakage (${signalListLabel(detection.signals)})`,
									);
									context.messages.pop();
									addedPartial = false;
								}
								throw new HarmonyLeakInterruption(detection, removed, recovered);
							}
						}
						finalMessage = snapshotAssistantMessage(finalMessage);
						if (addedPartial) {
							context.messages[context.messages.length - 1] = finalMessage;
						} else {
							context.messages.push(finalMessage);
						}
						if (!addedPartial) {
							stream.push({ type: "message_start", message: snapshotAssistantMessage(finalMessage) });
						}
						stream.push({ type: "message_end", message: snapshotAssistantMessage(finalMessage) });
						await finishChat(finalMessage);
						return finalMessage;
					}

					if (requestSignal?.aborted) {
						return await finishAbortedStream();
					}

					// Yield to the event loop periodically to prevent busy-wait
					// when the LLM is streaming chunks faster than the loop can rest.
					await yieldIfDue();

					switch (event.type) {
						case "start":
							partialMessage = event.partial;
							context.messages.push(partialMessage);
							addedPartial = true;
							stream.push({ type: "message_start", message: snapshotAssistantMessage(partialMessage) });
							break;
						case "text_start":
						case "text_delta":
						case "text_end":
						case "thinking_start":
						case "thinking_delta":
						case "thinking_end":
						case "toolcall_start":
						case "toolcall_delta":
						case "toolcall_end":
							if (partialMessage) {
								if (event.type === "toolcall_end") {
									completedToolCallIds.add(event.toolCall.id);
								}
								partialMessage = event.partial;
								context.messages[context.messages.length - 1] = partialMessage;
								config.onAssistantMessageEvent?.(partialMessage, event);
								stream.push({
									type: "message_update",
									assistantMessageEvent: snapshotAssistantMessageEvent(event),
									message: snapshotAssistantMessage(partialMessage),
								});
							}
							break;
					}
				}
			} finally {
				detachAbortListener?.();
			}

			// The iterator finished without a "done"/"error" event: fall back to
			// the stream's resolved result.
			let trailing = await response.result();
			if (harmonyMitigationEnabled) {
				const detection = detectHarmonyLeakInAssistantMessage(trailing);
				if (detection) {
					const recovered = recoverHarmonyToolCall(trailing, detection);
					const removed = recovered?.removed ?? extractHarmonyRemoved(trailing, detection);
					if (addedPartial) {
						emitDiscardedHarmonyPartial(
							partialMessage,
							stream,
							`Discarded after GPT-5 Harmony protocol leakage (${signalListLabel(detection.signals)})`,
						);
						context.messages.pop();
						addedPartial = false;
					}
					throw new HarmonyLeakInterruption(detection, removed, recovered);
				}
			}
			trailing = snapshotAssistantMessage(trailing);
			if (addedPartial) {
				context.messages[context.messages.length - 1] = trailing;
				stream.push({ type: "message_end", message: snapshotAssistantMessage(trailing) });
			}
			await finishChat(trailing);
			return trailing;
		});
	} catch (err) {
		failChatSpan(telemetry, chatSpan, {
			errorObject: err,
			responseHeaders: capturedHeaders,
			baseUrl: config.model.baseUrl,
		});
		throw err;
	}
}

/**
 * Drop tool-call blocks that never completed from a failed or aborted message.
 *
 * Messages whose `stopReason` is neither `"error"` nor `"aborted"` are returned
 * untouched. Otherwise every `toolCall` block whose id is missing from
 * `completedToolCallIds` is filtered out. The input object is returned as-is
 * when nothing was removed; a shallow copy with the filtered content otherwise.
 */
function retainCompletedToolCalls(
	message: AssistantMessage,
	completedToolCallIds: ReadonlySet<string>,
): AssistantMessage {
	if (message.stopReason !== "error" && message.stopReason !== "aborted") return message;
	let changed = false;
	const content = message.content.filter(block => {
		if (block.type !== "toolCall") return true;
		const keep = completedToolCallIds.has(block.id);
		if (!keep) changed = true;
		return keep;
	});
	return changed ? { ...message, content } : message;
}

/**
 * Close out a partial message that is being discarded by pushing a
 * `message_end` event carrying a snapshot marked `stopReason: "error"` with the
 * given `errorMessage`. Does nothing when there is no partial message.
 */
function emitDiscardedHarmonyPartial(
	partialMessage: AssistantMessage | null,
	stream: EventStream<AgentEvent, AgentMessage[]>,
	errorMessage: string,
): void {
	if (!partialMessage) return;
	stream.push({
		type: "message_end",
		message: snapshotAssistantMessage({ ...partialMessage, stopReason: "error", errorMessage }),
	});
}

/** Resolve the human-readable reason an abort carried. A caller that aborts via
 * `AbortController.abort(reason)` with a string or a non-`AbortError` `Error`
 * (e.g.
the coding agent's user-interrupt label) gets that text surfaced on the
 * synthesized assistant message's `errorMessage`; a bare `abort()` (whose
 * `signal.reason` is the default `AbortError` `DOMException`) falls back to the
 * generic sentinel that downstream renderers treat as "no specific reason".
 */
export function abortReasonText(signal: AbortSignal | undefined): string {
	return explicitAbortReason(signal) ?? "Request was aborted";
}

/** True when an abort carried a *deliberate*, human-meaningful reason — a string
 * reason or a non-`AbortError` `Error` (TTSR rule match, user-interrupt label).
 * A bare `abort()` (default `AbortError` `DOMException`) is anonymous and returns
 * false. Used to decide whether a mid-stream tool call survives the abort: a
 * deliberate interruption is a conscious decision made after the (partial) call
 * was observed, so the block is retained and paired with a labeled placeholder;
 * an anonymous abort drops incomplete calls whose args may be unsafe to replay. */
function isExplicitAbortReason(signal: AbortSignal | undefined): boolean {
	return explicitAbortReason(signal) !== undefined;
}

/**
 * Single source of truth for what counts as a deliberate abort reason, shared
 * by `abortReasonText` and `isExplicitAbortReason` so the two cannot drift.
 *
 * @returns The string reason verbatim (untrimmed) when it is non-blank, the
 *   `message` of a non-`AbortError` `Error` when that message is non-blank, and
 *   `undefined` for everything else (no signal, no reason, `AbortError`).
 */
function explicitAbortReason(signal: AbortSignal | undefined): string | undefined {
	const reason: unknown = signal?.reason;
	if (typeof reason === "string") {
		return reason.trim().length > 0 ? reason : undefined;
	}
	if (reason instanceof Error && reason.name !== "AbortError" && reason.message.trim().length > 0) {
		return reason.message;
	}
	return undefined;
}

/**
 * Build the assistant message that represents an aborted request, store it in
 * `context.messages`, and emit the matching stream events.
 *
 * The message starts from `partialMessage` when one exists, otherwise from an
 * empty assistant message with zeroed usage for `config.model`. In both cases
 * `stopReason` is `"aborted"` and `errorMessage` is the abort reason text.
 *
 * @param addedPartial - Whether the partial is already the last entry of
 *   `context.messages`. If so that entry is replaced and only `message_end` is
 *   emitted; otherwise the message is appended and `message_start` is emitted
 *   before `message_end`.
 * @returns The snapshot that was stored in `context.messages`.
 */
function emitAbortedAssistantMessage(
	partialMessage: AssistantMessage | null,
	addedPartial: boolean,
	completedToolCallIds: ReadonlySet<string>,
	context: AgentContext,
	config: AgentLoopConfig,
	stream: EventStream<AgentEvent, AgentMessage[]>,
	requestSignal: AbortSignal | undefined,
): AssistantMessage {
	const errorMessage = abortReasonText(requestSignal);
	const base: AssistantMessage = partialMessage
		? { ...partialMessage, stopReason: "aborted", errorMessage }
		: {
				role: "assistant",
				content: [],
				api: config.model.api,
				provider: config.model.provider,
				model: config.model.id,
				usage: {
					input: 0,
					output: 0,
					cacheRead: 0,
					cacheWrite: 0,
					totalTokens: 0,
					cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
				},
				stopReason: "aborted",
				errorMessage,
				timestamp: Date.now(),
			};
	// A deliberate, labeled abort (TTSR rule match, user interrupt) keeps every
	// committed tool-call block so the loop pairs it with a placeholder labeled by
	// `errorMessage`; an anonymous abort still drops calls that never completed
	// (no `toolcall_end`), whose partial args are unsafe to replay.
	const retained = isExplicitAbortReason(requestSignal) ? base : retainCompletedToolCalls(base, completedToolCallIds);
	const abortedMessage = snapshotAssistantMessage(retained);
	if (addedPartial) {
		context.messages[context.messages.length - 1] = abortedMessage;
	} else {
		context.messages.push(abortedMessage);
		stream.push({ type: "message_start", message: snapshotAssistantMessage(abortedMessage) });
	}
	stream.push({ type: "message_end", message: snapshotAssistantMessage(abortedMessage) });
	return abortedMessage;
}

/**
 * Execute tool calls from an assistant message.
*
 * Every `toolCall` block in `assistantMessage` gets exactly one tool result:
 * either the tool's real outcome, an "aborted" result when the tool signal had
 * already fired, or a "skipped" result when steering interrupted the batch or
 * the call never produced a result message.
 *
 * Scheduling follows each tool's `concurrency` setting: `"shared"` tools (the
 * default) start once the previous `"exclusive"` tool has finished, while an
 * `"exclusive"` tool waits for the previous exclusive tool and every shared
 * tool scheduled since.
 *
 * @returns The emitted tool result messages in emission order, plus any
 *   steering messages that interrupted the batch.
 */
async function executeToolCalls(
	currentContext: AgentContext,
	assistantMessage: AssistantMessage,
	signal: AbortSignal | undefined,
	stream: EventStream<AgentEvent, AgentMessage[]>,
	config: AgentLoopConfig,
	telemetry: AgentTelemetry | undefined,
	invokeAgentSpan: Span | undefined,
): Promise<{ toolResults: ToolResultMessage[]; steeringMessages?: AgentMessage[] }> {
	const tools = currentContext.tools;
	const {
		getSteeringMessages,
		interruptMode = "immediate",
		getToolContext,
		transformToolCallArguments,
		intentTracing,
		beforeToolCall,
		afterToolCall,
	} = config;
	type ToolCallContent = Extract<AssistantMessage["content"][number], { type: "toolCall" }>;
	const toolCalls = assistantMessage.content.filter((c): c is ToolCallContent => c.type === "toolCall");
	const emittedToolResults: ToolResultMessage[] = [];
	const toolCallInfos = toolCalls.map(call => ({ id: call.id, name: call.name }));
	const batchId = `${assistantMessage.timestamp ?? Date.now()}_${toolCalls[0]?.id ?? "batch"}`;
	const shouldInterruptImmediately = interruptMode !== "wait";
	const steeringAbortController = new AbortController();
	const toolSignal = signal
		? AbortSignal.any([signal, steeringAbortController.signal])
		: steeringAbortController.signal;
	const interruptState = { triggered: false };
	let steeringMessages: AgentMessage[] | undefined;
	let steeringCheckTail: Promise<void> = Promise.resolve();

	const records = toolCalls.map(toolCall => ({
		toolCall,
		// Tools emitted via OpenAI's custom-tool path (e.g. `apply_patch` on GPT-5)
		// come back under their wire-level name, which may differ from the
		// harness-internal `name`. Match on either, preferring `name` for
		// determinism if both somehow collide.
		tool:
			tools?.find(t => t.name === toolCall.name) ??
			tools?.find(t => t.customWireName !== undefined && t.customWireName === toolCall.name),
		args: toolCall.arguments as Record<string, unknown>,
		started: false,
		result: undefined as AgentToolResult | undefined,
		isError: false,
		skipped: false,
		toolResultMessage: undefined as ToolResultMessage | undefined,
		resultEmitted: false,
	}));

	const checkSteering = async (): Promise<void> => {
		// `signal` (external/user abort) is checked separately from the internal
		// steeringAbortController: once the run is externally aborted it is
		// unwinding, and draining the steering queue here would strand the
		// messages in the dying run instead of leaving them for the post-abort
		// continue (interruptAndFlushQueuedMessages → Agent.continue()).
		if (!shouldInterruptImmediately || !getSteeringMessages || interruptState.triggered || signal?.aborted) {
			return;
		}
		// Checks are chained on a shared tail so they run one at a time.
		const check = steeringCheckTail.then(async () => {
			if (interruptState.triggered || signal?.aborted) return;
			const steering = await getSteeringMessages();
			if (steering.length > 0) {
				steeringMessages = steering;
				interruptState.triggered = true;
				steeringAbortController.abort();
			}
		});
		steeringCheckTail = check.catch(() => {});
		await check;
	};

	const emitToolResult = (record: (typeof records)[number], result: AgentToolResult, isError: boolean): void => {
		if (record.resultEmitted) return;
		const { toolCall } = record;
		// A result for a tool that never started still needs a start event first.
		if (!record.started) {
			stream.push({
				type: "tool_execution_start",
				toolCallId: toolCall.id,
				toolName: toolCall.name,
				args: record.args,
				intent: toolCall.intent,
			});
		}
		stream.push({
			type: "tool_execution_end",
			toolCallId: toolCall.id,
			toolName: toolCall.name,
			result,
			isError,
		});
		const toolResultMessage: ToolResultMessage = {
			role: "toolResult",
			toolCallId: toolCall.id,
			toolName: toolCall.name,
			content: result.content,
			details: result.details,
			isError,
			timestamp: Date.now(),
		};
		record.result = result;
		record.isError = isError;
		record.toolResultMessage = toolResultMessage;
		record.resultEmitted = true;
		emittedToolResults.push(toolResultMessage);
		stream.push({ type: "message_start", message: toolResultMessage });
		stream.push({ type: "message_end", message: toolResultMessage });
	};

	const runTool = async (record: (typeof records)[number], index: number): Promise<void> => {
		if (interruptState.triggered) {
			// Skip both span emission and the collector orphan record here. The
			// tail sweep below (after `Promise.allSettled`) is the single path
			// that handles "no result message was produced" — it calls
			// `recordSkippedTool` and `emitToolResult` once per record, so any
			// work we did here would double-count.
			record.skipped = true;
			return;
		}
		const { toolCall, tool } = record;
		let argsForExecution = toolCall.arguments as Record<string, unknown>;
		if (intentTracing) {
			const { intent, strippedArgs } = extractIntent(toolCall.arguments);
			argsForExecution = strippedArgs;
			if (intent) {
				toolCall.intent = intent;
			} else if (typeof tool?.intent === "function") {
				try {
					const derived = tool.intent(strippedArgs as never)?.trim();
					if (derived) {
						toolCall.intent = derived;
					}
				} catch {
					// intent function must never break tool execution
				}
			}
		}
		record.args = argsForExecution;
		if (toolSignal.aborted) {
			record.skipped = true;
			recordSkippedTool(telemetry, {
				toolCallId: toolCall.id,
				toolName: toolCall.name,
				status: "aborted",
			});
			emitToolResult(record, createToolSignalAbortedResult(toolSignal), true);
			return;
		}
		record.started = true;
		stream.push({
			type: "tool_execution_start",
			toolCallId: toolCall.id,
			toolName: toolCall.name,
			args: argsForExecution,
			intent: toolCall.intent,
		});
		const toolSpan = startExecuteToolSpan(telemetry, {
			tool,
			toolName: toolCall.name,
			toolCallId: toolCall.id,
			args: argsForExecution,
			parent: invokeAgentSpan,
		});
		if (toolSpan && toolCall.intent) {
			toolSpan.setAttribute(PiGenAIAttr.ToolCallIntent, toolCall.intent);
		}
		let result: AgentToolResult = { content: [], details: {} };
		let isError = false;
		let caughtError: unknown;
		let completedToolExecution = false;
		await runInActiveSpan(toolSpan, async () => {
			try {
				if (!tool) throw new Error(`Tool ${toolCall.name} not found`);
				// The early returns below leave the callback entirely, so the
				// afterToolCall hook is not invoked for these aborted calls.
				if (toolSignal.aborted) {
					result = createToolSignalAbortedResult(toolSignal);
					isError = true;
					return;
				}
				let effectiveArgs: Record<string, unknown>;
				try {
					effectiveArgs = validateToolArguments(tool, { ...toolCall, arguments: argsForExecution });
				} catch (validationError) {
					if (tool.lenientArgValidation) {
						effectiveArgs = argsForExecution;
					} else {
						throw validationError;
					}
				}
				if (beforeToolCall) {
					const beforeResult = await beforeToolCall(
						{
							assistantMessage,
							toolCall,
							args: effectiveArgs,
							context: currentContext,
						},
						toolSignal,
					);
					if (beforeResult?.block) {
						throw new ToolCallBlockedError(beforeResult.reason);
					}
				}
				if (toolSignal.aborted) {
					result = createToolSignalAbortedResult(toolSignal);
					isError = true;
					return;
				}
				const executionArgs = transformToolCallArguments
					? transformToolCallArguments(effectiveArgs, toolCall.name)
					: effectiveArgs;
				record.args = executionArgs;
				const toolContext = getToolContext
					? getToolContext({
							batchId,
							index,
							total: toolCalls.length,
							toolCalls: toolCallInfos,
						})
					: undefined;
				const rawResult = await tool.execute(
					toolCall.id,
					executionArgs,
					toolSignal,
					partialResult => {
						stream.push({
							type: "tool_execution_update",
							toolCallId: toolCall.id,
							toolName: toolCall.name,
							args: executionArgs,
							partialResult: coerceToolResult(partialResult).result,
						});
					},
					toolContext,
				);
				completedToolExecution = true;
				const coerced = coerceToolResult(rawResult);
				result = coerced.result;
				if (coerced.malformed || result.isError) isError = true;
			} catch (e) {
				caughtError = e;
				result = {
					content: [{ type: "text", text: e instanceof Error ? e.message : String(e) }],
					details: {},
				};
				isError = true;
			}
			if (afterToolCall && (!toolSignal.aborted || completedToolExecution)) {
				try {
					const after = await afterToolCall(
						{
							assistantMessage,
							toolCall,
							args: record.args,
							result,
							isError,
							context: currentContext,
						},
						toolSignal,
					);
					if (after) {
						// Re-normalize the post-hook result: `afterToolCall` is untyped user/extension
						// code and may return malformed `content` (non-array / invalid blocks), which
						// would otherwise be persisted verbatim and corrupt the session — the same
						// hazard `coerceToolResult` guards on the execute path.
						const coerced = coerceToolResult({
							content: after.content ?? result.content,
							details: after.details ?? result.details,
							isError: after.isError ?? result.isError,
						});
						result = coerced.result;
						isError = coerced.malformed || (after.isError ?? isError);
					}
				} catch (e) {
					caughtError = e;
					result = {
						content: [{ type: "text", text: e instanceof Error ? e.message : String(e) }],
						details: {},
					};
					isError = true;
				}
			}
		});
		const interrupted = interruptState.triggered;
		const abortedDuringExecution = toolSignal.aborted && isError;
		if (interrupted && isError) {
			// Steering/abort fired AND this tool failed — it was cut off before producing a
			// usable result, so report it as skipped.
			record.skipped = true;
			emitToolResult(record, createSkippedToolResult(), true);
		} else {
			// No interrupt, or the tool finished (successfully or with a genuine error) before
			// the interrupt landed. Keep its real result: a completed tool already ran its side
			// effects, so the model must see what actually happened rather than a false "skipped".
			emitToolResult(record, result, isError);
		}
		const firstTextBlock = result.content?.[0];
		const errorMessageForSpan =
			caughtError === undefined && isError && firstTextBlock?.type === "text" ? firstTextBlock.text : undefined;
		const status =
			(interrupted && isError) || abortedDuringExecution
				? "aborted"
				: caughtError instanceof ToolCallBlockedError
					? "blocked"
					: isError
						? "error"
						: "ok";
		finishExecuteToolSpan(telemetry, toolSpan, {
			result,
			isError,
			status,
			errorMessage: errorMessageForSpan,
			errorObject: caughtError,
			toolCallId: toolCall.id,
			toolName: toolCall.name,
		});
		await checkSteering();
	};

	let lastExclusive: Promise<void> = Promise.resolve();
	let sharedTasks: Promise<void>[] = [];
	const tasks: Promise<void>[] = [];
	for (let index = 0; index < records.length; index++) {
		const record = records[index];
		const concurrencyMode = record.tool?.concurrency;
		let concurrency: "shared" | "exclusive";
		if (typeof concurrencyMode === "function") {
			// Resolved from raw pre-validation args; a throwing resolver must not
			// take down the whole batch, so fall back to the safe (serial) mode.
			try {
				concurrency = concurrencyMode(record.args);
			} catch {
				concurrency = "exclusive";
			}
		} else {
			concurrency = concurrencyMode ?? "shared";
		}
		const start = concurrency === "exclusive" ? Promise.all([lastExclusive, ...sharedTasks]) : lastExclusive;
		const task = start.then(() => runTool(record, index));
		tasks.push(task);
		if (concurrency === "exclusive") {
			lastExclusive = task;
			sharedTasks = [];
		} else {
			sharedTasks.push(task);
		}
	}
	await Promise.allSettled(tasks);

	// Yield after batch tool execution to let GC and I/O catch up,
	// especially when tool results are large (e.g. bash output).
	await yieldIfDue();

	// Tail sweep: any record still lacking a result message gets a skipped result.
	for (const record of records) {
		if (!record.toolResultMessage) {
			record.skipped = true;
			recordSkippedTool(telemetry, {
				toolCallId: record.toolCall.id,
				toolName: record.toolCall.name,
				status: "skipped",
			});
			emitToolResult(record, createSkippedToolResult(), true);
		}
	}
	return { toolResults: emittedToolResults, steeringMessages };
}

/**
 * Create a tool result for a tool call that was aborted or errored before execution.
 * Maintains the tool_use/tool_result pairing required by the API.
*
 * Pushes `tool_execution_start`, `tool_execution_end`, `message_start` and
 * `message_end` events for the call onto `stream`, always with `isError: true`.
 *
 * @param reason - Selects the base explanation text.
 * @param errorMessage - When provided (and non-empty) it is appended after a
 *   colon; otherwise the base text is terminated with a period.
 * @returns The `toolResult` message that was emitted.
 */
function createAbortedToolResult(
	toolCall: Extract<AssistantMessage["content"][number], { type: "toolCall" }>,
	stream: EventStream<AgentEvent, AgentMessage[]>,
	reason: "aborted" | "error" | "skipped" | "length",
	errorMessage?: string,
): ToolResultMessage {
	const message =
		reason === "aborted"
			? "Tool execution was aborted"
			: reason === "length"
				? "Tool call was not executed because the assistant hit its output token limit (stop_reason: length) before the arguments could complete; the recorded arguments are truncated and unsafe to run. Do NOT retry by re-emitting the same large payload — split the work into several smaller tool calls (e.g. for `write`/`edit`, write the first chunk then append the rest with subsequent `edit` insert ops, or break the file into multiple `write` targets)"
				: reason === "skipped"
					? "Tool call was not executed because the assistant ended its turn"
					: "Tool execution failed due to an error";
	const result: AgentToolResult = {
		content: [{ type: "text", text: errorMessage ? `${message}: ${errorMessage}` : `${message}.` }],
		details: {},
	};
	stream.push({
		type: "tool_execution_start",
		toolCallId: toolCall.id,
		toolName: toolCall.name,
		args: toolCall.arguments,
		intent: toolCall.intent,
	});
	stream.push({
		type: "tool_execution_end",
		toolCallId: toolCall.id,
		toolName: toolCall.name,
		result,
		isError: true,
	});
	const toolResultMessage: ToolResultMessage = {
		role: "toolResult",
		toolCallId: toolCall.id,
		toolName: toolCall.name,
		content: result.content,
		details: {},
		isError: true,
		timestamp: Date.now(),
	};
	stream.push({ type: "message_start", message: toolResultMessage });
	stream.push({ type: "message_end", message: toolResultMessage });
	return toolResultMessage;
}

/**
 * Build the result reported for a tool that did not run because `signal` had
 * already aborted. The text embeds the abort reason from `abortReasonText`.
 */
function createToolSignalAbortedResult(signal: AbortSignal): AgentToolResult {
	const reason = abortReasonText(signal);
	return {
		content: [{ type: "text", text: `Tool was not executed because the run was aborted: ${reason}.` }],
		details: {},
	};
}

/** Build the fixed placeholder result used for tool calls that were skipped. */
function createSkippedToolResult(): AgentToolResult {
	return {
		content: [{ type: "text", text: "Skipped due to queued user message." }],
		details: {},
	};
}