/** * Tournament orchestrator — code-driven daily tournament flow. * Deterministic orchestration: cron → ticker → spawn subagents → collect → format → send. * LLM only handles analysis inside each subagent. * @module openfinclaw/tournament/orchestrator */ import type { TournamentDb, TournamentStrategy } from "./db.js"; import { type TournamentRole, buildAgentTask } from "./prompts.js"; // ── Types ───────────────────────────────────────────────────────────────── export interface TournamentConfig { /** Maximum concurrent tournament agents (default: 3) */ maxAgents: number; /** Subagent timeout in seconds (default: 600 = 10 min) */ subagentTimeoutSeconds: number; /** Consecutive skips before alerting (default: 3) */ alertAfterSkips: number; /** Session key for sending messages */ sessionKey: string; } export interface SpawnResult { status: "accepted" | "forbidden" | "error"; childSessionKey?: string; runId?: string; error?: string; } export interface OrchestratorDeps { db: TournamentDb; /** Get today's top mover ticker symbol */ selectTicker: () => Promise; /** Spawn a subagent session */ spawnSubagent: (params: { task: string; label: string; runTimeoutSeconds: number; }) => Promise; /** Push a message to the user's channel */ enqueueSystemEvent: (text: string, options: { sessionKey: string; contextKey?: string }) => void; /** Wake the agent to deliver the message */ requestHeartbeatNow: (options?: { reason?: string; sessionKey?: string }) => void; /** Wait for subagent results (poll DB for tournament_result submissions) */ waitForResults: ( roundId: string, expectedCount: number, timeoutMs: number, ) => Promise; logger: { info: (msg: string) => void; warn: (msg: string) => void; error: (msg: string) => void; }; config: TournamentConfig; } // ── Orchestrator ────────────────────────────────────────────────────────── export class TournamentOrchestrator { constructor(private deps: OrchestratorDeps) {} /** * Run the daily tournament. Called by cron hook. * Pure code flow — no LLM involvement in orchestration. */ async runDailyTournament(): Promise { const { db, config, logger } = this.deps; // 1. Idempotent check const today = new Date().toISOString().slice(0, 10); const roundId = `round-${today.replace(/-/g, "")}`; const existing = db.getRound(roundId); if (existing && existing.status !== "pending") { logger.info(`[tournament] Round ${roundId} already ${existing.status}, skipping`); return; } // 2. Select today's ticker let ticker: string; try { ticker = await this.deps.selectTicker(); } catch (err) { logger.error(`[tournament] Failed to select ticker: ${err}`); db.createRound({ id: roundId, date: today, ticker: "UNKNOWN" }); db.updateRoundStatus(roundId, "skipped", `Ticker selection failed: ${err}`); this.checkConsecutiveFailures(); return; } // 3. Create round record db.createRound({ id: roundId, date: today, ticker }); logger.info(`[tournament] Starting round ${roundId} for ${ticker}`); // 4. Spawn 3 subagents in parallel const roles: TournamentRole[] = ["bull", "bear", "contrarian"]; const spawnResults = await Promise.allSettled( roles.map((role) => this.deps.spawnSubagent({ task: buildAgentTask(role, ticker, roundId), label: `tournament-${role}`, runTimeoutSeconds: config.subagentTimeoutSeconds, }), ), ); // 5. Check spawn results const spawned: Array<{ role: TournamentRole; result: SpawnResult }> = []; for (let i = 0; i < roles.length; i++) { const r = spawnResults[i]; if (r.status === "fulfilled" && r.value.status === "accepted") { spawned.push({ role: roles[i], result: r.value }); logger.info(`[tournament] ${roles[i]} agent spawned: ${r.value.runId ?? "ok"}`); } else { const reason = r.status === "rejected" ? String(r.reason) : ((r.value as SpawnResult).error ?? r.value.status); logger.warn(`[tournament] ${roles[i]} agent spawn failed: ${reason}`); } } if (spawned.length < 2) { db.updateRoundStatus(roundId, "skipped", `Only ${spawned.length}/3 agents spawned`); logger.warn(`[tournament] Round ${roundId} skipped: insufficient agents`); this.checkConsecutiveFailures(); return; } // 6. Wait for subagent results (they call tournament_result tool) const timeoutMs = config.subagentTimeoutSeconds * 1000 + 30_000; // extra 30s buffer let strategies: TournamentStrategy[]; try { strategies = await this.deps.waitForResults(roundId, spawned.length, timeoutMs); } catch (err) { logger.error(`[tournament] Result collection failed: ${err}`); db.updateRoundStatus(roundId, "skipped", `Result collection failed: ${err}`); this.checkConsecutiveFailures(); return; } if (strategies.length < 2) { db.updateRoundStatus(roundId, "skipped", `Only ${strategies.length} strategies submitted`); logger.warn(`[tournament] Round ${roundId} skipped: insufficient strategies`); this.checkConsecutiveFailures(); return; } // 7. Format and send results const message = this.formatMessage(ticker, roundId, strategies); this.deps.enqueueSystemEvent(message, { sessionKey: config.sessionKey, contextKey: `tournament:${roundId}:result`, }); this.deps.requestHeartbeatNow({ reason: "tournament-result", sessionKey: config.sessionKey }); db.updateRoundStatus(roundId, "completed"); logger.info(`[tournament] Round ${roundId} completed with ${strategies.length} strategies`); } /** Check for consecutive failures and send alert if threshold reached. */ private checkConsecutiveFailures(): void { const { db, config, logger } = this.deps; const consecutiveSkips = db.countConsecutiveSkips(); if (consecutiveSkips >= config.alertAfterSkips) { const alertMsg = `⚠️ Strategy Tournament 已连续 ${consecutiveSkips} 天跳过。\n` + "请检查 DeepAgent API 配置和网络连接。"; this.deps.enqueueSystemEvent(alertMsg, { sessionKey: config.sessionKey, contextKey: `tournament:alert:consecutive-skip-${consecutiveSkips}`, }); this.deps.requestHeartbeatNow({ reason: "tournament-alert", sessionKey: config.sessionKey }); logger.warn(`[tournament] Alert sent: ${consecutiveSkips} consecutive skips`); } } /** Format tournament results as a human-readable message. */ private formatMessage(ticker: string, roundId: string, strategies: TournamentStrategy[]): string { const header = `🏆 策略锦标赛 — ${ticker} (${roundId})\n${"═".repeat(50)}`; const roleEmoji: Record = { bull: "🐂", bear: "🐻", contrarian: "🔄", }; const sections = strategies.map((s) => { const emoji = roleEmoji[s.agent_name] ?? "📊"; const lines = [`${emoji} ${s.agent_name.toUpperCase()} (信心: ${s.confidence}%)`]; lines.push(s.thesis); if (s.entry_price != null) lines.push(`入场: ${s.entry_price} | 出场: ${s.exit_price} | 止损: ${s.stop_loss}`); if (s.sharpe != null) lines.push( `Sharpe: ${s.sharpe.toFixed(2)} | 回撤: ${((s.max_drawdown ?? 0) * 100).toFixed(1)}% | 收益: ${((s.total_return ?? 0) * 100).toFixed(1)}%`, ); return lines.join("\n"); }); const footer = [ "─".repeat(50), "选择你认为最佳的策略:", " /pick bull | /pick bear | /pick contrarian", "查看排行榜: /tournament leaderboard", ].join("\n"); return [header, ...sections, footer].join("\n\n"); } } /** * Default waitForResults implementation: poll DB every 15 seconds. * Subagents call tournament_result tool which writes to DB. */ export function createDbPoller(db: TournamentDb): OrchestratorDeps["waitForResults"] { return async ( roundId: string, expectedCount: number, timeoutMs: number, ): Promise => { const pollInterval = 15_000; const deadline = Date.now() + timeoutMs; while (Date.now() < deadline) { const strategies = db.getStrategies(roundId); if (strategies.length >= expectedCount) return strategies; if (strategies.length >= 2) { // If we have at least 2 and are past 75% of timeout, accept what we have if (Date.now() > deadline - timeoutMs * 0.25) return strategies; } await new Promise((resolve) => setTimeout(resolve, pollInterval)); } // Return whatever we have at timeout return db.getStrategies(roundId); }; }