/** * Process Message — Core inbound dispatch pipeline * * Replaces the old 4-module pipeline (Collector→Aggregator→Scheduler→Executor) * with OpenClaw's standard channelRuntime dispatch: * * 1. Self-message filter * 2. channelRuntime safety check * 3. Append to channel history ring buffer * 4. Fetch group context (name, skill) * 5. TIM → MsgContext conversion (inbound.ts) with enriched context * 6. Agent routing (channelRuntime.routing.resolveAgentRoute) * 7. Finalize context (channelRuntime.reply.finalizeInboundContext) * 8. Record session (channelRuntime.session.recordInboundSession) * 9. Create reply dispatcher (channelRuntime.reply.createReplyDispatcherWithTyping) * 10. Dispatch AI reply (channelRuntime.reply.dispatchReplyFromConfig) * * v3.4.0: Added InboundHistory ring buffer and GroupSystemPrompt/GroupSubject * injection to fix LLM NO_REPLY conversation breaks. * See: docs/audit/008-inbound-context-gap.md * * Reference: docs/reference/plugins/openclaw-weixin/src/messaging/process-message.ts */ import type { PluginRuntime } from 'openclaw/plugin-sdk/core'; import { timMessageToMsgContext } from './inbound.js'; import type { InboundHistoryEntry } from './inbound.js'; import type { RawPushMessage } from '../tim/client.js'; import { buildMentionPrompt, extractAtTargets } from './mention-protocol.js'; import type { MentionMember } from './mention-protocol.js'; // ── Types ── /** Dependencies injected by the caller (AccountRuntime). */ export interface ProcessMessageDeps { accountId: string; selfUserId: string; config: Record; channelRuntime: PluginRuntime['channel']; sendMessage: (channelId: string, text: string, atUserList?: string[]) => Promise; /** Fetch channel skill/rules (stored in TIM group notification). */ getChannelSkill?: (channelId: string) => Promise; /** Fetch channel display name. */ getGroupName?: (channelId: string) => Promise; /** Fetch a group-level custom field (AppDefinedData). */ getGroupCustomField?: (channelId: string, key: string) => Promise; /** Fetch group member list for mention protocol injection. */ getGroupMembers?: (channelId: string) => Promise; log: { info: (msg: string) => void; warn: (msg: string) => void; error: (msg: string) => void; debug: (msg: string) => void; }; } // ── InboundHistory ring buffer ── /** * In-memory per-channel message history. * Provides InboundHistory to LLM so it has conversation context. * * FIFO ring buffer: oldest messages are evicted when limit is reached. * This is volatile (lost on restart), which is acceptable — LLM gets * at least the recent conversation since last connection. */ const HISTORY_LIMIT = 20; const channelHistories = new Map(); function appendHistory( channelId: string, entry: InboundHistoryEntry, ): void { let arr = channelHistories.get(channelId); if (!arr) { arr = []; channelHistories.set(channelId, arr); } arr.push(entry); if (arr.length > HISTORY_LIMIT) arr.shift(); // FIFO eviction } function getHistory(channelId: string): InboundHistoryEntry[] { return [...(channelHistories.get(channelId) ?? [])]; } // ── Group name cache ── /** * Simple in-memory cache for group names to avoid repeated SDK calls. * No TTL — group names rarely change during a session. */ const groupNameCache = new Map(); /** * Per-channel gm_req_mention cache. * "1" = mention-only mode (only @ triggers LLM). * "0" or absent = default mode (LLM receives all messages). */ const requireMentionCache = new Map(); // ── Per-channel promise queue ── const channelQueues = new Map>(); /** * Enqueue message processing per channel to prevent concurrent AI turns * on the same channel (which would cause interleaved replies). * * Each channel gets its own serial queue. Failures in one message * do NOT block subsequent messages (.catch(() => undefined)). * * Reference: openclaw-weixin / yuanbao enqueueForConversation pattern. */ function enqueueForChannel( channelId: string, task: () => Promise, ): void { const prev = channelQueues.get(channelId) ?? Promise.resolve(); const next = prev .catch(() => undefined) .then(() => task()) .catch(() => undefined); channelQueues.set(channelId, next); } // ── Core processing ── /** * Process a single inbound TIM push message through the standard * OpenClaw channelRuntime pipeline. * * This function is fire-and-forget safe — errors are caught and logged, * never propagated to crash the TIM event listener. */ export async function processOneMessage( raw: RawPushMessage, deps: ProcessMessageDeps, ): Promise { // Step 1: Self-message filter // Record self-messages in history (so LLM sees its own replies in context), // but don't process them through the AI pipeline. if (raw.from === deps.selfUserId) { appendHistory(raw.channelId, { sender: '(you)', body: raw.text, timestamp: raw.time, }); deps.log.debug(`[process-message] skip self msg id=${raw.id} (recorded in history)`); return; } // Step 2: channelRuntime safety check if (!deps.channelRuntime) { deps.log.error(`[process-message] channelRuntime is undefined, skip msg id=${raw.id}`); return; } // Queue per channel to serialize AI turns enqueueForChannel(raw.channelId, () => _processOneMessageInner(raw, deps)); } async function _processOneMessageInner( raw: RawPushMessage, deps: ProcessMessageDeps, ): Promise { const { channelRuntime, log } = deps; // Step 3: Record this message in history BEFORE processing // (so the LLM sees all prior messages, but not the current one which is Body) const historySnapshot = getHistory(raw.channelId); // Append current message to history for future turns appendHistory(raw.channelId, { sender: raw.nick || raw.from, body: raw.text, timestamp: raw.time, }); // Step 4: Fetch group context (cached where possible) let groupName: string | undefined; let groupSystemPrompt: string | undefined; // Group name (cached) if (groupNameCache.has(raw.channelId)) { groupName = groupNameCache.get(raw.channelId); } else if (deps.getGroupName) { try { const name = await deps.getGroupName(raw.channelId); if (name) { groupName = name; groupNameCache.set(raw.channelId, name); } } catch (err) { log.warn(`[process-message] getGroupName failed: ${(err as Error).message}`); } } // Group system prompt (not cached — may change between messages) if (deps.getChannelSkill) { try { const skill = await deps.getChannelSkill(raw.channelId); if (skill) groupSystemPrompt = skill; } catch (err) { log.warn(`[process-message] getChannelSkill failed: ${(err as Error).message}`); } } // gm_req_mention (cached) let requireMention = false; if (requireMentionCache.has(raw.channelId)) { requireMention = requireMentionCache.get(raw.channelId) === '1'; } else if (deps.getGroupCustomField) { try { const val = await deps.getGroupCustomField(raw.channelId, 'gm_req_mention'); requireMentionCache.set(raw.channelId, val ?? '0'); requireMention = val === '1'; } catch (err) { log.warn(`[process-message] gm_req_mention fetch failed: ${(err as Error).message}`); } } // Step 4.5: Mention protocol — fetch members + inject rules (only in mention mode) // Always fetch fresh from TIM SDK (it hits the server each call). // Do NOT cache: members can join/leave at any time. // See: docs/audit/013-mention-at-delivery-failure-v3.4.5.md let groupMembers: MentionMember[] | undefined; if (requireMention && deps.getGroupMembers) { try { groupMembers = await deps.getGroupMembers(raw.channelId); } catch (err) { log.warn(`[process-message] getGroupMembers failed: ${(err as Error).message}`); } if (groupMembers && groupMembers.length > 0) { const mentionRule = buildMentionPrompt(groupMembers, deps.selfUserId); if (mentionRule) { groupSystemPrompt = groupSystemPrompt ? `${groupSystemPrompt}\n\n${mentionRule}` : mentionRule; } } } // Step 5: Convert TIM message to standard MsgContext with enriched context const ctx = timMessageToMsgContext(raw, deps.accountId, { groupName, groupSystemPrompt, inboundHistory: historySnapshot.length > 0 ? historySnapshot : undefined, }); log.info( `[process-message] inbound from=${ctx.From} ch=${ctx.To} bodyLen=${ctx.Body.length} mention=${String(ctx.WasMentioned)} reqMention=${String(requireMention)} historyLen=${historySnapshot.length}`, ); // Step 5.5: Mention-only gate // When gm_req_mention="1", only @-mentioned messages trigger LLM. // Non-mentioned messages are already recorded in history (Step 3), // so context is preserved for when the agent IS mentioned. if (requireMention && !ctx.WasMentioned) { log.info( `[process-message] mention_only: skip LLM ch=${ctx.To} from=${ctx.From}`, ); return; } // Step 6: Agent routing const route = channelRuntime.routing.resolveAgentRoute({ cfg: deps.config, channel: 'clawlink', accountId: deps.accountId, peer: { kind: 'group', id: ctx.To }, }); log.debug( `[process-message] route: agentId=${route.agentId ?? '(none)'} sessionKey=${route.sessionKey ?? '(none)'}`, ); if (!route.agentId) { log.error(`[process-message] no agentId resolved for ch=${ctx.To}, skipping`); return; } // Propagate resolved session key ctx.SessionKey = route.sessionKey; // Step 7: Finalize inbound context const finalized = channelRuntime.reply.finalizeInboundContext( ctx as Parameters[0], ); const fin = finalized as Record; log.debug( `[process-message] finalized: From=${fin.From} To=${fin.To} GroupSubject=${fin.GroupSubject ?? '(none)'}`, ); // Step 8: Record inbound session const storePath = channelRuntime.session.resolveStorePath( (deps.config as { session?: { store?: unknown } }).session?.store, { agentId: route.agentId }, ); await channelRuntime.session.recordInboundSession({ storePath, sessionKey: route.sessionKey, ctx: finalized as Parameters[0]['ctx'], updateLastRoute: { sessionKey: route.mainSessionKey, channel: 'clawlink', to: ctx.To, accountId: deps.accountId, }, onRecordError: (err: unknown) => log.error(`[process-message] recordInboundSession error: ${String(err)}`), }); log.debug(`[process-message] session recorded, storePath=${storePath}`); // Step 9: Create reply dispatcher with deliver callback const { dispatcher, replyOptions, markDispatchIdle } = channelRuntime.reply.createReplyDispatcherWithTyping({ deliver: async (payload: { text?: string; mediaUrl?: string; mediaUrls?: string[] }) => { const text = payload.text ?? ''; if (!text) { log.debug('[process-message] deliver: empty text, skip'); return; } // Extract @-mention targets from AI output (mention mode only) const atUserList = (requireMention && groupMembers) ? extractAtTargets(text, groupMembers, deps.selfUserId) : undefined; const effectiveAtList = atUserList && atUserList.length > 0 ? atUserList : undefined; log.info(`[process-message] deliver: ch=${ctx.To} textLen=${text.length} at=${effectiveAtList?.join(',') ?? 'none'}`); await deps.sendMessage(ctx.To, text, effectiveAtList); log.debug(`[process-message] deliver: sent OK`); }, onError: (err: unknown, info: { kind: string }) => { log.error(`[process-message] reply ${info.kind} error: ${String(err)}`); }, }); // Step 10: Dispatch AI reply log.debug(`[process-message] dispatching AI turn agentId=${route.agentId}`); try { await channelRuntime.reply.withReplyDispatcher({ dispatcher, run: () => channelRuntime.reply.dispatchReplyFromConfig({ ctx: finalized, cfg: deps.config, dispatcher, replyOptions: { ...replyOptions, disableBlockStreaming: true }, }), }); log.info(`[process-message] AI turn completed for ch=${ctx.To}`); } catch (err) { log.error( `[process-message] dispatchReplyFromConfig error: ${(err as Error).message}`, ); } finally { markDispatchIdle(); } }