import * as Ref from "effect/Ref" import { Rpc } from "effect/unstable/rpc" import { type ClientForOptions, makeQueryKey } from "../client/clientFor.js" import * as Context from "../Context.js" import * as Effect from "../Effect.js" import * as S from "../Schema.js" /** * Shorthand for a handler-derived invalidation key. * Accepts an RPC handler object so `add(GetMe)` is equivalent to * `add(makeQueryKey(GetMe))`. */ export type InvalidationKeyInput = InvalidationKey | { readonly id: string; readonly options?: ClientForOptions } const normalizeKey = (input: InvalidationKeyInput): InvalidationKey => { if (Array.isArray(input)) return input const handler = input as { id: string; options?: ClientForOptions } return makeQueryKey(handler.options ? { id: handler.id, options: handler.options } : { id: handler.id }) } // Disambiguates `add` overloads: a `string[]` means a single key; an array // containing arrays-or-objects means a batch of inputs. const isBatch = ( input: InvalidationKeyInput | ReadonlyArray ): input is ReadonlyArray => Array.isArray(input) && input.length > 0 && typeof input[0] !== "string" const normalizeInputs = ( input: InvalidationKeyInput | ReadonlyArray ): ReadonlyArray => isBatch(input) ? input.map(normalizeKey) : [normalizeKey(input)] /** * A single segment within an `InvalidationKey` array. * Accepts any JSON-compatible value: string, number, boolean, null, * arrays and objects recursively — matching TanStack Query's `queryKey` element type. */ export const InvalidationKeySegment = S.Json export type InvalidationKeySegment = S.Schema.Type /** Schema for a single invalidation key – an array of segments compatible with TanStack Query `queryKey`. */ export const InvalidationKey = S.Array(InvalidationKeySegment) export type InvalidationKey = S.Schema.Type /** Schema for the full set of invalidation keys – an array of `InvalidationKey`. */ export const InvalidationKeys = S.Array(InvalidationKey) export type InvalidationKeys = S.Schema.Type /** Metadata included in every command response for server-driven cache invalidation. */ export const CommandMetaData = S.Struct({ invalidateQueries: InvalidationKeys }) export type CommandMetaData = S.Schema.Type /** * Wraps a command's success schema so that the wire format carries both the `payload` * (the handler's actual return value) and `metadata` (server-driven cache invalidation keys). * Transparent to users: the server handler returns the plain payload and the client receives * the plain payload — wrapping/unwrapping is handled internally by the routing layer. */ export const CommandResponseWithMetaData = (success: S) => S.Struct({ payload: success, metadata: CommandMetaData }) /** * Wraps a command's failure schema so that the wire format carries both the `error` * (the handler's actual failure value) and `metadata` (server-driven cache invalidation keys * accumulated thus far before the failure occurred). * Transparent to users: the server handler fails with the plain error and the client receives * the plain error — wrapping/unwrapping is handled internally by the routing layer. */ export const CommandFailureWithMetaData = (error: E) => S.Struct({ _tag: S.Literal("CommandFailureWithMetaData"), error, metadata: CommandMetaData }) /** * Stream chunk schema for stream responses with metadata. * Each item is either a data value, an intermediate "metadata" signal carrying cache * invalidation keys accumulated since the previous drain, or a final "done" signal. * Transparent to users: stream handlers return plain values and clients receive plain values — * wrapping/unwrapping is handled internally by the routing layer. * * The "done" chunk is always the last item in the stream and carries any remaining invalidation * keys. An optional "metadata" chunk may appear after any "value" chunk and carries keys * accumulated since the last drain (V3: mid-stream invalidation). */ export const StreamResponseChunk = (success: S) => S.Union([ S.Struct({ _tag: S.Literal("value"), value: success }), S.Struct({ _tag: S.Literal("metadata"), metadata: CommandMetaData }), S.Struct({ _tag: S.Literal("done"), metadata: CommandMetaData }) ]) export type StreamResponseChunk = | { readonly _tag: "value"; readonly value: A } | { readonly _tag: "metadata"; readonly metadata: CommandMetaData } | { readonly _tag: "done"; readonly metadata: CommandMetaData } /** * Stream chunk schema for stream failures with metadata. * Used to signal a stream failure while still carrying cache invalidation keys * accumulated thus far. */ export const StreamFailureChunk = (error: E) => S.Struct({ _tag: S.Literal("error"), error, metadata: CommandMetaData }) export type StreamFailureChunk = { readonly _tag: "error"; readonly error: E; readonly metadata: CommandMetaData } /** * Context annotation for declaring static cache invalidation keys on a low-level `Rpc` definition. * These keys are always included in the command response metadata, regardless of the handler logic. * * Prefer using `makeQueryKey` over raw string arrays to stay in sync with the actual query * definitions without manual string maintenance: * * ```ts * import { makeQueryKey } from "effect-app/client" * import { Invalidation } from "effect-app/rpc" * import * as UserRsc from "../User/index.js" // separate module to avoid circular deps * * class UpdateProfile extends Rpc.make("UpdateProfile", { ... }) * .annotate(Invalidation.Invalidates, [makeQueryKey(UserRsc.GetMe), makeQueryKey(UserRsc.GetProfile)]) {} * ``` * * **Circular dependency note:** if mutations and queries live in the same file you may hit a * circular reference at evaluation time. The idiomatic fix is to move mutations into their own * module (e.g. `User/mutations.ts`) that directly imports the relevant query classes rather than * re-exporting them through a barrel. * * For the higher-level `Command`/`Query` builders from `makeRpcClient`, use the * `invalidatesQueries` callback argument instead (it receives the same query keys at runtime). */ export const Invalidates = Context.Reference>( "effect-app/rpc/Invalidates", { defaultValue: () => [] } ) export type Invalidates = typeof Invalidates /** The shape of the per-request service that accumulates invalidation keys. */ export interface InvalidationSetService { readonly add: ( input: InvalidationKeyInput | ReadonlyArray ) => Effect.Effect readonly get: Effect.Effect> /** * V3: Reads all currently accumulated keys and resets the bucket to empty. * Used by the stream routing layer to emit intermediate "metadata" chunks * without re-sending keys that have already been forwarded to the client. */ readonly drain: Effect.Effect> } /** * Request-scoped service for accumulating invalidation keys dynamically inside a handler. * Provided by `InvalidationMiddlewareLive` for every RPC call; has a no-op default so it is * safe to use even when the HTTP middleware is absent (tests, workers, etc.). * * Use `InvalidationSet.add(key)` as a shorthand to skip `.use(_ => _.add(key))`. The * underlying service is still available via `.use` / `.useSync` for advanced cases. * * `add` accepts an RPC handler directly (e.g. `UserRsc.GetMe`) — its query key is derived via * `makeQueryKey` so keys stay in sync with the actual query definitions. Raw `InvalidationKey` * arrays and arrays of either form are also accepted. * * ```ts * import * as Effect from "effect/Effect" * import { Invalidation } from "effect-app/rpc" * import * as CartRsc from "../Cart/queries.js" * import * as UserRsc from "../User/queries.js" * * const handler = Effect.fnUntraced(function*(req: UpdateCartRequest) { * const cart = yield* CartRepo.save(req.cart) * * // single handler * yield* Invalidation.InvalidationSet.add(UserRsc.GetMe) * * // batch * if (cart.isCheckedOut) { * yield* Invalidation.InvalidationSet.add([CartRsc.GetCartStats, UserRsc.GetMe]) * } * * return cart * }) * ``` * * You can combine static (`Invalidates` annotation) and dynamic (`InvalidationSet.use`) keys: * the annotation pre-populates the set before the handler runs; dynamic additions accumulate * throughout the handler. All keys are included in the command response metadata. */ const InvalidationSetRef = Context.Reference( "effect-app/rpc/InvalidationSet", { defaultValue: () => ({ add: (_input: InvalidationKeyInput | ReadonlyArray) => Effect.void, get: Effect.succeed([] as ReadonlyArray), drain: Effect.succeed([] as ReadonlyArray) }) } ) export const InvalidationSet = Object.assign(InvalidationSetRef, { /** * Shortcut for `InvalidationSet.use(_ => _.add(input))`. Accepts a single * `InvalidationKeyInput` or an array of them. */ add: (input: InvalidationKeyInput | ReadonlyArray) => InvalidationSetRef.use((_) => _.add(input)) }) export type InvalidationSet = typeof InvalidationSetRef /** Creates a fresh `InvalidationSet` implementation backed by a `Ref`. */ export const makeInvalidationSet = (ref: Ref.Ref>): InvalidationSetService => ({ add: (input) => Ref.update(ref, (keys) => [...keys, ...normalizeInputs(input)]), get: Ref.get(ref), drain: Ref.getAndSet(ref, []) }) /** * `Rpc.Custom` definition for command RPCs that wrap the success/error schemas * with `CommandResponseWithMetaData` / `CommandFailureWithMetaData`. The wrap * lets server forward accumulated invalidation keys on both success and * handler-thrown failure paths. Middleware-thrown errors bypass the wrap * (the handler never ran, so no metadata) and flow raw at the Cause level — * the client decodes them via the `rpc.middlewares[*].error` failure-union * channel of `Rpc.exitSchema`. */ // eslint-disable-next-line import/namespace export interface CommandRpc extends Rpc.Custom { readonly out: Rpc.Custom.Out< ReturnType>, ReturnType> > } /** * Custom Rpc constructor for command RPCs. * Wraps the success schema with `CommandResponseWithMetaData` and the error * schema with `CommandFailureWithMetaData`. */ export const makeCommandRpc = Rpc.custom(({ defect, error, success }) => ({ success: CommandResponseWithMetaData(success), error: CommandFailureWithMetaData(error), defect })) /** * `Rpc.Custom` definition for stream RPCs that wrap the success/error schemas * with `StreamResponseChunk` / `StreamFailureChunk`. */ // eslint-disable-next-line import/namespace export interface StreamRpc extends Rpc.Custom { readonly out: Rpc.Custom.Out< ReturnType>, ReturnType> > } /** * Custom Rpc constructor for stream RPCs. * Wraps the success schema with `StreamResponseChunk` and * the error schema with `StreamFailureChunk`. */ export const makeStreamRpc = Rpc.custom(({ defect, error, success }) => ({ success: StreamResponseChunk(success), error: StreamFailureChunk(error), defect }))