import { randomUUID } from "node:crypto";
import type {
  DurableConnection,
  DurableRequest,
  DurableResponse,
} from "./types";

/** Header used to pass the room id along with a request. */
export const DURABLE_HEADER_ROOMID = "X-DJS-ROOMID";
/** Header used to pass the user id along with a request; it becomes the connection id. */
export const DURABLE_HEADER_USERID = "X-DJS-USERID";

interface Options {
  roomId: string;
  userId: string;
}

/**
 * Returns a copy of `request` tagged with the room/user headers.
 *
 * When `options` is missing or has no keys, the original request object is
 * returned untouched. Otherwise BOTH headers are written on a clone; a field
 * that was not supplied is written as an empty string.
 *
 * @param request - Incoming request to forward.
 * @param options - Optional room and user identifiers.
 * @returns The original request, or a clone carrying the two headers.
 */
export const getDurableRequest = (
  request: Request,
  options?: Partial<Options>
): DurableRequest => {
  if (!options || Object.keys(options).length < 1) return request;

  const newRequest = new Request(request);
  // `set`, not `append`: a value already present on the incoming request must
  // be replaced, otherwise `headers.get()` yields "old, new".
  newRequest.headers.set(DURABLE_HEADER_ROOMID, options.roomId ?? "");
  newRequest.headers.set(DURABLE_HEADER_USERID, options.userId ?? "");
  return newRequest;
};

/** Context object handed to every {@link Durable} worker instance. */
export type DO<TEnv = unknown> = {
  state: DurableObjectState;
  /** Value of the room-id header on the request that created the worker ("" if absent). */
  roomId: string;
  /** Sends `data` to every tracked connection whose id is not listed in `ignore`. */
  broadcast: (data: string, ignore?: string[]) => void;
  env: TEnv;
  /** First tracked connection with the given id, if any. */
  getConnection: (id: string) => DurableConnection | undefined;
  /** All currently tracked connections. */
  getConnections: () => DurableConnection[];
};

/**
 * Base class for user-defined workers driven by the class that
 * `_createDurable` returns.
 */
export abstract class Durable<TEnv = unknown> {
  constructor(readonly durable: DO<TEnv>) {}

  /** Called on every request, before the WebSocket-upgrade check. */
  abstract onFetch?(req: DurableRequest): void | Promise<void>;

  // NOTE(review): nothing in this file invokes onStart — confirm whether a
  // caller elsewhere is expected to.
  abstract onStart?(): void | Promise<void>;

  // NOTE(review): nothing in this file invokes onBeforeConnect — confirm
  // whether a caller elsewhere is expected to.
  static onBeforeConnect?(req: DurableRequest): void | Promise<void>;

  /** Called once a WebSocket has been accepted and added to the connection list. */
  abstract onConnect(
    connection: DurableConnection,
    request: DurableRequest
  ): void | Promise<void>;

  /** Called with the payload of each "message" event on a connection. */
  abstract onMessage?(
    message: string | ArrayBuffer | ArrayBufferView,
    connection: DurableConnection
  ): void | Promise<void>;

  /** Called on a "close" event, after the connection was removed from the list. */
  abstract onClose?(
    connection: DurableConnection,
    closeEvent: CloseEvent
  ): void | Promise<void>;

  /** Called on an "error" event, after the connection was removed from the list. */
  abstract onError?(
    connection: DurableConnection,
    errorEvent: ErrorEvent
  ): void | Promise<void>;

  /** Handles requests that are not WebSocket upgrades; without it they get a 404. */
  abstract onRequest?(
    req: DurableRequest
  ): Promise<DurableResponse> | DurableResponse;
}

interface Constructable<T, TEnv = unknown> {
  new (durable: DO<TEnv>): T;
}

/**
 * Wraps a {@link Durable} subclass in a `DurableObject` implementation.
 *
 * The returned class lazily creates one worker instance on the first
 * `fetch`, routes plain requests to `onRequest`, and turns WebSocket upgrade
 * requests into tracked connections whose events are forwarded to the
 * worker's hooks.
 *
 * @param Worker - Constructor of the user's worker class.
 * @returns A class implementing `DurableObject`.
 */
export const _createDurable = <
  TEnv = unknown,
  T extends Durable<TEnv> = Durable<TEnv>
>(
  Worker: Constructable<T, TEnv>
) => {
  return class MyDurable implements DurableObject {
    worker?: Durable<TEnv>;
    durable?: DO<TEnv>;
    #connections: DurableConnection[] = [];

    // `env` stays `any` so it can be handed to the worker as its `TEnv`.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    constructor(public state: DurableObjectState, public env: any) {}

    /**
     * Entry point for every request routed to this object.
     *
     * @returns 101 with the client socket for WebSocket upgrades, the
     *   worker's `onRequest` result (or 404) for everything else, and a 400
     *   "Unexpected Error" response when anything throws.
     */
    async fetch(request: DurableRequest): Promise<Response> {
      try {
        if (!this.worker) {
          if (!this.durable) {
            this.durable = {
              state: this.state,
              env: this.env,
              broadcast: this.broadcast,
              roomId: request.headers.get(DURABLE_HEADER_ROOMID) || "",
              getConnection: this.getConnection,
              getConnections: this.getConnections,
            };
          }
          this.worker = new Worker(this.durable);
        }
        const worker = this.worker;

        if (typeof worker.onFetch === "function") await worker.onFetch(request);

        if (request.headers.get("Upgrade") !== "websocket") {
          if (typeof worker.onRequest === "function") {
            // `await` so that a rejected promise is handled by the catch
            // below, exactly like a synchronous throw.
            return await worker.onRequest(request);
          }
          return new Response("Not Found", { status: 404 });
        }

        const webSocketPair = new WebSocketPair();
        const client = webSocketPair[0];
        const server = webSocketPair[1];
        server.accept();

        const connection = server as DurableConnection;
        // `||` on purpose: an empty header value also falls back to a random id.
        connection.id =
          request.headers.get(DURABLE_HEADER_USERID) || randomUUID();
        this.#connections.push(connection);

        // Remove by identity: several sockets may share one user id, and
        // closing one of them must not drop the others.
        const dropConnection = (): void => {
          this.#connections = this.#connections.filter(
            (con) => con !== connection
          );
        };

        // Runs a worker hook and logs (instead of propagating) any failure.
        const runHook = async (
          name: string,
          hook: () => void | Promise<void>
        ): Promise<void> => {
          try {
            await hook();
          } catch (error) {
            console.error(`Durable ${name} handler failed`, error);
          }
        };

        try {
          // Still fire-and-forget, but a rejection is logged rather than
          // left unhandled.
          void Promise.resolve(worker.onConnect(connection, request)).catch(
            (error: unknown) => {
              console.error("Durable onConnect handler failed", error);
            }
          );
        } catch (error) {
          // Synchronous failure: the client never receives this socket, so
          // stop tracking it before reporting the error.
          dropConnection();
          throw error;
        }

        server.addEventListener("message", (e: MessageEvent) => {
          void runHook("onMessage", async () => {
            if (typeof worker.onMessage === "function")
              await worker.onMessage(e.data, connection);
          });
        });

        server.addEventListener("close", (e: CloseEvent) => {
          dropConnection();
          void runHook("onClose", async () => {
            if (typeof worker.onClose === "function")
              await worker.onClose(connection, e);
          });
        });

        server.addEventListener("error", (e: ErrorEvent) => {
          dropConnection();
          void runHook("onError", async () => {
            if (typeof worker.onError === "function")
              await worker.onError(connection, e);
          });
        });

        return new Response(null, {
          status: 101,
          webSocket: client,
        });
      } catch (error) {
        console.error("Durable fetch failed", error);
        return new Response("Unexpected Error", {
          status: 400,
        });
      }
    }

    /** Sends `data` to every tracked connection whose id is not in `ignore`. */
    broadcast = (data: string, ignore?: string[]) => {
      for (const connection of this.#connections) {
        if (ignore?.includes(connection.id)) continue;
        connection.send(data);
      }
    };

    /** Returns the first tracked connection with the given id, if any. */
    getConnection = (id: string) => {
      return this.#connections.find((con) => con.id === id);
    };

    /** Returns the current list of tracked connections. */
    getConnections = () => {
      return this.#connections;
    };
  };
};