import { env as importedEnv } from 'cloudflare:workers'

import { UnknownError } from '@livestore/common'
import type { HelperTypes } from '@livestore/common-cf'
import { Effect, Schema } from '@livestore/utils/effect'

import type { CfTypes, SearchParams } from '../common/mod.ts'
import type { CfDeclare } from './mod.ts'
import { type Env, type ForwardedHeaders, matchSyncRequest } from './shared.ts'

// NOTE We need to redeclare runtime types here to avoid type conflicts with the lib.dom Response type.
declare class Response extends CfDeclare.Response {}

/**
 * Shape of the object returned by {@link makeWorker}: a Cloudflare Worker exposing a single `fetch` handler.
 *
 * HINT: If we ever extend user's custom worker RPC, type `_T` can help here with expected return type safety.
 * Currently unused.
 */
export type CFWorker<
  TEnv extends Env = Env,
  _T extends CfTypes.Rpc.DurableObjectBranded | undefined = undefined,
> = {
  fetch: (request: CfTypes.Request, env: TEnv, ctx: CfTypes.ExecutionContext) => Promise<CfTypes.Response>
}

/** Context passed to validatePayload callback */
export type ValidatePayloadContext = {
  storeId: string
  /** Request headers (raw, not filtered by forwardHeaders) */
  headers: ForwardedHeaders
}

/**
 * Options accepted by {@link makeWorker}. The Durable Object binding has to be
 * supplied explicitly so we never fall back to deprecated defaults when Cloudflare config changes.
 */
export type MakeWorkerOptions<TEnv extends Env = Env, TSyncPayload = Schema.JsonValue> = {
  /**
   * Binding name of the sync Durable Object declared in wrangler config.
   */
  syncBackendBinding: HelperTypes.ExtractDurableObjectKeys<TEnv>
  /**
   * Optionally pass a schema to decode the client-provided payload into a typed object
   * before calling {@link validatePayload}. If omitted, the raw JSON value is forwarded.
   *
   * Note: the schema is only applied when `validatePayload` is also provided.
   */
  syncPayloadSchema?: Schema.Schema<TSyncPayload>
  /**
   * Validates the (optionally decoded) payload during WebSocket connection establishment.
   * If {@link syncPayloadSchema} is provided, `payload` will be of the schema's inferred type.
   *
   * The context includes request headers for cookie-based or header-based authentication.
   *
   * @example Cookie-based authentication
   * ```ts
   * validatePayload: async (payload, { storeId, headers }) => {
   *   const cookie = headers.get('cookie')
   *   const session = await validateSessionFromCookie(cookie)
   *   if (!session) throw new Error('Unauthorized')
   * }
   * ```
   *
   * Note: This runs only at connection time, not for individual push events.
   * For push event validation, use the `onPush` callback in the Durable Object.
   */
  validatePayload?: (payload: TSyncPayload, context: ValidatePayloadContext) => void | Promise<void>
  /** @default false */
  enableCORS?: boolean
}

/**
 * Produces a Cloudflare Worker `fetch` handler that delegates sync traffic to the
 * Durable Object identified by `syncBackendBinding`.
 *
 * Request routing, in order:
 * 1. `OPTIONS` with CORS enabled -> `204` preflight response.
 * 2. Requests recognised by `matchSyncRequest` -> {@link handleSyncRequest}.
 * 3. `GET /` -> plain-text info message.
 * 4. Anything else -> `400 Invalid path`.
 *
 * For more complex setups prefer implementing a custom `fetch` and call {@link handleSyncRequest}
 * from the branch that handles LiveStore sync requests.
 */
export const makeWorker = <
  TEnv extends Env = Env,
  TDurableObjectRpc extends CfTypes.Rpc.DurableObjectBranded | undefined = undefined,
  TSyncPayload = Schema.JsonValue,
>(
  options: MakeWorkerOptions<TEnv, TSyncPayload>,
): CFWorker<TEnv, TDurableObjectRpc> => {
  return {
    fetch: async (request, env, _ctx) => {
      const url = new URL(request.url)
      const corsEnabled = options.enableCORS === true

      // Empty when CORS is disabled, so spreading it into a response is always safe.
      const corsHeaders: CfTypes.HeadersInit = corsEnabled
        ? {
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
            // Echo back whatever headers the preflight asked for; fall back to a wildcard.
            'Access-Control-Allow-Headers': request.headers.get('Access-Control-Request-Headers') ?? '*',
          }
        : {}

      if (request.method === 'OPTIONS' && corsEnabled) {
        return new Response(null, {
          status: 204,
          headers: corsHeaders,
        })
      }

      const searchParams = matchSyncRequest(request)

      // Check if this is a sync request first, before showing info message
      if (searchParams !== undefined) {
        return handleSyncRequest({
          request,
          searchParams,
          env,
          ctx: _ctx,
          syncBackendBinding: options.syncBackendBinding,
          headers: corsHeaders,
          validatePayload: options.validatePayload,
          syncPayloadSchema: options.syncPayloadSchema,
        })
      }

      // Only show info message for GET requests to / without sync parameters
      if (request.method === 'GET' && url.pathname === '/') {
        return new Response('Info: Sync backend endpoint for @livestore/sync-cf.', {
          status: 200,
          // Include the CORS headers here too, consistent with the error response below.
          headers: { ...corsHeaders, 'Content-Type': 'text/plain' },
        })
      }

      console.error('Invalid path', url.pathname)

      return new Response('Invalid path', {
        status: 400,
        statusText: 'Bad Request',
        headers: {
          ...corsHeaders,
          'Content-Type': 'text/plain',
        },
      })
    },
  }
}

/** Convert CF Request headers to a ForwardedHeaders map (keys are lower-cased) */
const requestHeadersToMap = (request: CfTypes.Request): ForwardedHeaders => {
  const result = new Map<string, string>()
  request.headers.forEach((value, key) => {
    result.set(key.toLowerCase(), value)
  })
  return result
}

/**
 * Handles LiveStore sync requests (e.g. with search params `?storeId=...&transport=...`).
 *
 * Steps:
 * 1. If `validatePayload` is provided, the payload is decoded with `syncPayloadSchema` (when given)
 *    and passed to `validatePayload` together with the store id and the request headers.
 * 2. The Durable Object namespace is looked up on `env` (falling back to the env imported from
 *    `cloudflare:workers` when `env` is not passed).
 * 3. For the `ws` transport the request must carry an `Upgrade: websocket` header
 *    (compared case-insensitively).
 * 4. The request is forwarded to the Durable Object derived from `storeId`.
 *
 * @example Token-based authentication
 * ```ts
 * const validatePayload = (payload: Schema.JsonValue | undefined, context: { storeId: string }) => {
 *   if (payload?.authToken !== 'insecure-token-change-me') {
 *     throw new Error('Invalid auth token')
 *   }
 * }
 * ```
 *
 * @example Cookie-based authentication
 * ```ts
 * const validatePayload = async (payload: Schema.JsonValue | undefined, { storeId, headers }) => {
 *   const cookie = headers.get('cookie')
 *   const session = await validateSessionFromCookie(cookie)
 *   if (!session) throw new Error('Unauthorized')
 * }
 * ```
 *
 * @returns The Durable Object's response, or one of the following error responses
 * (each carrying the optional `headers`):
 * - `400` if schema decoding fails or `validatePayload` throws/rejects. The error is mapped via
 *   `UnknownError.mapToUnknownError` and its string form becomes the response body; it is not re-thrown.
 * - `424` if `syncBackendBinding` is not present on the env.
 * - `426` if the transport is `ws` but the request has no `Upgrade: websocket` header.
 */
export const handleSyncRequest = <
  TEnv extends Env = Env,
  TDurableObjectRpc extends CfTypes.Rpc.DurableObjectBranded | undefined = undefined,
  CFHostMetada = unknown,
  TSyncPayload = Schema.JsonValue,
>({
  request,
  searchParams: { storeId, payload, transport },
  env: explicitlyProvidedEnv,
  syncBackendBinding,
  headers,
  validatePayload,
  syncPayloadSchema,
}: {
  request: CfTypes.Request<CFHostMetada>
  searchParams: SearchParams
  env?: TEnv | undefined
  /** Only there for type-level reasons */
  ctx: CfTypes.ExecutionContext
  /** Binding name of the sync backend Durable Object */
  syncBackendBinding: MakeWorkerOptions<TEnv, TSyncPayload>['syncBackendBinding']
  headers?: CfTypes.HeadersInit | undefined
  validatePayload?: MakeWorkerOptions<TEnv, TSyncPayload>['validatePayload']
  syncPayloadSchema?: MakeWorkerOptions<TEnv, TSyncPayload>['syncPayloadSchema']
}): Promise<CfTypes.Response> =>
  Effect.gen(function* () {
    // Builds a text response carrying the caller-supplied headers (e.g. CORS), if any.
    const respond = (body: string, status: number) =>
      new Response(body, { status, ...(headers !== undefined ? { headers } : {}) })

    if (validatePayload !== undefined) {
      let payloadToValidate: TSyncPayload

      // Always decode with the supplied schema when present, even if payload is undefined.
      // This ensures required payloads are enforced by the schema.
      if (syncPayloadSchema !== undefined) {
        const decoded = Schema.decodeUnknownEither(syncPayloadSchema)(payload)

        if (decoded._tag === 'Left') {
          const message = decoded.left.toString()
          console.error('Invalid payload (decode failed)', message)
          return respond(message, 400)
        }

        payloadToValidate = decoded.right
      } else {
        // Without a schema the raw JSON value is forwarded as-is.
        payloadToValidate = payload as TSyncPayload
      }

      // Convert request headers to a Map for the validation context
      const context: ValidatePayloadContext = { storeId, headers: requestHeadersToMap(request) }

      const validation = yield* Effect.promise(async () => validatePayload(payloadToValidate, context)).pipe(
        UnknownError.mapToUnknownError,
        Effect.either,
      )

      if (validation._tag === 'Left') {
        console.error('Invalid payload (validation failed)', validation.left)
        return respond(validation.left.toString(), 400)
      }
    }

    const env = explicitlyProvidedEnv ?? (importedEnv as TEnv)

    if (!(syncBackendBinding in env)) {
      return respond(
        `Failed dependency: Required Durable Object binding '${syncBackendBinding as string}' not available`,
        424,
      )
    }

    // Handle WebSocket upgrade request.
    // The `Upgrade` header value is case-insensitive (RFC 6455), so normalise before comparing.
    const upgradeHeader = request.headers.get('Upgrade')
    if (transport === 'ws' && upgradeHeader?.toLowerCase() !== 'websocket') {
      return respond('Durable Object expected Upgrade: websocket', 426)
    }

    const durableObjectNamespace = env[
      syncBackendBinding as keyof TEnv
    ] as CfTypes.DurableObjectNamespace<TDurableObjectRpc>

    // One Durable Object instance per store, addressed by name.
    const id = durableObjectNamespace.idFromName(storeId)
    const durableObject = durableObjectNamespace.get(id)

    return yield* Effect.promise(() => durableObject.fetch(request))
  }).pipe(Effect.tapCauseLogPretty, Effect.runPromise)