import {
  AbortedError,
  UnexpectedMessageError,
  WebSocketClosedError,
} from "./errors.ts";
import { promiseWithResolvers } from "./promise.ts";
import { type MessageFormat, recv, send } from "./sendrecv.ts";
import { closeTag, type TaggedWebSocket } from "./taggedws.ts";

// NOTE(review): the generic parameter lists in this module were reconstructed
// from how the types are used below. The parameter order `<TSend, TRecv, R>`,
// the `MessageFormat<TSend, TRecv>` arguments and the `Promise<unknown>`
// accepted by `settle` should be confirmed against ./sendrecv.ts and callers.

/**
 * A procedure is a generator function that drives a WebSocket exchange.
 *
 * It yields {@link ProcedureCommand}s built with the supplied helpers, is
 * resumed with the received message (for `recv`/`expect`) or with nothing
 * (for `send`/`settle`), and its return value becomes the result of
 * {@link execProcedure}.
 */
export interface Procedure<TSend, TRecv, R> {
  (
    helpers: ProcedureHelpers<TSend, TRecv>,
  ): Generator<ProcedureCommand<TSend, TRecv>, R, TRecv>;
}

/** Factory functions for the commands a procedure may yield. */
export interface ProcedureHelpers<TSend, TRecv> {
  /** Builds a command that sends `payload`. */
  send: (payload: TSend) => SendCommand<TSend>;
  /** Builds a command that waits for the next message, whatever it is. */
  recv: () => RecvCommand<TRecv>;
  /**
   * Builds a command that waits for the next message and requires it to
   * satisfy `predicate`; a non-matching message is thrown into the procedure
   * as an `UnexpectedMessageError`.
   */
  expect: (predicate: (message: TRecv) => boolean) => RecvCommand<TRecv>;
  /** Builds a command that waits for `promise` to settle. */
  settle: (promise: Promise<unknown>) => SettleCommand;
}

/** Any command a procedure can yield. Discriminated by `action`. */
export type ProcedureCommand<TSend, TRecv> =
  | SendCommand<TSend>
  | RecvCommand<TRecv>
  | SettleCommand;

export interface SendCommand<TSend> {
  action: "send";
  payload: TSend;
}

export interface RecvCommand<TRecv> {
  action: "recv";
  /** Optional filter applied to each decoded message. */
  predicate?: (payload: TRecv) => boolean;
  /**
   * When `true`, a message rejected by `predicate` is an error; when `false`
   * such a message is ignored and the command keeps waiting.
   */
  strict: boolean;
}

export interface SettleCommand {
  action: "settle";
  promise: Promise<unknown>;
}

export interface ExecProcedureOptions<TSend, TRecv, R> {
  ws: TaggedWebSocket;
  procedure: Procedure<TSend, TRecv, R>;
  format: MessageFormat<TSend, TRecv>;
}

/** How the generator should be resumed after a command has been executed. */
type ProcedureCont<TRecv> =
  | { type: "next"; args: [] | [payload: TRecv] }
  | { type: "throw"; args: [error: Error] };

/**
 * Runs `options.procedure` to completion against `options.ws`.
 *
 * Each yielded command is executed and its outcome is fed back into the
 * generator, either as the value of the `yield` expression or as an exception
 * thrown at the `yield`.
 *
 * @returns The value returned by the procedure.
 * @throws AbortedError | WebSocketClosedError if the socket closes while a
 *   `recv` or `settle` command is pending.
 * @throws Whatever a `recv` predicate throws, or whatever the procedure itself
 *   throws.
 *
 * NOTE(review): when this function rejects, `steps.return()` is never called,
 * so the generator stays suspended and any `finally` blocks inside the
 * procedure do not run.
 */
export async function execProcedure<TSend, TRecv, R>(
  options: ExecProcedureOptions<TSend, TRecv, R>,
): Promise<R> {
  const steps = options.procedure(getProcedureHelpers<TSend, TRecv>());
  let stepResult: IteratorResult<ProcedureCommand<TSend, TRecv>, R>;
  let cont: ProcedureCont<TRecv> = { type: "next", args: [] };

  // Resume the generator according to the outcome of the previous command.
  const next = (
    c: ProcedureCont<TRecv>,
  ): IteratorResult<ProcedureCommand<TSend, TRecv>, R> => {
    switch (c.type) {
      case "next":
        return steps.next(...c.args);
      case "throw":
        return steps.throw(...c.args);
    }
  };

  while (!(stepResult = next(cont)).done) {
    const cmd = stepResult.value;
    switch (cmd.action) {
      case "send":
        cont = execSend(cmd, options);
        break;
      case "recv":
        cont = await execRecv(cmd, options);
        break;
      case "settle":
        cont = await execSettle(cmd, options);
        break;
      default:
        // Only reachable when an untyped caller yields something else.
        throw new Error("Procedure yielded unexpected command");
    }
  }
  return stepResult.value;
}

/** Sends the payload synchronously and resumes the procedure with no value. */
function execSend<TSend, TRecv, R>(
  cmd: SendCommand<TSend>,
  options: ExecProcedureOptions<TSend, TRecv, R>,
): ProcedureCont<TRecv> {
  send(cmd.payload, options);
  return { type: "next", args: [] };
}

/**
 * Waits for a message that satisfies the command.
 *
 * Resolves with a `next` continuation carrying the message, or with a `throw`
 * continuation when decoding fails or a strict predicate rejects the message.
 * Rejects when the predicate itself throws or the socket closes first.
 */
function execRecv<TSend, TRecv, R>(
  cmd: RecvCommand<TRecv>,
  options: ExecProcedureOptions<TSend, TRecv, R>,
): Promise<ProcedureCont<TRecv>> {
  const { ws } = options;
  const { promise, resolve, reject } = promiseWithResolvers<
    ProcedureCont<TRecv>
  >();
  // A single controller removes both the message and the close listener.
  const ac = new AbortController();

  ws.addEventListener(
    "message",
    (event: MessageEvent) => {
      const result = recv(event, options);
      switch (result.type) {
        case "error":
          resolve({ type: "throw", args: [result.error] });
          return;
        case "message": {
          let ok = true;
          try {
            if (cmd.predicate) {
              ok = cmd.predicate(result.message);
            }
          } catch (err) {
            // A throwing predicate fails the whole execution rather than
            // being thrown into the procedure.
            reject(err);
            return;
          }
          if (ok) {
            resolve({ type: "next", args: [result.message] });
          } else if (cmd.strict) {
            resolve({
              type: "throw",
              args: [new UnexpectedMessageError(result.message)],
            });
          }
          // Non-strict mismatch: ignore the message and keep listening.
          return;
        }
      }
    },
    { signal: ac.signal },
  );
  rejectOnClose(ws, reject, ac.signal);

  return promise.finally(() => ac.abort());
}

/**
 * Waits for the command's promise to settle.
 *
 * Fulfilment resumes the procedure with no value and rejection is thrown into
 * the procedure. If the socket closes first, the returned promise rejects.
 */
function execSettle<TSend, TRecv, R>(
  cmd: SettleCommand,
  options: ExecProcedureOptions<TSend, TRecv, R>,
): Promise<ProcedureCont<TRecv>> {
  const ac = new AbortController();
  const settle: Promise<ProcedureCont<TRecv>> = cmd.promise.then(
    (): ProcedureCont<TRecv> => ({ type: "next", args: [] }),
    (error): ProcedureCont<TRecv> => ({ type: "throw", args: [error] }),
  );
  // Never fulfils; it exists only to reject the race on close.
  const close: Promise<never> = new Promise((_, reject) => {
    rejectOnClose(options.ws, reject, ac.signal);
  });
  return Promise.race([settle, close]).finally(() => ac.abort());
}

/** Creates the command factories handed to a procedure. */
function getProcedureHelpers<TSend, TRecv>(): ProcedureHelpers<TSend, TRecv> {
  return {
    send: (payload) => ({ action: "send", payload }),
    recv: () => ({ action: "recv", strict: false }),
    expect: (predicate) => ({ action: "recv", predicate, strict: true }),
    settle: (promise) => ({ action: "settle", promise }),
  };
}

/**
 * Calls `reject` when `ws` emits `close`, until `signal` is aborted.
 *
 * The error is an `AbortedError` when `ws[closeTag]` is truthy and a
 * `WebSocketClosedError` carrying the close event otherwise.
 */
function rejectOnClose(
  ws: TaggedWebSocket,
  reject: (reason?: unknown) => void,
  signal: AbortSignal,
): void {
  ws.addEventListener(
    "close",
    (event) => {
      reject(
        ws[closeTag]
          ? new AbortedError("Procedure aborted")
          : new WebSocketClosedError(
            "Connection was closed while executing procedure",
            event,
          ),
      );
    },
    { signal },
  );
}