/**
 * EmDash plugin registry aggregator: Worker entrypoint.
 *
 * Reading order for someone learning this code:
 *   1. `constants.ts` — protocol-level constants (NSIDs, etc.).
 *   2. `env.ts` — project-specific message types passed across binding
 *      boundaries. The Worker `Env` is generated by `wrangler types` into
 *      `worker-configuration.d.ts`.
 *   3. `records-do.ts` — Jetstream connection.
 *   4. `records-consumer.ts` — PDS-verified ingest.
 *   5. `routes/*.ts` — XRPC read endpoints.
 *
 * The fetch/queue/scheduled handlers below dispatch to those modules:
 * `fetch` serves the admin routes and the XRPC read API, `queue` routes
 * each batch to its consumer by queue name, and `scheduled` pings the
 * Records DO. The smoke test in `test/smoke.test.ts` proves migrations
 * apply and the Worker boots.
 */
import { isDid } from "@atcute/lexicons/syntax";

import { drainBackfillDeadLetterBatch, processBackfillBatch } from "./backfill-consumer.js";
import { discoverDids, enqueueBackfillJobs } from "./backfill.js";
import type { BackfillJob, RecordsJob } from "./env.js";
import { drainDeadLetterBatch, processBatch } from "./records-consumer.js";
import { RECORDS_DO_NAME } from "./records-do.js";
import { handleXrpc } from "./routes/xrpc/router.js";
import { isPlainObject } from "./utils.js";

const RECORDS_QUEUE_NAME = "emdash-aggregator-records";
const RECORDS_DLQ_NAME = "emdash-aggregator-records-dlq";
const BACKFILL_QUEUE_NAME = "emdash-aggregator-backfill";
const BACKFILL_DLQ_NAME = "emdash-aggregator-backfill-dlq";

export { RecordsJetstreamDO } from "./records-do.js";

/**
 * Operational admin routes. All gated by the `ADMIN_TOKEN` secret declared
 * in `wrangler.jsonc`'s `secrets.required` and validated via constant-time
 * compare against the `Authorization: Bearer <token>` header. Recommended
 * deploy hook:
 *
 *   wrangler deploy && curl -X POST -H "Authorization: Bearer $ADMIN_TOKEN" \
 *     https://api.emdashcms.com/_admin/start
 *
 * `/_admin/start` spins up the Records DO (idempotent on an already-running DO).
 * `/_admin/status` proxies the Records DO's status JSON straight through.
 * `/_admin/backfill` discovers publishers (or accepts an explicit DID list)
 * and fans (DID, collection) jobs onto `BACKFILL_QUEUE`; the consumer in
 * `backfill-consumer.ts` does the per-pair listRecords + records-queue
 * fan-out work asynchronously, getting queue retry + concurrency for free
 * and sidestepping the 30s `waitUntil` cap on the POST handler.
 *
 * Auth-gating all routes: backfill specifically introduces caller-chosen
 * URLs into the worker's outbound fetches (via `did:web` resolution + the
 * publisher's PDS endpoint), so the unauth posture would expose an SSRF-shaped
 * surface. Same gate on `/_admin/start` and `/_admin/status` for symmetry —
 * anyone with the token can do operational things.
 */
const BOOTSTRAP_PATH = "/_admin/start";
const BACKFILL_PATH = "/_admin/backfill";
const STATUS_PATH = "/_admin/status";

/**
 * Cap on the explicit DID list a single POST may submit. Lower than the
 * old serial-loop cap (was 1000) because the queue-fan-out path amplifies
 * a leaked-token attack: each submitted DID becomes
 * `WANTED_COLLECTIONS.length` queue messages, each consuming a consumer
 * invocation with its own outbound PDS fetches. 100 × 4 = 400 jobs from
 * the explicit path is a meaningful operator-recovery batch but a
 * tractable blast radius for the explicit path.
 *
 * The discovery path (empty body) has a separate ceiling at
 * `MAX_DISCOVERED_DIDS = 1000` and is therefore actually the larger
 * worst-case fan-out source — by design, since legitimate-publisher
 * enumeration is the primary use case and we want the explicit list to
 * be the tighter "operator types it themselves" bucket. Both paths share
 * the same per-pair caps in the consumer.
 */
const MAX_BACKFILL_DIDS = 100;

const tokenEncoder = new TextEncoder();

/**
 * Constant-time string equality via workerd's audited
 * `crypto.subtle.timingSafeEqual`.
 *
 * Length-mismatched inputs are rejected up front by the explicit
 * `byteLength` check below, before the primitive is called.
 * NOTE(review): the primitive is understood to require equal-length
 * buffers (throwing otherwise) — confirm against the Workers Web Crypto
 * docs. The comparison of equal-length buffers is therefore constant-time,
 * but a length difference is still observable via timing — acceptable here
 * because the protected secret (`ADMIN_TOKEN`) has a fixed configured length
 * known only to the operator, and any realistic length-via-timing attack
 * would require so many requests that other defences (rate-limiting,
 * Cloudflare Bot Management, log review) catch it first.
 *
 * @param a - First string (compared as UTF-8 bytes).
 * @param b - Second string (compared as UTF-8 bytes).
 * @returns `true` only when both strings encode to identical bytes.
 */
function timingSafeEqual(a: string, b: string): boolean {
  const aBuf = tokenEncoder.encode(a);
  const bBuf = tokenEncoder.encode(b);
  if (aBuf.byteLength !== bBuf.byteLength) return false;
  return crypto.subtle.timingSafeEqual(aBuf, bBuf);
}

/** Build a fresh 401 response carrying the `Bearer` challenge header. */
function unauthorized(): Response {
  return new Response("unauthorized", {
    status: 401,
    headers: { "www-authenticate": "Bearer" },
  });
}

/** Build a fresh 405 response advertising the single allowed method. */
function methodNotAllowed(allow: string): Response {
  return new Response("method not allowed", {
    status: 405,
    headers: { allow },
  });
}

/** Resolve the stub for the singleton Records DO (named `RECORDS_DO_NAME`). */
function getRecordsStub(env: Env) {
  const id = env.RECORDS_DO.idFromName(RECORDS_DO_NAME);
  return env.RECORDS_DO.get(id);
}

/**
 * Validate the request's `Authorization: Bearer <token>` header against
 * `env.ADMIN_TOKEN`.
 *
 * @returns `null` on success; otherwise a Response to return directly:
 *   503 when `ADMIN_TOKEN` is missing or whitespace-only (fails closed),
 *   401 when the header is absent, uses another scheme, or the token
 *   does not match.
 */
function requireAdminAuth(request: Request, env: Env): Response | null {
  const expected = env.ADMIN_TOKEN;
  // `trim()` defends against a whitespace-only secret slipping past
  // `secrets.required`'s presence check — `wrangler secret put ADMIN_TOKEN`
  // followed by an accidental Enter would otherwise produce a working
  // endpoint with a trivially guessable token.
  if (!expected || expected.trim().length === 0) {
    // Misconfigured production or unset dev — closed by default.
    return new Response("admin endpoints not configured", { status: 503 });
  }

  const auth = request.headers.get("authorization");
  const SCHEME_PREFIX = "bearer ";
  // RFC 6750 §2.1: the auth scheme is case-insensitive. `curl -H
  // "authorization: bearer ..."` and SDKs that don't canonicalise the
  // scheme would otherwise fail with a confusing 401 even though the
  // token is correct.
  if (
    !auth ||
    auth.length < SCHEME_PREFIX.length ||
    auth.slice(0, SCHEME_PREFIX.length).toLowerCase() !== SCHEME_PREFIX
  ) {
    return unauthorized();
  }

  // Everything after the scheme prefix is the candidate token; it is
  // compared against the untrimmed configured secret.
  const token = auth.slice(SCHEME_PREFIX.length);
  if (!timingSafeEqual(token, expected)) {
    return unauthorized();
  }
  return null;
}

/** Parsed shape of a `/_admin/backfill` request body. */
type BackfillRequest = { mode: "explicit"; dids: string[] } | { mode: "discover" };

/** Same shape as `BackfillRequest`; kept as an alias so the two cannot drift. */
type BackfillRequestParsed = BackfillRequest;

/**
 * Validate an already-JSON-decoded backfill request body.
 *
 * @param body - Decoded JSON value (or `{}` when the request had no body).
 * @returns `{ mode: "discover" }` when `dids` is absent or `null`;
 *   `{ mode: "explicit", dids }` with duplicates removed (first-seen order)
 *   when `dids` is a non-empty array of at most `MAX_BACKFILL_DIDS` valid
 *   DIDs; otherwise `{ error }` with an operator-facing message.
 */
function parseBackfillBody(body: unknown): BackfillRequest | { error: string } {
  if (!isPlainObject(body)) {
    return { error: "request body must be a JSON object" };
  }

  const rawDids = body["dids"];
  // `dids` absent (which includes the empty-body `{}` default) or
  // `{ "dids": null }` ⇒ discovery mode. Production cold-start uses this
  // path; the explicit list is the testing / recovery seam.
  if (rawDids === undefined || rawDids === null) {
    return { mode: "discover" };
  }
  if (!Array.isArray(rawDids)) {
    return {
      error: "`dids` must be an array of DID strings, or omitted to discover via the relay",
    };
  }
  if (rawDids.length === 0) {
    return {
      error: "`dids` must not be empty (omit the field to discover via the relay)",
    };
  }
  if (rawDids.length > MAX_BACKFILL_DIDS) {
    return {
      error: `\`dids\` must contain at most ${MAX_BACKFILL_DIDS} entries (got ${rawDids.length})`,
    };
  }

  const seen = new Set<string>();
  for (const did of rawDids) {
    if (!isDid(did)) {
      return { error: `invalid DID in list: ${JSON.stringify(did)}` };
    }
    seen.add(did);
  }
  // Set iteration order matches insertion; dedup preserves first-seen order
  // so the operator sees jobs run in the order they submitted.
  return { mode: "explicit", dids: [...seen] };
}

export default {
  /**
   * HTTP entrypoint: the three token-gated admin routes, then the XRPC
   * read API, then a plain-text 404 for anything else.
   */
  async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
    const url = new URL(request.url);

    if (url.pathname === BOOTSTRAP_PATH) {
      if (request.method !== "POST") return methodNotAllowed("POST");
      const denied = requireAdminAuth(request, env);
      if (denied) return denied;

      // Fire-and-forget so the response shape doesn't depend on the
      // DO's status output. Caller gets the same 204 whether the DO
      // was already running, just woke up, or is mid-startup.
      ctx.waitUntil(getRecordsStub(env).fetch("https://do.internal/bootstrap"));
      return new Response(null, { status: 204 });
    }

    if (url.pathname === STATUS_PATH) {
      if (request.method !== "GET") return methodNotAllowed("GET");
      const denied = requireAdminAuth(request, env);
      if (denied) return denied;

      // Proxy the DO's status JSON straight through. The DO returns
      // `{cursor, consecutiveFailures}` — `consecutiveFailures: 0`
      // means the latest connection attempt produced events; non-zero
      // means Jetstream is unreachable, the wantedCollections filter
      // is mismatched, or queue backpressure is biting.
      return getRecordsStub(env).fetch("https://do.internal/status");
    }

    if (url.pathname === BACKFILL_PATH) {
      if (request.method !== "POST") return methodNotAllowed("POST");
      const denied = requireAdminAuth(request, env);
      if (denied) return denied;

      // Empty / no body is the production discovery path. `JSON.parse("")`
      // would throw a SyntaxError, so an empty body skips parsing and
      // keeps the `{}` default (⇒ "discover") rather than 400ing, so
      // `curl -X POST ... /_admin/backfill` (no body) does the
      // expected thing.
      let body: unknown = {};
      const text = await request.text();
      if (text.length > 0) {
        try {
          body = JSON.parse(text);
        } catch {
          return new Response("request body must be valid JSON", { status: 400 });
        }
      }

      const parsed = parseBackfillBody(body);
      if ("error" in parsed) {
        return new Response(parsed.error, { status: 400 });
      }

      // Fire-and-forget via waitUntil so the route returns 202 quickly.
      // `runBackfill` only does discovery + queue fan-out (both fast);
      // per-pair work runs in the BACKFILL_QUEUE consumer with its own
      // invocation budget. Operator-facing observability:
      //   - this handler's logs ([aggregator] backfill discovery/enqueue)
      //   - per-pair logs from `backfill-consumer.ts`
      //   - DLQ inspection (`emdash-aggregator-backfill-dlq`) for
      //     pairs that exhausted retries
      ctx.waitUntil(runBackfill(parsed, env));
      return new Response(null, { status: 202 });
    }

    // XRPC read API: aggregator endpoints + cached sync.getRecord
    // passthrough. Returns null if pathname doesn't start with /xrpc/, so
    // non-matching paths fall through to the catch-all below.
    const xrpc = await handleXrpc(env, request);
    if (xrpc) return xrpc;

    return new Response("emdash-aggregator: not found", {
      status: 404,
      headers: { "content-type": "text/plain" },
    });
  },

  /**
   * Queue entrypoint: dispatches each batch to its consumer by queue name.
   * Batches from an unrecognised queue are logged and acked.
   */
  async queue(batch: MessageBatch, env: Env, _ctx: ExecutionContext): Promise<void> {
    // Workerd routes every consumer here; dispatch by queue name to the
    // matching typed handler. The parameter is unparameterised
    // (`MessageBatch` defaults to `MessageBatch<unknown>`) because the
    // binding is shared across queues — narrowing happens per-case via
    // the queue name, which is a runtime tag the compiler can't see.
    switch (batch.queue) {
      case RECORDS_QUEUE_NAME:
        // eslint-disable-next-line typescript/no-unsafe-type-assertion -- narrowed by queue name
        await processBatch(batch as MessageBatch<RecordsJob>, env);
        return;
      case RECORDS_DLQ_NAME:
        // eslint-disable-next-line typescript/no-unsafe-type-assertion -- narrowed by queue name
        await drainDeadLetterBatch(batch as MessageBatch<RecordsJob>, env);
        return;
      case BACKFILL_QUEUE_NAME:
        // eslint-disable-next-line typescript/no-unsafe-type-assertion -- narrowed by queue name
        await processBackfillBatch(batch as MessageBatch<BackfillJob>, env);
        return;
      case BACKFILL_DLQ_NAME:
        // Awaited like the sibling consumers so that, if the drain is
        // async, the handler does not resolve before it finishes and a
        // rejection is not left floating.
        // eslint-disable-next-line typescript/no-unsafe-type-assertion -- narrowed by queue name
        await drainBackfillDeadLetterBatch(batch as MessageBatch<BackfillJob>, env);
        return;
      default:
        console.error("[aggregator] unknown queue, acking batch", { queue: batch.queue });
        for (const m of batch.messages) m.ack();
    }
  },

  /** Cron entrypoint: pings the Records DO so it is running. */
  async scheduled(_event: ScheduledEvent, env: Env, ctx: ExecutionContext): Promise<void> {
    // DO liveness. The records DO is meant to hold an outbound WebSocket
    // continuously, but during a Jetstream outage it spends time in
    // backoff sleeps — that's when CF can evict it. Hitting the DO from
    // the cron wakes it back up; constructor-time `ingestor.run()`
    // resumes from the persisted cursor. Reconciliation work will share
    // this trigger when it lands.
    ctx.waitUntil(getRecordsStub(env).fetch("https://do.internal/liveness"));
  },
};

/**
 * Backfill orchestrator. Runs inside the POST handler's `ctx.waitUntil`.
 * Two paths:
 *
 *   - "discover" — call `com.atproto.sync.listReposByCollection` against
 *     `env.RELAY_URL` for each NSID in WANTED_COLLECTIONS, union the DIDs,
 *     then fan them out as backfill jobs.
 *   - "explicit" — operator-supplied DID list, used for testing or for
 *     recovery of a known DID set.
 *
 * In both cases the actual per-(DID, collection) work runs in the
 * `BACKFILL_QUEUE` consumer (see `backfill-consumer.ts`). This orchestrator
 * just discovers + enqueues — both fast operations that fit well within
 * Cloudflare's 30s `waitUntil` ceiling regardless of fan-out size, even
 * after a `MAX_DISCOVERED_DIDS`-sized union.
 *
 * Per-pair progress is logged from the consumer; this function only logs
 * the discovery + fan-out result so an operator watching `wrangler tail`
 * sees how many jobs were enqueued.
 *
 * Never rejects: any failure is caught and logged as
 * "[aggregator] backfill aborted".
 */
async function runBackfill(req: BackfillRequestParsed, env: Env): Promise<void> {
  try {
    let dids: readonly string[];
    if (req.mode === "discover") {
      console.log("[aggregator] backfill discovery starting", {
        relay: env.RELAY_URL,
      });
      dids = await discoverDids(env.RELAY_URL);
      console.log("[aggregator] backfill discovery complete", {
        relay: env.RELAY_URL,
        didCount: dids.length,
      });
    } else {
      dids = req.dids;
      console.log("[aggregator] backfill enqueue starting", {
        mode: "explicit",
        didCount: dids.length,
      });
    }

    if (dids.length === 0) {
      console.warn("[aggregator] backfill produced zero DIDs, nothing to enqueue", {
        mode: req.mode,
      });
      return;
    }

    const enqueued = await enqueueBackfillJobs(dids, env.BACKFILL_QUEUE);
    console.log("[aggregator] backfill enqueue complete", {
      mode: req.mode,
      didCount: dids.length,
      jobsEnqueued: enqueued,
    });
  } catch (err) {
    console.error("[aggregator] backfill aborted", {
      mode: req.mode,
      error: err instanceof Error ? (err.stack ?? err.message) : String(err),
    });
  }
}