Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
BEEFY: optimize voter event loop for fewer 'active' wakeups (#12760)
Browse files Browse the repository at this point in the history
* client/beefy: remove high-freq network events from main loop

Network events are many and very frequent, remove the net-event-stream
from the main voter loop and drastically reduce BEEFY voter task
'wakeups'.

Instead have the `GossipValidator` track known peers as it already
has callbacks for that coming from `GossipEngine`.

Signed-off-by: acatangiu <adrian@parity.io>
  • Loading branch information
acatangiu authored Nov 23, 2022
1 parent 269c799 commit 08d1b2c
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 122 deletions.
4 changes: 4 additions & 0 deletions client/beefy/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ impl<B> Validator<B> for GossipValidator<B>
where
B: Block,
{
fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<B>, who: &PeerId) {
self.known_peers.lock().remove(who);
}

fn validate(
&self,
_context: &mut dyn ValidatorContext<B>,
Expand Down
12 changes: 2 additions & 10 deletions client/beefy/src/communication/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ impl<B: Block> KnownPeers<B> {
Self { live: HashMap::new() }
}

/// Add new connected `peer`.
pub fn add_new(&mut self, peer: PeerId) {
self.live.entry(peer).or_default();
}

/// Note vote round number for `peer`.
pub fn note_vote_for(&mut self, peer: PeerId, round: NumberFor<B>) {
let data = self.live.entry(peer).or_default();
Expand Down Expand Up @@ -87,16 +82,13 @@ mod tests {
let mut peers = KnownPeers::<sc_network_test::Block>::new();
assert!(peers.live.is_empty());

// Alice and Bob new connected peers.
peers.add_new(alice);
peers.add_new(bob);
// 'Tracked' Bob seen voting for 5.
peers.note_vote_for(bob, 5);
// Previously unseen Charlie now seen voting for 10.
peers.note_vote_for(charlie, 10);

assert_eq!(peers.live.len(), 3);
assert!(peers.contains(&alice));
assert_eq!(peers.live.len(), 2);
assert!(!peers.contains(&alice));
assert!(peers.contains(&bob));
assert!(peers.contains(&charlie));

Expand Down
4 changes: 2 additions & 2 deletions client/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,12 @@ where
None,
);

// The `GossipValidator` adds and removes known peers based on valid votes and network events.
let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(),
runtime.clone(),
justifications_protocol_name,
known_peers.clone(),
known_peers,
);

let metrics =
Expand Down Expand Up @@ -286,7 +287,6 @@ where
payload_provider,
network,
key_store: key_store.into(),
known_peers,
gossip_engine,
gossip_validator,
on_demand_justifications,
Expand Down
48 changes: 24 additions & 24 deletions client/beefy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,27 @@ pub(crate) fn create_beefy_keystore(authority: BeefyKeyring) -> SyncCryptoStoreP
keystore
}

fn voter_init_setup(
net: &mut BeefyTestNet,
finality: &mut futures::stream::Fuse<FinalityNotifications<Block>>,
) -> sp_blockchain::Result<PersistedState<Block>> {
let backend = net.peer(0).client().as_backend();
let api = Arc::new(crate::tests::two_validators::TestApi {});
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
let gossip_validator =
Arc::new(crate::communication::gossip::GossipValidator::new(known_peers));
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
net.peer(0).network_service().clone(),
"/beefy/whatever",
gossip_validator,
None,
);
let best_grandpa =
futures::executor::block_on(wait_for_runtime_pallet(&*api, &mut gossip_engine, finality))
.unwrap();
load_or_init_voter_state(&*backend, &*api, best_grandpa, 1)
}

// Spawns beefy voters. Returns a future to spawn on the runtime.
fn initialize_beefy<API>(
net: &mut BeefyTestNet,
Expand Down Expand Up @@ -943,27 +964,6 @@ fn on_demand_beefy_justification_sync() {
finalize_block_and_wait_for_beefy(&net, all_peers, &mut runtime, &[30], &[30]);
}

fn test_voter_init_setup(
net: &mut BeefyTestNet,
finality: &mut futures::stream::Fuse<FinalityNotifications<Block>>,
) -> sp_blockchain::Result<PersistedState<Block>> {
let backend = net.peer(0).client().as_backend();
let api = Arc::new(crate::tests::two_validators::TestApi {});
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
let gossip_validator =
Arc::new(crate::communication::gossip::GossipValidator::new(known_peers));
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
net.peer(0).network_service().clone(),
"/beefy/whatever",
gossip_validator,
None,
);
let best_grandpa =
futures::executor::block_on(wait_for_runtime_pallet(&*api, &mut gossip_engine, finality))
.unwrap();
load_or_init_voter_state(&*backend, &*api, best_grandpa, 1)
}

#[test]
fn should_initialize_voter_at_genesis() {
let keys = &[BeefyKeyring::Alice];
Expand All @@ -981,7 +981,7 @@ fn should_initialize_voter_at_genesis() {
net.peer(0).client().as_client().finalize_block(hashof13, None).unwrap();

// load persistent state - nothing in DB, should init at session boundary
let persisted_state = test_voter_init_setup(&mut net, &mut finality).unwrap();
let persisted_state = voter_init_setup(&mut net, &mut finality).unwrap();

// Test initialization at session boundary.
// verify voter initialized with two sessions starting at blocks 1 and 10
Expand Down Expand Up @@ -1044,7 +1044,7 @@ fn should_initialize_voter_when_last_final_is_session_boundary() {
// expect rounds initialized at last beefy finalized 10.

// load persistent state - nothing in DB, should init at session boundary
let persisted_state = test_voter_init_setup(&mut net, &mut finality).unwrap();
let persisted_state = voter_init_setup(&mut net, &mut finality).unwrap();

// verify voter initialized with single session starting at block 10
assert_eq!(persisted_state.voting_oracle().sessions().len(), 1);
Expand Down Expand Up @@ -1103,7 +1103,7 @@ fn should_initialize_voter_at_latest_finalized() {
// Test initialization at last BEEFY finalized.

// load persistent state - nothing in DB, should init at last BEEFY finalized
let persisted_state = test_voter_init_setup(&mut net, &mut finality).unwrap();
let persisted_state = voter_init_setup(&mut net, &mut finality).unwrap();

// verify voter initialized with single session starting at block 12
assert_eq!(persisted_state.voting_oracle().sessions().len(), 1);
Expand Down
139 changes: 53 additions & 86 deletions client/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,31 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
fmt::Debug,
marker::PhantomData,
sync::Arc,
use crate::{
communication::{
gossip::{topic, GossipValidator},
request_response::outgoing_requests_engine::OnDemandJustificationsEngine,
},
error::Error,
justification::BeefyVersionedFinalityProof,
keystore::BeefyKeystore,
metric_inc, metric_set,
metrics::Metrics,
round::Rounds,
BeefyVoterLinks,
};
use beefy_primitives::{
crypto::{AuthorityId, Signature},
BeefyApi, Commitment, ConsensusLog, MmrRootHash, Payload, PayloadProvider, SignedCommitment,
ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
};

use codec::{Codec, Decode, Encode};
use futures::{stream::Fuse, FutureExt, StreamExt};
use log::{debug, error, info, log_enabled, trace, warn};
use parking_lot::Mutex;

use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend};
use sc_network_common::{
protocol::event::Event as NetEvent,
service::{NetworkEventStream, NetworkRequest},
};
use sc_network_common::service::{NetworkEventStream, NetworkRequest};
use sc_network_gossip::GossipEngine;

use sc_utils::notification::NotificationReceiver;
use sp_api::{BlockId, ProvideRuntimeApi};
use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
use sp_consensus::SyncOracle;
Expand All @@ -44,26 +50,11 @@ use sp_runtime::{
traits::{Block, Header, NumberFor, Zero},
SaturatedConversion,
};

use beefy_primitives::{
crypto::{AuthorityId, Signature},
BeefyApi, Commitment, ConsensusLog, MmrRootHash, Payload, PayloadProvider, SignedCommitment,
ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
};
use sc_utils::notification::NotificationReceiver;

use crate::{
communication::{
gossip::{topic, GossipValidator},
request_response::outgoing_requests_engine::OnDemandJustificationsEngine,
},
error::Error,
justification::BeefyVersionedFinalityProof,
keystore::BeefyKeystore,
metric_inc, metric_set,
metrics::Metrics,
round::Rounds,
BeefyVoterLinks, KnownPeers,
use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
fmt::Debug,
marker::PhantomData,
sync::Arc,
};

pub(crate) enum RoundAction {
Expand Down Expand Up @@ -253,7 +244,6 @@ pub(crate) struct WorkerParams<B: Block, BE, P, R, N> {
pub payload_provider: P,
pub network: N,
pub key_store: BeefyKeystore,
pub known_peers: Arc<Mutex<KnownPeers<B>>>,
pub gossip_engine: GossipEngine<B>,
pub gossip_validator: Arc<GossipValidator<B>>,
pub on_demand_justifications: OnDemandJustificationsEngine<B, R>,
Expand Down Expand Up @@ -305,7 +295,6 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, R, N> {
key_store: BeefyKeystore,

// communication
known_peers: Arc<Mutex<KnownPeers<B>>>,
gossip_engine: GossipEngine<B>,
gossip_validator: Arc<GossipValidator<B>>,
on_demand_justifications: OnDemandJustificationsEngine<B, R>,
Expand Down Expand Up @@ -349,7 +338,6 @@ where
gossip_engine,
gossip_validator,
on_demand_justifications,
known_peers,
links,
metrics,
persisted_state,
Expand All @@ -359,7 +347,6 @@ where
backend,
payload_provider,
network,
known_peers,
key_store,
gossip_engine,
gossip_validator,
Expand Down Expand Up @@ -783,6 +770,29 @@ where
Ok(())
}

fn process_new_state(&mut self) {
// Handle pending justifications and/or votes for now GRANDPA finalized blocks.
if let Err(err) = self.try_pending_justif_and_votes() {
debug!(target: "beefy", "🥩 {}", err);
}

// Don't bother voting or requesting justifications during major sync.
if !self.network.is_major_syncing() {
// There were external events, 'state' is changed, author a vote if needed/possible.
if let Err(err) = self.try_to_vote() {
debug!(target: "beefy", "🥩 {}", err);
}
// If the current target is a mandatory block,
// make sure there's also an on-demand justification request out for it.
if let Some(block) = self.voting_oracle().mandatory_pending() {
// This only starts new request if there isn't already an active one.
self.on_demand_justifications.request(block);
}
} else {
debug!(target: "beefy", "🥩 Skipping voting while major syncing.");
}
}

/// Main loop for BEEFY worker.
///
/// Wait for BEEFY runtime pallet to be available, then start the main async loop
Expand All @@ -794,7 +804,6 @@ where
) {
info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block());

let mut network_events = self.network.event_stream("network-gossip").fuse();
let mut votes = Box::pin(
self.gossip_engine
.messages_for(topic::<B>())
Expand All @@ -810,25 +819,12 @@ where
);

loop {
// Don't bother voting or requesting justifications during major sync.
if !self.network.is_major_syncing() {
// If the current target is a mandatory block,
// make sure there's also an on-demand justification request out for it.
if let Some(block) = self.voting_oracle().mandatory_pending() {
// This only starts new request if there isn't already an active one.
self.on_demand_justifications.request(block);
}
// There were external events, 'state' is changed, author a vote if needed/possible.
if let Err(err) = self.try_to_vote() {
debug!(target: "beefy", "🥩 {}", err);
}
} else {
debug!(target: "beefy", "🥩 Skipping voting while major syncing.");
}
// Act on changed 'state'.
self.process_new_state();

let mut gossip_engine = &mut self.gossip_engine;
// Wait for, and handle external events.
// The branches below only change 'state', actual voting happen afterwards,
// The branches below only change 'state', actual voting happens afterwards,
// based on the new resulting 'state'.
futures::select_biased! {
// Use `select_biased!` to prioritize order below.
Expand All @@ -837,15 +833,6 @@ where
error!(target: "beefy", "🥩 Gossip engine has terminated, closing worker.");
return;
},
// Keep track of connected peers.
net_event = network_events.next() => {
if let Some(net_event) = net_event {
self.handle_network_event(net_event);
} else {
error!(target: "beefy", "🥩 Network events stream terminated, closing worker.");
return;
}
},
// Process finality notifications first since these drive the voter.
notification = finality_notifications.next() => {
if let Some(notification) = notification {
Expand Down Expand Up @@ -888,25 +875,6 @@ where
}
},
}

// Handle pending justifications and/or votes for now GRANDPA finalized blocks.
if let Err(err) = self.try_pending_justif_and_votes() {
debug!(target: "beefy", "🥩 {}", err);
}
}
}

/// Update known peers based on network events.
fn handle_network_event(&mut self, event: NetEvent) {
match event {
NetEvent::SyncConnected { remote } => {
self.known_peers.lock().add_new(remote);
},
NetEvent::SyncDisconnected { remote } => {
self.known_peers.lock().remove(&remote);
},
// We don't care about other events.
_ => (),
}
}
}
Expand Down Expand Up @@ -976,11 +944,11 @@ pub(crate) mod tests {
create_beefy_keystore, get_beefy_streams, make_beefy_ids, two_validators::TestApi,
BeefyPeer, BeefyTestNet,
},
BeefyRPCLinks,
BeefyRPCLinks, KnownPeers,
};

use beefy_primitives::{known_payloads, mmr::MmrRootProvider};
use futures::{executor::block_on, future::poll_fn, task::Poll};
use parking_lot::Mutex;
use sc_client_api::{Backend as BackendT, HeaderBackend};
use sc_network::NetworkService;
use sc_network_test::TestNetFactory;
Expand Down Expand Up @@ -1058,7 +1026,7 @@ pub(crate) mod tests {
network.clone(),
api.clone(),
"/beefy/justifs/1".into(),
known_peers.clone(),
known_peers,
);
let at = BlockId::number(Zero::zero());
let genesis_header = backend.blockchain().expect_header(at).unwrap();
Expand All @@ -1074,7 +1042,6 @@ pub(crate) mod tests {
backend,
payload_provider,
key_store: Some(keystore).into(),
known_peers,
links,
gossip_engine,
gossip_validator,
Expand Down

0 comments on commit 08d1b2c

Please sign in to comment.