import { EventEmitter } from 'events';
import chokidar, { type FSWatcher } from 'chokidar';
import path from 'path';
import fs from 'fs/promises';
import crypto from 'crypto';
import axios from 'axios';
import { Logger } from '../utils/logger.js';

export interface HarvesterConfig {
  projectId: string;
  apiUrl: string;
  apiKey: string;
  watchPath: string; // usually process.cwd()
}

/** Shape of the Curator API response body as read by this class. */
interface CuratorSignalResponse {
  success?: boolean;
  error?: string;
}

/**
 * The Knowledge Harvester 🌾
 *
 * Watches the local rule repository (`.mdc` files anywhere under
 * `<watchPath>/.cursor/rules`) for new patterns or corrections that the user
 * teaches their local AI agent.
 *
 * When a rule file is added or changed, it is read after a quiet period,
 * compared against the last known content hash, checked for a minimum length,
 * and shipped to the Rigstate Curator Protocol as a signal.
 */
export class KnowledgeHarvester extends EventEmitter {
  /** Quiet period (ms) after the last save before a rule file is processed. */
  private static readonly DEBOUNCE_MS = 5000;
  /** Rule files whose trimmed content is shorter than this are skipped. */
  private static readonly MIN_CONTENT_LENGTH = 20;
  /** Extension of the rule files this harvester cares about. */
  private static readonly RULE_EXTENSION = '.mdc';

  private watcher: FSWatcher | null = null;
  private readonly config: HarvesterConfig;
  /** SHA-256 of the last harvested (or baseline) content, keyed by file path. */
  private readonly ruleHashes = new Map<string, string>();
  private isReady = false;
  /** Files whose processing is currently in flight. */
  private readonly processingQueue = new Set<string>();
  /** Pending debounce timers, keyed by file path. */
  private readonly debounceTimers = new Map<string, ReturnType<typeof setTimeout>>();

  // Ignore list to prevent feedback loops with system rules
  private readonly IGNORED_PREFIXES = ['rigstate-identity', 'rigstate-guardian'];

  constructor(config: HarvesterConfig) {
    super();
    this.config = config;
  }

  /**
   * Starts watching the rules directory.
   *
   * Existing rule files are hashed first to form a baseline, and the watcher
   * ignores the initial scan, so nothing already on disk is harvested on boot.
   * Calling `start()` while already watching is a no-op.
   */
  async start(): Promise<void> {
    if (this.watcher) return;

    // Load initial state to establish baseline (don't harvest initial load)
    await this.loadHashes();

    const rulesPath = this.getRulesPath();
    const watchPattern = path.join(rulesPath, '**', `*${KnowledgeHarvester.RULE_EXTENSION}`);
    Logger.debug(`🌾 Harvester watching: ${watchPattern}`);

    // Watch the directory itself rather than a glob: chokidar v4 removed glob
    // support, and v3 globs need forward slashes (path.join yields backslashes
    // on Windows). Filtering to rule files happens in handleFileEvent.
    this.watcher = chokidar.watch(rulesPath, {
      persistent: true,
      ignoreInitial: true, // Don't harvest what's already there on boot
      awaitWriteFinish: {
        stabilityThreshold: 2000,
        pollInterval: 100,
      },
    });

    this.watcher
      .on('add', (filePath: string) => this.handleFileEvent(filePath, 'add'))
      // We treat change same as add (new version of truth)
      .on('change', (filePath: string) => this.handleFileEvent(filePath, 'change'))
      // A deleted file has nothing left to read; drop any pending run for it.
      .on('unlink', (filePath: string) => this.cancelPending(filePath))
      // Without a listener, an emitted 'error' event would throw and could crash the process.
      .on('error', (error: unknown) => {
        Logger.warn(`Harvester watcher error: ${KnowledgeHarvester.errorMessage(error)}`);
      });

    this.isReady = true;
  }

  /**
   * Stops watching and cancels all pending debounced work, so nothing is
   * submitted after shutdown and no timer keeps the process alive.
   */
  async stop(): Promise<void> {
    // Detach first so events arriving while the watcher closes cannot schedule new work.
    const watcher = this.watcher;
    this.watcher = null;
    this.isReady = false;

    for (const timer of this.debounceTimers.values()) {
      clearTimeout(timer);
    }
    this.debounceTimers.clear();

    if (watcher) {
      await watcher.close();
    }
    // Save state? Maybe not needed as we re-hash on boot.
  }

  /** Entry point for watcher add/change events: filters, then debounces. */
  private handleFileEvent(filePath: string, _event: 'add' | 'change'): void {
    const fileName = path.basename(filePath);

    // 1. Filter: only rule files, and never our own system rules (feedback loops)
    if (path.extname(fileName) !== KnowledgeHarvester.RULE_EXTENSION) return;
    if (this.IGNORED_PREFIXES.some((prefix) => fileName.startsWith(prefix))) return;

    // 2. Debounce (users save frequently)
    this.scheduleProcessing(filePath);
  }

  /** (Re)starts the quiet-period timer for `filePath`; no-op once stopped. */
  private scheduleProcessing(filePath: string): void {
    if (!this.watcher) return;

    this.cancelPending(filePath);
    const timer = setTimeout(() => {
      this.debounceTimers.delete(filePath);
      // processFile handles its own errors, so the promise never rejects.
      void this.processFile(filePath);
    }, KnowledgeHarvester.DEBOUNCE_MS);
    this.debounceTimers.set(filePath, timer);
  }

  /** Cancels a pending debounced run for `filePath`, if any. */
  private cancelPending(filePath: string): void {
    const timer = this.debounceTimers.get(filePath);
    if (timer !== undefined) {
      clearTimeout(timer);
      this.debounceTimers.delete(filePath);
    }
  }

  /**
   * Reads a rule file and submits it if its content is new and long enough.
   * Failures are logged, never thrown.
   */
  private async processFile(filePath: string): Promise<void> {
    // A run for this file is already in flight: retry after another quiet
    // period instead of silently dropping the newer edit.
    if (this.processingQueue.has(filePath)) {
      this.scheduleProcessing(filePath);
      return;
    }
    this.processingQueue.add(filePath);

    try {
      const content = await fs.readFile(filePath, 'utf-8');
      const currentHash = this.computeHash(content);

      // 3. Check against memory (did we just harvest this?)
      // TODO: also check against the server-sync cache (did we just download this?)
      if (this.ruleHashes.get(filePath) === currentHash) {
        Logger.debug(`Skipping ${path.basename(filePath)} (unchanged hash)`);
        return;
      }

      // 4. Validate Content (whitespace padding does not count towards the minimum)
      if (content.trim().length < KnowledgeHarvester.MIN_CONTENT_LENGTH) {
        Logger.debug(`Skipping ${path.basename(filePath)} (too short)`);
        return;
      }

      // 5. Submit Signal (throws on failure, so the hash below is not recorded)
      await this.submitSignal(filePath, content);

      // 6. Update Hash
      this.ruleHashes.set(filePath, currentHash);
    } catch (error: unknown) {
      Logger.warn(
        `Harvester failed to process ${path.basename(filePath)}: ${KnowledgeHarvester.errorMessage(error)}`,
      );
    } finally {
      this.processingQueue.delete(filePath);
    }
  }

  /**
   * Posts the rule file's content to the Curator signals endpoint.
   *
   * A 404 response is logged and treated as a non-failure (endpoint may not
   * exist yet). Any other failure clears the stored hash for the file and is
   * re-thrown, so the next edit retries.
   *
   * @throws The underlying request error, or an Error when the API reports `success: false`.
   */
  private async submitSignal(filePath: string, content: string): Promise<void> {
    const title = path.basename(filePath, KnowledgeHarvester.RULE_EXTENSION);
    // Relative to the configured project root, which need not equal the process cwd.
    const relativePath = path.relative(this.config.watchPath, filePath);

    Logger.info(`🌾 Harvesting new knowledge: ${title}`);

    // NOTE: mirrors the 'mcp_submit_curator_signal' logic.
    // URL: /api/v1/curator/signals (assumed endpoint)
    const payload = {
      project_id: this.config.projectId,
      title: title,
      category: 'ARCHITECTURE', // Default
      severity: 'MEDIUM',
      instruction: content,
      reasoning: `Harvested from local file: ${relativePath}`,
      source_type: 'IDE_HARVESTER',
    };

    try {
      const response = await axios.post<CuratorSignalResponse>(
        `${this.config.apiUrl}/api/v1/curator/signals`,
        payload,
        { headers: { Authorization: `Bearer ${this.config.apiKey}` } },
      );

      if (response.data?.success) {
        Logger.info(`✅ Signal submitted for review: ${title}`);
      } else {
        throw new Error(response.data?.error || 'Unknown API error');
      }
    } catch (error: unknown) {
      if (axios.isAxiosError(error) && error.response?.status === 404) {
        // API endpoint might not exist yet during migration. Nothing is kept
        // locally; the caller still records the hash, so this content is not
        // retried until the file changes again.
        Logger.debug('Curator API not reachable (404). Signal not submitted.');
        return;
      }
      Logger.error(`Failed to submit signal: ${KnowledgeHarvester.errorMessage(error)}`);
      // Don't update hash so we try again next edit
      this.ruleHashes.delete(filePath);
      throw error;
    }
  }

  /**
   * Hashes every existing rule file (recursively, matching the watcher's
   * scope) so the baseline is not uploaded on start. Creates the rules
   * directory if missing. Best effort: unreadable files get no baseline.
   */
  private async loadHashes(): Promise<void> {
    const rulesPath = this.getRulesPath();
    try {
      // Ensure dir exists (the watcher needs a directory to watch)
      await fs.mkdir(rulesPath, { recursive: true });

      for (const fullPath of await this.listRuleFiles(rulesPath)) {
        try {
          const content = await fs.readFile(fullPath, 'utf-8');
          this.ruleHashes.set(fullPath, this.computeHash(content));
        } catch {
          // Unreadable file: skip it without aborting the rest of the baseline.
        }
      }
    } catch (error: unknown) {
      Logger.debug(`Harvester baseline scan skipped: ${KnowledgeHarvester.errorMessage(error)}`);
    }
  }

  /** Recursively collects rule-file paths under `dir`. */
  private async listRuleFiles(dir: string): Promise<string[]> {
    const entries = await fs.readdir(dir, { withFileTypes: true });
    const nested = await Promise.all(
      entries.map(async (entry): Promise<string[]> => {
        const fullPath = path.join(dir, entry.name);
        if (entry.isDirectory()) return this.listRuleFiles(fullPath);
        return entry.isFile() && path.extname(entry.name) === KnowledgeHarvester.RULE_EXTENSION
          ? [fullPath]
          : [];
      }),
    );
    return nested.flat();
  }

  /** Absolute-or-relative path of the watched rules directory. */
  private getRulesPath(): string {
    return path.join(this.config.watchPath, '.cursor', 'rules');
  }

  /** Hex-encoded SHA-256 of the given text. */
  private computeHash(content: string): string {
    return crypto.createHash('sha256').update(content).digest('hex');
  }

  /** Extracts a loggable message from an unknown thrown value. */
  private static errorMessage(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}