import { appendFileSync, existsSync, mkdirSync, readFileSync, writeFileSync, statSync, openSync, readSync, closeSync } from "node:fs"; import { join } from "node:path"; import type { InboxMessage } from "./types.ts"; const MAX_FILE_BYTES = 10 * 1024 * 1024; const MAX_MESSAGE_BYTES = 4096; const CHANNEL_NAME_RE = /^[a-z._-]+$/; export function validateChannelName(channel: string): string | null { if (!channel) { return "Channel name cannot be empty."; } if (!CHANNEL_NAME_RE.test(channel)) { return 'Invalid channel name. Use only lowercase letters, ".", "-", and "_". ":" is reserved for future namespace use.'; } return null; } function readFileTail(path: string, maxBytes: number): string { const size = statSync(path).size; if (size <= maxBytes) { return readFileSync(path, "utf-8"); } const buf = Buffer.alloc(maxBytes); const fd = openSync(path, "r"); try { readSync(fd, buf, 0, maxBytes, size - maxBytes); } finally { closeSync(fd); } const text = buf.toString("utf-8"); const firstNewline = text.indexOf("\n"); if (firstNewline === -1) return text; // If the read started at a message boundary, the first line is complete. // Check the byte just before the tail window to detect this. const prevByte = Buffer.alloc(1); const fd2 = openSync(path, "r"); try { readSync(fd2, prevByte, 0, 1, size - maxBytes - 1); } finally { closeSync(fd2); } if (prevByte[0] === 0x0a) { // Previous byte is newline — tail starts at message boundary, first line is complete return text; } return text.slice(firstNewline + 1); } export function getChannelDir(dataDir: string, inboxName: string, channel: string): string { const sanitized = channel .toLowerCase() .replace(/_/g, "__") .replace(/[^a-z0-9._-]/g, (c) => "_" + c.charCodeAt(0).toString(16).padStart(2, "0")); const safe = sanitized === "." || sanitized === ".." ? "_" + sanitized : sanitized; return join(dataDir, inboxName, "channels", safe); } export function getChannelPath(dataDir: string, inboxName: string, channel: string): string { return join(getChannelDir(dataDir, inboxName, channel), "messages.jsonl"); } export function ensureChannel(dataDir: string, inboxName: string, channel: string): void { const dir = getChannelDir(dataDir, inboxName, channel); mkdirSync(dir, { recursive: true }); } export function appendMessage(dataDir: string, inboxName: string, channel: string, message: InboxMessage): void { ensureChannel(dataDir, inboxName, channel); const path = getChannelPath(dataDir, inboxName, channel); const line = JSON.stringify(message); if (line.length > MAX_MESSAGE_BYTES) { console.warn(`[inbox] Message ${message.id} exceeds ${MAX_MESSAGE_BYTES} bytes (${line.length}) — may not be atomically written`); } appendFileSync(path, line + "\n", "utf-8"); } export function readMessages(dataDir: string, inboxName: string, channel: string, sinceTs?: number): InboxMessage[] { const path = getChannelPath(dataDir, inboxName, channel); if (!existsSync(path)) { return []; } let raw: string; try { raw = readFileTail(path, MAX_FILE_BYTES); } catch { return []; } const lines = raw.trim().split("\n").filter((l) => l.trim()); const messages: InboxMessage[] = []; for (const line of lines) { try { const msg = JSON.parse(line) as InboxMessage; if (sinceTs === undefined || msg.ts > sinceTs) { messages.push(msg); } } catch { console.warn(`[inbox] Skipping malformed line in ${path}: ${line.slice(0, 80)}`); } } return messages; } export function clearChannel(dataDir: string, inboxName: string, channel: string): void { const path = getChannelPath(dataDir, inboxName, channel); if (existsSync(path)) { writeFileSync(path, "", "utf-8"); } }