/** * Scheduler tools registration. * Tools: strategy_daily_scan, strategy_price_monitor, strategy_scan_history, strategy_periodic_report */ import { randomUUID } from "node:crypto"; import type { DatabaseSync } from "node:sqlite"; import { Type } from "@sinclair/typebox"; import type { OpenClawPluginApi } from "openclaw/plugin-sdk/core"; import { DataHubClient, guessMarket } from "../datahub/client.js"; import { countPriceAlertsSince, countScanHistoryByTypeSince, insertPriceAlert, insertScanHistory, queryBacktestResults, queryScanHistory, queryStrategies, updateScanHistory, } from "../db/repositories.js"; import { withLogging } from "../middleware/with-logging.js"; import type { UnifiedPluginConfig } from "../types.js"; import type { AggregatedNewsProvider } from "./news-provider.js"; import { formatPeriodicReportMarkdown } from "./periodic-report-builder.js"; import { buildScanReport, formatScanReportMarkdown } from "./scan-report-builder.js"; const NO_API_KEY = "API key not configured. Set apiKey in plugin config or OPENFINCLAW_API_KEY env var."; /** JSON tool result helper. */ function json(payload: unknown) { return { content: [{ type: "text" as const, text: JSON.stringify(payload, null, 2) }], details: payload, }; } /** * Register scheduler-related agent tools. */ export function registerSchedulerTools( api: OpenClawPluginApi, config: UnifiedPluginConfig, getDb: () => DatabaseSync, newsProvider: AggregatedNewsProvider, ): void { // ── strategy_daily_scan ── api.registerTool( { name: "strategy_daily_scan", label: "Daily strategy scan", description: "Scan all local strategies, fetch latest news and price data for their symbols, " + "and generate an analysis report. Can be triggered by cron or manually. " + "Returns a structured report for analysis and decision-making.", parameters: Type.Object({ strategyId: Type.Optional( Type.String({ description: "Scan a specific strategy (default: all)" }), ), includePrice: Type.Optional( Type.Boolean({ description: "Include price data (default: true)" }), ), }), execute: withLogging( getDb, "strategy_daily_scan", "scheduler", async (_toolCallId, params) => { const scanId = randomUUID(); const now = new Date().toISOString(); // Record scan start insertScanHistory(getDb(), { id: scanId, scan_type: "daily_scan", started_at: now, status: "running", strategies_scanned: 0, news_found: 0, actions_taken: 0, }); try { const db = getDb(); // 1. Get strategies let strategies = queryStrategies(db); if (params.strategyId) { strategies = strategies.filter((s) => s.id === params.strategyId); } // Filter to strategies with symbols strategies = strategies.filter((s) => { try { const syms = JSON.parse(s.symbols ?? "[]"); return Array.isArray(syms) && syms.length > 0; } catch { return false; } }); if (strategies.length === 0) { updateScanHistory(getDb(), scanId, { status: "completed", completed_at: new Date().toISOString(), summary: "No strategies with symbols found.", }); return { content: [ { type: "text" as const, text: "暂无包含标的的策略。请先通过 skill_fork 下载策略或手动创建策略并配置 symbols。", }, ], details: { success: true, strategiesScanned: 0 }, }; } // 2. Collect all unique symbols const allSymbols = new Set(); for (const s of strategies) { try { const syms = JSON.parse(s.symbols ?? "[]") as string[]; syms.forEach((sym) => allSymbols.add(sym)); } catch { /* skip */ } } // 3. Fetch news for all symbols const newsResults = await newsProvider.fetchNewsForSymbols([...allSymbols]); // 4. Optionally fetch price data const includePrice = params.includePrice !== false; if (includePrice && config.apiKey) { const client = new DataHubClient( config.datahubGatewayUrl, config.apiKey, config.requestTimeoutMs, ); for (const nr of newsResults) { try { const ticker = await client.getTicker(nr.symbol, nr.market); nr.currentPrice = ticker.last; // Calculate 24h change from OHLCV const ohlcv = await client.getOHLCV({ symbol: nr.symbol, market: nr.market, limit: 2, }); if (ohlcv.length >= 2) { const prev = ohlcv[ohlcv.length - 2]!.close; const curr = ohlcv[ohlcv.length - 1]!.close; if (prev > 0) nr.priceChange24h = ((curr - prev) / prev) * 100; } } catch { /* DataHub unavailable — degrade gracefully */ } } } // 5. Build backtest history map const backtestMap = new Map>(); for (const s of strategies) { backtestMap.set(s.id, queryBacktestResults(db, s.id)); } // 6. Build report const report = buildScanReport(scanId, strategies, newsResults, backtestMap); const markdown = formatScanReportMarkdown(report); // 7. Update scan history updateScanHistory(getDb(), scanId, { status: "completed", completed_at: new Date().toISOString(), strategies_scanned: report.strategiesScanned, news_found: report.totalNewsFound, summary: `Scanned ${report.strategiesScanned} strategies, found ${report.totalNewsFound} news items.`, detail_json: JSON.stringify(report), }); return { content: [{ type: "text" as const, text: markdown }], details: { success: true, ...report }, }; } catch (err) { updateScanHistory(getDb(), scanId, { status: "failed", completed_at: new Date().toISOString(), summary: `Scan failed: ${err instanceof Error ? err.message : String(err)}`, }); return json({ success: false, error: err instanceof Error ? err.message : String(err), }); } }, ), }, { names: ["strategy_daily_scan"] }, ); // ── strategy_price_monitor ── api.registerTool( { name: "strategy_price_monitor", label: "Strategy price monitor", description: "Check price moves for all symbols referenced by local strategies against a threshold. " + "Persists alerts to SQLite. Intended for cron or manual runs.", parameters: Type.Object({ threshold: Type.Optional( Type.Number({ description: "Alert threshold in percent (default: plugin config)" }), ), strategyId: Type.Optional( Type.String({ description: "Monitor symbols for one strategy only (default: all)" }), ), }), execute: withLogging( getDb, "strategy_price_monitor", "scheduler", async (_toolCallId, params) => { const scanId = randomUUID(); const now = new Date().toISOString(); insertScanHistory(getDb(), { id: scanId, scan_type: "price_monitor", started_at: now, status: "running", strategies_scanned: 0, news_found: 0, actions_taken: 0, }); try { const db = getDb(); let strategies = queryStrategies(db); if (params.strategyId) { strategies = strategies.filter((s) => s.id === params.strategyId); } strategies = strategies.filter((s) => { try { const syms = JSON.parse(s.symbols ?? "[]"); return Array.isArray(syms) && syms.length > 0; } catch { return false; } }); if (!config.apiKey) { updateScanHistory(getDb(), scanId, { status: "failed", completed_at: new Date().toISOString(), summary: "API key not configured.", }); return json({ success: false, error: "API key not configured; price monitoring requires DataHub access.", }); } if (strategies.length === 0) { updateScanHistory(getDb(), scanId, { status: "completed", completed_at: new Date().toISOString(), strategies_scanned: 0, summary: "No strategies with symbols.", }); return { content: [ { type: "text" as const, text: "暂无包含标的的策略。请先配置策略 symbols 后再运行价格监控。", }, ], details: { success: true, strategiesScanned: 0, alertCount: 0 }, }; } const threshold = params.threshold != null && Number.isFinite(Number(params.threshold)) ? Number(params.threshold) : config.priceAlertThreshold; type StratRef = { id: string; name: string }; const symbolToStrategies = new Map(); for (const s of strategies) { try { const syms = JSON.parse(s.symbols ?? "[]") as string[]; for (const sym of syms) { if (typeof sym !== "string" || !sym) continue; const list = symbolToStrategies.get(sym) ?? []; list.push({ id: s.id, name: s.name }); symbolToStrategies.set(sym, list); } } catch { /* skip */ } } const client = new DataHubClient( config.datahubGatewayUrl, config.apiKey, config.requestTimeoutMs, ); const alertLines: string[] = []; const okLines: string[] = []; let alertCount = 0; for (const [symbol, strats] of symbolToStrategies) { const market = guessMarket(symbol); let pct: number | null = null; let lastPrice = 0; let prevPrice = 0; try { const ohlcv = await client.getOHLCV({ symbol, market, limit: 2 }); if (ohlcv.length >= 2) { prevPrice = ohlcv[ohlcv.length - 2]!.close; lastPrice = ohlcv[ohlcv.length - 1]!.close; if (prevPrice > 0) pct = ((lastPrice - prevPrice) / prevPrice) * 100; } } catch { pct = null; } if (pct === null) { okLines.push(`- ${symbol}: 无法获取 K 线(DataHub 或标的异常)`); continue; } if (Math.abs(pct) >= threshold) { for (const st of strats) { const alertId = randomUUID(); const pctStr = `${pct >= 0 ? "+" : ""}${pct.toFixed(2)}%`; insertPriceAlert(getDb(), { id: alertId, strategy_id: st.id, symbol, alert_type: "threshold_breached", trigger_value: pct, threshold, message: `涨跌幅 ${pctStr} 超过阈值 ${threshold}%(策略: ${st.name})`, created_at: new Date().toISOString(), acknowledged: 0, }); alertCount += 1; alertLines.push( `- **${symbol}**: ${pctStr} (阈值 ${threshold}%) — 策略: ${st.name}\n - 当前收盘: ${lastPrice.toFixed(6)},上一根收盘: ${prevPrice.toFixed(6)}`, ); } } else { okLines.push(`- ${symbol}: ${pct >= 0 ? "+" : ""}${pct.toFixed(2)}%`); } } const title = `## 价格监控报告 — ${now.slice(0, 16).replace("T", " ")}`; const body: string[] = [title, ""]; if (alertLines.length > 0) { body.push("### 告警"); body.push(...alertLines); body.push(""); } else { body.push("### 告警"); body.push("(本窗口无触及阈值的标的)"); body.push(""); } body.push("### 正常 / 其他"); body.push(...okLines); const markdown = body.join("\n"); updateScanHistory(getDb(), scanId, { status: "completed", completed_at: new Date().toISOString(), strategies_scanned: strategies.length, actions_taken: alertCount, summary: `Price monitor: ${alertCount} alert(s), ${symbolToStrategies.size} symbol(s).`, }); return { content: [{ type: "text" as const, text: markdown }], details: { success: true, alertCount, symbolsChecked: symbolToStrategies.size, strategiesScanned: strategies.length, threshold, }, }; } catch (err) { updateScanHistory(getDb(), scanId, { status: "failed", completed_at: new Date().toISOString(), summary: `Price monitor failed: ${err instanceof Error ? err.message : String(err)}`, }); return json({ success: false, error: err instanceof Error ? err.message : String(err), }); } }, ), }, { names: ["strategy_price_monitor"] }, ); // ── strategy_scan_history ── api.registerTool( { name: "strategy_scan_history", label: "Scan history", description: "Query past strategy scan and report history.", parameters: Type.Object({ scanType: Type.Optional( Type.String({ description: "Filter by type: daily_scan, price_monitor, weekly_report, monthly_report", }), ), limit: Type.Optional(Type.Number({ description: "Max results (default 10)" })), }), execute: withLogging( getDb, "strategy_scan_history", "scheduler", async (_toolCallId, params) => { if (!config.apiKey) return json({ success: false, error: NO_API_KEY }); try { const db = getDb(); const entries = queryScanHistory(db, { scanType: params.scanType ? String(params.scanType) : undefined, limit: Number(params.limit) || 10, }); if (entries.length === 0) { return { content: [ { type: "text" as const, text: "暂无扫描记录。调用 strategy_daily_scan 执行首次扫描。", }, ], details: { success: true, entries: [] }, }; } const lines: string[] = []; lines.push(`## 扫描历史 (最近 ${entries.length} 条)`); lines.push(""); for (const e of entries) { const date = e.started_at.slice(0, 16).replace("T", " "); const status = e.status === "completed" ? "OK" : e.status === "failed" ? "FAIL" : "..."; lines.push( `- [${status}] ${date} | ${e.scan_type} | 策略: ${e.strategies_scanned} | 新闻: ${e.news_found}`, ); if (e.summary) lines.push(` ${e.summary}`); } return { content: [{ type: "text" as const, text: lines.join("\n") }], details: { success: true, entries }, }; } catch (err) { return json({ success: false, error: err instanceof Error ? err.message : String(err), }); } }, ), }, { names: ["strategy_scan_history"] }, ); // ── strategy_periodic_report ── api.registerTool( { name: "strategy_periodic_report", label: "Strategy periodic report", description: "Build a weekly (7-day rolling) or monthly (30-day rolling) Markdown report: " + "backtest ranking snapshot, scan_history counts by type, and price alert count.", parameters: Type.Object({ period: Type.String({ enum: ["weekly", "monthly"], description: "weekly = past 7×24h from now; monthly = past 30×24h from now (rolling)", }), }), execute: withLogging( getDb, "strategy_periodic_report", "scheduler", async (_toolCallId, params) => { if (!config.apiKey) return json({ success: false, error: NO_API_KEY }); const period = params.period === "monthly" ? "monthly" : "weekly"; const scanId = randomUUID(); const now = new Date(); const start = new Date(now.getTime()); const days = period === "weekly" ? 7 : 30; start.setTime(now.getTime() - days * 24 * 60 * 60 * 1000); const startIso = start.toISOString(); const endIso = now.toISOString(); const scanType = period === "weekly" ? "weekly_report" : "monthly_report"; try { const db = getDb(); // Stats before persisting this run's scan_history row so the report does not count itself. const scanCounts = countScanHistoryByTypeSince(db, startIso); const priceAlertCount = countPriceAlertsSince(db, startIso); const strategies = queryStrategies(db); const rankRows = strategies.map((s) => { const bts = queryBacktestResults(db, s.id); const latest = bts[0]; return { strategyId: s.id, strategyName: s.name, totalReturn: latest?.total_return ?? null, sharpe: latest?.sharpe ?? null, maxDrawdown: latest?.max_drawdown ?? null, }; }); const markdown = formatPeriodicReportMarkdown({ period, windowStartIso: startIso, windowEndIso: endIso, scanCountsByType: scanCounts, priceAlertCount, rankRows, }); const completedAt = new Date().toISOString(); insertScanHistory(getDb(), { id: scanId, scan_type: scanType, started_at: endIso, completed_at: completedAt, status: "completed", strategies_scanned: strategies.length, news_found: 0, actions_taken: 0, summary: `${period} report: ${strategies.length} strategies, ${priceAlertCount} alerts in window.`, }); return { content: [{ type: "text" as const, text: markdown }], details: { success: true, period, windowStartIso: startIso, windowEndIso: endIso, strategyCount: strategies.length, priceAlertCount, }, }; } catch (err) { insertScanHistory(getDb(), { id: scanId, scan_type: scanType, started_at: endIso, completed_at: new Date().toISOString(), status: "failed", strategies_scanned: 0, news_found: 0, actions_taken: 0, summary: `Periodic report failed: ${err instanceof Error ? err.message : String(err)}`, }); return json({ success: false, error: err instanceof Error ? err.message : String(err), }); } }, ), }, { names: ["strategy_periodic_report"] }, ); }