# Warlock Herald — full skills

> Package: `@warlock.js/herald`
>
> Generated artifact. Concatenates every SKILL.md and reference file under `@warlock.js/herald/skills/`. Re-run `node scripts/generate-llms.mjs` after any change.

## consume-message

`@warlock.js/herald/consume-message/SKILL.md`

---
name: consume-message
description: 'Subscribe to a channel via .subscribe(handler, options?) — handler receives (message, ctx). Control flow via ctx.ack / ctx.nack(requeue?) / ctx.reject / ctx.retry(delayMs). Configure prefetch, retry policy, dead-letter, consumer groups. Triggers: `subscribe`, `ctx.ack`, `ctx.nack`, `ctx.reject`, `ctx.retry`, `prefetch`, `group`, `retry`, `deadLetter`, `@Consumable`, `EventConsumer`, `defineConsumer`, `autoAck`; "consume messages from a queue", "implement a worker", "background queue processor", "set up retry and dead-letter", "class-based event consumer"; typical import `import { herald, Consumable, EventConsumer } from "@warlock.js/herald"`. Skip: setup — `@warlock.js/herald/herald-basics/SKILL.md`; producing — `@warlock.js/herald/publish-message/SKILL.md`; RPC reply — `@warlock.js/herald/request-and-respond/SKILL.md`; competing libs `amqplib`, `bullmq`, `kafkajs`; NestJS `@MessagePattern` / `@EventPattern`.'
---

# Consume messages

Subscribe to a channel; receive each message with a flow-control `ctx`.

## Minimal subscribe

```ts
import { herald } from "@warlock.js/herald";

herald()
  .channel<{ userId: number; email: string }>("user.created")
  .subscribe(async (message, ctx) => {
    try {
      await sendWelcomeEmail(message.payload.email);
      await ctx.ack(); // success — message removed from queue
    } catch (error) {
      await ctx.nack(true); // failure — requeue for redelivery
    }
  });
```

**Smart auto-ack is on by default** (when `autoAck` is unset or `false`). The consumer runs with manual-ack enabled, and herald acks/nacks based on how your handler ends:

- Handler returns without calling any `ctx` method → herald **auto-acks**.
- Handler **throws** → herald **auto-nacks** (requeue, or DLQ/reject once retries are exhausted — see below).
- Handler calls `ctx.ack()` / `ctx.nack()` / `ctx.reject()` / `ctx.retry()` → herald respects that and does nothing further.

So the `try/catch` above is optional — letting the error throw produces the same auto-nack. Call `ctx` methods explicitly only when you want a *non-default* outcome (reject without requeue, route to DLQ, delayed retry). The one mode that loses messages is `autoAck: true`, where the broker acks on delivery before your handler runs.

## Message context — flow control

```ts
ctx.ack();          // acknowledge — message removed from the queue
ctx.nack(requeue?); // negative ack — requeue (true) or send to DLQ (false)
ctx.reject();       // shorthand for nack(false)
ctx.retry(delayMs); // delayed retry — requeue after delay
ctx.reply(payload); // for request-response pattern (see request-and-respond skill)
```

Pick by intent:

| Intent | Use |
| --- | --- |
| Processed cleanly | `ctx.ack()` |
| Transient failure, try again | `ctx.nack(true)` or `ctx.retry(5000)` |
| Permanent failure, send to DLQ | `ctx.nack(false)` or `ctx.reject()` |
| Validation failure (bad message) | `ctx.reject()` — don't requeue |

## Subscribe options

```ts
await channel.subscribe(handler, {
  group: "email-workers", // consumer group / tag — multiple consumers share work
  prefetch: 10,           // concurrency — how many in-flight messages this consumer holds
  autoAck: false,         // default; keep it false in production
  exclusive: false,       // single consumer only on this channel?
  retry: {
    maxRetries: 3,        // redelivery ceiling on handler throw (delay not auto-applied — see below)
    delay: 1000,
  },
  deadLetter: {
    channel: "user.created.failed",
    preserveOriginal: true,
  },
});
```

`group` is the unit of "share work" — N consumers in the same group split messages across them. Different groups each receive every message (fanout).

`prefetch` is the per-consumer concurrency cap. Higher values raise throughput; lower values spread messages more evenly across consumers. For CPU-bound handlers, set it to roughly the number of CPU cores. For IO-bound handlers, it can go higher (50+).

## Retry policy

```ts
retry: {
  maxRetries: 3,
  delay: 1000, // see the caveat below — delay is not auto-applied
}
```

`maxRetries` is the part that does the work: when a handler **throws**, herald reads the message's `x-retry-count` header and, while it's under `maxRetries`, nacks with requeue so the broker redelivers. Once `x-retry-count` reaches `maxRetries`, it nacks-without-requeue (→ DLQ if configured) or rejects outright.

**Caveat on `delay`.** `RetryOptions.delay` (number or `(attempt) => number`) is **not applied on the automatic throw path** — a thrown handler requeues immediately, with no wait. The only place a delay takes effect is the explicit `ctx.retry(delayMs)` call, which republishes the message with an `x-delay` header — and even that needs the RabbitMQ delayed-message-exchange plugin installed, or the delay is ignored. So if you need real backoff, call `ctx.retry(ms)` from inside the handler and install the plugin; don't rely on the channel-level `retry.delay` for timing.

## Dead-letter queue

```ts
deadLetter: {
  channel: "user.created.failed",
  preserveOriginal: true, // accepted, but currently a no-op (see note)
}
```

The DLQ always forwards the full original payload plus the message metadata, so `preserveOriginal` makes no observable difference today — the field is part of the type but the RabbitMQ driver doesn't branch on it yet. Set it or omit it; behaviour is the same.

Messages that exhausted retries land in `user.created.failed`. Subscribe to it separately for alerting / manual inspection:

```ts
herald().channel("user.created.failed").subscribe(async (message, ctx) => {
  await alerts.notify(`Failed: ${JSON.stringify(message.payload)}`);
  await ctx.ack();
});
```

## Decorator-style consumers — `@Consumable` + `EventConsumer`

For class-based consumers that pair with the [`EventMessage`](@warlock.js/herald/publish-message/SKILL.md) producer layer. This is a **different handler shape** from the raw `.subscribe()` above — ack/nack is automatic, and you get the unwrapped payload plus an event-metadata object (no `ctx`):

```ts
import { Consumable, EventConsumer } from "@warlock.js/herald";
import type { ConsumedEventMessage } from "@warlock.js/herald";

@Consumable() // or @Consumable({ broker: "analytics" }) to target a non-default broker
export class UserCreatedConsumer extends EventConsumer<{ id: number; email: string }> {
  public static eventName = "user.created";

  // handle(payload, event) — NOT (message, ctx). No ctx.ack() here.
  public async handle(payload: { id: number; email: string }, event: ConsumedEventMessage) {
    await sendWelcomeEmail(payload.email);
    // return cleanly → herald acks. throw → herald nacks (requeue).
  }
}
```

The handler receives the **already-unwrapped payload** as the first argument and a `ConsumedEventMessage` as the second — `{ messageId, eventName, payload, version?, occurredAt?, metadata?, message }` (where `message` is the raw `Message` if you need `message.metadata.headers`). There is **no `ctx`**: the framework auto-acks when `handle` resolves and auto-nacks-with-requeue when it throws, so you never call `ack`/`nack` yourself in this style.

Wiring: the channel name comes from `static eventName`, and `@Consumable` self-registers the moment the class module is **imported** — if a broker is already connected it subscribes immediately, otherwise it buffers and subscribes once `connectToBroker` fires. So the only wiring you need is to import the consumer file on the boot path (e.g. your module's `main.ts`).

**Pair it with the producer:** an `EventConsumer` expects the envelope shape that `publishEvent(new SomeEvent(...))` emits — it reads `envelope.payload`. Don't point one at a channel fed by a bare `channel(name).publish(rawBody)`; the payload nesting won't match.

### Validation + version gating

Two optional gates run before `handle`, both driven by fields on the class:

```ts
import { Consumable, EventConsumer } from "@warlock.js/herald";
import type { ConsumedEventMessage } from "@warlock.js/herald";
import { v } from "@warlock.js/seal";

type UserCreatedPayload = { id: number; email: string };

@Consumable()
export class UserCreatedConsumer extends EventConsumer<UserCreatedPayload> {
  public static eventName = "user.created";

  // Only accept events whose `version` falls in [minVersion, maxVersion].
  // Out-of-range events are acked and skipped (not requeued).
  public static minVersion = 2;
  public static maxVersion = 3;

  // Seal schema — invalid payloads are nacked before handle() runs.
  public schema = v.object({ id: v.int(), email: v.string().email() });

  public async handle(payload: UserCreatedPayload, event: ConsumedEventMessage) {
    // payload is validated + version-accepted here
  }
}
```

### `defineConsumer` — no-class shorthand

```ts
import { defineConsumer } from "@warlock.js/herald";

export const userCreatedConsumer = defineConsumer<{ id: number; email: string }>("user.created", {
  schema: userCreatedSchema, // optional seal validation
  handle: async (payload, event) => {
    await sendWelcomeEmail(payload.email);
  },
});
```

`defineConsumer` self-registers via `@Consumable` internally, so importing the module is all the wiring it needs — same as the class form. (`subscribeConsumer(SomeConsumerClass)` is the imperative equivalent if you'd rather register explicitly.)
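The version gate described under "Validation + version gating" can be pictured as an inclusive range check that runs before `handle`. The sketch below is a self-contained model, not herald's source: `VersionGate` and `isVersionAccepted` are hypothetical names, and treating an omitted bound as open-ended is an assumption.

```typescript
// Hypothetical model of the version gate; not part of @warlock.js/herald.
interface VersionGate {
  minVersion?: number; // inclusive lower bound; open-ended when omitted (assumption)
  maxVersion?: number; // inclusive upper bound; open-ended when omitted (assumption)
}

// Returns true when `version` falls inside [minVersion, maxVersion].
function isVersionAccepted(version: number, gate: VersionGate): boolean {
  const aboveMin = gate.minVersion === undefined || version >= gate.minVersion;
  const belowMax = gate.maxVersion === undefined || version <= gate.maxVersion;
  return aboveMin && belowMax;
}

// Mirrors `minVersion = 2` / `maxVersion = 3` from the class example.
const gate: VersionGate = { minVersion: 2, maxVersion: 3 };

// Of versions 1 through 4, only 2 and 3 pass the gate.
const accepted = [1, 2, 3, 4].filter((version) => isVersionAccepted(version, gate));
```

Per the section above, an event that fails this check is acked and skipped rather than requeued, so an out-of-range version never reaches `handle`.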
## Subscription handle

`subscribe()` resolves to a `Subscription` you can manage:

```ts
const subscription = await channel.subscribe(handler);

subscription.id;                  // the consumer id (string)
subscription.isActive();          // boolean
await subscription.unsubscribe(); // cancel the consumer on the broker
await subscription.pause();       // also cancels the consumer (RabbitMQ has no native pause)
```

On the RabbitMQ driver, `pause()` and `unsubscribe()` both cancel the consumer; `resume()` is **not supported and throws** — create a fresh subscription to start consuming again. You can also cancel by id from the channel: `await channel.unsubscribeById(consumerId)`, or drop every consumer on the channel with `await channel.stopConsuming()`.

## Channel admin

The channel exposes queue-management helpers beyond pub/sub:

```ts
const { messageCount, consumerCount } = await channel.stats(); // queue depth + live consumers
const purged = await channel.purge();  // drop all pending messages, returns the count
const ok = await channel.exists();     // does the queue exist on the broker?
await channel.assert();                // create the queue with its options (idempotent)
await channel.delete();                // remove the queue entirely
```

`assert()` runs lazily on the first `publish` / `subscribe`, so you rarely call it directly — reach for it only to pre-create a queue before any traffic.

## Things NOT to do

- Don't catch the handler's error and return normally. Smart auto-ack reads a clean return as success and **acks** — silently dropping the work. Let it throw (auto-nack) or call `ctx.reject()` / `ctx.nack(false)` deliberately.
- Don't `ctx.nack(true)` in an infinite loop. With no `retry` policy, a perpetually-failing message ping-pongs forever. Set `retry.maxRetries` + a `deadLetter` so failures eventually move out.
- Don't ignore the `deadLetter` channel. Subscribe to it (or at least monitor its depth) — DLQ growth means real failures piling up.
- Don't use `autoAck: true` for messages that matter. The broker acks on delivery, so a crash mid-handling loses the message.

## See also

- [`@warlock.js/herald/publish-message/SKILL.md`](@warlock.js/herald/publish-message/SKILL.md) — the producing side
- [`@warlock.js/herald/request-and-respond/SKILL.md`](@warlock.js/herald/request-and-respond/SKILL.md) — when you need to reply to a message

## herald-basics

`@warlock.js/herald/herald-basics/SKILL.md`

---
name: herald-basics
description: 'Start with @warlock.js/herald — connectToBroker config, herald() factory, channel concept, multi-broker support. Triggers: `connectToBroker`, `herald`, `channel`, `isDefault`, `autoAck`; "set up herald", "wire connectToBroker at boot", "configure multiple brokers", "notifications + analytics + events"; typical import `import { connectToBroker, herald } from "@warlock.js/herald"`. Skip: publishing — `@warlock.js/herald/publish-message/SKILL.md`; consuming — `@warlock.js/herald/consume-message/SKILL.md`; RPC — `@warlock.js/herald/request-and-respond/SKILL.md`; competing libs `amqplib`, `kafkajs`, `bullmq`, `nats`; NestJS messaging; native `EventEmitter`.'
---

# Herald basics

Message bus library — wraps RabbitMQ (Kafka WIP) behind a unified pub/sub API. `herald()` returns a broker, `.channel(name)` returns a pub/sub interface, type-safe via TypeScript generics.

> This skill is the herald **map** — read it first, then load the specific skill for the task.

## Install

```bash
yarn add @warlock.js/herald amqplib   # amqplib for RabbitMQ
```

## Foundations

1. **`connectToBroker(config)` is the bootstrap.** Call once per broker at app startup, before any publish/subscribe.
2. **`herald()` returns the default broker.** `herald("name")` returns a named one. Most apps have one broker; multi-broker is for "notifications + analytics + events" scale.
3. **`.channel(name)` is the queue / topic.** Publish into it from the producer, subscribe to it from the consumer.
4. **Channels are typed.** `channel<T>("user.created")` gives full TS inference on publish and subscribe.
5. **`@warlock.js/seal` schemas validate on publish + receive.** Pass `{ schema }` to `.channel(name, { schema })`.
6. **Subscribers control message flow** via `ctx.ack()` / `ctx.nack()` / `ctx.reject()` / `ctx.retry(ms)`.
7. **Smart auto-ack is the default** (`autoAck` unset/`false`). The consumer runs with manual-ack enabled, but herald acks for you when the handler returns cleanly and nacks-with-requeue when it throws — so a crash mid-handling re-delivers, and a clean handler that forgot `ctx.ack()` is still acked. Call `ctx` methods explicitly only when you need a non-default outcome (reject, DLQ, delayed retry). `autoAck: true` is the dangerous mode: the broker acks on delivery, so a crash loses the message.

## Minimal example

```ts
import { connectToBroker, herald } from "@warlock.js/herald";

// Boot
await connectToBroker({
  driver: "rabbitmq",
  host: "localhost",
  port: 5672,
  username: "guest",
  password: "guest",
});

// Produce
await herald().channel("user.created").publish({ userId: 1, email: "ada@example.com" });

// Consume
herald()
  .channel<{ userId: number; email: string }>("user.created")
  .subscribe(async (message, ctx) => {
    console.log("New user:", message.payload.userId);
    await ctx.ack();
  });
```

## Multi-broker

```ts
await connectToBroker({
  driver: "rabbitmq",
  name: "notifications",
  isDefault: true,
  host: process.env.NOTIFICATIONS_HOST,
});

await connectToBroker({
  driver: "rabbitmq",
  name: "analytics",
  isDefault: false, // ← required: `isDefault` defaults to true, so omitting it makes analytics the default
  host: process.env.ANALYTICS_HOST,
});

herald().channel("emails").publish({ /* ... */ });            // → notifications
herald("analytics").channel("events").publish({ /* ... */ }); // → analytics
```

`connectToBroker` defaults `isDefault` to `true`, and the registry promotes the most-recently-registered default. So when you register more than one broker, mark the secondaries `isDefault: false` — otherwise the last one wins and `herald()` returns the wrong broker.

## Pick a skill

| If the task is about… | Load |
| --- | --- |
| Publishing messages — single + batch + publish options (priority, ttl, delay, persistent, headers) | [`@warlock.js/herald/publish-message/SKILL.md`](@warlock.js/herald/publish-message/SKILL.md) |
| Subscribing to messages — handler signature, message context (ack/nack/retry), prefetch, retry policy, dead-letter | [`@warlock.js/herald/consume-message/SKILL.md`](@warlock.js/herald/consume-message/SKILL.md) |
| Request/response (RPC) — `channel.request(...)` + `channel.respond(...)` for synchronous-style calls over a message bus | [`@warlock.js/herald/request-and-respond/SKILL.md`](@warlock.js/herald/request-and-respond/SKILL.md) |
| Schema-validated channels — `.channel(name, { schema })` with `@warlock.js/seal` | See [`@warlock.js/seal/seal-basics/SKILL.md`](@warlock.js/seal/seal-basics/SKILL.md) + skills below |

## Things NOT to do

- Don't `connectToBroker` from inside a request handler. Call once at boot.
- Don't catch a handler error and swallow it — under smart auto-ack a thrown handler auto-nacks (re-delivers), but a caught-and-ignored error looks like success and gets auto-acked, silently dropping the work. Either let it throw or call `ctx.reject()` / `ctx.nack(false)` deliberately.
- Don't use `autoAck: true` in production. The default smart auto-ack re-delivers on a mid-handling crash; `autoAck: true` acks on delivery, so a crash loses the message.
- Don't share a typed channel across producer and consumer code without a shared type / schema file. Drift between sides causes silent payload corruption.
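The swallowed-error pitfall in the list above follows from how smart auto-ack settles a message. The sketch below is a simplified, synchronous model of that decision (real handlers are async). `settle`, `SketchContext`, and `Outcome` are hypothetical names rather than herald exports, and retry ceilings and dead-lettering are left out.

```typescript
// Hypothetical model of smart auto-ack; not part of @warlock.js/herald.
type Outcome = "ack" | "nack-requeue" | "reject";

interface SketchContext {
  ack(): void;
  nack(requeue: boolean): void;
  reject(): void;
}

// Runs the handler and decides how the message is settled.
function settle(handler: (ctx: SketchContext) => void): Outcome {
  const state: { explicit?: Outcome } = {};

  // The first explicit ctx call wins; later calls are ignored in this model.
  const record = (outcome: Outcome): void => {
    if (state.explicit === undefined) state.explicit = outcome;
  };

  const ctx: SketchContext = {
    ack: () => record("ack"),
    nack: (requeue) => record(requeue ? "nack-requeue" : "reject"),
    reject: () => record("reject"),
  };

  try {
    handler(ctx);
  } catch {
    // A throw with no explicit ctx call is settled as nack-with-requeue.
    return state.explicit ?? "nack-requeue";
  }
  // A clean return with no explicit ctx call is settled as ack.
  return state.explicit ?? "ack";
}

// Letting the error escape requeues the message.
const thrown = settle(() => {
  throw new Error("smtp down");
});

// Catching and ignoring the same error looks like success, so it is acked.
const swallowed = settle(() => {
  try {
    throw new Error("smtp down");
  } catch {
    /* ignored */
  }
});

// An explicit ctx call overrides the default outcome.
const rejected = settle((ctx) => ctx.reject());
```

In this model `thrown` settles as a requeue while `swallowed` settles as an ack, which is why a caught-and-ignored failure silently drops the work.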
## See also

- README at `@warlock.js/herald/README.md` for the full API surface and RabbitMQ / Kafka driver config
- [`@warlock.js/seal/seal-basics/SKILL.md`](@warlock.js/seal/seal-basics/SKILL.md) — schema validation

## publish-message

`@warlock.js/herald/publish-message/SKILL.md`

---
name: publish-message
description: 'Publish messages to a channel — .publish(payload, options?) for single, .publishBatch([...], options?) for batch, with priority / ttl / delay / persistent / correlationId / headers options. Optional schema validation via .channel(name, {schema}). Triggers: `publish`, `publishBatch`, `channel`, `priority`, `ttl`, `delay`, `persistent`, `correlationId`, `headers`, `schema`; "publish a message", "emit an event after a domain change", "fan out a notification", "schedule delayed work", "batch publish"; typical import `import { herald } from "@warlock.js/herald"`. Skip: setup — `@warlock.js/herald/herald-basics/SKILL.md`; consuming — `@warlock.js/herald/consume-message/SKILL.md`; RPC — `@warlock.js/herald/request-and-respond/SKILL.md`; competing libs `amqplib`, `kafkajs`, `bullmq`; NestJS `ClientProxy.emit`.'
---

# Publish messages

Push messages into a channel. The consumer side picks them up via [`@warlock.js/herald/consume-message/SKILL.md`](@warlock.js/herald/consume-message/SKILL.md).

## Single publish

```ts
import { herald } from "@warlock.js/herald";

await herald().channel("user.created").publish({
  userId: 1,
  email: "ada@example.com",
});
```

Returns when the broker has accepted the message (not when a consumer has handled it — that's `.request()` territory).

## Typed channels

```ts
type UserPayload = { userId: number; email: string };

// Pass the payload type as the generic so publish and subscribe are checked against it.
const channel = herald().channel<UserPayload>("user.created");

await channel.publish({ userId: 1, email: "test@example.com" }); // ✅ typed

// @ts-expect-error: userId must be a number and email is required
await channel.publish({ userId: "1" }); // ❌ compile error
```

Share the payload type between producer and consumer via a common `types/` file.
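One way to follow that advice is to keep channel names and payload types in a single module that both sides import. The sketch below is self-contained and does not call herald: the `types/events.ts` path, `ChannelPayloads`, and `describePublish` are hypothetical names chosen for illustration.

```typescript
// types/events.ts (hypothetical path): one source of truth for both sides.
// In a real project these declarations would be exported.
type UserCreatedPayload = { userId: number; email: string };

// Maps each channel name to the payload it carries.
interface ChannelPayloads {
  "user.created": UserCreatedPayload;
}

type ChannelName = keyof ChannelPayloads;

// Local stand-in for a publish call, NOT herald API: it only shows how the
// channel name selects the payload type at compile time.
function describePublish<Name extends ChannelName>(
  channel: Name,
  payload: ChannelPayloads[Name],
): string {
  return `${channel} <- ${JSON.stringify(payload)}`;
}

const line = describePublish("user.created", { userId: 1, email: "ada@example.com" });

// describePublish("user.created", { userId: "1" }); // would not compile
```

With real channels, the same map could feed the generic on both the producer and the consumer, for example `herald().channel<ChannelPayloads["user.created"]>("user.created")`, so a payload change surfaces as a compile error on both sides.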
## Schema-validated publish

```ts
import { v } from "@warlock.js/seal";

const userSchema = v.object({
  userId: v.int(),
  email: v.string().email(),
});

const channel = herald().channel("user.created", { schema: userSchema });

await channel.publish({ userId: 1, email: "invalid" }); // Throws — fails .email()
```

The schema runs `validate()` before the message hits the broker. Invalid payloads never leave the producer. See [`@warlock.js/seal/seal-basics/SKILL.md`](@warlock.js/seal/seal-basics/SKILL.md).

## Publish options

```ts
await channel.publish(payload, {
  priority: 5,           // 0-9, higher = served first
  expiration: 60_000,    // ms — message expires if not consumed (RabbitMQ uses `expiration`, not `ttl`)
  delay: 5_000,          // ms — delayed delivery (requires RabbitMQ delay plugin)
  persistent: true,      // survive broker restart (default true)
  correlationId: "uuid", // for tracking across services
  headers: {
    tenantId: "42",
    source: "billing-service",
  },
});
```

The headers field is free-form — consumers read `message.metadata.headers?.tenantId` to route or filter (headers live under `message.metadata`, alongside `messageId`, `correlationId`, `timestamp`, and `retryCount`).

## Batch publishing

```ts
await channel.publishBatch([
  { userId: 1, email: "a@example.com" },
  { userId: 2, email: "b@example.com" },
  { userId: 3, email: "c@example.com" },
], {
  // Options apply to every message in the batch
  persistent: true,
});
```

Under the hood `publishBatch` iterates and calls `.publish()` per message — there's no AMQP-level batching today. It's an ergonomic shape for "emit these N items" (bulk import, fanout to many recipients), not a throughput optimization. The same `options` apply to every message.

## Publish + transaction (outbox pattern)

Don't publish inside a database transaction — if the transaction rolls back but the publish already happened, you've emitted an event for a state that never persisted. The outbox pattern: write to an `outbox` table inside the transaction, dispatch from the outbox via a worker after commit:

```ts
await transaction(async () => {
  // Keep the created record so the outbox row can reference its id.
  const order = await Order.create(orderData);
  await Outbox.create({
    channel: "order.created",
    payload: { orderId: order.id, /* ... */ },
    status: "pending",
  });
});

// In a separate worker:
const pending = await Outbox.where("status", "pending").get();
for (const row of pending) {
  await herald().channel(row.get("channel")).publish(row.get("payload"));
  await row.merge({ status: "sent", sent_at: new Date() }).save();
}
```

Full recipe at `domains/cascade/docs/recipes/outbox-pattern.md`.

## Broadcasting use-case results

`@warlock.js/herald` ships `heraldBroadcast()` — a channel adapter that lets a `@warlock.js/core` `useCase()` publish its result to the bus automatically on success. Register it once in the use-cases config; each use case opts in with `broadcast: true`.

```ts title="src/config/use-cases.ts"
import { type UseCaseConfigurations } from "@warlock.js/core";
import { heraldBroadcast } from "@warlock.js/herald";

export default {
  broadcast: {
    enabled: true,
    channels: [heraldBroadcast({ broker: "default" })], // omit broker for the default
  },
} satisfies UseCaseConfigurations;
```

```ts
// The use case opts in — channel name defaults to the use case name.
export const createUserUseCase = useCase({
  name: "users.create", // → publishes to channel "users.create"
  schema: createUserSchema,
  handler: async (data) => User.create(data),
  broadcast: true,
});
```

The adapter publishes the broadcast envelope's `payload` to `channel(event.event)`. It's structurally typed, so herald keeps no dependency on core. See [`@warlock.js/core/write-use-case/SKILL.md`](@warlock.js/core/write-use-case/SKILL.md) for the `broadcast` option (payload projection, custom event name, payload-safety rules).

## Event classes (`EventMessage`) — the typed event layer

Raw `channel(name).publish(payload)` is the low-level path. For domain events that travel between modules, herald ships a higher-level pair: define the event once as a class, publish instances of it, and consume them with an [`EventConsumer`](@warlock.js/herald/consume-message/SKILL.md). The channel name comes from the event's `eventName` — no string to keep in sync across producer and consumer.

```ts
import { defineEvent, publishEvent } from "@warlock.js/herald";

// `defineEvent` — `toJSON` projects the wire payload.
const UserCreatedEvent = defineEvent("user.created", {
  toJSON: (user) => ({ id: user.id, email: user.email }),
  // schema?: an @warlock.js/seal ObjectValidator (held on the instance; the consumer side validates)
});

// `publishEvent` serializes the instance and publishes to channel "user.created" on the default broker.
await publishEvent(new UserCreatedEvent(user));
```

`publishEvent(event)` is shorthand for `herald().publish(event)` — it calls `event.serialize()` and sends an **envelope**: `{ payload, metadata, messageId, eventName, version, occurredAt }`. That nesting is why an `EventConsumer` reads `envelope.payload`, not the raw body — pair this only with an `EventConsumer`, not a bare `.subscribe()`. Set an optional `version` on the event (instance field) to drive the consumer's `minVersion` / `maxVersion` acceptance gate.

The class form (`class X extends EventMessage`) is equivalent — override `toJSON()` and set `eventName`, `version`, `schema` as fields.

## Things NOT to do

- Don't publish inside a transaction. Use the outbox pattern.
- Don't pass non-JSON-serializable values (functions, `BigInt`, class instances with methods). Serialization happens at the broker boundary; functions are silently dropped, and `BigInt` throws.
- Don't set `expiration: 0`. Set a real expiry or omit it; `0` means "expire immediately."
- Don't omit `persistent: true` for messages where loss matters. Default is `true` in this lib, but worth being explicit when the message represents money / customer state.
- Don't put secrets in headers. Headers travel in plaintext (encrypted only by TLS in transit, not at rest in the broker).

## See also

- [`@warlock.js/herald/consume-message/SKILL.md`](@warlock.js/herald/consume-message/SKILL.md) — the receiving side
- [`@warlock.js/herald/request-and-respond/SKILL.md`](@warlock.js/herald/request-and-respond/SKILL.md) — when you need a reply

## request-and-respond

`@warlock.js/herald/request-and-respond/SKILL.md`

---
name: request-and-respond
description: 'Synchronous-style RPC over the message bus — channel.request(payload, {timeout}) waits for a reply, channel.respond(handler) registers the responder, ctx.reply(response) sends the answer. Triggers: `channel.request`, `channel.respond`, `ctx.reply`, `timeout`, `correlationId`, `headers.correlationId`; "RPC over message bus", "request-response across services", "wait for a reply", "internal service-to-service call instead of HTTP"; typical import `import { herald } from "@warlock.js/herald"`. Skip: setup — `@warlock.js/herald/herald-basics/SKILL.md`; fire-and-forget — `@warlock.js/herald/publish-message/SKILL.md`; consumer ctx flow — `@warlock.js/herald/consume-message/SKILL.md`; competing libs `amqplib` RPC, `nats` request/reply; NestJS `ClientProxy.send`; gRPC; HTTP.'
---

# Request-response over the bus

Most messaging is fire-and-forget. When you need a reply, use the request/respond pair — a reply-queue + correlation-id pattern under the hood, surfaced as a typed promise.

## Shape

```ts
// Caller (client)
const response = await herald()
  .channel("compute.tax")
  .request({ amount: 1000, country: "US" }, { timeout: 30_000 });

// Responder (server)
herald()
  .channel("compute.tax")
  .respond(async (message, ctx) => {
    const tax = await computeTax(message.payload);
    return { tax, currency: "USD" }; // ← the return value IS the reply
  });
```

`request` returns a promise that resolves with the responder's return value. `respond` registers a handler and **automatically replies with whatever the handler returns** (then acks) — so just `return` the response; you don't call `ctx.reply()` yourself inside a `respond` handler. (`ctx.reply` is the lower-level primitive used when you wire a plain `.subscribe()` as a responder by hand.)

## Timeout

```ts
await channel.request(payload, { timeout: 30_000 });
```

The promise rejects if no reply arrives within `timeout` ms. Pick a sane number — too short rejects mid-work; too long lets a hung responder block the caller.

## When to use it vs HTTP

| Use HTTP | Use request/respond |
| --- | --- |
| Stateless, fast, idempotent ops | Slow / queued ops where the caller can wait |
| Public API surface | Internal service-to-service |
| Frontend consumption | Backend orchestration |
| Synchronous user-facing flow | Async-but-needs-result patterns |

The bus adds queue persistence and retry. HTTP is faster for sub-100ms ops; the bus shines when the operation has variable duration or needs a queue's backpressure.

## When NOT to use it

- "I need a result in under 50ms" — too much overhead on the bus.
- "The responder might be down for hours" — request will time out repeatedly; consider `publish` + write the result somewhere the caller polls.
- "Many callers, the response is the same for all" — cache the result and use `publish` for invalidation.

## Multiple responders

`respond(handler)` takes no options — there's no `group` knob here. If multiple consumers `respond()` to the same channel, they all sit on the same queue, so RabbitMQ round-robins between them: **one responder handles each request, and the caller gets exactly one reply.** That's the "share work across responders" pattern — a synchronous worker pool. Make every responder functionally identical, since the caller has no say in which one answers.

For "multiple replies expected" patterns, use a regular `subscribe` + a request id, then the caller listens on a result channel.
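The one-reply-per-request behaviour described above can be modelled without a broker. The sketch below is a deliberately simplified, in-memory stand-in: `SharedQueueSketch` is a hypothetical class, and strict alternation is an assumption, since real RabbitMQ dispatch also depends on prefetch and on how quickly each consumer acks.

```typescript
// Hypothetical in-memory model of several responders sharing one queue.
type Responder<Req, Res> = (payload: Req) => Res;

class SharedQueueSketch<Req, Res> {
  private readonly responders: Responder<Req, Res>[] = [];
  private cursor = 0;

  // Each respond() call adds another competing consumer on the same queue.
  respond(handler: Responder<Req, Res>): void {
    this.responders.push(handler);
  }

  // Exactly one responder handles each request, so there is exactly one reply.
  request(payload: Req): Res {
    if (this.responders.length === 0) {
      throw new Error("no responder registered (a real request would time out)");
    }
    const handler = this.responders[this.cursor % this.responders.length];
    this.cursor += 1;
    return handler(payload);
  }
}

const queue = new SharedQueueSketch<{ amount: number }, { tax: number; servedBy: string }>();

// Both workers run the same logic; only the label differs so the routing is visible.
queue.respond(({ amount }) => ({ tax: amount / 10, servedBy: "worker-a" }));
queue.respond(({ amount }) => ({ tax: amount / 10, servedBy: "worker-b" }));

// Three requests alternate between the two workers.
const servedBy = [100, 200, 300].map((amount) => queue.request({ amount }).servedBy);
```

Because the caller cannot choose between `worker-a` and `worker-b`, both handlers compute the same result; only the label reveals which one answered.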
## Correlation across replies

Each `request()` generates its own correlation ID and an exclusive reply queue under the hood. The reply carries the same ID; the client matches it to the awaiting promise and resolves. You don't manage correlation IDs manually — and you shouldn't pass your own `correlationId` on a `request()`, since herald overwrites it with the one it uses to match the reply. To thread your own trace id (e.g. a transaction id for logs) through to the responder, put it in `headers` instead — the responder reads it from `message.metadata.headers`.

## Things NOT to do

- Don't use request/respond for high-throughput, low-latency ops. The reply-queue round-trip adds at least one broker hop. HTTP is the right tool.
- Don't return huge payloads from `respond`. Reply messages travel through the broker; a multi-MB response is a load on the bus and on memory. For big results, write to S3 / cache and reply with a reference.
- Don't forget to `respond` in long-lived consumers. If the responder crashes, every caller's request will time out — they don't know why.
- Don't set timeout to `Infinity`. A stuck request becomes a leaked promise — set a real timeout and handle the rejection.

## See also

- [`@warlock.js/herald/publish-message/SKILL.md`](@warlock.js/herald/publish-message/SKILL.md) — fire-and-forget pattern
- [`@warlock.js/herald/consume-message/SKILL.md`](@warlock.js/herald/consume-message/SKILL.md) — subscribe + ctx flow control (including `ctx.reply`)