import { Block, SessionState } from "indite-js/schemas"; import { WhatsAppCredentials, WhatsAppIncomingMessage, } from "indite-js/schemas/features/whatsapp"; import { env } from "indite-js/env"; import { sendChatReplyToWhatsApp } from "./sendChatReplyToWhatsApp"; import { startWhatsAppSession } from "./startWhatsAppSession"; import { getSession } from "../queries/getSession"; import { continueBotFlow } from "../continueBotFlow"; import { decrypt } from "indite-js/lib/api/encryption/decrypt"; import { saveStateToDatabase } from "../saveStateToDatabase"; import prisma from "indite-js/lib/prisma"; import { isDefined } from "indite-js/lib/utils"; import { Reply } from "../types"; import { setIsReplyingInChatSession } from "../queries/setIsReplyingInChatSession"; import { removeIsReplyingInChatSession } from "../queries/removeIsReplyingInChatSession"; import redis from "indite-js/lib/redis"; import { downloadMedia } from "./downloadMedia"; import { InputBlockType } from "indite-js/schemas/features/blocks/inputs/constants"; import { uploadFileToBucket } from "indite-js/lib/s3/uploadFileToBucket"; import { getBlockById } from "indite-js/schemas/helpers"; const incomingMessageDebounce = 3000; type Props = { receivedMessage: WhatsAppIncomingMessage; sessionId: string; credentialsId?: string; phoneNumberId?: string; workspaceId?: string; contact: NonNullable["contact"]; }; export const resumeWhatsAppFlow = async ({ receivedMessage, sessionId, workspaceId, credentialsId, phoneNumberId, contact, }: Props): Promise<{ message: string }> => { const messageSendDate = new Date(Number(receivedMessage.timestamp) * 1000); const messageSentBefore3MinutesAgo = messageSendDate.getTime() < Date.now() - 180000; if (messageSentBefore3MinutesAgo) { console.log("Message is too old", messageSendDate.getTime()); return { message: "Message received", }; } const isPreview = workspaceId === undefined || credentialsId === undefined; const credentials = await getCredentials({ credentialsId, isPreview }); if (!credentials) { console.error("Could not find credentials"); return { message: "Message received", }; } if (credentials.phoneNumberId !== phoneNumberId && !isPreview) { console.error("Credentials point to another phone ID, skipping..."); return { message: "Message received", }; } const session = await getSession(sessionId); const { incomingMessages, isReplyingWasSet } = await aggregateParallelMediaMessagesIfRedisEnabled({ receivedMessage, existingSessionId: session?.id, newSessionId: sessionId, }); if (incomingMessages.length === 0) { if (isReplyingWasSet) await removeIsReplyingInChatSession(sessionId); return { message: "Message received", }; } const isSessionExpired = session && isDefined(session.state.expiryTimeout) && session?.updatedAt.getTime() + session.state.expiryTimeout < Date.now(); if (!isReplyingWasSet) { if (session?.isReplying) { if (!isSessionExpired) { console.log("Is currently replying, skipping..."); return { message: "Message received", }; } } else { await setIsReplyingInChatSession({ existingSessionId: session?.id, newSessionId: sessionId, }); } } const currentBot = session?.state.botsQueue[0].bot; const { block } = (currentBot && session?.state.currentBlockId ? getBlockById(session.state.currentBlockId, currentBot.groups) : undefined) ?? {}; const reply = await getIncomingMessageContent({ messages: incomingMessages, workspaceId, accessToken: credentials?.systemUserAccessToken, botId: currentBot?.id, resultId: session?.state.botsQueue[0].resultId, block, }); const resumeResponse = session && !isSessionExpired ? await continueBotFlow(reply, { version: 2, state: { ...session.state, whatsApp: { contact } }, textBubbleContentFormat: "richText", }) : workspaceId ? await startWhatsAppSession({ incomingMessage: reply, workspaceId, credentials: { ...credentials, id: credentialsId as string }, contact, }) : { error: "workspaceId not found" }; if ("error" in resumeResponse) { await removeIsReplyingInChatSession(sessionId); console.log("Chat not starting:", resumeResponse.error); return { message: "Message received", }; } const { input, logs, newSessionState, messages, clientSideActions, visitedEdges, setVariableHistory, } = resumeResponse; const isFirstChatChunk = (!session || isSessionExpired) ?? false; await sendChatReplyToWhatsApp({ to: receivedMessage.from, messages, input, isFirstChatChunk, typingEmulation: newSessionState.typingEmulation, clientSideActions, credentials, state: newSessionState, }); await saveStateToDatabase({ clientSideActions: [], input, logs, session: { id: sessionId, state: { ...newSessionState, currentBlockId: !input ? undefined : newSessionState.currentBlockId, }, }, visitedEdges, setVariableHistory, }); return { message: "Message received", }; }; const getIncomingMessageContent = async ({ messages, workspaceId, accessToken, botId, resultId, block, }: { messages: WhatsAppIncomingMessage[]; workspaceId?: string; accessToken: string; botId?: string; resultId?: string; block?: Block; }): Promise => { let text: string = ""; const attachedFileUrls: string[] = []; for (const message of messages) { switch (message.type) { case "text": { if (text !== "") text += `\n\n${message.text.body}`; else text = message.text.body; break; } case "button": { if (text !== "") text += `\n\n${message.button.text}`; else text = message.button.text; break; } case "interactive": { if (text !== "") text += `\n\n${message.interactive.button_reply.id}`; else text = message.interactive.button_reply.id; break; } case "document": case "audio": case "video": case "image": { let mediaId: string | undefined; if (message.type === "video") mediaId = message.video.id; if (message.type === "image") mediaId = message.image.id; if (message.type === "audio") mediaId = message.audio.id; if (message.type === "document") mediaId = message.document.id; if (!mediaId) return; const fileVisibility = block?.type === InputBlockType.TEXT && block.options?.audioClip?.isEnabled && message.type === "audio" ? block.options?.audioClip.visibility : block?.type === InputBlockType.FILE ? block.options?.visibility : block?.type === InputBlockType.TEXT ? block.options?.attachments?.visibility : undefined; let fileUrl; if (fileVisibility !== "Public") { fileUrl = env.NEXTAUTH_URL + `/api/bots/${botId}/whatsapp/media/${ workspaceId ? `` : "preview/" }${mediaId}`; } else { const { file, mimeType } = await downloadMedia({ mediaId, systemUserAccessToken: accessToken, }); const url = await uploadFileToBucket({ file, key: resultId && workspaceId && botId ? `public/workspaces/${workspaceId}/bots/${botId}/results/${resultId}/${mediaId}` : `tmp/whatsapp/media/${mediaId}`, mimeType, }); fileUrl = url; } if (message.type === "audio") return { type: "audio", url: fileUrl, }; if (block?.type === InputBlockType.FILE) { if (text !== "") text += `, ${fileUrl}`; else text = fileUrl; } else if (block?.type === InputBlockType.TEXT) { let caption: string | undefined; if (message.type === "document" && message.document.caption) { if (!/^[\w,\s-]+\.[A-Za-z]{3}$/.test(message.document.caption)) caption = message.document.caption; } else if (message.type === "image" && message.image.caption) caption = message.image.caption; else if (message.type === "video" && message.video.caption) caption = message.video.caption; if (caption) text = text === "" ? caption : `${text}\n\n${caption}`; attachedFileUrls.push(fileUrl); } break; } case "location": { const location = `${message.location.latitude}, ${message.location.longitude}`; if (text !== "") text += `\n\n${location}`; else text = location; break; } } } return { type: "text", text, attachedFileUrls, }; }; const getCredentials = async ({ credentialsId, isPreview, }: { credentialsId?: string; isPreview: boolean; }): Promise => { if (isPreview) { if ( !env.META_SYSTEM_USER_TOKEN || !env.WHATSAPP_PREVIEW_FROM_PHONE_NUMBER_ID ) return; return { systemUserAccessToken: env.META_SYSTEM_USER_TOKEN, phoneNumberId: env.WHATSAPP_PREVIEW_FROM_PHONE_NUMBER_ID, }; } if (!credentialsId) return; const credentials = await prisma.credentials.findUnique({ where: { id: credentialsId, }, select: { data: true, iv: true, }, }); if (!credentials) return; const data = (await decrypt( credentials.data, credentials.iv )) as WhatsAppCredentials["data"]; return { systemUserAccessToken: data.systemUserAccessToken, phoneNumberId: data.phoneNumberId, }; }; const aggregateParallelMediaMessagesIfRedisEnabled = async ({ receivedMessage, existingSessionId, newSessionId, }: { receivedMessage: WhatsAppIncomingMessage; existingSessionId?: string; newSessionId: string; }): Promise<{ isReplyingWasSet: boolean; incomingMessages: WhatsAppIncomingMessage[]; }> => { if (redis && ["document", "video", "image"].includes(receivedMessage.type)) { const redisKey = `wasession:${newSessionId}`; try { const len = await redis.rpush(redisKey, JSON.stringify(receivedMessage)); if (len === 1) { await setIsReplyingInChatSession({ existingSessionId, newSessionId, }); } await new Promise((resolve) => setTimeout(resolve, incomingMessageDebounce) ); const newMessagesResponse = await redis.lrange(redisKey, 0, -1); if (!newMessagesResponse || newMessagesResponse.length > len) { // Current message was aggregated with other messages another webhook handler. Skipping... return { isReplyingWasSet: true, incomingMessages: [] }; } redis.del(redisKey).then(); return { isReplyingWasSet: true, incomingMessages: newMessagesResponse.map((msgStr) => JSON.parse(msgStr) ), }; } catch (error) { console.error("Failed to process webhook event:", error, receivedMessage); } } return { isReplyingWasSet: false, incomingMessages: [receivedMessage], }; };