/*
This file is part of web3.js.
web3.js is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
web3.js is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with web3.js. If not, see .
*/
import {
DataFormat,
DEFAULT_RETURN_FORMAT,
EIP1193Provider,
JsonRpcNotification,
JsonRpcSubscriptionResult,
JsonRpcSubscriptionResultOld,
Log,
Web3APISpec,
Web3BaseProvider,
} from 'web3-types';
import { ProviderError, SubscriptionError } from 'web3-errors';
import { isNullish } from 'web3-utils';
import { isSupportSubscriptions } from './utils.js';
import { Web3RequestManager, Web3RequestManagerEvent } from './web3_request_manager.js';
// eslint-disable-next-line import/no-cycle
import { Web3SubscriptionConstructor } from './web3_subscriptions.js';
type ShouldUnsubscribeCondition = ({
id,
sub,
}: {
id: string;
sub: unknown;
}) => boolean | undefined;
export class Web3SubscriptionManager<
API extends Web3APISpec = Web3APISpec,
RegisteredSubs extends { [key: string]: Web3SubscriptionConstructor } = {
[key: string]: Web3SubscriptionConstructor;
},
> {
private readonly _subscriptions: Map<
string,
InstanceType
> = new Map();
/**
*
* @param - requestManager
* @param - registeredSubscriptions
*
* @example
* ```ts
* const requestManager = new Web3RequestManager("ws://localhost:8545");
* const subscriptionManager = new Web3SubscriptionManager(requestManager, {});
* ```
*/
public constructor(
requestManager: Web3RequestManager,
registeredSubscriptions: RegisteredSubs,
);
/**
* @deprecated This constructor overloading should not be used
*/
public constructor(
requestManager: Web3RequestManager,
registeredSubscriptions: RegisteredSubs,
tolerateUnlinkedSubscription: boolean,
);
public constructor(
public readonly requestManager: Web3RequestManager,
public readonly registeredSubscriptions: RegisteredSubs,
private readonly tolerateUnlinkedSubscription: boolean = false,
) {
this.requestManager.on(Web3RequestManagerEvent.BEFORE_PROVIDER_CHANGE, async () => {
await this.unsubscribe();
});
this.requestManager.on(Web3RequestManagerEvent.PROVIDER_CHANGED, () => {
this.clear();
this.listenToProviderEvents();
});
this.listenToProviderEvents();
}
private listenToProviderEvents() {
const providerAsWebProvider = this.requestManager.provider as Web3BaseProvider;
if (
!this.requestManager.provider ||
(typeof providerAsWebProvider?.supportsSubscriptions === 'function' &&
!providerAsWebProvider?.supportsSubscriptions())
) {
return;
}
if (typeof (this.requestManager.provider as EIP1193Provider).on === 'function') {
if (
typeof (this.requestManager.provider as EIP1193Provider).request === 'function'
) {
// Listen to provider messages and data
(this.requestManager.provider as EIP1193Provider).on(
'message',
// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-argument
(message: any) => this.messageListener(message),
);
} else {
// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-argument
providerAsWebProvider.on('data', (data: any) => this.messageListener(data));
}
}
}
protected messageListener(
data?:
| JsonRpcSubscriptionResult
| JsonRpcSubscriptionResultOld
| JsonRpcNotification,
) {
if (!data) {
throw new SubscriptionError('Should not call messageListener with no data. Type was');
}
const subscriptionId =
(data as JsonRpcNotification).params?.subscription ||
(data as JsonRpcSubscriptionResultOld).data?.subscription ||
(data as JsonRpcSubscriptionResult).id?.toString(16);
// Process if the received data is related to a subscription
if (subscriptionId) {
const sub = this._subscriptions.get(subscriptionId);
sub?.processSubscriptionData(data);
}
}
/**
* Will create a new subscription
*
* @param name - The subscription you want to subscribe to
* @param args - Optional additional parameters, depending on the subscription type
* @param returnFormat- ({@link DataFormat} defaults to {@link DEFAULT_RETURN_FORMAT}) - Specifies how the return data from the call should be formatted.
*
* Will subscribe to a specific topic (note: name)
* @returns The subscription object
*/
public async subscribe(
name: T,
args?: ConstructorParameters[0],
returnFormat: DataFormat = DEFAULT_RETURN_FORMAT,
): Promise> {
const Klass: RegisteredSubs[T] = this.registeredSubscriptions[name];
if (!Klass) {
throw new SubscriptionError('Invalid subscription type');
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
const subscription = new Klass(args ?? undefined, {
subscriptionManager: this as Web3SubscriptionManager,
returnFormat,
// eslint.disable-next-line @typescript-eslint/no-unsafe-any
} as any) as InstanceType;
await this.addSubscription(subscription);
return subscription;
}
/**
* Will returns all subscriptions.
*/
public get subscriptions() {
return this._subscriptions;
}
/**
*
* Adds an instance of {@link Web3Subscription} and subscribes to it
*
* @param sub - A {@link Web3Subscription} object
*/
public async addSubscription(sub: InstanceType) {
if (!this.requestManager.provider) {
throw new ProviderError('Provider not available');
}
if (!this.supportsSubscriptions()) {
throw new SubscriptionError('The current provider does not support subscriptions');
}
if (sub.id && this._subscriptions.has(sub.id)) {
throw new SubscriptionError(`Subscription with id "${sub.id}" already exists`);
}
await sub.sendSubscriptionRequest();
if (isNullish(sub.id)) {
throw new SubscriptionError('Subscription is not subscribed yet.');
}
this._subscriptions.set(sub.id, sub);
return sub.id;
}
/**
* Will clear a subscription
*
* @param id - The subscription of type {@link Web3Subscription} to remove
*/
public async removeSubscription(sub: InstanceType) {
const { id } = sub;
if (isNullish(id)) {
throw new SubscriptionError(
'Subscription is not subscribed yet. Or, had already been unsubscribed but not through the Subscription Manager.',
);
}
if (!this._subscriptions.has(id) && !this.tolerateUnlinkedSubscription) {
throw new SubscriptionError(`Subscription with id "${id.toString()}" does not exists`);
}
await sub.sendUnsubscribeRequest();
this._subscriptions.delete(id);
return id;
}
/**
* Will unsubscribe all subscriptions that fulfill the condition
*
* @param condition - A function that access and `id` and a `subscription` and return `true` or `false`
* @returns An array of all the un-subscribed subscriptions
*/
public async unsubscribe(condition?: ShouldUnsubscribeCondition) {
const result = [];
for (const [id, sub] of this.subscriptions.entries()) {
if (!condition || (typeof condition === 'function' && condition({ id, sub }))) {
result.push(this.removeSubscription(sub));
}
}
return Promise.all(result);
}
/**
* Clears all subscriptions
*/
public clear() {
this._subscriptions.clear();
}
/**
* Check whether the current provider supports subscriptions.
*
* @returns `true` or `false` depending on if the current provider supports subscriptions
*/
public supportsSubscriptions(): boolean {
return isNullish(this.requestManager.provider)
? false
: isSupportSubscriptions(this.requestManager.provider);
}
}