/// import { DurableObject } from 'cloudflare:workers' import { type CfTypes, setupDurableObjectWebSocketRpc } from '@livestore/common-cf' import { CfDeclare } from '@livestore/common-cf/declare' import { Effect, FetchHttpClient, Layer, Logger, LogLevel, Otlp, RpcMessage, Schema, type Scope, } from '@livestore/utils/effect' import { type Env, extractForwardedHeaders, type MakeDurableObjectClassOptions, matchSyncRequest, type SyncBackendRpcInterface, WebSocketAttachmentSchema, } from '../shared.ts' import { DoCtx } from './layer.ts' import { createDoRpcHandler } from './transport/do-rpc-server.ts' import { createHttpRpcHandler } from './transport/http-rpc-server.ts' import { makeRpcServer } from './transport/ws-rpc-server.ts' // NOTE We need to redeclare runtime types here to avoid type conflicts with the lib.dom Response type. // TODO get rid of those once CF fixed their type mismatch in the worker types declare class Request extends CfDeclare.Request {} declare class Response extends CfDeclare.Response {} declare class WebSocketPair extends CfDeclare.WebSocketPair {} declare class WebSocketRequestResponsePair extends CfDeclare.WebSocketRequestResponsePair {} const DurableObjectBase = DurableObject as any as new ( state: CfTypes.DurableObjectState, env: Env, ) => CfTypes.DurableObject & { ctx: CfTypes.DurableObjectState; env: Env } // Type aliases needed to avoid TS bug https://github.com/microsoft/TypeScript/issues/55021 export type DoState = CfTypes.DurableObjectState export type DoObject = CfTypes.DurableObject & T export type MakeDurableObjectClass = (options?: MakeDurableObjectClassOptions) => { new (ctx: DoState, env: Env): DoObject } /** * Creates a Durable Object class for handling WebSocket-based sync. * A sync Durable Object is uniquely scoped to a specific `storeId`. * * The sync DO supports 3 transport modes: * - HTTP JSON-RPC * - WebSocket * - Durable Object RPC calls (only works in combination with `@livestore/adapter-cf`) * * Example: * * ```ts * // In your Cloudflare Worker file * import { makeDurableObject } from '@livestore/sync-cf/cf-worker' * * export class SyncBackendDO extends makeDurableObject({ * onPush: async (message) => { * console.log('onPush', message.batch) * }, * onPull: async (message) => { * console.log('onPull', message) * }, * }) {} * ``` * * `wrangler.toml` * ```toml * [[durable_objects.bindings]] * name = "SYNC_BACKEND_DO" * class_name = "SyncBackendDO" * [[migrations]] * tag = "v1" * new_sqlite_classes = ["SyncBackendDO"] * ``` */ export const makeDurableObject: MakeDurableObjectClass = (options) => { const enabledTransports = options?.enabledTransports ?? new Set(['http', 'ws', 'do-rpc']) const Logging = Logger.consoleWithThread('SyncDo') const Observability: Layer.Layer = options?.otel?.baseUrl !== undefined ? (Otlp.layer({ baseUrl: options.otel.baseUrl, tracerExportInterval: 50, resource: { serviceName: options.otel.serviceName ?? 'sync-cf-do', }, }).pipe(Layer.provide(FetchHttpClient.layer)) as Layer.Layer) : Layer.empty return class SyncBackendDOBase extends DurableObjectBase implements SyncBackendRpcInterface { __DURABLE_OBJECT_BRAND = 'SyncBackendDOBase' as never constructor(ctx: CfTypes.DurableObjectState, env: Env) { super(ctx, env) const WebSocketRpcServerLive = makeRpcServer({ doSelf: this, doOptions: options }) // This registers the `webSocketMessage` and `webSocketClose` handlers if (enabledTransports.has('ws') === true) { setupDurableObjectWebSocketRpc({ doSelf: this, rpcLayer: WebSocketRpcServerLive, webSocketMode: 'hibernate', // See `pull.ts` for more details how `pull` Effect RPC requests streams are handled // in combination with DO hibernation onMessage: (request, ws) => { if (request._tag === 'Request' && request.tag === 'SyncWsRpc.Pull') { // Is Pull request: add requestId to pullRequestIds const attachment = ws.deserializeAttachment() const { pullRequestIds, ...rest } = Schema.decodeSync(WebSocketAttachmentSchema)(attachment) ws.serializeAttachment( Schema.encodeSync(WebSocketAttachmentSchema)({ ...rest, pullRequestIds: [...pullRequestIds, request.id], }), ) } else if (request._tag === 'Interrupt') { // Is Interrupt request: remove requestId from pullRequestIds const attachment = ws.deserializeAttachment() const { pullRequestIds, ...rest } = Schema.decodeSync(WebSocketAttachmentSchema)(attachment) ws.serializeAttachment( Schema.encodeSync(WebSocketAttachmentSchema)({ ...rest, pullRequestIds: pullRequestIds.filter((id) => id !== request.requestId), }), ) // TODO also emit `Exit` stream RPC message } }, mainLayer: Observability, }) } } override fetch = async (request: Request): Promise => Effect.gen(this, function* () { const searchParams = matchSyncRequest(request) if (searchParams === undefined) { throw new Error('No search params found in request URL') } const { storeId, payload, transport } = searchParams if (enabledTransports.has(transport) === false) { throw new Error(`Transport ${transport} is not enabled (based on \`options.enabledTransports\`)`) } // Extract headers to forward based on configuration (available for all transports) const headers = extractForwardedHeaders(request, options?.forwardHeaders) if (transport === 'http') { return yield* this.handleHttp(request, headers) } if (transport === 'ws') { const { 0: client, 1: server } = new WebSocketPair() // Since we're using websocket hibernation, we need to remember the storeId for subsequent `webSocketMessage` calls // Also store forwarded headers so they're available after hibernation resume server.serializeAttachment( Schema.encodeSync(WebSocketAttachmentSchema)({ storeId, payload, pullRequestIds: [], headers }), ) // See https://developers.cloudflare.com/durable-objects/examples/websocket-hibernation-server this.ctx.acceptWebSocket(server) // Ping requests are sent by Effect RPC internally this.ctx.setWebSocketAutoResponse( new WebSocketRequestResponsePair( JSON.stringify(RpcMessage.constPing), JSON.stringify(RpcMessage.constPong), ), ) return new Response(null, { status: 101, webSocket: client, }) } console.error('Invalid path', request.url) return new Response('Invalid path', { status: 400, statusText: 'Bad Request', }) }).pipe( Effect.tapCauseLogPretty, // Also log errors to console before catching them Effect.catchAllCause((cause) => Effect.succeed(new Response('Error', { status: 500, statusText: cause.toString() })), ), Effect.withSpan('@livestore/sync-cf:durable-object:fetch'), Effect.provide(DoCtx.Default({ doSelf: this, doOptions: options, from: request })), this.runEffectAsPromise, ) /** * Handles DO <-> DO RPC calls */ async rpc(payload: Uint8Array): Promise | CfTypes.ReadableStream> { if (enabledTransports.has('do-rpc') === false) { throw new Error('Do RPC transport is not enabled (based on `options.enabledTransports`)') } return createDoRpcHandler({ payload, input: { doSelf: this, doOptions: options } }).pipe( Effect.withSpan('@livestore/sync-cf:durable-object:rpc'), this.runEffectAsPromise, ) } /** * Handles HTTP RPC calls * * Requires the `enable_request_signal` compatibility flag to properly support `pull` streaming responses */ private handleHttp = (request: CfTypes.Request, forwardedHeaders: Record | undefined) => createHttpRpcHandler({ request, ...(options?.http?.responseHeaders !== undefined ? { responseHeaders: options.http.responseHeaders } : {}), ...(forwardedHeaders !== undefined ? { forwardedHeaders } : {}), }).pipe(Effect.withSpan('@livestore/sync-cf:durable-object:handleHttp')) private runEffectAsPromise = (effect: Effect.Effect): Promise => effect.pipe( Effect.tapCauseLogPretty, Logger.withMinimumLogLevel(LogLevel.Debug), Effect.provide(Layer.mergeAll(Observability, Logging)), Effect.scoped, Effect.runPromise, ) } }