/** * 星环OPC中心 — 事件驱动触发引擎 * * 每次 OPC 工具写入数据后,自动检查业务规则,触发后续动作。 * 不是轮询 — 是实时响应。 * * 触发引擎不启动子会话(太重),只创建 pending 状态的 staff_task, * 任务在下次 AI 交互时由 context-injector 呈现给老板,或由 cron 定时执行。 */ import type { OpcDatabase } from "../db/index.js"; import { checkAutonomy, type AutonomyResult } from "./autonomy-rules.js"; type LogFn = (msg: string) => void; /** * 在 after_tool_call 中调用。 * 根据工具名 + 参数 + 返回值,匹配触发规则,执行自动动作。 */ export function triggerEventRules( db: OpcDatabase, companyId: string, toolName: string, params: Record | undefined, result: Record | undefined, log?: LogFn, ): void { const l = log ?? (() => {}); try { // 只处理写操作 const action = params?.action as string | undefined; if (!action) return; const writeActions = [ "add_transaction", "create_contract", "update_contract", "add_employee", "create_content", "create_project", "create_tax_filing", "create_invoice", "register_company", "update_company", "batch_import_transactions", "batch_import_invoices", "batch_import_contacts", "add_interaction", "generate_document", "approve_content", ]; if (!writeActions.includes(action)) return; // ── 应收款逾期检查(交易记录后) ── if (toolName === "opc_finance" && action === "add_transaction") { checkOverdueReceivables(db, companyId, l); } // ── 现金流异常检查(交易记录后) ── if (toolName === "opc_finance" && action === "add_transaction") { checkCashFlowAnomaly(db, companyId, l); } // ── 合同到期检查(合同创建 / 更新后) ── if (toolName === "opc_legal" && (action === "create_contract" || action === "update_contract")) { checkContractExpiry(db, companyId, l); } // ── 新告警分配(监控工具触发后) ── if (toolName === "opc_monitoring") { assignAlertsToStaff(db, companyId, l); } // ── 跟进逾期检测(CRM) ── checkOverdueFollowUps(db, companyId, l); l(`opc-events: 事件触发检查完成 [${toolName}.${action}]`); } catch (err) { l(`opc-events: 触发检查异常: ${err instanceof Error ? err.message : String(err)}`); } } /** * 将新产生的告警自动分配给对应 AI 员工。 * 在 proactive-service 的 runReminderScan 之后调用。 */ export function alertsToStaffTasks(db: OpcDatabase, log: LogFn): void { try { // 查找最近 1 小时内新创建的 active alerts,且没有对应 staff_task 的 const recentAlerts = db.query( `SELECT a.id, a.company_id, a.title, a.message, a.severity, a.category FROM opc_alerts a WHERE a.status = 'active' AND a.created_at > datetime('now', '-1 hour') AND NOT EXISTS ( SELECT 1 FROM opc_staff_tasks t WHERE t.company_id = a.company_id AND t.title LIKE '%' || a.title || '%' AND t.status IN ('pending', 'in_progress', 'pending_approval') )`, ) as { id: string; company_id: string; title: string; message: string; severity: string; category: string }[]; if (recentAlerts.length === 0) return; let created = 0; const now = new Date().toISOString(); for (const alert of recentAlerts) { const staffRole = mapAlertCategoryToRole(alert.category); // 检查该公司是否有对应岗位 const staffExists = db.queryOne( "SELECT id FROM opc_staff_config WHERE company_id = ? AND role = ? AND enabled = 1", alert.company_id, staffRole, ); if (!staffExists) continue; // 通过自主行动规则判断是否需要老板审批 const autonomy = checkAutonomy(staffRole, mapAlertToTrigger(alert.category, alert.severity)); const status = autonomy.needsBossApproval ? "pending_approval" : "pending"; const priority = alert.severity === "critical" ? "urgent" : alert.severity === "warning" ? "high" : "normal"; const taskId = db.genId(); db.execute( `INSERT INTO opc_staff_tasks (id, company_id, staff_role, title, description, status, priority, task_type, schedule, assigned_at, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, 'auto_alert', 'on_demand', ?, ?)`, taskId, alert.company_id, staffRole, `[告警处理] ${alert.title}`, `系统检测到告警:${alert.message}\n\n告警ID: ${alert.id}\n严重程度: ${alert.severity}\n类别: ${alert.category}`, status, priority, now, now, ); created++; // 如果需要审批,同时写入洞察 if (autonomy.needsBossApproval && autonomy.escalationMsg) { db.execute( `INSERT INTO opc_insights (id, company_id, insight_type, category, priority, title, message, action_hint, staff_role, status, expires_at, created_at, updated_at) VALUES (?, ?, 'escalation', ?, ?, ?, ?, ?, ?, 'active', datetime('now', '+7 days'), ?, ?)`, db.genId(), alert.company_id, alert.category, alert.severity === "critical" ? 95 : 80, `需要决策: ${alert.title}`, autonomy.escalationMsg, `审批后系统将自动执行`, staffRole, now, now, ); } } if (created > 0) { log(`opc-events: 已将 ${created} 条新告警自动分配给 AI 员工`); } } catch (err) { log(`opc-events: 告警分配异常: ${err instanceof Error ? err.message : String(err)}`); } } /** * 将洞察中含有 staff_role 的自动转化为员工待办。 */ export function insightsToStaffTasks(db: OpcDatabase, log: LogFn): void { try { // 查找有 staff_role 且类型为 staff_observation 的活跃洞察,且没有对应 staff_task 的 const actionableInsights = db.query( `SELECT i.id, i.company_id, i.title, i.message, i.action_hint, i.staff_role, i.priority FROM opc_insights i WHERE i.status = 'active' AND i.staff_role != '' AND i.insight_type = 'staff_observation' AND i.action_hint != '' AND (i.expires_at = '' OR i.expires_at > datetime('now')) AND NOT EXISTS ( SELECT 1 FROM opc_staff_tasks t WHERE t.company_id = i.company_id AND t.staff_role = i.staff_role AND t.title LIKE '%' || i.title || '%' AND t.status IN ('pending', 'in_progress', 'pending_approval') )`, ) as { id: string; company_id: string; title: string; message: string; action_hint: string; staff_role: string; priority: number }[]; if (actionableInsights.length === 0) return; let created = 0; const now = new Date().toISOString(); for (const insight of actionableInsights) { // 检查该公司是否有对应岗位 const staffExists = db.queryOne( "SELECT id FROM opc_staff_config WHERE company_id = ? AND role = ? AND enabled = 1", insight.company_id, insight.staff_role, ); if (!staffExists) continue; const priority = insight.priority >= 80 ? "high" : "normal"; const taskId = db.genId(); db.execute( `INSERT INTO opc_staff_tasks (id, company_id, staff_role, title, description, status, priority, task_type, schedule, assigned_at, created_at) VALUES (?, ?, ?, ?, ?, 'pending', ?, 'auto_insight', 'on_demand', ?, ?)`, taskId, insight.company_id, insight.staff_role, `[洞察建议] ${insight.action_hint}`, `来源洞察:${insight.title}\n${insight.message}`, priority, now, now, ); created++; } if (created > 0) { log(`opc-events: 已将 ${created} 条洞察建议转化为员工待办`); } } catch (err) { log(`opc-events: 洞察转化异常: ${err instanceof Error ? err.message : String(err)}`); } } // ── 内部检查函数 ────────────────────────────────────────────── function checkOverdueReceivables(db: OpcDatabase, companyId: string, log: LogFn): void { // 检查逾期超过 7 天的应收发票 const overdueInvoices = db.query( `SELECT id, counterparty, total_amount, issue_date FROM opc_invoices WHERE company_id = ? AND type = 'sales' AND status IN ('sent', 'pending') AND issue_date != '' AND issue_date < date('now', '-7 days')`, companyId, ) as { id: string; counterparty: string; total_amount: number; issue_date: string }[]; if (overdueInvoices.length === 0) return; const staffExists = db.queryOne( "SELECT id FROM opc_staff_config WHERE company_id = ? AND role = 'finance' AND enabled = 1", companyId, ); if (!staffExists) return; const now = new Date().toISOString(); let created = 0; for (const inv of overdueInvoices) { const days = Math.floor((Date.now() - new Date(inv.issue_date).getTime()) / 86400000); // 检查是否已有催收任务 const existing = db.queryOne( `SELECT id FROM opc_staff_tasks WHERE company_id = ? AND staff_role = 'finance' AND title LIKE ? AND status IN ('pending', 'in_progress', 'pending_approval')`, companyId, `%催收%${inv.counterparty}%`, ); if (existing) continue; const autonomy = checkAutonomy("finance", days > 30 ? "invoice_overdue_30d" : "invoice_overdue_7d"); const status = autonomy.needsBossApproval ? "pending_approval" : "pending"; const taskId = db.genId(); db.execute( `INSERT INTO opc_staff_tasks (id, company_id, staff_role, title, description, status, priority, task_type, schedule, assigned_at, created_at) VALUES (?, ?, 'finance', ?, ?, ?, ?, 'auto_trigger', 'on_demand', ?, ?)`, taskId, companyId, `催收 ${inv.counterparty} 逾期款 ${inv.total_amount.toLocaleString()} 元`, `发票逾期 ${days} 天,金额 ${inv.total_amount} 元。发票ID: ${inv.id}`, status, days > 30 ? "urgent" : "high", now, now, ); created++; if (autonomy.needsBossApproval && autonomy.escalationMsg) { db.execute( `INSERT INTO opc_insights (id, company_id, insight_type, category, priority, title, message, action_hint, staff_role, status, expires_at, created_at, updated_at) VALUES (?, ?, 'escalation', 'finance', 90, ?, ?, '审批后自动催收', 'finance', 'active', datetime('now', '+7 days'), ?, ?)`, db.genId(), companyId, `需要决策: ${inv.counterparty} 逾期应收款`, autonomy.escalationMsg, now, now, ); } } if (created > 0) { log(`opc-events: 已为 ${companyId} 创建 ${created} 个应收款催收任务`); } } function checkCashFlowAnomaly(db: OpcDatabase, companyId: string, log: LogFn): void { const thisMonth = new Date().toISOString().slice(0, 7); const lastMonth = (() => { const d = new Date(); d.setMonth(d.getMonth() - 1); return d.toISOString().slice(0, 7); })(); const thisMonthNet = (db.queryOne( `SELECT COALESCE(SUM(CASE WHEN type='income' THEN amount ELSE -amount END), 0) as total FROM opc_transactions WHERE company_id = ? AND strftime('%Y-%m', transaction_date) = ?`, companyId, thisMonth, ) as { total: number }).total; const lastMonthNet = (db.queryOne( `SELECT COALESCE(SUM(CASE WHEN type='income' THEN amount ELSE -amount END), 0) as total FROM opc_transactions WHERE company_id = ? AND strftime('%Y-%m', transaction_date) = ?`, companyId, lastMonth, ) as { total: number }).total; // 本月现金流为负且恶化超过上月 50% if (thisMonthNet >= 0) return; if (lastMonthNet >= 0) return; // 上月为正,本月转负也值得关注但用不同逻辑 if (Math.abs(thisMonthNet) <= Math.abs(lastMonthNet) * 1.5) return; // 检查是否已有任务 const existing = db.queryOne( `SELECT id FROM opc_staff_tasks WHERE company_id = ? AND staff_role = 'finance' AND title LIKE '%支出异常%' AND status IN ('pending', 'in_progress', 'pending_approval') AND DATE(created_at) = DATE('now')`, companyId, ); if (existing) return; const staffExists = db.queryOne( "SELECT id FROM opc_staff_config WHERE company_id = ? AND role = 'finance' AND enabled = 1", companyId, ); if (!staffExists) return; const now = new Date().toISOString(); const taskId = db.genId(); db.execute( `INSERT INTO opc_staff_tasks (id, company_id, staff_role, title, description, status, priority, task_type, schedule, assigned_at, created_at) VALUES (?, ?, 'finance', '分析本月支出异常', ?, 'pending', 'high', 'auto_trigger', 'on_demand', ?, ?)`, taskId, companyId, `本月现金流 ${thisMonthNet.toLocaleString()} 元,较上月 ${lastMonthNet.toLocaleString()} 元恶化超过 50%`, now, now, ); log(`opc-events: 已为 ${companyId} 创建现金流异常分析任务`); } function checkContractExpiry(db: OpcDatabase, companyId: string, log: LogFn): void { // 检查 30 天内到期的合同(active 或 draft 状态都检查) const expiringContracts = db.query( `SELECT id, title, counterparty, end_date FROM opc_contracts WHERE company_id = ? AND status IN ('active', 'draft') AND end_date != '' AND end_date <= date('now', '+30 days') AND end_date >= date('now')`, companyId, ) as { id: string; title: string; counterparty: string; end_date: string }[]; if (expiringContracts.length === 0) return; const staffExists = db.queryOne( "SELECT id FROM opc_staff_config WHERE company_id = ? AND role = 'legal' AND enabled = 1", companyId, ); if (!staffExists) return; const now = new Date().toISOString(); let created = 0; for (const contract of expiringContracts) { const existing = db.queryOne( `SELECT id FROM opc_staff_tasks WHERE company_id = ? AND staff_role = 'legal' AND title LIKE ? AND status IN ('pending', 'in_progress', 'pending_approval')`, companyId, `%${contract.title}%续签%`, ); if (existing) continue; const daysLeft = Math.floor((new Date(contract.end_date).getTime() - Date.now()) / 86400000); const autonomy = checkAutonomy("legal", daysLeft <= 7 ? "contract_expiry_7d" : "contract_expiry_30d"); const status = autonomy.needsBossApproval ? "pending_approval" : "pending"; const taskId = db.genId(); db.execute( `INSERT INTO opc_staff_tasks (id, company_id, staff_role, title, description, status, priority, task_type, schedule, assigned_at, created_at) VALUES (?, ?, 'legal', ?, ?, ?, ?, 'auto_trigger', 'on_demand', ?, ?)`, taskId, companyId, `审查合同「${contract.title}」续签方案`, `合同将在 ${daysLeft} 天后到期(${contract.end_date})。\n对方: ${contract.counterparty}\n合同ID: ${contract.id}`, status, daysLeft <= 7 ? "urgent" : "high", now, now, ); created++; if (autonomy.needsBossApproval && autonomy.escalationMsg) { db.execute( `INSERT INTO opc_insights (id, company_id, insight_type, category, priority, title, message, action_hint, staff_role, status, expires_at, created_at, updated_at) VALUES (?, ?, 'escalation', 'legal', 90, ?, ?, '审批后法务将准备续签方案', 'legal', 'active', ?, ?, ?)`, db.genId(), companyId, `需要决策: ${contract.title} 续签`, autonomy.escalationMsg, contract.end_date, now, now, ); } } if (created > 0) { log(`opc-events: 已为 ${companyId} 创建 ${created} 个合同续签审查任务`); } } function assignAlertsToStaff(db: OpcDatabase, companyId: string, log: LogFn): void { // 调用通用的 alertsToStaffTasks 来处理该公司的新告警 alertsToStaffTasks(db, log); } /** 检查客户跟进逾期(CRM 联系人 follow_up_date 过期且未成交/流失) */ function checkOverdueFollowUps(db: OpcDatabase, companyId: string, log: LogFn): void { try { const today = new Date().toISOString().slice(0, 10); const overdueContacts = db.query( `SELECT id, name, follow_up_date, pipeline_stage, deal_value FROM opc_contacts WHERE company_id = ? AND follow_up_date != '' AND follow_up_date < ? AND pipeline_stage NOT IN ('won', 'lost', 'churned')`, companyId, today, ) as { id: string; name: string; follow_up_date: string; pipeline_stage: string; deal_value: number }[]; if (overdueContacts.length === 0) return; // 检查是否有 marketing 或 ops 岗位 const staffRole = (db.queryOne( "SELECT id FROM opc_staff_config WHERE company_id = ? AND role = 'marketing' AND enabled = 1", companyId, ) ? "marketing" : db.queryOne( "SELECT id FROM opc_staff_config WHERE company_id = ? AND role = 'ops' AND enabled = 1", companyId, ) ? "ops" : null); if (!staffRole) return; const now = new Date().toISOString(); let created = 0; for (const contact of overdueContacts) { // 检查是否已有跟进任务 const existing = db.queryOne( `SELECT id FROM opc_staff_tasks WHERE company_id = ? AND title LIKE ? AND status IN ('pending', 'in_progress', 'pending_approval')`, companyId, `%跟进%${contact.name}%`, ); if (existing) continue; const taskId = db.genId(); db.execute( `INSERT INTO opc_staff_tasks (id, company_id, staff_role, title, description, status, priority, task_type, schedule, assigned_at, created_at) VALUES (?, ?, ?, ?, ?, 'pending', 'high', 'auto_trigger', 'on_demand', ?, ?)`, taskId, companyId, staffRole, `跟进客户 ${contact.name}(逾期)`, `客户 ${contact.name} 的跟进日期 ${contact.follow_up_date} 已逾期。\n当前阶段: ${contact.pipeline_stage}\n潜在金额: ${contact.deal_value} 元`, now, now, ); created++; } if (created > 0) { log(`opc-events: 已为 ${companyId} 创建 ${created} 个客户跟进逾期任务`); } } catch (err) { log(`opc-events: 跟进逾期检测异常: ${err instanceof Error ? err.message : String(err)}`); } } // ── 映射辅助 ──────────────────────────────────────────────── function mapAlertCategoryToRole(category: string): string { const mapping: Record = { tax: "finance", cashflow: "finance", finance: "finance", contract: "legal", legal: "legal", hr: "hr", employee: "hr", marketing: "marketing", content: "marketing", project: "ops", ops: "ops", }; return mapping[category] ?? "admin"; } function mapAlertToTrigger(category: string, severity: string): string { if (category === "contract") { return severity === "critical" ? "contract_expiry_7d" : "contract_expiry_30d"; } if (category === "tax") { return "monthly_close"; } if (category === "cashflow") { return "invoice_overdue_7d"; } return "general_alert"; }