import type * as cf from "@cloudflare/workers-types"; import * as Context from "effect/Context"; import * as Effect from "effect/Effect"; import * as Option from "effect/Option"; import type { HttpServerError } from "effect/unstable/http/HttpServerError"; import * as HttpServerRequest from "effect/unstable/http/HttpServerRequest"; import * as HttpServerResponse from "effect/unstable/http/HttpServerResponse"; import type { HttpEffect } from "../../Http.ts"; import * as Output from "../../Output.ts"; import type { PlatformServices } from "../../Platform.ts"; import { effectClass, taggedFunction } from "../../Util/effect.ts"; import { makeRpcStub } from "./Rpc.ts"; import { fromWebSocket, type DurableWebSocket } from "./WebSocket.ts"; import { Worker, WorkerEnvironment, type WorkerServices } from "./Worker.ts"; export interface DurableObjectExport { readonly kind: "durableObject"; readonly make: ( state: cf.DurableObjectState, env: any, ) => Effect.Effect>; } export const isDurableObjectExport = ( value: unknown, ): value is DurableObjectExport => typeof value === "object" && value !== null && "kind" in value && (value as any).kind === "durableObject"; export type DurableObjectId = cf.DurableObjectId; export type DurableObjectJurisdiction = cf.DurableObjectJurisdiction; export type DurableObjectNamespaceGetDurableObjectOptions = cf.DurableObjectNamespaceGetDurableObjectOptions; export type AlarmInvocationInfo = cf.AlarmInvocationInfo; type TypeId = "Cloudflare.DurableObjectNamespace"; const TypeId = "Cloudflare.DurableObjectNamespace"; export interface DurableObjectNamespace { Type: TypeId; name: string; namespaceId: Output.Output; getByName: (name: string) => DurableObjectStub; newUniqueId: () => DurableObjectId; idFromName: (name: string) => DurableObjectId; idFromString: (id: string) => DurableObjectId; get: ( id: DurableObjectId, options?: DurableObjectNamespaceGetDurableObjectOptions, ) => DurableObjectStub; jurisdiction: ( jurisdiction: DurableObjectJurisdiction, ) => DurableObjectNamespace; } export interface DurableObjectShape { fetch?: HttpEffect; alarm?: ( alarmInfo?: AlarmInvocationInfo, ) => Effect.Effect; webSocketMessage?: ( socket: DurableWebSocket, message: string | Uint8Array, ) => Effect.Effect; webSocketClose?: ( socket: DurableWebSocket, code: number, reason: string, wasClean: boolean, ) => Effect.Effect; } export type DurableObjectServices = | DurableObjectNamespace | DurableObjectState | WorkerServices | PlatformServices; export interface DurableObjectNamespaceClass extends Effect.Effect< DurableObjectNamespace, never, DurableObjectNamespace > { (): { ( name: string, impl: Effect.Effect< Effect.Effect, never, InitReq >, ): Effect.Effect< DurableObjectNamespace, never, Worker | Exclude > & { new (_: never): Shape; }; }; ( name: string, impl: Effect.Effect< Effect.Effect, never, InitReq >, ): Effect.Effect< DurableObjectNamespace, never, Worker | Exclude >; } export class DurableObjectNamespaceScope extends Context.Service< DurableObjectNamespaceScope, DurableObjectNamespace >()("Cloudflare.DurableObjectNamespace") {} export const DurableObjectNamespace: DurableObjectNamespaceClass = taggedFunction(DurableObjectNamespaceScope, (( ...args: | [] | [ name: string, impl: Effect.Effect< Effect.Effect< DurableObjectNamespace, never, DurableObjectState > >, ] ) => args.length === 0 ? DurableObjectNamespace : effectClass( Effect.gen(function* () { const [namespace, impl] = args; const worker = yield* Worker; yield* worker.bind`${namespace}`({ // TODO(sam): automate class migrations, probably in the provider bindings: [ { type: "durable_object_namespace", name: namespace, className: namespace, // scriptName: // binding.scriptName === props.workerName // ? undefined // : binding.scriptName, // environment: binding.environment, // namespaceId: binding.namespaceId, }, ], }); const services = yield* Effect.context>(); yield* worker.export(namespace, { kind: "durableObject", make: (state: cf.DurableObjectState, env: any) => { const doState = fromDurableObjectState(state); return constructor.pipe( Effect.provideContext(services), Effect.provideService(DurableObjectState, doState), Effect.provideService(WorkerEnvironment, env), Effect.map((methods: any) => { console.log( "[DurableObject] wrapping methods:", Object.keys(methods), "own:", Object.getOwnPropertyNames(methods), ); const wrapped: Record = {}; for (const key of Object.getOwnPropertyNames(methods)) { const value = methods[key]; if (Effect.isEffect(value)) { wrapped[key] = ( value as Effect.Effect ).pipe( Effect.provideService(DurableObjectState, doState), ); } else if (typeof value === "function") { wrapped[key] = (...args: unknown[]) => { const result = (value as Function)(...args); if (Effect.isEffect(result)) { return ( result as Effect.Effect ).pipe( Effect.provideService( DurableObjectState, doState, ), ); } return result; }; } else { wrapped[key] = value; } } return wrapped; }), ); }, } satisfies DurableObjectExport); const binding = yield* Effect.serviceOption(WorkerEnvironment).pipe( Effect.map(Option.getOrUndefined), Effect.flatMap((env) => { if (env === undefined) { // should be fine to return undefined here (it is only undefined at plantime) return Effect.succeed(undefined); } const ns = env[namespace]; if (!ns) { return Effect.die( new Error( `DurableObjectNamespace '${namespace}' not found`, ), ); } else if (typeof ns.getByName === "function") { return Effect.succeed(ns); } else { return Effect.die( new Error( `DurableObjectNamespace '${namespace}' is not a DurableObjectNamespace`, ), ); } }), ); const namespaceId = worker.durableObjectNamespaces.pipe( Output.map( (durableObjectNamespaces) => durableObjectNamespaces?.[namespace], ), ); const self = { Type: TypeId, LogicalId: namespace, name: namespace, namespaceId, getByName: (name: string) => makeRpcStub(binding.getByName(name)), // newUniqueId: () => use((ns) => ns.newUniqueId()), // idFromName: (name: string) => use((ns) => ns.idFromName(name)), // idFromString: (id: string) => use((ns) => ns.idFromString(id)), // get: ( // id: cf.DurableObjectId, // options?: cf.DurableObjectNamespaceGetDurableObjectOptions, // ) => use((ns) => makeRpcStub(ns.get(id, options))), // jurisdiction: (jurisdiction: cf.DurableObjectJurisdiction) => // use((ns) => ns.jurisdiction(jurisdiction) as any), }; const constructor = yield* impl.pipe( Effect.provideService(DurableObjectNamespaceScope, self as any), ); return self; }), )) as any); export type DurableObjectStub = { // TODO(sam): do we need to transform? hopefully not [key in keyof Shape]: Shape[key]; } & { fetch: ( request: HttpServerRequest.HttpServerRequest, ) => Effect.Effect< HttpServerResponse.HttpServerResponse, HttpServerError, never >; }; export class DurableObjectState extends Context.Service< DurableObjectState, { // TODO(sam): is this needed when we have Effect? // waitUntil(promise: Promise): Effect.Effect; // TODO(sam): what are these? Where do they come from? // readonly props: Props; readonly id: cf.DurableObjectId; readonly storage: DurableObjectStorage; // TODO(sam): effect-native interface for container container?: cf.Container; blockConcurrencyWhile( callback: () => Effect.Effect, ): Effect.Effect; acceptWebSocket(ws: DurableWebSocket, tags?: string[]): Effect.Effect; getWebSockets(tag?: string): Effect.Effect; setWebSocketAutoResponse( maybeReqResp?: cf.WebSocketRequestResponsePair, ): Effect.Effect; getWebSocketAutoResponse(): Effect.Effect; getWebSocketAutoResponseTimestamp( ws: cf.WebSocket, ): Effect.Effect; setHibernatableWebSocketEventTimeout( timeoutMs?: number, ): Effect.Effect; getHibernatableWebSocketEventTimeout(): Effect.Effect; getTags(ws: cf.WebSocket): Effect.Effect; abort(reason?: string): Effect.Effect; } >()("Cloudflare.DurableObjectState") {} export interface DurableObjectTransaction { get( key: string, options?: cf.DurableObjectGetOptions, ): Effect.Effect; get( keys: string[], options?: cf.DurableObjectGetOptions, ): Effect.Effect>; list( options?: cf.DurableObjectListOptions, ): Effect.Effect>; put( key: string, value: T, options?: cf.DurableObjectPutOptions, ): Effect.Effect; put( entries: Record, options?: cf.DurableObjectPutOptions, ): Effect.Effect; delete( key: string, options?: cf.DurableObjectPutOptions, ): Effect.Effect; delete( keys: string[], options?: cf.DurableObjectPutOptions, ): Effect.Effect; rollback(): Effect.Effect; getAlarm( options?: cf.DurableObjectGetAlarmOptions, ): Effect.Effect; setAlarm( scheduledTime: number | Date, options?: cf.DurableObjectSetAlarmOptions, ): Effect.Effect; deleteAlarm(options?: cf.DurableObjectSetAlarmOptions): Effect.Effect; } export interface DurableObjectStorage { get( key: string, options?: cf.DurableObjectGetOptions, ): Effect.Effect; get( keys: string[], options?: cf.DurableObjectGetOptions, ): Effect.Effect>; list( options?: cf.DurableObjectListOptions, ): Effect.Effect>; put( key: string, value: T, options?: cf.DurableObjectPutOptions, ): Effect.Effect; put( entries: Record, options?: cf.DurableObjectPutOptions, ): Effect.Effect; delete( key: string, options?: cf.DurableObjectPutOptions, ): Effect.Effect; delete( keys: string[], options?: cf.DurableObjectPutOptions, ): Effect.Effect; deleteAll(options?: cf.DurableObjectPutOptions): Effect.Effect; transaction( closure: (txn: DurableObjectTransaction) => Effect.Effect, ): Effect.Effect; getAlarm( options?: cf.DurableObjectGetAlarmOptions, ): Effect.Effect; setAlarm( scheduledTime: number | Date, options?: cf.DurableObjectSetAlarmOptions, ): Effect.Effect; deleteAlarm(options?: cf.DurableObjectSetAlarmOptions): Effect.Effect; sync(): Effect.Effect; sql: cf.SqlStorage; kv: cf.SyncKvStorage; transactionSync(closure: () => T): T; getCurrentBookmark(): Effect.Effect; getBookmarkForTime(timestamp: number | Date): Effect.Effect; onNextSessionRestoreBookmark(bookmark: string): Effect.Effect; } const fromDurableObjectState = ( state: cf.DurableObjectState, ): DurableObjectState["Service"] => ({ id: state.id, container: state.container, storage: fromDurableObjectStorage(state.storage), blockConcurrencyWhile: (callback: () => Effect.Effect) => Effect.tryPromise(() => state.blockConcurrencyWhile(() => Effect.runPromise(callback())), ), acceptWebSocket: (ws: DurableWebSocket, tags?: string[]) => Effect.sync(() => state.acceptWebSocket(ws.ws, tags)), getWebSockets: (tag?: string) => Effect.sync(() => state.getWebSockets(tag).map(fromWebSocket)), setWebSocketAutoResponse: (maybeReqResp?: cf.WebSocketRequestResponsePair) => Effect.sync(() => state.setWebSocketAutoResponse(maybeReqResp)), getWebSocketAutoResponse: () => Effect.sync(() => state.getWebSocketAutoResponse()), getWebSocketAutoResponseTimestamp: (ws: cf.WebSocket) => Effect.sync(() => state.getWebSocketAutoResponseTimestamp(ws)), setHibernatableWebSocketEventTimeout: (timeoutMs?: number) => Effect.sync(() => state.setHibernatableWebSocketEventTimeout(timeoutMs)), getHibernatableWebSocketEventTimeout: () => Effect.sync(() => state.getHibernatableWebSocketEventTimeout()), getTags: (ws: cf.WebSocket) => Effect.sync(() => state.getTags(ws)), abort: (reason?: string) => Effect.sync(() => state.abort(reason)), }); const fromDurableObjectTransaction = ( txn: cf.DurableObjectTransaction, ): DurableObjectTransaction => ({ get: ((keyOrKeys: string | string[], options?: cf.DurableObjectGetOptions) => Effect.tryPromise(() => txn.get(keyOrKeys as any, options))) as any, list: (options?: cf.DurableObjectListOptions) => Effect.tryPromise(() => txn.list(options)), put: (( keyOrEntries: string | Record, valueOrOptions?: unknown, maybeOptions?: cf.DurableObjectPutOptions, ) => typeof keyOrEntries === "string" ? Effect.tryPromise(() => txn.put(keyOrEntries, valueOrOptions, maybeOptions), ) : Effect.tryPromise(() => txn.put( keyOrEntries, valueOrOptions as cf.DurableObjectPutOptions | undefined, ), )) as any, delete: (( keyOrKeys: string | string[], options?: cf.DurableObjectPutOptions, ) => Effect.tryPromise(() => txn.delete(keyOrKeys as any, options))) as any, rollback: () => Effect.sync(() => txn.rollback()), getAlarm: (options?: cf.DurableObjectGetAlarmOptions) => Effect.tryPromise(() => txn.getAlarm(options)), setAlarm: ( scheduledTime: number | Date, options?: cf.DurableObjectSetAlarmOptions, ) => Effect.tryPromise(() => txn.setAlarm(scheduledTime, options)), deleteAlarm: (options?: cf.DurableObjectSetAlarmOptions) => Effect.tryPromise(() => txn.deleteAlarm(options)), }); const fromDurableObjectStorage = ( storage: cf.DurableObjectStorage, ): DurableObjectStorage => ({ get: ((keyOrKeys: string | string[], options?: cf.DurableObjectGetOptions) => Effect.tryPromise(() => storage.get(keyOrKeys as any, options))) as any, list: (options?: cf.DurableObjectListOptions) => Effect.tryPromise(() => storage.list(options)), put: (( keyOrEntries: string | Record, valueOrOptions?: unknown, maybeOptions?: cf.DurableObjectPutOptions, ) => typeof keyOrEntries === "string" ? Effect.tryPromise(() => storage.put(keyOrEntries, valueOrOptions, maybeOptions), ) : Effect.tryPromise(() => storage.put( keyOrEntries, valueOrOptions as cf.DurableObjectPutOptions | undefined, ), )) as any, delete: (( keyOrKeys: string | string[], options?: cf.DurableObjectPutOptions, ) => Effect.tryPromise(() => storage.delete(keyOrKeys as any, options))) as any, deleteAll: (options?: cf.DurableObjectPutOptions) => Effect.tryPromise(() => storage.deleteAll(options)), transaction: ( closure: (txn: DurableObjectTransaction) => Effect.Effect, ) => Effect.tryPromise(() => storage.transaction((txn) => Effect.runPromise(closure(fromDurableObjectTransaction(txn))), ), ), getAlarm: (options?: cf.DurableObjectGetAlarmOptions) => Effect.tryPromise(() => storage.getAlarm(options)), setAlarm: ( scheduledTime: number | Date, options?: cf.DurableObjectSetAlarmOptions, ) => Effect.tryPromise(() => storage.setAlarm(scheduledTime, options)), deleteAlarm: (options?: cf.DurableObjectSetAlarmOptions) => Effect.tryPromise(() => storage.deleteAlarm(options)), sync: () => Effect.tryPromise(() => storage.sync()), sql: storage.sql, kv: storage.kv, transactionSync: (closure: () => T) => storage.transactionSync(closure), getCurrentBookmark: () => Effect.tryPromise(() => storage.getCurrentBookmark()), getBookmarkForTime: (timestamp: number | Date) => Effect.tryPromise(() => storage.getBookmarkForTime(timestamp)), onNextSessionRestoreBookmark: (bookmark: string) => Effect.tryPromise(() => storage.onNextSessionRestoreBookmark(bookmark)), });