diff --git a/client/beefy/rpc/src/lib.rs b/client/beefy/rpc/src/lib.rs index 3be182ceb8f39..0af474116e6d0 100644 --- a/client/beefy/rpc/src/lib.rs +++ b/client/beefy/rpc/src/lib.rs @@ -35,7 +35,9 @@ use jsonrpsee::{ }; use log::warn; -use beefy_gadget::notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofStream}; +use beefy_gadget::communication::notification::{ + BeefyBestBlockStream, BeefyVersionedFinalityProofStream, +}; mod notification; @@ -165,8 +167,8 @@ mod tests { use super::*; use beefy_gadget::{ + communication::notification::BeefyVersionedFinalityProofSender, justification::BeefyVersionedFinalityProof, - notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofSender}, }; use beefy_primitives::{known_payload_ids, Payload, SignedCommitment}; use codec::{Decode, Encode}; diff --git a/client/beefy/src/gossip.rs b/client/beefy/src/communication/gossip.rs similarity index 94% rename from client/beefy/src/gossip.rs rename to client/beefy/src/communication/gossip.rs index 02d5efe9e0e58..6c41a2e48932a 100644 --- a/client/beefy/src/gossip.rs +++ b/client/beefy/src/communication/gossip.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::{collections::BTreeMap, time::Duration}; +use std::{collections::BTreeMap, sync::Arc, time::Duration}; use sc_network::PeerId; use sc_network_gossip::{MessageIntent, ValidationResult, Validator, ValidatorContext}; @@ -28,13 +28,12 @@ use log::{debug, trace}; use parking_lot::{Mutex, RwLock}; use wasm_timer::Instant; +use crate::{communication::peers::KnownPeers, keystore::BeefyKeystore}; use beefy_primitives::{ crypto::{Public, Signature}, VoteMessage, }; -use crate::keystore::BeefyKeystore; - // Timeout for rebroadcasting messages. const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5); @@ -103,17 +102,19 @@ where topic: B::Hash, known_votes: RwLock>, next_rebroadcast: Mutex, + known_peers: Arc>>, } impl GossipValidator where B: Block, { - pub fn new() -> GossipValidator { + pub fn new(known_peers: Arc>>) -> GossipValidator { GossipValidator { topic: topic::(), known_votes: RwLock::new(KnownVotes::new()), next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER), + known_peers, } } @@ -165,6 +166,7 @@ where if BeefyKeystore::verify(&msg.id, &msg.signature, &msg.commitment.encode()) { self.known_votes.write().add_known(&round, msg_hash); + self.known_peers.lock().note_vote_for(*sender, round); return ValidationResult::ProcessAndKeep(self.topic) } else { // TODO: report peer @@ -271,7 +273,7 @@ mod tests { #[test] fn note_and_drop_round_works() { - let gv = GossipValidator::::new(); + let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); gv.note_round(1u64); @@ -298,7 +300,7 @@ mod tests { #[test] fn note_same_round_twice() { - let gv = GossipValidator::::new(); + let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); gv.note_round(3u64); gv.note_round(7u64); @@ -355,7 +357,7 @@ mod tests { #[test] fn should_avoid_verifying_signatures_twice() { - let gv = GossipValidator::::new(); + let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); let sender = sc_network::PeerId::random(); let mut context = TestContext; @@ -391,7 +393,7 @@ mod tests { #[test] fn messages_allowed_and_expired() { - let gv = GossipValidator::::new(); + let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); let sender = sc_network::PeerId::random(); let topic = Default::default(); let intent = MessageIntent::Broadcast; @@ -434,7 +436,7 @@ mod tests { #[test] fn messages_rebroadcast() { - let gv = GossipValidator::::new(); + let gv = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); let sender = sc_network::PeerId::random(); let topic = Default::default(); diff --git a/client/beefy/src/communication/mod.rs b/client/beefy/src/communication/mod.rs new file mode 100644 index 0000000000000..93646677c0ecd --- /dev/null +++ b/client/beefy/src/communication/mod.rs @@ -0,0 +1,118 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Communication streams for the BEEFY networking protocols. + +pub mod notification; +pub mod request_response; + +pub(crate) mod gossip; +pub(crate) mod peers; + +pub(crate) mod beefy_protocol_name { + use array_bytes::bytes2hex; + use sc_network::ProtocolName; + + /// BEEFY votes gossip protocol name suffix. + const GOSSIP_NAME: &str = "/beefy/1"; + /// BEEFY justifications protocol name suffix. + const JUSTIFICATIONS_NAME: &str = "/beefy/justifications/1"; + + /// Old names for the gossip protocol, used for backward compatibility. + pub(super) const LEGACY_NAMES: [&str; 1] = ["/paritytech/beefy/1"]; + + /// Name of the votes gossip protocol used by BEEFY. + /// + /// Must be registered towards the networking in order for BEEFY voter to properly function. + pub fn gossip_protocol_name>( + genesis_hash: Hash, + fork_id: Option<&str>, + ) -> ProtocolName { + let genesis_hash = genesis_hash.as_ref(); + if let Some(fork_id) = fork_id { + format!("/{}/{}{}", bytes2hex("", genesis_hash), fork_id, GOSSIP_NAME).into() + } else { + format!("/{}{}", bytes2hex("", genesis_hash), GOSSIP_NAME).into() + } + } + + /// Name of the BEEFY justifications request-response protocol. + pub fn justifications_protocol_name>( + genesis_hash: Hash, + fork_id: Option<&str>, + ) -> ProtocolName { + let genesis_hash = genesis_hash.as_ref(); + if let Some(fork_id) = fork_id { + format!("/{}/{}{}", bytes2hex("", genesis_hash), fork_id, JUSTIFICATIONS_NAME).into() + } else { + format!("/{}{}", bytes2hex("", genesis_hash), JUSTIFICATIONS_NAME).into() + } + } +} + +/// Returns the configuration value to put in +/// [`sc_network::config::NetworkConfiguration::extra_sets`]. +/// For standard protocol name see [`beefy_protocol_name::gossip_protocol_name`]. +pub fn beefy_peers_set_config( + gossip_protocol_name: sc_network::ProtocolName, +) -> sc_network_common::config::NonDefaultSetConfig { + let mut cfg = + sc_network_common::config::NonDefaultSetConfig::new(gossip_protocol_name, 1024 * 1024); + + cfg.allow_non_reserved(25, 25); + cfg.add_fallback_names(beefy_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect()); + cfg +} + +#[cfg(test)] +mod tests { + use super::*; + + use sp_core::H256; + + #[test] + fn beefy_protocols_names() { + use beefy_protocol_name::{gossip_protocol_name, justifications_protocol_name}; + // Create protocol name using random genesis hash. + let genesis_hash = H256::random(); + let genesis_hex = array_bytes::bytes2hex("", genesis_hash.as_ref()); + + let expected_gossip_name = format!("/{}/beefy/1", genesis_hex); + let gossip_proto_name = gossip_protocol_name(&genesis_hash, None); + assert_eq!(gossip_proto_name.to_string(), expected_gossip_name); + + let expected_justif_name = format!("/{}/beefy/justifications/1", genesis_hex); + let justif_proto_name = justifications_protocol_name(&genesis_hash, None); + assert_eq!(justif_proto_name.to_string(), expected_justif_name); + + // Create protocol name using hardcoded genesis hash. Verify exact representation. + let genesis_hash = [ + 50, 4, 60, 123, 58, 106, 216, 246, 194, 188, 139, 193, 33, 212, 202, 171, 9, 55, 123, + 94, 8, 43, 12, 251, 187, 57, 173, 19, 188, 74, 205, 147, + ]; + let genesis_hex = "32043c7b3a6ad8f6c2bc8bc121d4caab09377b5e082b0cfbbb39ad13bc4acd93"; + + let expected_gossip_name = format!("/{}/beefy/1", genesis_hex); + let gossip_proto_name = gossip_protocol_name(&genesis_hash, None); + assert_eq!(gossip_proto_name.to_string(), expected_gossip_name); + + let expected_justif_name = format!("/{}/beefy/justifications/1", genesis_hex); + let justif_proto_name = justifications_protocol_name(&genesis_hash, None); + assert_eq!(justif_proto_name.to_string(), expected_justif_name); + } +} diff --git a/client/beefy/src/notification.rs b/client/beefy/src/communication/notification.rs similarity index 100% rename from client/beefy/src/notification.rs rename to client/beefy/src/communication/notification.rs diff --git a/client/beefy/src/communication/peers.rs b/client/beefy/src/communication/peers.rs new file mode 100644 index 0000000000000..0e20a0f4e0ff6 --- /dev/null +++ b/client/beefy/src/communication/peers.rs @@ -0,0 +1,131 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Logic for keeping track of BEEFY peers. + +// TODO (issue #12296): replace this naive peer tracking with generic one that infers data +// from multiple network protocols. + +use sc_network::PeerId; +use sp_runtime::traits::{Block, NumberFor, Zero}; +use std::collections::{HashMap, VecDeque}; + +struct PeerData { + last_voted_on: NumberFor, +} + +impl Default for PeerData { + fn default() -> Self { + PeerData { last_voted_on: Zero::zero() } + } +} + +/// Keep a simple map of connected peers +/// and the most recent voting round they participated in. +pub struct KnownPeers { + live: HashMap>, +} + +impl KnownPeers { + pub fn new() -> Self { + Self { live: HashMap::new() } + } + + /// Add new connected `peer`. + pub fn add_new(&mut self, peer: PeerId) { + self.live.entry(peer).or_default(); + } + + /// Note vote round number for `peer`. + pub fn note_vote_for(&mut self, peer: PeerId, round: NumberFor) { + let data = self.live.entry(peer).or_default(); + data.last_voted_on = round.max(data.last_voted_on); + } + + /// Remove connected `peer`. + pub fn remove(&mut self, peer: &PeerId) { + self.live.remove(peer); + } + + /// Return _filtered and cloned_ list of peers that have voted on `block` or higher. + pub fn at_least_at_block(&self, block: NumberFor) -> VecDeque { + self.live + .iter() + .filter_map(|(k, v)| (v.last_voted_on >= block).then_some(k)) + .cloned() + .collect() + } + + /// Answer whether `peer` is part of `KnownPeers` set. + pub fn contains(&self, peer: &PeerId) -> bool { + self.live.contains_key(peer) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn should_track_known_peers_progress() { + let (alice, bob, charlie) = (PeerId::random(), PeerId::random(), PeerId::random()); + let mut peers = KnownPeers::::new(); + assert!(peers.live.is_empty()); + + // Alice and Bob new connected peers. + peers.add_new(alice); + peers.add_new(bob); + // 'Tracked' Bob seen voting for 5. + peers.note_vote_for(bob, 5); + // Previously unseen Charlie now seen voting for 10. + peers.note_vote_for(charlie, 10); + + assert_eq!(peers.live.len(), 3); + assert!(peers.contains(&alice)); + assert!(peers.contains(&bob)); + assert!(peers.contains(&charlie)); + + // Get peers at block >= 5 + let at_5 = peers.at_least_at_block(5); + // Should be Bob and Charlie + assert_eq!(at_5.len(), 2); + assert!(at_5.contains(&bob)); + assert!(at_5.contains(&charlie)); + + // 'Tracked' Alice seen voting for 10. + peers.note_vote_for(alice, 10); + + // Get peers at block >= 9 + let at_9 = peers.at_least_at_block(9); + // Should be Charlie and Alice + assert_eq!(at_9.len(), 2); + assert!(at_9.contains(&charlie)); + assert!(at_9.contains(&alice)); + + // Remove Alice + peers.remove(&alice); + assert_eq!(peers.live.len(), 2); + assert!(!peers.contains(&alice)); + + // Get peers at block >= 9 + let at_9 = peers.at_least_at_block(9); + // Now should be just Charlie + assert_eq!(at_9.len(), 1); + assert!(at_9.contains(&charlie)); + } +} diff --git a/client/beefy/src/communication/request_response/incoming_requests_handler.rs b/client/beefy/src/communication/request_response/incoming_requests_handler.rs new file mode 100644 index 0000000000000..c0910a60fba3b --- /dev/null +++ b/client/beefy/src/communication/request_response/incoming_requests_handler.rs @@ -0,0 +1,193 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Helper for handling (i.e. answering) BEEFY justifications requests from a remote peer. + +use beefy_primitives::BEEFY_ENGINE_ID; +use codec::Decode; +use futures::{ + channel::{mpsc, oneshot}, + StreamExt, +}; +use log::{debug, trace}; +use sc_client_api::BlockBackend; +use sc_network::{config as netconfig, config::RequestResponseConfig, PeerId, ReputationChange}; +use sc_network_common::protocol::ProtocolName; +use sp_runtime::{generic::BlockId, traits::Block}; +use std::{marker::PhantomData, sync::Arc}; + +use crate::communication::request_response::{ + on_demand_justifications_protocol_config, Error, JustificationRequest, +}; + +/// A request coming in, including a sender for sending responses. +#[derive(Debug)] +pub(crate) struct IncomingRequest { + /// `PeerId` of sending peer. + pub peer: PeerId, + /// The sent request. + pub payload: JustificationRequest, + /// Sender for sending response back. + pub pending_response: oneshot::Sender, +} + +impl IncomingRequest { + /// Create new `IncomingRequest`. + pub fn new( + peer: PeerId, + payload: JustificationRequest, + pending_response: oneshot::Sender, + ) -> Self { + Self { peer, payload, pending_response } + } + + /// Try building from raw network request. + /// + /// This function will fail if the request cannot be decoded and will apply passed in + /// reputation changes in that case. + /// + /// Params: + /// - The raw request to decode + /// - Reputation changes to apply for the peer in case decoding fails. + pub fn try_from_raw( + raw: netconfig::IncomingRequest, + reputation_changes: Vec, + ) -> Result { + let netconfig::IncomingRequest { payload, peer, pending_response } = raw; + let payload = match JustificationRequest::decode(&mut payload.as_ref()) { + Ok(payload) => payload, + Err(err) => { + let response = netconfig::OutgoingResponse { + result: Err(()), + reputation_changes, + sent_feedback: None, + }; + if let Err(_) = pending_response.send(response) { + return Err(Error::DecodingErrorNoReputationChange(peer, err)) + } + return Err(Error::DecodingError(peer, err)) + }, + }; + Ok(Self::new(peer, payload, pending_response)) + } +} + +/// Receiver for incoming BEEFY justifications requests. +/// +/// Takes care of decoding and handling of invalid encoded requests. +pub(crate) struct IncomingRequestReceiver { + raw: mpsc::Receiver, +} + +impl IncomingRequestReceiver { + pub fn new(inner: mpsc::Receiver) -> Self { + Self { raw: inner } + } + + /// Try to receive the next incoming request. + /// + /// Any received request will be decoded, on decoding errors the provided reputation changes + /// will be applied and an error will be reported. + pub async fn recv(&mut self, reputation_changes: F) -> Result, Error> + where + B: Block, + F: FnOnce() -> Vec, + { + let req = match self.raw.next().await { + None => return Err(Error::RequestChannelExhausted), + Some(raw) => IncomingRequest::::try_from_raw(raw, reputation_changes())?, + }; + Ok(req) + } +} + +/// Handler for incoming BEEFY justifications requests from a remote peer. +pub struct BeefyJustifsRequestHandler { + pub(crate) request_receiver: IncomingRequestReceiver, + pub(crate) justif_protocol_name: ProtocolName, + pub(crate) client: Arc, + pub(crate) _block: PhantomData, +} + +impl BeefyJustifsRequestHandler +where + B: Block, + Client: BlockBackend + Send + Sync, +{ + /// Create a new [`BeefyJustifsRequestHandler`]. + pub fn new>( + genesis_hash: Hash, + fork_id: Option<&str>, + client: Arc, + ) -> (Self, RequestResponseConfig) { + let (request_receiver, config) = + on_demand_justifications_protocol_config(genesis_hash, fork_id); + let justif_protocol_name = config.name.clone(); + + (Self { request_receiver, justif_protocol_name, client, _block: PhantomData }, config) + } + + /// Network request-response protocol name used by this handler. + pub fn protocol_name(&self) -> ProtocolName { + self.justif_protocol_name.clone() + } + + // Sends back justification response if justification found in client backend. + fn handle_request(&self, request: IncomingRequest) -> Result<(), Error> { + // TODO (issue #12293): validate `request` and change peer reputation for invalid requests. + + let maybe_encoded_proof = self + .client + .justifications(&BlockId::Number(request.payload.begin)) + .map_err(Error::Client)? + .and_then(|justifs| justifs.get(BEEFY_ENGINE_ID).cloned()) + // No BEEFY justification present. + .ok_or(()); + + request + .pending_response + .send(netconfig::OutgoingResponse { + result: maybe_encoded_proof, + reputation_changes: Vec::new(), + sent_feedback: None, + }) + .map_err(|_| Error::SendResponse) + } + + /// Run [`BeefyJustifsRequestHandler`]. + pub async fn run(mut self) { + trace!(target: "beefy::sync", "🥩 Running BeefyJustifsRequestHandler"); + + while let Ok(request) = self.request_receiver.recv(|| vec![]).await { + let peer = request.peer; + match self.handle_request(request) { + Ok(()) => { + debug!( + target: "beefy::sync", + "🥩 Handled BEEFY justification request from {:?}.", peer + ) + }, + Err(e) => { + // TODO (issue #12293): apply reputation changes here based on error type. + debug!( + target: "beefy::sync", + "🥩 Failed to handle BEEFY justification request from {:?}: {}", peer, e, + ) + }, + } + } + } +} diff --git a/client/beefy/src/communication/request_response/mod.rs b/client/beefy/src/communication/request_response/mod.rs new file mode 100644 index 0000000000000..c83bb9d57e91b --- /dev/null +++ b/client/beefy/src/communication/request_response/mod.rs @@ -0,0 +1,101 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Request/response protocol for syncing BEEFY justifications. + +mod incoming_requests_handler; +pub(crate) mod outgoing_requests_engine; + +pub use incoming_requests_handler::BeefyJustifsRequestHandler; + +use futures::channel::mpsc; +use std::time::Duration; + +use codec::{Decode, Encode, Error as CodecError}; +use sc_network::{config::RequestResponseConfig, PeerId}; +use sp_runtime::traits::{Block, NumberFor}; + +use crate::communication::beefy_protocol_name::justifications_protocol_name; +use incoming_requests_handler::IncomingRequestReceiver; + +// 10 seems reasonable, considering justifs are explicitly requested only +// for mandatory blocks, by nodes that are syncing/catching-up. +const JUSTIF_CHANNEL_SIZE: usize = 10; + +const MAX_RESPONSE_SIZE: u64 = 1024 * 1024; +const JUSTIF_REQUEST_TIMEOUT: Duration = Duration::from_secs(3); + +/// Get the configuration for the BEEFY justifications Request/response protocol. +/// +/// Returns a receiver for messages received on this protocol and the requested +/// `ProtocolConfig`. +/// +/// Consider using [`BeefyJustifsRequestHandler`] instead of this low-level function. +pub(crate) fn on_demand_justifications_protocol_config>( + genesis_hash: Hash, + fork_id: Option<&str>, +) -> (IncomingRequestReceiver, RequestResponseConfig) { + let name = justifications_protocol_name(genesis_hash, fork_id); + let fallback_names = vec![]; + let (tx, rx) = mpsc::channel(JUSTIF_CHANNEL_SIZE); + let rx = IncomingRequestReceiver::new(rx); + let cfg = RequestResponseConfig { + name, + fallback_names, + max_request_size: 32, + max_response_size: MAX_RESPONSE_SIZE, + // We are connected to all validators: + request_timeout: JUSTIF_REQUEST_TIMEOUT, + inbound_queue: Some(tx), + }; + (rx, cfg) +} + +/// BEEFY justification request. +#[derive(Debug, Clone, Encode, Decode)] +pub struct JustificationRequest { + /// Start collecting proofs from this block. + pub begin: NumberFor, +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + Client(#[from] sp_blockchain::Error), + + #[error(transparent)] + RuntimeApi(#[from] sp_api::ApiError), + + /// Decoding failed, we were able to change the peer's reputation accordingly. + #[error("Decoding request failed for peer {0}.")] + DecodingError(PeerId, #[source] CodecError), + + /// Decoding failed, but sending reputation change failed. + #[error("Decoding request failed for peer {0}, and changing reputation failed.")] + DecodingErrorNoReputationChange(PeerId, #[source] CodecError), + + /// Incoming request stream exhausted. Should only happen on shutdown. + #[error("Incoming request channel got closed.")] + RequestChannelExhausted, + + #[error("Failed to send response.")] + SendResponse, + + #[error("Received invalid response.")] + InvalidResponse, +} diff --git a/client/beefy/src/communication/request_response/outgoing_requests_engine.rs b/client/beefy/src/communication/request_response/outgoing_requests_engine.rs new file mode 100644 index 0000000000000..e22958e19cd2e --- /dev/null +++ b/client/beefy/src/communication/request_response/outgoing_requests_engine.rs @@ -0,0 +1,245 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Generating request logic for request/response protocol for syncing BEEFY justifications. + +use beefy_primitives::{crypto::AuthorityId, BeefyApi, ValidatorSet}; +use codec::Encode; +use futures::{ + channel::{oneshot, oneshot::Canceled}, + stream::{self, StreamExt}, +}; +use log::{debug, error, warn}; +use parking_lot::Mutex; +use sc_network::{PeerId, ProtocolName}; +use sc_network_common::{ + request_responses::{IfDisconnected, RequestFailure}, + service::NetworkRequest, +}; +use sp_api::ProvideRuntimeApi; +use sp_runtime::{ + generic::BlockId, + traits::{Block, NumberFor}, +}; +use std::{collections::VecDeque, result::Result, sync::Arc}; + +use crate::{ + communication::request_response::{Error, JustificationRequest}, + justification::{decode_and_verify_finality_proof, BeefyVersionedFinalityProof}, + KnownPeers, +}; + +/// Response type received from network. +type Response = Result, RequestFailure>; +/// Used to receive a response from the network. +type ResponseReceiver = oneshot::Receiver; + +enum State { + Idle(stream::Pending>), + AwaitingResponse(PeerId, NumberFor, stream::Once), +} + +pub struct OnDemandJustificationsEngine { + network: Arc, + runtime: Arc, + protocol_name: ProtocolName, + + live_peers: Arc>>, + peers_cache: VecDeque, + + state: State, +} + +impl OnDemandJustificationsEngine +where + B: Block, + R: ProvideRuntimeApi, + R::Api: BeefyApi, +{ + pub fn new( + network: Arc, + runtime: Arc, + protocol_name: ProtocolName, + live_peers: Arc>>, + ) -> Self { + Self { + network, + runtime, + protocol_name, + live_peers, + peers_cache: VecDeque::new(), + state: State::Idle(stream::pending()), + } + } + + fn reset_peers_cache_for_block(&mut self, block: NumberFor) { + // TODO (issue #12296): replace peer selection with generic one that involves all protocols. + self.peers_cache = self.live_peers.lock().at_least_at_block(block); + } + + fn try_next_peer(&mut self) -> Option { + // TODO (issue #12296): replace peer selection with generic one that involves all protocols. + let live = self.live_peers.lock(); + while let Some(peer) = self.peers_cache.pop_front() { + if live.contains(&peer) { + return Some(peer) + } + } + None + } + + fn request_from_peer(&mut self, peer: PeerId, block: NumberFor) { + debug!(target: "beefy::sync", "🥩 requesting justif #{:?} from peer {:?}", block, peer); + + let payload = JustificationRequest:: { begin: block }.encode(); + + let (tx, rx) = oneshot::channel(); + + self.network.start_request( + peer, + self.protocol_name.clone(), + payload, + tx, + IfDisconnected::ImmediateError, + ); + + self.state = State::AwaitingResponse(peer, block, stream::once(rx)); + } + + /// If no other request is in progress, start new justification request for `block`. + pub fn request(&mut self, block: NumberFor) { + // ignore new requests while there's already one pending + match &self.state { + State::AwaitingResponse(_, _, _) => return, + State::Idle(_) => (), + } + self.reset_peers_cache_for_block(block); + + // Start the requests engine - each unsuccessful received response will automatically + // trigger a new request to the next peer in the `peers_cache` until there are none left. + if let Some(peer) = self.try_next_peer() { + self.request_from_peer(peer, block); + } else { + debug!(target: "beefy::sync", "🥩 no good peers to request justif #{:?} from", block); + } + } + + /// Cancel any pending request for block numbers smaller or equal to `block`. + pub fn cancel_requests_older_than(&mut self, block: NumberFor) { + match &self.state { + State::AwaitingResponse(_, number, _) if *number <= block => { + debug!( + target: "beefy::sync", + "🥩 cancel pending request for justification #{:?}", + number + ); + self.state = State::Idle(stream::pending()); + }, + _ => (), + } + } + + fn process_response( + &mut self, + peer: PeerId, + block: NumberFor, + validator_set: &ValidatorSet, + response: Result, + ) -> Result, Error> { + response + .map_err(|e| { + debug!( + target: "beefy::sync", + "🥩 for on demand justification #{:?}, peer {:?} hung up: {:?}", + block, peer, e + ); + Error::InvalidResponse + })? + .map_err(|e| { + debug!( + target: "beefy::sync", + "🥩 for on demand justification #{:?}, peer {:?} error: {:?}", + block, peer, e + ); + Error::InvalidResponse + }) + .and_then(|encoded| { + decode_and_verify_finality_proof::(&encoded[..], block, &validator_set).map_err( + |e| { + debug!( + target: "beefy::sync", + "🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}", + block, peer, e + ); + Error::InvalidResponse + }, + ) + }) + } + + pub async fn next(&mut self) -> Option> { + let (peer, block, resp) = match &mut self.state { + State::Idle(pending) => { + let _ = pending.next().await; + // This never happens since 'stream::pending' never generates any items. + return None + }, + State::AwaitingResponse(peer, block, receiver) => { + let resp = receiver.next().await?; + (*peer, *block, resp) + }, + }; + // We received the awaited response. Our 'stream::once()' receiver will never generate any + // other response, meaning we're done with current state. Move the engine to `State::Idle`. + self.state = State::Idle(stream::pending()); + + let block_id = BlockId::number(block); + let validator_set = self + .runtime + .runtime_api() + .validator_set(&block_id) + .map_err(|e| { + error!(target: "beefy::sync", "🥩 Runtime API error {:?} in on-demand justif engine.", e); + e + }) + .ok()? + .or_else(|| { + error!(target: "beefy::sync", "🥩 BEEFY pallet not available for block {:?}.", block); + None + })?; + + self.process_response(peer, block, &validator_set, resp) + .map_err(|_| { + // No valid justification received, try next peer in our set. + if let Some(peer) = self.try_next_peer() { + self.request_from_peer(peer, block); + } else { + warn!(target: "beefy::sync", "🥩 ran out of peers to request justif #{:?} from", block); + } + }) + .map(|proof| { + debug!( + target: "beefy::sync", + "🥩 received valid on-demand justif #{:?} from {:?}", + block, peer + ); + proof + }) + .ok() + } +} diff --git a/client/beefy/src/import.rs b/client/beefy/src/import.rs index db4d8bfba7450..89a4517334189 100644 --- a/client/beefy/src/import.rs +++ b/client/beefy/src/import.rs @@ -33,8 +33,8 @@ use sc_client_api::backend::Backend; use sc_consensus::{BlockCheckParams, BlockImport, BlockImportParams, ImportResult}; use crate::{ + communication::notification::BeefyVersionedFinalityProofSender, justification::{decode_and_verify_finality_proof, BeefyVersionedFinalityProof}, - notification::BeefyVersionedFinalityProofSender, }; /// A block-import handler for BEEFY. diff --git a/client/beefy/src/lib.rs b/client/beefy/src/lib.rs index ad527b2929585..7407f101e99a5 100644 --- a/client/beefy/src/lib.rs +++ b/client/beefy/src/lib.rs @@ -17,10 +17,12 @@ // along with this program. If not, see . use beefy_primitives::{BeefyApi, MmrRootHash}; +use parking_lot::Mutex; use prometheus::Registry; -use sc_client_api::{Backend, BlockchainEvents, Finalizer}; +use sc_client_api::{Backend, BlockBackend, BlockchainEvents, Finalizer}; use sc_consensus::BlockImport; use sc_network::ProtocolName; +use sc_network_common::service::NetworkRequest; use sc_network_gossip::Network as GossipNetwork; use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; @@ -28,68 +30,38 @@ use sp_consensus::{Error as ConsensusError, SyncOracle}; use sp_keystore::SyncCryptoStorePtr; use sp_mmr_primitives::MmrApi; use sp_runtime::traits::Block; -use std::sync::Arc; +use std::{marker::PhantomData, sync::Arc}; mod error; -mod gossip; mod keystore; mod metrics; mod round; mod worker; +pub mod communication; pub mod import; pub mod justification; -pub mod notification; #[cfg(test)] mod tests; use crate::{ - import::BeefyBlockImport, - notification::{ - BeefyBestBlockSender, BeefyBestBlockStream, BeefyVersionedFinalityProofSender, - BeefyVersionedFinalityProofStream, + communication::{ + notification::{ + BeefyBestBlockSender, BeefyBestBlockStream, BeefyVersionedFinalityProofSender, + BeefyVersionedFinalityProofStream, + }, + peers::KnownPeers, + request_response::{ + outgoing_requests_engine::OnDemandJustificationsEngine, BeefyJustifsRequestHandler, + }, }, + import::BeefyBlockImport, }; -pub use beefy_protocol_name::standard_name as protocol_standard_name; - -pub(crate) mod beefy_protocol_name { - use sc_chain_spec::ChainSpec; - use sc_network::ProtocolName; - - const NAME: &str = "/beefy/1"; - /// Old names for the notifications protocol, used for backward compatibility. - pub(crate) const LEGACY_NAMES: [&str; 1] = ["/paritytech/beefy/1"]; - - /// Name of the notifications protocol used by BEEFY. - /// - /// Must be registered towards the networking in order for BEEFY to properly function. - pub fn standard_name>( - genesis_hash: &Hash, - chain_spec: &Box, - ) -> ProtocolName { - let genesis_hash = genesis_hash.as_ref(); - let chain_prefix = match chain_spec.fork_id() { - Some(fork_id) => format!("/{}/{}", array_bytes::bytes2hex("", genesis_hash), fork_id), - None => format!("/{}", array_bytes::bytes2hex("", genesis_hash)), - }; - format!("{}{}", chain_prefix, NAME).into() - } -} - -/// Returns the configuration value to put in -/// [`sc_network::config::NetworkConfiguration::extra_sets`]. -/// For standard protocol name see [`beefy_protocol_name::standard_name`]. -pub fn beefy_peers_set_config( - protocol_name: ProtocolName, -) -> sc_network_common::config::NonDefaultSetConfig { - let mut cfg = sc_network_common::config::NonDefaultSetConfig::new(protocol_name, 1024 * 1024); - - cfg.allow_non_reserved(25, 25); - cfg.add_fallback_names(beefy_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect()); - cfg -} +pub use communication::beefy_protocol_name::{ + gossip_protocol_name, justifications_protocol_name as justifs_protocol_name, +}; /// A convenience BEEFY client trait that defines all the type bounds a BEEFY client /// has to satisfy. Ideally that should actually be a trait alias. Unfortunately as @@ -159,13 +131,13 @@ where { // Voter -> RPC links let (to_rpc_justif_sender, from_voter_justif_stream) = - notification::BeefyVersionedFinalityProofStream::::channel(); + BeefyVersionedFinalityProofStream::::channel(); let (to_rpc_best_block_sender, from_voter_best_beefy_stream) = - notification::BeefyBestBlockStream::::channel(); + BeefyBestBlockStream::::channel(); // BlockImport -> Voter links let (to_voter_justif_sender, from_block_import_justif_stream) = - notification::BeefyVersionedFinalityProofStream::::channel(); + BeefyVersionedFinalityProofStream::::channel(); // BlockImport let import = @@ -180,6 +152,24 @@ where (import, voter_links, rpc_links) } +/// BEEFY gadget network parameters. +pub struct BeefyNetworkParams +where + B: Block, + N: GossipNetwork + NetworkRequest + SyncOracle + Send + Sync + 'static, +{ + /// Network implementing gossip, requests and sync-oracle. + pub network: Arc, + /// Chain specific BEEFY gossip protocol name. See + /// [`communication::beefy_protocol_name::gossip_protocol_name`]. + pub gossip_protocol_name: ProtocolName, + /// Chain specific BEEFY on-demand justifications protocol name. See + /// [`communication::beefy_protocol_name::justifications_protocol_name`]. + pub justifications_protocol_name: ProtocolName, + + pub _phantom: PhantomData, +} + /// BEEFY gadget initialization parameters. pub struct BeefyParams where @@ -188,7 +178,7 @@ where C: Client, R: ProvideRuntimeApi, R::Api: BeefyApi + MmrApi, - N: GossipNetwork + Clone + SyncOracle + Send + Sync + 'static, + N: GossipNetwork + NetworkRequest + SyncOracle + Send + Sync + 'static, { /// BEEFY client pub client: Arc, @@ -198,16 +188,16 @@ where pub runtime: Arc, /// Local key store pub key_store: Option, - /// Gossip network - pub network: N, + /// BEEFY voter network params + pub network_params: BeefyNetworkParams, /// Minimal delta between blocks, BEEFY should vote for pub min_block_delta: u32, /// Prometheus metric registry pub prometheus_registry: Option, - /// Chain specific GRANDPA protocol name. See [`beefy_protocol_name::standard_name`]. - pub protocol_name: ProtocolName, /// Links between the block importer, the background voter and the RPC layer. pub links: BeefyVoterLinks, + /// Handler for incoming BEEFY justifications requests from a remote peer. + pub on_demand_justifications_handler: BeefyJustifsRequestHandler, } /// Start the BEEFY gadget. @@ -217,32 +207,43 @@ pub async fn start_beefy_gadget(beefy_params: BeefyParams, - C: Client, + C: Client + BlockBackend, R: ProvideRuntimeApi, R::Api: BeefyApi + MmrApi, - N: GossipNetwork + Clone + SyncOracle + Send + Sync + 'static, + N: GossipNetwork + NetworkRequest + SyncOracle + Send + Sync + 'static, { let BeefyParams { client, backend, runtime, key_store, - network, + network_params, min_block_delta, prometheus_registry, - protocol_name, links, + on_demand_justifications_handler, } = beefy_params; - let sync_oracle = network.clone(); - let gossip_validator = Arc::new(gossip::GossipValidator::new()); + let BeefyNetworkParams { network, gossip_protocol_name, justifications_protocol_name, .. } = + network_params; + + let known_peers = Arc::new(Mutex::new(KnownPeers::new())); + let gossip_validator = + Arc::new(communication::gossip::GossipValidator::new(known_peers.clone())); let gossip_engine = sc_network_gossip::GossipEngine::new( - network, - protocol_name, + network.clone(), + gossip_protocol_name, gossip_validator.clone(), None, ); + let on_demand_justifications = OnDemandJustificationsEngine::new( + network.clone(), + runtime.clone(), + justifications_protocol_name, + known_peers.clone(), + ); + let metrics = prometheus_registry.as_ref().map(metrics::Metrics::register).and_then( |result| match result { @@ -261,10 +262,12 @@ where client, backend, runtime, - sync_oracle, + network, key_store: key_store.into(), + known_peers, gossip_engine, gossip_validator, + on_demand_justifications, links, metrics, min_block_delta, @@ -272,5 +275,5 @@ where let worker = worker::BeefyWorker::<_, _, _, _, _>::new(worker_params); - worker.run().await + futures::future::join(worker.run(), on_demand_justifications_handler.run()).await; } diff --git a/client/beefy/src/round.rs b/client/beefy/src/round.rs index c96613eb38a95..45d346ccd85eb 100644 --- a/client/beefy/src/round.rs +++ b/client/beefy/src/round.rs @@ -33,7 +33,7 @@ use sp_runtime::traits::{Block, NumberFor}; /// whether the local `self` validator has voted/signed. /// /// Does not do any validation on votes or signatures, layers above need to handle that (gossip). -#[derive(Default)] +#[derive(Debug, Default)] struct RoundTracker { self_vote: bool, votes: HashMap, @@ -69,6 +69,7 @@ pub fn threshold(authorities: usize) -> usize { /// Only round numbers > `best_done` are of interest, all others are considered stale. /// /// Does not do any validation on votes or signatures, layers above need to handle that (gossip). +#[derive(Debug)] pub(crate) struct Rounds { rounds: BTreeMap<(Payload, NumberFor), RoundTracker>, session_start: NumberFor, @@ -135,7 +136,7 @@ where } } - pub(crate) fn try_conclude( + pub(crate) fn should_conclude( &mut self, round: &(P, NumberFor), ) -> Option>> { @@ -148,7 +149,6 @@ where if done { let signatures = self.rounds.remove(round)?.votes; - self.conclude(round.1); Some( self.validators() .iter() @@ -279,7 +279,7 @@ mod tests { true )); // round not concluded - assert!(rounds.try_conclude(&round).is_none()); + assert!(rounds.should_conclude(&round).is_none()); // self vote already present, should not self vote assert!(!rounds.should_self_vote(&round)); @@ -296,7 +296,7 @@ mod tests { (Keyring::Dave.public(), Keyring::Dave.sign(b"I am committed")), false )); - assert!(rounds.try_conclude(&round).is_none()); + assert!(rounds.should_conclude(&round).is_none()); // add 2nd good vote assert!(rounds.add_vote( @@ -305,7 +305,7 @@ mod tests { false )); // round not concluded - assert!(rounds.try_conclude(&round).is_none()); + assert!(rounds.should_conclude(&round).is_none()); // add 3rd good vote assert!(rounds.add_vote( @@ -314,7 +314,8 @@ mod tests { false )); // round concluded - assert!(rounds.try_conclude(&round).is_some()); + assert!(rounds.should_conclude(&round).is_some()); + rounds.conclude(round.1); // Eve is a validator, but round was concluded, adding vote disallowed assert!(!rounds.add_vote( @@ -432,11 +433,12 @@ mod tests { assert_eq!(3, rounds.rounds.len()); // conclude unknown round - assert!(rounds.try_conclude(&(H256::from_low_u64_le(5), 5)).is_none()); + assert!(rounds.should_conclude(&(H256::from_low_u64_le(5), 5)).is_none()); assert_eq!(3, rounds.rounds.len()); // conclude round 2 - let signatures = rounds.try_conclude(&(H256::from_low_u64_le(2), 2)).unwrap(); + let signatures = rounds.should_conclude(&(H256::from_low_u64_le(2), 2)).unwrap(); + rounds.conclude(2); assert_eq!(1, rounds.rounds.len()); assert_eq!( diff --git a/client/beefy/src/tests.rs b/client/beefy/src/tests.rs index 3e49f4e05cc91..8057bd7cab7a5 100644 --- a/client/beefy/src/tests.rs +++ b/client/beefy/src/tests.rs @@ -21,10 +21,9 @@ use futures::{future, stream::FuturesUnordered, Future, StreamExt}; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, sync::Arc, task::Poll}; +use std::{collections::HashMap, marker::PhantomData, sync::Arc, task::Poll}; use tokio::{runtime::Runtime, time::Duration}; -use sc_chain_spec::{ChainSpec, GenericChainSpec}; use sc_client_api::HeaderBackend; use sc_consensus::{ BlockImport, BlockImportParams, BoxJustificationImport, ForkChoiceStrategy, ImportResult, @@ -33,7 +32,7 @@ use sc_consensus::{ use sc_keystore::LocalKeystore; use sc_network_test::{ Block, BlockImportAdapter, FullPeerConfig, PassThroughVerifier, Peer, PeersClient, - TestNetFactory, + PeersFullClient, TestNetFactory, }; use sc_utils::notification::NotificationReceiver; @@ -42,6 +41,7 @@ use beefy_primitives::{ BeefyApi, ConsensusLog, MmrRootHash, ValidatorSet, VersionedFinalityProof, BEEFY_ENGINE_ID, KEY_TYPE as BeefyKeyType, }; +use sc_network::{config::RequestResponseConfig, ProtocolName}; use sp_mmr_primitives::{ BatchProof, EncodableOpaqueLeaf, Error as MmrError, LeafIndex, MmrApi, Proof, }; @@ -60,11 +60,21 @@ use sp_runtime::{ use substrate_test_runtime_client::{runtime::Header, ClientExt}; use crate::{ - beefy_block_import_and_links, beefy_protocol_name, justification::*, - keystore::tests::Keyring as BeefyKeyring, BeefyRPCLinks, BeefyVoterLinks, + beefy_block_import_and_links, + communication::request_response::{ + on_demand_justifications_protocol_config, BeefyJustifsRequestHandler, + }, + gossip_protocol_name, + justification::*, + keystore::tests::Keyring as BeefyKeyring, + BeefyRPCLinks, BeefyVoterLinks, }; -pub(crate) const BEEFY_PROTOCOL_NAME: &'static str = "/beefy/1"; +const GENESIS_HASH: H256 = H256::zero(); +fn beefy_gossip_proto_name() -> ProtocolName { + gossip_protocol_name(GENESIS_HASH, None) +} + const GOOD_MMR_ROOT: MmrRootHash = MmrRootHash::repeat_byte(0xbf); const BAD_MMR_ROOT: MmrRootHash = MmrRootHash::repeat_byte(0x42); @@ -89,35 +99,12 @@ impl BuildStorage for Genesis { } } -#[test] -fn beefy_protocol_name() { - let chain_spec = GenericChainSpec::::from_json_bytes( - &include_bytes!("../../chain-spec/res/chain_spec.json")[..], - ) - .unwrap() - .cloned_box(); - - // Create protocol name using random genesis hash. - let genesis_hash = H256::random(); - let expected = format!("/{}/beefy/1", array_bytes::bytes2hex("", genesis_hash.as_ref())); - let proto_name = beefy_protocol_name::standard_name(&genesis_hash, &chain_spec); - assert_eq!(proto_name.to_string(), expected); - - // Create protocol name using hardcoded genesis hash. Verify exact representation. - let genesis_hash = [ - 50, 4, 60, 123, 58, 106, 216, 246, 194, 188, 139, 193, 33, 212, 202, 171, 9, 55, 123, 94, - 8, 43, 12, 251, 187, 57, 173, 19, 188, 74, 205, 147, - ]; - let expected = - "/32043c7b3a6ad8f6c2bc8bc121d4caab09377b5e082b0cfbbb39ad13bc4acd93/beefy/1".to_string(); - let proto_name = beefy_protocol_name::standard_name(&genesis_hash, &chain_spec); - assert_eq!(proto_name.to_string(), expected); -} - #[derive(Default)] pub(crate) struct PeerData { pub(crate) beefy_rpc_links: Mutex>>, pub(crate) beefy_voter_links: Mutex>>, + pub(crate) beefy_justif_req_handler: + Mutex>>, } #[derive(Default)] @@ -126,23 +113,34 @@ pub(crate) struct BeefyTestNet { } impl BeefyTestNet { - pub(crate) fn new(n_authority: usize, n_full: usize) -> Self { - let mut net = BeefyTestNet { peers: Vec::with_capacity(n_authority + n_full) }; - for _ in 0..n_authority { - net.add_authority_peer(); - } - for _ in 0..n_full { - net.add_full_peer(); + pub(crate) fn new(n_authority: usize) -> Self { + let mut net = BeefyTestNet { peers: Vec::with_capacity(n_authority) }; + + for i in 0..n_authority { + let (rx, cfg) = on_demand_justifications_protocol_config(GENESIS_HASH, None); + let justif_protocol_name = cfg.name.clone(); + + net.add_authority_peer(vec![cfg]); + + let client = net.peers[i].client().as_client(); + let justif_handler = BeefyJustifsRequestHandler { + request_receiver: rx, + justif_protocol_name, + client, + _block: PhantomData, + }; + *net.peers[i].data.beefy_justif_req_handler.lock() = Some(justif_handler); } net } - pub(crate) fn add_authority_peer(&mut self) { + pub(crate) fn add_authority_peer(&mut self, req_resp_cfgs: Vec) { self.add_full_peer_with_config(FullPeerConfig { - notifications_protocols: vec![BEEFY_PROTOCOL_NAME.into()], + notifications_protocols: vec![beefy_gossip_proto_name()], + request_response_protocols: req_resp_cfgs, is_authority: true, ..Default::default() - }) + }); } pub(crate) fn generate_blocks_and_sync( @@ -198,6 +196,7 @@ impl TestNetFactory for BeefyTestNet { let peer_data = PeerData { beefy_rpc_links: Mutex::new(Some(rpc_links)), beefy_voter_links: Mutex::new(Some(voter_links)), + ..Default::default() }; (BlockImportAdapter::new(block_import), None, peer_data) } @@ -215,11 +214,8 @@ impl TestNetFactory for BeefyTestNet { } fn add_full_peer(&mut self) { - self.add_full_peer_with_config(FullPeerConfig { - notifications_protocols: vec![BEEFY_PROTOCOL_NAME.into()], - is_authority: false, - ..Default::default() - }) + // `add_authority_peer()` used instead. + unimplemented!() } } @@ -354,7 +350,7 @@ where API: ProvideRuntimeApi + Default + Sync + Send, API::Api: BeefyApi + MmrApi, { - let voters = FuturesUnordered::new(); + let tasks = FuturesUnordered::new(); for (peer_id, key, api) in peers.into_iter() { let peer = &net.peers[peer_id]; @@ -362,31 +358,40 @@ where let keystore = create_beefy_keystore(*key); let (_, _, peer_data) = net.make_block_import(peer.client().clone()); - let PeerData { beefy_rpc_links, beefy_voter_links } = peer_data; + let PeerData { beefy_rpc_links, beefy_voter_links, .. } = peer_data; let beefy_voter_links = beefy_voter_links.lock().take(); *peer.data.beefy_rpc_links.lock() = beefy_rpc_links.lock().take(); *peer.data.beefy_voter_links.lock() = beefy_voter_links.clone(); + let on_demand_justif_handler = peer.data.beefy_justif_req_handler.lock().take().unwrap(); + + let network_params = crate::BeefyNetworkParams { + network: peer.network_service().clone(), + gossip_protocol_name: beefy_gossip_proto_name(), + justifications_protocol_name: on_demand_justif_handler.protocol_name(), + _phantom: PhantomData, + }; + let beefy_params = crate::BeefyParams { client: peer.client().as_client(), backend: peer.client().as_backend(), runtime: api.clone(), key_store: Some(keystore), - network: peer.network_service().clone(), + network_params, links: beefy_voter_links.unwrap(), min_block_delta, prometheus_registry: None, - protocol_name: BEEFY_PROTOCOL_NAME.into(), + on_demand_justifications_handler: on_demand_justif_handler, }; - let gadget = crate::start_beefy_gadget::<_, _, _, _, _>(beefy_params); + let task = crate::start_beefy_gadget::<_, _, _, _, _>(beefy_params); fn assert_send(_: &T) {} - assert_send(&gadget); - voters.push(gadget); + assert_send(&task); + tasks.push(task); } - voters.for_each(|_| async move {}) + tasks.for_each(|_| async move {}) } fn block_until(future: impl Future + Unpin, net: &Arc>, runtime: &mut Runtime) { @@ -404,18 +409,19 @@ fn run_for(duration: Duration, net: &Arc>, runtime: &mut Run pub(crate) fn get_beefy_streams( net: &mut BeefyTestNet, - peers: &[BeefyKeyring], + // peer index and key + peers: impl Iterator, ) -> (Vec>, Vec>>) { let mut best_block_streams = Vec::new(); let mut versioned_finality_proof_streams = Vec::new(); - for peer_id in 0..peers.len() { - let beefy_rpc_links = net.peer(peer_id).data.beefy_rpc_links.lock().clone().unwrap(); + peers.for_each(|(index, _)| { + let beefy_rpc_links = net.peer(index).data.beefy_rpc_links.lock().clone().unwrap(); let BeefyRPCLinks { from_voter_justif_stream, from_voter_best_beefy_stream } = beefy_rpc_links; best_block_streams.push(from_voter_best_beefy_stream.subscribe()); versioned_finality_proof_streams.push(from_voter_justif_stream.subscribe()); - } + }); (best_block_streams, versioned_finality_proof_streams) } @@ -493,18 +499,24 @@ fn streams_empty_after_timeout( fn finalize_block_and_wait_for_beefy( net: &Arc>, - peers: &[BeefyKeyring], + // peer index and key + peers: impl Iterator + Clone, runtime: &mut Runtime, finalize_targets: &[u64], expected_beefy: &[u64], ) { - let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers); + let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone()); for block in finalize_targets { let finalize = BlockId::number(*block); - for i in 0..peers.len() { - net.lock().peer(i).client().as_client().finalize_block(finalize, None).unwrap(); - } + peers.clone().for_each(|(index, _)| { + net.lock() + .peer(index) + .client() + .as_client() + .finalize_block(finalize, None) + .unwrap(); + }) } if expected_beefy.is_empty() { @@ -524,12 +536,12 @@ fn beefy_finalizing_blocks() { sp_tracing::try_init_simple(); let mut runtime = Runtime::new().unwrap(); - let peers = &[BeefyKeyring::Alice, BeefyKeyring::Bob]; - let validator_set = ValidatorSet::new(make_beefy_ids(peers), 0).unwrap(); + let peers = [BeefyKeyring::Alice, BeefyKeyring::Bob]; + let validator_set = ValidatorSet::new(make_beefy_ids(&peers), 0).unwrap(); let session_len = 10; let min_block_delta = 4; - let mut net = BeefyTestNet::new(2, 0); + let mut net = BeefyTestNet::new(2); let api = Arc::new(two_validators::TestApi {}); let beefy_peers = peers.iter().enumerate().map(|(id, key)| (id, key, api.clone())).collect(); @@ -542,17 +554,18 @@ fn beefy_finalizing_blocks() { // Minimum BEEFY block delta is 4. + let peers = peers.into_iter().enumerate(); // finalize block #5 -> BEEFY should finalize #1 (mandatory) and #5 from diff-power-of-two rule. - finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[5], &[1, 5]); + finalize_block_and_wait_for_beefy(&net, peers.clone(), &mut runtime, &[5], &[1, 5]); // GRANDPA finalize #10 -> BEEFY finalize #10 (mandatory) - finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[10], &[10]); + finalize_block_and_wait_for_beefy(&net, peers.clone(), &mut runtime, &[10], &[10]); // GRANDPA finalize #18 -> BEEFY finalize #14, then #18 (diff-power-of-two rule) - finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[18], &[14, 18]); + finalize_block_and_wait_for_beefy(&net, peers.clone(), &mut runtime, &[18], &[14, 18]); // GRANDPA finalize #20 -> BEEFY finalize #20 (mandatory) - finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[20], &[20]); + finalize_block_and_wait_for_beefy(&net, peers.clone(), &mut runtime, &[20], &[20]); // GRANDPA finalize #21 -> BEEFY finalize nothing (yet) because min delta is 4 finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[21], &[]); @@ -563,12 +576,12 @@ fn lagging_validators() { sp_tracing::try_init_simple(); let mut runtime = Runtime::new().unwrap(); - let peers = &[BeefyKeyring::Alice, BeefyKeyring::Bob]; - let validator_set = ValidatorSet::new(make_beefy_ids(peers), 0).unwrap(); + let peers = [BeefyKeyring::Alice, BeefyKeyring::Bob]; + let validator_set = ValidatorSet::new(make_beefy_ids(&peers), 0).unwrap(); let session_len = 30; let min_block_delta = 1; - let mut net = BeefyTestNet::new(2, 0); + let mut net = BeefyTestNet::new(2); let api = Arc::new(two_validators::TestApi {}); let beefy_peers = peers.iter().enumerate().map(|(id, key)| (id, key, api.clone())).collect(); runtime.spawn(initialize_beefy(&mut net, beefy_peers, min_block_delta)); @@ -578,13 +591,20 @@ fn lagging_validators() { let net = Arc::new(Mutex::new(net)); + let peers = peers.into_iter().enumerate(); // finalize block #15 -> BEEFY should finalize #1 (mandatory) and #9, #13, #14, #15 from // diff-power-of-two rule. - finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[15], &[1, 9, 13, 14, 15]); + finalize_block_and_wait_for_beefy( + &net, + peers.clone(), + &mut runtime, + &[15], + &[1, 9, 13, 14, 15], + ); // Alice finalizes #25, Bob lags behind let finalize = BlockId::number(25); - let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers); + let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone()); net.lock().peer(0).client().as_client().finalize_block(finalize, None).unwrap(); // verify nothing gets finalized by BEEFY let timeout = Some(Duration::from_millis(250)); @@ -592,21 +612,21 @@ fn lagging_validators() { streams_empty_after_timeout(versioned_finality_proof, &net, &mut runtime, None); // Bob catches up and also finalizes #25 - let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers); + let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone()); net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap(); // expected beefy finalizes block #17 from diff-power-of-two wait_for_best_beefy_blocks(best_blocks, &net, &mut runtime, &[23, 24, 25]); wait_for_beefy_signed_commitments(versioned_finality_proof, &net, &mut runtime, &[23, 24, 25]); // Both finalize #30 (mandatory session) and #32 -> BEEFY finalize #30 (mandatory), #31, #32 - finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[30, 32], &[30, 31, 32]); + finalize_block_and_wait_for_beefy(&net, peers.clone(), &mut runtime, &[30, 32], &[30, 31, 32]); // Verify that session-boundary votes get buffered by client and only processed once // session-boundary block is GRANDPA-finalized (this guarantees authenticity for the new session // validator set). // Alice finalizes session-boundary mandatory block #60, Bob lags behind - let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers); + let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone()); let finalize = BlockId::number(60); net.lock().peer(0).client().as_client().finalize_block(finalize, None).unwrap(); // verify nothing gets finalized by BEEFY @@ -617,7 +637,7 @@ fn lagging_validators() { // Bob catches up and also finalizes #60 (and should have buffered Alice's vote on #60) let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers); net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap(); - // verify beefy skips intermediary votes, and successfully finalizes mandatory block #40 + // verify beefy skips intermediary votes, and successfully finalizes mandatory block #60 wait_for_best_beefy_blocks(best_blocks, &net, &mut runtime, &[60]); wait_for_beefy_signed_commitments(versioned_finality_proof, &net, &mut runtime, &[60]); } @@ -627,13 +647,12 @@ fn correct_beefy_payload() { sp_tracing::try_init_simple(); let mut runtime = Runtime::new().unwrap(); - let peers = - &[BeefyKeyring::Alice, BeefyKeyring::Bob, BeefyKeyring::Charlie, BeefyKeyring::Dave]; - let validator_set = ValidatorSet::new(make_beefy_ids(peers), 0).unwrap(); + let peers = [BeefyKeyring::Alice, BeefyKeyring::Bob, BeefyKeyring::Charlie, BeefyKeyring::Dave]; + let validator_set = ValidatorSet::new(make_beefy_ids(&peers), 0).unwrap(); let session_len = 20; let min_block_delta = 2; - let mut net = BeefyTestNet::new(4, 0); + let mut net = BeefyTestNet::new(4); // Alice, Bob, Charlie will vote on good payloads let good_api = Arc::new(four_validators::TestApi {}); @@ -649,15 +668,16 @@ fn correct_beefy_payload() { let bad_peers = vec![(3, &BeefyKeyring::Dave, bad_api)]; runtime.spawn(initialize_beefy(&mut net, bad_peers, min_block_delta)); - // push 10 blocks + // push 12 blocks net.generate_blocks_and_sync(12, session_len, &validator_set, false); let net = Arc::new(Mutex::new(net)); + let peers = peers.into_iter().enumerate(); // with 3 good voters and 1 bad one, consensus should happen and best blocks produced. finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[10], &[1, 9]); let (best_blocks, versioned_finality_proof) = - get_beefy_streams(&mut net.lock(), &[BeefyKeyring::Alice]); + get_beefy_streams(&mut net.lock(), [(0, BeefyKeyring::Alice)].into_iter()); // now 2 good validators and 1 bad one are voting net.lock() @@ -686,7 +706,7 @@ fn correct_beefy_payload() { // 3rd good validator catches up and votes as well let (best_blocks, versioned_finality_proof) = - get_beefy_streams(&mut net.lock(), &[BeefyKeyring::Alice]); + get_beefy_streams(&mut net.lock(), [(0, BeefyKeyring::Alice)].into_iter()); net.lock() .peer(2) .client() @@ -707,11 +727,11 @@ fn beefy_importing_blocks() { sp_tracing::try_init_simple(); - let mut net = BeefyTestNet::new(2, 0); + let mut net = BeefyTestNet::new(2); let client = net.peer(0).client().clone(); let (mut block_import, _, peer_data) = net.make_block_import(client.clone()); - let PeerData { beefy_rpc_links: _, beefy_voter_links } = peer_data; + let PeerData { beefy_voter_links, .. } = peer_data; let justif_stream = beefy_voter_links.lock().take().unwrap().from_block_import_justif_stream; let params = |block: Block, justifications: Option| { @@ -826,18 +846,18 @@ fn voter_initialization() { // after waiting for BEEFY pallet availability. let mut runtime = Runtime::new().unwrap(); - let peers = &[BeefyKeyring::Alice, BeefyKeyring::Bob]; - let validator_set = ValidatorSet::new(make_beefy_ids(peers), 0).unwrap(); + let peers = [BeefyKeyring::Alice, BeefyKeyring::Bob]; + let validator_set = ValidatorSet::new(make_beefy_ids(&peers), 0).unwrap(); let session_len = 5; // Should vote on all mandatory blocks no matter the `min_block_delta`. let min_block_delta = 10; - let mut net = BeefyTestNet::new(2, 0); + let mut net = BeefyTestNet::new(2); let api = Arc::new(two_validators::TestApi {}); let beefy_peers = peers.iter().enumerate().map(|(id, key)| (id, key, api.clone())).collect(); runtime.spawn(initialize_beefy(&mut net, beefy_peers, min_block_delta)); - // push 30 blocks + // push 26 blocks net.generate_blocks_and_sync(26, session_len, &validator_set, false); let net = Arc::new(Mutex::new(net)); @@ -846,9 +866,90 @@ fn voter_initialization() { // Expect voters to pick up all of them and BEEFY-finalize the mandatory blocks of each session. finalize_block_and_wait_for_beefy( &net, - peers, + peers.into_iter().enumerate(), &mut runtime, &[1, 6, 10, 17, 24, 26], &[1, 5, 10, 15, 20, 25], ); } + +#[test] +fn on_demand_beefy_justification_sync() { + sp_tracing::try_init_simple(); + + let mut runtime = Runtime::new().unwrap(); + let all_peers = + [BeefyKeyring::Alice, BeefyKeyring::Bob, BeefyKeyring::Charlie, BeefyKeyring::Dave]; + let validator_set = ValidatorSet::new(make_beefy_ids(&all_peers), 0).unwrap(); + let session_len = 5; + let min_block_delta = 5; + + let mut net = BeefyTestNet::new(4); + + // Alice, Bob, Charlie start first and make progress through voting. + let api = Arc::new(four_validators::TestApi {}); + let fast_peers = [BeefyKeyring::Alice, BeefyKeyring::Bob, BeefyKeyring::Charlie]; + let voting_peers = + fast_peers.iter().enumerate().map(|(id, key)| (id, key, api.clone())).collect(); + runtime.spawn(initialize_beefy(&mut net, voting_peers, min_block_delta)); + + // Dave will start late and have to catch up using on-demand justification requests (since + // in this test there is no block import queue to automatically import justifications). + let dave = vec![(3, &BeefyKeyring::Dave, api)]; + // Instantiate but don't run Dave, yet. + let dave_task = initialize_beefy(&mut net, dave, min_block_delta); + let dave_index = 3; + + // push 30 blocks + net.generate_blocks_and_sync(30, session_len, &validator_set, false); + + let fast_peers = fast_peers.into_iter().enumerate(); + let net = Arc::new(Mutex::new(net)); + // With 3 active voters and one inactive, consensus should happen and blocks BEEFY-finalized. + // Need to finalize at least one block in each session, choose randomly. + finalize_block_and_wait_for_beefy( + &net, + fast_peers.clone(), + &mut runtime, + &[1, 6, 10, 17, 24], + &[1, 5, 10, 15, 20], + ); + + // Spawn Dave, he's now way behind voting and can only catch up through on-demand justif sync. + runtime.spawn(dave_task); + // give Dave a chance to spawn and init. + run_for(Duration::from_millis(400), &net, &mut runtime); + + let (dave_best_blocks, _) = + get_beefy_streams(&mut net.lock(), [(dave_index, BeefyKeyring::Dave)].into_iter()); + net.lock() + .peer(dave_index) + .client() + .as_client() + .finalize_block(BlockId::number(1), None) + .unwrap(); + // Give Dave task some cpu cycles to process the finality notification, + run_for(Duration::from_millis(100), &net, &mut runtime); + // freshly spun up Dave now needs to listen for gossip to figure out the state of his peers. + + // Have the other peers do some gossip so Dave finds out about their progress. + finalize_block_and_wait_for_beefy(&net, fast_peers, &mut runtime, &[25], &[25]); + + // Now verify Dave successfully finalized #1 (through on-demand justification request). + wait_for_best_beefy_blocks(dave_best_blocks, &net, &mut runtime, &[1]); + + // Give Dave all tasks some cpu cycles to burn through their events queues, + run_for(Duration::from_millis(100), &net, &mut runtime); + // then verify Dave catches up through on-demand justification requests. + finalize_block_and_wait_for_beefy( + &net, + [(dave_index, BeefyKeyring::Dave)].into_iter(), + &mut runtime, + &[6, 10, 17, 24, 26], + &[5, 10, 15, 20, 25], + ); + + let all_peers = all_peers.into_iter().enumerate(); + // Now that Dave has caught up, sanity check voting works for all of them. + finalize_block_and_wait_for_beefy(&net, all_peers, &mut runtime, &[30], &[30]); +} diff --git a/client/beefy/src/worker.rs b/client/beefy/src/worker.rs index 6e8c89d804984..832b43315515f 100644 --- a/client/beefy/src/worker.rs +++ b/client/beefy/src/worker.rs @@ -24,10 +24,15 @@ use std::{ }; use codec::{Codec, Decode, Encode}; -use futures::{stream::Fuse, StreamExt}; +use futures::{stream::Fuse, FutureExt, StreamExt}; use log::{debug, error, info, log_enabled, trace, warn}; +use parking_lot::Mutex; use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend}; +use sc_network_common::{ + protocol::event::Event as NetEvent, + service::{NetworkEventStream, NetworkRequest}, +}; use sc_network_gossip::GossipEngine; use sp_api::{BlockId, ProvideRuntimeApi}; @@ -48,14 +53,17 @@ use beefy_primitives::{ }; use crate::{ + communication::{ + gossip::{topic, GossipValidator}, + request_response::outgoing_requests_engine::OnDemandJustificationsEngine, + }, error::Error, - gossip::{topic, GossipValidator}, justification::BeefyVersionedFinalityProof, keystore::BeefyKeystore, metric_inc, metric_set, metrics::Metrics, round::Rounds, - BeefyVoterLinks, Client, + BeefyVoterLinks, Client, KnownPeers, }; enum RoundAction { @@ -113,6 +121,17 @@ impl VoterOracle { } } + /// Return current pending mandatory block, if any. + pub fn mandatory_pending(&self) -> Option> { + self.sessions.front().and_then(|round| { + if round.mandatory_done() { + None + } else { + Some(round.session_start()) + } + }) + } + /// Return `(A, B)` tuple representing inclusive [A, B] interval of votes to accept. pub fn accepted_interval( &self, @@ -175,29 +194,35 @@ impl VoterOracle { } } -pub(crate) struct WorkerParams { +pub(crate) struct WorkerParams { pub client: Arc, pub backend: Arc, pub runtime: Arc, - pub sync_oracle: SO, + pub network: N, pub key_store: BeefyKeystore, + pub known_peers: Arc>>, pub gossip_engine: GossipEngine, pub gossip_validator: Arc>, + pub on_demand_justifications: OnDemandJustificationsEngine, pub links: BeefyVoterLinks, pub metrics: Option, pub min_block_delta: u32, } /// A BEEFY worker plays the BEEFY protocol -pub(crate) struct BeefyWorker { +pub(crate) struct BeefyWorker { // utilities client: Arc, backend: Arc, runtime: Arc, - sync_oracle: SO, + network: N, key_store: BeefyKeystore, + + // communication + known_peers: Arc>>, gossip_engine: GossipEngine, gossip_validator: Arc>, + on_demand_justifications: OnDemandJustificationsEngine, // channels /// Links between the block importer, the background voter and the RPC layer. @@ -218,14 +243,14 @@ pub(crate) struct BeefyWorker { voting_oracle: VoterOracle, } -impl BeefyWorker +impl BeefyWorker where B: Block + Codec, BE: Backend, C: Client, R: ProvideRuntimeApi, R::Api: BeefyApi + MmrApi, - SO: SyncOracle + Send + Sync + Clone + 'static, + N: NetworkEventStream + NetworkRequest + SyncOracle + Send + Sync + Clone + 'static, { /// Return a new BEEFY worker instance. /// @@ -233,15 +258,17 @@ where /// BEEFY pallet has been deployed on-chain. /// /// The BEEFY pallet is needed in order to keep track of the BEEFY authority set. - pub(crate) fn new(worker_params: WorkerParams) -> Self { + pub(crate) fn new(worker_params: WorkerParams) -> Self { let WorkerParams { client, backend, runtime, key_store, - sync_oracle, + network, gossip_engine, gossip_validator, + on_demand_justifications, + known_peers, links, metrics, min_block_delta, @@ -256,10 +283,12 @@ where client: client.clone(), backend, runtime, - sync_oracle, + network, + known_peers, key_store, gossip_engine, gossip_validator, + on_demand_justifications, links, metrics, best_grandpa_block_header: last_finalized_header, @@ -366,8 +395,6 @@ where { if let Some(new_validator_set) = find_authorities_change::(&header) { self.init_session_at(new_validator_set, *header.number()); - // TODO (grandpa-bridge-gadget/issues/20): when adding SYNC protocol, - // fire up a request for justification for this mandatory block here. } } } @@ -408,7 +435,10 @@ where let block_num = signed_commitment.commitment.block_number; let best_grandpa = *self.best_grandpa_block_header.number(); match self.voting_oracle.triage_round(block_num, best_grandpa)? { - RoundAction::Process => self.finalize(justification)?, + RoundAction::Process => { + debug!(target: "beefy", "🥩 Process justification for round: {:?}.", block_num); + self.finalize(justification)? + }, RoundAction::Enqueue => { debug!(target: "beefy", "🥩 Buffer justification for round: {:?}.", block_num); self.pending_justifications.entry(block_num).or_insert(justification); @@ -429,7 +459,7 @@ where let rounds = self.voting_oracle.rounds_mut().ok_or(Error::UninitSession)?; if rounds.add_vote(&round, vote, self_vote) { - if let Some(signatures) = rounds.try_conclude(&round) { + if let Some(signatures) = rounds.should_conclude(&round) { self.gossip_validator.conclude_round(round.1); let block_num = round.1; @@ -474,6 +504,8 @@ where self.best_beefy_block = Some(block_num); metric_set!(self, beefy_best_block, block_num); + self.on_demand_justifications.cancel_requests_older_than(block_num); + if let Err(e) = self.backend.append_justification( BlockId::Number(block_num), (BEEFY_ENGINE_ID, finality_proof.clone().encode()), @@ -735,7 +767,7 @@ where let at = BlockId::hash(notif.header.hash()); if let Some(active) = self.runtime.runtime_api().validator_set(&at).ok().flatten() { self.initialize_voter(¬if.header, active); - if !self.sync_oracle.is_major_syncing() { + if !self.network.is_major_syncing() { if let Err(err) = self.try_to_vote() { debug!(target: "beefy", "🥩 {}", err); } @@ -768,6 +800,7 @@ where self.wait_for_runtime_pallet(&mut finality_notifications).await; trace!(target: "beefy", "🥩 BEEFY pallet available, starting voter."); + let mut network_events = self.network.event_stream("network-gossip").fuse(); let mut votes = Box::pin( self.gossip_engine .messages_for(topic::()) @@ -788,15 +821,38 @@ where // The branches below only change 'state', actual voting happen afterwards, // based on the new resulting 'state'. futures::select_biased! { + // Use `select_biased!` to prioritize order below. + // Make sure to pump gossip engine. + _ = gossip_engine => { + error!(target: "beefy", "🥩 Gossip engine has terminated, closing worker."); + return; + }, + // Keep track of connected peers. + net_event = network_events.next() => { + if let Some(net_event) = net_event { + self.handle_network_event(net_event); + } else { + error!(target: "beefy", "🥩 Network events stream terminated, closing worker."); + return; + } + }, + // Process finality notifications first since these drive the voter. notification = finality_notifications.next() => { if let Some(notification) = notification { self.handle_finality_notification(¬ification); } else { + error!(target: "beefy", "🥩 Finality stream terminated, closing worker."); return; } }, - // TODO: when adding SYNC protocol, join the on-demand justifications stream to - // this one, and handle them both here. + // Process incoming justifications as these can make some in-flight votes obsolete. + justif = self.on_demand_justifications.next().fuse() => { + if let Some(justif) = justif { + if let Err(err) = self.triage_incoming_justif(justif) { + debug!(target: "beefy", "🥩 {}", err); + } + } + }, justif = block_import_justif.next() => { if let Some(justif) = justif { // Block import justifications have already been verified to be valid @@ -805,9 +861,11 @@ where debug!(target: "beefy", "🥩 {}", err); } } else { + error!(target: "beefy", "🥩 Block import stream terminated, closing worker."); return; } }, + // Finally process incoming votes. vote = votes.next() => { if let Some(vote) = vote { // Votes have already been verified to be valid by the gossip validator. @@ -815,13 +873,10 @@ where debug!(target: "beefy", "🥩 {}", err); } } else { + error!(target: "beefy", "🥩 Votes gossiping stream terminated, closing worker."); return; } }, - _ = gossip_engine => { - error!(target: "beefy", "🥩 Gossip engine has terminated."); - return; - } } // Handle pending justifications and/or votes for now GRANDPA finalized blocks. @@ -829,8 +884,14 @@ where debug!(target: "beefy", "🥩 {}", err); } - // Don't bother voting during major sync. - if !self.sync_oracle.is_major_syncing() { + // Don't bother voting or requesting justifications during major sync. + if !self.network.is_major_syncing() { + // If the current target is a mandatory block, + // make sure there's also an on-demand justification request out for it. + if let Some(block) = self.voting_oracle.mandatory_pending() { + // This only starts new request if there isn't already an active one. + self.on_demand_justifications.request(block); + } // There were external events, 'state' is changed, author a vote if needed/possible. if let Err(err) = self.try_to_vote() { debug!(target: "beefy", "🥩 {}", err); @@ -840,6 +901,20 @@ where } } } + + /// Update known peers based on network events. + fn handle_network_event(&mut self, event: NetEvent) { + match event { + NetEvent::SyncConnected { remote } => { + self.known_peers.lock().add_new(remote); + }, + NetEvent::SyncDisconnected { remote } => { + self.known_peers.lock().remove(&remote); + }, + // We don't care about other events. + _ => (), + } + } } /// Extract the MMR root hash from a digest in the given header, if it exists. @@ -932,11 +1007,11 @@ where pub(crate) mod tests { use super::*; use crate::{ + communication::notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofStream}, keystore::tests::Keyring, - notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofStream}, tests::{ create_beefy_keystore, get_beefy_streams, make_beefy_ids, two_validators::TestApi, - BeefyPeer, BeefyTestNet, BEEFY_PROTOCOL_NAME, + BeefyPeer, BeefyTestNet, }, BeefyRPCLinks, }; @@ -979,21 +1054,29 @@ pub(crate) mod tests { let api = Arc::new(TestApi {}); let network = peer.network_service().clone(); - let sync_oracle = network.clone(); - let gossip_validator = Arc::new(crate::gossip::GossipValidator::new()); + let known_peers = Arc::new(Mutex::new(KnownPeers::new())); + let gossip_validator = Arc::new(GossipValidator::new(known_peers.clone())); let gossip_engine = - GossipEngine::new(network, BEEFY_PROTOCOL_NAME, gossip_validator.clone(), None); + GossipEngine::new(network.clone(), "/beefy/1", gossip_validator.clone(), None); + let on_demand_justifications = OnDemandJustificationsEngine::new( + network.clone(), + api.clone(), + "/beefy/justifs/1".into(), + known_peers.clone(), + ); let worker_params = crate::worker::WorkerParams { client: peer.client().as_client(), backend: peer.client().as_backend(), runtime: api, key_store: Some(keystore).into(), + known_peers, links, gossip_engine, gossip_validator, min_block_delta, metrics: None, - sync_oracle, + network, + on_demand_justifications, }; BeefyWorker::<_, _, _, _, _>::new(worker_params) } @@ -1245,7 +1328,7 @@ pub(crate) mod tests { fn keystore_vs_validator_set() { let keys = &[Keyring::Alice]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); - let mut net = BeefyTestNet::new(1, 0); + let mut net = BeefyTestNet::new(1); let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); // keystore doesn't contain other keys than validators' @@ -1266,13 +1349,15 @@ pub(crate) mod tests { #[test] fn should_finalize_correctly() { - let keys = &[Keyring::Alice]; - let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); - let mut net = BeefyTestNet::new(1, 0); + let keys = [Keyring::Alice]; + let validator_set = ValidatorSet::new(make_beefy_ids(&keys), 0).unwrap(); + let mut net = BeefyTestNet::new(1); let backend = net.peer(0).client().as_backend(); let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); - let (mut best_block_streams, mut finality_proofs) = get_beefy_streams(&mut net, keys); + let keys = keys.iter().cloned().enumerate(); + let (mut best_block_streams, mut finality_proofs) = + get_beefy_streams(&mut net, keys.clone()); let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); let mut finality_proof = finality_proofs.drain(..).next().unwrap(); @@ -1294,7 +1379,8 @@ pub(crate) mod tests { })); // unknown hash for block #1 - let (mut best_block_streams, mut finality_proofs) = get_beefy_streams(&mut net, keys); + let (mut best_block_streams, mut finality_proofs) = + get_beefy_streams(&mut net, keys.clone()); let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); let mut finality_proof = finality_proofs.drain(..).next().unwrap(); let justif = create_finality_proof(1); @@ -1355,7 +1441,7 @@ pub(crate) mod tests { fn should_init_session() { let keys = &[Keyring::Alice]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); - let mut net = BeefyTestNet::new(1, 0); + let mut net = BeefyTestNet::new(1); let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); assert!(worker.voting_oracle.sessions.is_empty()); @@ -1389,7 +1475,7 @@ pub(crate) mod tests { fn should_triage_votes_and_process_later() { let keys = &[Keyring::Alice, Keyring::Bob]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); - let mut net = BeefyTestNet::new(1, 0); + let mut net = BeefyTestNet::new(1); let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); fn new_vote( @@ -1450,7 +1536,7 @@ pub(crate) mod tests { fn should_initialize_correct_voter() { let keys = &[Keyring::Alice]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 1).unwrap(); - let mut net = BeefyTestNet::new(1, 0); + let mut net = BeefyTestNet::new(1); let backend = net.peer(0).client().as_backend(); // push 15 blocks with `AuthorityChange` digests every 10 blocks diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 2f6b788e368b3..9d5abf98ceff0 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -48,7 +48,7 @@ use sc_consensus::{ Verifier, }; use sc_network::{ - config::{NetworkConfiguration, Role, SyncMode}, + config::{NetworkConfiguration, RequestResponseConfig, Role, SyncMode}, Multiaddr, NetworkService, NetworkWorker, }; use sc_network_common::{ @@ -688,6 +688,8 @@ pub struct FullPeerConfig { pub block_announce_validator: Option + Send + Sync>>, /// List of notification protocols that the network must support. pub notifications_protocols: Vec, + /// List of request-response protocols that the network must support. + pub request_response_protocols: Vec, /// The indices of the peers the peer should be connected to. /// /// If `None`, it will be connected to all other peers. @@ -790,6 +792,9 @@ where network_config.transport = TransportConfig::MemoryOnly; network_config.listen_addresses = vec![listen_addr.clone()]; network_config.allow_non_globals_in_dht = true; + network_config + .request_response_protocols + .extend(config.request_response_protocols); network_config.extra_sets = config .notifications_protocols .into_iter()