/**
 * AccountRuntime — All per-account state and lifecycle
 *
 * Owns the lifecycle of a single active account:
 *   1. StateStore (SQLite)
 *   2. CredentialManager (auth token lifecycle)
 *   3. TIMClient (SDK connection)
 *   4. Message pipeline: channelRuntime dispatch (via process-message.ts)
 *
 * Supports "half-alive" mode: when agent is not yet claimed,
 * start() succeeds without TIM connection. Tools trigger
 * lazy connection via ensureConnected().
 *
 * v3.2.0: Replaced shell-out executor pipeline with channelRuntime dispatch.
 * See docs/audit/004-messaging-pipeline-v3.1.4.md for migration context.
 */

import { logger } from '../util/logger.js';
import { CredentialManager, CredentialError } from '../auth/credential-manager.js';
import { StateStore } from '../state/store.js';
import { TIMClient } from '../tim/client.js';
import * as channels from '../tim/channels.js';
import * as messages from '../tim/messages.js';
import { initC2CHandler } from '../tim/c2c.js';
import { resolveClawlinkChannelRuntime } from './plugin-runtime.js';
import { processOneMessage } from '../messaging/process-message.js';
import { processC2CRequest, type C2CRequestEvent } from '../messaging/process-c2c-request.js';
import type { ClawlinkAccountConfig } from '../auth/config.js';
import type { RawPushMessage } from '../tim/client.js';
import type { ChannelInfo, MemberInfo } from '../tim/channels.js';
import type { HistoryMessage, SendResult } from '../tim/messages.js';

/**
 * Render a caught value for logging. Anything can be thrown, so narrow
 * instead of asserting `Error` (which would log "undefined" for strings etc.).
 */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

export class AccountRuntime {
  readonly accountId: string;

  // Sub-modules (accessible by tools via registry)
  store: StateStore | null = null;
  client: TIMClient | null = null;

  private credentialManager: CredentialManager | null = null;
  private _running = false;
  private _config: ClawlinkAccountConfig | null = null;
  // NOTE(review): the generic arguments on `Record` / `Promise` were missing in
  // the reviewed text; restored as <string, unknown> / <void> — confirm.
  private _cfg: Record<string, unknown> | null = null;
  /** The single in-flight connect attempt, shared by every caller that needs a connection. */
  private _connectingPromise: Promise<void> | null = null;

  constructor(accountId: string) {
    this.accountId = accountId;
  }

  /** True once start() has completed (fully connected or half-alive) and until cleanup. */
  get isRunning(): boolean {
    return this._running;
  }

  /** The account config passed to the most recent start(), or null before that. */
  get config(): ClawlinkAccountConfig | null {
    return this._config;
  }

  /**
   * Account startup with half-alive support.
   *
   * If agent is claimed: full startup (StateStore + TIM + pipeline).
   * If agent is unclaimed: half-alive (StateStore only, _running = true).
   * Tools will trigger lazy connection via ensureConnected().
   *
   * IMPORTANT: This method NEVER throws for unclaimed/credential errors.
   * It must return normally so channel.ts reaches the blocking await Promise,
   * preventing OpenClaw auto-restart.
   *
   * @param account - Account config; agentId and apiKey feed the CredentialManager
   * @param _pipelineConfig - Unused; kept so existing callers keep compiling
   * @param cfg - Full OpenClaw config object, needed for channelRuntime APIs
   * @throws Rethrows failures from StateStore init / CredentialManager construction
   *         after cleaning up
   */
  async start(
    account: ClawlinkAccountConfig,
    _pipelineConfig?: { batchWindowMs?: number; maxBatchSize?: number },
    cfg?: Record<string, unknown>,
  ): Promise<void> {
    this._config = account;
    this._cfg = cfg ?? {};

    try {
      // ── Phase 1: StateStore (always) ──
      logger.info(`[runtime] Starting account '${this.accountId}': init StateStore`);
      this.store = new StateStore();
      await this.store.init(this.accountId);

      // ── Phase 2: CredentialManager ──
      this.credentialManager = new CredentialManager(account.agentId, account.apiKey);
      logger.info(`[runtime] CredentialManager created for agent ${account.agentId}`);

      // ── Phase 3: Try TIM connect (may fail for unclaimed) ──
      // Goes through the shared attempt so a tool calling ensureConnected()
      // during startup joins this connect instead of opening a second one.
      try {
        await this._connectSerialized();
        logger.info(`[runtime] Account '${this.accountId}' started: fully connected`);
      } catch (err) {
        if (err instanceof CredentialError) {
          // Unclaimed or cooldown: enter half-alive state
          logger.warn(`[runtime] Account '${this.accountId}' started in half-alive mode (${err.reason}: ${err.message})`);
        } else {
          // Unexpected error during TIM connect: still go half-alive, don't crash
          logger.error(`[runtime] Account '${this.accountId}' TIM connect failed, entering half-alive: ${describeError(err)}`);
        }
      }

      this._running = true;
    } catch (err) {
      // Only Phase 1/2 failures reach here; connect errors are absorbed above.
      logger.error(`[runtime] Account '${this.accountId}' start failed: ${describeError(err)}`);
      await this._cleanup();
      throw err;
    }
  }

  /**
   * Run _connectTIM() with at most one attempt in flight.
   * Concurrent callers receive the same promise; the slot is cleared when the
   * attempt settles so a later call can retry.
   */
  private _connectSerialized(): Promise<void> {
    if (this._connectingPromise) return this._connectingPromise;

    const attempt: Promise<void> = this._connectTIM().finally(() => {
      // Only clear the slot if it still holds this attempt.
      if (this._connectingPromise === attempt) this._connectingPromise = null;
    });
    this._connectingPromise = attempt;
    return attempt;
  }

  /**
   * Return the current TIM client or throw a descriptive error.
   * Single guarded narrowing in place of scattered non-null assertions.
   */
  private _requireClient(): TIMClient {
    if (!this.client) throw new Error('TIM client is not connected');
    return this.client;
  }

  /** Lazily connect if needed, then return the client for a delegated SDK call. */
  private async _connectedClient(): Promise<TIMClient> {
    await this.ensureConnected();
    return this._requireClient();
  }

  /**
   * Connect TIM and wire the channelRuntime dispatch pipeline.
   * Reached via _connectSerialized() from start(), ensureConnected() and the
   * credential_expired handler.
   *
   * Any client left over from an earlier attempt is disconnected first;
   * otherwise it would be orphaned with its listeners still attached.
   */
  private async _connectTIM(): Promise<void> {
    const credentialManager = this.credentialManager;
    if (!credentialManager) throw new Error('CredentialManager not initialized');
    if (!this._config) throw new Error('Account config not set');

    const credentials = await credentialManager.getCredentials();

    // Tear down the client being replaced. Best-effort: a failure here must
    // not block the new connection.
    const previous = this.client;
    if (previous) {
      this.client = null;
      try {
        await previous.disconnect();
      } catch (err) {
        logger.warn(`[runtime] Failed to disconnect previous TIM client for ${this.accountId}: ${describeError(err)}`);
      }
    }

    logger.info(`[runtime] Connecting TIM SDK for ${this.accountId}`);
    const client = new TIMClient();
    this.client = client;
    await client.connect(credentials);

    // Initialize C2C message handler for remote agent communication
    initC2CHandler(client);

    // Listen for UserSig expiry
    client.on('credential_expired', () => {
      // Ignore events from a client that was stopped or already replaced;
      // reconnecting here would resurrect a stopped account.
      if (this.client !== client) {
        logger.debug(`[runtime] Ignoring credential_expired from superseded TIM client for ${this.accountId}`);
        return;
      }
      logger.warn(`[runtime] Received credential_expired for ${this.accountId}, invalidating + reconnecting`);
      credentialManager.invalidate();
      // Attempt to reconnect with fresh credentials
      void this._connectSerialized().catch((err: unknown) => {
        logger.error(`[runtime] Reconnect after UserSig expiry failed: ${describeError(err)}`);
      });
    });

    // ── channelRuntime dispatch pipeline ──
    // Replaces: Collector → Aggregator → Scheduler → Executor (shell-out)
    // With: processOneMessage → channelRuntime.reply.dispatchReplyFromConfig
    try {
      const channelRuntime = await resolveClawlinkChannelRuntime({});

      client.on('message', (rawMsg: RawPushMessage) => {
        void processOneMessage(rawMsg, {
          accountId: this.accountId,
          // Read from the client that delivered the event so this synchronous
          // access cannot hit a null `this.client` during shutdown.
          selfUserId: client.agentId,
          config: this._cfg || {},
          channelRuntime,
          // The callbacks below resolve the client at call time, so a reply that
          // finishes after a reconnect goes out through the current connection.
          sendMessage: async (chId: string, text: string, atUserList?: string[]) => {
            await messages.sendMessage(this._requireClient(), chId, text, atUserList);
          },
          getChannelSkill: async (chId: string) => {
            return channels.getChannelSkill(this._requireClient(), chId);
          },
          getGroupName: async (chId: string) => {
            return channels.getGroupName(this._requireClient(), chId);
          },
          getGroupCustomField: async (chId: string, key: string) => {
            return channels.getGroupCustomField(this._requireClient(), chId, key);
          },
          getGroupMembers: async (chId: string) => {
            const members = await channels.getMembers(this._requireClient(), chId);
            return members.map((m) => ({ id: m.id, nick: m.nick }));
          },
          log: logger,
        }).catch((err: unknown) => {
          logger.error(`[runtime] processOneMessage failed: ${describeError(err)}`);
        });
      });

      // ── C2C remote agent request handler (T3) ──
      // When another agent sends a task request via C2C, dispatch it through
      // the same channelRuntime pipeline. Mirrors the 'message' listener above.
      // @see docs/products/orchestrator/specs/T3-remote-handler.md
      client.on('c2c_request', (event: C2CRequestEvent) => {
        void processC2CRequest(event, {
          accountId: this.accountId,
          selfUserId: client.agentId,
          config: this._cfg || {},
          channelRuntime,
        }).catch((err: unknown) => {
          logger.error(`[runtime] processC2CRequest failed: ${describeError(err)}`);
        });
      });

      logger.info(`[runtime] TIM connected + channelRuntime pipeline wired for ${this.accountId}`);
    } catch (err) {
      // channelRuntime resolution failed — TIM stays connected (tools work),
      // but no message listeners are wired, so AI dispatch won't run.
      logger.error(`[runtime] channelRuntime resolution failed: ${describeError(err)}`);
      logger.warn(`[runtime] TIM connected but AI dispatch unavailable for ${this.accountId}`);
    }
  }

  /**
   * Ensure TIM is connected before executing a tool operation.
   * If not connected, attempts lazy connection; concurrent callers share one attempt.
   *
   * @throws CredentialError — propagated to tool, which returns a user-friendly message
   */
  async ensureConnected(): Promise<void> {
    if (this.client?.isReady) return;

    // Prevent concurrent connect attempts
    if (this._connectingPromise) {
      logger.debug(`[runtime] ensureConnected: waiting for in-progress connection`);
      return this._connectingPromise;
    }

    logger.info(`[runtime] ensureConnected: TIM not ready, initiating lazy connect for ${this.accountId}`);
    await this._connectSerialized();
  }

  /**
   * Graceful shutdown of all modules.
   */
  async stop(): Promise<void> {
    logger.info(`[runtime] Stopping account '${this.accountId}'`);
    await this._cleanup();
    logger.info(`[runtime] Account '${this.accountId}' stopped`);
  }

  /**
   * Disconnect the TIM client and close the StateStore.
   * The store is closed even when disconnect() throws, so a failing SDK
   * teardown cannot leak the store; the disconnect error still propagates.
   */
  private async _cleanup(): Promise<void> {
    this._running = false;

    // Detach first so events from the dying client are recognised as stale.
    const client = this.client;
    this.client = null;

    try {
      if (client) await client.disconnect();
    } finally {
      const store = this.store;
      this.store = null;
      store?.close();
    }
  }

  // ── Strategy + status queries (used by ops-tools) ──

  /**
   * @deprecated Pipeline batching is now handled by channelRuntime.
   * Retained for backward compatibility with clawlink_set_strategy tool.
   *
   * No-op: nothing is stored. Echoes the requested values, substituting the
   * defaults (20 000 ms / 20) for any missing or falsy value, including 0.
   */
  updateStrategy(_config: { batchWindowMs?: number; maxBatchSize?: number }): { batchWindowMs: number; maxBatchSize: number } {
    logger.debug('[runtime] updateStrategy called (no-op: batching handled by channelRuntime)');
    return {
      batchWindowMs: _config.batchWindowMs || 20_000,
      maxBatchSize: _config.maxBatchSize || 20,
    };
  }

  /**
   * @deprecated Scheduler queue no longer exists in channelRuntime mode.
   */
  get schedulerQueueSize(): number {
    return 0;
  }

  // ── Delegated SDK operations (used by tools) ──
  // All operations go through ensureConnected() for lazy connection.
  // NOTE(review): the Promise<…> payload types below were missing in the reviewed
  // text and are restored from this file's type imports — confirm against
  // ../tim/channels.js and ../tim/messages.js.

  async listChannels(): Promise<ChannelInfo[]> {
    return channels.listChannels(await this._connectedClient());
  }

  async searchChannels(keyword: string): Promise<ChannelInfo[]> {
    return channels.searchChannels(await this._connectedClient(), keyword);
  }

  async joinChannel(channelId: string): Promise<{ success: boolean; message: string }> {
    return channels.joinChannel(await this._connectedClient(), channelId);
  }

  async leaveChannel(channelId: string): Promise<{ success: boolean; message: string }> {
    return channels.leaveChannel(await this._connectedClient(), channelId);
  }

  async getMembers(channelId: string): Promise<MemberInfo[]> {
    return channels.getMembers(await this._connectedClient(), channelId);
  }

  async createChannel(name: string, description?: string, skill?: string): Promise<{ success: boolean; channelId: string }> {
    return channels.createChannel(await this._connectedClient(), name, description, skill);
  }

  // Return type is derived from the helper because its payload type is not visible here.
  async getChannelSkill(channelId: string): Promise<Awaited<ReturnType<typeof channels.getChannelSkill>>> {
    return channels.getChannelSkill(await this._connectedClient(), channelId);
  }

  async setChannelSkill(channelId: string, skill: string): Promise<{ success: boolean }> {
    return channels.setChannelSkill(await this._connectedClient(), channelId, skill);
  }

  async sendMessage(channelId: string, text: string): Promise<SendResult> {
    return messages.sendMessage(await this._connectedClient(), channelId, text);
  }

  async fetchMessages(channelId: string, count?: number): Promise<HistoryMessage[]> {
    return messages.fetchMessages(await this._connectedClient(), channelId, count);
  }
}