import type { ClawdbotConfig, RuntimeEnv } from "openclaw/plugin-sdk"; import { buildPendingHistoryContextFromMap, recordPendingHistoryEntryIfEnabled, clearHistoryEntriesIfEnabled, DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry, } from "openclaw/plugin-sdk"; import type { PopoConfig, PopoMessageContext, PopoMediaInfo } from "./types.js"; import { getPopoRuntime } from "./runtime.js"; import { resolvePopoGroupConfig, resolvePopoReplyPolicy, resolvePopoAllowlistMatch, isPopoGroupAllowed, } from "./policy.js"; import { createPopoReplyDispatcher } from "./reply-dispatcher.js"; import { downloadFilePopo } from "./media.js"; export type PopoMessageEvent = { eventType: "IM_P2P_TO_ROBOT_MSG" | "IM_CHAT_TO_ROBOT_AT_MSG"; eventData: { uuid: string; from: string; // Sender email sessionId: string; // P2P=email, group=groupId notify: string; // Message content msgType?: string; // text, image, file, etc. fileId?: string; // File message ID timestamp?: number; groupId?: string; groupName?: string; }; }; export type PopoActionEvent = { eventType: "ACTION"; eventData: { actionId: string; userId: string; cardId: string; data?: Record; }; }; function parseMessageContent(notify: string, msgType?: string): string { if (msgType === "text" || !msgType) { return notify; } // For non-text messages, return the raw notify as placeholder return notify; } /** * Infer placeholder text based on message type. */ function inferPlaceholder(msgType?: string): string { switch (msgType) { case "image": return ""; case "file": return ""; case "audio": return ""; case "video": return ""; default: return ""; } } /** * Resolve media from a POPO message, downloading and saving to disk. */ async function resolvePopoMediaList(params: { cfg: ClawdbotConfig; event: PopoMessageEvent; maxBytes: number; log?: (msg: string) => void; }): Promise { const { cfg, event, maxBytes, log } = params; const { msgType, fileId } = event.eventData; // Only process media message types const mediaTypes = ["image", "file", "audio", "video"]; if (!msgType || !mediaTypes.includes(msgType) || !fileId) { return []; } const out: PopoMediaInfo[] = []; const core = getPopoRuntime(); try { const result = await downloadFilePopo({ cfg, fileId, }); let contentType = result.contentType; if (!contentType) { contentType = await core.media.detectMime({ buffer: result.buffer }); } const saved = await core.channel.media.saveMediaBuffer( result.buffer, contentType, "inbound", maxBytes ); out.push({ path: saved.path, contentType: saved.contentType, placeholder: inferPlaceholder(msgType), }); log?.(`popo: downloaded ${msgType} media, saved to ${saved.path}`); } catch (err) { log?.(`popo: failed to download ${msgType} media: ${String(err)}`); } return out; } /** * Build media payload for inbound context. */ function buildPopoMediaPayload( mediaList: PopoMediaInfo[] ): { MediaPath?: string; MediaType?: string; MediaUrl?: string; MediaPaths?: string[]; MediaUrls?: string[]; MediaTypes?: string[]; } { const first = mediaList[0]; const mediaPaths = mediaList.map((media) => media.path); const mediaTypes = mediaList.map((media) => media.contentType).filter(Boolean) as string[]; return { MediaPath: first?.path, MediaType: first?.contentType, MediaUrl: first?.path, MediaPaths: mediaPaths.length > 0 ? mediaPaths : undefined, MediaUrls: mediaPaths.length > 0 ? mediaPaths : undefined, MediaTypes: mediaTypes.length > 0 ? mediaTypes : undefined, }; } export function parsePopoMessageEvent(event: PopoMessageEvent): PopoMessageContext { const { eventType, eventData } = event; const isGroup = eventType === "IM_CHAT_TO_ROBOT_AT_MSG"; const content = parseMessageContent(eventData.notify, eventData.msgType); return { sessionId: eventData.sessionId, messageId: eventData.uuid, senderId: eventData.from, senderEmail: eventData.from, chatType: isGroup ? "group" : "p2p", content, contentType: eventData.msgType ?? "text", fileId: eventData.fileId, }; } export async function handlePopoMessage(params: { cfg: ClawdbotConfig; event: PopoMessageEvent; runtime?: RuntimeEnv; chatHistories?: Map; }): Promise { const { cfg, event, runtime, chatHistories } = params; const popoCfg = cfg.channels?.popo as PopoConfig | undefined; const log = runtime?.log ?? console.log; const error = runtime?.error ?? console.error; const ctx = parsePopoMessageEvent(event); const isGroup = ctx.chatType === "group"; log(`popo: received message from ${ctx.senderEmail} in ${ctx.sessionId} (${ctx.chatType})`); const historyLimit = Math.max( 0, popoCfg?.historyLimit ?? cfg.messages?.groupChat?.historyLimit ?? DEFAULT_GROUP_HISTORY_LIMIT ); if (isGroup) { const groupPolicy = popoCfg?.groupPolicy ?? "open"; const groupAllowFrom = popoCfg?.groupAllowFrom ?? []; const groupConfig = resolvePopoGroupConfig({ cfg: popoCfg, groupId: ctx.sessionId }); // Check if this GROUP is allowed const groupAllowed = isPopoGroupAllowed({ groupPolicy, allowFrom: groupAllowFrom, senderId: ctx.sessionId, senderName: undefined, }); if (!groupAllowed) { log(`popo: group ${ctx.sessionId} not in allowlist`); return; } // Additional sender-level allowlist check if group has specific allowFrom config const senderAllowFrom = groupConfig?.allowFrom ?? []; if (senderAllowFrom.length > 0) { const senderAllowed = isPopoGroupAllowed({ groupPolicy: "allowlist", allowFrom: senderAllowFrom, senderId: ctx.senderEmail, senderName: ctx.senderName, }); if (!senderAllowed) { log(`popo: sender ${ctx.senderEmail} not in group ${ctx.sessionId} sender allowlist`); return; } } // In POPO, group messages with @robot are IM_CHAT_TO_ROBOT_AT_MSG // so requireMention is effectively always satisfied for group messages // that reach this handler } else { const dmPolicy = popoCfg?.dmPolicy ?? "pairing"; const allowFrom = popoCfg?.allowFrom ?? []; if (dmPolicy === "allowlist") { const match = resolvePopoAllowlistMatch({ allowFrom, senderId: ctx.senderEmail, }); if (!match.allowed) { log(`popo: sender ${ctx.senderEmail} not in DM allowlist`); return; } } } try { const core = getPopoRuntime(); const popoFrom = `popo:${ctx.senderEmail}`; const popoTo = isGroup ? `group:${ctx.sessionId}` : `user:${ctx.senderEmail}`; const route = core.channel.routing.resolveAgentRoute({ cfg, channel: "popo", peer: { kind: isGroup ? "group" : "dm", id: ctx.sessionId, }, }); const preview = ctx.content.replace(/\s+/g, " ").slice(0, 160); const inboundLabel = isGroup ? `POPO message in group ${ctx.sessionId}` : `POPO DM from ${ctx.senderEmail}`; core.system.enqueueSystemEvent(`${inboundLabel}: ${preview}`, { sessionKey: route.sessionKey, contextKey: `popo:message:${ctx.sessionId}:${ctx.messageId}`, }); // Resolve media from message const mediaMaxBytes = (popoCfg?.mediaMaxMb ?? 20) * 1024 * 1024; const mediaList = await resolvePopoMediaList({ cfg, event, maxBytes: mediaMaxBytes, log, }); const mediaPayload = buildPopoMediaPayload(mediaList); const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(cfg); // Build message body let messageBody = ctx.content; const speaker = ctx.senderName ?? ctx.senderEmail; messageBody = `${speaker}: ${messageBody}`; const envelopeFrom = isGroup ? `${ctx.sessionId}:${ctx.senderEmail}` : ctx.senderEmail; const body = core.channel.reply.formatAgentEnvelope({ channel: "POPO", from: envelopeFrom, timestamp: new Date(), envelope: envelopeOptions, body: messageBody, }); let combinedBody = body; const historyKey = isGroup ? ctx.sessionId : undefined; if (isGroup && historyKey && chatHistories) { combinedBody = buildPendingHistoryContextFromMap({ historyMap: chatHistories, historyKey, limit: historyLimit, currentMessage: combinedBody, formatEntry: (entry) => core.channel.reply.formatAgentEnvelope({ channel: "POPO", from: `${ctx.sessionId}:${entry.sender}`, timestamp: entry.timestamp, body: entry.body, envelope: envelopeOptions, }), }); } const ctxPayload = core.channel.reply.finalizeInboundContext({ Body: combinedBody, RawBody: ctx.content, CommandBody: ctx.content, From: popoFrom, To: popoTo, SessionKey: route.sessionKey, AccountId: route.accountId, ChatType: isGroup ? "group" : "direct", GroupSubject: isGroup ? ctx.sessionId : undefined, SenderName: ctx.senderName ?? ctx.senderEmail, SenderId: ctx.senderEmail, Provider: "popo" as const, Surface: "popo" as const, MessageSid: ctx.messageId, Timestamp: Date.now(), WasMentioned: isGroup, // Group messages are always @mentions in POPO CommandAuthorized: true, OriginatingChannel: "popo" as const, OriginatingTo: popoTo, ...mediaPayload, }); const { dispatcher, replyOptions, markDispatchIdle } = createPopoReplyDispatcher({ cfg, agentId: route.agentId, runtime: runtime as RuntimeEnv, sessionId: ctx.sessionId, }); log(`popo: dispatching to agent (session=${route.sessionKey})`); const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ ctx: ctxPayload, cfg, dispatcher, replyOptions, }); markDispatchIdle(); if (isGroup && historyKey && chatHistories) { clearHistoryEntriesIfEnabled({ historyMap: chatHistories, historyKey, limit: historyLimit, }); } log(`popo: dispatch complete (queuedFinal=${queuedFinal}, replies=${counts.final})`); } catch (err) { error(`popo: failed to dispatch message: ${String(err)}`); } }