import type { RuntimeEnv, ReplyPayload, BotConfig } from "@hanzo/bot/plugin-sdk/tlon"; import { createLoggerBackedRuntime, createReplyPrefixOptions } from "@hanzo/bot/plugin-sdk/tlon"; import { getTlonRuntime } from "../runtime.js"; import { createSettingsManager, type TlonSettingsStore } from "../settings.js"; import { normalizeShip, parseChannelNest } from "../targets.js"; import { resolveTlonAccount } from "../types.js"; import { authenticate } from "../urbit/auth.js"; import { ssrfPolicyFromAllowPrivateNetwork } from "../urbit/context.js"; import type { Foreigns, DmInvite } from "../urbit/foreigns.js"; import { sendDm, sendGroupMessage } from "../urbit/send.js"; import { UrbitSSEClient } from "../urbit/sse-client.js"; import { type PendingApproval, type AdminCommand, createPendingApproval, formatApprovalRequest, formatApprovalConfirmation, parseApprovalResponse, isApprovalResponse, findPendingApproval, removePendingApproval, parseAdminCommand, isAdminCommand, formatBlockedList, formatPendingList, } from "./approval.js"; import { fetchAllChannels, fetchInitData } from "./discovery.js"; import { cacheMessage, getChannelHistory, fetchThreadHistory } from "./history.js"; import { downloadMessageImages } from "./media.js"; import { createProcessedMessageTracker } from "./processed-messages.js"; import { extractMessageText, extractCites, formatModelName, isBotMentioned, stripBotMention, isDmAllowed, isSummarizationRequest, type ParsedCite, } from "./utils.js"; export type MonitorTlonOpts = { runtime?: RuntimeEnv; abortSignal?: AbortSignal; accountId?: string | null; }; type ChannelAuthorization = { mode?: "restricted" | "open"; allowedShips?: string[]; }; /** * Resolve channel authorization by merging file config with settings store. * Settings store takes precedence for fields it defines. */ function resolveChannelAuthorization( cfg: BotConfig, channelNest: string, settings?: TlonSettingsStore, ): { mode: "restricted" | "open"; allowedShips: string[] } { const tlonConfig = cfg.channels?.tlon as | { authorization?: { channelRules?: Record }; defaultAuthorizedShips?: string[]; } | undefined; // Merge channel rules: settings override file config const fileRules = tlonConfig?.authorization?.channelRules ?? {}; const settingsRules = settings?.channelRules ?? {}; const rule = settingsRules[channelNest] ?? fileRules[channelNest]; // Merge default authorized ships: settings override file config const defaultShips = settings?.defaultAuthorizedShips ?? tlonConfig?.defaultAuthorizedShips ?? []; const allowedShips = rule?.allowedShips ?? defaultShips; const mode = rule?.mode ?? "restricted"; return { mode, allowedShips }; } export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise { const core = getTlonRuntime(); const cfg = core.config.loadConfig() as BotConfig; if (cfg.channels?.tlon?.enabled === false) { return; } const logger = core.logging.getChildLogger({ module: "tlon-auto-reply" }); const runtime: RuntimeEnv = opts.runtime ?? createLoggerBackedRuntime({ logger, }); const account = resolveTlonAccount(cfg, opts.accountId ?? undefined); if (!account.enabled) { return; } if (!account.configured || !account.ship || !account.url || !account.code) { throw new Error("Tlon account not configured (ship/url/code required)"); } const botShipName = normalizeShip(account.ship); runtime.log?.(`[tlon] Starting monitor for ${botShipName}`); const ssrfPolicy = ssrfPolicyFromAllowPrivateNetwork(account.allowPrivateNetwork); // Store validated values for use in closures (TypeScript narrowing doesn't propagate) const accountUrl = account.url; const accountCode = account.code; // Helper to authenticate with retry logic async function authenticateWithRetry(maxAttempts = 10): Promise { for (let attempt = 1; ; attempt++) { if (opts.abortSignal?.aborted) { throw new Error("Aborted while waiting to authenticate"); } try { runtime.log?.(`[tlon] Attempting authentication to ${accountUrl}...`); return await authenticate(accountUrl, accountCode, { ssrfPolicy }); } catch (error: any) { runtime.error?.( `[tlon] Failed to authenticate (attempt ${attempt}): ${error?.message ?? String(error)}`, ); if (attempt >= maxAttempts) { throw error; } const delay = Math.min(30000, 1000 * Math.pow(2, attempt - 1)); runtime.log?.(`[tlon] Retrying authentication in ${delay}ms...`); await new Promise((resolve, reject) => { const timer = setTimeout(resolve, delay); if (opts.abortSignal) { const onAbort = () => { clearTimeout(timer); reject(new Error("Aborted")); }; opts.abortSignal.addEventListener("abort", onAbort, { once: true }); } }); } } } let api: UrbitSSEClient | null = null; const cookie = await authenticateWithRetry(); api = new UrbitSSEClient(account.url, cookie, { ship: botShipName, ssrfPolicy, logger: { log: (message) => runtime.log?.(message), error: (message) => runtime.error?.(message), }, // Re-authenticate on reconnect in case the session expired onReconnect: async (client) => { runtime.log?.("[tlon] Re-authenticating on SSE reconnect..."); const newCookie = await authenticateWithRetry(5); client.updateCookie(newCookie); runtime.log?.("[tlon] Re-authentication successful"); }, }); const processedTracker = createProcessedMessageTracker(2000); let groupChannels: string[] = []; let botNickname: string | null = null; // Settings store manager for hot-reloading config const settingsManager = createSettingsManager(api, { log: (msg) => runtime.log?.(msg), error: (msg) => runtime.error?.(msg), }); // Reactive state that can be updated via settings store let effectiveDmAllowlist: string[] = account.dmAllowlist; let effectiveShowModelSig: boolean = account.showModelSignature ?? false; let effectiveAutoAcceptDmInvites: boolean = account.autoAcceptDmInvites ?? false; let effectiveAutoAcceptGroupInvites: boolean = account.autoAcceptGroupInvites ?? false; let effectiveGroupInviteAllowlist: string[] = account.groupInviteAllowlist; let effectiveAutoDiscoverChannels: boolean = account.autoDiscoverChannels ?? false; let effectiveOwnerShip: string | null = account.ownerShip ? normalizeShip(account.ownerShip) : null; let pendingApprovals: PendingApproval[] = []; let currentSettings: TlonSettingsStore = {}; // Track threads we've participated in (by parentId) - respond without mention requirement const participatedThreads = new Set(); // Track DM senders per session to detect shared sessions (security warning) const dmSendersBySession = new Map>(); let sharedSessionWarningSent = false; // Fetch bot's nickname from contacts try { const selfProfile = await api.scry("/contacts/v1/self.json"); if (selfProfile && typeof selfProfile === "object") { const profile = selfProfile as { nickname?: { value?: string } }; botNickname = profile.nickname?.value || null; if (botNickname) { runtime.log?.(`[tlon] Bot nickname: ${botNickname}`); } } } catch (error: any) { runtime.log?.(`[tlon] Could not fetch nickname: ${error?.message ?? String(error)}`); } // Store init foreigns for processing after settings are loaded let initForeigns: Foreigns | null = null; // Migrate file config to settings store (seed on first run) async function migrateConfigToSettings() { const migrations: Array<{ key: string; fileValue: unknown; settingsValue: unknown }> = [ { key: "dmAllowlist", fileValue: account.dmAllowlist, settingsValue: currentSettings.dmAllowlist, }, { key: "groupInviteAllowlist", fileValue: account.groupInviteAllowlist, settingsValue: currentSettings.groupInviteAllowlist, }, { key: "groupChannels", fileValue: account.groupChannels, settingsValue: currentSettings.groupChannels, }, { key: "defaultAuthorizedShips", fileValue: account.defaultAuthorizedShips, settingsValue: currentSettings.defaultAuthorizedShips, }, { key: "autoDiscoverChannels", fileValue: account.autoDiscoverChannels, settingsValue: currentSettings.autoDiscoverChannels, }, { key: "autoAcceptDmInvites", fileValue: account.autoAcceptDmInvites, settingsValue: currentSettings.autoAcceptDmInvites, }, { key: "autoAcceptGroupInvites", fileValue: account.autoAcceptGroupInvites, settingsValue: currentSettings.autoAcceptGroupInvites, }, { key: "showModelSig", fileValue: account.showModelSignature, settingsValue: currentSettings.showModelSig, }, ]; for (const { key, fileValue, settingsValue } of migrations) { // Only migrate if file has a value and settings store doesn't const hasFileValue = Array.isArray(fileValue) ? fileValue.length > 0 : fileValue != null; const hasSettingsValue = Array.isArray(settingsValue) ? settingsValue.length > 0 : settingsValue != null; if (hasFileValue && !hasSettingsValue) { try { await api!.poke({ app: "settings", mark: "settings-event", json: { "put-entry": { "bucket-key": "tlon", "entry-key": key, value: fileValue, desk: "moltbot", }, }, }); runtime.log?.(`[tlon] Migrated ${key} from config to settings store`); } catch (err) { runtime.log?.(`[tlon] Failed to migrate ${key}: ${String(err)}`); } } } } // Load settings from settings store (hot-reloadable config) try { currentSettings = await settingsManager.load(); // Migrate file config to settings store if not already present await migrateConfigToSettings(); // Apply settings overrides // Note: groupChannels from settings store are merged AFTER discovery runs (below) if (currentSettings.defaultAuthorizedShips?.length) { runtime.log?.( `[tlon] Using defaultAuthorizedShips from settings store: ${currentSettings.defaultAuthorizedShips.join(", ")}`, ); } if (currentSettings.autoDiscoverChannels !== undefined) { effectiveAutoDiscoverChannels = currentSettings.autoDiscoverChannels; runtime.log?.( `[tlon] Using autoDiscoverChannels from settings store: ${effectiveAutoDiscoverChannels}`, ); } if (currentSettings.dmAllowlist?.length) { effectiveDmAllowlist = currentSettings.dmAllowlist; runtime.log?.( `[tlon] Using dmAllowlist from settings store: ${effectiveDmAllowlist.join(", ")}`, ); } if (currentSettings.showModelSig !== undefined) { effectiveShowModelSig = currentSettings.showModelSig; } if (currentSettings.autoAcceptDmInvites !== undefined) { effectiveAutoAcceptDmInvites = currentSettings.autoAcceptDmInvites; runtime.log?.( `[tlon] Using autoAcceptDmInvites from settings store: ${effectiveAutoAcceptDmInvites}`, ); } if (currentSettings.autoAcceptGroupInvites !== undefined) { effectiveAutoAcceptGroupInvites = currentSettings.autoAcceptGroupInvites; runtime.log?.( `[tlon] Using autoAcceptGroupInvites from settings store: ${effectiveAutoAcceptGroupInvites}`, ); } if (currentSettings.groupInviteAllowlist?.length) { effectiveGroupInviteAllowlist = currentSettings.groupInviteAllowlist; runtime.log?.( `[tlon] Using groupInviteAllowlist from settings store: ${effectiveGroupInviteAllowlist.join(", ")}`, ); } if (currentSettings.ownerShip) { effectiveOwnerShip = normalizeShip(currentSettings.ownerShip); runtime.log?.(`[tlon] Using ownerShip from settings store: ${effectiveOwnerShip}`); } if (currentSettings.pendingApprovals?.length) { pendingApprovals = currentSettings.pendingApprovals; runtime.log?.(`[tlon] Loaded ${pendingApprovals.length} pending approval(s) from settings`); } } catch (err) { runtime.log?.(`[tlon] Settings store not available, using file config: ${String(err)}`); } // Run channel discovery AFTER settings are loaded (so settings store value is used) if (effectiveAutoDiscoverChannels) { try { const initData = await fetchInitData(api, runtime); if (initData.channels.length > 0) { groupChannels = initData.channels; } initForeigns = initData.foreigns; } catch (error: any) { runtime.error?.(`[tlon] Auto-discovery failed: ${error?.message ?? String(error)}`); } } // Merge manual config with auto-discovered channels if (account.groupChannels.length > 0) { for (const ch of account.groupChannels) { if (!groupChannels.includes(ch)) { groupChannels.push(ch); } } runtime.log?.( `[tlon] Added ${account.groupChannels.length} manual groupChannels to monitoring`, ); } // Also merge settings store groupChannels (may have been set via tlon settings command) if (currentSettings.groupChannels?.length) { for (const ch of currentSettings.groupChannels) { if (!groupChannels.includes(ch)) { groupChannels.push(ch); } } } if (groupChannels.length > 0) { runtime.log?.( `[tlon] Monitoring ${groupChannels.length} group channel(s): ${groupChannels.join(", ")}`, ); } else { runtime.log?.("[tlon] No group channels to monitor (DMs only)"); } // Helper to resolve cited message content async function resolveCiteContent(cite: ParsedCite): Promise { if (cite.type !== "chan" || !cite.nest || !cite.postId) { return null; } try { // Scry for the specific post: /v4/{nest}/posts/post/{postId} const scryPath = `/channels/v4/${cite.nest}/posts/post/${cite.postId}.json`; runtime.log?.(`[tlon] Fetching cited post: ${scryPath}`); const data: any = await api!.scry(scryPath); // Extract text from the post's essay content if (data?.essay?.content) { const text = extractMessageText(data.essay.content); return text || null; } return null; } catch (err) { runtime.log?.(`[tlon] Failed to fetch cited post: ${String(err)}`); return null; } } // Resolve all cites in message content and return quoted text async function resolveAllCites(content: unknown): Promise { const cites = extractCites(content); if (cites.length === 0) { return ""; } const resolved: string[] = []; for (const cite of cites) { const text = await resolveCiteContent(cite); if (text) { const author = cite.author || "unknown"; resolved.push(`> ${author} wrote: ${text}`); } } return resolved.length > 0 ? resolved.join("\n") + "\n\n" : ""; } // Helper to save pending approvals to settings store async function savePendingApprovals(): Promise { try { await api!.poke({ app: "settings", mark: "settings-event", json: { "put-entry": { desk: "moltbot", "bucket-key": "tlon", "entry-key": "pendingApprovals", value: JSON.stringify(pendingApprovals), }, }, }); } catch (err) { runtime.error?.(`[tlon] Failed to save pending approvals: ${String(err)}`); } } // Helper to update dmAllowlist in settings store async function addToDmAllowlist(ship: string): Promise { const normalizedShip = normalizeShip(ship); if (!effectiveDmAllowlist.includes(normalizedShip)) { effectiveDmAllowlist = [...effectiveDmAllowlist, normalizedShip]; } try { await api!.poke({ app: "settings", mark: "settings-event", json: { "put-entry": { desk: "moltbot", "bucket-key": "tlon", "entry-key": "dmAllowlist", value: effectiveDmAllowlist, }, }, }); runtime.log?.(`[tlon] Added ${normalizedShip} to dmAllowlist`); } catch (err) { runtime.error?.(`[tlon] Failed to update dmAllowlist: ${String(err)}`); } } // Helper to update channelRules in settings store async function addToChannelAllowlist(ship: string, channelNest: string): Promise { const normalizedShip = normalizeShip(ship); const channelRules = currentSettings.channelRules ?? {}; const rule = channelRules[channelNest] ?? { mode: "restricted", allowedShips: [] }; const allowedShips = [...(rule.allowedShips ?? [])]; // Clone to avoid mutation if (!allowedShips.includes(normalizedShip)) { allowedShips.push(normalizedShip); } const updatedRules = { ...channelRules, [channelNest]: { ...rule, allowedShips }, }; // Update local state immediately (don't wait for settings subscription) currentSettings = { ...currentSettings, channelRules: updatedRules }; try { await api!.poke({ app: "settings", mark: "settings-event", json: { "put-entry": { desk: "moltbot", "bucket-key": "tlon", "entry-key": "channelRules", value: JSON.stringify(updatedRules), }, }, }); runtime.log?.(`[tlon] Added ${normalizedShip} to ${channelNest} allowlist`); } catch (err) { runtime.error?.(`[tlon] Failed to update channelRules: ${String(err)}`); } } // Helper to block a ship using Tlon's native blocking async function blockShip(ship: string): Promise { const normalizedShip = normalizeShip(ship); try { await api!.poke({ app: "chat", mark: "chat-block-ship", json: { ship: normalizedShip }, }); runtime.log?.(`[tlon] Blocked ship ${normalizedShip}`); } catch (err) { runtime.error?.(`[tlon] Failed to block ship ${normalizedShip}: ${String(err)}`); } } // Check if a ship is blocked using Tlon's native block list async function isShipBlocked(ship: string): Promise { const normalizedShip = normalizeShip(ship); try { const blocked = (await api!.scry("/chat/blocked.json")) as string[] | undefined; return Array.isArray(blocked) && blocked.some((s) => normalizeShip(s) === normalizedShip); } catch (err) { runtime.log?.(`[tlon] Failed to check blocked list: ${String(err)}`); return false; } } // Get all blocked ships async function getBlockedShips(): Promise { try { const blocked = (await api!.scry("/chat/blocked.json")) as string[] | undefined; return Array.isArray(blocked) ? blocked : []; } catch (err) { runtime.log?.(`[tlon] Failed to get blocked list: ${String(err)}`); return []; } } // Helper to unblock a ship using Tlon's native blocking async function unblockShip(ship: string): Promise { const normalizedShip = normalizeShip(ship); try { await api!.poke({ app: "chat", mark: "chat-unblock-ship", json: { ship: normalizedShip }, }); runtime.log?.(`[tlon] Unblocked ship ${normalizedShip}`); return true; } catch (err) { runtime.error?.(`[tlon] Failed to unblock ship ${normalizedShip}: ${String(err)}`); return false; } } // Helper to send DM notification to owner async function sendOwnerNotification(message: string): Promise { if (!effectiveOwnerShip) { runtime.log?.("[tlon] No ownerShip configured, cannot send notification"); return; } try { await sendDm({ api: api!, fromShip: botShipName, toShip: effectiveOwnerShip, text: message, }); runtime.log?.(`[tlon] Sent notification to owner ${effectiveOwnerShip}`); } catch (err) { runtime.error?.(`[tlon] Failed to send notification to owner: ${String(err)}`); } } // Queue a new approval request and notify the owner async function queueApprovalRequest(approval: PendingApproval): Promise { // Check if ship is blocked - silently ignore if (await isShipBlocked(approval.requestingShip)) { runtime.log?.(`[tlon] Ignoring request from blocked ship ${approval.requestingShip}`); return; } // Check for duplicate - if found, update it with new content and re-notify const existingIndex = pendingApprovals.findIndex( (a) => a.type === approval.type && a.requestingShip === approval.requestingShip && (approval.type !== "channel" || a.channelNest === approval.channelNest) && (approval.type !== "group" || a.groupFlag === approval.groupFlag), ); if (existingIndex !== -1) { // Update existing approval with new content (preserves the original ID) const existing = pendingApprovals[existingIndex]; if (approval.originalMessage) { existing.originalMessage = approval.originalMessage; existing.messagePreview = approval.messagePreview; } runtime.log?.( `[tlon] Updated existing approval for ${approval.requestingShip} (${approval.type}) - re-sending notification`, ); await savePendingApprovals(); const message = formatApprovalRequest(existing); await sendOwnerNotification(message); return; } pendingApprovals.push(approval); await savePendingApprovals(); const message = formatApprovalRequest(approval); await sendOwnerNotification(message); runtime.log?.( `[tlon] Queued approval request: ${approval.id} (${approval.type} from ${approval.requestingShip})`, ); } // Process the owner's approval response async function handleApprovalResponse(text: string): Promise { const parsed = parseApprovalResponse(text); if (!parsed) { return false; } const approval = findPendingApproval(pendingApprovals, parsed.id); if (!approval) { await sendOwnerNotification( "No pending approval found" + (parsed.id ? ` for ID: ${parsed.id}` : ""), ); return true; // Still consumed the message } if (parsed.action === "approve") { switch (approval.type) { case "dm": await addToDmAllowlist(approval.requestingShip); // Process the original message if available if (approval.originalMessage) { runtime.log?.( `[tlon] Processing original message from ${approval.requestingShip} after approval`, ); await processMessage({ messageId: approval.originalMessage.messageId, senderShip: approval.requestingShip, messageText: approval.originalMessage.messageText, messageContent: approval.originalMessage.messageContent, isGroup: false, timestamp: approval.originalMessage.timestamp, }); } break; case "channel": if (approval.channelNest) { await addToChannelAllowlist(approval.requestingShip, approval.channelNest); // Process the original message if available if (approval.originalMessage) { const parsed = parseChannelNest(approval.channelNest); runtime.log?.( `[tlon] Processing original message from ${approval.requestingShip} in ${approval.channelNest} after approval`, ); await processMessage({ messageId: approval.originalMessage.messageId, senderShip: approval.requestingShip, messageText: approval.originalMessage.messageText, messageContent: approval.originalMessage.messageContent, isGroup: true, channelNest: approval.channelNest, hostShip: parsed?.hostShip, channelName: parsed?.channelName, timestamp: approval.originalMessage.timestamp, parentId: approval.originalMessage.parentId, isThreadReply: approval.originalMessage.isThreadReply, }); } } break; case "group": // Accept the group invite (don't add to allowlist - each invite requires approval) if (approval.groupFlag) { try { await api!.poke({ app: "groups", mark: "group-join", json: { flag: approval.groupFlag, "join-all": true, }, }); runtime.log?.(`[tlon] Joined group ${approval.groupFlag} after approval`); // Immediately discover channels from the newly joined group // Small delay to allow the join to propagate setTimeout(async () => { try { const discoveredChannels = await fetchAllChannels(api!, runtime); let newCount = 0; for (const channelNest of discoveredChannels) { if (!watchedChannels.has(channelNest)) { watchedChannels.add(channelNest); newCount++; } } if (newCount > 0) { runtime.log?.( `[tlon] Discovered ${newCount} new channel(s) after joining group`, ); } } catch (err) { runtime.log?.(`[tlon] Channel discovery after group join failed: ${String(err)}`); } }, 2000); } catch (err) { runtime.error?.(`[tlon] Failed to join group ${approval.groupFlag}: ${String(err)}`); } } break; } await sendOwnerNotification(formatApprovalConfirmation(approval, "approve")); } else if (parsed.action === "block") { // Block the ship using Tlon's native blocking await blockShip(approval.requestingShip); await sendOwnerNotification(formatApprovalConfirmation(approval, "block")); } else { // Denied - just remove from pending, no notification to requester await sendOwnerNotification(formatApprovalConfirmation(approval, "deny")); } // Remove from pending pendingApprovals = removePendingApproval(pendingApprovals, approval.id); await savePendingApprovals(); return true; } // Handle admin commands from owner (unblock, blocked, pending) async function handleAdminCommand(text: string): Promise { const command = parseAdminCommand(text); if (!command) { return false; } switch (command.type) { case "blocked": { const blockedShips = await getBlockedShips(); await sendOwnerNotification(formatBlockedList(blockedShips)); runtime.log?.(`[tlon] Owner requested blocked ships list (${blockedShips.length} ships)`); return true; } case "pending": { await sendOwnerNotification(formatPendingList(pendingApprovals)); runtime.log?.( `[tlon] Owner requested pending approvals list (${pendingApprovals.length} pending)`, ); return true; } case "unblock": { const shipToUnblock = command.ship; const isBlocked = await isShipBlocked(shipToUnblock); if (!isBlocked) { await sendOwnerNotification(`${shipToUnblock} is not blocked.`); return true; } const success = await unblockShip(shipToUnblock); if (success) { await sendOwnerNotification(`Unblocked ${shipToUnblock}.`); } else { await sendOwnerNotification(`Failed to unblock ${shipToUnblock}.`); } return true; } } } // Check if a ship is the owner (always allowed to DM) function isOwner(ship: string): boolean { if (!effectiveOwnerShip) { return false; } return normalizeShip(ship) === effectiveOwnerShip; } /** * Extract the DM partner ship from the 'whom' field. * This is the canonical source for DM routing (more reliable than essay.author). * Returns empty string if whom doesn't contain a valid patp-like value. */ function extractDmPartnerShip(whom: unknown): string { const raw = typeof whom === "string" ? whom : whom && typeof whom === "object" && "ship" in whom && typeof whom.ship === "string" ? whom.ship : ""; const normalized = normalizeShip(raw); // Keep DM routing strict: accept only patp-like values. return /^~?[a-z-]+$/i.test(normalized) ? normalized : ""; } const processMessage = async (params: { messageId: string; senderShip: string; messageText: string; messageContent?: unknown; // Raw Tlon content for media extraction isGroup: boolean; channelNest?: string; hostShip?: string; channelName?: string; timestamp: number; parentId?: string | null; isThreadReply?: boolean; }) => { const { messageId, senderShip, isGroup, channelNest, hostShip, channelName, timestamp, parentId, isThreadReply, messageContent, } = params; const groupChannel = channelNest; // For compatibility let messageText = params.messageText; // Download any images from the message content let attachments: Array<{ path: string; contentType: string }> = []; if (messageContent) { try { attachments = await downloadMessageImages(messageContent); if (attachments.length > 0) { runtime.log?.(`[tlon] Downloaded ${attachments.length} image(s) from message`); } } catch (error: any) { runtime.log?.(`[tlon] Failed to download images: ${error?.message ?? String(error)}`); } } // Fetch thread context when entering a thread for the first time if (isThreadReply && parentId && groupChannel) { try { const threadHistory = await fetchThreadHistory(api, groupChannel, parentId, 20, runtime); if (threadHistory.length > 0) { const threadContext = threadHistory .slice(-10) // Last 10 messages for context .map((msg) => `${msg.author}: ${msg.content}`) .join("\n"); // Prepend thread context to the message // Include note about ongoing conversation for agent judgment const contextNote = `[Thread conversation - ${threadHistory.length} previous replies. You are participating in this thread. Only respond if relevant or helpful - you don't need to reply to every message.]`; messageText = `${contextNote}\n\n[Previous messages]\n${threadContext}\n\n[Current message]\n${messageText}`; runtime?.log?.( `[tlon] Added thread context (${threadHistory.length} replies) to message`, ); } } catch (error: any) { runtime?.log?.(`[tlon] Could not fetch thread context: ${error?.message ?? String(error)}`); // Continue without thread context - not critical } } if (isGroup && groupChannel && isSummarizationRequest(messageText)) { try { const history = await getChannelHistory(api, groupChannel, 50, runtime); if (history.length === 0) { const noHistoryMsg = "I couldn't fetch any messages for this channel. It might be empty or there might be a permissions issue."; if (isGroup) { const parsed = parseChannelNest(groupChannel); if (parsed) { await sendGroupMessage({ api: api, fromShip: botShipName, hostShip: parsed.hostShip, channelName: parsed.channelName, text: noHistoryMsg, }); } } else { await sendDm({ api: api, fromShip: botShipName, toShip: senderShip, text: noHistoryMsg, }); } return; } const historyText = history .map( (msg) => `[${new Date(msg.timestamp).toLocaleString()}] ${msg.author}: ${msg.content}`, ) .join("\n"); messageText = `Please summarize this channel conversation (${history.length} recent messages):\n\n${historyText}\n\n` + "Provide a concise summary highlighting:\n" + "1. Main topics discussed\n" + "2. Key decisions or conclusions\n" + "3. Action items if any\n" + "4. Notable participants"; } catch (error: any) { const errorMsg = `Sorry, I encountered an error while fetching the channel history: ${error?.message ?? String(error)}`; if (isGroup && groupChannel) { const parsed = parseChannelNest(groupChannel); if (parsed) { await sendGroupMessage({ api: api, fromShip: botShipName, hostShip: parsed.hostShip, channelName: parsed.channelName, text: errorMsg, }); } } else { await sendDm({ api: api, fromShip: botShipName, toShip: senderShip, text: errorMsg }); } return; } } const route = core.channel.routing.resolveAgentRoute({ cfg, channel: "tlon", accountId: opts.accountId ?? undefined, peer: { kind: isGroup ? "group" : "direct", id: isGroup ? (groupChannel ?? senderShip) : senderShip, }, }); // Warn if multiple users share a DM session (insecure dmScope configuration) if (!isGroup) { const sessionKey = route.sessionKey; if (!dmSendersBySession.has(sessionKey)) { dmSendersBySession.set(sessionKey, new Set()); } const senders = dmSendersBySession.get(sessionKey)!; if (senders.size > 0 && !senders.has(senderShip)) { // Log warning runtime.log?.( `[tlon] ⚠️ SECURITY: Multiple users sharing DM session. ` + `Configure "session.dmScope: per-channel-peer" in HanzoBot config.`, ); // Notify owner via DM (once per monitor session) if (!sharedSessionWarningSent && effectiveOwnerShip) { sharedSessionWarningSent = true; const warningMsg = `⚠️ Security Warning: Multiple users are sharing a DM session with this bot. ` + `This can leak conversation context between users.\n\n` + `Fix: Add to your HanzoBot config:\n` + `session:\n dmScope: "per-channel-peer"\n\n` + `Docs: https://docs.bot.ai/concepts/session#secure-dm-mode`; // Send async, don't block message processing sendDm({ api, fromShip: botShipName, toShip: effectiveOwnerShip, text: warningMsg, }).catch((err) => runtime.error?.(`[tlon] Failed to send security warning to owner: ${err}`), ); } } senders.add(senderShip); } const senderRole = isOwner(senderShip) ? "owner" : "user"; const fromLabel = isGroup ? `${senderShip} [${senderRole}] in ${channelNest}` : `${senderShip} [${senderRole}]`; // Compute command authorization for slash commands (owner-only) const shouldComputeAuth = core.channel.commands.shouldComputeCommandAuthorized( messageText, cfg, ); let commandAuthorized = false; if (shouldComputeAuth) { const useAccessGroups = cfg.commands?.useAccessGroups !== false; const senderIsOwner = isOwner(senderShip); commandAuthorized = core.channel.commands.resolveCommandAuthorizedFromAuthorizers({ useAccessGroups, authorizers: [{ configured: Boolean(effectiveOwnerShip), allowed: senderIsOwner }], }); // Log when non-owner attempts a slash command (will be silently ignored by Gateway) if (!commandAuthorized) { console.log( `[tlon] Command attempt denied: ${senderShip} is not owner (owner=${effectiveOwnerShip ?? "not configured"})`, ); } } // Prepend attachment annotations to message body (similar to Signal format) let bodyWithAttachments = messageText; if (attachments.length > 0) { const mediaLines = attachments .map((a) => `[media attached: ${a.path} (${a.contentType}) | ${a.path}]`) .join("\n"); bodyWithAttachments = mediaLines + "\n" + messageText; } const body = core.channel.reply.formatAgentEnvelope({ channel: "Tlon", from: fromLabel, timestamp, body: bodyWithAttachments, }); // Strip bot ship mention for CommandBody so "/status" is recognized as command-only const commandBody = isGroup ? stripBotMention(messageText, botShipName) : messageText; const ctxPayload = core.channel.reply.finalizeInboundContext({ Body: body, RawBody: messageText, CommandBody: commandBody, From: isGroup ? `tlon:group:${groupChannel}` : `tlon:${senderShip}`, To: `tlon:${botShipName}`, SessionKey: route.sessionKey, AccountId: route.accountId, ChatType: isGroup ? "group" : "direct", ConversationLabel: fromLabel, SenderName: senderShip, SenderId: senderShip, SenderRole: senderRole, CommandAuthorized: commandAuthorized, CommandSource: "text" as const, Provider: "tlon", Surface: "tlon", MessageSid: messageId, // Include downloaded media attachments ...(attachments.length > 0 && { Attachments: attachments }), OriginatingChannel: "tlon", OriginatingTo: `tlon:${isGroup ? groupChannel : botShipName}`, // Include thread context for automatic reply routing ...(parentId && { ThreadId: String(parentId), ReplyToId: String(parentId) }), }); const dispatchStartTime = Date.now(); const responsePrefix = core.channel.reply.resolveEffectiveMessagesConfig( cfg, route.agentId, ).responsePrefix; const humanDelay = core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId); await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, cfg, dispatcherOptions: { responsePrefix, humanDelay, deliver: async (payload: ReplyPayload) => { let replyText = payload.text; if (!replyText) { return; } // Use settings store value if set, otherwise fall back to file config const showSignature = effectiveShowModelSig; if (showSignature) { const extPayload = payload as ReplyPayload & { metadata?: { model?: string }; model?: string; }; const extRoute = route as typeof route & { model?: string }; const defaultModel = cfg.agents?.defaults?.model; const modelInfo = extPayload.metadata?.model || extPayload.model || extRoute.model || (typeof defaultModel === "string" ? defaultModel : defaultModel?.primary); extPayload.metadata?.model || extPayload.model || extRoute.model || (typeof defaultModel === "string" ? defaultModel : defaultModel?.primary); replyText = `${replyText}\n\n_[Generated by ${formatModelName(modelInfo)}]_`; } if (isGroup && groupChannel) { const parsed = parseChannelNest(groupChannel); if (!parsed) { return; } await sendGroupMessage({ api: api, fromShip: botShipName, hostShip: parsed.hostShip, channelName: parsed.channelName, text: replyText, replyToId: parentId ?? undefined, }); // Track thread participation for future replies without mention if (parentId) { participatedThreads.add(String(parentId)); runtime.log?.(`[tlon] Now tracking thread for future replies: ${parentId}`); } } else { await sendDm({ api: api, fromShip: botShipName, toShip: senderShip, text: replyText }); } }, onError: (err, info) => { const dispatchDuration = Date.now() - dispatchStartTime; runtime.error?.( `[tlon] ${info.kind} reply failed after ${dispatchDuration}ms: ${String(err)}`, ); }, }, }); }; // Track which channels we're interested in for filtering firehose events const watchedChannels = new Set(groupChannels); const _watchedDMs = new Set(); // Firehose handler for all channel messages (/v2) const handleChannelsFirehose = async (event: any) => { try { const nest = event?.nest; if (!nest) { return; } // Only process channels we're watching if (!watchedChannels.has(nest)) { return; } const response = event?.response; if (!response) { return; } // Handle post responses (new posts and replies) const essay = response?.post?.["r-post"]?.set?.essay; const memo = response?.post?.["r-post"]?.reply?.["r-reply"]?.set?.memo; if (!essay && !memo) { return; } const content = memo || essay; const isThreadReply = Boolean(memo); const messageId = isThreadReply ? response?.post?.["r-post"]?.reply?.id : response?.post?.id; if (!processedTracker.mark(messageId)) { return; } const senderShip = normalizeShip(content.author ?? ""); if (!senderShip || senderShip === botShipName) { return; } // Resolve any cited/quoted messages first const citedContent = await resolveAllCites(content.content); const rawText = extractMessageText(content.content); const messageText = citedContent + rawText; if (!messageText.trim()) { return; } cacheMessage(nest, { author: senderShip, content: messageText, timestamp: content.sent || Date.now(), id: messageId, }); // Get thread info early for participation check const seal = isThreadReply ? response?.post?.["r-post"]?.reply?.["r-reply"]?.set?.seal : response?.post?.["r-post"]?.set?.seal; const parentId = seal?.["parent-id"] || seal?.parent || null; // Check if we should respond: // 1. Direct mention always triggers response // 2. Thread replies where we've participated - respond if relevant (let agent decide) const mentioned = isBotMentioned(messageText, botShipName, botNickname ?? undefined); const inParticipatedThread = isThreadReply && parentId && participatedThreads.has(String(parentId)); if (!mentioned && !inParticipatedThread) { return; } // Log why we're responding if (inParticipatedThread && !mentioned) { runtime.log?.(`[tlon] Responding to thread we participated in (no mention): ${parentId}`); } // Owner is always allowed if (isOwner(senderShip)) { runtime.log?.(`[tlon] Owner ${senderShip} is always allowed in channels`); } else { const { mode, allowedShips } = resolveChannelAuthorization(cfg, nest, currentSettings); if (mode === "restricted") { const normalizedAllowed = allowedShips.map(normalizeShip); if (!normalizedAllowed.includes(senderShip)) { // If owner is configured, queue approval request if (effectiveOwnerShip) { const approval = createPendingApproval({ type: "channel", requestingShip: senderShip, channelNest: nest, messagePreview: messageText.substring(0, 100), originalMessage: { messageId: messageId ?? "", messageText, messageContent: content.content, timestamp: content.sent || Date.now(), parentId: parentId ?? undefined, isThreadReply, }, }); await queueApprovalRequest(approval); } else { runtime.log?.( `[tlon] Access denied: ${senderShip} in ${nest} (allowed: ${allowedShips.join(", ")})`, ); } return; } } } const parsed = parseChannelNest(nest); await processMessage({ messageId: messageId ?? "", senderShip, messageText, messageContent: content.content, // Pass raw content for media extraction isGroup: true, channelNest: nest, hostShip: parsed?.hostShip, channelName: parsed?.channelName, timestamp: content.sent || Date.now(), parentId, isThreadReply, }); } catch (error: any) { runtime.error?.( `[tlon] Error handling channel firehose event: ${error?.message ?? String(error)}`, ); } }; // Firehose handler for all DM messages (/v3) // Track which DM invites we've already processed to avoid duplicate accepts const processedDmInvites = new Set(); const handleChatFirehose = async (event: any) => { try { // Handle DM invite lists (arrays) if (Array.isArray(event)) { for (const invite of event as DmInvite[]) { const ship = normalizeShip(invite.ship || ""); if (!ship || processedDmInvites.has(ship)) { continue; } // Owner is always allowed if (isOwner(ship)) { try { await api.poke({ app: "chat", mark: "chat-dm-rsvp", json: { ship, ok: true }, }); processedDmInvites.add(ship); runtime.log?.(`[tlon] Auto-accepted DM invite from owner ${ship}`); } catch (err) { runtime.error?.(`[tlon] Failed to auto-accept DM from owner: ${String(err)}`); } continue; } // Auto-accept if on allowlist and auto-accept is enabled if (effectiveAutoAcceptDmInvites && isDmAllowed(ship, effectiveDmAllowlist)) { try { await api.poke({ app: "chat", mark: "chat-dm-rsvp", json: { ship, ok: true }, }); processedDmInvites.add(ship); runtime.log?.(`[tlon] Auto-accepted DM invite from ${ship}`); } catch (err) { runtime.error?.(`[tlon] Failed to auto-accept DM from ${ship}: ${String(err)}`); } continue; } // If owner is configured and ship is not on allowlist, queue approval if (effectiveOwnerShip && !isDmAllowed(ship, effectiveDmAllowlist)) { const approval = createPendingApproval({ type: "dm", requestingShip: ship, messagePreview: "(DM invite - no message yet)", }); await queueApprovalRequest(approval); processedDmInvites.add(ship); // Mark as processed to avoid duplicate notifications } } return; } if (!("whom" in event) || !("response" in event)) { return; } const whom = event.whom; // DM partner ship or club ID const messageId = event.id; const response = event.response; // Handle add events (new messages) const essay = response?.add?.essay; if (!essay) { return; } if (!processedTracker.mark(messageId)) { return; } const authorShip = normalizeShip(essay.author ?? ""); const partnerShip = extractDmPartnerShip(whom); const senderShip = partnerShip || authorShip; // Ignore the bot's own outbound DM events. if (authorShip === botShipName) { return; } if (!senderShip || senderShip === botShipName) { return; } // Log mismatch between author and partner for debugging if (authorShip && partnerShip && authorShip !== partnerShip) { runtime.log?.( `[tlon] DM ship mismatch (author=${authorShip}, partner=${partnerShip}) - routing to partner`, ); } // Resolve any cited/quoted messages first const citedContent = await resolveAllCites(essay.content); const rawText = extractMessageText(essay.content); const messageText = citedContent + rawText; if (!messageText.trim()) { return; } // Check if this is the owner sending an approval response if (isOwner(senderShip) && isApprovalResponse(messageText)) { const handled = await handleApprovalResponse(messageText); if (handled) { runtime.log?.(`[tlon] Processed approval response from owner: ${messageText}`); return; } } // Check if this is the owner sending an admin command if (isOwner(senderShip) && isAdminCommand(messageText)) { const handled = await handleAdminCommand(messageText); if (handled) { runtime.log?.(`[tlon] Processed admin command from owner: ${messageText}`); return; } } // Owner is always allowed to DM (bypass allowlist) if (isOwner(senderShip)) { runtime.log?.(`[tlon] Processing DM from owner ${senderShip}`); await processMessage({ messageId: messageId ?? "", senderShip, messageText, messageContent: essay.content, isGroup: false, timestamp: essay.sent || Date.now(), }); return; } // For DMs from others, check allowlist if (!isDmAllowed(senderShip, effectiveDmAllowlist)) { // If owner is configured, queue approval request if (effectiveOwnerShip) { const approval = createPendingApproval({ type: "dm", requestingShip: senderShip, messagePreview: messageText.substring(0, 100), originalMessage: { messageId: messageId ?? "", messageText, messageContent: essay.content, timestamp: essay.sent || Date.now(), }, }); await queueApprovalRequest(approval); } else { runtime.log?.(`[tlon] Blocked DM from ${senderShip}: not in allowlist`); } return; } await processMessage({ messageId: messageId ?? "", senderShip, messageText, messageContent: essay.content, // Pass raw content for media extraction isGroup: false, timestamp: essay.sent || Date.now(), }); } catch (error: any) { runtime.error?.( `[tlon] Error handling chat firehose event: ${error?.message ?? String(error)}`, ); } }; try { runtime.log?.("[tlon] Subscribing to firehose updates..."); // Subscribe to channels firehose (/v2) await api.subscribe({ app: "channels", path: "/v2", event: handleChannelsFirehose, err: (error) => { runtime.error?.(`[tlon] Channels firehose error: ${String(error)}`); }, quit: () => { runtime.log?.("[tlon] Channels firehose subscription ended"); }, }); runtime.log?.("[tlon] Subscribed to channels firehose (/v2)"); // Subscribe to chat/DM firehose (/v3) await api.subscribe({ app: "chat", path: "/v3", event: handleChatFirehose, err: (error) => { runtime.error?.(`[tlon] Chat firehose error: ${String(error)}`); }, quit: () => { runtime.log?.("[tlon] Chat firehose subscription ended"); }, }); runtime.log?.("[tlon] Subscribed to chat firehose (/v3)"); // Subscribe to contacts updates to track nickname changes await api.subscribe({ app: "contacts", path: "/v1/news", event: (event: any) => { try { // Look for self profile updates if (event?.self) { const selfUpdate = event.self; if (selfUpdate?.contact?.nickname?.value !== undefined) { const newNickname = selfUpdate.contact.nickname.value || null; if (newNickname !== botNickname) { botNickname = newNickname; runtime.log?.(`[tlon] Nickname updated: ${botNickname}`); } } } } catch (error: any) { runtime.error?.( `[tlon] Error handling contacts event: ${error?.message ?? String(error)}`, ); } }, err: (error) => { runtime.error?.(`[tlon] Contacts subscription error: ${String(error)}`); }, quit: () => { runtime.log?.("[tlon] Contacts subscription ended"); }, }); runtime.log?.("[tlon] Subscribed to contacts updates (/v1/news)"); // Subscribe to settings store for hot-reloading config settingsManager.onChange((newSettings) => { currentSettings = newSettings; // Update watched channels if settings changed if (newSettings.groupChannels?.length) { const newChannels = newSettings.groupChannels; for (const ch of newChannels) { if (!watchedChannels.has(ch)) { watchedChannels.add(ch); runtime.log?.(`[tlon] Settings: now watching channel ${ch}`); } } // Note: we don't remove channels from watchedChannels to avoid missing messages // during transitions. The authorization check handles access control. } // Update DM allowlist if (newSettings.dmAllowlist !== undefined) { effectiveDmAllowlist = newSettings.dmAllowlist.length > 0 ? newSettings.dmAllowlist : account.dmAllowlist; runtime.log?.(`[tlon] Settings: dmAllowlist updated to ${effectiveDmAllowlist.join(", ")}`); } // Update model signature setting if (newSettings.showModelSig !== undefined) { effectiveShowModelSig = newSettings.showModelSig; runtime.log?.(`[tlon] Settings: showModelSig = ${effectiveShowModelSig}`); } // Update auto-accept DM invites setting if (newSettings.autoAcceptDmInvites !== undefined) { effectiveAutoAcceptDmInvites = newSettings.autoAcceptDmInvites; runtime.log?.(`[tlon] Settings: autoAcceptDmInvites = ${effectiveAutoAcceptDmInvites}`); } // Update auto-accept group invites setting if (newSettings.autoAcceptGroupInvites !== undefined) { effectiveAutoAcceptGroupInvites = newSettings.autoAcceptGroupInvites; runtime.log?.( `[tlon] Settings: autoAcceptGroupInvites = ${effectiveAutoAcceptGroupInvites}`, ); } // Update group invite allowlist if (newSettings.groupInviteAllowlist !== undefined) { effectiveGroupInviteAllowlist = newSettings.groupInviteAllowlist.length > 0 ? newSettings.groupInviteAllowlist : account.groupInviteAllowlist; runtime.log?.( `[tlon] Settings: groupInviteAllowlist updated to ${effectiveGroupInviteAllowlist.join(", ")}`, ); } if (newSettings.defaultAuthorizedShips !== undefined) { runtime.log?.( `[tlon] Settings: defaultAuthorizedShips updated to ${(newSettings.defaultAuthorizedShips || []).join(", ")}`, ); } // Update auto-discover channels if (newSettings.autoDiscoverChannels !== undefined) { effectiveAutoDiscoverChannels = newSettings.autoDiscoverChannels; runtime.log?.(`[tlon] Settings: autoDiscoverChannels = ${effectiveAutoDiscoverChannels}`); } // Update owner ship if (newSettings.ownerShip !== undefined) { effectiveOwnerShip = newSettings.ownerShip ? normalizeShip(newSettings.ownerShip) : account.ownerShip ? normalizeShip(account.ownerShip) : null; runtime.log?.(`[tlon] Settings: ownerShip = ${effectiveOwnerShip}`); } // Update pending approvals if (newSettings.pendingApprovals !== undefined) { pendingApprovals = newSettings.pendingApprovals; runtime.log?.( `[tlon] Settings: pendingApprovals updated (${pendingApprovals.length} items)`, ); } }); try { await settingsManager.startSubscription(); } catch (err) { // Settings subscription is optional - don't fail if it doesn't work runtime.log?.(`[tlon] Settings subscription not available: ${String(err)}`); } // Subscribe to groups-ui for real-time channel additions (when invites are accepted) try { await api.subscribe({ app: "groups", path: "/groups/ui", event: async (event: any) => { try { // Handle group/channel join events // Event structure: { group: { flag: "~host/group-name", ... }, channels: { ... } } if (event && typeof event === "object") { // Check for new channels being added to groups if (event.channels && typeof event.channels === "object") { const channels = event.channels as Record; for (const [channelNest, _channelData] of Object.entries(channels)) { // Only monitor chat channels if (!channelNest.startsWith("chat/")) { continue; } // If this is a new channel we're not watching yet, add it if (!watchedChannels.has(channelNest)) { watchedChannels.add(channelNest); runtime.log?.( `[tlon] Auto-detected new channel (invite accepted): ${channelNest}`, ); // Persist to settings store so it survives restarts if (effectiveAutoAcceptGroupInvites) { try { const currentChannels = currentSettings.groupChannels || []; if (!currentChannels.includes(channelNest)) { const updatedChannels = [...currentChannels, channelNest]; // Poke settings store to persist await api.poke({ app: "settings", mark: "settings-event", json: { "put-entry": { "bucket-key": "tlon", "entry-key": "groupChannels", value: updatedChannels, desk: "moltbot", }, }, }); runtime.log?.(`[tlon] Persisted ${channelNest} to settings store`); } } catch (err) { runtime.error?.( `[tlon] Failed to persist channel to settings: ${String(err)}`, ); } } } } } // Also check for the "join" event structure if (event.join && typeof event.join === "object") { const join = event.join as { group?: string; channels?: string[] }; if (join.channels) { for (const channelNest of join.channels) { if (!channelNest.startsWith("chat/")) { continue; } if (!watchedChannels.has(channelNest)) { watchedChannels.add(channelNest); runtime.log?.(`[tlon] Auto-detected joined channel: ${channelNest}`); // Persist to settings store if (effectiveAutoAcceptGroupInvites) { try { const currentChannels = currentSettings.groupChannels || []; if (!currentChannels.includes(channelNest)) { const updatedChannels = [...currentChannels, channelNest]; await api.poke({ app: "settings", mark: "settings-event", json: { "put-entry": { "bucket-key": "tlon", "entry-key": "groupChannels", value: updatedChannels, desk: "moltbot", }, }, }); runtime.log?.(`[tlon] Persisted ${channelNest} to settings store`); } } catch (err) { runtime.error?.( `[tlon] Failed to persist channel to settings: ${String(err)}`, ); } } } } } } } } catch (error: any) { runtime.error?.( `[tlon] Error handling groups-ui event: ${error?.message ?? String(error)}`, ); } }, err: (error) => { runtime.error?.(`[tlon] Groups-ui subscription error: ${String(error)}`); }, quit: () => { runtime.log?.("[tlon] Groups-ui subscription ended"); }, }); runtime.log?.("[tlon] Subscribed to groups-ui for real-time channel detection"); } catch (err) { // Groups-ui subscription is optional - channel discovery will still work via polling runtime.log?.(`[tlon] Groups-ui subscription failed (will rely on polling): ${String(err)}`); } // Subscribe to foreigns for auto-accepting group invites // Always subscribe so we can hot-reload the setting via settings store { const processedGroupInvites = new Set(); // Helper to process pending invites const processPendingInvites = async (foreigns: Foreigns) => { if (!foreigns || typeof foreigns !== "object") { return; } for (const [groupFlag, foreign] of Object.entries(foreigns)) { if (processedGroupInvites.has(groupFlag)) { continue; } if (!foreign.invites || foreign.invites.length === 0) { continue; } const validInvite = foreign.invites.find((inv) => inv.valid); if (!validInvite) { continue; } const inviterShip = validInvite.from; const normalizedInviter = normalizeShip(inviterShip); // Owner invites are always accepted if (isOwner(inviterShip)) { try { await api.poke({ app: "groups", mark: "group-join", json: { flag: groupFlag, "join-all": true, }, }); processedGroupInvites.add(groupFlag); runtime.log?.(`[tlon] Auto-accepted group invite from owner: ${groupFlag}`); } catch (err) { runtime.error?.(`[tlon] Failed to accept group invite from owner: ${String(err)}`); } continue; } // Skip if auto-accept is disabled if (!effectiveAutoAcceptGroupInvites) { // If owner is configured, queue approval if (effectiveOwnerShip) { const approval = createPendingApproval({ type: "group", requestingShip: inviterShip, groupFlag, }); await queueApprovalRequest(approval); processedGroupInvites.add(groupFlag); } continue; } // Check if inviter is on allowlist const isAllowed = effectiveGroupInviteAllowlist.length > 0 ? effectiveGroupInviteAllowlist .map((s) => normalizeShip(s)) .some((s) => s === normalizedInviter) : false; // Fail-safe: empty allowlist means deny if (!isAllowed) { // If owner is configured, queue approval if (effectiveOwnerShip) { const approval = createPendingApproval({ type: "group", requestingShip: inviterShip, groupFlag, }); await queueApprovalRequest(approval); processedGroupInvites.add(groupFlag); } else { runtime.log?.( `[tlon] Rejected group invite from ${inviterShip} (not in groupInviteAllowlist): ${groupFlag}`, ); processedGroupInvites.add(groupFlag); } continue; } // Inviter is on allowlist - accept the invite try { await api.poke({ app: "groups", mark: "group-join", json: { flag: groupFlag, "join-all": true, }, }); processedGroupInvites.add(groupFlag); runtime.log?.( `[tlon] Auto-accepted group invite: ${groupFlag} (from ${validInvite.from})`, ); } catch (err) { runtime.error?.(`[tlon] Failed to auto-accept group ${groupFlag}: ${String(err)}`); } } }; // Process existing pending invites from init data if (initForeigns) { await processPendingInvites(initForeigns); } try { await api.subscribe({ app: "groups", path: "/v1/foreigns", event: (data: unknown) => { void (async () => { try { await processPendingInvites(data as Foreigns); } catch (error: any) { runtime.error?.( `[tlon] Error handling foreigns event: ${error?.message ?? String(error)}`, ); } })(); }, err: (error) => { runtime.error?.(`[tlon] Foreigns subscription error: ${String(error)}`); }, quit: () => { runtime.log?.("[tlon] Foreigns subscription ended"); }, }); runtime.log?.( "[tlon] Subscribed to foreigns (/v1/foreigns) for auto-accepting group invites", ); } catch (err) { runtime.log?.(`[tlon] Foreigns subscription failed: ${String(err)}`); } } // Discover channels to watch if (effectiveAutoDiscoverChannels) { const discoveredChannels = await fetchAllChannels(api, runtime); for (const channelNest of discoveredChannels) { watchedChannels.add(channelNest); } runtime.log?.(`[tlon] Watching ${watchedChannels.size} channel(s)`); } // Log watched channels for (const channelNest of watchedChannels) { runtime.log?.(`[tlon] Watching channel: ${channelNest}`); } runtime.log?.("[tlon] All subscriptions registered, connecting to SSE stream..."); await api.connect(); runtime.log?.("[tlon] Connected! Firehose subscriptions active"); // Periodically refresh channel discovery const pollInterval = setInterval( async () => { if (!opts.abortSignal?.aborted) { try { if (effectiveAutoDiscoverChannels) { const discoveredChannels = await fetchAllChannels(api, runtime); for (const channelNest of discoveredChannels) { if (!watchedChannels.has(channelNest)) { watchedChannels.add(channelNest); runtime.log?.(`[tlon] Now watching new channel: ${channelNest}`); } } } } catch (error: any) { runtime.error?.(`[tlon] Channel refresh error: ${error?.message ?? String(error)}`); } } }, 2 * 60 * 1000, ); if (opts.abortSignal) { const signal = opts.abortSignal; await new Promise((resolve) => { signal.addEventListener( "abort", () => { clearInterval(pollInterval); resolve(null); }, { once: true }, ); }); } else { await new Promise(() => {}); } } finally { try { await api?.close(); } catch (error: any) { runtime.error?.(`[tlon] Cleanup error: ${error?.message ?? String(error)}`); } } }