import {
  createScopedPairingAccess,
  createNormalizedOutboundDeliverer,
  createReplyPrefixOptions,
  emitInboundHistory,
  emitOutboundHistory,
  formatTextWithAttachmentLinks,
  logInboundDrop,
  readStoreAllowFromForDmPolicy,
  resolveControlCommandGate,
  resolveOutboundMediaUrls,
  resolveEffectiveAllowFromLists,
  type OutboundReplyPayload,
  type OpenClawConfig,
  type RuntimeEnv,
} from "openclaw/plugin-sdk/compat";
import type { ResolvedHubAccount } from "./accounts.js";
import { getHubRuntime } from "./runtime.js";
import { sendMessageHub } from "./send.js";
import { normalizeHubAllowEntry } from "./targets.js";
import type { CoreConfig, HubInboundMessage } from "./types.js";

const CHANNEL_ID = "hub" as const;

// Always use localhost — Hub is at :8080 inside the container; account.url is public.
const HUB_LOCAL_BASE_URL = "http://127.0.0.1:8080";

/**
 * Normalizes a raw allow-list into canonical Hub entries.
 *
 * Each entry is stringified and passed through `normalizeHubAllowEntry`;
 * entries that normalize to a falsy value are dropped.
 *
 * @param entries Raw allow-list entries, or `undefined` when none are configured.
 * @returns The normalized entries (empty when `entries` is `undefined`).
 */
function normalizeHubAllowFrom(entries: ReadonlyArray<string | number> | undefined): string[] {
  if (!entries) {
    return [];
  }
  return entries
    .map((entry) => normalizeHubAllowEntry(String(entry)))
    .filter((entry): entry is string => Boolean(entry));
}

/**
 * Checks whether a sender is covered by an already-normalized allow-list.
 *
 * @param allowFrom Normalized allow-list; the literal `"*"` matches every sender.
 * @param senderId Raw sender id; it is normalized before comparison.
 * @returns `false` when the sender id normalizes to a falsy value, otherwise
 *   whether the list contains `"*"` or the normalized sender id.
 */
function isAllowedSender(allowFrom: string[], senderId: string): boolean {
  const normalized = normalizeHubAllowEntry(senderId);
  if (!normalized) {
    return false;
  }
  return allowFrom.some((entry) => entry === "*" || entry === normalized);
}

/**
 * Sends one reply payload to a Hub target and records it.
 *
 * The payload text and its media URLs are merged into a single string; nothing
 * is sent when that string is empty. Delivery uses `sendReply` when provided
 * and falls back to `sendMessageHub` otherwise. Outbound history is emitted
 * only when both `cfg` and `agentId` are supplied, and `statusSink` is told
 * the send time last.
 */
async function deliverHubReply(params: {
  payload: OutboundReplyPayload;
  target: string;
  accountId: string;
  cfg?: OpenClawConfig;
  agentId?: string;
  sessionKey?: string;
  sendReply?: (target: string, text: string) => Promise<void>;
  statusSink?: (patch: { lastOutboundAt?: number }) => void;
}): Promise<void> {
  const combined = formatTextWithAttachmentLinks(
    params.payload.text,
    resolveOutboundMediaUrls(params.payload),
  );
  if (!combined) {
    return;
  }

  if (params.sendReply) {
    await params.sendReply(params.target, combined);
  } else {
    await sendMessageHub(params.target, combined, { accountId: params.accountId });
  }

  if (params.cfg && params.agentId) {
    await emitOutboundHistory({
      cfg: params.cfg,
      agentId: params.agentId,
      surface: CHANNEL_ID,
      conversationKey: `hub:${params.target}`,
      text: combined,
      accountId: params.accountId,
      sessionKey: params.sessionKey,
    });
  }

  params.statusSink?.({ lastOutboundAt: Date.now() });
}

/**
 * Passive ACK: notifies Hub that a message was loaded into a live session.
 *
 * Enables senders to query GET /agents/{agentId}/messages/sent?session_loaded=true
 * to confirm their message reached an active runtime.
 *
 * Best-effort by design: every failure (network error, non-2xx status, missing
 * ids) is reported through the runtime logger and never thrown, so callers can
 * safely invoke this with `void` and not await it.
 */
async function sendPassiveSessionAck(params: {
  message: HubInboundMessage;
  account: ResolvedHubAccount;
  runtime: RuntimeEnv;
}): Promise<void> {
  const { message, account, runtime } = params;
  try {
    // Only the presence of the secret is logged, never its value.
    runtime.log?.(
      `[PASSIVE-ACK] message.messageId="${message.messageId}" account.agentId="${account.agentId}" hasSecret=${!!account.secret} hubUrl="${HUB_LOCAL_BASE_URL}"`,
    );

    // Without both ids the URL would contain "undefined"; skip instead.
    if (!message.messageId || !account.agentId) {
      runtime.log?.("[PASSIVE-ACK] skipped: missing messageId or agentId");
      return;
    }

    // Encode both ids so characters such as "/", "?" or "#" cannot alter the path.
    const agentSegment = encodeURIComponent(account.agentId);
    const messageSegment = encodeURIComponent(message.messageId);
    const response = await fetch(
      `${HUB_LOCAL_BASE_URL}/agents/${agentSegment}/messages/${messageSegment}/ack`,
      {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ secret: account.secret, ack_type: "session_loaded" }),
      },
    );
    if (!response.ok) {
      runtime.error?.(`[PASSIVE-ACK] hub responded with HTTP ${response.status}`);
    }
  } catch (err: unknown) {
    runtime.error?.(`[PASSIVE-ACK] fetch error: ${String(err)}`);
  }
}

/**
 * Handles one inbound Hub direct message end to end.
 *
 * Steps, in order:
 * 1. Ignore messages whose trimmed text is empty.
 * 2. Resolve the agent route and the effective allow-list (config + store).
 * 3. Enforce the DM policy (`disabled` / `pairing` / other non-`open` values),
 *    issuing a pairing reply when a new pairing request is created.
 * 4. Block control commands the sender is not authorized for.
 * 5. Record the inbound session, fire the passive ACK, and dispatch the reply
 *    with block streaming disabled.
 *
 * Dropped messages are written to inbound history with a disposition and
 * logged through `runtime.log`.
 */
export async function handleHubInbound(params: {
  message: HubInboundMessage;
  account: ResolvedHubAccount;
  config: CoreConfig;
  runtime: RuntimeEnv;
  sendReply?: (target: string, text: string) => Promise<void>;
  statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
}): Promise<void> {
  const { message, account, config, runtime, statusSink } = params;
  const cfg = config as OpenClawConfig;
  const core = getHubRuntime();
  const pairing = createScopedPairingAccess({
    core,
    channel: CHANNEL_ID,
    accountId: account.accountId,
  });

  const rawBody = message.text?.trim() ?? "";
  if (!rawBody) {
    return;
  }

  statusSink?.({ lastInboundAt: message.timestamp });

  const senderId = message.from;
  // Hub is DM-only, so the conversation peer is always the sender.
  const peerId = senderId;

  const route = core.channel.routing.resolveAgentRoute({
    cfg,
    channel: CHANNEL_ID,
    accountId: account.accountId,
    peer: {
      kind: "direct",
      id: peerId,
    },
  });

  const dmPolicy = account.config.dmPolicy ?? "pairing";
  const configAllowFrom = normalizeHubAllowFrom(account.config.allowFrom);
  const storeAllowFrom = await readStoreAllowFromForDmPolicy({
    provider: CHANNEL_ID,
    accountId: account.accountId,
    dmPolicy,
    readStore: pairing.readStoreForDmPolicy,
  });
  const storeAllowList = normalizeHubAllowFrom(storeAllowFrom);

  const { effectiveAllowFrom } = resolveEffectiveAllowFromLists({
    allowFrom: configAllowFrom,
    groupAllowFrom: [],
    storeAllowFrom: storeAllowList,
    dmPolicy,
    groupAllowFromFallbackToAllowFrom: false,
  });

  const allowTextCommands = core.channel.commands.shouldHandleTextCommands({
    cfg,
    surface: CHANNEL_ID,
  });
  const useAccessGroups = config.commands?.useAccessGroups !== false;
  const senderAllowed = isAllowedSender(effectiveAllowFrom, senderId);
  const hasControlCommand = core.channel.text.hasControlCommand(rawBody, cfg);
  const commandGate = resolveControlCommandGate({
    useAccessGroups,
    authorizers: [
      {
        configured: effectiveAllowFrom.length > 0,
        allowed: senderAllowed,
      },
    ],
    allowTextCommands,
    hasControlCommand,
  });
  const commandAuthorized = commandGate.commandAuthorized;

  // Shared bookkeeping for every rejected message: history entry first, then the drop log.
  const dropInbound = async (
    disposition: "blocked_dm_policy" | "paired_prompted" | "blocked_command_auth",
    reason: string,
  ): Promise<void> => {
    await emitInboundHistory({
      cfg,
      agentId: route.agentId,
      surface: CHANNEL_ID,
      conversationKey: `hub:${senderId}`,
      disposition,
      text: rawBody,
      ts: message.timestamp,
      accountId: account.accountId,
      messageId: message.messageId,
      senderId,
      senderLabel: senderId,
    });
    logInboundDrop({
      log: (line) => runtime.log?.(line),
      channel: CHANNEL_ID,
      reason,
      target: senderId,
    });
  };

  // DM policy enforcement (Hub is DM-only).
  if (dmPolicy === "disabled") {
    await dropInbound("blocked_dm_policy", "dmPolicy=disabled");
    return;
  }

  if (dmPolicy !== "open" && !senderAllowed) {
    if (dmPolicy === "pairing") {
      const { code, created } = await pairing.upsertPairingRequest({
        // Preserve the canonical Hub agent id for notify-on-approve flows.
        id: senderId,
        meta: { name: senderId },
      });
      // Only a newly created request triggers a pairing reply.
      if (created) {
        try {
          const reply = core.channel.pairing.buildPairingReply({
            channel: CHANNEL_ID,
            idLine: `Your Hub id: ${senderId}`,
            code,
          });
          await deliverHubReply({
            payload: { text: reply },
            target: senderId,
            accountId: account.accountId,
            cfg,
            agentId: route.agentId,
            sessionKey: route.sessionKey,
            sendReply: params.sendReply,
            statusSink,
          });
        } catch (err) {
          // A failed pairing reply must not prevent the drop from being recorded.
          runtime.error?.(`hub: pairing reply failed for ${senderId}: ${String(err)}`);
        }
      }
    }
    await dropInbound(
      dmPolicy === "pairing" ? "paired_prompted" : "blocked_dm_policy",
      `dmPolicy=${dmPolicy}`,
    );
    return;
  }

  if (commandGate.shouldBlock) {
    await dropInbound("blocked_command_auth", "control command (unauthorized)");
    return;
  }

  const storePath = core.channel.session.resolveStorePath(config.session?.store, {
    agentId: route.agentId,
  });
  const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(cfg);
  const previousTimestamp = core.channel.session.readSessionUpdatedAt({
    storePath,
    sessionKey: route.sessionKey,
  });
  const body = core.channel.reply.formatAgentEnvelope({
    channel: "Hub",
    from: senderId,
    timestamp: message.timestamp,
    previousTimestamp,
    envelope: envelopeOptions,
    body: rawBody,
  });

  const ctxPayload = core.channel.reply.finalizeInboundContext({
    Body: body,
    RawBody: rawBody,
    CommandBody: rawBody,
    From: `hub:${senderId}`,
    To: `hub:${senderId}`,
    SessionKey: route.sessionKey,
    AccountId: route.accountId,
    ChatType: "direct",
    ConversationLabel: senderId,
    SenderName: senderId,
    SenderId: senderId,
    Provider: CHANNEL_ID,
    Surface: CHANNEL_ID,
    MessageSid: message.messageId,
    Timestamp: message.timestamp,
    OriginatingChannel: CHANNEL_ID,
    OriginatingTo: `hub:${senderId}`,
    CommandAuthorized: commandAuthorized,
  });

  await core.channel.session.recordInboundSession({
    storePath,
    sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
    ctx: ctxPayload,
    onRecordError: (err) => {
      runtime.error?.(`hub: failed updating session meta: ${String(err)}`);
    },
  });

  // Fire-and-forget — does not block the reply dispatch path. The helper
  // catches and logs its own failures, so the promise is intentionally not awaited.
  void sendPassiveSessionAck({ message, account, runtime });

  const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
    cfg,
    agentId: route.agentId,
    channel: CHANNEL_ID,
    accountId: account.accountId,
  });
  const deliverReply = createNormalizedOutboundDeliverer(async (payload) => {
    await deliverHubReply({
      payload,
      target: peerId,
      accountId: account.accountId,
      cfg,
      agentId: route.agentId,
      sessionKey: route.sessionKey,
      sendReply: params.sendReply,
      statusSink,
    });
  });

  await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
    ctx: ctxPayload,
    cfg,
    dispatcherOptions: {
      ...prefixOptions,
      deliver: deliverReply,
      onError: (err, info) => {
        runtime.error?.(`hub ${info.kind} reply failed: ${String(err)}`);
      },
    },
    replyOptions: {
      onModelSelected,
      // Hub transport must not surface incremental internal work/product text.
      // Even though the plugin advertises blockStreaming support for chunking,
      // the live runtime/provider path has been leaking non-final assistant text
      // into Hub conversations. Force final-only delivery until the transport can
      // provably handle streamed block replies without exposing internal narration.
      disableBlockStreaming: true,
    },
  });
}