import { SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN, SolanaError } from '@solana/errors'; import { Callable, Flatten, OverloadImplementations, UnionToIntersection } from '@solana/rpc-spec-types'; import { createAsyncIterableFromDataPublisher, createReactiveStoreFromDataPublisher } from '@solana/subscribable'; import { RpcSubscriptionsApi, RpcSubscriptionsPlan } from './rpc-subscriptions-api'; import { PendingRpcSubscriptionsRequest, RpcSubscribeOptions } from './rpc-subscriptions-request'; import { RpcSubscriptionsTransport } from './rpc-subscriptions-transport'; export type RpcSubscriptionsConfig = Readonly<{ api: RpcSubscriptionsApi; transport: RpcSubscriptionsTransport; }>; /** * An object that exposes all of the functions described by `TRpcSubscriptionsMethods`. * * Calling each method returns a * {@link PendingRpcSubscriptionsRequest | PendingRpcSubscriptionsRequest} where * `TNotification` is that method's notification type. */ export type RpcSubscriptions = { [TMethodName in keyof TRpcSubscriptionsMethods]: PendingRpcSubscriptionsRequestBuilder< OverloadImplementations >; }; type PendingRpcSubscriptionsRequestBuilder = UnionToIntersection< Flatten<{ [P in keyof TSubscriptionMethodImplementations]: PendingRpcSubscriptionsRequestReturnTypeMapper< TSubscriptionMethodImplementations[P] >; }> >; type PendingRpcSubscriptionsRequestReturnTypeMapper = // Check that this property of the TRpcSubscriptionMethods interface is, in fact, a function. TSubscriptionMethodImplementation extends Callable ? ( ...args: Parameters ) => PendingRpcSubscriptionsRequest> : never; /** * Creates a {@link RpcSubscriptions} instance given a * {@link RpcSubscriptionsApi | RpcSubscriptionsApi} and a * {@link RpcSubscriptionsTransport} capable of fulfilling them. */ export function createSubscriptionRpc( rpcConfig: RpcSubscriptionsConfig, ): RpcSubscriptions { return new Proxy(rpcConfig.api, { defineProperty() { return false; }, deleteProperty() { return false; }, get(target, p, receiver) { if (p === 'then') { return undefined; } return function (...rawParams: unknown[]) { const notificationName = p.toString(); const createRpcSubscriptionPlan = Reflect.get(target, notificationName, receiver); if (!createRpcSubscriptionPlan) { throw new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN, { notificationName, }); } const subscriptionPlan = createRpcSubscriptionPlan(...rawParams); return createPendingRpcSubscription(rpcConfig.transport, subscriptionPlan); }; }, }) as RpcSubscriptions; } function createPendingRpcSubscription( transport: RpcSubscriptionsTransport, subscriptionsPlan: RpcSubscriptionsPlan, ): PendingRpcSubscriptionsRequest { return { async reactive({ abortSignal }: RpcSubscribeOptions) { const notificationsDataPublisher = await transport({ signal: abortSignal, ...subscriptionsPlan, }); return createReactiveStoreFromDataPublisher({ abortSignal, dataChannelName: 'notification', dataPublisher: notificationsDataPublisher, errorChannelName: 'error', }); }, async subscribe({ abortSignal }: RpcSubscribeOptions): Promise> { const notificationsDataPublisher = await transport({ signal: abortSignal, ...subscriptionsPlan, }); return createAsyncIterableFromDataPublisher({ abortSignal, dataChannelName: 'notification', dataPublisher: notificationsDataPublisher, errorChannelName: 'error', }); }, }; }