import { getProjectDir, logger } from "@oh-my-pi/pi-utils";
import { Settings } from "../../config/settings";
import { OutputSink } from "../../session/streaming-output";
import type { ToolSession } from "../../tools";
import { resolveOutputMaxColumns, resolveOutputSinkHeadBytes } from "../../tools/output-meta";
import type { JsStatusEvent } from "../js/shared/types";
import type { KernelDisplayOutput } from "./display";
import {
	checkPythonKernelAvailability,
	type KernelExecuteOptions,
	type KernelExecuteResult,
	PythonKernel,
} from "./kernel";
import { ensurePyToolBridge, registerPyToolBridge } from "./tool-bridge";

/** Kernel allocation strategy: reuse one kernel per session id, or start a fresh kernel per call. */
export type PythonKernelMode = "session" | "per-call";

export interface PythonExecutorOptions {
	/** Working directory for command execution */
	cwd?: string;
	/** Timeout in milliseconds */
	timeoutMs?: number;
	/** Absolute wall-clock deadline in milliseconds since epoch */
	deadlineMs?: number;
	/** Callback for streaming output chunks (already sanitized) */
	onChunk?: (chunk: string) => Promise<void> | void;
	/** AbortSignal for cancellation */
	signal?: AbortSignal;
	/** Session identifier for kernel reuse */
	sessionId?: string;
	/** Logical owner identifier for retained kernel cleanup */
	kernelOwnerId?: string;
	/** Kernel mode (session reuse vs per-call) */
	kernelMode?: PythonKernelMode;
	/** Restart the kernel before executing */
	reset?: boolean;
	/** Session file path for accessing task outputs */
	sessionFile?: string;
	/**
	 * Effective artifacts directory for the current session. Subagents share
	 * the parent's directory, so this can differ from `sessionFile`'s sibling
	 * dir. When present, exported to the kernel as `PI_ARTIFACTS_DIR` and
	 * preferred over `PI_SESSION_FILE`-derived paths.
	 */
	artifactsDir?: string;
	/** Artifact path/id for full output storage */
	artifactPath?: string;
	artifactId?: string;
	/**
	 * ToolSession used to resolve host-side `tool.<name>(args)` calls made from
	 * the Python prelude's bridge proxy. When omitted, the bridge env vars are
	 * not injected and any `tool.foo(...)` raises in Python.
	 */
	toolSession?: ToolSession;
	/** Callback for status events emitted by tool bridge invocations. */
	emitStatus?: (event: JsStatusEvent) => void;
	/** @internal Bridge session id, set by `executePython` before delegating. */
	bridgeSessionId?: string;
	/** @internal Bridge endpoint info, set by `executePython` before delegating. */
	bridge?: { url: string; token: string };
}

/** Minimal kernel surface needed to run one cell of code and report its result. */
export interface PythonKernelExecutor {
	execute: (code: string, options?: KernelExecuteOptions) => Promise<KernelExecuteResult>;
}

export interface PythonResult {
	/** Combined stdout + stderr output (sanitized, possibly truncated) */
	output: string;
	/** Execution exit code (0 ok, 1 error, undefined if cancelled) */
	exitCode: number | undefined;
	/** Whether the execution was cancelled via signal */
	cancelled: boolean;
	/** Whether the output was truncated */
	truncated: boolean;
	/** Artifact ID if full output was saved to artifact storage */
	artifactId?: string;
	/** Total number of lines in the output stream */
	totalLines: number;
	/** Total number of bytes in the output stream */
	totalBytes: number;
	/** Number of lines included in the output text */
	outputLines: number;
	/** Number of bytes included in the output text */
	outputBytes: number;
	/** Rich display outputs captured from display_data/execute_result */
	displayOutputs: KernelDisplayOutput[];
	/** Whether stdin was requested */
	stdinRequested: boolean;
}

// ---------------------------------------------------------------------------
// Session bookkeeping
//
// One PythonKernel subprocess per session id. Sessions are reused until they
// die or are explicitly disposed. Multiple agent owners can register against
// the same session id; the kernel stays alive until the last owner detaches.
// ---------------------------------------------------------------------------

interface PythonSession {
	sessionId: string;
	kernel: PythonKernel;
	/** Owners keeping this session alive; may hold the session id itself as a fallback owner. */
	ownerIds: Set<string>;
	/** Whether `ownerIds` currently holds the session id as a stand-in for an anonymous caller. */
	hasFallbackOwner: boolean;
	/** Tail of the execution queue; each execution chains onto it so cells run one at a time. */
	queue: Promise<void>;
}

const sessions = new Map<string, PythonSession>();

/**
 * Kernel starts still in flight, keyed by session id. Concurrent first calls for
 * the same session join the pending start instead of each spawning a kernel;
 * otherwise the later `sessions.set` would overwrite the earlier session and its
 * kernel subprocess would be orphaned (never shut down).
 */
const pendingSessions = new Map<string, Promise<PythonSession>>();

// ---------------------------------------------------------------------------
// Cancellation plumbing
// ---------------------------------------------------------------------------

/** Internal cancellation error; `timedOut` distinguishes deadline expiry from an abort. */
class PythonExecutionCancelledError extends Error {
	readonly timedOut: boolean;

	constructor(timedOut: boolean) {
		super(timedOut ? "Command timed out" : "Command aborted");
		this.name = timedOut ? "TimeoutError" : "AbortError";
		this.timedOut = timedOut;
	}
}

/** Resolves the absolute deadline: an explicit `deadlineMs` wins over a relative `timeoutMs`. */
function getExecutionDeadlineMs(
	options?: Pick<PythonExecutorOptions, "deadlineMs" | "timeoutMs">,
): number | undefined {
	if (options?.deadlineMs !== undefined) return options.deadlineMs;
	if (options?.timeoutMs === undefined) return undefined;
	return Date.now() + options.timeoutMs;
}

/** Milliseconds left until `deadlineMs` (zero or negative once passed); undefined when there is no deadline. */
function getRemainingTimeoutMs(deadlineMs?: number): number | undefined {
	if (deadlineMs === undefined) return undefined;
	return deadlineMs - Date.now();
}

/**
 * Like `getRemainingTimeoutMs`, but throws a timed-out
 * `PythonExecutionCancelledError` once the deadline has passed.
 */
function requireRemainingTimeoutMs(deadlineMs?: number): number | undefined {
	const remainingMs = getRemainingTimeoutMs(deadlineMs);
	if (remainingMs === undefined) return undefined;
	if (remainingMs <= 0) {
		throw new PythonExecutionCancelledError(true);
	}
	return remainingMs;
}

/** True for our own cancellation error or any `AbortError`/`TimeoutError` (DOMException or Error). */
function isCancellationError(error: unknown): boolean {
	return (
		error instanceof PythonExecutionCancelledError ||
		(error instanceof DOMException && (error.name === "AbortError" || error.name === "TimeoutError")) ||
		(error instanceof Error && (error.name === "AbortError" || error.name === "TimeoutError"))
	);
}

/**
 * Whether a cancellation was caused by a timeout. Inspects `error` first and
 * falls back to the abort reason carried by `signal`.
 */
function isTimedOutCancellation(error: unknown, signal?: AbortSignal): boolean {
	if (error instanceof PythonExecutionCancelledError) return error.timedOut;
	if (error instanceof DOMException) return error.name === "TimeoutError";
	if (error instanceof Error && error.name === "TimeoutError") return true;
	const reason = signal?.reason;
	if (reason instanceof DOMException) return reason.name === "TimeoutError";
	return reason instanceof Error ? reason.name === "TimeoutError" : false;
}

/**
 * Awaits `promise`, but rejects with `PythonExecutionCancelledError` as soon as
 * `options.signal` aborts or `options.deadlineMs` passes. The underlying promise
 * is not cancelled; its eventual outcome is simply ignored in that case.
 */
async function waitForPromiseWithCancellation<T>(
	promise: Promise<T>,
	options: Pick<PythonExecutorOptions, "signal" | "deadlineMs">,
): Promise<T> {
	// Mark `promise` handled before any early bail-out below; otherwise a later
	// rejection would surface as an unhandled rejection. Awaiting it still rethrows.
	void promise.catch(() => undefined);

	if (options.signal?.aborted) {
		throw new PythonExecutionCancelledError(isTimedOutCancellation(options.signal.reason, options.signal));
	}
	const remainingMs = getRemainingTimeoutMs(options.deadlineMs);
	if (remainingMs !== undefined && remainingMs <= 0) {
		throw new PythonExecutionCancelledError(true);
	}
	if (!options.signal && remainingMs === undefined) {
		return await promise;
	}

	const { promise: resultPromise, resolve, reject } = Promise.withResolvers<T>();
	const cleanups: Array<() => void> = [];
	// Tear down listeners/timers, then settle. The first settle wins; later ones are no-ops.
	const finish = (settle: () => void): void => {
		while (cleanups.length > 0) cleanups.pop()?.();
		settle();
	};

	const { signal } = options;
	if (signal) {
		const onAbort = (): void =>
			finish(() => reject(new PythonExecutionCancelledError(isTimedOutCancellation(signal.reason, signal))));
		signal.addEventListener("abort", onAbort, { once: true });
		cleanups.push(() => signal.removeEventListener("abort", onAbort));
	}
	if (remainingMs !== undefined) {
		const timer = setTimeout(() => finish(() => reject(new PythonExecutionCancelledError(true))), remainingMs);
		// Do not keep the process alive just for this deadline.
		timer.unref();
		cleanups.push(() => clearTimeout(timer));
	}

	void promise.then(
		value => finish(() => resolve(value)),
		err => finish(() => reject(err)),
	);
	return await resultPromise;
}

// ---------------------------------------------------------------------------
// Result formatting
// ---------------------------------------------------------------------------

/** Generic timeout note, e.g. "Command timed out after 3 seconds" (rounded, minimum 1 second). */
function formatTimeoutAnnotation(timeoutMs?: number): string {
	if (timeoutMs === undefined) return "Command timed out";
	const secs = Math.max(1, Math.round(timeoutMs / 1000));
	return `Command timed out after ${secs} seconds`;
}

/** Timeout note for a kernel-reported cell timeout; wording depends on whether the kernel was killed. */
function formatKernelTimeoutAnnotation(timeoutMs: number | undefined, kernelKilled: boolean): string {
	const secs = timeoutMs === undefined ? undefined : Math.max(1, Math.round(timeoutMs / 1000));
	if (kernelKilled) {
		return "eval cell timed out and the kernel was unresponsive to interrupt; the kernel has been killed and will be recreated on the next call.";
	}
	const duration = secs === undefined ? "the configured timeout" : `${secs}s`;
	return `eval cell timed out after ${duration}; kernel interrupted but remains running. Reset the kernel via { reset: true } if state appears corrupted.`;
}

/**
 * Builds the result for an execution cancelled before any kernel output was
 * captured. Timed-out results carry a single-line timeout note; aborts carry
 * empty output.
 */
function createCancelledPythonResult(timedOut: boolean, timeoutMs?: number): PythonResult {
	const output = timedOut ? formatTimeoutAnnotation(timeoutMs) : "";
	const outputBytes = Buffer.byteLength(output, "utf-8");
	const outputLines = output.length > 0 ? 1 : 0;
	return {
		output,
		exitCode: undefined,
		cancelled: true,
		truncated: false,
		totalLines: outputLines,
		totalBytes: outputBytes,
		outputLines,
		outputBytes,
		displayOutputs: [],
		stdinRequested: false,
	};
}

// ---------------------------------------------------------------------------
// Kernel start helpers
// ---------------------------------------------------------------------------

/**
 * Environment variables exported to the kernel process. Bridge variables are
 * only set when both the endpoint and a bridge session id are known. Returns
 * undefined when there is nothing to export.
 */
function buildKernelEnv(options: {
	sessionFile?: string;
	artifactsDir?: string;
	bridgeSessionId?: string;
	bridge?: { url: string; token: string };
}): Record<string, string> | undefined {
	const env: Record<string, string> = {};
	if (options.sessionFile) env.PI_SESSION_FILE = options.sessionFile;
	if (options.artifactsDir) env.PI_ARTIFACTS_DIR = options.artifactsDir;
	if (options.bridge && options.bridgeSessionId) {
		env.PI_TOOL_BRIDGE_URL = options.bridge.url;
		env.PI_TOOL_BRIDGE_TOKEN = options.bridge.token;
		env.PI_TOOL_BRIDGE_SESSION = options.bridgeSessionId;
	}
	return Object.keys(env).length > 0 ? env : undefined;
}

/** Starts a kernel in `cwd`, failing fast with a timed-out cancellation if the deadline already passed. */
async function startKernel(cwd: string, options: PythonExecutorOptions): Promise<PythonKernel> {
	requireRemainingTimeoutMs(options.deadlineMs);
	return await PythonKernel.start({
		cwd,
		env: buildKernelEnv(options),
		signal: options.signal,
		deadlineMs: options.deadlineMs,
	});
}

/**
 * Registers an owner on `session`. A named owner displaces the fallback owner
 * (the session id). An anonymous caller only adds the fallback when the session
 * has no owners yet, or already relies on the fallback.
 */
function attachOwner(session: PythonSession, sessionId: string, ownerId: string | undefined): void {
	if (ownerId !== undefined) {
		if (session.hasFallbackOwner) {
			session.ownerIds.delete(sessionId);
			session.hasFallbackOwner = false;
		}
		session.ownerIds.add(ownerId);
		return;
	}
	if (session.hasFallbackOwner || session.ownerIds.size === 0) {
		session.ownerIds.add(sessionId);
		session.hasFallbackOwner = true;
	}
}

/**
 * Starts a kernel for a brand-new session and publishes the start in
 * `pendingSessions` while it is in flight, so concurrent callers can join it.
 */
async function createSession(sessionId: string, cwd: string, options: PythonExecutorOptions): Promise<PythonSession> {
	const creation = (async (): Promise<PythonSession> => {
		const kernel = await startKernel(cwd, options);
		const session: PythonSession = {
			sessionId,
			kernel,
			ownerIds: new Set<string>(),
			hasFallbackOwner: false,
			queue: Promise.resolve(),
		};
		attachOwner(session, sessionId, options.kernelOwnerId);
		sessions.set(sessionId, session);
		return session;
	})();
	pendingSessions.set(sessionId, creation);
	// Registered before anyone else can await `creation`, so the pending entry is
	// already gone by the time joiners observe the outcome.
	const clear = (): void => {
		if (pendingSessions.get(sessionId) === creation) pendingSessions.delete(sessionId);
	};
	void creation.then(clear, clear);
	return await creation;
}

/**
 * Returns the live session for `sessionId` and registers the caller as an
 * owner. Starts a kernel when none exists. Concurrent callers share one pending
 * start instead of racing to create (and orphan) separate kernels.
 */
async function acquireSession(sessionId: string, cwd: string, options: PythonExecutorOptions): Promise<PythonSession> {
	for (;;) {
		const existing = sessions.get(sessionId);
		if (existing) {
			attachOwner(existing, sessionId, options.kernelOwnerId);
			return existing;
		}

		const inFlight = pendingSessions.get(sessionId);
		if (!inFlight) return await createSession(sessionId, cwd, options);

		try {
			await waitForPromiseWithCancellation(inFlight, options);
		} catch (err) {
			// Propagate our own cancellation and genuine start failures. If the other
			// caller's start was merely cancelled by *its* signal/deadline, loop and
			// start (or join) a kernel on our own budget instead.
			const remainingMs = getRemainingTimeoutMs(options.deadlineMs);
			const selfCancelled = options.signal?.aborted === true || (remainingMs !== undefined && remainingMs <= 0);
			if (selfCancelled || !isCancellationError(err)) throw err;
		}
		// Loop: the joined start either registered the session or was cleared from pendingSessions.
	}
}

/** Throws an abort-style cancellation when `session` was reset or disposed (no longer the registered one). */
function ensureSessionCurrent(session: PythonSession): void {
	if (sessions.get(session.sessionId) !== session) {
		throw new PythonExecutionCancelledError(false);
	}
}

/**
 * Shuts down the session's current kernel (bounded by the remaining deadline,
 * errors ignored) and installs a freshly started one. A kernel started for a
 * session that was reset or disposed meanwhile is shut down again.
 */
async function replaceSessionKernel(
	session: PythonSession,
	cwd: string,
	options: PythonExecutorOptions,
): Promise<void> {
	const old = session.kernel;
	const remainingMs = getRemainingTimeoutMs(options.deadlineMs);
	await old
		.shutdown(remainingMs !== undefined ? { timeoutMs: Math.max(0, remainingMs) } : undefined)
		.catch(() => undefined);
	ensureSessionCurrent(session);
	requireRemainingTimeoutMs(options.deadlineMs);
	const next = await startKernel(cwd, options);
	if (sessions.get(session.sessionId) !== session) {
		await next.shutdown().catch(() => undefined);
		throw new PythonExecutionCancelledError(false);
	}
	session.kernel = next;
}

/** Unregisters the session and shuts down its kernel (best effort). No-op for unknown ids. */
async function resetSession(sessionId: string): Promise<void> {
	const existing = sessions.get(sessionId);
	if (!existing) return;
	sessions.delete(sessionId);
	await existing.kernel.shutdown().catch(() => undefined);
}

/**
 * Runs `work` after every previously queued execution on `session` has
 * finished. Only the wait for the queue is cancellable; once `work` starts it
 * runs to completion.
 */
async function runQueued<T>(
	session: PythonSession,
	options: Pick<PythonExecutorOptions, "signal" | "deadlineMs">,
	work: () => Promise<T>,
): Promise<T> {
	const previous = session.queue;
	const { promise: ourSlot, resolve: releaseSlot } = Promise.withResolvers<void>();
	// Keep the queue chained even if WE bail out: future runs must still wait
	// for `previous` to finish before they touch the kernel.
	session.queue = previous.catch(() => undefined).then(() => ourSlot);
	try {
		await waitForPromiseWithCancellation(
			previous.catch(() => undefined),
			options,
		);
		return await work();
	} finally {
		releaseSlot();
	}
}

// ---------------------------------------------------------------------------
// Public dispose entry points
// ---------------------------------------------------------------------------

/**
 * Shuts down every registered session kernel. Sessions whose shutdown rejects
 * or is not confirmed are logged and re-registered (unless the id was reused
 * meanwhile) so a later dispose can retry.
 */
export async function disposeAllKernelSessions(): Promise<void> {
	const all = [...sessions.entries()];
	for (const [id, session] of all) {
		if (sessions.get(id) === session) sessions.delete(id);
	}
	const results = await Promise.allSettled(all.map(([, session]) => session.kernel.shutdown()));
	for (let i = 0; i < all.length; i += 1) {
		const [id, session] = all[i];
		const result = results[i];
		if (result.status === "fulfilled" && result.value?.confirmed !== false) continue;
		const reason = result.status === "rejected" ? result.reason : "not confirmed";
		logger.warn("Python kernel shutdown not confirmed", { sessionId: id, reason });
		if (!sessions.has(id)) sessions.set(id, session);
	}
}

/**
 * Detaches `ownerId` from every session it owns. Sessions where it is the sole
 * owner are shut down; failed or unconfirmed shutdowns are logged and the
 * session is re-registered with the owner still attached so a retry can find it.
 */
export async function disposeKernelSessionsByOwner(ownerId: string): Promise<void> {
	const toShutdown: PythonSession[] = [];
	for (const session of [...sessions.values()]) {
		if (!session.ownerIds.has(ownerId)) continue;
		if (session.ownerIds.size === 1) {
			toShutdown.push(session);
			continue;
		}
		session.ownerIds.delete(ownerId);
	}
	for (const session of toShutdown) {
		if (sessions.get(session.sessionId) === session) sessions.delete(session.sessionId);
	}
	const results = await Promise.allSettled(toShutdown.map(session => session.kernel.shutdown()));
	for (let i = 0; i < toShutdown.length; i += 1) {
		const session = toShutdown[i];
		const result = results[i];
		if (result.status === "fulfilled" && result.value?.confirmed !== false) {
			session.ownerIds.delete(ownerId);
			continue;
		}
		const reason = result.status === "rejected" ? result.reason : "not confirmed";
		logger.warn("Python kernel shutdown not confirmed", { sessionId: session.sessionId, reason });
		if (!sessions.has(session.sessionId)) sessions.set(session.sessionId, session);
	}
}

// ---------------------------------------------------------------------------
// Execution
// ---------------------------------------------------------------------------

/**
 * Runs `code` on `kernel`, streaming output through an `OutputSink` and
 * registering the tool bridge for the duration of the call when configured.
 * Cancellations are converted into cancelled results; other errors are logged
 * and rethrown.
 */
async function executeWithKernel(
	kernel: PythonKernelExecutor,
	code: string,
	options: PythonExecutorOptions | undefined,
): Promise<PythonResult> {
	const settings = await Settings.init();
	const sink = new OutputSink({
		onChunk: options?.onChunk,
		artifactPath: options?.artifactPath,
		artifactId: options?.artifactId,
		headBytes: resolveOutputSinkHeadBytes(settings),
		maxColumns: resolveOutputMaxColumns(settings),
	});
	const displayOutputs: KernelDisplayOutput[] = [];
	const deadlineMs = getExecutionDeadlineMs(options);
	let executionTimeoutMs: number | undefined;
	// Without a caller-supplied callback, bridge status events are recorded alongside display outputs.
	const emitStatus =
		options?.emitStatus ??
		((event: JsStatusEvent) => {
			displayOutputs.push({ type: "status", event });
		});
	const unregisterBridge =
		options?.toolSession && options?.bridgeSessionId
			? registerPyToolBridge(options.bridgeSessionId, {
					toolSession: options.toolSession,
					signal: options.signal,
					emitStatus,
				})
			: null;

	try {
		executionTimeoutMs = requireRemainingTimeoutMs(deadlineMs);
		const result = await kernel.execute(code, {
			signal: options?.signal,
			timeoutMs: executionTimeoutMs,
			onChunk: text => sink.push(text),
			onDisplay: output => void displayOutputs.push(output),
		});

		if (result.cancelled) {
			const annotation = result.timedOut
				? formatKernelTimeoutAnnotation(executionTimeoutMs, result.kernelKilled ?? false)
				: undefined;
			return {
				exitCode: undefined,
				cancelled: true,
				displayOutputs,
				stdinRequested: result.stdinRequested,
				...(await sink.dump(annotation)),
			};
		}

		if (result.stdinRequested) {
			return {
				exitCode: 1,
				cancelled: false,
				displayOutputs,
				stdinRequested: true,
				...(await sink.dump("Kernel requested stdin; interactive input is not supported.")),
			};
		}

		const exitCode = result.status === "ok" ? 0 : 1;
		return {
			exitCode,
			cancelled: false,
			displayOutputs,
			stdinRequested: false,
			...(await sink.dump()),
		};
	} catch (err) {
		if (isCancellationError(err) || options?.signal?.aborted) {
			const timedOut = isTimedOutCancellation(err, options?.signal);
			return {
				exitCode: undefined,
				cancelled: true,
				displayOutputs,
				stdinRequested: false,
				...(await sink.dump(timedOut ? formatTimeoutAnnotation(executionTimeoutMs) : undefined)),
			};
		}
		const error = err instanceof Error ? err : new Error(String(err));
		logger.error("Python execution failed", { error: error.message });
		throw error;
	} finally {
		unregisterBridge?.();
	}
}

/** Throws when no usable Python kernel is available for `cwd`; the check honors signal/deadline. */
async function ensureKernelAvailable(cwd: string, options: PythonExecutorOptions): Promise<void> {
	const availability = await waitForPromiseWithCancellation(checkPythonKernelAvailability(cwd), options);
	if (!availability.ok) {
		throw new Error(availability.reason ?? "Python kernel unavailable");
	}
}

/**
 * Starts (or reuses) the tool bridge when a tool session is supplied and no
 * bridge is set yet. Failures are logged and execution continues without it.
 */
async function ensureToolBridge(options: PythonExecutorOptions): Promise<void> {
	if (!options.toolSession || options.bridge) return;
	try {
		options.bridge = await ensurePyToolBridge();
	} catch (err) {
		logger.warn("Failed to start Python tool bridge", {
			error: err instanceof Error ? err.message : String(err),
		});
	}
}

/** Runs `code` on a throwaway kernel that is always shut down afterwards. */
async function executePerCall(code: string, cwd: string, options: PythonExecutorOptions): Promise<PythonResult> {
	if (options.bridge && !options.bridgeSessionId) {
		options.bridgeSessionId = `py-bridge:${crypto.randomUUID()}`;
	}
	const kernel = await startKernel(cwd, options);
	try {
		return await executeWithKernel(kernel, code, options);
	} finally {
		await kernel.shutdown().catch(() => undefined);
	}
}

/**
 * Runs `code` on the shared session kernel, serialized with other executions on
 * the same session. A dead kernel is replaced before running. If the kernel dies
 * during a non-cancelled execution, the cell is retried once on a fresh kernel.
 */
async function executeOnSession(code: string, cwd: string, options: PythonExecutorOptions): Promise<PythonResult> {
	const sessionId = options.sessionId ?? `session:${cwd}`;
	if (options.bridge && !options.bridgeSessionId) {
		options.bridgeSessionId = sessionId;
	}
	if (options.reset) {
		await resetSession(sessionId);
	}

	const session = await acquireSession(sessionId, cwd, options);
	return await runQueued(session, options, async () => {
		if (options.signal?.aborted) {
			throw new PythonExecutionCancelledError(isTimedOutCancellation(options.signal.reason, options.signal));
		}
		ensureSessionCurrent(session);
		if (!session.kernel.isAlive()) {
			await replaceSessionKernel(session, cwd, options);
			ensureSessionCurrent(session);
		}

		try {
			return await executeWithKernel(session.kernel, code, options);
		} catch (err) {
			if (isCancellationError(err) || options.signal?.aborted) throw err;
			if (session.kernel.isAlive()) throw err;
			ensureSessionCurrent(session);
			// Kernel died during execute. Replace it and retry once on a fresh one.
			await replaceSessionKernel(session, cwd, options);
			ensureSessionCurrent(session);
			return await executeWithKernel(session.kernel, code, options);
		}
	});
}

/** Runs `code` on a caller-supplied kernel, bypassing session management. */
export async function executePythonWithKernel(
	kernel: PythonKernelExecutor,
	code: string,
	options?: PythonExecutorOptions,
): Promise<PythonResult> {
	return await executeWithKernel(kernel, code, options);
}

/**
 * Executes Python `code` in a session-reused kernel (default) or a per-call
 * kernel. Cancellation (signal or timeout) yields a cancelled `PythonResult`
 * instead of throwing; other failures propagate.
 */
export async function executePython(code: string, options?: PythonExecutorOptions): Promise<PythonResult> {
	const cwd = options?.cwd ?? getProjectDir();
	const deadlineMs = getExecutionDeadlineMs(options);
	const executionOptions: PythonExecutorOptions = {
		...(options ?? {}),
		deadlineMs,
	};

	try {
		requireRemainingTimeoutMs(deadlineMs);
		if (executionOptions.signal?.aborted) {
			throw new PythonExecutionCancelledError(
				isTimedOutCancellation(executionOptions.signal.reason, executionOptions.signal),
			);
		}

		await ensureKernelAvailable(cwd, executionOptions);
		await ensureToolBridge(executionOptions);

		const kernelMode = executionOptions.kernelMode ?? "session";
		if (kernelMode === "per-call") {
			return await executePerCall(code, cwd, executionOptions);
		}
		return await executeOnSession(code, cwd, executionOptions);
	} catch (err) {
		if (isCancellationError(err) || executionOptions.signal?.aborted) {
			// Pass the configured timeout so the note matches executeWithKernel's ("after N seconds").
			return createCancelledPythonResult(
				isTimedOutCancellation(err, executionOptions.signal),
				executionOptions.timeoutMs,
			);
		}
		throw err;
	}
}