/** * Gateway Cron job registration for openfinclaw-strategy. * * Directly reads/writes the Gateway cron store (~/.openclaw/cron/jobs.json) * so jobs can be registered at plugin startup via the gateway_start hook * without requiring a CronService API reference. * * The running CronService watches the store file and picks up new jobs * on its next tick. * * Idempotent: creates missing jobs and migrates legacy "main" session jobs * to "isolated" so cron tasks run in their own session without blocking the * user's active conversation. * * Jobs: * - openfinclaw:daily-scan (0 8 * * *) 每日策略扫描 * - openfinclaw:price-monitor (every 30 min) 价格异动监控 * - openfinclaw:weekly-report (0 20 * * 0) 周报 * - openfinclaw:monthly-report (0 20 1 * *) 月报 */ import { randomUUID } from "node:crypto"; import fs from "node:fs"; import path from "node:path"; import type { UnifiedPluginConfig } from "../types.js"; // ── Cron store types (mirrors src/cron/types.ts subset) ────────────────── type CronPayload = | { kind: "systemEvent"; text: string } | { kind: "agentTurn"; message: string }; /** Minimal stored cron job shape. */ interface StoredCronJob { id: string; name: string; agentId?: string; enabled: boolean; schedule: { kind: "cron"; expr: string; tz?: string }; payload: CronPayload; sessionTarget: string; wakeMode: string; delivery: { mode: string; channel?: string; to?: string }; createdAtMs: number; updatedAtMs: number; state: Record; } interface CronStoreFile { version: 1; jobs: StoredCronJob[]; } /** Names managed by this plugin — used for migration detection. */ const OPENFINCLAW_JOB_NAMES = new Set([ "openfinclaw:daily-scan", "openfinclaw:price-monitor", "openfinclaw:weekly-report", "openfinclaw:monthly-report", ]); // ── File I/O ────────────────────────────────────────────────────────────── /** Resolve default cron store path: ~/.openclaw/cron/jobs.json */ function defaultStorePath(): string { const home = process.env.HOME ?? process.env.USERPROFILE ?? ""; return path.join(home, ".openclaw", "cron", "jobs.json"); } /** Load cron store. Returns empty store when file doesn't exist. */ async function loadStore(storePath: string): Promise { try { const raw = await fs.promises.readFile(storePath, "utf-8"); const parsed = JSON.parse(raw) as Record; const jobs = Array.isArray(parsed.jobs) ? (parsed.jobs as StoredCronJob[]) : []; return { version: 1, jobs }; } catch (err) { if ((err as { code?: string }).code === "ENOENT") { return { version: 1, jobs: [] }; } throw err; } } /** Atomic save: write to tmp then rename. */ async function saveStore(storePath: string, store: CronStoreFile): Promise { const dir = path.dirname(storePath); await fs.promises.mkdir(dir, { recursive: true }); const json = JSON.stringify(store, null, 2); const tmp = `${storePath}.${process.pid}.${Date.now()}.tmp`; await fs.promises.writeFile(tmp, json, "utf-8"); await fs.promises.rename(tmp, storePath); } // ── Job definitions ────────────────────────────────────────────────────── /** Build cron job definitions from plugin config. */ function buildCronJobDefs(config: UnifiedPluginConfig): Array<{ name: string; schedule: { kind: "cron"; expr: string; tz?: string }; payload: CronPayload; sessionTarget: string; delivery: { mode: string }; }> { const sessionTarget = config.cronSessionTarget; const delivery = { mode: config.cronDeliveryMode }; const useAgentTurn = sessionTarget !== "main"; function makePayload(text: string): CronPayload { return useAgentTurn ? { kind: "agentTurn", message: text } : { kind: "systemEvent", text }; } return [ { name: "openfinclaw:daily-scan", schedule: { kind: "cron", expr: config.scanCronExpr, tz: config.scanTimezone }, sessionTarget, delivery, payload: makePayload( [ "[openfinclaw-strategy 每日扫描]", "1. 调用 strategy_daily_scan 获取策略扫描报告", "2. 分析每条新闻对策略的影响(利好/利空/中性)", "3. 如果发现重大影响,调整策略参数并用 skill_publish 提交回测", "4. 用 skill_publish_verify 确认回测完成", "5. 将分析报告和建议操作发送给用户", ].join("\n"), ), }, { name: "openfinclaw:price-monitor", schedule: { kind: "cron", expr: config.monitorCronExpr, tz: config.scanTimezone }, sessionTarget, delivery, payload: makePayload( [ "[openfinclaw-strategy 价格监控]", "调用 strategy_price_monitor 检查所有策略标的的价格异动。", "如有告警(涨跌幅超阈值),分析可能的原因并通知用户。", ].join("\n"), ), }, { name: "openfinclaw:weekly-report", schedule: { kind: "cron", expr: config.weeklyReportCronExpr, tz: config.scanTimezone }, sessionTarget, delivery, payload: makePayload( [ "[openfinclaw-strategy 周报]", '调用 strategy_periodic_report(period="weekly") 生成策略绩效周报。', "汇总本周回测结果、价格告警和扫描记录,发送给用户。", ].join("\n"), ), }, { name: "openfinclaw:monthly-report", schedule: { kind: "cron", expr: config.monthlyReportCronExpr, tz: config.scanTimezone }, sessionTarget, delivery, payload: makePayload( [ "[openfinclaw-strategy 月报]", '调用 strategy_periodic_report(period="monthly") 生成策略绩效月报。', "汇总本月回测结果、价格告警和扫描记录,发送给用户。", ].join("\n"), ), }, ]; } // ── Migration ───────────────────────────────────────────────────────────── /** * Migrate legacy openfinclaw jobs that still use sessionTarget "main" + * systemEvent payload to the new isolated agentTurn pattern. * Returns the number of jobs migrated. */ function migrateLegacyJobs( store: CronStoreFile, defs: ReturnType, ): number { const defsByName = new Map(defs.map((d) => [d.name, d])); let migrated = 0; for (const job of store.jobs) { if (!OPENFINCLAW_JOB_NAMES.has(job.name)) continue; if (job.sessionTarget !== "main") continue; const def = defsByName.get(job.name); if (!def) continue; job.payload = def.payload; job.sessionTarget = def.sessionTarget; job.delivery = def.delivery; job.updatedAtMs = Date.now(); migrated++; } return migrated; } // ── Public API ──────────────────────────────────────────────────────────── /** * Register and migrate openfinclaw cron jobs by writing to the * Gateway cron store file. Safe to call at plugin startup. * * - Creates missing jobs with the current config (isolated session by default). * - Migrates existing legacy "main" session jobs to "isolated" + "agentTurn" * so cron tasks no longer block the user's active conversation. */ export async function setupOpenfinclawCronJobs( config: UnifiedPluginConfig, ): Promise<{ ok: boolean; created: number; existing: number; migrated: number }> { const storePath = defaultStorePath(); const store = await loadStore(storePath); const existingNames = new Set(store.jobs.map((j) => j.name)); const defs = buildCronJobDefs(config); const now = Date.now(); let created = 0; for (const def of defs) { if (existingNames.has(def.name)) continue; store.jobs.push({ id: randomUUID(), name: def.name, agentId: config.cronAgentId, enabled: true, schedule: def.schedule, payload: def.payload, sessionTarget: def.sessionTarget, wakeMode: "now", delivery: def.delivery, createdAtMs: now, updatedAtMs: now, state: {}, }); created++; } const migrated = migrateLegacyJobs(store, defs); if (created > 0 || migrated > 0) { await saveStore(storePath, store); } return { ok: true, created, existing: existingNames.size, migrated }; }