/** * 星环OPC中心 — 主动智能后台服务 * * 统一调度所有后台扫描任务,替代单独的 startReminderService。 * 30 秒后首次扫描,之后每小时执行: * 1. runReminderScan() — 已有的 4 项检查(税务、合同、现金流、融资) * 2. recalculateAllStages() — 阶段检测 * 3. runIntelligenceScan() — 智能洞察分析 * 4. detectMilestonesForAll() — 里程碑检测 * 5. expireStaleInsights() — 清理过期洞察 * 6. saveDailyBriefings() — 保存每日简报快照 * 7. createScheduledStaffTasks() — 创建定时员工任务(pending,等 AI 触发) * 8. reapStaleTasks() — 清理超时的 in_progress 任务(subagent 异常未被钩子捕获时的兜底) */ import type { OpcDatabase } from "../db/index.js"; import { runReminderScan } from "./reminder-service.js"; import { runIntelligenceScan, expireStaleInsights } from "./intelligence-engine.js"; import { detectMilestonesForAll } from "./milestone-detector.js"; import { recalculateAllStages } from "./stage-detector.js"; import { saveDailyBriefing } from "./briefing-builder.js"; import { TaskExecutor } from "./task-executor.js"; import { alertsToStaffTasks, insightsToStaffTasks } from "./event-triggers.js"; import { generateDailyBrief, formatDailyBriefMarkdown } from "./daily-brief.js"; /** 为所有公司保存每日简报快照 */ function saveDailyBriefings(db: OpcDatabase, log: (msg: string) => void): void { const companies = db.query("SELECT id FROM opc_companies") as { id: string }[]; let saved = 0; for (const c of companies) { try { saveDailyBriefing(db, c.id); saved++; } catch (err) { log(`opc-proactive: 保存简报失败 [${c.id}]: ${err instanceof Error ? err.message : String(err)}`); } } if (saved > 0) log(`opc-proactive: 已保存 ${saved} 家公司的每日简报快照`); } /** 创建定时员工任务(pending 状态,等 AI 触发执行) */ function createScheduledStaffTasks(db: OpcDatabase, log: (msg: string) => void): void { const executor = new TaskExecutor(db); // 检查今日是否已创建过 daily 任务 const lastDailyRun = db.queryOne( "SELECT value FROM opc_tool_config WHERE key = 'last_daily_task_run'", ) as { value: string } | null; const today = new Date().toISOString().slice(0, 10); if (!lastDailyRun || lastDailyRun.value !== today) { const dailyCount = executor.createPendingScheduledTasks("daily"); if (dailyCount > 0) { log(`opc-executor: 已创建 ${dailyCount} 个每日定时任务(pending)`); } db.execute( `INSERT INTO opc_tool_config (key, value) VALUES ('last_daily_task_run', ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value`, today, ); } // 每周一创建 weekly 任务 const dayOfWeek = new Date().getDay(); if (dayOfWeek === 1) { const lastWeeklyRun = db.queryOne( "SELECT value FROM opc_tool_config WHERE key = 'last_weekly_task_run'", ) as { value: string } | null; if (!lastWeeklyRun || lastWeeklyRun.value !== today) { const weeklyCount = executor.createPendingScheduledTasks("weekly"); if (weeklyCount > 0) { log(`opc-executor: 已创建 ${weeklyCount} 个每周定时任务(pending)`); } db.execute( `INSERT INTO opc_tool_config (key, value) VALUES ('last_weekly_task_run', ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value`, today, ); } } } /** 逾期款项检测 */ function detectOverduePayments(db: OpcDatabase, log: (msg: string) => void): void { try { const today = new Date().toISOString().slice(0, 10); const overduePayments = db.query( `SELECT id, company_id, direction, counterparty, amount, due_date FROM opc_payments WHERE status IN ('pending', 'partial') AND due_date != '' AND due_date < ?`, today, ) as { id: string; company_id: string; direction: string; counterparty: string; amount: number; due_date: string }[]; let count = 0; for (const payment of overduePayments) { const daysOverdue = Math.floor((Date.now() - new Date(payment.due_date).getTime()) / 86400000); // 检查是否已有告警 const existingAlert = db.queryOne( `SELECT id FROM opc_alerts WHERE company_id = ? AND category = 'overdue_payment' AND title LIKE ? AND status = 'active'`, payment.company_id, `%${payment.counterparty}%`, ); if (!existingAlert) { db.execute( `INSERT INTO opc_alerts (id, company_id, title, severity, category, status, message, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))`, db.genId(), payment.company_id, `逾期${payment.direction === "receivable" ? "应收" : "应付"}款:${payment.counterparty}`, daysOverdue > 30 ? "critical" : "warning", "overdue_payment", "active", `${payment.direction === "receivable" ? "应收" : "应付"}款项 ${payment.amount.toLocaleString()} 元已逾期 ${daysOverdue} 天`, ); count++; } } if (count > 0) log(`opc-proactive: 检测到 ${count} 条逾期款项告警`); } catch (err) { log(`opc-proactive: 逾期款项检测异常: ${err instanceof Error ? err.message : String(err)}`); } } /** 合同到期检测 */ function detectExpiringContracts(db: OpcDatabase, log: (msg: string) => void): void { try { const today = new Date().toISOString().slice(0, 10); const expiringContracts = db.query( `SELECT id, company_id, title, counterparty, end_date FROM opc_contracts WHERE status = 'active' AND end_date != '' AND end_date >= ? AND end_date <= date(?, '+30 days')`, today, today, ) as { id: string; company_id: string; title: string; counterparty: string; end_date: string }[]; let count = 0; for (const contract of expiringContracts) { const daysUntilExpiry = Math.floor((new Date(contract.end_date).getTime() - Date.now()) / 86400000); // 检查是否已有告警 const existingAlert = db.queryOne( `SELECT id FROM opc_alerts WHERE company_id = ? AND category = 'expiring_contract' AND title LIKE ? AND status = 'active'`, contract.company_id, `%${contract.title}%`, ); if (!existingAlert) { db.execute( `INSERT INTO opc_alerts (id, company_id, title, severity, category, status, message, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))`, db.genId(), contract.company_id, `合同即将到期:${contract.title}`, daysUntilExpiry <= 7 ? "critical" : daysUntilExpiry <= 14 ? "warning" : "info", "expiring_contract", "active", `与 ${contract.counterparty} 的合同将在 ${daysUntilExpiry} 天后到期(${contract.end_date}),需要续约或重新协商`, ); count++; } } if (count > 0) log(`opc-proactive: 检测到 ${count} 条合同到期告警`); } catch (err) { log(`opc-proactive: 合同到期检测异常: ${err instanceof Error ? err.message : String(err)}`); } } /** 客户回访检测 */ function detectStaleCustomers(db: OpcDatabase, log: (msg: string) => void): void { try { const staleContacts = db.query( `SELECT id, company_id, name, last_contact_date, pipeline_stage FROM opc_contacts WHERE last_contact_date != '' AND last_contact_date < date('now', '-60 days') AND pipeline_stage NOT IN ('lost', 'won')`, ) as { id: string; company_id: string; name: string; last_contact_date: string; pipeline_stage: string }[]; let count = 0; for (const contact of staleContacts) { const daysSinceContact = Math.floor((Date.now() - new Date(contact.last_contact_date).getTime()) / 86400000); // 检查是否已有告警 const existingAlert = db.queryOne( `SELECT id FROM opc_alerts WHERE company_id = ? AND category = 'stale_customer' AND title LIKE ? AND status = 'active'`, contact.company_id, `%${contact.name}%`, ); if (!existingAlert) { db.execute( `INSERT INTO opc_alerts (id, company_id, title, severity, category, status, message, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))`, db.genId(), contact.company_id, `客户需要回访:${contact.name}`, daysSinceContact > 90 ? "warning" : "info", "stale_customer", "active", `已 ${daysSinceContact} 天未联系,当前阶段:${contact.pipeline_stage}`, ); count++; } } if (count > 0) log(`opc-proactive: 检测到 ${count} 个客户需要回访`); } catch (err) { log(`opc-proactive: 客户回访检测异常: ${err instanceof Error ? err.message : String(err)}`); } } /** 里程碑到期检测(新增:订单闭环功能) */ function detectMilestonesDue(db: OpcDatabase, log: (msg: string) => void): void { try { const today = new Date().toISOString().slice(0, 10); const in7Days = new Date(Date.now() + 7 * 24 * 60 * 60 * 1000).toISOString().slice(0, 10); // 查询 7 天内到期的里程碑 const dueMilestones = db.query( `SELECT m.id, m.company_id, m.contract_id, m.title, m.due_date, m.amount, c.title as contract_title, c.counterparty FROM opc_contract_milestones m JOIN opc_contracts c ON m.contract_id = c.id WHERE m.status IN ('pending', 'in_progress') AND m.due_date != '' AND m.due_date BETWEEN ? AND ?`, today, in7Days, ) as { id: string; company_id: string; contract_id: string; title: string; due_date: string; amount: number; contract_title: string; counterparty: string }[]; let count = 0; for (const milestone of dueMilestones) { const daysUntilDue = Math.floor((new Date(milestone.due_date).getTime() - Date.now()) / 86400000); // 检查是否已有告警 const existingAlert = db.queryOne( `SELECT id FROM opc_alerts WHERE company_id = ? AND category = 'milestone_due' AND title LIKE ? AND status = 'active'`, milestone.company_id, `%${milestone.title}%`, ); if (!existingAlert) { db.execute( `INSERT INTO opc_alerts (id, company_id, title, severity, category, status, message, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))`, db.genId(), milestone.company_id, `里程碑即将到期:${milestone.title}`, daysUntilDue <= 3 ? "warning" : "info", "milestone_due", "active", `合同「${milestone.contract_title}」的里程碑将在 ${daysUntilDue} 天后到期(${milestone.due_date}),应收款 ${milestone.amount.toLocaleString()} 元`, ); count++; } } if (count > 0) log(`opc-proactive: 检测到 ${count} 个即将到期的里程碑`); } catch (err) { log(`opc-proactive: 里程碑到期检测异常: ${err instanceof Error ? err.message : String(err)}`); } } /** 应收风险分层(新增:订单闭环功能) */ function classifyPaymentRisks(db: OpcDatabase, log: (msg: string) => void): void { try { const today = new Date().toISOString().slice(0, 10); // 更新所有应收的逾期天数 db.execute( `UPDATE opc_payments SET overdue_days = CAST((julianday('${today}') - julianday(due_date)) AS INTEGER) WHERE direction = 'receivable' AND status IN ('pending', 'partial', 'overdue') AND due_date != ''`, ); // 更新风险等级 db.execute( `UPDATE opc_payments SET risk_level = CASE WHEN overdue_days <= 7 THEN 'normal' WHEN overdue_days BETWEEN 8 AND 30 THEN 'warning' ELSE 'critical' END WHERE direction = 'receivable' AND status IN ('pending', 'partial', 'overdue')`, ); // 统计各风险等级 const riskStats = db.query( `SELECT risk_level, COUNT(*) as count, SUM(amount - paid_amount) as total_amount FROM opc_payments WHERE direction = 'receivable' AND status IN ('pending', 'partial', 'overdue') GROUP BY risk_level`, ) as { risk_level: string; count: number; total_amount: number }[]; let warningCount = 0; let criticalCount = 0; for (const stat of riskStats) { if (stat.risk_level === "warning") warningCount = stat.count; if (stat.risk_level === "critical") criticalCount = stat.count; } if (warningCount > 0 || criticalCount > 0) { log(`opc-proactive: 应收风险分层完成 - 预警: ${warningCount} 笔, 严重: ${criticalCount} 笔`); } // 为严重逾期的应收创建告警 const criticalPayments = db.query( `SELECT id, company_id, counterparty, amount, paid_amount, overdue_days FROM opc_payments WHERE risk_level = 'critical' AND direction = 'receivable'`, ) as { id: string; company_id: string; counterparty: string; amount: number; paid_amount: number; overdue_days: number }[]; let alertCount = 0; for (const payment of criticalPayments) { // 检查是否已有告警 const existingAlert = db.queryOne( `SELECT id FROM opc_alerts WHERE company_id = ? AND category = 'critical_receivable' AND title LIKE ? AND status = 'active'`, payment.company_id, `%${payment.counterparty}%`, ); if (!existingAlert) { const remainingAmount = payment.amount - payment.paid_amount; db.execute( `INSERT INTO opc_alerts (id, company_id, title, severity, category, status, message, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))`, db.genId(), payment.company_id, `严重逾期应收:${payment.counterparty}`, "critical", "critical_receivable", "active", `应收款 ${remainingAmount.toLocaleString()} 元已逾期 ${payment.overdue_days} 天,建议升级催收或法律途径`, ); alertCount++; } } if (alertCount > 0) log(`opc-proactive: 创建 ${alertCount} 条严重逾期应收告警`); } catch (err) { log(`opc-proactive: 应收风险分层异常: ${err instanceof Error ? err.message : String(err)}`); } } /** 清理超过 2 小时仍 in_progress 且有 session_key 的任务(subagent_ended 未触发时的兜底) */ function reapStaleTasks(db: OpcDatabase, log: (msg: string) => void): void { try { const staleTasks = db.query( `SELECT id, company_id, staff_role, title FROM opc_staff_tasks WHERE status = 'in_progress' AND session_key != '' AND started_at != '' AND started_at < datetime('now', '-2 hours')`, ) as { id: string; company_id: string; staff_role: string; title: string }[]; if (staleTasks.length === 0) return; const now = new Date().toISOString(); for (const task of staleTasks) { db.execute( `UPDATE opc_staff_tasks SET status = 'cancelled', completed_at = ?, result_summary = '[系统] 任务执行超时(超过2小时未完成),已自动取消' WHERE id = ? AND status = 'in_progress'`, now, task.id, ); } log(`opc-proactive: 已清理 ${staleTasks.length} 个超时任务(in_progress 超过2小时)`); } catch (err) { log(`opc-proactive: 清理超时任务异常: ${err instanceof Error ? err.message : String(err)}`); } } /** 执行一次完整的主动扫描 */ function runFullScan(db: OpcDatabase, log: (msg: string) => void, webhookUrl?: string): void { try { log("opc-proactive: 开始全量扫描..."); // 1. 原有提醒扫描 runReminderScan(db, log, webhookUrl); // 2. 阶段检测(先于洞察分析,因为 generateNextSteps 依赖阶段) recalculateAllStages(db, log); // 3. 智能洞察分析 runIntelligenceScan(db, log); // 4. 里程碑检测 detectMilestonesForAll(db, log); // 5. 清理过期洞察 expireStaleInsights(db, log); // 6. 保存每日简报快照 saveDailyBriefings(db, log); // 7. 创建定时员工任务 createScheduledStaffTasks(db, log); // 8. 新增检测:逾期款项、合同到期、客户回访 detectOverduePayments(db, log); detectExpiringContracts(db, log); detectStaleCustomers(db, log); // 8.1. 订单闭环检测:里程碑到期、应收风险分层 detectMilestonesDue(db, log); classifyPaymentRisks(db, log); // 9. 清理超时的 in_progress 任务 reapStaleTasks(db, log); // 10. 将新告警自动分配给对应 AI 员工 alertsToStaffTasks(db, log); // 11. 将可执行洞察转化为员工待办 insightsToStaffTasks(db, log); log("opc-proactive: 全量扫描完成"); } catch (err) { log(`opc-proactive: 扫描异常: ${err instanceof Error ? err.message : String(err)}`); } } /** * 启动主动智能后台服务,返回停止函数。 * 30 秒后首次扫描,之后每小时重复。 */ export function startProactiveService( db: OpcDatabase, log: (msg: string) => void, webhookUrl?: string, intervalMs = 3600_000, ): () => void { // 延迟 30 秒首次扫描 log("opc-proactive: 首次全量扫描将在 30 秒后启动(阶段检测 + 智能洞察 + 里程碑检测)"); const initTimer = setTimeout(() => { log("opc-proactive: 首次全量扫描启动"); runFullScan(db, log, webhookUrl); }, 30_000); // 每小时周期性扫描 const interval = setInterval(() => { runFullScan(db, log, webhookUrl); }, intervalMs); return () => { clearTimeout(initTimer); clearInterval(interval); log("opc-proactive: 主动智能服务已停止"); }; }