From 3f6d4eece902f77f5511463596b3abfb1efa2b72 Mon Sep 17 00:00:00 2001 From: tgmichel Date: Mon, 8 May 2023 04:48:01 +0200 Subject: [PATCH] Improve block import notification strategy (#1030) * Improve block import notification strategy * oops * taplo * clippy * Notify only when not major syncing --- Cargo.lock | 15 ++ Cargo.toml | 1 + client/mapping-sync/Cargo.toml | 17 ++ client/mapping-sync/src/lib.rs | 37 +++- client/mapping-sync/src/worker.rs | 319 ++++++++++++++++++++++++++++++ client/rpc/Cargo.toml | 2 + client/rpc/src/eth_pubsub.rs | 16 +- template/node/src/eth.rs | 12 +- template/node/src/rpc/eth.rs | 6 + template/node/src/rpc/mod.rs | 6 + template/node/src/service.rs | 19 +- 11 files changed, 442 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6fa1d0f21d..35f7267707 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1946,17 +1946,30 @@ dependencies = [ name = "fc-mapping-sync" version = "2.0.0-dev" dependencies = [ + "ethereum", + "ethereum-types", "fc-db", "fc-storage", "fp-consensus", "fp-rpc", + "fp-storage", + "frontier-template-runtime", "futures", "futures-timer", "log", + "parking_lot 0.12.1", + "sc-block-builder", "sc-client-api", + "sc-client-db", + "sc-utils", "sp-api", "sp-blockchain", + "sp-consensus", + "sp-core", "sp-runtime", + "substrate-test-runtime-client", + "tempfile", + "tokio", ] [[package]] @@ -1967,6 +1980,7 @@ dependencies = [ "ethereum-types", "evm", "fc-db", + "fc-mapping-sync", "fc-rpc-core", "fc-storage", "fp-ethereum", @@ -1993,6 +2007,7 @@ dependencies = [ "sc-service", "sc-transaction-pool", "sc-transaction-pool-api", + "sc-utils", "sp-api", "sp-block-builder", "sp-blockchain", diff --git a/Cargo.toml b/Cargo.toml index ceb42d3fda..cfff6cb827 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,6 +74,7 @@ sc-service = { version = "0.10.0-dev", git = "https://github.com/paritytech/subs sc-telemetry = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate", branch = "master" } sc-transaction-pool = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate", branch = "master" } sc-transaction-pool-api = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate", branch = "master" } +sc-utils = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate", branch = "master" } # Substrate Primitive sp-api = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } sp-block-builder = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } diff --git a/client/mapping-sync/Cargo.toml b/client/mapping-sync/Cargo.toml index 5ca2705103..ef5c8f85d9 100644 --- a/client/mapping-sync/Cargo.toml +++ b/client/mapping-sync/Cargo.toml @@ -14,13 +14,30 @@ targets = ["x86_64-unknown-linux-gnu"] futures = "0.3.25" futures-timer = "3.0.1" log = "0.4.17" +parking_lot = "0.12.1" # Substrate sc-client-api = { workspace = true } sp-api = { workspace = true } sp-blockchain = { workspace = true } +sp-consensus = { workspace = true, features = ["default"] } sp-runtime = { workspace = true } # Frontier fc-db = { workspace = true } fc-storage = { workspace = true } fp-consensus = { workspace = true, features = ["default"] } fp-rpc = { workspace = true, features = ["default"] } +sc-utils = { workspace = true } + +[dev-dependencies] +ethereum = { workspace = true, features = ["with-codec"] } +ethereum-types = { workspace = true } +tempfile = "3.3.0" +tokio = { version = "1.24", features = ["sync"] } +#Frontier +fp-storage = { workspace = true, features = ["default"] } +frontier-template-runtime = { workspace = true, features = ["default"] } +# Substrate +sc-block-builder = { workspace = true } +sc-client-db = { workspace = true } +sp-core = { workspace = true, features = ["default"] } +substrate-test-runtime-client = { workspace = true } diff --git a/client/mapping-sync/src/lib.rs b/client/mapping-sync/src/lib.rs index 1853a9688b..c83398ab9e 100644 --- a/client/mapping-sync/src/lib.rs +++ b/client/mapping-sync/src/lib.rs @@ -29,12 +29,22 @@ use std::sync::Arc; use sc_client_api::backend::{Backend, StorageProvider}; use sp_api::{ApiExt, ProvideRuntimeApi}; use sp_blockchain::{Backend as _, HeaderBackend}; +use sp_consensus::SyncOracle; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Zero}; // Frontier use fc_storage::OverrideHandle; use fp_consensus::{FindLogError, Hashes, Log, PostLog, PreLog}; use fp_rpc::EthereumRuntimeRPCApi; +pub type EthereumBlockNotificationSinks = + parking_lot::Mutex>>; + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub struct EthereumBlockNotification { + pub is_new_best: bool, + pub hash: Block::Hash, +} + pub fn sync_block( client: &C, overrides: Arc>, @@ -160,6 +170,10 @@ pub fn sync_one_block( frontier_backend: &fc_db::Backend, sync_from: ::Number, strategy: SyncStrategy, + sync_oracle: Arc, + pubsub_notification_sinks: Arc< + EthereumBlockNotificationSinks>, + >, ) -> Result where C: ProvideRuntimeApi, @@ -208,7 +222,6 @@ where frontier_backend .meta() .write_current_syncing_tips(current_syncing_tips)?; - Ok(true) } else { if SyncStrategy::Parachain == strategy && operating_header.number() > &client.info().best_number @@ -221,8 +234,22 @@ where frontier_backend .meta() .write_current_syncing_tips(current_syncing_tips)?; - Ok(true) } + // Notify on import and remove closed channels. + // Only notify when the node is node in major syncing. + let sinks = &mut pubsub_notification_sinks.lock(); + sinks.retain(|sink| { + if !sync_oracle.is_major_syncing() { + let hash = operating_header.hash(); + let is_new_best = client.info().best_hash == hash; + sink.unbounded_send(EthereumBlockNotification { is_new_best, hash }) + .is_ok() + } else { + // Remove from the pool if in major syncing. + false + } + }); + Ok(true) } pub fn sync_blocks( @@ -233,6 +260,10 @@ pub fn sync_blocks( limit: usize, sync_from: ::Number, strategy: SyncStrategy, + sync_oracle: Arc, + pubsub_notification_sinks: Arc< + EthereumBlockNotificationSinks>, + >, ) -> Result where C: ProvideRuntimeApi, @@ -251,6 +282,8 @@ where frontier_backend, sync_from, strategy, + sync_oracle.clone(), + pubsub_notification_sinks.clone(), )?; } diff --git a/client/mapping-sync/src/worker.rs b/client/mapping-sync/src/worker.rs index 06702cbe1a..1299bd7f25 100644 --- a/client/mapping-sync/src/worker.rs +++ b/client/mapping-sync/src/worker.rs @@ -31,6 +31,7 @@ use sc_client_api::{ }; use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; +use sp_consensus::SyncOracle; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; // Frontier use fc_storage::OverrideHandle; @@ -56,6 +57,10 @@ pub struct MappingSyncWorker { retry_times: usize, sync_from: ::Number, strategy: SyncStrategy, + + sync_oracle: Arc, + pubsub_notification_sinks: + Arc>>, } impl Unpin for MappingSyncWorker {} @@ -71,6 +76,10 @@ impl MappingSyncWorker { retry_times: usize, sync_from: ::Number, strategy: SyncStrategy, + sync_oracle: Arc, + pubsub_notification_sinks: Arc< + crate::EthereumBlockNotificationSinks>, + >, ) -> Self { Self { import_notifications, @@ -86,6 +95,9 @@ impl MappingSyncWorker { retry_times, sync_from, strategy, + + sync_oracle, + pubsub_notification_sinks, } } } @@ -137,6 +149,8 @@ where self.retry_times, self.sync_from, self.strategy, + self.sync_oracle.clone(), + self.pubsub_notification_sinks.clone(), ) { Ok(have_next) => { self.have_next = have_next; @@ -153,3 +167,308 @@ where } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks}; + use fc_storage::{OverrideHandle, SchemaV3Override, StorageOverride}; + use fp_storage::{EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA}; + use futures::executor; + use sc_block_builder::BlockBuilderProvider; + use sc_client_api::BlockchainEvents; + use sp_api::Encode; + use sp_consensus::BlockOrigin; + use sp_core::{H160, H256, U256}; + use sp_runtime::{generic::Header, traits::BlakeTwo256, Digest}; + use std::collections::BTreeMap; + use substrate_test_runtime_client::{ + ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt, + }; + use tempfile::tempdir; + + type OpaqueBlock = sp_runtime::generic::Block< + Header, + substrate_test_runtime_client::runtime::Extrinsic, + >; + + fn ethereum_digest() -> Digest { + let partial_header = ethereum::PartialHeader { + parent_hash: H256::random(), + beneficiary: H160::default(), + state_root: H256::default(), + receipts_root: H256::default(), + logs_bloom: ethereum_types::Bloom::default(), + difficulty: U256::zero(), + number: U256::zero(), + gas_limit: U256::zero(), + gas_used: U256::zero(), + timestamp: 0u64, + extra_data: Vec::new(), + mix_hash: H256::default(), + nonce: ethereum_types::H64::default(), + }; + let ethereum_block = ethereum::Block::new(partial_header, vec![], vec![]); + Digest { + logs: vec![sp_runtime::generic::DigestItem::Consensus( + fp_consensus::FRONTIER_ENGINE_ID, + fp_consensus::PostLog::Hashes(fp_consensus::Hashes::from_block(ethereum_block)) + .encode(), + )], + } + } + + struct TestSyncOracleNotSyncing; + impl sp_consensus::SyncOracle for TestSyncOracleNotSyncing { + fn is_major_syncing(&self) -> bool { + false + } + fn is_offline(&self) -> bool { + false + } + } + + struct TestSyncOracleSyncing; + impl sp_consensus::SyncOracle for TestSyncOracleSyncing { + fn is_major_syncing(&self) -> bool { + true + } + fn is_offline(&self) -> bool { + false + } + } + + #[tokio::test] + async fn block_import_notification_works() { + let tmp = tempdir().expect("create a temporary directory"); + let builder = TestClientBuilder::new().add_extra_storage( + PALLET_ETHEREUM_SCHEMA.to_vec(), + Encode::encode(&EthereumStorageSchema::V3), + ); + let test_sync_oracle = TestSyncOracleNotSyncing {}; + // Backend + let backend = builder.backend(); + // Client + let (client, _) = + builder.build_with_native_executor::(None); + let mut client = Arc::new(client); + // Overrides + let mut overrides_map = BTreeMap::new(); + overrides_map.insert( + EthereumStorageSchema::V3, + Box::new(SchemaV3Override::new(client.clone())) as Box>, + ); + let overrides = Arc::new(OverrideHandle { + schemas: overrides_map, + fallback: Box::new(SchemaV3Override::new(client.clone())), + }); + + let frontier_backend = Arc::new( + fc_db::Backend::::new( + client.clone(), + &fc_db::DatabaseSettings { + source: sc_client_db::DatabaseSource::RocksDb { + path: tmp.path().to_path_buf(), + cache_size: 0, + }, + }, + ) + .expect("frontier backend"), + ); + + let notification_stream = client.clone().import_notification_stream(); + let client_inner = client.clone(); + + let pubsub_notification_sinks: EthereumBlockNotificationSinks< + EthereumBlockNotification, + > = Default::default(); + let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks); + + let pubsub_notification_sinks_inner = pubsub_notification_sinks.clone(); + + tokio::task::spawn(async move { + MappingSyncWorker::new( + notification_stream, + Duration::new(6, 0), + client_inner, + backend, + overrides.clone(), + frontier_backend, + 3, + 0, + SyncStrategy::Normal, + Arc::new(test_sync_oracle), + pubsub_notification_sinks_inner, + ) + .for_each(|()| future::ready(())) + .await + }); + + { + // A new mpsc channel + let (inner_sink, mut block_notification_stream) = + sc_utils::mpsc::tracing_unbounded("pubsub_notification_stream", 100_000); + + { + // This scope represents a call to eth_subscribe, where it briefly locks the pool + // to push the new sink. + let sinks = &mut pubsub_notification_sinks.lock(); + // Push to sink pool + sinks.push(inner_sink); + } + + // Let's produce a block, which we expect to trigger a channel message + let mut builder = client.new_block(ethereum_digest()).unwrap(); + let block = builder.build().unwrap().block; + let block_hash = block.header.hash(); + client.import(BlockOrigin::Own, block).await; + + // Receive + assert_eq!( + block_notification_stream + .next() + .await + .expect("a message") + .hash, + block_hash + ); + } + + { + // Assert we still hold a sink in the pool after switching scopes + let sinks = pubsub_notification_sinks.lock(); + assert_eq!(sinks.len(), 1); + } + + { + // Create yet another mpsc channel + let (inner_sink, mut block_notification_stream) = + sc_utils::mpsc::tracing_unbounded("pubsub_notification_stream", 100_000); + + { + let sinks = &mut pubsub_notification_sinks.lock(); + // Push it + sinks.push(inner_sink); + // Now we expect two sinks in the pool + assert_eq!(sinks.len(), 2); + } + + // Let's produce another block, this not only triggers a message in the new channel + // but also removes the closed channels from the pool. + let mut builder = client.new_block(ethereum_digest()).unwrap(); + let block = builder.build().unwrap().block; + let block_hash = block.header.hash(); + client.import(BlockOrigin::Own, block).await; + + // Receive + assert_eq!( + block_notification_stream + .next() + .await + .expect("a message") + .hash, + block_hash + ); + + // So we expect the pool to hold one sink only after cleanup + let sinks = &mut pubsub_notification_sinks.lock(); + assert_eq!(sinks.len(), 1); + } + } + + #[tokio::test] + async fn sink_removal_when_syncing_works() { + let tmp = tempdir().expect("create a temporary directory"); + let builder = TestClientBuilder::new().add_extra_storage( + PALLET_ETHEREUM_SCHEMA.to_vec(), + Encode::encode(&EthereumStorageSchema::V3), + ); + let test_sync_oracle = TestSyncOracleSyncing {}; + // Backend + let backend = builder.backend(); + // Client + let (client, _) = + builder.build_with_native_executor::(None); + let mut client = Arc::new(client); + // Overrides + let mut overrides_map = BTreeMap::new(); + overrides_map.insert( + EthereumStorageSchema::V3, + Box::new(SchemaV3Override::new(client.clone())) as Box>, + ); + let overrides = Arc::new(OverrideHandle { + schemas: overrides_map, + fallback: Box::new(SchemaV3Override::new(client.clone())), + }); + + let frontier_backend = Arc::new( + fc_db::Backend::::new( + client.clone(), + &fc_db::DatabaseSettings { + source: sc_client_db::DatabaseSource::RocksDb { + path: tmp.path().to_path_buf(), + cache_size: 0, + }, + }, + ) + .expect("frontier backend"), + ); + + let notification_stream = client.clone().import_notification_stream(); + let client_inner = client.clone(); + + let pubsub_notification_sinks: EthereumBlockNotificationSinks< + EthereumBlockNotification, + > = Default::default(); + let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks); + + let pubsub_notification_sinks_inner = pubsub_notification_sinks.clone(); + + tokio::task::spawn(async move { + MappingSyncWorker::new( + notification_stream, + Duration::new(6, 0), + client_inner, + backend, + overrides.clone(), + frontier_backend, + 3, + 0, + SyncStrategy::Normal, + Arc::new(test_sync_oracle), + pubsub_notification_sinks_inner, + ) + .for_each(|()| future::ready(())) + .await + }); + + { + // A new mpsc channel + let (inner_sink, mut block_notification_stream) = + sc_utils::mpsc::tracing_unbounded("pubsub_notification_stream", 100_000); + + { + // This scope represents a call to eth_subscribe, where it briefly locks the pool + // to push the new sink. + let sinks = &mut pubsub_notification_sinks.lock(); + // Push to sink pool + sinks.push(inner_sink); + } + + // Let's produce a block, which we expect to trigger a channel message + let mut builder = client.new_block(ethereum_digest()).unwrap(); + let block = builder.build().unwrap().block; + let block_hash = block.header.hash(); + client.import(BlockOrigin::Own, block).await; + + // Not received, channel closed because major syncing + assert!(block_notification_stream.next().await.is_none()); + } + + { + // Assert sink was removed from pool on major syncing + let sinks = pubsub_notification_sinks.lock(); + assert_eq!(sinks.len(), 0); + } + } +} diff --git a/client/rpc/Cargo.toml b/client/rpc/Cargo.toml index 579537fa74..97f2a788c7 100644 --- a/client/rpc/Cargo.toml +++ b/client/rpc/Cargo.toml @@ -36,6 +36,7 @@ sc-rpc = { workspace = true } sc-service = { workspace = true } sc-transaction-pool = { workspace = true } sc-transaction-pool-api = { workspace = true } +sc-utils = { workspace = true } sp-api = { workspace = true } sp-block-builder = { workspace = true } sp-blockchain = { workspace = true } @@ -47,6 +48,7 @@ sp-state-machine = { workspace = true } sp-storage = { workspace = true } # Frontier fc-db = { workspace = true } +fc-mapping-sync = { workspace = true } fc-rpc-core = { workspace = true } fc-storage = { workspace = true } fp-ethereum = { workspace = true, features = ["default"] } diff --git a/client/rpc/src/eth_pubsub.rs b/client/rpc/src/eth_pubsub.rs index 670f653358..9a7bb9821d 100644 --- a/client/rpc/src/eth_pubsub.rs +++ b/client/rpc/src/eth_pubsub.rs @@ -37,6 +37,7 @@ use sp_consensus::SyncOracle; use sp_core::hashing::keccak_256; use sp_runtime::traits::{Block as BlockT, UniqueSaturatedInto}; // Frontier +use fc_mapping_sync::{EthereumBlockNotification, EthereumBlockNotificationSinks}; use fc_rpc_core::{ types::{ pubsub::{Kind, Params, PubSubSyncStatus, Result as PubSubResult, SyncStatusMetadata}, @@ -64,6 +65,7 @@ pub struct EthPubSub { subscriptions: SubscriptionTaskExecutor, overrides: Arc>, starting_block: u64, + pubsub_notification_sinks: Arc>>, _marker: PhantomData, } @@ -77,6 +79,9 @@ where network: Arc>, subscriptions: SubscriptionTaskExecutor, overrides: Arc>, + pubsub_notification_sinks: Arc< + EthereumBlockNotificationSinks>, + >, ) -> Self { // Capture the best block as seen on initialization. Used for syncing subscriptions. let starting_block = @@ -88,6 +93,7 @@ where subscriptions, overrides, starting_block, + pubsub_notification_sinks, _marker: PhantomData, } } @@ -217,6 +223,10 @@ where }; let client = self.client.clone(); + // Everytime a new subscription is created, a new mpsc channel is added to the sink pool. + let (inner_sink, block_notification_stream) = + sc_utils::mpsc::tracing_unbounded("pubsub_notification_stream", 100_000); + self.pubsub_notification_sinks.lock().push(inner_sink); let pool = self.pool.clone(); let network = self.network.clone(); let overrides = self.overrides.clone(); @@ -224,8 +234,7 @@ where let fut = async move { match kind { Kind::Logs => { - let stream = client - .import_notification_stream() + let stream = block_notification_stream .filter_map(move |notification| { if notification.is_new_best { let substrate_hash = notification.hash; @@ -263,8 +272,7 @@ where sink.pipe_from_stream(stream).await; } Kind::NewHeads => { - let stream = client - .import_notification_stream() + let stream = block_notification_stream .filter_map(move |notification| { if notification.is_new_best { let schema = fc_storage::onchain_storage_schema( diff --git a/template/node/src/eth.rs b/template/node/src/eth.rs index cf0b6c56b6..1deb447f0c 100644 --- a/template/node/src/eth.rs +++ b/template/node/src/eth.rs @@ -9,6 +9,8 @@ use futures::{future, prelude::*}; // Substrate use sc_client_api::{BlockchainEvents, StateBackendFor}; use sc_executor::NativeExecutionDispatch; +use sc_network::NetworkService; +use sc_network_common::ExHashT; use sc_service::{error::Error as ServiceError, BasePath, Configuration, TaskManager}; use sp_api::ConstructRuntimeApi; use sp_runtime::traits::BlakeTwo256; @@ -104,7 +106,7 @@ where { } -pub fn spawn_frontier_tasks( +pub fn spawn_frontier_tasks( task_manager: &TaskManager, client: Arc>, backend: Arc, @@ -113,6 +115,12 @@ pub fn spawn_frontier_tasks( overrides: Arc>, fee_history_cache: FeeHistoryCache, fee_history_cache_limit: FeeHistoryCacheLimit, + network: Arc>, + pubsub_notification_sinks: Arc< + fc_mapping_sync::EthereumBlockNotificationSinks< + fc_mapping_sync::EthereumBlockNotification, + >, + >, ) where RuntimeApi: ConstructRuntimeApi>, RuntimeApi: Send + Sync + 'static, @@ -133,6 +141,8 @@ pub fn spawn_frontier_tasks( 3, 0, SyncStrategy::Normal, + network, + pubsub_notification_sinks, ) .for_each(|()| future::ready(())), ); diff --git a/template/node/src/rpc/eth.rs b/template/node/src/rpc/eth.rs index 32bc4ecd25..5d6ab4bab4 100644 --- a/template/node/src/rpc/eth.rs +++ b/template/node/src/rpc/eth.rs @@ -83,6 +83,11 @@ pub fn create_eth>( mut io: RpcModule<()>, deps: EthDeps, subscription_task_executor: SubscriptionTaskExecutor, + pubsub_notification_sinks: Arc< + fc_mapping_sync::EthereumBlockNotificationSinks< + fc_mapping_sync::EthereumBlockNotification, + >, + >, ) -> Result, Box> where B: BlockT, @@ -167,6 +172,7 @@ where network.clone(), subscription_task_executor, overrides, + pubsub_notification_sinks, ) .into_rpc(), )?; diff --git a/template/node/src/rpc/mod.rs b/template/node/src/rpc/mod.rs index c14c9370d3..c5f340a2c1 100644 --- a/template/node/src/rpc/mod.rs +++ b/template/node/src/rpc/mod.rs @@ -53,6 +53,11 @@ where pub fn create_full( deps: FullDeps, subscription_task_executor: SubscriptionTaskExecutor, + pubsub_notification_sinks: Arc< + fc_mapping_sync::EthereumBlockNotificationSinks< + fc_mapping_sync::EthereumBlockNotification, + >, + >, ) -> Result, Box> where C: ProvideRuntimeApi, @@ -100,6 +105,7 @@ where io, eth, subscription_task_executor, + pubsub_notification_sinks, )?; Ok(io) diff --git a/template/node/src/service.rs b/template/node/src/service.rs index b78f3713b1..d24610371c 100644 --- a/template/node/src/service.rs +++ b/template/node/src/service.rs @@ -341,6 +341,15 @@ where // Channel for the rpc handler to communicate with the authorship task. let (command_sink, commands_stream) = mpsc::channel(1000); + // Sinks for pubsub notifications. + // Everytime a new subscription is created, a new mpsc channel is added to the sink pool. + // The MappingSyncWorker sends through the channel on block import and the subscription emits a notification to the subscriber on receiving a message through this channel. + // This way we avoid race conditions when using native substrate block import notification stream. + let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks< + fc_mapping_sync::EthereumBlockNotification, + > = Default::default(); + let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks); + // for ethereum-compatibility rpc. config.rpc_id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider)); let overrides = crate::rpc::overrides_handle(client.clone()); @@ -371,6 +380,7 @@ where let rpc_builder = { let client = client.clone(); let pool = transaction_pool.clone(); + let pubsub_notification_sinks = pubsub_notification_sinks.clone(); Box::new(move |deny_unsafe, subscription_task_executor| { let deps = crate::rpc::FullDeps { @@ -385,7 +395,12 @@ where eth: eth_rpc_params.clone(), }; - crate::rpc::create_full(deps, subscription_task_executor).map_err(Into::into) + crate::rpc::create_full( + deps, + subscription_task_executor, + pubsub_notification_sinks.clone(), + ) + .map_err(Into::into) }) }; @@ -412,6 +427,8 @@ where overrides, fee_history_cache, fee_history_cache_limit, + network.clone(), + pubsub_notification_sinks, ); if role.is_authority() {