Skip to content

Commit

Permalink
sc-beefy-consensus: Remove unneeded stream. (#4015)
Browse files Browse the repository at this point in the history
The stream was just used to communicate from the validator the peer
reports back to the gossip engine. Internally the gossip engine just
forwards these reports to the networking engine. So, we can just do this
directly.

The reporting stream was also pumped [in the worker behind the
engine](https://github.com/paritytech/polkadot-sdk/blob/9d6261892814fa27c97881c0321c008d7340b54b/substrate/client/consensus/beefy/src/worker.rs#L939).
This means if there was a lot of data incoming over the engine, the
reporting stream was almost never processed and thus, it could have
started to grow and we have seen issues around this.

Partly Closes: #3945
  • Loading branch information
bkchr authored and Ank4n committed Apr 9, 2024
1 parent 7daeaca commit 70fc335
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 111 deletions.
14 changes: 14 additions & 0 deletions prdoc/pr_4015.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
title: Improve beefy networking code by forwarding data more directly

doc:
- audience: Node Operator
description: |
Improve internal implementation of beefy to forward data directly to the
networking layer instead of first storing them internally. So, the
following error message should not appear again:
```
The number of unprocessed messages in channel `mpsc_beefy_gossip_validator` exceeded 100000.
```

crates:
- name: sc-consensus-beefy
175 changes: 137 additions & 38 deletions substrate/client/consensus/beefy/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,17 @@

use std::{collections::BTreeSet, sync::Arc, time::Duration};

use sc_network::{PeerId, ReputationChange};
use sc_network::{NetworkPeers, PeerId, ReputationChange};
use sc_network_gossip::{MessageIntent, ValidationResult, Validator, ValidatorContext};
use sp_runtime::traits::{Block, Hash, Header, NumberFor};

use codec::{Decode, DecodeAll, Encode};
use log::{debug, trace};
use parking_lot::{Mutex, RwLock};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use wasm_timer::Instant;

use crate::{
communication::{
benefit, cost,
peers::{KnownPeers, PeerReport},
},
communication::{benefit, cost, peers::KnownPeers},
justification::{
proof_block_num_and_set_id, verify_with_validator_set, BeefyVersionedFinalityProof,
},
Expand Down Expand Up @@ -223,7 +219,7 @@ impl<B: Block> Filter<B> {
/// rejected/expired.
///
///All messaging is handled in a single BEEFY global topic.
pub(crate) struct GossipValidator<B>
pub(crate) struct GossipValidator<B, N>
where
B: Block,
{
Expand All @@ -232,26 +228,22 @@ where
gossip_filter: RwLock<Filter<B>>,
next_rebroadcast: Mutex<Instant>,
known_peers: Arc<Mutex<KnownPeers<B>>>,
report_sender: TracingUnboundedSender<PeerReport>,
network: Arc<N>,
}

impl<B> GossipValidator<B>
impl<B, N> GossipValidator<B, N>
where
B: Block,
{
pub(crate) fn new(
known_peers: Arc<Mutex<KnownPeers<B>>>,
) -> (GossipValidator<B>, TracingUnboundedReceiver<PeerReport>) {
let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 100_000);
let val = GossipValidator {
pub(crate) fn new(known_peers: Arc<Mutex<KnownPeers<B>>>, network: Arc<N>) -> Self {
Self {
votes_topic: votes_topic::<B>(),
justifs_topic: proofs_topic::<B>(),
gossip_filter: RwLock::new(Filter::new()),
next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER),
known_peers,
report_sender: tx,
};
(val, rx)
network,
}
}

/// Update gossip validator filter.
Expand All @@ -265,9 +257,15 @@ where
);
self.gossip_filter.write().update(filter);
}
}

impl<B, N> GossipValidator<B, N>
where
B: Block,
N: NetworkPeers,
{
fn report(&self, who: PeerId, cost_benefit: ReputationChange) {
let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit });
self.network.report_peer(who, cost_benefit);
}

fn validate_vote(
Expand Down Expand Up @@ -370,9 +368,10 @@ where
}
}

impl<B> Validator<B> for GossipValidator<B>
impl<B, N> Validator<B> for GossipValidator<B, N>
where
B: Block,
N: NetworkPeers + Send + Sync,
{
fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<B>, who: &PeerId) {
self.known_peers.lock().remove(who);
Expand Down Expand Up @@ -486,7 +485,7 @@ where
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::keystore::BeefyKeystore;
use crate::{communication::peers::PeerReport, keystore::BeefyKeystore};
use sc_network_test::Block;
use sp_application_crypto::key_types::BEEFY as BEEFY_KEY_TYPE;
use sp_consensus_beefy::{
Expand All @@ -495,20 +494,109 @@ pub(crate) mod tests {
};
use sp_keystore::{testing::MemoryKeystore, Keystore};

pub(crate) struct TestNetwork {
report_sender: futures::channel::mpsc::UnboundedSender<PeerReport>,
}

impl TestNetwork {
pub fn new() -> (Self, futures::channel::mpsc::UnboundedReceiver<PeerReport>) {
let (tx, rx) = futures::channel::mpsc::unbounded();

(Self { report_sender: tx }, rx)
}
}

impl NetworkPeers for TestNetwork {
fn set_authorized_peers(&self, _: std::collections::HashSet<PeerId>) {
unimplemented!()
}

fn set_authorized_only(&self, _: bool) {
unimplemented!()
}

fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) {
unimplemented!()
}

fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange) {
let _ = self.report_sender.unbounded_send(PeerReport { who: peer_id, cost_benefit });
}

fn peer_reputation(&self, _: &PeerId) -> i32 {
unimplemented!()
}

fn disconnect_peer(&self, _: PeerId, _: sc_network::ProtocolName) {
unimplemented!()
}

fn accept_unreserved_peers(&self) {
unimplemented!()
}

fn deny_unreserved_peers(&self) {
unimplemented!()
}

fn add_reserved_peer(
&self,
_: sc_network::config::MultiaddrWithPeerId,
) -> Result<(), String> {
unimplemented!()
}

fn remove_reserved_peer(&self, _: PeerId) {
unimplemented!()
}

fn set_reserved_peers(
&self,
_: sc_network::ProtocolName,
_: std::collections::HashSet<sc_network::Multiaddr>,
) -> Result<(), String> {
unimplemented!()
}

fn add_peers_to_reserved_set(
&self,
_: sc_network::ProtocolName,
_: std::collections::HashSet<sc_network::Multiaddr>,
) -> Result<(), String> {
unimplemented!()
}

fn remove_peers_from_reserved_set(
&self,
_: sc_network::ProtocolName,
_: Vec<PeerId>,
) -> Result<(), String> {
unimplemented!()
}

fn sync_num_connected(&self) -> usize {
unimplemented!()
}

fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<sc_network::ObservedRole> {
unimplemented!()
}
}

struct TestContext;
impl<B: sp_runtime::traits::Block> ValidatorContext<B> for TestContext {
fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) {
todo!()
unimplemented!()
}

fn broadcast_message(&mut self, _topic: B::Hash, _message: Vec<u8>, _force: bool) {}

fn send_message(&mut self, _who: &sc_network::PeerId, _message: Vec<u8>) {
todo!()
unimplemented!()
}

fn send_topic(&mut self, _who: &sc_network::PeerId, _topic: B::Hash, _force: bool) {
todo!()
unimplemented!()
}
}

Expand Down Expand Up @@ -560,8 +648,13 @@ pub(crate) mod tests {
fn should_validate_messages() {
let keys = vec![Keyring::<AuthorityId>::Alice.public()];
let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
let (gv, mut report_stream) =
GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));

let (network, mut report_stream) = TestNetwork::new();

let gv = GossipValidator::<Block, _>::new(
Arc::new(Mutex::new(KnownPeers::new())),
Arc::new(network),
);
let sender = PeerId::random();
let mut context = TestContext;

Expand All @@ -574,7 +667,7 @@ pub(crate) mod tests {
let mut expected_report = PeerReport { who: sender, cost_benefit: expected_cost };
let res = gv.validate(&mut context, &sender, bad_encoding);
assert!(matches!(res, ValidationResult::Discard));
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// verify votes validation

Expand All @@ -585,14 +678,14 @@ pub(crate) mod tests {
let res = gv.validate(&mut context, &sender, &encoded);
assert!(matches!(res, ValidationResult::Discard));
// nothing reported
assert!(report_stream.try_recv().is_err());
assert!(report_stream.try_next().is_err());

gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
// nothing in cache first time
let res = gv.validate(&mut context, &sender, &encoded);
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
expected_report.cost_benefit = benefit::VOTE_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// reject vote, voter not in validator set
let mut bad_vote = vote.clone();
Expand All @@ -601,7 +694,7 @@ pub(crate) mod tests {
let res = gv.validate(&mut context, &sender, &bad_vote);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::UNKNOWN_VOTER;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// reject if the round is not GRANDPA finalized
gv.update_filter(GossipFilterCfg { start: 1, end: 2, validator_set: &validator_set });
Expand All @@ -611,7 +704,7 @@ pub(crate) mod tests {
let res = gv.validate(&mut context, &sender, &encoded);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::FUTURE_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// reject if the round is not live anymore
gv.update_filter(GossipFilterCfg { start: 7, end: 10, validator_set: &validator_set });
Expand All @@ -621,7 +714,7 @@ pub(crate) mod tests {
let res = gv.validate(&mut context, &sender, &encoded);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::OUTDATED_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// now verify proofs validation

Expand All @@ -631,23 +724,23 @@ pub(crate) mod tests {
let res = gv.validate(&mut context, &sender, &encoded_proof);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::OUTDATED_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// accept next proof with good set_id
let proof = dummy_proof(7, &validator_set);
let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
let res = gv.validate(&mut context, &sender, &encoded_proof);
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
expected_report.cost_benefit = benefit::VALIDATED_PROOF;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// accept future proof with good set_id
let proof = dummy_proof(20, &validator_set);
let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
let res = gv.validate(&mut context, &sender, &encoded_proof);
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
expected_report.cost_benefit = benefit::VALIDATED_PROOF;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// reject proof, future set_id
let bad_validator_set = ValidatorSet::<AuthorityId>::new(keys, 1).unwrap();
Expand All @@ -656,7 +749,7 @@ pub(crate) mod tests {
let res = gv.validate(&mut context, &sender, &encoded_proof);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::FUTURE_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);

// reject proof, bad signatures (Bob instead of Alice)
let bad_validator_set =
Expand All @@ -667,14 +760,17 @@ pub(crate) mod tests {
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::INVALID_PROOF;
expected_report.cost_benefit.value += cost::PER_SIGNATURE_CHECKED;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report);
}

#[test]
fn messages_allowed_and_expired() {
let keys = vec![Keyring::Alice.public()];
let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
let (gv, _) = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
let gv = GossipValidator::<Block, _>::new(
Arc::new(Mutex::new(KnownPeers::new())),
Arc::new(TestNetwork::new().0),
);
gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
let sender = sc_network::PeerId::random();
let topic = Default::default();
Expand Down Expand Up @@ -751,7 +847,10 @@ pub(crate) mod tests {
fn messages_rebroadcast() {
let keys = vec![Keyring::Alice.public()];
let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
let (gv, _) = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
let gv = GossipValidator::<Block, _>::new(
Arc::new(Mutex::new(KnownPeers::new())),
Arc::new(TestNetwork::new().0),
);
gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
let sender = sc_network::PeerId::random();
let topic = Default::default();
Expand Down
Loading

0 comments on commit 70fc335

Please sign in to comment.