import { type ExtensionAPI, type ExtensionContext } from "@earendil-works/pi-coding-agent";
import { readFileSync, rmSync } from "node:fs";
import { fileURLToPath } from "node:url";
import { join } from "node:path";
import { tmpdir } from "node:os";
import { randomUUID } from "node:crypto";
import { loadConfig, getDataDir } from "./config.ts";
import { getNewMessagesForSubscriptions, formatInboxNotification } from "./channels.ts";
import { registerInboxTools, createIdentity } from "./tools.ts";
import { clearChannel, appendMessage, validateChannelName } from "./transport.ts";
import type { InboxConfig, InboxCursor, InboxMessage, SubscriptionMode } from "./types.ts";

// Version string shown by /inbox-status. Read best-effort from the
// package.json that sits next to this module; stays "unknown" on any failure.
let _version = "unknown";
try {
  const pkg = JSON.parse(readFileSync(fileURLToPath(new URL("./package.json", import.meta.url)), "utf-8"));
  _version = pkg.version ?? "unknown";
} catch {
  // package.json not accessible
}

/**
 * Split a raw `channel[:ro|:rw]` token into its channel name and mode.
 *
 * The input is trimmed first. A trailing `:ro` or `:rw` selects the mode and
 * is removed from the channel name; without a suffix the mode is `"rw"`.
 * The channel name itself is NOT validated here (a bare `":ro"` yields an
 * empty channel name) — callers run `validateChannelName` on the result.
 *
 * @param raw - User-supplied token, e.g. `"builds:ro"`.
 * @returns The parsed pair, or `null` when the trimmed input is empty.
 */
export function parseChannelWithMode(raw: string): { channel: string; mode: SubscriptionMode } | null {
  const trimmed = raw.trim();
  if (!trimmed) return null;

  if (trimmed.endsWith(":ro")) {
    return { channel: trimmed.slice(0, -3), mode: "ro" };
  }
  if (trimmed.endsWith(":rw")) {
    return { channel: trimmed.slice(0, -3), mode: "rw" };
  }
  return { channel: trimmed, mode: "rw" };
}

/**
 * Extension entry point: registers the inbox CLI flags, lifecycle hooks,
 * tools and slash commands on the given extension API.
 *
 * All runtime state (config, subscriptions, cursor, timers) lives in this
 * closure and is shared by the hooks and commands registered below.
 *
 * @param pi - Extension API handle supplied by the host.
 */
export default function (pi: ExtensionAPI) {
  // ── CLI flags ───────────────────────────────────────────────────────────

  pi.registerFlag("inbox", {
    description: "Comma-separated channel names to subscribe to on startup",
    type: "string",
  });

  pi.registerFlag("inbox-namespace", {
    description: "Named inbox within the data directory (default: default)",
    type: "string",
    default: "default",
  });

  pi.registerFlag("inbox-dir", {
    description: "Override data directory (default: ~/.pi/inbox)",
    type: "string",
  });

  pi.registerFlag("inbox-poll-ms", {
    description: "Override poll interval in milliseconds",
    type: "string",
  });

  pi.registerFlag("inbox-identity", {
    description: "Stable identity for this inbox (default: auto-generated and persisted)",
    type: "string",
  });

  // ── Runtime state ───────────────────────────────────────────────────────

  let config: InboxConfig = loadConfig(process.cwd());
  // channel name → subscription mode
  const subscriptions = new Map<string, SubscriptionMode>();
  let identity = "";
  let cursor: InboxCursor = { channels: {} };
  let pollTimer: ReturnType<typeof setInterval> | null = null;
  let agentEndTimer: ReturnType<typeof setTimeout> | null = null;
  let agentBusy = false;
  let sessionTmpDir = "";

  /**
   * Reload `config`, layering the CLI flags (`--inbox-namespace`,
   * `--inbox-dir`, `--inbox-poll-ms`) on top as overrides. A poll interval
   * that does not parse to a positive integer is ignored.
   */
  function syncConfig(): InboxConfig {
    const cliOverrides: Partial<InboxConfig> = {};

    const inboxNamespaceFlag = pi.getFlag("inbox-namespace");
    if (typeof inboxNamespaceFlag === "string" && inboxNamespaceFlag) {
      cliOverrides.inbox = inboxNamespaceFlag;
    }

    const dirFlag = pi.getFlag("inbox-dir");
    if (typeof dirFlag === "string" && dirFlag) {
      cliOverrides.dataDir = dirFlag;
    }

    const pollFlag = pi.getFlag("inbox-poll-ms");
    if (typeof pollFlag === "string" && pollFlag) {
      const ms = parseInt(pollFlag, 10);
      if (!isNaN(ms) && ms > 0) {
        cliOverrides.pollIntervalMs = ms;
      }
    }

    config = loadConfig(process.cwd(), cliOverrides);
    return config;
  }

  /** Data directory for the currently loaded config. */
  function currentDataDir(): string {
    return getDataDir(config);
  }

  /** Names of all currently subscribed channels. */
  function getSubscriptions(): string[] {
    return Array.from(subscriptions.keys());
  }

  /** Mode of a subscribed channel, or `undefined` when not subscribed. */
  function getSubscriptionMode(channel: string): SubscriptionMode | undefined {
    return subscriptions.get(channel);
  }

  /**
   * Return the content of the most recent `inbox-cursor` session entry, or
   * an empty cursor when none exists or the session manager is unavailable.
   */
  function restoreCursor(ctx: ExtensionContext): InboxCursor {
    try {
      const entries = ctx.sessionManager.getEntries();
      // Walk backwards so the latest persisted cursor wins.
      for (let i = entries.length - 1; i >= 0; i--) {
        const entry = entries[i] as { type: string; content?: InboxCursor };
        if (entry.type === "inbox-cursor" && entry.content) {
          return entry.content;
        }
      }
    } catch {
      // session manager not available
    }
    return { channels: {} };
  }

  /** Append the current cursor to the session as an `inbox-cursor` entry. */
  function persistCursor(): void {
    pi.appendEntry("inbox-cursor", cursor);
  }

  /**
   * Replace `subscriptions` with the most recent `inbox-subscriptions`
   * session entry, if any. Entries whose mode is not `"ro"`/`"rw"` are
   * dropped. Leaves `subscriptions` untouched when no entry is found.
   */
  function restoreSubscriptions(ctx: ExtensionContext): void {
    try {
      const entries = ctx.sessionManager.getEntries();
      for (let i = entries.length - 1; i >= 0; i--) {
        const entry = entries[i] as { type: string; content?: Record<string, unknown> };
        if (entry.type === "inbox-subscriptions" && entry.content) {
          subscriptions.clear();
          for (const [ch, mode] of Object.entries(entry.content)) {
            if (mode === "ro" || mode === "rw") {
              subscriptions.set(ch, mode);
            }
          }
          // Seed cursor for any restored channel that has no cursor entry,
          // so the first poll doesn't replay all messages from epoch 0.
          for (const ch of subscriptions.keys()) {
            if (!cursor.channels[ch]) {
              cursor.channels[ch] = { lastTs: Date.now() };
            }
          }
          break;
        }
      }
    } catch {
      // session manager not available
    }
  }

  /** Append the current subscriptions to the session as an `inbox-subscriptions` entry. */
  function persistSubscriptions(): void {
    pi.appendEntry("inbox-subscriptions", Object.fromEntries(subscriptions));
  }

  /**
   * Advance each channel's cursor to the newest timestamp in its snapshot.
   * Cursors only move forward; the cursor is persisted once if anything moved.
   *
   * @param snapshots - Per-channel message lists (only `ts` is read).
   * @returns `true` when at least one channel cursor advanced.
   */
  function updateCursor(snapshots: { channel: string; messages: { ts: number }[] }[]): boolean {
    let changed = false;
    for (const snapshot of snapshots) {
      if (snapshot.messages.length === 0) continue;

      // Plain loop instead of Math.max(...spread): spreading a very large
      // array into call arguments can throw a RangeError.
      let maxTs = -Infinity;
      for (const m of snapshot.messages) {
        if (m.ts > maxTs) maxTs = m.ts;
      }

      const current = cursor.channels[snapshot.channel]?.lastTs ?? 0;
      if (maxTs > current) {
        cursor.channels[snapshot.channel] = { lastTs: maxTs };
        changed = true;
      }
    }
    if (changed) {
      persistCursor();
    }
    return changed;
  }

  /**
   * Check subscribed channels for messages newer than the cursor and advance
   * the cursor past them. Skipped entirely while the agent is busy.
   *
   * @param triggerTurn - When true, the formatted notification is sent as a
   *   user message; when false the cursor still advances without notifying.
   */
  function poll(triggerTurn: boolean): void {
    if (agentBusy) return;

    const snapshots = getNewMessagesForSubscriptions(
      currentDataDir(),
      config.inbox,
      getSubscriptions(),
      cursor.channels,
      config.maxReplayMessages,
    );
    if (snapshots.length === 0) return;

    const notification = formatInboxNotification(snapshots, sessionTmpDir);
    if (!notification) return;

    if (triggerTurn) {
      pi.sendUserMessage(notification);
    }
    updateCursor(snapshots);
  }

  /** (Re)start the poll interval using the configured `pollIntervalMs`. */
  function startPolling(): void {
    stopPolling();
    pollTimer = setInterval(() => {
      if (!agentBusy) {
        poll(true);
      }
    }, config.pollIntervalMs);
  }

  /** Stop the poll interval if one is running. */
  function stopPolling(): void {
    if (pollTimer !== null) {
      clearInterval(pollTimer);
      pollTimer = null;
    }
  }

  // ── Lifecycle hooks ─────────────────────────────────────────────────────

  pi.on("session_start", (_event, ctx) => {
    syncConfig();
    sessionTmpDir = join(tmpdir(), "pi-inbox", randomUUID().slice(0, 8));

    // Identity: explicit --inbox-identity flag > session resume > generate new
    const identityFlag = pi.getFlag("inbox-identity");
    if (typeof identityFlag === "string" && identityFlag) {
      identity = identityFlag;
    } else {
      try {
        const entries = ctx.sessionManager.getEntries();
        for (let i = entries.length - 1; i >= 0; i--) {
          const entry = entries[i] as { type: string; content?: string };
          if (entry.type === "inbox-identity" && entry.content) {
            identity = entry.content;
            break;
          }
        }
      } catch {
        // session manager not available
      }
      if (!identity) {
        identity = createIdentity(config.inbox);
        pi.appendEntry("inbox-identity", identity);
      }
    }

    // Cursor must be restored before subscriptions: restoreSubscriptions
    // seeds missing cursor entries on the restored cursor object.
    cursor = restoreCursor(ctx);
    restoreSubscriptions(ctx);

    // Channels from --inbox are added on top of any restored subscriptions.
    const subscribeFlag = pi.getFlag("inbox");
    if (typeof subscribeFlag === "string" && subscribeFlag) {
      for (const ch of subscribeFlag.split(",")) {
        const parsed = parseChannelWithMode(ch);
        if (!parsed) continue;
        const { channel, mode } = parsed;
        const channelError = validateChannelName(channel);
        if (channelError) {
          throw new Error(`Invalid --inbox channel "${channel}": ${channelError}`);
        }
        subscriptions.set(channel, mode);
        if (!cursor.channels[channel]) {
          cursor.channels[channel] = { lastTs: Date.now() };
        }
      }
    }

    const subList = getSubscriptions();
    if (subList.length > 0) {
      console.log(`[inbox] Subscribed to: ${subList.join(", ")}`);
      persistSubscriptions();
    }

    startPolling();
  });

  pi.on("agent_start", () => {
    agentBusy = true;
    // Cancel a pending post-turn resume: otherwise a turn that starts within
    // the agent_end delay would have polling restarted underneath it.
    if (agentEndTimer !== null) {
      clearTimeout(agentEndTimer);
      agentEndTimer = null;
    }
    stopPolling();
  });

  pi.on("agent_end", () => {
    agentBusy = false;
    if (agentEndTimer !== null) {
      clearTimeout(agentEndTimer);
    }
    // Resume polling (and poll once immediately) after a short delay.
    agentEndTimer = setTimeout(() => {
      agentEndTimer = null;
      startPolling();
      poll(true);
    }, 500);
  });

  pi.on("session_shutdown", () => {
    if (agentEndTimer !== null) {
      clearTimeout(agentEndTimer);
      agentEndTimer = null;
    }
    stopPolling();
    persistCursor();
    if (sessionTmpDir) {
      try {
        rmSync(sessionTmpDir, { recursive: true, force: true });
      } catch {
        /* ignore */
      }
    }
  });

  // ── Tools ───────────────────────────────────────────────────────────────

  // Tools receive accessors rather than values so they always observe the
  // current closure state.
  registerInboxTools(
    pi,
    () => config.inbox,
    () => currentDataDir(),
    () => getSubscriptions(),
    () => identity,
    (snapshots) => updateCursor(snapshots),
    (channel) => getSubscriptionMode(channel),
  );

  // ── Commands ────────────────────────────────────────────────────────────

  pi.registerCommand("inbox-subscribe", {
    description:
      "Subscribe current session to an inbox channel (channel:ro for read-only, channel:rw for read-write). Note: channel names 'ro' and 'rw' are valid but may be confused with mode syntax — prefer descriptive names.",
    handler: async (args, ctx) => {
      const parsed = parseChannelWithMode(args);
      if (!parsed) {
        ctx.ui.notify("Usage: /inbox-subscribe <channel>[:ro|:rw]", "error");
        return;
      }
      const { channel, mode } = parsed;
      const channelError = validateChannelName(channel);
      if (channelError) {
        ctx.ui.notify(channelError, "error");
        return;
      }

      const existingMode = subscriptions.get(channel);
      subscriptions.set(channel, mode);
      // First-time channels start at "now" so history is not replayed.
      if (!cursor.channels[channel]) {
        cursor.channels[channel] = { lastTs: Date.now() };
        persistCursor();
      }

      if (existingMode !== undefined && existingMode !== mode) {
        persistSubscriptions();
        ctx.ui.notify(`Subscription mode changed for "${channel}": ${existingMode} → ${mode}`, "info");
      } else if (existingMode !== undefined) {
        ctx.ui.notify(`Already subscribed to "${channel}" (${mode})`, "info");
      } else {
        persistSubscriptions();
        ctx.ui.notify(`Subscribed to inbox channel: ${channel} (${mode})`, "info");
      }
    },
  });

  pi.registerCommand("inbox-unsubscribe", {
    description: "Unsubscribe from an inbox channel",
    handler: async (args, ctx) => {
      const channel = args.trim();
      if (!channel) {
        ctx.ui.notify("Usage: /inbox-unsubscribe <channel>", "error");
        return;
      }
      if (subscriptions.has(channel)) {
        subscriptions.delete(channel);
        persistSubscriptions();
        ctx.ui.notify(`Unsubscribed from inbox channel: ${channel}`, "info");
      } else {
        ctx.ui.notify(`Not subscribed to channel: ${channel}`, "warning");
      }
    },
  });

  pi.registerCommand("inbox-status", {
    description: "Show inbox config, subscriptions, and activity",
    handler: async (_args, ctx) => {
      const lines = [
        `pi-inbox v${_version}`,
        `Inbox: ${config.inbox}`,
        `Data dir: ${currentDataDir()}`,
        `Poll: ${config.pollIntervalMs}ms`,
        `Identity: ${identity}`,
        `Agent busy: ${agentBusy ? "yes" : "no"}`,
        `Polling: ${pollTimer !== null ? "active" : "paused"}`,
        `Subscriptions (${subscriptions.size}):`,
        ...(subscriptions.size > 0
          ? Array.from(subscriptions.entries())
              .sort(([a], [b]) => a.localeCompare(b))
              .map(([ch, mode]) => `  - ${ch} (${mode})`)
          : ["  - none"]),
      ];

      if (Object.keys(cursor.channels).length > 0) {
        lines.push("Cursors:");
        for (const [ch, c] of Object.entries(cursor.channels)) {
          lines.push(`  - ${ch}: ${new Date(c.lastTs).toISOString()}`);
        }
      }

      ctx.ui.notify(lines.join("\n"), "info");
    },
  });

  pi.registerCommand("inbox-clear", {
    description: "Clear all messages in an inbox channel. Requires a read-write subscription to the channel.",
    handler: async (args, ctx) => {
      const channel = args.trim();
      if (!channel) {
        ctx.ui.notify("Usage: /inbox-clear <channel>", "error");
        return;
      }
      const channelError = validateChannelName(channel);
      if (channelError) {
        ctx.ui.notify(channelError, "error");
        return;
      }

      const mode = subscriptions.get(channel);
      if (!mode) {
        const list = subscriptions.size > 0 ? Array.from(subscriptions.keys()).join(", ") : "none";
        ctx.ui.notify(`Cannot clear "${channel}" — you are not subscribed. Subscribed channels: ${list}`, "error");
        return;
      }
      if (mode === "ro") {
        ctx.ui.notify(`Cannot clear "${channel}" — subscription is read-only (use :rw to re-subscribe)`, "error");
        return;
      }

      const dataDir = currentDataDir();
      clearChannel(dataDir, config.inbox, channel);
      ctx.ui.notify(`Cleared all messages in channel "${channel}".`, "info");
    },
  });

  pi.registerCommand("inbox-post", {
    description: "Post a message to an inbox channel",
    handler: async (args, ctx) => {
      // First space-delimited token is the channel; the remainder is the message.
      const trimmed = args.trim();
      const spaceIdx = trimmed.indexOf(" ");
      const channel = spaceIdx === -1 ? trimmed : trimmed.slice(0, spaceIdx);
      const message = spaceIdx === -1 ? "" : trimmed.slice(spaceIdx + 1);

      if (!identity) {
        ctx.ui.notify("Inbox identity not initialized yet. Try again in a moment.", "error");
        return;
      }
      if (!channel || !message) {
        ctx.ui.notify("Usage: /inbox-post <channel> <message>", "error");
        return;
      }
      const channelError = validateChannelName(channel);
      if (channelError) {
        ctx.ui.notify(channelError, "error");
        return;
      }

      const mode = subscriptions.get(channel);
      if (!mode) {
        const list = subscriptions.size > 0 ? Array.from(subscriptions.keys()).join(", ") : "none";
        ctx.ui.notify(`Cannot post to "${channel}" — you are only subscribed to: ${list}`, "error");
        return;
      }
      if (mode === "ro") {
        ctx.ui.notify(`Cannot post to "${channel}" — subscription is read-only (use :rw to re-subscribe)`, "error");
        return;
      }

      const msg: InboxMessage = {
        id: randomUUID(),
        from: identity,
        content: message,
        ts: Date.now(),
      };
      const dataDir = currentDataDir();
      appendMessage(dataDir, config.inbox, channel, msg);
      // Advance our own cursor past the message we just wrote so the next
      // poll does not report it back to this session.
      updateCursor([{ channel, messages: [{ ts: msg.ts }] }]);
      ctx.ui.notify(`Posted to "${channel}": ${message.slice(0, 100)}`, "info");
    },
  });
}