import { isCompiledBinary, logger, Snowflake } from "@oh-my-pi/pi-utils"; import type { ToolSession } from "../../tools"; import { ToolAbortError, ToolError } from "../../tools/tool-errors"; import { callSessionTool, type JsStatusEvent } from "./tool-bridge"; import { WorkerCore } from "./worker-core"; // Worker entry. See `tab-supervisor.ts` for the rationale behind the // literal-string + `new URL(import.meta.url)` hybrid: the literal is what // Bun's `--compile` bundler discovers, the `new URL` form is what makes dev // runs portable across cwds. The worker is registered as an additional // `--compile` entrypoint in `scripts/build-binary.ts`. import type { JsDisplayOutput, RunErrorPayload, SessionSnapshot, Transport, WorkerInbound, WorkerOutbound, } from "./worker-protocol"; export { rewriteImports, wrapCode } from "./shared/rewrite-imports"; export type { JsDisplayOutput } from "./worker-protocol"; export interface VmRunState { signal?: AbortSignal; onText?: (chunk: string) => void; onDisplay?: (output: JsDisplayOutput) => void; } interface WorkerHandle { mode: "worker" | "inline"; send(msg: WorkerInbound): void; onMessage(handler: (msg: WorkerOutbound) => void): () => void; terminate(): Promise; } interface PendingRun { runId: string; runState: VmRunState; toolSession: ToolSession; resolve(value: { value: unknown }): void; reject(error: Error): void; toolCalls: Map; settled: boolean; } interface JsSession { sessionKey: string; worker: WorkerHandle; state: "alive" | "dead"; pending: Map; queue: Promise; } const sessions = new Map(); const READY_TIMEOUT_MS_DEFAULT = 5_000; export async function executeInVmContext(options: { sessionKey: string; sessionId: string; cwd: string; session: ToolSession; reset?: boolean; code: string; filename: string; timeoutMs?: number; runState: VmRunState; }): Promise<{ value: unknown }> { if (options.reset) { await resetVmContext(options.sessionKey); } const session = await acquireSession( options.sessionKey, { cwd: options.cwd, sessionId: options.sessionId }, options.timeoutMs, ); return await runQueued(session, () => runOnce(session, options)); } export async function resetVmContext(sessionKey: string): Promise { const session = sessions.get(sessionKey); if (!session) return; sessions.delete(sessionKey); await killSession(session, new ToolError("JS context reset")); } export async function disposeAllVmContexts(): Promise { const all = [...sessions.values()]; sessions.clear(); await Promise.all(all.map(session => killSession(session, new ToolError("JS context disposed")))); } async function runQueued(session: JsSession, work: () => Promise): Promise { const previous = session.queue; const { promise, resolve } = Promise.withResolvers(); session.queue = promise; try { await previous; } catch { // Previous run's failure must not poison this one. } try { return await work(); } finally { resolve(); } } async function runOnce( session: JsSession, options: { sessionId: string; cwd: string; session: ToolSession; code: string; filename: string; runState: VmRunState; }, ): Promise<{ value: unknown }> { const runId = `r-${Snowflake.next()}`; const { promise, resolve, reject } = Promise.withResolvers<{ value: unknown }>(); const pending: PendingRun = { runId, runState: options.runState, toolSession: options.session, resolve, reject, toolCalls: new Map(), settled: false, }; session.pending.set(runId, pending); const onAbort = (): void => { const reason = options.runState.signal?.reason; const abortError = reasonToError(reason, "Execution aborted"); // Cancel any in-flight tool calls first. for (const ctrl of pending.toolCalls.values()) ctrl.abort(abortError); // Hard-kill the worker — only way to interrupt synchronous user code. void killSessionFor(session, abortError); }; if (options.runState.signal?.aborted) { queueMicrotask(onAbort); } else { options.runState.signal?.addEventListener("abort", onAbort, { once: true }); } try { session.worker.send({ type: "run", runId, code: options.code, filename: options.filename, snapshot: { cwd: options.cwd, sessionId: options.sessionId }, }); return await promise; } finally { options.runState.signal?.removeEventListener("abort", onAbort); session.pending.delete(runId); } } async function acquireSession(sessionKey: string, snapshot: SessionSnapshot, timeoutMs?: number): Promise { const existing = sessions.get(sessionKey); if (existing && existing.state === "alive") return existing; const worker = await spawnJsWorker(); const session: JsSession = { sessionKey, worker, state: "alive", pending: new Map(), queue: Promise.resolve(), }; const { promise: readyPromise, resolve: resolveReady, reject: rejectReady } = Promise.withResolvers(); let resolved = false; const unsubscribe = worker.onMessage(msg => { if (!resolved && msg.type === "ready") { resolved = true; resolveReady(); return; } if (!resolved && msg.type === "init-failed") { resolved = true; rejectReady(errorFromPayload(msg.error)); return; } handleSessionMessage(session, msg); }); try { // Cold-start can exceed 5s on slow hosts. Let the caller's per-cell timeout dominate so // users can grant more headroom when they raise `timeout` on a cell. const readyTimeoutMs = Math.max(READY_TIMEOUT_MS_DEFAULT, timeoutMs ?? 0); await raceWithTimeout(readyPromise, readyTimeoutMs, "Timed out initializing JS eval worker"); } catch (error) { unsubscribe(); await worker.terminate().catch(() => undefined); throw error; } worker.send({ type: "init", snapshot }); sessions.set(sessionKey, session); return session; } function handleSessionMessage(session: JsSession, msg: WorkerOutbound): void { switch (msg.type) { case "text": { const pending = session.pending.get(msg.runId); pending?.runState.onText?.(msg.chunk); return; } case "display": { const pending = session.pending.get(msg.runId); pending?.runState.onDisplay?.(msg.output); return; } case "tool-call": void handleToolCall(session, msg); return; case "result": settlePending(session, msg); return; case "log": logWorkerMessage(msg); return; case "ready": case "init-failed": case "closed": return; } } async function handleToolCall(session: JsSession, msg: Extract): Promise { const pending = session.pending.get(msg.runId); if (!pending) { safeSend(session, { type: "tool-reply", id: msg.id, reply: { ok: false, error: { message: "Run no longer active" } }, }); return; } const ctrl = new AbortController(); pending.toolCalls.set(msg.id, ctrl); try { const value = await callSessionTool(msg.name, msg.args, { session: pending.toolSession, signal: ctrl.signal, emitStatus: (event: JsStatusEvent) => pending.runState.onDisplay?.({ type: "status", event }), }); safeSend(session, { type: "tool-reply", id: msg.id, reply: { ok: true, value } }); } catch (error) { safeSend(session, { type: "tool-reply", id: msg.id, reply: { ok: false, error: toErrorPayload(error) } }); } finally { pending.toolCalls.delete(msg.id); } } function settlePending(session: JsSession, msg: Extract): void { const pending = session.pending.get(msg.runId); if (!pending || pending.settled) return; pending.settled = true; if (msg.ok) { pending.resolve({ value: undefined }); return; } pending.reject(errorFromPayload(msg.error)); } async function killSessionFor(session: JsSession, error: Error): Promise { if (sessions.get(session.sessionKey) === session) { sessions.delete(session.sessionKey); } await killSession(session, error); } async function killSession(session: JsSession, error: Error): Promise { if (session.state === "dead") return; session.state = "dead"; for (const pending of session.pending.values()) { if (pending.settled) continue; pending.settled = true; for (const ctrl of pending.toolCalls.values()) ctrl.abort(error); pending.reject(error); } session.pending.clear(); await session.worker.terminate().catch(() => undefined); } function safeSend(session: JsSession, msg: WorkerInbound): void { if (session.state !== "alive") return; try { session.worker.send(msg); } catch (err) { logger.debug("js worker send failed", { error: err instanceof Error ? err.message : String(err) }); } } function reasonToError(reason: unknown, fallback: string): Error { if (reason instanceof Error) return reason; if (typeof reason === "string") return new ToolAbortError(reason); return new ToolAbortError(fallback); } function errorFromPayload(payload: RunErrorPayload): Error { if (payload.isAbort) { const err = new ToolAbortError(payload.message || "Execution aborted"); if (payload.stack) err.stack = payload.stack; return err; } const ctor = payload.isToolError ? ToolError : Error; const error = new ctor(payload.message); if (payload.name) error.name = payload.name; if (payload.stack) error.stack = payload.stack; return error; } function toErrorPayload(error: unknown): RunErrorPayload { if (error instanceof Error) { return { name: error.name, message: error.message, stack: error.stack, isAbort: error.name === "AbortError" || error.name === "ToolAbortError", isToolError: error instanceof ToolError || error.name === "ToolError", }; } return { message: String(error) }; } function logWorkerMessage(msg: Extract): void { if (msg.level === "debug") logger.debug(msg.msg, msg.meta); else if (msg.level === "warn") logger.warn(msg.msg, msg.meta); else logger.error(msg.msg, msg.meta); } async function raceWithTimeout(promise: Promise, timeoutMs: number, reason: string): Promise { const timeoutSignal = AbortSignal.timeout(timeoutMs); const { promise: timeoutPromise, reject } = Promise.withResolvers(); const onAbort = (): void => reject(new ToolError(reason)); timeoutSignal.addEventListener("abort", onAbort, { once: true }); try { return await Promise.race([promise, timeoutPromise]); } finally { timeoutSignal.removeEventListener("abort", onAbort); } } async function spawnJsWorker(): Promise { try { const worker = isCompiledBinary() ? new Worker("./packages/coding-agent/src/eval/js/worker-entry.ts", { type: "module" }) : new Worker(new URL("./worker-entry.ts", import.meta.url).href, { type: "module" }); return wrapBunWorker(worker); } catch (err) { logger.warn("Bun Worker spawn failed; using inline JS eval worker (no sync-loop guard)", { error: err instanceof Error ? err.message : String(err), }); return spawnInlineWorker(); } } function wrapBunWorker(worker: Worker): WorkerHandle { return { mode: "worker", send(msg) { worker.postMessage(msg); }, onMessage(handler) { const wrap = (event: MessageEvent): void => handler(event.data as WorkerOutbound); worker.addEventListener("message", wrap); return () => worker.removeEventListener("message", wrap); }, async terminate() { worker.terminate(); }, }; } /** * Inline fallback for environments where Bun cannot spawn the worker entry * (e.g. some test runners). Preserves behavior but cannot interrupt synchronous * infinite loops because user code runs on the main thread. */ function spawnInlineWorker(): WorkerHandle { const hostListeners = new Set<(message: WorkerOutbound) => void>(); const workerListeners = new Set<(message: WorkerInbound) => void>(); const workerTransport: Transport = { send: msg => queueMicrotask(() => { for (const listener of hostListeners) listener(msg); }), onMessage: handler => { workerListeners.add(handler); return () => workerListeners.delete(handler); }, close: () => {}, }; new WorkerCore(workerTransport); return { mode: "inline", send: msg => queueMicrotask(() => { for (const listener of workerListeners) listener(msg); }), onMessage: handler => { hostListeners.add(handler); return () => hostListeners.delete(handler); }, async terminate() { hostListeners.clear(); workerListeners.clear(); }, }; }