import { UnknownError } from '@livestore/common'
import { type CfTypes, toDurableObjectHandler } from '@livestore/common-cf'
import {
  Effect,
  Headers,
  HttpServer,
  Layer,
  Logger,
  LogLevel,
  Option,
  RpcSerialization,
  Stream,
} from '@livestore/utils/effect'

import { SyncDoRpc } from '../../../common/do-rpc-schema.ts'
import { SyncMessage } from '../../../common/mod.ts'
import { DoCtx, type DoCtxInput } from '../layer.ts'
import { makeEndingPullStream } from '../pull.ts'
import { makePush } from '../push.ts'

/**
 * Options accepted by {@link createDoRpcHandler}.
 */
export interface DoRpcHandlerOptions {
  /** Raw bytes handed to the Durable Object RPC handler (presumably the serialized RPC request). */
  payload: Uint8Array
  /**
   * Base input for `DoCtx.Default`. Each RPC handler spreads it and adds `from: { storeId }`
   * taken from the incoming request.
   *
   * NOTE(review): the type arguments of `Omit` were missing from the source; `'from'` is
   * reconstructed from how `input` is combined with `from` below — confirm against `DoCtxInput`.
   */
  input: Omit<DoCtxInput, 'from'>
}

/**
 * Builds the `SyncDoRpc` handler layer (Ping / Pull / Push), wraps it with
 * `toDurableObjectHandler`, and runs it against `options.payload`.
 *
 * - `SyncDoRpc.Ping` replies with a `SyncMessage.Pong`.
 * - `SyncDoRpc.Pull` optionally records an RPC subscription for the store and returns the
 *   stream produced by `makeEndingPullStream`, tagging every item with the request id.
 * - `SyncDoRpc.Push` delegates to the function returned by `makePush`.
 *
 * Errors with tags outside each handler's allow-list are wrapped in `UnknownError`.
 *
 * NOTE(review): the success type arguments were garbled in the source; `Uint8Array` is
 * reconstructed from the surviving `| CfTypes.ReadableStream>` fragment — confirm against
 * the return type of the handler created by `toDurableObjectHandler`.
 *
 * @param options - Payload to handle and the base `DoCtx` input.
 * @returns An effect yielding whatever the Durable Object handler produces for `payload`.
 */
export const createDoRpcHandler = (
  options: DoRpcHandlerOptions,
): Effect.Effect<Uint8Array | CfTypes.ReadableStream> =>
  Effect.gen(this, function* () {
    const { payload, input } = options
    // const { rpcSubscriptions, backendId, doOptions, ctx, env } = yield* DoCtx

    // TODO add admin RPCs
    const RpcLive = SyncDoRpc.toLayer({
      'SyncDoRpc.Ping': (_req) => {
        return Effect.succeed(SyncMessage.Pong.make({}))
      },
      'SyncDoRpc.Pull': (req, { headers }) =>
        Effect.gen(this, function* () {
          const { rpcSubscriptions } = yield* DoCtx

          // TODO rename `req.rpcContext` to something more appropriate
          if (req.rpcContext !== undefined) {
            // Record the subscription under the store id; `Option.getOrThrow` throws when the
            // `x-rpc-request-id` header is absent.
            rpcSubscriptions.set(req.storeId, {
              storeId: req.storeId,
              subscribedAt: Date.now(),
              requestId: Headers.get(headers, 'x-rpc-request-id').pipe(Option.getOrThrow),
              callerContext: req.rpcContext.callerContext,
              // Only include `payload` when it is defined, so the key is omitted otherwise.
              ...(req.payload !== undefined ? { payload: req.payload } : {}),
            })
          }

          // DO-RPC doesn't have HTTP headers context - headers are undefined
          return makeEndingPullStream({ req, payload: req.payload, headers: undefined })
        }).pipe(
          Stream.unwrap,
          // Tag every pull result with the id of the RPC request that produced it.
          Stream.map((res) => ({
            ...res,
            rpcRequestId: Headers.get(headers, 'x-rpc-request-id').pipe(Option.getOrThrow),
          })),
          Stream.provideLayer(DoCtx.Default({ ...input, from: { storeId: req.storeId } })),
          // Pass the two listed error tags through unchanged; wrap everything else.
          Stream.mapError((cause) =>
            cause._tag === 'UnknownError' || cause._tag === 'BackendIdMismatchError'
              ? cause
              : new UnknownError({ cause }),
          ),
          Stream.tapErrorCause(Effect.log),
        ),
      'SyncDoRpc.Push': (req) =>
        Effect.gen(this, function* () {
          const { doOptions, ctx, env, storeId } = yield* DoCtx

          // DO-RPC doesn't have HTTP headers context - headers are undefined
          const push = makePush({ storeId, payload: req.payload, headers: undefined, options: doOptions, ctx, env })

          return yield* push(req)
        }).pipe(
          Effect.provide(DoCtx.Default({ ...input, from: { storeId: req.storeId } })),
          // Pass the three listed error tags through unchanged; wrap everything else.
          Effect.mapError((cause) =>
            cause._tag === 'UnknownError' ||
            cause._tag === 'ServerAheadError' ||
            cause._tag === 'BackendIdMismatchError'
              ? cause
              : new UnknownError({ cause }),
          ),
          Effect.tapCauseLogPretty,
        ),
    })

    // JSON RPC serialization plus the HTTP server context, logging to the console under the
    // 'SyncDo' thread name at Debug level and above.
    const handler = toDurableObjectHandler(SyncDoRpc, {
      layer: Layer.mergeAll(RpcLive, RpcSerialization.layerJson, HttpServer.layerContext).pipe(
        Layer.provide(Logger.consoleWithThread('SyncDo')),
        Layer.provide(Logger.minimumLogLevel(LogLevel.Debug)),
      ),
    })

    return yield* handler(payload)
  }).pipe(Effect.withSpan('createDoRpcHandler'))