import {
  type ClawdbotConfig,
  type ReplyPayload,
  type RuntimeEnv,
} from "openclaw/plugin-sdk";
import {
  createReplyPrefixContext,
  createTypingCallbacks,
  logTypingFailure,
} from "./sdk-compat.js";
import { resolveFeishuAccount } from "./accounts.js";
import { createFeishuClient } from "./client.js";
import { buildMentionedCardContent, type MentionTarget } from "./mention.js";
import { normalizeFeishuMarkdownLinks } from "./text/markdown-links.js";
import { getFeishuRuntime } from "./runtime.js";
import { sendMarkdownCardFeishu, sendMessageFeishu } from "./send.js";
import { FeishuStreamingSession, mergeStreamingText } from "./streaming-card.js";
import { resolveReceiveIdType } from "./targets.js";
import { addTypingIndicator, removeTypingIndicator, type TypingIndicatorState } from "./typing.js";

/**
 * Maximum age (ms) for a message to receive a typing indicator reaction.
 * Messages older than this are likely replays after context compaction (#30418).
 */
const TYPING_INDICATOR_MAX_AGE_MS = 2 * 60_000;

/** Minimum value to treat a timestamp as epoch-milliseconds vs epoch-seconds. */
const MS_EPOCH_MIN = 1_000_000_000_000;

/**
 * Normalize a timestamp to epoch-milliseconds.
 * Some Feishu payloads use epoch-seconds; values below 1e12 are treated as such.
 *
 * @param timestamp - Epoch timestamp in seconds or milliseconds.
 * @returns The timestamp in milliseconds, or `undefined` when the input is
 *   missing, not finite, or not strictly positive.
 */
function normalizeEpochMs(timestamp: number | undefined): number | undefined {
  if (timestamp === undefined || !Number.isFinite(timestamp) || timestamp <= 0) {
    return undefined;
  }
  return timestamp < MS_EPOCH_MIN ? timestamp * 1000 : timestamp;
}

/**
 * Detect if text contains markdown elements that benefit from card rendering.
 *
 * @returns `true` when the text contains a fenced code block or a table row
 *   immediately followed by a table separator row.
 */
function shouldUseCard(text: string): boolean {
  return /```[\s\S]*?```/.test(text) || /\|.+\|[\r\n]+\|[-:| ]+\|/.test(text);
}

/** Inputs for {@link createFeishuReplyDispatcher}. */
export type CreateFeishuReplyDispatcherParams = {
  cfg: ClawdbotConfig;
  agentId: string;
  /** Runtime whose optional `log` / `error` hooks receive diagnostics. */
  runtime: RuntimeEnv;
  /** Chat that replies are sent to. */
  chatId: string;
  /** Message being replied to; also the target of the typing reaction. */
  replyToMessageId?: string;
  /** Mentions attached to the first sent chunk or to the closed streaming card. */
  mentionTargets?: MentionTarget[];
  accountId?: string;
  /**
   * Epoch ms when the inbound message was created. Used to suppress typing
   * indicators on old/replayed messages after context compaction (#30418).
   */
  messageCreateTimeMs?: number;
};

/**
 * Build the reply dispatcher for one inbound Feishu message.
 *
 * Wires together a typing indicator (reaction on the replied-to message), an
 * optional streaming card session, and the regular chunked send path, then
 * hands them to `core.channel.reply.createReplyDispatcherWithTyping`.
 *
 * @param params - See {@link CreateFeishuReplyDispatcherParams}.
 * @returns The dispatcher, its reply options (extended with `onModelSelected`
 *   and, when streaming is enabled, `onPartialReply`), and `markDispatchIdle`.
 */
export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherParams) {
  const core = getFeishuRuntime();
  const { cfg, agentId, chatId, replyToMessageId, mentionTargets, accountId, messageCreateTimeMs } =
    params;
  const account = resolveFeishuAccount({ cfg, accountId });
  const prefixContext = createReplyPrefixContext({ cfg, agentId });

  let typingState: TypingIndicatorState | null = null;
  const typingCallbacks = createTypingCallbacks({
    start: async () => {
      if (!replyToMessageId) {
        return;
      }
      // Skip typing indicator for old messages — likely replays after context
      // compaction that would flood users with stale notifications (#30418).
      const normalizedCreateTime = normalizeEpochMs(messageCreateTimeMs);
      if (
        normalizedCreateTime !== undefined &&
        Date.now() - normalizedCreateTime > TYPING_INDICATOR_MAX_AGE_MS
      ) {
        return;
      }
      // Feishu reactions persist until explicitly removed, so skip keepalive
      // re-adds when a reaction already exists. Re-adding the same emoji
      // triggers a new push notification for every call (#28660).
      if (typingState?.reactionId) {
        return;
      }
      typingState = await addTypingIndicator({ cfg, messageId: replyToMessageId, accountId });
    },
    stop: async () => {
      if (!typingState) {
        return;
      }
      await removeTypingIndicator({ cfg, state: typingState, accountId });
      typingState = null;
    },
    onStartError: (err) =>
      logTypingFailure({
        log: (message) => params.runtime.log?.(message),
        channel: "feishu",
        action: "start",
        error: err,
      }),
    onStopError: (err) =>
      logTypingFailure({
        log: (message) => params.runtime.log?.(message),
        channel: "feishu",
        action: "stop",
        error: err,
      }),
  });

  const textChunkLimit = core.channel.text.resolveTextChunkLimit(cfg, "feishu", account.accountId, {
    fallbackLimit: 4000,
  });
  const chunkMode = core.channel.text.resolveChunkMode(cfg, "feishu", account.accountId);
  const tableMode = core.channel.text.resolveMarkdownTableMode({
    cfg,
    channel: "feishu",
    accountId: account.accountId,
  });
  const renderMode = account.config?.renderMode ?? "auto";
  const streamingEnabled = account.config?.streaming === true && renderMode !== "raw";

  // Streaming card state. `streamText` is the content that the next flush or
  // the final close will push to the card.
  let streaming: FeishuStreamingSession | null = null;
  let streamText = "";
  let lastPartial = "";
  let streamingCompleted = false;
  // Chain of pending card updates; each flush runs after the previous one.
  let partialUpdateQueue: Promise<void> = Promise.resolve();
  let streamingStartPromise: Promise<void> | null = null;

  const mergeIntoStreamText = (nextText: string) => {
    streamText = mergeStreamingText(streamText, nextText);
  };

  /** Queue a card update with the current `streamText` behind earlier updates. */
  const enqueueStreamingFlush = () => {
    partialUpdateQueue = partialUpdateQueue.then(async () => {
      try {
        if (streamingStartPromise) {
          await streamingStartPromise;
        }
        if (streaming?.isActive()) {
          await streaming.update(streamText);
        }
      } catch (error) {
        // A failed intermediate update must not leave the queue rejected:
        // that would skip every later flush, make closeStreaming() throw
        // before closing the card, and surface as an unhandled rejection.
        // The final close still sends the complete text.
        params.runtime.error?.(
          `feishu[${account.accountId}] streaming update failed: ${String(error)}`,
        );
      }
    });
  };

  /** Start the streaming card session once; later calls are no-ops. */
  const startStreaming = () => {
    if (!streamingEnabled || streamingStartPromise || streaming || streamingCompleted) {
      return;
    }
    streamingStartPromise = (async () => {
      const creds =
        account.appId && account.appSecret
          ? { appId: account.appId, appSecret: account.appSecret, domain: account.domain }
          : null;
      if (!creds) {
        return;
      }

      streaming = new FeishuStreamingSession(createFeishuClient(account), creds, (message) =>
        params.runtime.log?.(`feishu[${account.accountId}] ${message}`),
      );
      try {
        await streaming.start(chatId, resolveReceiveIdType(chatId), replyToMessageId);
      } catch (error) {
        params.runtime.error?.(`feishu: streaming start failed: ${String(error)}`);
        // Drop the session so delivery falls back to the regular send path.
        streaming = null;
      }
    })();
  };

  /**
   * Wait for the pending start and queued updates, close an active streaming
   * card with the final text (plus mentions), and reset the streaming state.
   */
  const closeStreaming = async () => {
    if (streamingStartPromise) {
      await streamingStartPromise;
    }
    await partialUpdateQueue;
    if (streaming?.isActive()) {
      let text = streamText;
      if (mentionTargets?.length) {
        text = buildMentionedCardContent(mentionTargets, text);
      }
      await streaming.close(normalizeFeishuMarkdownLinks(text));
      streamingCompleted = true;
    }
    streaming = null;
    streamingStartPromise = null;
    streamText = "";
    lastPartial = "";
  };

  const { dispatcher, replyOptions, markDispatchIdle } =
    core.channel.reply.createReplyDispatcherWithTyping({
      responsePrefix: prefixContext.responsePrefix,
      responsePrefixContextProvider: prefixContext.responsePrefixContextProvider,
      humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, agentId),
      onReplyStart: () => {
        if (streamingEnabled && renderMode === "card") {
          startStreaming();
        }
        void typingCallbacks.onReplyStart?.();
      },
      deliver: async (payload: ReplyPayload, info) => {
        const text = payload.text ?? "";
        if (!text.trim()) {
          return;
        }

        const useCard = renderMode === "card" || (renderMode === "auto" && shouldUseCard(text));
        const canStream = streamingEnabled && useCard;
        const isBlock = info?.kind === "block";
        const isFinal = info?.kind === "final";

        // Drop internal block chunks unless we can safely consume them as
        // streaming-card fallback content (#31723).
        if (isBlock && !canStream) {
          return;
        }

        if (canStream && (isBlock || isFinal)) {
          startStreaming();
          if (streamingStartPromise) {
            await streamingStartPromise;
          }
        }

        if (streaming?.isActive()) {
          if (isBlock) {
            // Some runtimes emit block payloads without onPartial/final callbacks.
            // Mirror block text into streamText so onIdle close still sends content.
            mergeIntoStreamText(text);
            enqueueStreamingFlush();
          }
          if (isFinal) {
            streamText = text;
            await closeStreaming();
          }
          return;
        }

        // Streaming card already delivered the content — skip regular send to
        // avoid a duplicate card when the runtime delivers a second payload
        // after the streaming session has closed (#399).
        if (streamingCompleted) {
          return;
        }

        // Regular send path: only the first chunk carries the mentions.
        let first = true;
        if (useCard) {
          for (const chunk of core.channel.text.chunkTextWithMode(
            text,
            textChunkLimit,
            chunkMode,
          )) {
            await sendMarkdownCardFeishu({
              cfg,
              to: chatId,
              text: chunk,
              replyToMessageId,
              mentions: first ? mentionTargets : undefined,
              accountId,
            });
            first = false;
          }
        } else {
          const converted = core.channel.text.convertMarkdownTables(text, tableMode);
          for (const chunk of core.channel.text.chunkTextWithMode(
            converted,
            textChunkLimit,
            chunkMode,
          )) {
            await sendMessageFeishu({
              cfg,
              to: chatId,
              text: chunk,
              replyToMessageId,
              mentions: first ? mentionTargets : undefined,
              accountId,
            });
            first = false;
          }
        }
      },
      onError: async (error, info) => {
        params.runtime.error?.(
          `feishu[${account.accountId}] ${info.kind} reply failed: ${String(error)}`,
        );
        try {
          await closeStreaming();
        } finally {
          // Always release the typing reaction, even if closing the card failed;
          // Feishu reactions persist until explicitly removed.
          typingCallbacks.onIdle?.();
        }
      },
      onIdle: async () => {
        try {
          await closeStreaming();
        } finally {
          // Same as onError: never leave the typing reaction behind.
          typingCallbacks.onIdle?.();
        }
      },
      onCleanup: () => {
        typingCallbacks.onCleanup?.();
      },
    });

  return {
    dispatcher,
    replyOptions: {
      ...replyOptions,
      onModelSelected: prefixContext.onModelSelected,
      onPartialReply: streamingEnabled
        ? (payload: ReplyPayload) => {
            const partialText = normalizeFeishuMarkdownLinks(payload.text ?? "");
            if (!partialText || partialText === lastPartial) {
              return;
            }
            lastPartial = partialText;
            // Partials are cumulative — replace streamText directly instead of
            // merging, which avoids duplication when segment boundaries shift
            // (e.g. after a tool call mid-stream).
            streamText = partialText;
            enqueueStreamingFlush();
          }
        : undefined,
    },
    markDispatchIdle,
  };
}