/**
* OllamaExecutor — local Ollama via the `ollama` npm package.
* Endpoint: `config.baseURL`, else `$OLLAMA_HOST`, else http://localhost:11434.
*/
import type { IExecutor, ExecutorConfig, ExecutorStreamEvent } from "./types.ts";
/**
 * IExecutor backed by an Ollama server through the `ollama` npm package.
 * The package is imported lazily, on the first request.
 *
 * The full chat history is kept in memory and re-sent on every request.
 * Typical use:
 * - `sendMessage()` queues one user message;
 * - `getOutputStream()` sends it and streams the reply;
 * - `interrupt()` drops a queued message or cancels the in-flight stream.
 */
export class OllamaExecutor implements IExecutor {
  readonly executorId: string;
  readonly executorType = "ollama" as const;
  /** History re-sent on every request; the system prompt (if any) is first. */
  private messages: Array<{ role: "system" | "user" | "assistant"; content: string }> = [];
  /** Message queued by sendMessage() and consumed by the next getOutputStream(). */
  private pendingContent: string | null = null;
  private model: string;
  private host: string;
  private systemPrompt: string;
  private _turnCount = 0;
  private static readonly MAX_TURNS = 50;
  private static readonly WARN_TURNS = 40;
  /** Default wait for Ollama to start answering before a request is abandoned. */
  private static readonly REQUEST_TIMEOUT_MS = 30_000;
  private requestTimeoutMs = OllamaExecutor.REQUEST_TIMEOUT_MS;
  /** Truncated transcript left by rotate(); prepended to the next user message. */
  private pendingSummary: string | null = null;
  /**
   * Incremented by interrupt(). Each stream remembers the value it started with,
   * so a later sendMessage() cannot un-cancel a stream that was interrupted.
   */
  private interruptEpoch = 0;

  constructor(config: ExecutorConfig) {
    this.executorId = config.sessionId;
    // `||` (not `??`) on purpose: empty strings also fall back to the defaults.
    this.model = config.model || "llama3.2";
    this.host = config.baseURL || process.env.OLLAMA_HOST || "http://localhost:11434";
    this.systemPrompt = config.systemPrompt || "";
    if (this.systemPrompt) {
      this.messages.push({ role: "system", content: this.systemPrompt });
    }
  }

  /**
   * Queues `content` for the next getOutputStream() call.
   * Replaces any message still queued; blank content is ignored with a warning.
   * A summary left by rotate() is prepended once and then cleared.
   */
  sendMessage(content: string): void {
    if (!content?.trim()) {
      console.warn(`[Executor] sendMessage called with empty content, ignoring`);
      return;
    }
    const summary = this.pendingSummary;
    this.pendingSummary = null;
    this.pendingContent = summary ? `\n${summary}\n\n\n${content}` : content;
  }

  /**
   * Sends the queued message and streams the reply as events:
   * - `text_delta` for each chunk of content;
   * - then `result` on success, or one `error` event on failure.
   * An interrupted stream ends with no further events.
   *
   * History bookkeeping on every exit path:
   * - success: user message and full reply are kept;
   * - error: the user message is removed;
   * - interrupt, or the consumer stopping early: any partial reply is kept,
   *   otherwise the user message is removed.
   */
  async *getOutputStream(): AsyncGenerator {
    if (!this.pendingContent) {
      yield { type: "error", error: "No message queued" };
      return;
    }
    const userMsg = this.pendingContent;
    this.pendingContent = null;
    // Any interrupt() from here on cancels this stream, even if sendMessage() runs in between.
    const epoch = this.interruptEpoch;
    const isInterrupted = (): boolean => this.interruptEpoch !== epoch;

    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- optional, lazily imported dependency
    let ollama: any;
    try {
      ollama = await this.createClient();
    } catch {
      yield { type: "error", error: "ollama package not installed. Run: npm install ollama" };
      return;
    }

    // Kept by reference so exactly this entry can be removed later.
    const userEntry = { role: "user" as const, content: userMsg };
    let assistantText = "";
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- stream object returned by the ollama client
    let stream: any;
    // Set once history has been updated for this turn. The `finally` covers the
    // remaining exit: the consumer stopping iteration early.
    let settled = false;
    try {
      this.messages.push(userEntry);
      stream = await this.startChat(ollama);
      for await (const part of stream) {
        if (isInterrupted()) break;
        const text = part.message?.content;
        if (text) {
          assistantText += text;
          yield { type: "text_delta", text };
        }
      }
      settled = true;
      if (isInterrupted()) {
        // Best-effort: abort the underlying request instead of relying on loop exit alone.
        OllamaExecutor.abortStream(stream);
        this.recordUnfinishedTurn(userEntry, assistantText);
      } else {
        this.messages.push({ role: "assistant", content: assistantText });
        yield { type: "result", cost: null, duration: null };
      }
    } catch (err) {
      if (!settled) {
        settled = true;
        this.removeMessage(userEntry);
      }
      yield { type: "error", error: err instanceof Error ? err.message : String(err) };
    } finally {
      if (!settled) {
        // The consumer abandoned the generator mid-stream (e.g. `break` in its for-await).
        OllamaExecutor.abortStream(stream);
        this.recordUnfinishedTurn(userEntry, assistantText);
      }
    }
  }

  /**
   * Drops any queued message and cancels the in-flight stream.
   * The cancellation is noticed when that stream's next chunk arrives.
   */
  interrupt(): void {
    this.pendingContent = null;
    this.interruptEpoch++;
  }

  get turnCount(): number { return this._turnCount; }
  get shouldRotate(): boolean { return this._turnCount >= OllamaExecutor.MAX_TURNS; }
  get shouldWarnRotation(): boolean { return this._turnCount >= OllamaExecutor.WARN_TURNS && !this.shouldRotate; }
  incrementTurn(): void { this._turnCount++; }

  /**
   * Starts a fresh context and resets the turn counter.
   * Only the system message is kept. A truncated transcript is stored and
   * prepended to the next user message: the first 300 chars of each message,
   * capped at 2000 chars overall.
   */
  async rotate(): Promise<void> {
    const system = this.messages.find((m) => m.role === "system");
    const transcript = this.messages
      .filter((m) => m.role !== "system")
      .map((m) => `${m.role}: ${m.content.slice(0, 300)}`)
      .join("\n");
    this.pendingSummary = transcript.slice(0, 2000);
    this.messages = system ? [system] : [];
    this._turnCount = 0;
  }

  /** Imports the optional `ollama` package and builds a client for `this.host`; throws if the import fails. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- optional dependency
  private async createClient(): Promise<any> {
    const { Ollama } = await import("ollama");
    return new Ollama({ host: this.host });
  }

  /**
   * Starts a streaming chat request and rejects if Ollama has not started
   * answering within `requestTimeoutMs`.
   * The ollama client takes no AbortSignal for this pending call, so a dropped
   * connection would otherwise hang indefinitely. Only the wait for the initial
   * response is bounded; reading the stream afterwards is not.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- ollama client / stream
  private async startChat(ollama: any): Promise<any> {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const chatPromise: Promise<any> = ollama.chat({
      model: this.model,
      messages: this.messages,
      stream: true,
    });
    let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
    let timedOut = false;
    const timeoutPromise = new Promise<never>((_, reject) => {
      timeoutHandle = setTimeout(() => {
        timedOut = true;
        reject(new Error(`Ollama request timed out after ${this.requestTimeoutMs / 1000}s`));
      }, this.requestTimeoutMs);
    });
    try {
      return await Promise.race([chatPromise, timeoutPromise]);
    } catch (err) {
      if (timedOut) {
        // The request may still succeed after we gave up; abort that stream when it arrives.
        void chatPromise.then(OllamaExecutor.abortStream, () => {});
      }
      throw err;
    } finally {
      clearTimeout(timeoutHandle);
    }
  }

  /** Records a turn that ended early: keeps a partial reply, or drops the unanswered user message. */
  private recordUnfinishedTurn(userEntry: { role: "user"; content: string }, partialReply: string): void {
    if (partialReply) {
      this.messages.push({ role: "assistant", content: partialReply });
    } else {
      this.removeMessage(userEntry);
    }
  }

  /** Removes `entry` (matched by identity) from the history; a no-op if rotate() already replaced it. */
  private removeMessage(entry: { role: "user"; content: string }): void {
    const index = this.messages.lastIndexOf(entry);
    if (index !== -1) this.messages.splice(index, 1);
  }

  /** Calls `abort()` on `stream` when it has one; failures are ignored since the request is being abandoned. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- ollama stream object
  private static abortStream(stream: any): void {
    try {
      stream?.abort?.();
    } catch {
      // best effort only
    }
  }
}