import type { Convergence } from '../../Convergence'; import { OperationConstructor, Operation, KeyOfOperation, InputOfOperation, OutputOfOperation, OperationHandler, OperationOptions, OperationScope, } from '../../types'; import { Disposable, DisposableScope } from '../../utils'; import { OperationHandlerMissingError } from '../../errors'; import { AsyncCollection, toCollection } from './AsyncCollection'; /** * @group Modules */ export class OperationClient { /** * Maps the name of an operation with its operation handler. * Whilst the types on the Map are relatively loose, we ensure * operations match with their handlers when registering them. */ protected operationHandlers: Map< string, OperationHandler > = new Map(); constructor(protected readonly convergence: Convergence) {} register< T extends Operation, K extends string = KeyOfOperation, I = InputOfOperation, O = OutputOfOperation >( operationConstructor: OperationConstructor, operationHandler: OperationHandler ) { this.operationHandlers.set(operationConstructor.key, operationHandler); return this; } get< T extends Operation, K extends string = KeyOfOperation, I = InputOfOperation, O = OutputOfOperation >(operation: T): OperationHandler { const operationHandler = this.operationHandlers.get(operation.key) as | OperationHandler | undefined; if (!operationHandler) { throw new OperationHandlerMissingError(operation.key); } return operationHandler; } async execute< T extends Operation, K extends string = KeyOfOperation, I = InputOfOperation, O = OutputOfOperation >(operation: T, options: OperationOptions = {}): Promise { const operationHandler = this.get(operation); const signal = options.signal ?? new AbortController().signal; const process = async (scope: DisposableScope): Promise => { const result = operationHandler.handle( operation, this.convergence, this.getOperationScope(options, scope) ); if (Symbol.asyncIterator in Object(result)) { // throw new TypeError('You cannot call execute ') const values: O[] = []; for await (const value of result as AsyncGenerator) { values.push(value); } return values as O | Promise; } return (await result) as O | Promise; }; return new Disposable(signal).run(process); } toCollection< T extends Operation, K extends string = KeyOfOperation, I = InputOfOperation, O = OutputOfOperation >(operation: T, options: OperationOptions = {}): AsyncCollection { const operationHandler = this.get(operation); const signal = options.signal ?? new AbortController().signal; const process = (scope: DisposableScope) => { const result = operationHandler.handle( operation, this.convergence, this.getOperationScope(options, scope) ); if (Symbol.asyncIterator in Object(result)) { const generator = result as AsyncGenerator; return toCollection(() => generator); } throw new TypeError('toCollection not supported'); }; return new Disposable(signal).runSync(process); } protected getOperationScope( options: OperationOptions, scope: DisposableScope ): OperationScope { if (!!options.commitment && !options.confirmOptions) { options.confirmOptions = { commitment: options.commitment }; } const payer = options.payer ?? this.convergence.rpc().getDefaultFeePayer(); return { ...options, ...scope, payer }; } }