/** * Time Traveling Stream Rules (TTSR) Manager * * Manages rules that get injected mid-stream when their condition pattern matches * the agent's output. When a match occurs, the stream is aborted, the rule is * injected as a system reminder, and the request is retried. */ import { logger } from "@oh-my-pi/pi-utils"; import type { Rule } from "../capability/rule"; import type { TtsrSettings } from "../config/settings"; export type TtsrMatchSource = "text" | "thinking" | "tool"; /** Context about the stream content currently being checked against TTSR rules. */ export interface TtsrMatchContext { source: TtsrMatchSource; /** Tool name for tool argument deltas, e.g. "edit" or "write". */ toolName?: string; /** Candidate file paths associated with the current stream chunk. */ filePaths?: string[]; /** Stable key to isolate buffering (for example a tool call ID). */ streamKey?: string; } interface ToolScope { toolName?: string; pathGlob?: Bun.Glob; pathPattern?: string; } interface TtsrScope { allowText: boolean; allowThinking: boolean; allowAnyTool: boolean; toolScopes: ToolScope[]; } interface TtsrEntry { rule: Rule; conditions: RegExp[]; scope: TtsrScope; globalPathGlobs?: Bun.Glob[]; } /** Tracks when a rule was last injected (for repeat gating). */ interface InjectionRecord { /** Message count (turn index) when the rule was last injected. */ lastInjectedAt: number; } const DEFAULT_SETTINGS: Required = { enabled: true, contextMode: "discard", interruptMode: "always", repeatMode: "once", repeatGap: 10, }; const DEFAULT_SCOPE: TtsrScope = { allowText: true, allowThinking: false, allowAnyTool: true, toolScopes: [], }; export class TtsrManager { readonly #settings: Required; readonly #rules = new Map(); readonly #injectionRecords = new Map(); readonly #buffers = new Map(); #messageCount = 0; constructor(settings?: TtsrSettings) { this.#settings = { ...DEFAULT_SETTINGS, ...settings }; } /** Check if a rule can be triggered based on repeat settings. */ #canTrigger(ruleName: string): boolean { const record = this.#injectionRecords.get(ruleName); if (!record) { return true; } if (this.#settings.repeatMode === "once") { return false; } const gap = this.#messageCount - record.lastInjectedAt; return gap >= this.#settings.repeatGap; } #compileConditions(rule: Rule): RegExp[] { const compiled: RegExp[] = []; for (const pattern of rule.condition ?? []) { try { compiled.push(new RegExp(pattern)); } catch (error) { logger.warn("TTSR condition has invalid regex pattern, skipping condition", { ruleName: rule.name, pattern, error: error instanceof Error ? error.message : String(error), }); } } return compiled; } #compileGlobalPathGlobs(globs: Rule["globs"]): Bun.Glob[] | undefined { if (!globs || globs.length === 0) { return undefined; } const compiled = globs .map(glob => glob.trim()) .filter(glob => glob.length > 0) .map(glob => new Bun.Glob(glob)); return compiled.length > 0 ? compiled : undefined; } #parseToolScopeToken(token: string): ToolScope | undefined { const match = /^(?:(?tool)(?::(?[a-z0-9_-]+))?|(?[a-z0-9_-]+))(?:\((?[^)]+)\))?$/i.exec( token, ); if (!match) { return undefined; } const groups = match.groups; const hasToolPrefix = groups?.prefix !== undefined; const toolName = (groups?.tool ?? (hasToolPrefix ? undefined : groups?.bare))?.trim().toLowerCase(); const pathPattern = groups?.path?.trim(); if (!pathPattern) { return { toolName }; } return { toolName, pathPattern, pathGlob: new Bun.Glob(pathPattern), }; } #buildScope(rule: Rule): TtsrScope { if (!rule.scope || rule.scope.length === 0) { return { allowText: DEFAULT_SCOPE.allowText, allowThinking: DEFAULT_SCOPE.allowThinking, allowAnyTool: DEFAULT_SCOPE.allowAnyTool, toolScopes: [...DEFAULT_SCOPE.toolScopes], }; } const scope: TtsrScope = { allowText: false, allowThinking: false, allowAnyTool: false, toolScopes: [], }; for (const rawToken of rule.scope) { const token = rawToken.trim(); const normalizedToken = token.toLowerCase(); if (token.length === 0) { continue; } if (normalizedToken === "text") { scope.allowText = true; continue; } if (normalizedToken === "thinking") { scope.allowThinking = true; continue; } if (normalizedToken === "tool" || normalizedToken === "toolcall") { scope.allowAnyTool = true; continue; } const toolScope = this.#parseToolScopeToken(token); if (!toolScope) { logger.warn("TTSR scope token is invalid, skipping token", { ruleName: rule.name, token: rawToken, }); continue; } if (!toolScope.toolName && !toolScope.pathGlob) { scope.allowAnyTool = true; continue; } scope.toolScopes.push(toolScope); } return scope; } #hasReachableScope(scope: TtsrScope): boolean { return scope.allowText || scope.allowThinking || scope.allowAnyTool || scope.toolScopes.length > 0; } #bufferKey(context: TtsrMatchContext): string { if (context.streamKey && context.streamKey.trim().length > 0) { return context.streamKey; } if (context.source !== "tool") { return context.source; } const toolName = context.toolName?.trim().toLowerCase(); return toolName ? `tool:${toolName}` : "tool"; } #normalizePath(pathValue: string): string { return pathValue.replaceAll("\\", "/"); } #matchesGlob(glob: Bun.Glob, filePaths: string[] | undefined): boolean { if (!filePaths || filePaths.length === 0) { return false; } for (const filePath of filePaths) { const normalized = this.#normalizePath(filePath); if (glob.match(normalized)) { return true; } const slashIndex = normalized.lastIndexOf("/"); const basename = slashIndex === -1 ? normalized : normalized.slice(slashIndex + 1); if (basename !== normalized && glob.match(basename)) { return true; } } return false; } #matchesGlobalPaths(entry: TtsrEntry, context: TtsrMatchContext): boolean { if (!entry.globalPathGlobs || entry.globalPathGlobs.length === 0) { return true; } for (const glob of entry.globalPathGlobs) { if (this.#matchesGlob(glob, context.filePaths)) { return true; } } return false; } #matchesScope(entry: TtsrEntry, context: TtsrMatchContext): boolean { if (context.source === "text") { return entry.scope.allowText; } if (context.source === "thinking") { return entry.scope.allowThinking; } if (entry.scope.allowAnyTool) { return true; } const toolName = context.toolName?.trim().toLowerCase(); for (const toolScope of entry.scope.toolScopes) { if (toolScope.toolName && toolScope.toolName !== toolName) { continue; } if (toolScope.pathGlob && !this.#matchesGlob(toolScope.pathGlob, context.filePaths)) { continue; } return true; } return false; } #matchesCondition(entry: TtsrEntry, streamBuffer: string): boolean { for (const condition of entry.conditions) { condition.lastIndex = 0; if (condition.test(streamBuffer)) { return true; } } return false; } /** Add a TTSR rule to be monitored. */ addRule(rule: Rule): boolean { if (this.#rules.has(rule.name)) { return false; } const conditions = this.#compileConditions(rule); if (conditions.length === 0) { return false; } const scope = this.#buildScope(rule); if (!this.#hasReachableScope(scope)) { logger.warn("TTSR scope excludes all streams, skipping rule", { ruleName: rule.name, scope: rule.scope, }); return false; } const globalPathGlobs = this.#compileGlobalPathGlobs(rule.globs); this.#rules.set(rule.name, { rule, conditions, scope, globalPathGlobs, }); logger.debug("TTSR rule registered", { ruleName: rule.name, conditions: rule.condition, scope: rule.scope, globs: rule.globs, }); return true; } /** * Add a stream chunk to its scoped buffer and return matching rules. * * Buffers are isolated by source/tool key so matches don't bleed across * assistant prose, thinking text, and unrelated tool argument streams. */ checkDelta(delta: string, context: TtsrMatchContext): Rule[] { const bufferKey = this.#bufferKey(context); const nextBuffer = `${this.#buffers.get(bufferKey) ?? ""}${delta}`; this.#buffers.set(bufferKey, nextBuffer); const matches: Rule[] = []; for (const [name, entry] of this.#rules) { if (!this.#canTrigger(name)) { continue; } if (!this.#matchesScope(entry, context)) { continue; } if (!this.#matchesGlobalPaths(entry, context)) { continue; } if (!this.#matchesCondition(entry, nextBuffer)) { continue; } matches.push(entry.rule); logger.debug("TTSR condition matched", { ruleName: name, conditions: entry.rule.condition, source: context.source, toolName: context.toolName, filePaths: context.filePaths, }); } return matches; } /** Mark rules as injected (won't trigger again until conditions allow). */ markInjected(rulesToMark: Rule[]): void { this.markInjectedByNames(rulesToMark.map(rule => rule.name)); } /** Mark rule names as injected (won't trigger again until conditions allow). */ markInjectedByNames(ruleNames: string[]): void { for (const rawName of ruleNames) { const ruleName = rawName.trim(); if (ruleName.length === 0) { continue; } const record = this.#injectionRecords.get(ruleName); if (!record) { this.#injectionRecords.set(ruleName, { lastInjectedAt: this.#messageCount }); } else { record.lastInjectedAt = this.#messageCount; } logger.debug("TTSR rule marked as injected", { ruleName, messageCount: this.#messageCount, repeatMode: this.#settings.repeatMode, }); } } /** Get names of all injected rules (for persistence). */ getInjectedRuleNames(): string[] { return Array.from(this.#injectionRecords.keys()); } /** Restore injected state from a list of rule names. */ restoreInjected(ruleNames: string[]): void { for (const name of ruleNames) { this.#injectionRecords.set(name, { lastInjectedAt: 0 }); } if (ruleNames.length > 0) { logger.debug("TTSR injected state restored", { ruleNames }); } } /** Reset stream buffers (called on new turn). */ resetBuffer(): void { this.#buffers.clear(); } /** Check if any TTSR rules are registered. */ hasRules(): boolean { return this.#rules.size > 0; } /** Increment message counter (call after each turn). */ incrementMessageCount(): void { this.#messageCount++; } /** Get current message count. */ getMessageCount(): number { return this.#messageCount; } /** Get settings. */ getSettings(): Required { return this.#settings; } }