/**
 * Process C2C Request — Remote agent task request handler (T3)
 *
 * Handles incoming `c2c_request` events from c2c.ts (T1).
 * Mirrors the process-message.ts pattern but for C2C task requests:
 *
 * 1. Parse request parameters (from, requestId, task, sessionMode)
 * 2. Compute sessionKey (aligned with OpenClaw buildAgentPeerSessionKey)
 * 3. Register onAgentEvent listener ({clawlink_progress} prefix filter)
 * 4. Build MsgContext (adapted for C2C, not group)
 * 5. channelRuntime pipeline: resolveAgentRoute → finalize → record → dispatch
 * 6. deliver callback routes kind="block" → progress, kind="final" → result
 * 7. Unsubscribe onAgentEvent listener
 *
 * @see docs/products/orchestrator/specs/T3-remote-handler.md
 * @see docs/products/orchestrator/specs/T1-c2c-infra.md §6.2 (c2c_request emit)
 */

import { randomUUID } from 'node:crypto';
import fs from 'node:fs';
import nodePath from 'node:path';
import { ZipFile } from 'yazl';
import type { PluginRuntime } from 'openclaw/plugin-sdk/core';
import { sendC2CCustomMessage, type C2CMessagePayload } from '../tim/c2c.js';
import { getClawlinkPluginRuntime } from '../runtime/plugin-runtime.js';
import { registry } from '../runtime/registry.js';
import { logger } from '../util/logger.js';

// ── Types ──

/** Parsed task request payload (from c2c.ts L267) */
interface C2CTaskRequest {
  type: 'clawlink_task_request';
  request_id: string;
  task: string;
  session_mode: string;
}

/** Incoming c2c_request event shape (emitted by c2c.ts initC2CHandler) */
export interface C2CRequestEvent {
  /** User ID of the requesting agent; replies are sent back to this ID. */
  from: string;
  payload: C2CTaskRequest;
  /** TIM timestamp in seconds (converted to ms when building MsgContext). */
  time: number;
}

/** Dependencies injected by AccountRuntime._connectTIM() */
export interface ProcessC2CRequestDeps {
  accountId: string;
  selfUserId: string;
  config: Record<string, unknown>;
  channelRuntime: PluginRuntime['channel'];
}

// ── Agent event type (local mirror of SDK enriched event shape) ──

/**
 * Local type for the enriched agent event received by onAgentEvent listeners.
 * Mirrors the shape from agent-events.d.ts — only the fields we use.
 *
 * @see docs/reference/openclaw/v2026.3.23/dist/plugin-sdk/src/infra/agent-events.d.ts L23
 * @see pi-embedded-CswW9luA.js L94723-94736 (emitAgentEvent enrichment)
 */
interface AgentEventPayload {
  runId?: string;
  sessionKey?: string;
  stream?: string;
  data: {
    phase?: string;
    partialResult?: unknown;
    [key: string]: unknown;
  };
  [key: string]: unknown;
}

// ── Progress prefix constant ──

/**
 * Marker prefix for tool progress messages.
 * Remote developers add this prefix to onUpdate() calls to opt-in
 * to progress forwarding via ClawLink C2C.
 *
 * @see T3-remote-handler.md §9.1.2
 */
const PROGRESS_PREFIX = '{clawlink_progress}';

/**
 * Matches `{clawlink_file:<path>}` markers embedded in final reply text.
 * Only ever used through `String.prototype.matchAll`, which works on a clone,
 * so the shared global regex carries no `lastIndex` state between calls.
 */
const FILE_MARKER_PATTERN = /\{clawlink_file:([^}]+)\}/g;

// ── Small utilities ──

/**
 * Render any thrown value as a log/reply-friendly string.
 * Thrown values are `unknown`; a blind `(err as Error).message` yields
 * `undefined` for strings or plain objects.
 */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

// ── Directory → ZIP helper ──

/**
 * Recursively zip a directory into a .zip file.
 *
 * Uses yazl (pure JS, ~75KB) for cross-platform zip creation.
 * The zip preserves directory structure with relative paths, rooted at the
 * directory's own base name. Only directories and regular files are added;
 * other entry types (symlinks, sockets, …) are skipped.
 *
 * @param dirPath Directory to archive.
 * @returns Path to the created zip file in /tmp/openclaw/clawlink-media/.
 *          The caller owns the file and is responsible for deleting it.
 */
function zipDirectory(dirPath: string): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const dirName = nodePath.basename(dirPath);
    const tmpDir = '/tmp/openclaw/clawlink-media';
    // A throw inside the executor (mkdir/readdir failure) rejects the promise.
    fs.mkdirSync(tmpDir, { recursive: true });
    const zipPath = nodePath.join(tmpDir, `${Date.now()}-${dirName}.zip`);

    const zipfile = new ZipFile();

    // Recursively add all files
    const addDir = (currentPath: string, prefix: string): void => {
      const entries = fs.readdirSync(currentPath, { withFileTypes: true });
      for (const entry of entries) {
        const fullPath = nodePath.join(currentPath, entry.name);
        // Archive paths always use forward slashes, regardless of platform.
        const arcPath = prefix ? `${prefix}/${entry.name}` : entry.name;
        if (entry.isDirectory()) {
          addDir(fullPath, arcPath);
        } else if (entry.isFile()) {
          zipfile.addFile(fullPath, arcPath);
        }
      }
    };

    addDir(dirPath, dirName);
    zipfile.end();

    const output = fs.createWriteStream(zipPath);
    zipfile.outputStream.pipe(output);
    output.on('close', () => {
      logger.info(`[c2c-request] T4: zipped directory ${dirName}/ → ${zipPath} (${fs.statSync(zipPath).size} bytes)`);
      resolve(zipPath);
    });
    output.on('error', reject);
    zipfile.outputStream.on('error', reject);
  });
}

// ── T4: file marker parsing + upload helpers ──

/**
 * Pull `{clawlink_file:<path>}` markers out of reply text.
 *
 * @param text Final reply text, possibly containing markers.
 * @returns `paths` — marker paths in order of appearance;
 *          `cleanText` — the text with each marker removed (trimmed after each removal).
 */
function extractFileMarkers(text: string): { paths: string[]; cleanText: string } {
  const paths: string[] = [];
  let cleanText = text;
  for (const marker of text.matchAll(FILE_MARKER_PATTERN)) {
    const markerPath = marker[1];
    if (markerPath !== undefined) paths.push(markerPath);
    cleanText = cleanText.replace(marker[0], '').trim();
  }
  return { paths, cleanText };
}

/**
 * Upload each path to the peer as a TIM C2C file message (TIMFileElem).
 *
 * Directories are zipped first; the temporary zip is deleted once the upload
 * attempt finishes (its bytes are fully read into memory before sending).
 * Failures are logged per file and do not abort the remaining uploads.
 *
 * @param filePaths Local file or directory paths to send.
 * @param toUserId  Recipient user ID.
 * @returns Number of files that were sent successfully (0 if the TIM SDK is not ready).
 */
async function uploadFilesToPeer(filePaths: readonly string[], toUserId: string): Promise<number> {
  const rt = registry.getDefault();
  const chat = rt?.client?._chat;
  const types = rt?.client?._types;
  if (!chat || !types || !rt?.client?.isReady) {
    logger.warn(`[c2c-request] T4: TIM SDK not ready, skipping file upload`);
    return 0;
  }

  let sentCount = 0;
  for (const sourcePath of filePaths) {
    // Path actually uploaded: the source itself, or the zip built from a directory.
    let uploadPath = sourcePath;
    let tempZipPath: string | null = null;
    try {
      if (!fs.existsSync(sourcePath)) {
        logger.warn(`[c2c-request] T4: file not found, skip: ${sourcePath}`);
        continue;
      }

      // T4: If path is a directory, zip it first
      if (fs.statSync(sourcePath).isDirectory()) {
        logger.info(`[c2c-request] T4: path is directory, zipping: ${sourcePath}`);
        tempZipPath = await zipDirectory(sourcePath);
        uploadPath = tempZipPath;
      }

      const buffer = fs.readFileSync(uploadPath);
      const fileName = nodePath.basename(uploadPath);
      const file = new File([buffer], fileName);
      // TIM SDK expects payload.file to behave like an HTMLInputElement
      // with a .files property (FileList). Node.js File doesn't have this.
      // Wrap in a shim: { files: [File] } to satisfy SDK's internal check
      // at createFileMessage → _isEmptyFileList(s.file.files).
      const fileShim = { files: [file] };

      logger.info(`[c2c-request] T4: uploading ${fileName} (${buffer.byteLength} bytes) to ${toUserId}`);
      const fileMsg = chat.createFileMessage({
        to: toUserId,
        conversationType: types.CONV_C2C,
        payload: { file: fileShim as unknown as File },
      });
      await chat.sendMessage(fileMsg);
      sentCount++;
      logger.info(`[c2c-request] T4: file sent OK: ${fileName}`);
    } catch (fileErr) {
      logger.error(`[c2c-request] T4: file upload failed: ${describeError(fileErr)} path=${uploadPath}`);
    } finally {
      // Best-effort cleanup so zipped directories do not pile up in the temp dir.
      if (tempZipPath) {
        try {
          fs.rmSync(tempZipPath, { force: true });
        } catch (cleanupErr) {
          logger.warn(`[c2c-request] T4: could not remove temp zip ${tempZipPath}: ${describeError(cleanupErr)}`);
        }
      }
    }
  }
  return sentCount;
}

// ── Core processing ──

/**
 * Process a single incoming C2C task request through the standard
 * OpenClaw channelRuntime pipeline.
 *
 * This is the T3 equivalent of processOneMessage (process-message.ts).
 * Fire-and-forget safe — errors are caught and result in an error C2C reply.
 *
 * @param request Incoming event: sender, task payload and TIM timestamp (seconds).
 * @param deps    Account-scoped dependencies (config, channelRuntime, accountId).
 * @returns Resolves once the AI turn finished (or an error reply was attempted).
 */
export async function processC2CRequest(
  request: C2CRequestEvent,
  deps: ProcessC2CRequestDeps,
): Promise<void> {
  const { from: fromUserId, payload, time } = request;
  // The payload comes off the wire: read it defensively so a malformed request
  // is rejected by validation below instead of throwing a TypeError here.
  const requestId = payload?.request_id;
  const task = payload?.task;
  const sessionMode = payload?.session_mode;

  logger.info(
    `[c2c-request] Incoming task: from=${fromUserId} requestId=${requestId} mode=${sessionMode} taskLen=${task?.length ?? 0}`,
  );

  // ① Validate
  if (!fromUserId || !requestId || !task) {
    logger.error(`[c2c-request] Invalid request: missing required fields`);
    await sendErrorResult(fromUserId, requestId, 'Invalid request: missing required fields');
    return;
  }

  // ② Compute sessionKey — aligned with OpenClaw buildAgentPeerSessionKey
  // Format: agent:{agentId}:{channel}:{peerKind}:{peerId}
  // @see T1-c2c-infra.md §3.1 sessionKey naming convention
  const agentId = 'main'; // Default agent ID; multi-agent is P1 (T3 §9.2)
  // buildAgentPeerSessionKey (session-key-DAhnzjyr.js L211) lowercases peerId,
  // so the key is lowercased here to match it exactly.
  // (Progress events themselves are filtered by runId — see ③.)
  const normalizedFrom = fromUserId.toLowerCase();
  const sessionKey =
    sessionMode === 'run'
      ? `agent:${agentId}:clawlink:remote:${normalizedFrom}:run:${requestId}`
      : `agent:${agentId}:clawlink:remote:${normalizedFrom}`;

  logger.debug(`[c2c-request] sessionKey=${sessionKey}`);

  // ③ Register onAgentEvent listener for {clawlink_progress} prefix
  // Must be BEFORE dispatchReplyFromConfig — otherwise tool events are missed.
  //
  // Filter by runId (NOT sessionKey). OpenClaw sets isControlUiVisible=false
  // for external channels like ClawLink, which causes emitAgentEvent to strip
  // sessionKey to undefined. runId is always preserved.
  // @see audit/021-progress-pipeline-v1.md §4 (root cause)
  const runId = randomUUID();
  logger.debug(`[c2c-request] generated runId=${runId} for requestId=${requestId}`);

  let unsubscribe: (() => void) | null = null;
  try {
    const runtime = getClawlinkPluginRuntime();
    unsubscribe = runtime.events.onAgentEvent((evt: AgentEventPayload) => {
      if (evt.runId !== runId) return;
      if (evt.stream !== 'tool' || evt.data.phase !== 'update') return;

      // partialResult is AgentToolResult object: { content: [{ type: "text", text: "..." }] }
      // Source: sanitizeToolResult (L173042-173071) returns same structure.
      // ClawLink toolResult() (types.ts L51-53): { content: [{ type: 'text', text }] }
      const pr = evt.data.partialResult as
        | { content?: Array<{ type?: string; text?: string }> }
        | string
        | null
        | undefined;
      let raw: string;
      if (pr && typeof pr === 'object' && Array.isArray(pr.content)) {
        raw = pr.content.find((c) => c?.type === 'text')?.text ?? '';
      } else {
        raw = String(pr ?? '');
      }
      if (!raw.startsWith(PROGRESS_PREFIX)) return; // No prefix → discard

      const content = raw.slice(PROGRESS_PREFIX.length).trim();
      if (!content) return;

      logger.info(`[c2c-request] Tool progress: requestId=${requestId} runId=${runId} content=${content.slice(0, 80)}`);

      // Send via same TIM C2C path as deliver callback
      void sendC2CCustomMessage(fromUserId, {
        type: 'clawlink_task_progress',
        request_id: requestId,
        content,
      } as C2CMessagePayload).catch((err: unknown) => {
        logger.error(`[c2c-request] Failed to send tool progress: ${describeError(err)}`);
      });
    });
  } catch (err) {
    // If PluginRuntime isn't available, continue without progress monitoring
    logger.warn(`[c2c-request] Could not register onAgentEvent: ${describeError(err)}`);
  }

  try {
    const { channelRuntime, config, accountId } = deps;

    // ④ Build MsgContext for C2C request
    // NOT using timMessageToMsgContext — that's for group messages (ChatType='group').
    // C2C requests are a different chat type.
    const msgContext = {
      // Message body
      Body: task,
      BodyForAgent: task,
      RawBody: task,
      CommandBody: task,

      // Routing
      From: fromUserId,
      To: fromUserId, // C2C: destination is the sender (reply goes back to them)
      AccountId: accountId,

      // Sender info
      SenderId: fromUserId,
      SenderName: fromUserId,

      // Conversation label
      ConversationLabel: `clawlink:remote:${fromUserId}`,
      GroupSubject: '',

      // Channel metadata
      OriginatingChannel: 'clawlink',
      OriginatingTo: fromUserId,
      MessageSid: `c2c:${requestId}`,
      Timestamp: time * 1000, // TIM time is seconds, OpenClaw expects ms
      Provider: 'clawlink',
      Surface: 'clawlink',
      ChatType: 'direct', // NOT 'group' — this is a C2C request

      // Session — always our computed sessionKey, never the route's
      // (route may return a different key for group-based routing)
      SessionKey: sessionKey,

      // No mention, no history for C2C requests
      WasMentioned: true, // Always process (no mention gating for C2C)
    };

    // ⑤ Agent routing
    const route = channelRuntime.routing.resolveAgentRoute({
      cfg: config,
      channel: 'clawlink',
      accountId,
      peer: { kind: 'remote', id: fromUserId },
    });

    logger.debug(
      `[c2c-request] route: agentId=${route.agentId ?? '(none)'} sessionKey=${route.sessionKey ?? '(none)'}`,
    );

    // ⑤ Finalize inbound context
    const finalized = channelRuntime.reply.finalizeInboundContext(
      msgContext as Parameters<typeof channelRuntime.reply.finalizeInboundContext>[0],
    );

    // ⑤ Record inbound session
    const storePath = channelRuntime.session.resolveStorePath(
      (config as { session?: { store?: unknown } }).session?.store,
      { agentId: route.agentId || agentId },
    );

    await channelRuntime.session.recordInboundSession({
      storePath,
      sessionKey,
      ctx: finalized as Parameters<typeof channelRuntime.session.recordInboundSession>[0]['ctx'],
      updateLastRoute: {
        sessionKey: route.mainSessionKey || sessionKey,
        channel: 'clawlink',
        to: fromUserId,
        accountId,
      },
      onRecordError: (err: unknown) =>
        logger.error(`[c2c-request] recordInboundSession error: ${String(err)}`),
    });

    // ⑥ Create reply dispatcher + dispatch AI reply
    const { dispatcher, replyOptions, markDispatchIdle } =
      channelRuntime.reply.createReplyDispatcherWithTyping({
        deliver: async (
          deliverPayload: { text?: string; mediaUrl?: string; mediaUrls?: string[] },
          deliverMeta?: { kind?: string },
        ) => {
          const text = deliverPayload.text ?? '';
          const kind = deliverMeta?.kind;

          if (kind === 'block') {
            // LLM streaming text → C2C progress (unchanged)
            if (!text) return;
            logger.debug(`[c2c-request] deliver block: requestId=${requestId} textLen=${text.length}`);
            await sendC2CCustomMessage(fromUserId, {
              type: 'clawlink_task_progress',
              request_id: requestId,
              content: text,
            } as C2CMessagePayload);
          }

          // kind="tool" → skip. Remote LLM will summarize tool data into natural language.

          if (kind === 'final') {
            // ── T4: File detection + upload ──
            // Collect file paths from two sources:
            //   Path A: deliverPayload.mediaUrls (tool results, e.g. image_generate)
            //   Path B: {clawlink_file} text markers (skill scripts)
            const filePaths: string[] = [];

            // Path A: structured mediaUrls from OpenClaw delivery pipeline
            if (deliverPayload.mediaUrls) {
              for (const u of deliverPayload.mediaUrls) {
                if (u) filePaths.push(u);
              }
            }
            if (deliverPayload.mediaUrl) {
              filePaths.push(deliverPayload.mediaUrl);
            }

            // Path B: {clawlink_file} text markers
            const { paths: markerPaths, cleanText } = extractFileMarkers(text);
            filePaths.push(...markerPaths);

            // Trip card replies additionally ship their backing data file.
            const tripDataPath = findTripCardDataFile(text);
            if (tripDataPath && !filePaths.includes(tripDataPath)) {
              filePaths.push(tripDataPath);
              logger.info(`[c2c-request] T4: attaching trip card data ${tripDataPath}`);
            }

            // Upload files via TIM C2C (TIMFileElem) — before sending text result
            let fileCount = 0;
            if (filePaths.length > 0) {
              logger.info(`[c2c-request] T4: ${filePaths.length} file(s) detected for requestId=${requestId}`);
              fileCount = await uploadFilesToPeer(filePaths, fromUserId);
            }

            // Send text result (with file_count for receiver coordination)
            const finalText = filePaths.length > 0 ? cleanText : text;
            if (finalText || fileCount > 0) {
              logger.info(`[c2c-request] deliver final: requestId=${requestId} textLen=${finalText.length} files=${fileCount}`);
              await sendC2CCustomMessage(fromUserId, {
                type: 'clawlink_task_result',
                request_id: requestId,
                status: 'complete',
                content: finalText || '(file transfer)',
                file_count: fileCount,
              } as C2CMessagePayload);
            }
          }
        },
        onError: (err: unknown, info: { kind: string }) => {
          logger.error(`[c2c-request] reply ${info.kind} error: ${String(err)}`);
        },
      });

    logger.debug(`[c2c-request] dispatching AI turn for requestId=${requestId}`);

    try {
      await channelRuntime.reply.withReplyDispatcher({
        dispatcher,
        run: () =>
          channelRuntime.reply.dispatchReplyFromConfig({
            ctx: finalized,
            cfg: config,
            dispatcher,
            // runId ties emitted agent events back to the listener registered in ③.
            replyOptions: { ...replyOptions, runId },
          }),
      });
      logger.info(`[c2c-request] AI turn completed for requestId=${requestId}`);
    } catch (err) {
      logger.error(`[c2c-request] dispatchReplyFromConfig error: ${describeError(err)}`);
      await sendErrorResult(fromUserId, requestId, describeError(err));
    } finally {
      markDispatchIdle();
    }
  } catch (err) {
    // Top-level error → send error result to requester
    logger.error(`[c2c-request] Fatal error processing request: ${describeError(err)}`);
    await sendErrorResult(fromUserId, requestId, describeError(err));
  } finally {
    // ⑦ Always unsubscribe onAgentEvent listener
    if (unsubscribe) {
      unsubscribe();
      logger.debug(`[c2c-request] onAgentEvent listener unsubscribed for requestId=${requestId}`);
    }
  }
}

// ── Helpers ──

/**
 * Send an error result back to the requesting agent.
 * Best-effort — if this fails, we just log.
 *
 * @param toUserId     Recipient (the original requester).
 * @param requestId    Request being answered.
 * @param errorMessage Human-readable failure description, sent as `content`.
 */
async function sendErrorResult(
  toUserId: string,
  requestId: string,
  errorMessage: string,
): Promise<void> {
  try {
    await sendC2CCustomMessage(toUserId, {
      type: 'clawlink_task_result',
      request_id: requestId,
      status: 'error',
      content: errorMessage,
    } as C2CMessagePayload);
  } catch (err) {
    logger.error(`[c2c-request] Failed to send error result: ${describeError(err)}`);
  }
}

/**
 * Locate the trip card data file that backs the card tags in `text`.
 *
 * Looks for `trip-card-data.json` / `ctrip-card-data.json` in each existing
 * control-ui directory. When the text carries card IDs, a file is accepted only
 * if it mentions at least one of them; without IDs the first existing file wins.
 *
 * @returns Path of the matching data file, or `null` when the text has no trip
 *          card tags or no suitable file exists.
 */
function findTripCardDataFile(text: string): string | null {
  if (!text || !hasTripCardTags(text)) return null;

  const candidates = getControlUiCandidates().flatMap((dir) => [
    nodePath.join(dir, 'trip-card-data.json'),
    nodePath.join(dir, 'ctrip-card-data.json'),
  ]);
  const cardIds = extractTripCardIds(text);

  for (const candidate of candidates) {
    if (!fs.existsSync(candidate)) continue;
    if (cardIds.length === 0 || tripDataMatches(candidate, cardIds)) {
      return candidate;
    }
  }

  logger.warn(`[c2c-request] Trip card tags found but no matching trip-card-data.json was available`);
  return null;
}

/** True when `text` contains a trip card opening tag or a `[…_CARDS_READY]` marker. */
function hasTripCardTags(text: string): boolean {
  return (
    /<(?:sh_card|tr_card|i_card|la_card|bg_card|fr_card|time_data_card)>/i.test(text) ||
    /\[TRIP_CARDS_READY\]|\[CTRIP_CARDS_READY\]/.test(text)
  );
}

/**
 * Collect the distinct, trimmed, non-empty contents of `<tr_card>`, `<i_card>`
 * and `<la_card>` elements, in order of first appearance.
 */
function extractTripCardIds(text: string): string[] {
  const ids = new Set<string>();
  // Fresh regex per call: the `g` flag makes exec() stateful via lastIndex.
  const re = /<(?:tr_card|i_card|la_card)>([\s\S]*?)<\/(?:tr_card|i_card|la_card)>/gi;
  let match: RegExpExecArray | null;
  while ((match = re.exec(text)) !== null) {
    const id = match[1]?.trim();
    if (id) ids.add(id);
  }
  return Array.from(ids);
}

/**
 * True when the file's raw UTF-8 content contains at least one of `ids`
 * (plain substring match). An unreadable file counts as no match.
 */
function tripDataMatches(filePath: string, ids: string[]): boolean {
  try {
    const raw = fs.readFileSync(filePath, 'utf8');
    return ids.some((id) => raw.includes(id));
  } catch {
    return false;
  }
}

/**
 * List the control-ui directories that exist on this host, de-duplicated:
 * next to the entry script (process.argv[1]), `<cwd>/dist/control-ui`, and
 * `/app/dist/control-ui`.
 */
function getControlUiCandidates(): string[] {
  const dirs = new Set<string>();
  const argvEntry = process.argv[1] || '';
  if (argvEntry) {
    dirs.add(nodePath.join(nodePath.dirname(argvEntry), 'control-ui'));
  }
  dirs.add(nodePath.join(process.cwd(), 'dist', 'control-ui'));
  dirs.add('/app/dist/control-ui');

  return Array.from(dirs).filter((dir) => {
    try {
      return fs.existsSync(dir) && fs.statSync(dir).isDirectory();
    } catch {
      return false;
    }
  });
}