/**
 * TIM C2C — Agent-to-Agent direct messaging
 *
 * Provides C2C (peer-to-peer) communication for remote agent orchestration.
 * Used by clawlink_call_remote_agent tool (T2) to send requests and await replies.
 *
 * Architecture:
 * - sendC2CCustomMessage: sends a TIM CustomMessage via CONV_C2C
 * - waitForC2CReply: registers a pending request, resolves when reply arrives
 * - initC2CHandler: listens for client 'c2c_message' events, dispatches to pending requests
 *
 * @see docs/products/orchestrator/specs/T1-c2c-infra.md
 * @see docs/products/orchestrator/clawlink-teams-architecture.md §3.3.3
 */

import { logger } from '../util/logger.js';
import { registry } from '../runtime/registry.js';
import type { TIMClient } from './client.js';

// ── Types ──

/** C2C message payload types for the ClawLink orchestration protocol */
export type C2CMessagePayload =
  | {
      type: 'clawlink_task_request';
      request_id: string;
      task: string;
      session_mode: string;
    }
  | {
      type: 'clawlink_task_progress';
      request_id: string;
      content: string;
    }
  | {
      type: 'clawlink_task_result';
      request_id: string;
      status: 'complete' | 'error';
      content: string;
      /** T4: number of TIMFileElem messages sent before this result. 0 or absent = no files. */
      file_count?: number;
    };

/** Result returned by waitForC2CReply when the remote agent completes */
export interface C2CReplyResult {
  status: 'complete' | 'error';
  content: string;
  /** T4: local file paths of downloaded attachments (populated from pending TIMFileElem messages) */
  mediaUrls?: string[];
}

/** Options for waitForC2CReply */
export interface WaitForC2CReplyOptions {
  timeoutMs: number;
  signal?: AbortSignal;
  onProgress?: (content: string) => void;
}

/** Raw C2C message emitted by client.ts */
interface RawC2CMessage {
  from: string;
  payload: unknown;
  type: string;
  time: number;
}

// ── Pending Request Registry ──

interface PendingRequest {
  onProgress?: (content: string) => void;
  resolve: (result: C2CReplyResult) => void;
  reject: (error: Error) => void;
  timeoutHandle: ReturnType<typeof setTimeout>;
}

/**
 * All pending requests waiting for C2C replies.
 * Key: requestId (UUID), Value: callbacks + timeout handle.
 *
 * Memory safety: every entry is guaranteed to be deleted via one of three paths:
 * 1. Normal completion (task_result received) → clearTimeout + delete + resolve
 * 2. Timeout (timeoutMs expired) → delete + reject
 * 3. Abort (signal.abort) → clearTimeout + delete + reject
 */
const pendingRequests = new Map<string, PendingRequest>();

// ── T4: Pending File Registry ──

/**
 * Metadata of a TIMFileElem message received from a remote agent.
 *
 * Files may arrive before OR after the clawlink_task_result message.
 * When result arrives with file_count > 0, we drain the sender's queue.
 * If files haven't arrived yet, a fileWaiter is registered to resolve later.
 * @see docs/audit/023-tim-upload-plugin-v1.md §3.2 (Option α)
 */
interface PendingFileInfo {
  fileUrl: string;
  fileName: string;
  fileSize: number;
}

/** Pending files per sender. Key: senderUserId, Value: array of file metadata. */
const pendingFiles = new Map<string, PendingFileInfo[]>();

/**
 * Waiter for files that haven't arrived yet when task_result comes first.
 * When a TIMFileElem arrives and the expected count is met, the waiter's
 * callback is invoked with the drained files.
 *
 * Each waiter has a timeout (FILE_WAIT_TIMEOUT_MS) to prevent indefinite blocking.
 */
interface FileWaiter {
  expectedCount: number;
  onFilesReady: (files: PendingFileInfo[]) => void;
  timeoutHandle: ReturnType<typeof setTimeout>;
}

/**
 * Active file waiters. Key: senderUserId.
 *
 * NOTE(review): keyed by sender only, so two results from the same sender that
 * both wait for files at the same time share one slot (the later registration
 * replaces the earlier one). Confirm whether that can happen in practice.
 */
const fileWaiters = new Map<string, FileWaiter>();

/** Max time to wait for late-arriving TIMFileElem messages (10 min). */
const FILE_WAIT_TIMEOUT_MS = 10 * 60_000;

/** Guard applied to the SDK sendMessage call (10 s). */
const SEND_TIMEOUT_MS = 10_000;

// ── Internal helpers ──

/**
 * Extract a loggable message from an arbitrary thrown value.
 * Handles Error instances and plain objects carrying a `message` field;
 * anything else is stringified.
 */
function errorMessage(err: unknown): string {
  if (typeof err === 'object' && err !== null && 'message' in err) {
    return String((err as { message: unknown }).message);
  }
  return String(err);
}

/**
 * Reduce a remote-supplied file name to a single safe path segment.
 *
 * The name comes from another agent and must not be trusted: directory
 * components (both `/` and `\` separators) and NUL bytes are removed so the
 * result can never escape the target directory.
 */
function toSafeFileName(fileName: string): string {
  const lastSegment = fileName.split(/[\\/]/).pop() ?? '';
  const cleaned = lastSegment.replace(/\0/g, '');
  return cleaned === '' || cleaned === '.' || cleaned === '..' ? 'file' : cleaned;
}

// ── sendC2CCustomMessage ──

/**
 * Send a C2C custom message to a remote agent.
 *
 * Uses TIM SDK createCustomMessage + CONV_C2C.
 * Protocol payload is JSON-serialized into the custom message `data` field.
 *
 * Timeout pattern matches messages.ts sendMessage (10s SDK-call guard).
 * @see src/tim/messages.ts L61-87
 *
 * @param userId - Target agent's TIM userId
 * @param payload - Structured message payload (request/progress/result)
 * @throws Error when the runtime is not connected, the SDK is not ready,
 *   the SDK call fails, or the 10 s guard expires.
 */
export async function sendC2CCustomMessage(
  userId: string,
  payload: C2CMessagePayload,
): Promise<void> {
  const rt = registry.getDefault();
  if (!rt || !rt.isRunning || !rt.client) {
    throw new Error('ClawLink not connected');
  }

  const chat = rt.client._chat;
  const types = rt.client._types;
  if (!chat || !rt.client.isReady) {
    throw new Error('TIM SDK not initialized');
  }

  const message = chat.createCustomMessage({
    to: userId,
    conversationType: types.CONV_C2C,
    payload: {
      data: JSON.stringify(payload),
      description: payload.type,
      extension: '',
    },
  });

  logger.info(`[tim/c2c] Sending C2C: to=${userId} type=${payload.type}`);

  // 10s SDK-call guard — same pattern as messages.ts L64-69
  // If SDK hangs (dead WebSocket), this prevents the Tool from blocking forever.
  let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
  const timeoutPromise = new Promise<never>((_, reject) => {
    timeoutHandle = setTimeout(
      () => reject(new Error(`sendC2CCustomMessage timeout (10s) to=${userId}`)),
      SEND_TIMEOUT_MS,
    );
  });

  try {
    await Promise.race([chat.sendMessage(message), timeoutPromise]);
    logger.info(`[tim/c2c] C2C sent OK: to=${userId} type=${payload.type}`);
  } catch (err) {
    const code = (err as { code?: number })?.code;
    logger.error(`[tim/c2c] C2C send failed: code=${code} msg=${errorMessage(err)} to=${userId}`);
    throw err;
  } finally {
    // Always release the guard timer, whichever side of the race won.
    clearTimeout(timeoutHandle);
  }
}

// ── waitForC2CReply ──

/**
 * Wait for a C2C reply matching a specific requestId.
 *
 * Registers a pending entry in the registry. When the C2C message handler
 * receives a matching message, it dispatches to the entry's callbacks:
 * - task_progress → onProgress(content)
 * - task_result → resolve({ status, content, mediaUrls })
 *
 * The returned promise rejects when `timeoutMs` elapses or when `signal`
 * aborts (including when it is already aborted at call time).
 *
 * @param requestId - UUID to match replies against
 * @param options - Timeout, abort signal, and progress callback
 * @returns The remote agent's final result.
 */
export function waitForC2CReply(
  requestId: string,
  options: WaitForC2CReplyOptions,
): Promise<C2CReplyResult> {
  const { timeoutMs, signal, onProgress } = options;

  return new Promise<C2CReplyResult>((resolve, reject) => {
    // An already-aborted signal never fires 'abort' again; without this check
    // the caller would be stuck until the timeout.
    if (signal?.aborted) {
      reject(new Error(`C2C reply aborted requestId=${requestId}`));
      return;
    }

    // Abort signal: reject + cleanup on user cancel
    const onAbort = (): void => {
      clearTimeout(timeoutHandle);
      pendingRequests.delete(requestId);
      reject(new Error(`C2C reply aborted requestId=${requestId}`));
    };
    // Detach on every settle path so long-lived signals do not accumulate listeners.
    const detachAbort = (): void => {
      signal?.removeEventListener('abort', onAbort);
    };

    // Timeout: auto-reject + cleanup after timeoutMs
    const timeoutHandle = setTimeout(() => {
      pendingRequests.delete(requestId);
      detachAbort();
      reject(new Error(`C2C reply timeout (${timeoutMs}ms) requestId=${requestId}`));
    }, timeoutMs);

    signal?.addEventListener('abort', onAbort, { once: true });

    // Register pending entry
    pendingRequests.set(requestId, {
      onProgress,
      resolve: (result) => {
        detachAbort();
        resolve(result);
      },
      reject: (error) => {
        detachAbort();
        reject(error);
      },
      timeoutHandle,
    });

    logger.info(`[tim/c2c] Registered pending request: requestId=${requestId} timeout=${timeoutMs}ms`);
  });
}

// ── C2C Message Handler ──

/**
 * Parse a raw C2C message payload into a ClawLink protocol message.
 * Returns null if the message is not a ClawLink protocol message.
 *
 * Only `type` (must start with `clawlink_task_`) and `request_id` (must be a
 * string) are validated; the remaining fields are taken on trust.
 */
function parseC2CPayload(raw: RawC2CMessage): C2CMessagePayload | null {
  // TIM CustomMessage payload: { data: string, description: string, extension: string }
  const data = (raw.payload as { data?: unknown } | null | undefined)?.data;
  if (typeof data !== 'string' || data === '') return null;

  let parsed: unknown;
  try {
    parsed = JSON.parse(data);
  } catch {
    // Not a JSON payload → not a ClawLink message
    return null;
  }

  // JSON scalars / null cannot be protocol messages.
  if (typeof parsed !== 'object' || parsed === null) return null;
  const candidate = parsed as { type?: unknown; request_id?: unknown };

  // Validate: must be a ClawLink protocol message with request_id
  if (typeof candidate.type !== 'string' || !candidate.type.startsWith('clawlink_task_')) {
    return null;
  }
  if (typeof candidate.request_id !== 'string') {
    logger.warn(`[tim/c2c] C2C message missing request_id: type=${candidate.type}`);
    return null;
  }

  return parsed as C2CMessagePayload;
}

/**
 * Obtain the files a sender announced via `file_count`.
 *
 * If enough files are already queued they are drained immediately; otherwise
 * a fileWaiter is registered and the promise resolves either when
 * handleC2CFileMessage sees the expected count or after FILE_WAIT_TIMEOUT_MS
 * with whatever has arrived so far. The promise never rejects.
 */
function awaitSenderFiles(sender: string, fileCount: number): Promise<PendingFileInfo[]> {
  const senderFiles = pendingFiles.get(sender) ?? [];
  logger.info(`[tim/c2c] Result has file_count=${fileCount}, pending files from sender: ${senderFiles.length}`);

  if (senderFiles.length >= fileCount) {
    // Files already arrived → drain immediately
    pendingFiles.delete(sender);
    return Promise.resolve(senderFiles);
  }

  // Files haven't arrived yet → wait for them
  logger.info(`[tim/c2c] Waiting for ${fileCount} file(s) from ${sender} (have ${senderFiles.length})...`);
  return new Promise<PendingFileInfo[]>((resolveFiles) => {
    const timeoutHandle = setTimeout(() => {
      fileWaiters.delete(sender);
      const partial = pendingFiles.get(sender) ?? [];
      pendingFiles.delete(sender);
      logger.warn(`[tim/c2c] File wait timeout (${FILE_WAIT_TIMEOUT_MS}ms) for ${sender}, got ${partial.length}/${fileCount} file(s)`);
      resolveFiles(partial);
    }, FILE_WAIT_TIMEOUT_MS);

    fileWaiters.set(sender, {
      expectedCount: fileCount,
      onFilesReady: (files) => {
        clearTimeout(timeoutHandle);
        fileWaiters.delete(sender);
        resolveFiles(files);
      },
      timeoutHandle,
    });
  });
}

/**
 * Download each file and store it under /tmp/openclaw/clawlink-media.
 *
 * Failures are logged per file and skipped, so one bad file does not discard
 * the others.
 *
 * @returns Local paths of the files that were saved successfully (may be empty).
 */
async function downloadFilesToTemp(files: readonly PendingFileInfo[]): Promise<string[]> {
  const savedPaths: string[] = [];
  if (files.length === 0) return savedPaths;

  // Loaded lazily (as before), but once per batch instead of once per file.
  const fs = await import('node:fs/promises');
  const nodePath = await import('node:path');

  // Use /tmp/openclaw/ prefix — framework's default localRoots whitelist.
  const tmpDir = nodePath.join('/tmp/openclaw', 'clawlink-media');

  for (const fileInfo of files) {
    try {
      logger.info(`[tim/c2c] Downloading file: ${fileInfo.fileName} (${fileInfo.fileSize} bytes)`);
      const resp = await fetch(fileInfo.fileUrl);
      if (!resp.ok) {
        logger.error(`[tim/c2c] File download failed: HTTP ${resp.status} for ${fileInfo.fileName}`);
        continue;
      }
      const buffer = Buffer.from(await resp.arrayBuffer());

      // Save to temp dir with the original file name preserved, minus any
      // directory components: the name is remote-controlled and must not be
      // able to point outside tmpDir.
      await fs.mkdir(tmpDir, { recursive: true });
      const tmpPath = nodePath.join(tmpDir, `${Date.now()}-${toSafeFileName(fileInfo.fileName)}`);
      await fs.writeFile(tmpPath, buffer);

      savedPaths.push(tmpPath);
      logger.info(`[tim/c2c] File saved: ${fileInfo.fileName} → ${tmpPath} (${buffer.byteLength} bytes)`);
    } catch (err) {
      logger.error(`[tim/c2c] File download/save error: ${errorMessage(err)} for ${fileInfo.fileName}`);
    }
  }

  return savedPaths;
}

/**
 * Initialize the C2C message handler.
 *
 * Listens for 'c2c_message' events from TIMClient and dispatches to:
 * - pendingRequests registry (for progress/result → T2's waitForC2CReply)
 * - 'c2c_request' event (for incoming task requests → T3's handler)
 *
 * Must be called after TIMClient.connect() succeeds.
 *
 * @param client - Connected TIMClient instance
 */
export function initC2CHandler(client: TIMClient): void {
  client.on('c2c_message', async (raw: RawC2CMessage) => {
    try {
      // T4: Check for TIMFileElem before parsing as CustomMessage
      // MSG_FILE messages have a different payload structure (fileUrl, fileName, fileSize)
      // and won't pass parseC2CPayload. We accumulate them for later retrieval.
      if (raw.type === 'TIMFileElem') {
        handleC2CFileMessage(raw);
        return;
      }

      const payload = parseC2CPayload(raw);
      if (!payload) return; // Not a ClawLink protocol message

      const requestId = payload.request_id;

      switch (payload.type) {
        case 'clawlink_task_progress': {
          const pending = pendingRequests.get(requestId);
          if (!pending) {
            logger.debug(`[tim/c2c] No pending request for progress requestId=${requestId}`);
            return;
          }
          logger.debug(`[tim/c2c] Progress for requestId=${requestId}: ${payload.content.slice(0, 80)}`);
          pending.onProgress?.(payload.content);
          break;
        }

        case 'clawlink_task_result': {
          const pending = pendingRequests.get(requestId);
          if (!pending) {
            logger.debug(`[tim/c2c] No pending request for result requestId=${requestId}`);
            return;
          }
          logger.info(`[tim/c2c] Result for requestId=${requestId}: status=${payload.status}`);
          clearTimeout(pending.timeoutHandle);
          pendingRequests.delete(requestId);

          // T4: Collect pending files from this sender.
          // Files may arrive before or after task_result (race condition).
          // If files are already here → drain immediately.
          // If not → register a fileWaiter and wait up to FILE_WAIT_TIMEOUT_MS.
          const fileCount = payload.file_count ?? 0;
          let mediaUrls: string[] | undefined;
          if (fileCount > 0) {
            // The request has already left the registry and its timeout is
            // cleared, so nothing else would settle the caller: any failure
            // here must still fall through to pending.resolve below.
            try {
              const files = await awaitSenderFiles(raw.from, fileCount);
              const savedPaths = await downloadFilesToTemp(files);
              if (savedPaths.length > 0) mediaUrls = savedPaths;
            } catch (err) {
              logger.error(`[tim/c2c] File collection failed for requestId=${requestId}: ${errorMessage(err)}`);
            }
          }

          pending.resolve({ status: payload.status, content: payload.content, mediaUrls });
          break;
        }

        case 'clawlink_task_request': {
          // Incoming task request from another agent → forward to T3 handler
          // T1 does not process requests, only replies (progress + result)
          logger.info(`[tim/c2c] Incoming task request from=${raw.from} requestId=${requestId}`);
          client.emit('c2c_request', {
            from: raw.from,
            payload,
            time: raw.time,
          });
          break;
        }

        default:
          // Reached when `type` has the clawlink_task_ prefix but is not a known variant.
          logger.debug(`[tim/c2c] Unknown C2C payload type: ${(payload as { type: string }).type}`);
      }
    } catch (err) {
      logger.error(`[tim/c2c] Error handling C2C message: ${errorMessage(err)}`);
    }
  });

  logger.info('[tim/c2c] C2C message handler initialized');
}

/**
 * Handle a raw C2C message that might be a TIMFileElem.
 * Called from initC2CHandler for non-CustomMessage types.
 * Accumulates file metadata in pendingFiles for later retrieval and, when a
 * fileWaiter for the sender is satisfied, hands the queued files over to it.
 *
 * @see docs/audit/023-tim-upload-plugin-v1.md §4.4
 */
function handleC2CFileMessage(raw: RawC2CMessage): void {
  // TIMFileElem payload structure (from TIM SDK):
  // { fileUrl: string, fileName: string, fileSize: number, uuid: string }
  const payload = raw.payload as
    | { fileUrl?: string; fileName?: string; fileSize?: number }
    | undefined;

  if (!payload?.fileUrl || !payload?.fileName) {
    logger.debug(`[tim/c2c] MSG_FILE from=${raw.from} but missing fileUrl/fileName, skip`);
    return;
  }

  const fileInfo: PendingFileInfo = {
    fileUrl: payload.fileUrl,
    fileName: payload.fileName,
    fileSize: payload.fileSize ?? 0,
  };

  const queue = pendingFiles.get(raw.from) ?? [];
  queue.push(fileInfo);
  pendingFiles.set(raw.from, queue);

  const totalFiles = queue.length;
  logger.info(`[tim/c2c] Pending file from=${raw.from}: ${fileInfo.fileName} (${fileInfo.fileSize} bytes) total=${totalFiles}`);

  // Check if a fileWaiter is waiting for files from this sender.
  // This handles the race condition where task_result arrived before the file.
  const waiter = fileWaiters.get(raw.from);
  if (waiter && totalFiles >= waiter.expectedCount) {
    logger.info(`[tim/c2c] File waiter satisfied for ${raw.from}: ${totalFiles}/${waiter.expectedCount} file(s)`);
    pendingFiles.delete(raw.from);
    waiter.onFilesReady(queue);
  }
}