import type { CfTypes } from '@livestore/common-cf'
import { Effect, HttpApp, Layer, RpcSerialization, RpcServer } from '@livestore/utils/effect'
import { SyncHttpRpc } from '../../../common/http-rpc-schema.ts'
import * as SyncMessage from '../../../common/sync-message-types.ts'
import { headersRecordToMap } from '../../shared.ts'
import { DoCtx } from '../layer.ts'
import { makeEndingPullStream } from '../pull.ts'
import { makePush } from '../push.ts'

/**
 * Serves a single HTTP request against the `SyncHttpRpc` RPC group.
 *
 * Builds the handler layer, turns the RPC group into a web handler, runs the request through it
 * (bounded by a timeout) and finally sets every entry of `responseHeaders` on the response.
 *
 * @param request - Incoming Cloudflare request; it is cast to the standard `Request` type
 *   expected by the web handler.
 * @param responseHeaders - Optional headers set on the response via `Headers.set`.
 * @param forwardedHeaders - Optional headers handed to the pull/push handlers.
 * @returns The response produced by the RPC web handler, with `responseHeaders` applied.
 */
export const createHttpRpcHandler = Effect.fn('createHttpRpcHandler')(function* ({
  request,
  responseHeaders,
  forwardedHeaders,
}: {
  request: CfTypes.Request
  // NOTE(review): type arguments were missing in the reviewed source; `<string, string>` is
  // inferred from `response.headers.set(key, value)` below — confirm against callers.
  responseHeaders?: Record<string, string>
  forwardedHeaders?: Record<string, string>
}) {
  const handlerLayer = createHttpRpcLayer(forwardedHeaders)
  const httpApp = RpcServer.toHttpApp(SyncHttpRpc).pipe(Effect.provide(handlerLayer))
  const webHandler = yield* httpApp.pipe(Effect.map(HttpApp.toWebHandler))

  // The casts bridge the Cloudflare request/response types and the standard web types used by
  // the handler.
  // NOTE(review): `Promise<CfTypes.Response>` is reconstructed (type argument was missing) — confirm.
  const response = yield* Effect.promise(
    () => webHandler(request as TODO as Request) as TODO as Promise<CfTypes.Response>,
  ).pipe(
    // A plain number is interpreted as milliseconds, i.e. a 10 second limit.
    Effect.timeout(10000),
  )

  if (responseHeaders !== undefined) {
    for (const [key, value] of Object.entries(responseHeaders)) {
      response.headers.set(key, value)
    }
  }

  return response
})

/**
 * Builds the layer that implements the `SyncHttpRpc` handlers (`Pull`, `Push`, `Ping`), merged
 * with the HTTP protocol layer mounted at `/http-rpc` and JSON serialization.
 *
 * @param forwardedHeaders - Optional header record; converted once with `headersRecordToMap`
 *   and shared by the pull and push handlers.
 */
const createHttpRpcLayer = (forwardedHeaders: Record<string, string> | undefined) => {
  const headers = headersRecordToMap(forwardedHeaders)

  // TODO implement admin requests
  return SyncHttpRpc.toLayer({
    'SyncHttpRpc.Pull': (req) => makeEndingPullStream({ req, payload: req.payload, headers }),

    'SyncHttpRpc.Push': (req) =>
      Effect.gen(function* () {
        const { ctx, env, doOptions, storeId } = yield* DoCtx
        // The push handler is created without a payload (`payload: undefined`).
        const push = makePush({ payload: undefined, headers, options: doOptions, storeId, ctx, env })

        return yield* push(req)
      }),

    'SyncHttpRpc.Ping': () => Effect.succeed(SyncMessage.Pong.make({})),
  }).pipe(
    Layer.provideMerge(RpcServer.layerProtocolHttp({ path: '/http-rpc' })),
    Layer.provideMerge(RpcSerialization.layerJson),
  )
}