import "../_dnt.polyfills.js"; import { Calls, Subscription, Subscriptions } from "../rpc/known/mod.js" import { Connection, ConnectionError, RpcSubscriptionMessage, ServerError } from "../rpc/mod.js" import { is, MetaRune, Run, Rune, RunicArgs, Runner, RunStream } from "../rune/mod.js" class RunConnection extends Run { controller = new AbortController() constructor( ctx: Runner, readonly initConnection: (signal: AbortSignal) => Connection | Promise, ) { super(ctx) } connection?: Connection async _evaluate(): Promise { return this.connection ??= await this.initConnection(this.controller.signal) } override cleanup(): void { this.controller.abort() } } export class ConnectionRune extends Rune { static from(init: (signal: AbortSignal) => Connection | Promise) { return Rune.new(RunConnection, init).into(ConnectionRune) } call( callMethod: K, ...args: RunicArgs]> ) { return Rune .tuple([this.as(Rune), ...args]) .map(async ([connection, ...params]) => { const result = await connection.call(callMethod, params) if (result.error) throw new ServerError(result) return result.result as ReturnType }) .throws(is(ConnectionError), is(ServerError)) } subscribe( subscribeMethod: K, unsubscribeMethod: Subscription.UnsubscribeMethod>, ...args: RunicArgs]> ) { return Rune.tuple([this, ...args]) .map(([connection, ...params]) => Rune.new( RunRpcSubscription, connection, params, subscribeMethod, unsubscribeMethod, ) ) .into(MetaRune) .flat() .map((event) => { if (event.error) throw new ServerError(event) return event.params.result as Subscription.Result> }) .throws(is(ConnectionError), is(ServerError)) } } class RunRpcSubscription extends RunStream { controller = new AbortController() constructor( ctx: Runner, connection: Connection, params: unknown[], subscribeMethod: string, unsubscribeMethod: string, ) { super(ctx) connection.subscription( subscribeMethod, unsubscribeMethod, params, (value) => this.push(value), this.controller.signal, ) } override cleanup(): void { this.controller.abort() } }