Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor subscription's logic #6210

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions packages/web3-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ Documentation:

## [Unreleased]

### Added

- Web3Subscription constructor accept a Subscription Manager (as an alternative to accepting Request Manager that is now marked marked as deprecated) (#6210)

### Changed

- Web3Subscription constructor overloading that accept a Request Manager is marked as deprecated (#6210)

### Fixed

- Fixed Batch requests erroring out on one request (#6164)
- Fixed the issue: Subscribing to multiple blockchain events causes every listener to be fired for every registered event (#6210)
- Fixed the issue: Unsubscribe at a Web3Subscription class will still have the id of the subscription at the Web3SubscriptionManager (#6210)
- Fixed the issue: A call to the provider is made for every subscription object (#6210)
3 changes: 2 additions & 1 deletion packages/web3-core/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ export const isLegacySendAsyncProvider = <API extends Web3APISpec>(
export const isSupportedProvider = <API extends Web3APISpec>(
provider: SupportedProviders<API>,
): provider is SupportedProviders<API> =>
Web3BaseProvider.isWeb3Provider(provider) ||
isWeb3Provider(provider) ||
isEIP1193Provider(provider) ||
isLegacyRequestProvider(provider) ||
isLegacySendAsyncProvider(provider) ||
isLegacySendProvider(provider);
Expand Down
10 changes: 5 additions & 5 deletions packages/web3-core/src/web3_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export class Web3Context<
public static givenProvider?: SupportedProviders<never>;
public readonly providers = Web3RequestManager.providers;
protected _requestManager: Web3RequestManager<API>;
protected _subscriptionManager?: Web3SubscriptionManager<API, RegisteredSubs>;
protected _subscriptionManager: Web3SubscriptionManager<API, RegisteredSubs>;
protected _accountProvider?: Web3AccountProvider<Web3BaseWalletAccount>;
protected _wallet?: Web3BaseWallet<Web3BaseWalletAccount>;

Expand Down Expand Up @@ -146,10 +146,10 @@ export class Web3Context<

if (subscriptionManager) {
this._subscriptionManager = subscriptionManager;
} else if (registeredSubscriptions) {
} else {
this._subscriptionManager = new Web3SubscriptionManager(
this.requestManager,
registeredSubscriptions,
registeredSubscriptions ?? ({} as RegisteredSubs),
);
}

Expand Down Expand Up @@ -195,8 +195,7 @@ export class Web3Context<
provider: this.provider,
requestManager: this.requestManager,
subscriptionManager: this.subscriptionManager,
registeredSubscriptions: this.subscriptionManager
?.registeredSubscriptions as RegisteredSubs,
registeredSubscriptions: this.subscriptionManager?.registeredSubscriptions,
providers: this.providers,
wallet: this.wallet,
accountProvider: this.accountProvider,
Expand Down Expand Up @@ -231,6 +230,7 @@ export class Web3Context<
this.setConfig(parentContext.config);
this._requestManager = parentContext.requestManager;
this.provider = parentContext.provider;
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
this._subscriptionManager = parentContext.subscriptionManager;
this._wallet = parentContext.wallet;
this._accountProvider = parentContext._accountProvider;
Expand Down
123 changes: 101 additions & 22 deletions packages/web3-core/src/web3_subscription_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,22 @@ You should have received a copy of the GNU Lesser General Public License
along with web3.js. If not, see <http://www.gnu.org/licenses/>.
*/

import { DataFormat, DEFAULT_RETURN_FORMAT, Web3APISpec } from 'web3-types';
import {
DataFormat,
DEFAULT_RETURN_FORMAT,
EIP1193Provider,
JsonRpcNotification,
JsonRpcSubscriptionResult,
JsonRpcSubscriptionResultOld,
Log,
Web3APISpec,
Web3BaseProvider,
} from 'web3-types';
import { ProviderError, SubscriptionError } from 'web3-errors';
import { isNullish } from 'web3-utils';
import { isSupportSubscriptions } from './utils.js';
import { Web3RequestManager, Web3RequestManagerEvent } from './web3_request_manager.js';
// eslint-disable-next-line import/no-cycle
import { Web3SubscriptionConstructor } from './web3_subscriptions.js';

type ShouldUnsubscribeCondition = ({
Expand All @@ -31,8 +42,10 @@ type ShouldUnsubscribeCondition = ({
}) => boolean | undefined;

export class Web3SubscriptionManager<
API extends Web3APISpec,
RegisteredSubs extends { [key: string]: Web3SubscriptionConstructor<API> },
API extends Web3APISpec = Web3APISpec,
RegisteredSubs extends { [key: string]: Web3SubscriptionConstructor<API> } = {
[key: string]: Web3SubscriptionConstructor<API>;
},
> {
private readonly _subscriptions: Map<
string,
Expand All @@ -41,34 +54,97 @@ export class Web3SubscriptionManager<

/**
*
* @param requestManager
* @param registeredSubscriptions
* @param - requestManager
* @param - registeredSubscriptions
*
* @example
* ```ts
* const requestManager = new Web3RequestManager("ws://localhost:8545");
* const subscriptionManager = new Web3SubscriptionManager(requestManager, {});
* ```
*/
public constructor(
requestManager: Web3RequestManager<API>,
registeredSubscriptions: RegisteredSubs,
);
/**
* @deprecated This constructor overloading should not be used
*/
public constructor(
requestManager: Web3RequestManager<API>,
registeredSubscriptions: RegisteredSubs,
tolerateUnlinkedSubscription: boolean,
);
public constructor(
public readonly requestManager: Web3RequestManager<API>,
public readonly registeredSubscriptions: RegisteredSubs,
private readonly tolerateUnlinkedSubscription: boolean = false,
) {
this.requestManager.on(Web3RequestManagerEvent.BEFORE_PROVIDER_CHANGE, async () => {
await this.unsubscribe();
});

this.requestManager.on(Web3RequestManagerEvent.PROVIDER_CHANGED, () => {
this.clear();
this.listenToProviderEvents();
});

this.listenToProviderEvents();
}

private listenToProviderEvents() {
const providerAsWebProvider = this.requestManager.provider as Web3BaseProvider;
if (
!this.requestManager.provider ||
(typeof providerAsWebProvider?.supportsSubscriptions === 'function' &&
!providerAsWebProvider?.supportsSubscriptions())
) {
return;
}

if (typeof (this.requestManager.provider as EIP1193Provider<API>).on === 'function') {
if (
typeof (this.requestManager.provider as EIP1193Provider<API>).request === 'function'
) {
// Listen to provider messages and data
(this.requestManager.provider as EIP1193Provider<API>).on(
'message',
// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-argument
(message: any) => this.messageListener(message),
);
} else {
// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-argument
providerAsWebProvider.on<Log>('data', (data: any) => this.messageListener(data));
}
}
}

protected messageListener(
data?:
| JsonRpcSubscriptionResult
| JsonRpcSubscriptionResultOld<Log>
| JsonRpcNotification<Log>,
) {
if (!data) {
throw new SubscriptionError('Should not call messageListener with no data. Type was');
}
const subscriptionId =
(data as JsonRpcNotification).params?.subscription ||
(data as JsonRpcSubscriptionResultOld).data?.subscription ||
(data as JsonRpcSubscriptionResult).id?.toString(16);

// Process if the received data is related to a subscription
if (subscriptionId) {
const sub = this._subscriptions.get(subscriptionId);
sub?.processSubscriptionData(data);
}
}
/**
* Will create a new subscription
*
* @param name - The subscription you want to subscribe to
* @param args (optional) - Optional additional parameters, depending on the subscription type
* @param returnFormat ({@link DataFormat} defaults to {@link DEFAULT_RETURN_FORMAT}) - Specifies how the return data from the call should be formatted.
* @param args - Optional additional parameters, depending on the subscription type
* @param returnFormat- ({@link DataFormat} defaults to {@link DEFAULT_RETURN_FORMAT}) - Specifies how the return data from the call should be formatted.
*
* Will subscribe to a specific topic (note: name)
* @returns The subscription object
Expand All @@ -78,19 +154,16 @@ export class Web3SubscriptionManager<
args?: ConstructorParameters<RegisteredSubs[T]>[0],
returnFormat: DataFormat = DEFAULT_RETURN_FORMAT,
): Promise<InstanceType<RegisteredSubs[T]>> {
if (!this.requestManager.provider) {
throw new ProviderError('Provider not available');
}

const Klass: RegisteredSubs[T] = this.registeredSubscriptions[name];
if (!Klass) {
throw new SubscriptionError('Invalid subscription type');
}

// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
const subscription = new Klass(args ?? undefined, {
requestManager: this.requestManager,
subscriptionManager: this as Web3SubscriptionManager<API, RegisteredSubs>,
returnFormat,
}) as InstanceType<RegisteredSubs[T]>;
} as any) as InstanceType<RegisteredSubs[T]>;

await this.addSubscription(subscription);

Expand All @@ -111,6 +184,10 @@ export class Web3SubscriptionManager<
* @param sub - A {@link Web3Subscription} object
*/
public async addSubscription(sub: InstanceType<RegisteredSubs[keyof RegisteredSubs]>) {
if (!this.requestManager.provider) {
throw new ProviderError('Provider not available');
}

if (!this.supportsSubscriptions()) {
throw new SubscriptionError('The current provider does not support subscriptions');
}
Expand All @@ -119,34 +196,36 @@ export class Web3SubscriptionManager<
throw new SubscriptionError(`Subscription with id "${sub.id}" already exists`);
}

await sub.subscribe();
await sub.sendSubscriptionRequest();

if (isNullish(sub.id)) {
throw new SubscriptionError('Subscription is not subscribed yet.');
}

this._subscriptions.set(sub.id, sub);

return sub.id;
}

/**
* Will clear a subscription
*
* @param id - The subscription of type {@link Web3Subscription} to remove
*/

public async removeSubscription(sub: InstanceType<RegisteredSubs[keyof RegisteredSubs]>) {
if (isNullish(sub.id)) {
const { id } = sub;

if (isNullish(id)) {
throw new SubscriptionError(
'Subscription is not subscribed yet. Or, had already been unsubscribed but not through the Subscription Manager.',
);
}

if (!this._subscriptions.has(sub.id)) {
throw new SubscriptionError(
`Subscription with id "${sub.id.toString()}" does not exists`,
);
if (!this._subscriptions.has(id) && !this.tolerateUnlinkedSubscription) {
throw new SubscriptionError(`Subscription with id "${id.toString()}" does not exists`);
}
const { id } = sub;
await sub.unsubscribe();

await sub.sendUnsubscribeRequest();
this._subscriptions.delete(id);
return id;
}
Expand Down
Loading