import type { Database } from "bun:sqlite"; import type * as fsNode from "node:fs"; import * as fs from "node:fs/promises"; import * as path from "node:path"; import type { AgentMessage } from "@oh-my-pi/pi-agent-core"; import { completeSimple, Effort, type Model } from "@oh-my-pi/pi-ai"; import { getAgentDbPath, getMemoriesDir, logger, parseJsonlLenient, prompt } from "@oh-my-pi/pi-utils"; import type { ModelRegistry } from "../config/model-registry"; import { resolveModelRoleValue } from "../config/model-resolver"; import type { Settings } from "../config/settings"; import consolidationTemplate from "../prompts/memories/consolidation.md" with { type: "text" }; import readPathTemplate from "../prompts/memories/read-path.md" with { type: "text" }; import stageOneInputTemplate from "../prompts/memories/stage_one_input.md" with { type: "text" }; import stageOneSystemTemplate from "../prompts/memories/stage_one_system.md" with { type: "text" }; import type { AgentSession } from "../session/agent-session"; import { claimStage1Jobs, clearMemoryData as clearMemoryDataInDb, closeMemoryDb, enqueueGlobalWatermark, heartbeatGlobalJob, listStage1OutputsForGlobal, type MemoryThread, markGlobalPhase2Failed, markGlobalPhase2FailedUnowned, markGlobalPhase2Succeeded, markStage1Failed, markStage1SucceededNoOutput, markStage1SucceededWithOutput, openMemoryDb, type Stage1Claim, type Stage1OutputRow, tryClaimGlobalPhase2Job, upsertThreads, } from "./storage"; interface MemoryRuntimeConfig { enabled: boolean; maxRolloutsPerStartup: number; maxRolloutAgeDays: number; minRolloutIdleHours: number; threadScanLimit: number; maxRawMemoriesForGlobal: number; stage1Concurrency: number; stage1LeaseSeconds: number; stage1RetryDelaySeconds: number; phase2LeaseSeconds: number; phase2RetryDelaySeconds: number; phase2HeartbeatSeconds: number; rolloutPayloadPercent: number; phase1InputTokenLimit: number; fallbackTokenLimit: number; summaryInjectionTokenLimit: number; } const DEFAULTS: MemoryRuntimeConfig = { enabled: false, maxRolloutsPerStartup: 64, maxRolloutAgeDays: 30, minRolloutIdleHours: 12, threadScanLimit: 300, maxRawMemoriesForGlobal: 200, stage1Concurrency: 8, stage1LeaseSeconds: 120, stage1RetryDelaySeconds: 120, phase2LeaseSeconds: 180, phase2RetryDelaySeconds: 180, phase2HeartbeatSeconds: 30, rolloutPayloadPercent: 0.7, phase1InputTokenLimit: 4_000, fallbackTokenLimit: 16_000, summaryInjectionTokenLimit: 5_000, }; interface Stage1Stats { claimed: number; succeeded: number; succeededNoOutput: number; failed: number; produced: number; usage: { input: number; output: number; cacheRead: number; cacheWrite: number; total: number; }; } interface Stage1OutputSchema { raw_memory: string; rollout_summary: string; rollout_slug: string | null; } interface ConsolidationSkillFileSchema { path: string; content: string; } interface ConsolidationSkillSchema { name: string; content?: string; scripts?: ConsolidationSkillFileSchema[]; templates?: ConsolidationSkillFileSchema[]; examples?: ConsolidationSkillFileSchema[]; } interface ConsolidationOutputSchema { memory_md: string; memory_summary: string; skills: ConsolidationSkillSchema[]; } /** * Start the background memory startup pipeline. * * Skips for ephemeral sessions, subagent sessions, disabled settings, or DB failures. */ export function startMemoryStartupTask(options: { session: AgentSession; settings: Settings; modelRegistry: ModelRegistry; agentDir: string; taskDepth: number; }): void { const { session, settings, modelRegistry, agentDir, taskDepth } = options; const cfg = loadMemoryConfig(settings); if (!cfg.enabled) return; if (taskDepth > 0) return; if (!session.sessionManager.getSessionFile()) return; const dbPath = getAgentDbPath(agentDir); try { const db = openMemoryDb(dbPath); closeMemoryDb(db); } catch (error) { logger.debug("Memory startup skipped: state DB unavailable", { error: String(error) }); return; } void runMemoryStartup({ session, settings, modelRegistry, agentDir, config: cfg }).catch(error => { logger.warn("Memory startup failed", { error: String(error) }); }); } /** * Build memory usage instructions for prompt injection. */ export async function buildMemoryToolDeveloperInstructions( agentDir: string, settings: Settings, ): Promise { const cfg = loadMemoryConfig(settings); if (!cfg.enabled) return undefined; const memoryRoot = getMemoryRoot(agentDir, settings.getCwd()); const summaryPath = path.join(memoryRoot, "memory_summary.md"); let text: string; try { text = await Bun.file(summaryPath).text(); } catch { return undefined; } const summary = text.trim(); if (!summary) return undefined; const truncated = truncateByApproxTokens(summary, cfg.summaryInjectionTokenLimit); if (!truncated.trim()) return undefined; return prompt.render(readPathTemplate, { memory_summary: truncated, }); } /** * Clear all persisted memory state and generated artifacts. */ export async function clearMemoryData(agentDir: string, cwd: string): Promise { const db = openMemoryDb(getAgentDbPath(agentDir)); try { clearMemoryDataInDb(db); } finally { closeMemoryDb(db); } await fs.rm(getMemoryRoot(agentDir, cwd), { recursive: true, force: true }); } /** * Force-enqueue global consolidation maintenance work. */ export function enqueueMemoryConsolidation(agentDir: string, cwd: string, sourceUpdatedAt = unixNow()): void { const db = openMemoryDb(getAgentDbPath(agentDir)); try { enqueueGlobalWatermark(db, sourceUpdatedAt, cwd, { forceDirtyWhenNotAdvanced: true }); } finally { closeMemoryDb(db); } } async function runMemoryStartup(options: { session: AgentSession; settings: Settings; modelRegistry: ModelRegistry; agentDir: string; config: MemoryRuntimeConfig; }): Promise { await runPhase1(options); await runPhase2(options); await options.session.refreshBaseSystemPrompt?.(); } async function runPhase1(options: { session: AgentSession; settings: Settings; modelRegistry: ModelRegistry; agentDir: string; config: MemoryRuntimeConfig; }): Promise { const { session, modelRegistry, agentDir, config } = options; const db = openMemoryDb(getAgentDbPath(agentDir)); const nowSec = unixNow(); const workerId = `memory-${process.pid}`; const memoryRoot = getMemoryRoot(agentDir, session.sessionManager.getCwd()); const currentThreadId = session.sessionManager.getSessionId(); try { const threads = await collectThreads(session, currentThreadId); upsertThreads(db, threads); const phase1Model = await resolveMemoryModel({ modelRegistry, session, fallbackRole: "default", }); if (!phase1Model) { logger.debug("Phase1 skipped: no model available"); return; } const phase1ApiKey = await modelRegistry.getApiKey(phase1Model, session.sessionId); if (!phase1ApiKey) { logger.debug("Phase1 skipped: no API key for phase1 model", { provider: phase1Model.provider, model: phase1Model.id, }); return; } const claims = claimStage1Jobs(db, { nowSec, threadScanLimit: config.threadScanLimit, maxRolloutsPerStartup: config.maxRolloutsPerStartup, maxRolloutAgeDays: config.maxRolloutAgeDays, minRolloutIdleHours: config.minRolloutIdleHours, leaseSeconds: config.stage1LeaseSeconds, runningConcurrencyCap: config.stage1Concurrency, workerId, excludeThreadIds: currentThreadId ? [currentThreadId] : [], }); if (claims.length === 0) return; const stats: Stage1Stats = { claimed: claims.length, succeeded: 0, succeededNoOutput: 0, failed: 0, produced: 0, usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, }; await runWithConcurrency(claims, config.stage1Concurrency, async claim => { const result = await runStage1Job({ claim, model: phase1Model, apiKey: phase1ApiKey, modelMaxTokens: computeModelTokenBudget(phase1Model, config), config, metadata: session.agent?.metadataForProvider(phase1Model.provider), }); if (result.kind === "failed") { logger.error("Memory phase1 stage1 job failed", { threadId: claim.threadId, rolloutPath: claim.rolloutPath, reason: result.reason, }); markStage1Failed(db, { threadId: claim.threadId, ownershipToken: claim.ownershipToken, retryDelaySeconds: config.stage1RetryDelaySeconds, reason: result.reason, nowSec: unixNow(), }); stats.failed += 1; return; } if (result.kind === "no_output") { markStage1SucceededNoOutput(db, { threadId: claim.threadId, ownershipToken: claim.ownershipToken, sourceUpdatedAt: claim.sourceUpdatedAt, nowSec: unixNow(), cwd: claim.cwd, }); stats.succeededNoOutput += 1; return; } markStage1SucceededWithOutput(db, { threadId: claim.threadId, ownershipToken: claim.ownershipToken, sourceUpdatedAt: claim.sourceUpdatedAt, rawMemory: result.output.rawMemory, rolloutSummary: result.output.rolloutSummary, rolloutSlug: result.output.rolloutSlug, nowSec: unixNow(), cwd: claim.cwd, }); stats.succeeded += 1; stats.produced += 1; if (result.usage) { stats.usage.input += result.usage.input; stats.usage.output += result.usage.output; stats.usage.cacheRead += result.usage.cacheRead; stats.usage.cacheWrite += result.usage.cacheWrite; stats.usage.total += result.usage.totalTokens || 0; } }); logger.debug("Memory phase1 completed", { memoryRoot, claimed: stats.claimed, succeeded: stats.succeeded, succeededNoOutput: stats.succeededNoOutput, failed: stats.failed, produced: stats.produced, usage: stats.usage, }); } finally { closeMemoryDb(db); } } async function runPhase2(options: { session: AgentSession; settings: Settings; modelRegistry: ModelRegistry; agentDir: string; config: MemoryRuntimeConfig; }): Promise { const { session, modelRegistry, agentDir, config } = options; const cwd = session.sessionManager.getCwd(); const db = openMemoryDb(getAgentDbPath(agentDir)); const nowSec = unixNow(); const workerId = `memory-${process.pid}`; const memoryRoot = getMemoryRoot(agentDir, cwd); try { const claimResult = tryClaimGlobalPhase2Job(db, { workerId, leaseSeconds: config.phase2LeaseSeconds, nowSec, cwd, }); if (claimResult.kind !== "claimed") return; const claim = claimResult.claim; const outputs = listStage1OutputsForGlobal(db, config.maxRawMemoriesForGlobal, cwd); const newWatermark = computeCompletionWatermark(claim.inputWatermark, outputs); await syncPhase2Artifacts(memoryRoot, outputs); if (outputs.length === 0) { await cleanupConsolidatedArtifacts(memoryRoot); const marked = markGlobalPhase2Succeeded(db, { ownershipToken: claim.ownershipToken, newWatermark, nowSec: unixNow(), cwd, }); if (!marked) { logger.warn("Phase2 empty-input completion lost ownership", { memoryRoot }); } return; } const phase2Model = await resolveMemoryModel({ modelRegistry, session, fallbackRole: "smol", }); if (!phase2Model) { markPhase2FailureWithFallback(db, { claim, retryDelaySeconds: config.phase2RetryDelaySeconds, reason: "No model available for phase2", memoryRoot, cwd, }); return; } const phase2ApiKey = await modelRegistry.getApiKey(phase2Model, session.sessionId); if (!phase2ApiKey) { markPhase2FailureWithFallback(db, { claim, retryDelaySeconds: config.phase2RetryDelaySeconds, reason: "No API key available for phase2", memoryRoot, cwd, }); return; } let heartbeatLostOwnership = false; const heartbeat = setInterval(() => { const ok = heartbeatGlobalJob(db, { ownershipToken: claim.ownershipToken, leaseSeconds: config.phase2LeaseSeconds, nowSec: unixNow(), cwd, }); if (!ok) { heartbeatLostOwnership = true; clearInterval(heartbeat); } }, config.phase2HeartbeatSeconds * 1000); try { const consolidated = await runConsolidationModel({ memoryRoot, model: phase2Model, apiKey: phase2ApiKey, metadata: session.agent?.metadataForProvider(phase2Model.provider), }); await applyConsolidation(memoryRoot, consolidated); if (heartbeatLostOwnership) { throw new Error("Phase2 lease ownership lost before completion"); } const marked = markGlobalPhase2Succeeded(db, { ownershipToken: claim.ownershipToken, newWatermark, nowSec: unixNow(), cwd, }); if (!marked) { throw new Error("Phase2 could not mark success: ownership lost"); } } catch (error) { markPhase2FailureWithFallback(db, { claim, retryDelaySeconds: config.phase2RetryDelaySeconds, reason: String(error), memoryRoot, cwd, error, }); } finally { clearInterval(heartbeat); } } finally { closeMemoryDb(db); } } function markPhase2FailureWithFallback( db: Database, params: { claim: { ownershipToken: string; inputWatermark: number }; retryDelaySeconds: number; reason: string; memoryRoot: string; cwd: string; error?: unknown; }, ): void { const { claim, retryDelaySeconds, reason, memoryRoot, cwd, error } = params; const nowSec = unixNow(); const strictFailed = markGlobalPhase2Failed(db, { ownershipToken: claim.ownershipToken, retryDelaySeconds, reason, nowSec, cwd, }); if (strictFailed) return; const unownedFailed = markGlobalPhase2FailedUnowned(db, { retryDelaySeconds, reason, nowSec, cwd, }); if (!unownedFailed) { logger.warn("Phase2 could not mark failure (ownership lost and unowned fallback skipped)", { error: error ? String(error) : undefined, memoryRoot, reason, inputWatermark: claim.inputWatermark, }); } } async function collectThreads(session: AgentSession, currentThreadId?: string): Promise { const sessionDir = session.sessionManager.getSessionDir(); const files = await fs.readdir(sessionDir); const threads: MemoryThread[] = []; for (const name of files) { if (!name.endsWith(".jsonl")) continue; const fullPath = path.join(sessionDir, name); let stat: fsNode.Stats; try { stat = await fs.stat(fullPath); } catch { continue; } let cwd = ""; let id = name.slice(0, -6); try { const fileText = await Bun.file(fullPath).text(); const firstLine = fileText.split("\n", 1)[0] ?? ""; const parsed = parseJsonlLenient(firstLine); const header = Array.isArray(parsed) && parsed.length > 0 ? (parsed[0] as Record) : undefined; if (header && header.type === "session") { if (typeof header.cwd === "string") cwd = header.cwd; if (typeof header.id === "string") id = header.id; } } catch { // ignore malformed session files } if (currentThreadId && id === currentThreadId) continue; threads.push({ id, updatedAt: Math.floor(stat.mtimeMs / 1000), rolloutPath: fullPath, cwd, sourceKind: "cli", }); } return threads; } function shouldPersistResponseItemForMemories(message: AgentMessage): boolean { const role = (message as { role: string }).role; if (role === "system" || role === "developer" || role === "user" || role === "assistant") { return true; } if (role !== "toolResult") return false; const toolName = (message as { toolName?: string }).toolName; if (toolName === "bash" || toolName === "eval" || toolName === "read" || toolName === "search") { const text = extractMessageText(message); return text.length > 0 && text.length <= 32_000; } return false; } function extractPersistableMessages(payload: string): AgentMessage[] { const rows = parseJsonlLenient(payload); if (!Array.isArray(rows)) return []; const messages: AgentMessage[] = []; for (const row of rows) { if (!row || typeof row !== "object") continue; const entry = row as Record; if (entry.type !== "message") continue; const maybeMessage = entry.message; if (!maybeMessage || typeof maybeMessage !== "object") continue; const message = maybeMessage as AgentMessage; if (shouldPersistResponseItemForMemories(message)) { messages.push(message); } } return messages; } async function runStage1Job(options: { claim: Stage1Claim; model: Model; apiKey: string; modelMaxTokens: number; config: MemoryRuntimeConfig; metadata?: Record; }): Promise< | { kind: "output"; output: { rawMemory: string; rolloutSummary: string; rolloutSlug: string | null }; usage?: { input: number; output: number; cacheRead: number; cacheWrite: number; totalTokens?: number }; } | { kind: "no_output" } | { kind: "failed"; reason: string } > { const { claim, model, apiKey, modelMaxTokens, config } = options; try { const rolloutRaw = await Bun.file(claim.rolloutPath).text(); const persisted = extractPersistableMessages(rolloutRaw); const serializedItems = JSON.stringify(persisted); const budgetTokens = Math.min( config.phase1InputTokenLimit, Math.floor(modelMaxTokens * config.rolloutPayloadPercent), ); const truncatedItems = truncateByApproxTokens(serializedItems, budgetTokens); const inputPrompt = prompt.render(stageOneInputTemplate, { thread_id: claim.threadId, response_items_json: truncatedItems, }); const response = await completeSimple( model, { systemPrompt: [stageOneSystemTemplate], messages: [{ role: "user", content: [{ type: "text", text: inputPrompt }], timestamp: Date.now() }], }, { apiKey, metadata: options.metadata, maxTokens: Math.max(1024, Math.min(4096, Math.floor(modelMaxTokens * 0.2))), reasoning: Effort.Low, }, ); if (response.stopReason === "error") { return { kind: "failed", reason: response.errorMessage || "stage1 model error" }; } const text = response.content .filter((c): c is { type: "text"; text: string } => c.type === "text") .map(c => c.text) .join("\n") .trim(); const parsed = parseJsonObject(text); if (!parsed) { return { kind: "failed", reason: "stage1 JSON parse failure" }; } const schemaOutput = parseStage1OutputSchema(parsed); if (!schemaOutput) { return { kind: "failed", reason: "stage1 JSON schema validation failure" }; } const rawMemory = redactSecrets(schemaOutput.raw_memory).trim(); const rolloutSummary = redactSecrets(schemaOutput.rollout_summary).trim(); const rolloutSlug = schemaOutput.rollout_slug === null ? null : redactSecrets(schemaOutput.rollout_slug).trim(); if (!rawMemory || !rolloutSummary) { return { kind: "no_output" }; } return { kind: "output", output: { rawMemory, rolloutSummary, rolloutSlug: rolloutSlug || null, }, usage: response.usage, }; } catch (error) { return { kind: "failed", reason: String(error) }; } } async function syncPhase2Artifacts(memoryRoot: string, outputs: Stage1OutputRow[]): Promise { const summariesDir = path.join(memoryRoot, "rollout_summaries"); await fs.mkdir(summariesDir, { recursive: true }); const keepFiles = new Set(); for (const row of outputs) { const stem = formatRolloutFilename(row.threadId, row.rolloutSlug); const filename = `${stem}.md`; keepFiles.add(filename); const body = [`thread_id: ${row.threadId}`, `updated_at: ${row.sourceUpdatedAt}`, "", row.rolloutSummary].join( "\n", ); await Bun.write(path.join(summariesDir, filename), `${body.trim()}\n`); } const currentFiles = await fs.readdir(summariesDir).catch(() => [] as string[]); for (const file of currentFiles) { if (!file.endsWith(".md")) continue; if (keepFiles.has(file)) continue; await fs.rm(path.join(summariesDir, file), { force: true }); } const rawBody = buildRawMemoriesMarkdown(outputs); await Bun.write(path.join(memoryRoot, "raw_memories.md"), rawBody); } async function cleanupConsolidatedArtifacts(memoryRoot: string): Promise { await fs.rm(path.join(memoryRoot, "MEMORY.md"), { force: true }); await fs.rm(path.join(memoryRoot, "memory_summary.md"), { force: true }); await fs.rm(path.join(memoryRoot, "skills"), { recursive: true, force: true }); } function buildRawMemoriesMarkdown(outputs: Stage1OutputRow[]): string { if (outputs.length === 0) { return "# Raw Memories\n\nNo raw memories yet.\n"; } const blocks = outputs.map(row => { const header = [`## ${row.threadId}`, `updated_at: ${row.sourceUpdatedAt}`, ""].join("\n"); return `${header}${row.rawMemory.trim()}\n`; }); return `# Raw Memories\n\n${blocks.join("\n")}`; } async function readRolloutSummaries(memoryRoot: string): Promise { const summariesDir = path.join(memoryRoot, "rollout_summaries"); const names = await fs.readdir(summariesDir).catch(() => [] as string[]); const summaryNames = names.filter(name => name.endsWith(".md")).sort((a, b) => a.localeCompare(b)); if (summaryNames.length === 0) return "No rollout summaries yet."; const blocks: string[] = []; for (const name of summaryNames) { const text = await Bun.file(path.join(summariesDir, name)) .text() .catch(() => ""); if (!text.trim()) continue; blocks.push(`--- ${name} ---\n${text.trim()}`); } if (blocks.length === 0) return "No rollout summaries yet."; return blocks.join("\n\n"); } async function runConsolidationModel(options: { memoryRoot: string; model: Model; apiKey: string; metadata?: Record; }): Promise<{ memoryMd: string; memorySummary: string; skills: Array<{ name: string; content: string; scripts: ConsolidationSkillFileSchema[]; templates: ConsolidationSkillFileSchema[]; examples: ConsolidationSkillFileSchema[]; }>; }> { const { memoryRoot, model, apiKey } = options; const rawMemories = await Bun.file(path.join(memoryRoot, "raw_memories.md")).text(); const rolloutSummaries = await readRolloutSummaries(memoryRoot); const input = prompt.render(consolidationTemplate, { raw_memories: truncateByApproxTokens(rawMemories, 20_000), rollout_summaries: truncateByApproxTokens(rolloutSummaries, 12_000), }); const response = await completeSimple( model, { messages: [{ role: "user", content: [{ type: "text", text: input }], timestamp: Date.now() }], }, { apiKey, metadata: options.metadata, maxTokens: 8192, reasoning: Effort.Medium }, ); if (response.stopReason === "error") { throw new Error(response.errorMessage || "phase2 model error"); } const text = response.content .filter((c): c is { type: "text"; text: string } => c.type === "text") .map(c => c.text) .join("\n") .trim(); const parsed = parseJsonObject(text); if (!parsed) throw new Error("phase2 JSON parse failure"); const schemaOutput = parseConsolidationOutputSchema(parsed); if (!schemaOutput) throw new Error("phase2 JSON schema validation failure"); const memoryMd = redactSecrets(schemaOutput.memory_md).trim(); const memorySummary = redactSecrets(schemaOutput.memory_summary).trim(); const skills = schemaOutput.skills .map(item => { const name = sanitizeSkillName(item.name.trim()); const content = redactSecrets(item.content ?? "").trim(); if (!name || !content) return null; return { name, content, scripts: sanitizeConsolidationSkillFiles(item.scripts, "scripts"), templates: sanitizeConsolidationSkillFiles(item.templates, "templates"), examples: sanitizeConsolidationSkillFiles(item.examples, "examples"), }; }) .filter( ( item, ): item is { name: string; content: string; scripts: ConsolidationSkillFileSchema[]; templates: ConsolidationSkillFileSchema[]; examples: ConsolidationSkillFileSchema[]; } => item !== null, ); if (!memoryMd || !memorySummary) { throw new Error("phase2 returned empty consolidated memory"); } return { memoryMd, memorySummary, skills }; } async function applyConsolidation( memoryRoot: string, consolidated: { memoryMd: string; memorySummary: string; skills: Array<{ name: string; content: string; scripts: ConsolidationSkillFileSchema[]; templates: ConsolidationSkillFileSchema[]; examples: ConsolidationSkillFileSchema[]; }>; }, ): Promise { await Bun.write(path.join(memoryRoot, "MEMORY.md"), `${consolidated.memoryMd.trim()}\n`); await Bun.write(path.join(memoryRoot, "memory_summary.md"), `${consolidated.memorySummary.trim()}\n`); const skillsDir = path.join(memoryRoot, "skills"); await fs.mkdir(skillsDir, { recursive: true }); const keep = new Set(); for (const skill of consolidated.skills) { const dir = path.join(skillsDir, skill.name); keep.add(skill.name); await fs.mkdir(dir, { recursive: true }); const files = new Map(); files.set("SKILL.md", `${skill.content.trim()}\n`); for (const item of skill.scripts) { files.set(path.posix.join("scripts", item.path), `${item.content.trim()}\n`); } for (const item of skill.templates) { files.set(path.posix.join("templates", item.path), `${item.content.trim()}\n`); } for (const item of skill.examples) { files.set(path.posix.join("examples", item.path), `${item.content.trim()}\n`); } for (const [relativePath, content] of [...files.entries()].sort(([a], [b]) => a.localeCompare(b))) { await Bun.write(path.join(dir, ...relativePath.split("/")), content); } const keepFiles = new Set(files.keys()); const existingFiles = await listRelativeFiles(dir); for (const relativePath of existingFiles) { if (keepFiles.has(relativePath)) continue; await fs.rm(path.join(dir, ...relativePath.split("/")), { force: true }); } await pruneEmptyDirectories(dir); } const dirs = await fs.readdir(skillsDir, { withFileTypes: true }).catch(() => []); for (const dirent of dirs) { if (!dirent.isDirectory()) continue; if (keep.has(dirent.name)) continue; await fs.rm(path.join(skillsDir, dirent.name), { recursive: true, force: true }); } } async function listRelativeFiles(rootDir: string, prefix = ""): Promise { const entries = await fs.readdir(rootDir, { withFileTypes: true }).catch(() => []); const files: string[] = []; for (const entry of entries) { const relative = prefix ? `${prefix}/${entry.name}` : entry.name; if (entry.isDirectory()) { files.push(...(await listRelativeFiles(path.join(rootDir, entry.name), relative))); continue; } if (entry.isFile()) files.push(relative); } return files; } async function pruneEmptyDirectories(rootDir: string): Promise { const entries = await fs.readdir(rootDir, { withFileTypes: true }).catch(() => []); for (const entry of entries) { if (!entry.isDirectory()) continue; const child = path.join(rootDir, entry.name); await pruneEmptyDirectories(child); const childEntries = await fs.readdir(child).catch(() => []); if (childEntries.length === 0) { await fs.rm(child, { recursive: true, force: true }); } } } function computeCompletionWatermark(claimedInputWatermark: number, outputs: Stage1OutputRow[]): number { const maxOutputWatermark = outputs.reduce((max, row) => Math.max(max, row.sourceUpdatedAt), claimedInputWatermark); return Math.max(claimedInputWatermark, maxOutputWatermark); } function formatRolloutFilename(threadId: string, rolloutSlug: string | null): string { if (!rolloutSlug) return threadId; const normalized = rolloutSlug .toLowerCase() .replace(/[^a-z0-9]+/g, "_") .replace(/_+$/g, "") .slice(0, 20); if (!normalized) return threadId; return `${threadId}-${normalized}`; } function parseJsonObject(text: string): Record | undefined { if (!text) return undefined; try { const parsed = JSON.parse(text) as unknown; if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) { return parsed as Record; } } catch { const match = text.match(/\{[\s\S]*\}/); if (!match) return undefined; try { const parsed = JSON.parse(match[0]) as unknown; if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) { return parsed as Record; } } catch { return undefined; } } return undefined; } function parseStage1OutputSchema(value: Record): Stage1OutputSchema | undefined { if (!hasExactKeys(value, ["rollout_summary", "rollout_slug", "raw_memory"])) return undefined; if (typeof value.rollout_summary !== "string") return undefined; if (!(typeof value.rollout_slug === "string" || value.rollout_slug === null)) return undefined; if (typeof value.raw_memory !== "string") return undefined; return { rollout_summary: value.rollout_summary, rollout_slug: value.rollout_slug, raw_memory: value.raw_memory, }; } function parseConsolidationOutputSchema(value: Record): ConsolidationOutputSchema | undefined { if (!hasExactKeys(value, ["memory_md", "memory_summary", "skills"])) return undefined; if (typeof value.memory_md !== "string") return undefined; if (typeof value.memory_summary !== "string") return undefined; if (!Array.isArray(value.skills)) return undefined; const skills: ConsolidationSkillSchema[] = []; for (const item of value.skills) { if (!item || typeof item !== "object" || Array.isArray(item)) return undefined; const data = item as Record; if (!hasExactKeys(data, ["name", "content", "scripts", "templates", "examples"], true)) return undefined; if (typeof data.name !== "string") return undefined; if (!(typeof data.content === "string" || data.content === undefined)) return undefined; const scripts = parseConsolidationSkillFileArray(data.scripts); const templates = parseConsolidationSkillFileArray(data.templates); const examples = parseConsolidationSkillFileArray(data.examples); if (!scripts || !templates || !examples) return undefined; skills.push({ name: data.name, content: data.content, scripts, templates, examples, }); } return { memory_md: value.memory_md, memory_summary: value.memory_summary, skills, }; } function hasExactKeys(value: Record, expectedKeys: string[], allowMissing = false): boolean { const sortedKeys = Object.keys(value).sort(); const sortedExpected = [...expectedKeys].sort(); if (!allowMissing && sortedKeys.length !== sortedExpected.length) return false; for (const key of sortedKeys) { if (!sortedExpected.includes(key)) return false; } if (allowMissing) return true; for (let i = 0; i < sortedExpected.length; i += 1) { if (sortedKeys[i] !== sortedExpected[i]) return false; } return true; } function redactSecrets(input: string): string { let out = input; const patterns = [ /(?:sk|pk|rk|tok|key|secret|token|password)[-_A-Za-z0-9]{12,}/g, /[A-Za-z0-9_-]{16,}\.[A-Za-z0-9_-]{16,}\.[A-Za-z0-9_-]{16,}/g, /(?:AKIA|ASIA)[A-Z0-9]{16}/g, ]; for (const pattern of patterns) { out = out.replace(pattern, "[REDACTED]"); } return out; } function sanitizeSkillName(name: string): string { return name .toLowerCase() .replace(/[^a-z0-9_-]+/g, "-") .replace(/^-+|-+$/g, "") .slice(0, 64); } function parseConsolidationSkillFileArray(value: unknown): ConsolidationSkillFileSchema[] | undefined { if (value === undefined) return []; if (!Array.isArray(value)) return undefined; const files: ConsolidationSkillFileSchema[] = []; for (const item of value) { if (!item || typeof item !== "object" || Array.isArray(item)) return undefined; const data = item as Record; if (!hasExactKeys(data, ["path", "content"])) return undefined; if (typeof data.path !== "string" || typeof data.content !== "string") return undefined; files.push({ path: data.path, content: data.content }); } return files; } function sanitizeConsolidationSkillFiles( files: ConsolidationSkillFileSchema[] | undefined, bucket: "scripts" | "templates" | "examples", ): ConsolidationSkillFileSchema[] { if (!files || files.length === 0) return []; const sanitized = new Map(); for (const file of files) { const relativePath = sanitizeSkillRelativePath(file.path); if (!relativePath) continue; const content = redactSecrets(file.content).trim(); if (!content) continue; sanitized.set(path.posix.join(bucket, relativePath), content); } return [...sanitized.entries()] .sort(([a], [b]) => a.localeCompare(b)) .map(([fullPath, content]) => ({ path: fullPath.slice(bucket.length + 1), content, })); } function sanitizeSkillRelativePath(rawPath: string): string | undefined { const normalized = rawPath.replace(/\\/g, "/").trim(); if (!normalized) return undefined; if (normalized.startsWith("/")) return undefined; if (normalized.includes("\0")) return undefined; if (normalized.includes(":")) return undefined; const parts = normalized.split("/").filter(Boolean); if (parts.length === 0) return undefined; for (const part of parts) { if (part === "." || part === "..") return undefined; if (!/^[A-Za-z0-9._-]+$/.test(part)) return undefined; } return parts.join("/"); } function extractMessageText(message: AgentMessage): string { const content = (message as { content?: unknown }).content; if (typeof content === "string") return content; if (!Array.isArray(content)) return ""; return content .map(item => { if (item.type === "text") return item.text; if (item.type === "toolCall") return `${item.toolName} ${JSON.stringify(item.arguments)}`; return ""; }) .join("\n"); } function truncateByApproxTokens(text: string, tokenLimit: number): string { if (tokenLimit <= 0) return ""; const maxChars = tokenLimit * 4; if (text.length <= maxChars) return text; const head = Math.floor(maxChars * 0.6); const tail = maxChars - head; return `${text.slice(0, head)}\n\n...[truncated]...\n\n${text.slice(-tail)}`; } function computeModelTokenBudget(model: Model, config: MemoryRuntimeConfig): number { const maxTokens = Number.isFinite(model.contextWindow) && model.contextWindow > 0 ? model.contextWindow : config.fallbackTokenLimit; return Math.max(2048, Math.floor(maxTokens)); } async function resolveMemoryModel(options: { modelRegistry: ModelRegistry; session: AgentSession; fallbackRole: string; }): Promise { const { modelRegistry, session, fallbackRole } = options; const requestedModel = session.settings.getModelRole(fallbackRole) || session.settings.getModelRole("default"); if (requestedModel) { const resolved = resolveModelRoleValue(requestedModel, modelRegistry.getAll(), { settings: session.settings, matchPreferences: { usageOrder: session.settings.getStorage()?.getModelUsageOrder() }, modelRegistry, }); if (resolved.model) return resolved.model; } return session.model ?? modelRegistry.getAll()[0]; } function loadMemoryConfig(settings: Settings): MemoryRuntimeConfig { return { enabled: settings.get("memory.backend") === "local" || settings.get("memories.enabled") === true, maxRolloutsPerStartup: settings.get("memories.maxRolloutsPerStartup") ?? DEFAULTS.maxRolloutsPerStartup, maxRolloutAgeDays: settings.get("memories.maxRolloutAgeDays") ?? DEFAULTS.maxRolloutAgeDays, minRolloutIdleHours: settings.get("memories.minRolloutIdleHours") ?? DEFAULTS.minRolloutIdleHours, threadScanLimit: settings.get("memories.threadScanLimit") ?? DEFAULTS.threadScanLimit, maxRawMemoriesForGlobal: settings.get("memories.maxRawMemoriesForGlobal") ?? DEFAULTS.maxRawMemoriesForGlobal, stage1Concurrency: settings.get("memories.stage1Concurrency") ?? DEFAULTS.stage1Concurrency, stage1LeaseSeconds: settings.get("memories.stage1LeaseSeconds") ?? DEFAULTS.stage1LeaseSeconds, stage1RetryDelaySeconds: settings.get("memories.stage1RetryDelaySeconds") ?? DEFAULTS.stage1RetryDelaySeconds, phase2LeaseSeconds: settings.get("memories.phase2LeaseSeconds") ?? DEFAULTS.phase2LeaseSeconds, phase2RetryDelaySeconds: settings.get("memories.phase2RetryDelaySeconds") ?? DEFAULTS.phase2RetryDelaySeconds, phase2HeartbeatSeconds: settings.get("memories.phase2HeartbeatSeconds") ?? DEFAULTS.phase2HeartbeatSeconds, rolloutPayloadPercent: settings.get("memories.rolloutPayloadPercent") ?? DEFAULTS.rolloutPayloadPercent, phase1InputTokenLimit: settings.get("memories.phase1InputTokenLimit") ?? DEFAULTS.phase1InputTokenLimit, fallbackTokenLimit: settings.get("memories.fallbackTokenLimit") ?? DEFAULTS.fallbackTokenLimit, summaryInjectionTokenLimit: settings.get("memories.summaryInjectionTokenLimit") ?? DEFAULTS.summaryInjectionTokenLimit, }; } export function getMemoryRoot(agentDir: string, cwd: string): string { return path.join(getMemoriesDir(agentDir), encodeProjectPath(cwd)); } function encodeProjectPath(cwd: string): string { return `--${cwd.replace(/^[/\\]/, "").replace(/[/\\:]/g, "-")}--`; } function unixNow(): number { return Math.floor(Date.now() / 1000); } async function runWithConcurrency( items: T[], concurrency: number, worker: (item: T) => Promise, ): Promise { const queue = [...items]; const workers = new Array(Math.max(1, concurrency)).fill(0).map(async () => { while (queue.length > 0) { const item = queue.shift(); if (!item) return; await worker(item); } }); await Promise.all(workers); }