From a58d0a9a00cb59ffab752fc17cb6a8030195406d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Sat, 6 Apr 2024 11:12:19 +0200 Subject: [PATCH 1/6] sc-beefy-consensus: Remove unneeded stream. The stream was just used to communicate from the validator the peer reports back to the gossip engine. Internally the gossip engine just forwards these reports to the networking engine. So, we can just do this directly. The reporting stream was also pumped [in the worker behind the engine](https://github.com/paritytech/polkadot-sdk/blob/9d6261892814fa27c97881c0321c008d7340b54b/substrate/client/consensus/beefy/src/worker.rs#L939). This means if there was a lot of data incoming over the engine, the reporting stream was almost never processed and thus, it could have started to grow and we have seen issues around this. Closes: https://github.com/paritytech/polkadot-sdk/issues/3945 --- .../beefy/src/communication/gossip.rs | 162 ++++++++++++++---- substrate/client/consensus/beefy/src/lib.rs | 33 ++-- substrate/client/consensus/beefy/src/tests.rs | 7 +- .../client/consensus/beefy/src/worker.rs | 34 ++-- 4 files changed, 157 insertions(+), 79 deletions(-) diff --git a/substrate/client/consensus/beefy/src/communication/gossip.rs b/substrate/client/consensus/beefy/src/communication/gossip.rs index eb43c9173d75..d7cfc999fb28 100644 --- a/substrate/client/consensus/beefy/src/communication/gossip.rs +++ b/substrate/client/consensus/beefy/src/communication/gossip.rs @@ -18,14 +18,13 @@ use std::{collections::BTreeSet, sync::Arc, time::Duration}; -use sc_network::{PeerId, ReputationChange}; +use sc_network::{NetworkPeers, PeerId, ReputationChange}; use sc_network_gossip::{MessageIntent, ValidationResult, Validator, ValidatorContext}; use sp_runtime::traits::{Block, Hash, Header, NumberFor}; use codec::{Decode, DecodeAll, Encode}; use log::{debug, trace}; use parking_lot::{Mutex, RwLock}; -use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use wasm_timer::Instant; use crate::{ @@ -223,7 +222,7 @@ impl Filter { /// rejected/expired. /// ///All messaging is handled in a single BEEFY global topic. -pub(crate) struct GossipValidator +pub(crate) struct GossipValidator where B: Block, { @@ -232,26 +231,22 @@ where gossip_filter: RwLock>, next_rebroadcast: Mutex, known_peers: Arc>>, - report_sender: TracingUnboundedSender, + network: Arc, } -impl GossipValidator +impl GossipValidator where B: Block, { - pub(crate) fn new( - known_peers: Arc>>, - ) -> (GossipValidator, TracingUnboundedReceiver) { - let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 100_000); - let val = GossipValidator { + pub(crate) fn new(known_peers: Arc>>, network: Arc) -> Self { + Self { votes_topic: votes_topic::(), justifs_topic: proofs_topic::(), gossip_filter: RwLock::new(Filter::new()), next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER), known_peers, - report_sender: tx, - }; - (val, rx) + network, + } } /// Update gossip validator filter. @@ -265,9 +260,15 @@ where ); self.gossip_filter.write().update(filter); } +} +impl GossipValidator +where + B: Block, + N: NetworkPeers, +{ fn report(&self, who: PeerId, cost_benefit: ReputationChange) { - let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit }); + self.network.report_peer(who, cost_benefit); } fn validate_vote( @@ -370,9 +371,10 @@ where } } -impl Validator for GossipValidator +impl Validator for GossipValidator where B: Block, + N: NetworkPeers + Send + Sync, { fn peer_disconnected(&self, _context: &mut dyn ValidatorContext, who: &PeerId) { self.known_peers.lock().remove(who); @@ -495,6 +497,95 @@ pub(crate) mod tests { }; use sp_keystore::{testing::MemoryKeystore, Keystore}; + pub(crate) struct TestNetwork { + report_sender: futures::channel::mpsc::UnboundedSender, + } + + impl TestNetwork { + pub fn new() -> (Self, futures::channel::mpsc::UnboundedReceiver) { + let (tx, rx) = futures::channel::mpsc::unbounded(); + + (Self { report_sender: tx }, rx) + } + } + + impl NetworkPeers for TestNetwork { + fn set_authorized_peers(&self, _: std::collections::HashSet) { + todo!() + } + + fn set_authorized_only(&self, _: bool) { + todo!() + } + + fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) { + todo!() + } + + fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange) { + let _ = self.report_sender.unbounded_send(PeerReport { who: peer_id, cost_benefit }); + } + + fn peer_reputation(&self, _: &PeerId) -> i32 { + todo!() + } + + fn disconnect_peer(&self, _: PeerId, _: sc_network::ProtocolName) { + todo!() + } + + fn accept_unreserved_peers(&self) { + todo!() + } + + fn deny_unreserved_peers(&self) { + todo!() + } + + fn add_reserved_peer( + &self, + _: sc_network::config::MultiaddrWithPeerId, + ) -> Result<(), String> { + todo!() + } + + fn remove_reserved_peer(&self, _: PeerId) { + todo!() + } + + fn set_reserved_peers( + &self, + _: sc_network::ProtocolName, + _: std::collections::HashSet, + ) -> Result<(), String> { + todo!() + } + + fn add_peers_to_reserved_set( + &self, + _: sc_network::ProtocolName, + _: std::collections::HashSet, + ) -> Result<(), String> { + todo!() + } + + fn remove_peers_from_reserved_set( + &self, + _: sc_network::ProtocolName, + _: Vec, + ) -> Result<(), String> { + todo!() + } + + fn sync_num_connected(&self) -> usize { + todo!() + } + + fn peer_role(&self, _: PeerId, _: Vec) -> Option { + todo!() + } + } + struct TestContext; impl ValidatorContext for TestContext { fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) { @@ -560,8 +651,13 @@ pub(crate) mod tests { fn should_validate_messages() { let keys = vec![Keyring::::Alice.public()]; let validator_set = ValidatorSet::::new(keys.clone(), 0).unwrap(); - let (gv, mut report_stream) = - GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); + + let (network, mut report_stream) = TestNetwork::new(); + + let gv = GossipValidator::::new( + Arc::new(Mutex::new(KnownPeers::new())), + Arc::new(network), + ); let sender = PeerId::random(); let mut context = TestContext; @@ -574,7 +670,7 @@ pub(crate) mod tests { let mut expected_report = PeerReport { who: sender, cost_benefit: expected_cost }; let res = gv.validate(&mut context, &sender, bad_encoding); assert!(matches!(res, ValidationResult::Discard)); - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // verify votes validation @@ -585,14 +681,14 @@ pub(crate) mod tests { let res = gv.validate(&mut context, &sender, &encoded); assert!(matches!(res, ValidationResult::Discard)); // nothing reported - assert!(report_stream.try_recv().is_err()); + assert!(report_stream.try_next().is_err()); gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set }); // nothing in cache first time let res = gv.validate(&mut context, &sender, &encoded); assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); expected_report.cost_benefit = benefit::VOTE_MESSAGE; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // reject vote, voter not in validator set let mut bad_vote = vote.clone(); @@ -601,7 +697,7 @@ pub(crate) mod tests { let res = gv.validate(&mut context, &sender, &bad_vote); assert!(matches!(res, ValidationResult::Discard)); expected_report.cost_benefit = cost::UNKNOWN_VOTER; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // reject if the round is not GRANDPA finalized gv.update_filter(GossipFilterCfg { start: 1, end: 2, validator_set: &validator_set }); @@ -611,7 +707,7 @@ pub(crate) mod tests { let res = gv.validate(&mut context, &sender, &encoded); assert!(matches!(res, ValidationResult::Discard)); expected_report.cost_benefit = cost::FUTURE_MESSAGE; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // reject if the round is not live anymore gv.update_filter(GossipFilterCfg { start: 7, end: 10, validator_set: &validator_set }); @@ -621,7 +717,7 @@ pub(crate) mod tests { let res = gv.validate(&mut context, &sender, &encoded); assert!(matches!(res, ValidationResult::Discard)); expected_report.cost_benefit = cost::OUTDATED_MESSAGE; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // now verify proofs validation @@ -631,7 +727,7 @@ pub(crate) mod tests { let res = gv.validate(&mut context, &sender, &encoded_proof); assert!(matches!(res, ValidationResult::Discard)); expected_report.cost_benefit = cost::OUTDATED_MESSAGE; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // accept next proof with good set_id let proof = dummy_proof(7, &validator_set); @@ -639,7 +735,7 @@ pub(crate) mod tests { let res = gv.validate(&mut context, &sender, &encoded_proof); assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); expected_report.cost_benefit = benefit::VALIDATED_PROOF; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // accept future proof with good set_id let proof = dummy_proof(20, &validator_set); @@ -647,7 +743,7 @@ pub(crate) mod tests { let res = gv.validate(&mut context, &sender, &encoded_proof); assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); expected_report.cost_benefit = benefit::VALIDATED_PROOF; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // reject proof, future set_id let bad_validator_set = ValidatorSet::::new(keys, 1).unwrap(); @@ -656,7 +752,7 @@ pub(crate) mod tests { let res = gv.validate(&mut context, &sender, &encoded_proof); assert!(matches!(res, ValidationResult::Discard)); expected_report.cost_benefit = cost::FUTURE_MESSAGE; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // reject proof, bad signatures (Bob instead of Alice) let bad_validator_set = @@ -667,14 +763,17 @@ pub(crate) mod tests { assert!(matches!(res, ValidationResult::Discard)); expected_report.cost_benefit = cost::INVALID_PROOF; expected_report.cost_benefit.value += cost::PER_SIGNATURE_CHECKED; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); } #[test] fn messages_allowed_and_expired() { let keys = vec![Keyring::Alice.public()]; let validator_set = ValidatorSet::::new(keys.clone(), 0).unwrap(); - let (gv, _) = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); + let gv = GossipValidator::::new( + Arc::new(Mutex::new(KnownPeers::new())), + Arc::new(TestNetwork::new().0), + ); gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set }); let sender = sc_network::PeerId::random(); let topic = Default::default(); @@ -751,7 +850,10 @@ pub(crate) mod tests { fn messages_rebroadcast() { let keys = vec![Keyring::Alice.public()]; let validator_set = ValidatorSet::::new(keys.clone(), 0).unwrap(); - let (gv, _) = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); + let gv = GossipValidator::::new( + Arc::new(Mutex::new(KnownPeers::new())), + Arc::new(TestNetwork::new().0), + ); gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set }); let sender = sc_network::PeerId::random(); let topic = Default::default(); diff --git a/substrate/client/consensus/beefy/src/lib.rs b/substrate/client/consensus/beefy/src/lib.rs index 714a0fb7c885..0b1998a26e16 100644 --- a/substrate/client/consensus/beefy/src/lib.rs +++ b/substrate/client/consensus/beefy/src/lib.rs @@ -68,7 +68,7 @@ pub mod import; pub mod justification; use crate::{ - communication::{gossip::GossipValidator, peers::PeerReport}, + communication::gossip::GossipValidator, justification::BeefyVersionedFinalityProof, keystore::BeefyKeystore, metrics::VoterMetrics, @@ -78,7 +78,6 @@ use crate::{ pub use communication::beefy_protocol_name::{ gossip_protocol_name, justifications_protocol_name as justifs_protocol_name, }; -use sc_utils::mpsc::TracingUnboundedReceiver; use sp_runtime::generic::OpaqueDigestItemId; #[cfg(test)] @@ -228,10 +227,9 @@ pub struct BeefyParams { /// Helper object holding BEEFY worker communication/gossip components. /// /// These are created once, but will be reused if worker is restarted/reinitialized. -pub(crate) struct BeefyComms { +pub(crate) struct BeefyComms { pub gossip_engine: GossipEngine, - pub gossip_validator: Arc>, - pub gossip_report_stream: TracingUnboundedReceiver, + pub gossip_validator: Arc>, pub on_demand_justifications: OnDemandJustificationsEngine, } @@ -264,13 +262,13 @@ where /// persisted state in AUX DB and latest chain information/progress. /// /// Returns a sane `BeefyWorkerBuilder` that can build the `BeefyWorker`. - pub async fn async_initialize( + pub async fn async_initialize( backend: Arc, runtime: Arc, key_store: BeefyKeystore, metrics: Option, min_block_delta: u32, - gossip_validator: Arc>, + gossip_validator: Arc>, finality_notifications: &mut Fuse>, is_authority: bool, ) -> Result { @@ -298,15 +296,15 @@ where } /// Takes rest of missing pieces as params and builds the `BeefyWorker`. - pub fn build( + pub fn build( self, payload_provider: P, sync: Arc, - comms: BeefyComms, + comms: BeefyComms, links: BeefyVoterLinks, pending_justifications: BTreeMap, BeefyVersionedFinalityProof>, is_authority: bool, - ) -> BeefyWorker { + ) -> BeefyWorker { BeefyWorker { backend: self.backend, runtime: self.runtime, @@ -526,8 +524,8 @@ pub async fn start_beefy_gadget( let known_peers = Arc::new(Mutex::new(KnownPeers::new())); // Default votes filter is to discard everything. // Validator is updated later with correct starting round and set id. - let (gossip_validator, gossip_report_stream) = - communication::gossip::GossipValidator::new(known_peers.clone()); + let gossip_validator = + communication::gossip::GossipValidator::new(known_peers.clone(), network.clone()); let gossip_validator = Arc::new(gossip_validator); let gossip_engine = GossipEngine::new( network.clone(), @@ -546,12 +544,7 @@ pub async fn start_beefy_gadget( known_peers, prometheus_registry.clone(), ); - let mut beefy_comms = BeefyComms { - gossip_engine, - gossip_validator, - gossip_report_stream, - on_demand_justifications, - }; + let mut beefy_comms = BeefyComms { gossip_engine, gossip_validator, on_demand_justifications }; // We re-create and re-run the worker in this loop in order to quickly reinit and resume after // select recoverable errors. @@ -577,10 +570,6 @@ pub async fn start_beefy_gadget( }, } }, - // Pump peer reports - _ = &mut beefy_comms.gossip_report_stream.next() => { - continue - }, // Pump gossip engine. _ = &mut beefy_comms.gossip_engine => { error!(target: LOG_TARGET, "🥩 Gossip engine has unexpectedly terminated."); diff --git a/substrate/client/consensus/beefy/src/tests.rs b/substrate/client/consensus/beefy/src/tests.rs index aecfec7b9ed1..d4ec6ffd497b 100644 --- a/substrate/client/consensus/beefy/src/tests.rs +++ b/substrate/client/consensus/beefy/src/tests.rs @@ -23,8 +23,9 @@ use crate::{ beefy_block_import_and_links, communication::{ gossip::{ - proofs_topic, tests::sign_commitment, votes_topic, GossipFilterCfg, GossipMessage, - GossipValidator, + proofs_topic, + tests::{sign_commitment, TestNetwork}, + votes_topic, GossipFilterCfg, GossipMessage, GossipValidator, }, request_response::{on_demand_justifications_protocol_config, BeefyJustifsRequestHandler}, }, @@ -1450,7 +1451,7 @@ async fn gossipped_finality_proofs() { let charlie = &mut net.peers[2]; let known_peers = Arc::new(Mutex::new(KnownPeers::::new())); // Charlie will run just the gossip engine and not the full voter. - let (gossip_validator, _) = GossipValidator::new(known_peers); + let gossip_validator = GossipValidator::new(known_peers, Arc::new(TestNetwork::new().0)); let charlie_gossip_validator = Arc::new(gossip_validator); charlie_gossip_validator.update_filter(GossipFilterCfg:: { start: 1, diff --git a/substrate/client/consensus/beefy/src/worker.rs b/substrate/client/consensus/beefy/src/worker.rs index ac6b72d1ea40..05575ae01c30 100644 --- a/substrate/client/consensus/beefy/src/worker.rs +++ b/substrate/client/consensus/beefy/src/worker.rs @@ -19,7 +19,6 @@ use crate::{ communication::{ gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage}, - peers::PeerReport, request_response::outgoing_requests_engine::ResponseInfo, }, error::Error, @@ -374,7 +373,7 @@ impl PersistedState { } /// A BEEFY worker/voter that follows the BEEFY protocol -pub(crate) struct BeefyWorker { +pub(crate) struct BeefyWorker { // utilities pub backend: Arc, pub runtime: Arc, @@ -383,7 +382,7 @@ pub(crate) struct BeefyWorker { pub sync: Arc, // communication (created once, but returned and reused if worker is restarted/reinitialized) - pub comms: BeefyComms, + pub comms: BeefyComms, // channels /// Links between the block importer, the background voter and the RPC layer. @@ -400,7 +399,7 @@ pub(crate) struct BeefyWorker { pub is_authority: bool, } -impl BeefyWorker +impl BeefyWorker where B: Block + Codec, BE: Backend, @@ -827,7 +826,7 @@ where mut self, block_import_justif: &mut Fuse>>, finality_notifications: &mut Fuse>, - ) -> (Error, BeefyComms) { + ) -> (Error, BeefyComms) { info!( target: LOG_TARGET, "🥩 run BEEFY worker, best grandpa: #{:?}.", @@ -896,11 +895,8 @@ where }, ResponseInfo::PeerReport(peer_report) => { self.comms.gossip_engine.report(peer_report.who, peer_report.cost_benefit); - continue; - }, - ResponseInfo::Pending => { - continue; }, + ResponseInfo::Pending => {}, } }, justif = block_import_justif.next() => { @@ -935,13 +931,6 @@ where break Error::VotesGossipStreamTerminated; } }, - // Process peer reports. - report = self.comms.gossip_report_stream.next() => { - if let Some(PeerReport { who, cost_benefit }) = report { - self.comms.gossip_engine.report(who, cost_benefit); - } - continue; - }, } // Act on changed 'state'. @@ -1054,7 +1043,7 @@ pub(crate) mod tests { use super::*; use crate::{ communication::{ - gossip::GossipValidator, + gossip::{tests::TestNetwork, GossipValidator}, notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofStream}, request_response::outgoing_requests_engine::OnDemandJustificationsEngine, }, @@ -1111,6 +1100,7 @@ pub(crate) mod tests { MmrRootProvider, TestApi, Arc>, + TestNetwork, > { let keystore = create_beefy_keystore(key); @@ -1140,7 +1130,8 @@ pub(crate) mod tests { .take_notification_service(&crate::tests::beefy_gossip_proto_name()) .unwrap(); let known_peers = Arc::new(Mutex::new(KnownPeers::new())); - let (gossip_validator, gossip_report_stream) = GossipValidator::new(known_peers.clone()); + let gossip_validator = + GossipValidator::new(known_peers.clone(), Arc::new(TestNetwork::new().0)); let gossip_validator = Arc::new(gossip_validator); let gossip_engine = GossipEngine::new( network.clone(), @@ -1173,12 +1164,7 @@ pub(crate) mod tests { ) .unwrap(); let payload_provider = MmrRootProvider::new(api.clone()); - let comms = BeefyComms { - gossip_engine, - gossip_validator, - gossip_report_stream, - on_demand_justifications, - }; + let comms = BeefyComms { gossip_engine, gossip_validator, on_demand_justifications }; BeefyWorker { backend, runtime: api, From c5f616b97e65a20ab72c1f66f584de8080e10a62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Sat, 6 Apr 2024 15:45:19 +0200 Subject: [PATCH 2/6] Make Davide happy :) --- .../beefy/src/communication/gossip.rs | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/substrate/client/consensus/beefy/src/communication/gossip.rs b/substrate/client/consensus/beefy/src/communication/gossip.rs index d7cfc999fb28..c2bb8fdea475 100644 --- a/substrate/client/consensus/beefy/src/communication/gossip.rs +++ b/substrate/client/consensus/beefy/src/communication/gossip.rs @@ -511,15 +511,15 @@ pub(crate) mod tests { impl NetworkPeers for TestNetwork { fn set_authorized_peers(&self, _: std::collections::HashSet) { - todo!() + unimplemented!() } fn set_authorized_only(&self, _: bool) { - todo!() + unimplemented!() } fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) { - todo!() + unimplemented!() } fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange) { @@ -527,30 +527,30 @@ pub(crate) mod tests { } fn peer_reputation(&self, _: &PeerId) -> i32 { - todo!() + unimplemented!() } fn disconnect_peer(&self, _: PeerId, _: sc_network::ProtocolName) { - todo!() + unimplemented!() } fn accept_unreserved_peers(&self) { - todo!() + unimplemented!() } fn deny_unreserved_peers(&self) { - todo!() + unimplemented!() } fn add_reserved_peer( &self, _: sc_network::config::MultiaddrWithPeerId, ) -> Result<(), String> { - todo!() + unimplemented!() } fn remove_reserved_peer(&self, _: PeerId) { - todo!() + unimplemented!() } fn set_reserved_peers( @@ -558,7 +558,7 @@ pub(crate) mod tests { _: sc_network::ProtocolName, _: std::collections::HashSet, ) -> Result<(), String> { - todo!() + unimplemented!() } fn add_peers_to_reserved_set( @@ -566,7 +566,7 @@ pub(crate) mod tests { _: sc_network::ProtocolName, _: std::collections::HashSet, ) -> Result<(), String> { - todo!() + unimplemented!() } fn remove_peers_from_reserved_set( @@ -574,32 +574,32 @@ pub(crate) mod tests { _: sc_network::ProtocolName, _: Vec, ) -> Result<(), String> { - todo!() + unimplemented!() } fn sync_num_connected(&self) -> usize { - todo!() + unimplemented!() } fn peer_role(&self, _: PeerId, _: Vec) -> Option { - todo!() + unimplemented!() } } struct TestContext; impl ValidatorContext for TestContext { fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) { - todo!() + unimplemented!() } fn broadcast_message(&mut self, _topic: B::Hash, _message: Vec, _force: bool) {} fn send_message(&mut self, _who: &sc_network::PeerId, _message: Vec) { - todo!() + unimplemented!() } fn send_topic(&mut self, _who: &sc_network::PeerId, _topic: B::Hash, _force: bool) { - todo!() + unimplemented!() } } From b313be2f63a24342f2a46f8710a5e1c857b5c2aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Sat, 6 Apr 2024 15:59:31 +0200 Subject: [PATCH 3/6] Fix warning --- .../client/consensus/beefy/src/communication/gossip.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/substrate/client/consensus/beefy/src/communication/gossip.rs b/substrate/client/consensus/beefy/src/communication/gossip.rs index c2bb8fdea475..c1cd11450589 100644 --- a/substrate/client/consensus/beefy/src/communication/gossip.rs +++ b/substrate/client/consensus/beefy/src/communication/gossip.rs @@ -28,10 +28,7 @@ use parking_lot::{Mutex, RwLock}; use wasm_timer::Instant; use crate::{ - communication::{ - benefit, cost, - peers::{KnownPeers, PeerReport}, - }, + communication::{benefit, cost, peers::KnownPeers}, justification::{ proof_block_num_and_set_id, verify_with_validator_set, BeefyVersionedFinalityProof, }, @@ -488,7 +485,7 @@ where #[cfg(test)] pub(crate) mod tests { use super::*; - use crate::keystore::BeefyKeystore; + use crate::{communication::peerrs::PeerReport, keystore::BeefyKeystore}; use sc_network_test::Block; use sp_application_crypto::key_types::BEEFY as BEEFY_KEY_TYPE; use sp_consensus_beefy::{ From 105bf8ea2ea2d9ea73f170fa45e5135ef63fe1ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Sat, 6 Apr 2024 16:01:33 +0200 Subject: [PATCH 4/6] ... --- substrate/client/consensus/beefy/src/communication/gossip.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/consensus/beefy/src/communication/gossip.rs b/substrate/client/consensus/beefy/src/communication/gossip.rs index c1cd11450589..d31559131cc1 100644 --- a/substrate/client/consensus/beefy/src/communication/gossip.rs +++ b/substrate/client/consensus/beefy/src/communication/gossip.rs @@ -485,7 +485,7 @@ where #[cfg(test)] pub(crate) mod tests { use super::*; - use crate::{communication::peerrs::PeerReport, keystore::BeefyKeystore}; + use crate::{communication::peers::PeerReport, keystore::BeefyKeystore}; use sc_network_test::Block; use sp_application_crypto::key_types::BEEFY as BEEFY_KEY_TYPE; use sp_consensus_beefy::{ From 04c38ee52cbac4776eb4a8e2ea12428fb122e46d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Mon, 8 Apr 2024 09:49:56 +0200 Subject: [PATCH 5/6] PRDOC --- prdoc/pr_4015.prdoc | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 prdoc/pr_4015.prdoc diff --git a/prdoc/pr_4015.prdoc b/prdoc/pr_4015.prdoc new file mode 100644 index 000000000000..ede1731c4ab8 --- /dev/null +++ b/prdoc/pr_4015.prdoc @@ -0,0 +1,14 @@ +title: Improve beefy networking code by forwarding data more directly + +doc: + - audience: Node Operator + description: | + Improve internal implementation of beefy to forward data directly to the + networking layer instead of first storing them internally. So, the + following error message should not appear again: + ``` + The number of unprocessed messages in channel `mpsc_beefy_gossip_validator` exceeded 100000. + ``` + +crates: + - name: sc-consensus-beefy From 90d75fc22e545d996a0dfde40dd7b6fcf93678ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Mon, 8 Apr 2024 10:05:48 +0200 Subject: [PATCH 6/6] Remove useless loop --- substrate/client/consensus/beefy/src/lib.rs | 46 ++++++++++----------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/substrate/client/consensus/beefy/src/lib.rs b/substrate/client/consensus/beefy/src/lib.rs index 0b1998a26e16..2637481fbf3e 100644 --- a/substrate/client/consensus/beefy/src/lib.rs +++ b/substrate/client/consensus/beefy/src/lib.rs @@ -550,31 +550,29 @@ pub async fn start_beefy_gadget( // select recoverable errors. loop { // Make sure to pump gossip engine while waiting for initialization conditions. - let worker_builder = loop { - futures::select! { - builder_init_result = BeefyWorkerBuilder::async_initialize( - backend.clone(), - runtime.clone(), - key_store.clone().into(), - metrics.clone(), - min_block_delta, - beefy_comms.gossip_validator.clone(), - &mut finality_notifications, - is_authority, - ).fuse() => { - match builder_init_result { - Ok(builder) => break builder, - Err(e) => { - error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", e); - return - }, - } - }, - // Pump gossip engine. - _ = &mut beefy_comms.gossip_engine => { - error!(target: LOG_TARGET, "🥩 Gossip engine has unexpectedly terminated."); - return + let worker_builder = futures::select! { + builder_init_result = BeefyWorkerBuilder::async_initialize( + backend.clone(), + runtime.clone(), + key_store.clone().into(), + metrics.clone(), + min_block_delta, + beefy_comms.gossip_validator.clone(), + &mut finality_notifications, + is_authority, + ).fuse() => { + match builder_init_result { + Ok(builder) => builder, + Err(e) => { + error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", e); + return + }, } + }, + // Pump gossip engine. + _ = &mut beefy_comms.gossip_engine => { + error!(target: LOG_TARGET, "🥩 Gossip engine has unexpectedly terminated."); + return } };