/** * @since 1.0.0 */ /// import { DurableObject } from "cloudflare:workers" import * as Cause from "effect/Cause" import * as Effect from "effect/Effect" import type * as Layer from "effect/Layer" import * as ManagedRuntime from "effect/ManagedRuntime" import { RemoteId } from "../EventJournal.js" import type { EncryptedRemoteEntry } from "../EventLogEncryption.js" import * as EventLogRemote from "../EventLogRemote.js" import * as EventLogServer from "../EventLogServer.js" /** * @since 1.0.0 * @category DurableObject */ export abstract class EventLogDurableObject extends DurableObject { /** * @since 1.0.0 */ readonly runtime: ManagedRuntime.ManagedRuntime constructor(options: { readonly ctx: DurableObjectState readonly env: unknown readonly storageLayer: Layer.Layer }) { super(options.ctx, options.env) this.ctx.setHibernatableWebSocketEventTimeout(5000) this.runtime = ManagedRuntime.make(options.storageLayer) } /** * @since 1.0.0 */ webSocketMessage(ws: WebSocket, message: string | ArrayBuffer) { return this.handleRequest( ws, EventLogRemote.decodeRequest( message instanceof ArrayBuffer ? new Uint8Array(message) : new TextEncoder().encode(message) ) ) } private chunks = new Map< number, { readonly parts: Array count: number bytes: number } >() /** * @since 1.0.0 */ private async handleRequest( ws: WebSocket, request: typeof EventLogRemote.ProtocolRequest.Type ): Promise { switch (request._tag) { case "WriteEntries": { return Effect.gen(this, function*() { const storage = yield* EventLogServer.Storage const entries = request.encryptedEntries.map( ({ encryptedEntry, entryId }) => new EventLogServer.PersistedEntry({ entryId, iv: request.iv, encryptedEntry }) ) const encryptedEntries = yield* storage.write( request.publicKey, entries ) ws.send( EventLogRemote.encodeResponse( new EventLogRemote.Ack({ id: request.id, sequenceNumbers: encryptedEntries.map((_) => _.sequence) }) ) ) const changes = this.encodeChanges( request.publicKey, encryptedEntries ) for (const peer of this.ctx.getWebSockets()) { if (peer === ws) continue for (const change of changes) { peer.send(change) } } }).pipe(this.runtime.runPromise) } case "ChunkedMessage": { const data = EventLogRemote.ChunkedMessage.join(this.chunks, request) if (!data) return return this.handleRequest(ws, EventLogRemote.decodeRequest(data)) } case "RequestChanges": { return Effect.gen(this, function*() { const storage = yield* EventLogServer.Storage const entries = yield* storage.entries( request.publicKey, request.startSequence ) if (entries.length === 0) return const changes = this.encodeChanges(request.publicKey, entries) for (const change of changes) { ws.send(change) } }).pipe(this.runtime.runPromise) } } } /** * @since 1.0.0 */ private encodeChanges( publicKey: string, entries: ReadonlyArray ): ReadonlyArray { let changes = [ EventLogRemote.encodeResponse( new EventLogRemote.Changes({ publicKey, entries }) ) ] if (changes[0].byteLength > 512_000) { changes = EventLogRemote.ChunkedMessage.split( Math.floor(Math.random() * 1_000_000_000), changes[0] ).map((_) => EventLogRemote.encodeResponse(_)) } return changes } /** * @since 1.0.0 */ webSocketError(_ws: WebSocket, error: Error): void { this.runtime.runFork(Effect.logWarning(Cause.fail(error))) } /** * @since 1.0.0 */ webSocketClose(_ws: WebSocket, code: number, reason: string): void { this.runtime.runFork(Effect.logWarning("WebSocket closed", { code, reason })) } /** * @since 1.0.0 */ async fetch(): Promise { const webSocketPair = new WebSocketPair() const [client, server] = Object.values(webSocketPair) this.ctx.acceptWebSocket(server) EventLogServer.Storage.pipe( Effect.flatMap((_) => _.getId), Effect.tap((remoteId) => { server.send( EventLogRemote.encodeResponse( new EventLogRemote.Hello({ remoteId: RemoteId.make(remoteId) }) ) ) }), this.runtime.runFork ) return new Response(null, { status: 101, webSocket: client }) } }