import { Agent, AgentOutputType } from './agent'; import { Handoff } from './handoff'; import { ResolvedAgentOutput, HandoffsOutput, AgentInputItem, AgentOutputItem, } from './types'; import { RunItem, RunToolApprovalItem } from './items'; import { ModelResponse } from './model'; import { ReadableStreamController, ReadableStream as _ReadableStream, TransformStream, Readable, } from '@openai/agents-core/_shims'; import { ReadableStream } from './shims/interface'; import { RunStreamEvent } from './events'; import { getTurnInput } from './run'; import { RunState } from './runState'; import type { InputGuardrailResult, OutputGuardrailResult } from './guardrail'; import logger from './logger'; import { StreamEventTextStream } from './types/protocol'; /** * Data returned by the run() method of an agent. */ export interface RunResultData< TAgent extends Agent, THandoffs extends (Agent | Handoff)[] = any[], > { /** * The original input items i.e. the items before run() was called. This may be mutated version * of the input, if there are handoff input filters that mutate the input. */ input: string | AgentInputItem[]; /** * The new items generated during the agent run. These include things like new messages, tool * calls and their outputs, etc. */ newItems: RunItem[]; /** * The raw LLM responses generated by the model during the agent run. */ rawResponses: ModelResponse[]; /** * The last response ID generated by the model during the agent run. */ lastResponseId: string | undefined; /** * The last agent that was run */ lastAgent: TAgent | undefined; /** * Guardrail results for the input messages. */ inputGuardrailResults: InputGuardrailResult[]; /** * Guardrail results for the final output of the agent. */ outputGuardrailResults: OutputGuardrailResult[]; /** * The output of the last agent, or any handoff agent. */ finalOutput?: | ResolvedAgentOutput | HandoffsOutput; /** * The interruptions that occurred during the agent run. */ interruptions?: RunToolApprovalItem[]; /** * The state of the run. */ state: RunState; } class RunResultBase> implements RunResultData { readonly state: RunState; constructor(state: RunState) { this.state = state; } /** * The history of the agent run. This includes the input items and the new items generated during * the agent run. * * This can be used as inputs for the next agent run. */ get history(): AgentInputItem[] { return getTurnInput(this.input, this.newItems); } /** * The new items generated during the agent run. These include things like new messages, tool * calls and their outputs, etc. * * It does not include information about the agents and instead represents the model data. * * For the output including the agents, use the `newItems` property. */ get output(): AgentOutputItem[] { return getTurnInput([], this.newItems); } /** * A copy of the original input items. */ get input(): string | AgentInputItem[] { return this.state._originalInput; } /** * The run items generated during the agent run. This associates the model data with the agents. * * For the model data that can be used as inputs for the next agent run, use the `output` property. */ get newItems(): RunItem[] { return this.state._generatedItems; } /** * The raw LLM responses generated by the model during the agent run. */ get rawResponses(): ModelResponse[] { return this.state._modelResponses; } /** * The last response ID generated by the model during the agent run. */ get lastResponseId(): string | undefined { const responses = this.rawResponses; return responses && responses.length > 0 ? responses[responses.length - 1].responseId : undefined; } /** * The last agent that was run */ get lastAgent(): TAgent | undefined { return this.state._currentAgent; } /** * Guardrail results for the input messages. */ get inputGuardrailResults(): InputGuardrailResult[] { return this.state._inputGuardrailResults; } /** * Guardrail results for the final output of the agent. */ get outputGuardrailResults(): OutputGuardrailResult[] { return this.state._outputGuardrailResults; } /** * Any interruptions that occurred during the agent run for example for tool approvals. */ get interruptions(): RunToolApprovalItem[] { if (this.state._currentStep?.type === 'next_step_interruption') { return this.state._currentStep.data.interruptions; } return []; } /** * The final output of the agent. If the output type was set to anything other than `text`, * this will be parsed either as JSON or using the Zod schema you provided. */ get finalOutput(): ResolvedAgentOutput | undefined { if (this.state._currentStep?.type === 'next_step_final_output') { return this.state._currentAgent.processFinalOutput( this.state._currentStep.output, ) as ResolvedAgentOutput; } logger.warn('Accessed finalOutput before agent run is completed.'); return undefined; } } /** * The result of an agent run. */ export class RunResult< TContext, TAgent extends Agent, > extends RunResultBase { constructor(state: RunState) { super(state); } } /** * The result of an agent run in streaming mode. */ export class StreamedRunResult< TContext, TAgent extends Agent, > extends RunResultBase implements AsyncIterable { /** * The current agent that is running */ public get currentAgent(): TAgent | undefined { return this.lastAgent; } /** * The current turn number */ public currentTurn: number = 0; /** * The maximum number of turns that can be run */ public maxTurns: number | undefined; #error: unknown = null; #signal?: AbortSignal; #readableController: ReadableStreamController | undefined; #readableStream: _ReadableStream; #completedPromise: Promise; #completedPromiseResolve: (() => void) | undefined; #completedPromiseReject: ((err: unknown) => void) | undefined; #cancelled: boolean = false; constructor( result: { state: RunState; signal?: AbortSignal; } = {} as any, ) { super(result.state); this.#signal = result.signal; if (this.#signal) { this.#signal.addEventListener('abort', async () => { await this.#readableStream.cancel(); }); } this.#readableStream = new _ReadableStream({ start: (controller) => { this.#readableController = controller; }, cancel: () => { this.#cancelled = true; }, }); this.#completedPromise = new Promise((resolve, reject) => { this.#completedPromiseResolve = resolve; this.#completedPromiseReject = reject; }); } /** * @internal * Adds an item to the stream of output items */ _addItem(item: RunStreamEvent) { if (!this.cancelled) { this.#readableController?.enqueue(item); } } /** * @internal * Indicates that the stream has been completed */ _done() { if (!this.cancelled && this.#readableController) { this.#readableController.close(); this.#readableController = undefined; this.#completedPromiseResolve?.(); } } /** * @internal * Handles an error in the stream loop. */ _raiseError(err: unknown) { if (!this.cancelled && this.#readableController) { this.#readableController.error(err); this.#readableController = undefined; } this.#error = err; this.#completedPromiseReject?.(err); this.#completedPromise.catch((e) => { logger.debug(`Resulted in an error: ${e}`); }); } /** * Returns true if the stream has been cancelled. */ get cancelled(): boolean { return this.#cancelled; } /** * Returns the underlying readable stream. * @returns A readable stream of the agent run. */ toStream(): ReadableStream { return this.#readableStream as ReadableStream; } /** * Await this promise to ensure that the stream has been completed if you are not consuming the * stream directly. */ get completed() { return this.#completedPromise; } /** * Error thrown during the run, if any. */ get error() { return this.#error; } /** * Returns a readable stream of the final text output of the agent run. * * @param options - Options for the stream. * @param options.compatibleWithNodeStreams - Whether to use Node.js streams or web standard streams. * @returns A readable stream of the final output of the agent run. */ toTextStream(): ReadableStream; toTextStream(options?: { compatibleWithNodeStreams: true }): Readable; toTextStream(options?: { compatibleWithNodeStreams?: false; }): ReadableStream; toTextStream( options: { compatibleWithNodeStreams?: boolean } = {}, ): Readable | ReadableStream { const stream = this.#readableStream.pipeThrough( new TransformStream({ transform(event, controller) { if ( event.type === 'raw_model_stream_event' && event.data.type === 'output_text_delta' ) { const item = StreamEventTextStream.parse(event.data); controller.enqueue(item.delta); } }, }), ); if (options.compatibleWithNodeStreams) { return Readable.fromWeb(stream); } return stream as ReadableStream; } [Symbol.asyncIterator](): AsyncIterator { return this.#readableStream[Symbol.asyncIterator](); } }