Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

BEEFY: optimize voter event loop for fewer 'active' wakeups #12760

Merged
merged 3 commits into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions client/beefy/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ impl<B> Validator<B> for GossipValidator<B>
where
B: Block,
{
fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<B>, who: &PeerId) {
self.known_peers.lock().remove(who);
}

fn validate(
&self,
_context: &mut dyn ValidatorContext<B>,
Expand Down
12 changes: 2 additions & 10 deletions client/beefy/src/communication/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ impl<B: Block> KnownPeers<B> {
Self { live: HashMap::new() }
}

/// Add new connected `peer`.
pub fn add_new(&mut self, peer: PeerId) {
self.live.entry(peer).or_default();
}

/// Note vote round number for `peer`.
pub fn note_vote_for(&mut self, peer: PeerId, round: NumberFor<B>) {
let data = self.live.entry(peer).or_default();
Expand Down Expand Up @@ -87,16 +82,13 @@ mod tests {
let mut peers = KnownPeers::<sc_network_test::Block>::new();
assert!(peers.live.is_empty());

// Alice and Bob new connected peers.
peers.add_new(alice);
peers.add_new(bob);
// 'Tracked' Bob seen voting for 5.
peers.note_vote_for(bob, 5);
// Previously unseen Charlie now seen voting for 10.
peers.note_vote_for(charlie, 10);

assert_eq!(peers.live.len(), 3);
assert!(peers.contains(&alice));
assert_eq!(peers.live.len(), 2);
assert!(!peers.contains(&alice));
assert!(peers.contains(&bob));
assert!(peers.contains(&charlie));

Expand Down
4 changes: 2 additions & 2 deletions client/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,12 @@ where
None,
);

// The `GossipValidator` adds and removes known peers based on valid votes and network events.
let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(),
runtime.clone(),
justifications_protocol_name,
known_peers.clone(),
known_peers,
);

let metrics =
Expand Down Expand Up @@ -286,7 +287,6 @@ where
payload_provider,
network,
key_store: key_store.into(),
known_peers,
gossip_engine,
gossip_validator,
on_demand_justifications,
Expand Down
48 changes: 24 additions & 24 deletions client/beefy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,27 @@ pub(crate) fn create_beefy_keystore(authority: BeefyKeyring) -> SyncCryptoStoreP
keystore
}

fn voter_init_setup(
net: &mut BeefyTestNet,
finality: &mut futures::stream::Fuse<FinalityNotifications<Block>>,
) -> sp_blockchain::Result<PersistedState<Block>> {
let backend = net.peer(0).client().as_backend();
let api = Arc::new(crate::tests::two_validators::TestApi {});
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
let gossip_validator =
Arc::new(crate::communication::gossip::GossipValidator::new(known_peers));
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
net.peer(0).network_service().clone(),
"/beefy/whatever",
gossip_validator,
None,
);
let best_grandpa =
futures::executor::block_on(wait_for_runtime_pallet(&*api, &mut gossip_engine, finality))
.unwrap();
load_or_init_voter_state(&*backend, &*api, best_grandpa, 1)
}

// Spawns beefy voters. Returns a future to spawn on the runtime.
fn initialize_beefy<API>(
net: &mut BeefyTestNet,
Expand Down Expand Up @@ -943,27 +964,6 @@ fn on_demand_beefy_justification_sync() {
finalize_block_and_wait_for_beefy(&net, all_peers, &mut runtime, &[30], &[30]);
}

fn test_voter_init_setup(
net: &mut BeefyTestNet,
finality: &mut futures::stream::Fuse<FinalityNotifications<Block>>,
) -> sp_blockchain::Result<PersistedState<Block>> {
let backend = net.peer(0).client().as_backend();
let api = Arc::new(crate::tests::two_validators::TestApi {});
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
let gossip_validator =
Arc::new(crate::communication::gossip::GossipValidator::new(known_peers));
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
net.peer(0).network_service().clone(),
"/beefy/whatever",
gossip_validator,
None,
);
let best_grandpa =
futures::executor::block_on(wait_for_runtime_pallet(&*api, &mut gossip_engine, finality))
.unwrap();
load_or_init_voter_state(&*backend, &*api, best_grandpa, 1)
}

#[test]
fn should_initialize_voter_at_genesis() {
let keys = &[BeefyKeyring::Alice];
Expand All @@ -981,7 +981,7 @@ fn should_initialize_voter_at_genesis() {
net.peer(0).client().as_client().finalize_block(hashof13, None).unwrap();

// load persistent state - nothing in DB, should init at session boundary
let persisted_state = test_voter_init_setup(&mut net, &mut finality).unwrap();
let persisted_state = voter_init_setup(&mut net, &mut finality).unwrap();

// Test initialization at session boundary.
// verify voter initialized with two sessions starting at blocks 1 and 10
Expand Down Expand Up @@ -1044,7 +1044,7 @@ fn should_initialize_voter_when_last_final_is_session_boundary() {
// expect rounds initialized at last beefy finalized 10.

// load persistent state - nothing in DB, should init at session boundary
let persisted_state = test_voter_init_setup(&mut net, &mut finality).unwrap();
let persisted_state = voter_init_setup(&mut net, &mut finality).unwrap();

// verify voter initialized with single session starting at block 10
assert_eq!(persisted_state.voting_oracle().sessions().len(), 1);
Expand Down Expand Up @@ -1103,7 +1103,7 @@ fn should_initialize_voter_at_latest_finalized() {
// Test initialization at last BEEFY finalized.

// load persistent state - nothing in DB, should init at last BEEFY finalized
let persisted_state = test_voter_init_setup(&mut net, &mut finality).unwrap();
let persisted_state = voter_init_setup(&mut net, &mut finality).unwrap();

// verify voter initialized with single session starting at block 12
assert_eq!(persisted_state.voting_oracle().sessions().len(), 1);
Expand Down
139 changes: 53 additions & 86 deletions client/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,31 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
fmt::Debug,
marker::PhantomData,
sync::Arc,
use crate::{
communication::{
gossip::{topic, GossipValidator},
request_response::outgoing_requests_engine::OnDemandJustificationsEngine,
},
error::Error,
justification::BeefyVersionedFinalityProof,
keystore::BeefyKeystore,
metric_inc, metric_set,
metrics::Metrics,
round::Rounds,
BeefyVoterLinks,
};
use beefy_primitives::{
crypto::{AuthorityId, Signature},
BeefyApi, Commitment, ConsensusLog, MmrRootHash, Payload, PayloadProvider, SignedCommitment,
ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
};

use codec::{Codec, Decode, Encode};
use futures::{stream::Fuse, FutureExt, StreamExt};
use log::{debug, error, info, log_enabled, trace, warn};
use parking_lot::Mutex;

use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend};
use sc_network_common::{
protocol::event::Event as NetEvent,
service::{NetworkEventStream, NetworkRequest},
};
use sc_network_common::service::{NetworkEventStream, NetworkRequest};
use sc_network_gossip::GossipEngine;

use sc_utils::notification::NotificationReceiver;
use sp_api::{BlockId, ProvideRuntimeApi};
use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
use sp_consensus::SyncOracle;
Expand All @@ -44,26 +50,11 @@ use sp_runtime::{
traits::{Block, Header, NumberFor, Zero},
SaturatedConversion,
};

use beefy_primitives::{
crypto::{AuthorityId, Signature},
BeefyApi, Commitment, ConsensusLog, MmrRootHash, Payload, PayloadProvider, SignedCommitment,
ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
};
use sc_utils::notification::NotificationReceiver;

use crate::{
communication::{
gossip::{topic, GossipValidator},
request_response::outgoing_requests_engine::OnDemandJustificationsEngine,
},
error::Error,
justification::BeefyVersionedFinalityProof,
keystore::BeefyKeystore,
metric_inc, metric_set,
metrics::Metrics,
round::Rounds,
BeefyVoterLinks, KnownPeers,
use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
fmt::Debug,
marker::PhantomData,
sync::Arc,
};

pub(crate) enum RoundAction {
Expand Down Expand Up @@ -253,7 +244,6 @@ pub(crate) struct WorkerParams<B: Block, BE, P, R, N> {
pub payload_provider: P,
pub network: N,
pub key_store: BeefyKeystore,
pub known_peers: Arc<Mutex<KnownPeers<B>>>,
pub gossip_engine: GossipEngine<B>,
pub gossip_validator: Arc<GossipValidator<B>>,
pub on_demand_justifications: OnDemandJustificationsEngine<B, R>,
Expand Down Expand Up @@ -305,7 +295,6 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, R, N> {
key_store: BeefyKeystore,

// communication
known_peers: Arc<Mutex<KnownPeers<B>>>,
gossip_engine: GossipEngine<B>,
gossip_validator: Arc<GossipValidator<B>>,
on_demand_justifications: OnDemandJustificationsEngine<B, R>,
Expand Down Expand Up @@ -349,7 +338,6 @@ where
gossip_engine,
gossip_validator,
on_demand_justifications,
known_peers,
links,
metrics,
persisted_state,
Expand All @@ -359,7 +347,6 @@ where
backend,
payload_provider,
network,
known_peers,
key_store,
gossip_engine,
gossip_validator,
Expand Down Expand Up @@ -783,6 +770,29 @@ where
Ok(())
}

fn process_new_state(&mut self) {
// Handle pending justifications and/or votes for now GRANDPA finalized blocks.
if let Err(err) = self.try_pending_justif_and_votes() {
debug!(target: "beefy", "🥩 {}", err);
}

// Don't bother voting or requesting justifications during major sync.
if !self.network.is_major_syncing() {
// There were external events, 'state' is changed, author a vote if needed/possible.
if let Err(err) = self.try_to_vote() {
debug!(target: "beefy", "🥩 {}", err);
}
// If the current target is a mandatory block,
// make sure there's also an on-demand justification request out for it.
if let Some(block) = self.voting_oracle().mandatory_pending() {
// This only starts new request if there isn't already an active one.
self.on_demand_justifications.request(block);
}
} else {
debug!(target: "beefy", "🥩 Skipping voting while major syncing.");
}
}

/// Main loop for BEEFY worker.
///
/// Wait for BEEFY runtime pallet to be available, then start the main async loop
Expand All @@ -794,7 +804,6 @@ where
) {
info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block());

let mut network_events = self.network.event_stream("network-gossip").fuse();
let mut votes = Box::pin(
self.gossip_engine
.messages_for(topic::<B>())
Expand All @@ -810,25 +819,12 @@ where
);

loop {
// Don't bother voting or requesting justifications during major sync.
if !self.network.is_major_syncing() {
// If the current target is a mandatory block,
// make sure there's also an on-demand justification request out for it.
if let Some(block) = self.voting_oracle().mandatory_pending() {
// This only starts new request if there isn't already an active one.
self.on_demand_justifications.request(block);
}
// There were external events, 'state' is changed, author a vote if needed/possible.
if let Err(err) = self.try_to_vote() {
debug!(target: "beefy", "🥩 {}", err);
}
} else {
debug!(target: "beefy", "🥩 Skipping voting while major syncing.");
}
// Act on changed 'state'.
self.process_new_state();

let mut gossip_engine = &mut self.gossip_engine;
// Wait for, and handle external events.
// The branches below only change 'state', actual voting happen afterwards,
// The branches below only change 'state', actual voting happens afterwards,
// based on the new resulting 'state'.
futures::select_biased! {
// Use `select_biased!` to prioritize order below.
Expand All @@ -837,15 +833,6 @@ where
error!(target: "beefy", "🥩 Gossip engine has terminated, closing worker.");
return;
},
// Keep track of connected peers.
net_event = network_events.next() => {
if let Some(net_event) = net_event {
self.handle_network_event(net_event);
} else {
error!(target: "beefy", "🥩 Network events stream terminated, closing worker.");
return;
}
},
// Process finality notifications first since these drive the voter.
notification = finality_notifications.next() => {
if let Some(notification) = notification {
Expand Down Expand Up @@ -888,25 +875,6 @@ where
}
},
}

// Handle pending justifications and/or votes for now GRANDPA finalized blocks.
if let Err(err) = self.try_pending_justif_and_votes() {
debug!(target: "beefy", "🥩 {}", err);
}
}
}

/// Update known peers based on network events.
fn handle_network_event(&mut self, event: NetEvent) {
match event {
NetEvent::SyncConnected { remote } => {
self.known_peers.lock().add_new(remote);
},
NetEvent::SyncDisconnected { remote } => {
self.known_peers.lock().remove(&remote);
},
// We don't care about other events.
_ => (),
}
}
}
Expand Down Expand Up @@ -976,11 +944,11 @@ pub(crate) mod tests {
create_beefy_keystore, get_beefy_streams, make_beefy_ids, two_validators::TestApi,
BeefyPeer, BeefyTestNet,
},
BeefyRPCLinks,
BeefyRPCLinks, KnownPeers,
};

use beefy_primitives::{known_payloads, mmr::MmrRootProvider};
use futures::{executor::block_on, future::poll_fn, task::Poll};
use parking_lot::Mutex;
use sc_client_api::{Backend as BackendT, HeaderBackend};
use sc_network::NetworkService;
use sc_network_test::TestNetFactory;
Expand Down Expand Up @@ -1058,7 +1026,7 @@ pub(crate) mod tests {
network.clone(),
api.clone(),
"/beefy/justifs/1".into(),
known_peers.clone(),
known_peers,
);
let at = BlockId::number(Zero::zero());
let genesis_header = backend.blockchain().expect_header(at).unwrap();
Expand All @@ -1074,7 +1042,6 @@ pub(crate) mod tests {
backend,
payload_provider,
key_store: Some(keystore).into(),
known_peers,
links,
gossip_engine,
gossip_validator,
Expand Down