diff --git a/core/finality-grandpa/src/communication/gossip.rs b/core/finality-grandpa/src/communication/gossip.rs index 720c083db7925..6b45fd0745138 100644 --- a/core/finality-grandpa/src/communication/gossip.rs +++ b/core/finality-grandpa/src/communication/gossip.rs @@ -73,10 +73,12 @@ use network::{config::Roles, PeerId}; use parity_codec::{Encode, Decode}; use substrate_telemetry::{telemetry, CONSENSUS_DEBUG}; -use log::{trace, debug}; +use log::{trace, debug, warn}; +use futures::prelude::*; +use futures::sync::mpsc; use crate::{CompactCommit, SignedMessage}; -use super::{Round, SetId, Network}; +use super::{cost, benefit, Round, SetId}; use std::collections::{HashMap, VecDeque}; use std::time::{Duration, Instant}; @@ -227,6 +229,12 @@ pub(super) enum GossipMessage { Neighbor(VersionedNeighborPacket>), } +impl From>> for GossipMessage { + fn from(neighbor: NeighborPacket>) -> Self { + GossipMessage::Neighbor(VersionedNeighborPacket::V1(neighbor)) + } +} + /// Network level message with topic information. #[derive(Debug, Encode, Decode)] pub(super) struct VoteOrPrecommitMessage { @@ -273,32 +281,12 @@ impl VersionedNeighborPacket { } } -// cost scalars for reporting peers. -mod cost { - pub(super) const PAST_REJECTION: i32 = -50; - pub(super) const BAD_SIGNATURE: i32 = -100; - pub(super) const MALFORMED_COMMIT: i32 = -1000; - pub(super) const FUTURE_MESSAGE: i32 = -500; - - pub(super) const INVALID_VIEW_CHANGE: i32 = -500; - pub(super) const PER_UNDECODABLE_BYTE: i32 = -5; - pub(super) const PER_SIGNATURE_CHECKED: i32 = -25; - pub(super) const PER_BLOCK_LOADED: i32 = -10; -} - -// benefit scalars for reporting peers. -mod benefit { - pub(super) const ROUND_MESSAGE: i32 = 100; - pub(super) const BASIC_VALIDATED_COMMIT: i32 = 100; - pub(super) const PER_EQUIVOCATION: i32 = 10; -} - /// Misbehavior that peers can perform. /// /// `cost` gives a cost that can be used to perform cost/benefit analysis of a /// peer. #[derive(Clone, Copy, Debug, PartialEq)] -enum Misbehavior { +pub(super) enum Misbehavior { // invalid neighbor message, considering the last one. InvalidViewChange, // could not decode neighbor message. bytes-length of the packet. @@ -315,7 +303,7 @@ enum Misbehavior { } impl Misbehavior { - fn cost(&self) -> i32 { + pub(super) fn cost(&self) -> i32 { use Misbehavior::*; match *self { @@ -402,9 +390,13 @@ impl Peers { Some(p) => p, }; - if peer.view.last_commit.as_ref() >= Some(&new_height) { + // this doesn't allow a peer to send us unlimited commits with the + // same height, because there is still a misbehavior condition based on + // sending commits that are <= the best we are aware of. + if peer.view.last_commit.as_ref() > Some(&new_height) { return Err(Misbehavior::InvalidViewChange); } + peer.view.last_commit = Some(new_height); Ok(()) @@ -416,7 +408,7 @@ impl Peers { } #[derive(Debug)] -enum Action { +pub(super) enum Action { // repropagate under given topic, to the given peers, applying cost/benefit to originator. Keep(H, i32), // discard and process. @@ -445,7 +437,9 @@ impl Inner { } /// Note a round in a set has started. - fn note_round>(&mut self, round: Round, set_id: SetId, net: &N) { + fn note_round(&mut self, round: Round, set_id: SetId, send_neighbor: F) + where F: FnOnce(Vec, NeighborPacket>) + { if self.local_view.round == round && self.local_view.set_id == set_id { return } @@ -457,24 +451,28 @@ impl Inner { self.local_view.set_id = set_id; self.live_topics.push(round, set_id); - self.multicast_neighbor_packet(net); + self.multicast_neighbor_packet(send_neighbor); } /// Note that a voter set with given ID has started. Does nothing if the last /// call to the function was with the same `set_id`. - fn note_set>(&mut self, set_id: SetId, net: &N) { + fn note_set(&mut self, set_id: SetId, send_neighbor: F) + where F: FnOnce(Vec, NeighborPacket>) + { if self.local_view.set_id == set_id { return } self.local_view.update_set(set_id); self.live_topics.push(Round(0), set_id); - self.multicast_neighbor_packet(net); + self.multicast_neighbor_packet(send_neighbor); } /// Note that we've imported a commit finalizing a given block. - fn note_commit_finalized>(&mut self, finalized: NumberFor, net: &N) { + fn note_commit_finalized(&mut self, finalized: NumberFor, send_neighbor: F) + where F: FnOnce(Vec, NeighborPacket>) + { if self.local_view.last_commit.as_ref() < Some(&finalized) { self.local_view.last_commit = Some(finalized); - self.multicast_neighbor_packet(net) + self.multicast_neighbor_packet(send_neighbor) } } @@ -520,7 +518,6 @@ impl Inner { fn validate_commit_message(&mut self, who: &PeerId, full: &FullCommitMessage) -> Action { - use grandpa::Message as GrandpaMessage; if let Err(misbehavior) = self.peers.update_commit_height(who, full.message.target_number) { return Action::Discard(misbehavior.cost()); @@ -543,28 +540,6 @@ impl Inner { return Action::Discard(cost::MALFORMED_COMMIT); } - // check signatures on all contained precommits. - for (i, (precommit, &(ref sig, ref id))) in full.message.precommits.iter() - .zip(&full.message.auth_data) - .enumerate() - { - if let Err(()) = super::check_message_sig::( - &GrandpaMessage::Precommit(precommit.clone()), - id, - sig, - full.round.0, - full.set_id.0, - ) { - debug!(target: "afg", "Bad commit message signature {}", id); - telemetry!(CONSENSUS_DEBUG; "afg.bad_commit_msg_signature"; "id" => ?id); - return Action::Discard(Misbehavior::BadCommitMessage { - signatures_checked: i as i32, - blocks_loaded: 0, - equivocations_caught: 0, - }.cost()); - } - } - // always discard commits initially and rebroadcast after doing full // checking. let topic = super::global_topic::(full.set_id.0); @@ -585,54 +560,64 @@ impl Inner { (neighbor_topics, Action::Discard(cb)) } - fn construct_neighbor_packet(&self) -> GossipMessage { + fn multicast_neighbor_packet(&self, send_neighbor: F) + where F: FnOnce(Vec, NeighborPacket>) + { let packet = NeighborPacket { round: self.local_view.round, set_id: self.local_view.set_id, commit_finalized_height: self.local_view.last_commit.unwrap_or(Zero::zero()), }; - GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet)) - } - - fn multicast_neighbor_packet>(&self, net: &N) { - let packet = self.construct_neighbor_packet(); let peers = self.peers.inner.keys().cloned().collect(); - net.send_message(peers, packet.encode()); + send_neighbor(peers, packet); } } /// A validator for GRANDPA gossip messages. pub(super) struct GossipValidator { inner: parking_lot::RwLock>, + report_sender: mpsc::UnboundedSender, } impl GossipValidator { /// Create a new gossip-validator. - pub(super) fn new(config: crate::Config) -> GossipValidator { - GossipValidator { inner: parking_lot::RwLock::new(Inner::new(config)) } + pub(super) fn new(config: crate::Config) -> (GossipValidator, ReportStream) { + let (tx, rx) = mpsc::unbounded(); + let val = GossipValidator { + inner: parking_lot::RwLock::new(Inner::new(config)), + report_sender: tx, + }; + + (val, ReportStream { reports: rx }) } /// Note a round in a set has started. - pub(super) fn note_round>(&self, round: Round, set_id: SetId, net: &N) { - self.inner.write().note_round(round, set_id, net); + pub(super) fn note_round(&self, round: Round, set_id: SetId, send_neighbor: F) + where F: FnOnce(Vec, NeighborPacket>) + { + self.inner.write().note_round(round, set_id, send_neighbor); } /// Note that a voter set with given ID has started. - pub(super) fn note_set>(&self, set_id: SetId, net: &N) { - self.inner.write().note_set(set_id, net); + pub(super) fn note_set(&self, set_id: SetId, send_neighbor: F) + where F: FnOnce(Vec, NeighborPacket>) + { + self.inner.write().note_set(set_id, send_neighbor); } /// Note that we've imported a commit finalizing a given block. - pub(super) fn note_commit_finalized>(&self, finalized: NumberFor, net: &N) { - self.inner.write().note_commit_finalized(finalized, net); + pub(super) fn note_commit_finalized(&self, finalized: NumberFor, send_neighbor: F) + where F: FnOnce(Vec, NeighborPacket>) + { + self.inner.write().note_commit_finalized(finalized, send_neighbor); } - fn report(&self, _who: &PeerId, _cost_benefit: i32) { - // report + fn report(&self, who: PeerId, cost_benefit: i32) { + let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit }); } - fn do_validate(&self, who: &PeerId, mut data: &[u8]) + pub(super) fn do_validate(&self, who: &PeerId, mut data: &[u8]) -> (Action, Vec) { let mut broadcast_topics = Vec::new(); @@ -670,7 +655,14 @@ impl network_gossip::Validator for GossipValidator let packet_data = { let mut inner = self.inner.write(); inner.peers.new_peer(who.clone()); - inner.construct_neighbor_packet().encode() + + let packet = NeighborPacket { + round: inner.local_view.round, + set_id: inner.local_view.set_id, + commit_finalized_height: inner.local_view.last_commit.unwrap_or(Zero::zero()), + }; + + GossipMessage::::from(packet).encode() }; context.send_message(who, packet_data); } @@ -691,15 +683,15 @@ impl network_gossip::Validator for GossipValidator match action { Action::Keep(topic, cb) => { - self.report(who, cb); + self.report(who.clone(), cb); network_gossip::ValidationResult::ProcessAndKeep(topic) } Action::ProcessAndDiscard(topic, cb) => { - self.report(who, cb); + self.report(who.clone(), cb); network_gossip::ValidationResult::ProcessAndDiscard(topic) } Action::Discard(cb) => { - self.report(who, cb); + self.report(who.clone(), cb); network_gossip::ValidationResult::Discard } } @@ -788,6 +780,62 @@ impl network_gossip::Validator for GossipValidator } } +struct PeerReport { + who: PeerId, + cost_benefit: i32, +} + +// wrapper around a stream of reports. +#[must_use = "The report stream must be consumed"] +pub(super) struct ReportStream { + reports: mpsc::UnboundedReceiver, +} + +impl ReportStream { + /// Consume the report stream, converting it into a future that + /// handles all reports. + pub(super) fn consume(self, net: N) + -> impl Future + Send + 'static + where + B: BlockT, + N: super::Network + Send + 'static, + { + ReportingTask { + reports: self.reports, + net, + _marker: Default::default(), + } + } +} + +/// A future for reporting peers. +#[must_use = "Futures do nothing unless polled"] +struct ReportingTask { + reports: mpsc::UnboundedReceiver, + net: N, + _marker: std::marker::PhantomData, +} + +impl> Future for ReportingTask { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + loop { + match self.reports.poll() { + Err(_) => { + warn!(target: "afg", "Report stream terminated unexpectedly"); + return Ok(Async::Ready(())) + } + Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + Ok(Async::Ready(Some(PeerReport { who, cost_benefit }))) => + self.net.report(who, cost_benefit), + Ok(Async::NotReady) => return Ok(Async::NotReady), + } + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -804,36 +852,6 @@ mod tests { } } - #[derive(Clone)] - struct StubNetwork; - - impl super::Network for StubNetwork { - type In = futures::stream::Empty; - - fn messages_for(&self, _topic: Block::Hash) -> Self::In { - futures::stream::empty() - } - - fn register_validator( - &self, - _validator: std::sync::Arc>, - ) { - - } - - fn gossip_message(&self, _topic: Block::Hash, _data: Vec, _force: bool) { - - } - - fn send_message(&self, _who: Vec, _data: Vec) { - - } - - fn announce(&self, _block: Block::Hash) { - - } - } - #[test] fn view_vote_rules() { let view = View { round: Round(100), set_id: SetId(1), last_commit: Some(1000u64) }; @@ -975,12 +993,12 @@ mod tests { #[test] fn messages_not_expired_immediately() { - let val = GossipValidator::::new(config()); + let (val, _) = GossipValidator::::new(config()); let set_id = 1; for round_num in 1u64..10 { - val.note_round(Round(round_num), SetId(set_id), &StubNetwork); + val.note_round(Round(round_num), SetId(set_id), |_, _| {}); } { @@ -989,14 +1007,12 @@ mod tests { // messages from old rounds are expired. for round_num in 1u64..last_kept_round { - println!("{} should be expired?", round_num); let topic = crate::communication::round_topic::(round_num, 1); assert!(is_expired(topic, &[1, 2, 3])); } // messages from not-too-old rounds are not expired. for round_num in last_kept_round..10 { - println!("{} should not be expired?", round_num); let topic = crate::communication::round_topic::(round_num, 1); assert!(!is_expired(topic, &[1, 2, 3])); } diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index 1770d41adfd9c..4b8958f2e9c58 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -38,8 +38,8 @@ use parity_codec::{Encode, Decode}; use substrate_primitives::{ed25519, Pair}; use substrate_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO}; use runtime_primitives::ConsensusEngineId; -use runtime_primitives::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor}; -use network::{consensus_gossip as network_gossip, Service as NetworkService,}; +use runtime_primitives::traits::{Block as BlockT, Hash as HashT, Header as HeaderT}; +use network::{consensus_gossip as network_gossip, Service as NetworkService}; use network_gossip::ConsensusMessage; use crate::{Error, Message, SignedMessage, Commit, CompactCommit}; @@ -50,15 +50,40 @@ use gossip::{ use substrate_primitives::ed25519::{Public as AuthorityId, Signature as AuthoritySignature}; pub mod gossip; +mod periodic; + +#[cfg(test)] +mod tests; /// The consensus engine ID of GRANDPA. pub const GRANDPA_ENGINE_ID: ConsensusEngineId = [b'a', b'f', b'g', b'1']; +// cost scalars for reporting peers. +mod cost { + pub(super) const PAST_REJECTION: i32 = -50; + pub(super) const BAD_SIGNATURE: i32 = -100; + pub(super) const MALFORMED_COMMIT: i32 = -1000; + pub(super) const FUTURE_MESSAGE: i32 = -500; + + pub(super) const INVALID_VIEW_CHANGE: i32 = -500; + pub(super) const PER_UNDECODABLE_BYTE: i32 = -5; + pub(super) const PER_SIGNATURE_CHECKED: i32 = -25; + pub(super) const PER_BLOCK_LOADED: i32 = -10; + pub(super) const INVALID_COMMIT: i32 = -5000; +} + +// benefit scalars for reporting peers. +mod benefit { + pub(super) const ROUND_MESSAGE: i32 = 100; + pub(super) const BASIC_VALIDATED_COMMIT: i32 = 100; + pub(super) const PER_EQUIVOCATION: i32 = 10; +} + /// A handle to the network. This is generally implemented by providing some /// handle to a gossip service or similar. /// /// Intended to be a lightweight handle such as an `Arc`. -pub trait Network: Clone { +pub trait Network: Clone + Send + 'static { /// A stream of input messages for a topic. type In: Stream; @@ -77,6 +102,9 @@ pub trait Network: Clone { /// Send a message to a bunch of specific peers, even if they've seen it already. fn send_message(&self, who: Vec, data: Vec); + /// Report a peer's cost or benefit after some action. + fn report(&self, who: network::PeerId, cost_benefit: i32); + /// Inform peers that a block with given hash should be downloaded. fn announce(&self, block: Block::Hash); } @@ -133,6 +161,10 @@ impl Network for Arc> where }) } + fn report(&self, who: network::PeerId, cost_benefit: i32) { + self.report_peer(who, cost_benefit) + } + fn announce(&self, block: B::Hash) { self.announce_block(block) } @@ -164,18 +196,49 @@ impl Stream for NetworkStream { } } +/// The result of processing a commit. +pub(crate) enum CommitProcessingOutcome { + Good, + Bad, +} + /// Bridge between the underlying network service, gossiping consensus messages and Grandpa pub(crate) struct NetworkBridge> { service: N, validator: Arc>, + neighbor_sender: periodic::NeighborPacketSender, } impl> NetworkBridge { - /// Create a new NetworkBridge to the given NetworkService - pub(crate) fn new(service: N, config: crate::Config) -> Self { - let validator = Arc::new(GossipValidator::new(config)); + /// Create a new NetworkBridge to the given NetworkService. Returns the service + /// handle and a future that must be polled to completion to finish startup. + pub(crate) fn new( + service: N, + config: crate::Config, + on_exit: impl Future + Clone + Send + 'static, + ) -> ( + Self, + impl futures::Future + Send + 'static, + ) { + + let (validator, report_stream) = GossipValidator::new(config); + let validator = Arc::new(validator); service.register_validator(validator.clone()); - NetworkBridge { service, validator: validator } + + let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(service.clone()); + let reporting_job = report_stream.consume(service.clone()); + + let bridge = NetworkBridge { service, validator, neighbor_sender }; + + let startup_work = futures::future::lazy(move || { + // lazily spawn these jobs onto their own tasks. the lazy future has access + // to tokio globals, which aren't available outside. + tokio::spawn(rebroadcast_job.select(on_exit.clone()).then(|_| Ok(()))); + tokio::spawn(reporting_job.select(on_exit.clone()).then(|_| Ok(()))); + Ok(()) + }); + + (bridge, startup_work) } /// Get the round messages for a round in a given set ID. These are signature-checked. @@ -190,7 +253,14 @@ impl> NetworkBridge { impl Stream,Error=Error>, impl Sink,SinkError=Error>, ) { - self.validator.note_round(round, set_id, &self.service); + self.validator.note_round( + round, + set_id, + |to, neighbor| self.service.send_message( + to, + GossipMessage::::from(neighbor).encode() + ), + ); let locals = local_key.and_then(|pair| { let public = pair.public(); @@ -275,61 +345,113 @@ impl> NetworkBridge { } /// Set up the global communication streams. - pub(crate) fn global_communication(&self, + pub(crate) fn global_communication( + &self, set_id: SetId, voters: Arc>, - is_voter: bool + is_voter: bool, ) -> ( - impl Stream), Error = Error>, + impl Stream, impl FnMut(CommitProcessingOutcome)), Error = Error>, impl Sink), SinkError = Error>, ) { - self.validator.note_set(set_id, &self.service); + self.validator.note_set( + set_id, + |to, neighbor| self.service.send_message(to, GossipMessage::::from(neighbor).encode()), + ); + let service = self.service.clone(); let topic = global_topic::(set_id.0); - let incoming = self.service.messages_for(topic) - .filter_map(|notification| { - // this could be optimized by decoding piecewise. - let decoded = GossipMessage::::decode(&mut ¬ification.message[..]); - if decoded.is_none() { - trace!(target: "afg", "Skipping malformed commit message {:?}", notification); - } - decoded - }) - .filter_map(move |msg| { - match msg { - GossipMessage::Commit(msg) => { - let round = msg.round; - let precommits_signed_by: Vec = - msg.message.auth_data.iter().map(move |(_, a)| { - format!("{}", a) - }).collect(); - telemetry!(CONSENSUS_INFO; "afg.received_commit"; - "contains_precommits_signed_by" => ?precommits_signed_by, - "target_number" => ?msg.message.target_number, - "target_hash" => ?msg.message.target_hash, - ); - check_compact_commit::(msg.message, &*voters).map(move |c| (round.0, c)) - }, - _ => { - debug!(target: "afg", "Skipping unknown message type"); - return None; - } - } - }) - .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))); + let incoming = incoming_global(service, topic, voters, self.validator.clone()); let outgoing = CommitsOut::::new( self.service.clone(), set_id.0, is_voter, + self.validator.clone(), ); (incoming, outgoing) } +} - pub(crate) fn note_commit_finalized(&self, number: NumberFor) { - self.validator.note_commit_finalized(number, &self.service); - } +fn incoming_global>( + service: N, + topic: B::Hash, + voters: Arc>, + gossip_validator: Arc>, +) -> impl Stream, impl FnMut(CommitProcessingOutcome)), Error = Error> { + service.messages_for(topic) + .filter_map(|notification| { + // this could be optimized by decoding piecewise. + let decoded = GossipMessage::::decode(&mut ¬ification.message[..]); + if decoded.is_none() { + trace!(target: "afg", "Skipping malformed commit message {:?}", notification); + } + decoded.map(move |d| (notification, d)) + }) + .filter_map(move |(notification, msg)| { + match msg { + GossipMessage::Commit(msg) => { + let precommits_signed_by: Vec = + msg.message.auth_data.iter().map(move |(_, a)| { + format!("{}", a) + }).collect(); + telemetry!(CONSENSUS_INFO; "afg.received_commit"; + "contains_precommits_signed_by" => ?precommits_signed_by, + "target_number" => ?msg.message.target_number.clone(), + "target_hash" => ?msg.message.target_hash.clone(), + ); + if let Err(cost) = check_compact_commit::( + &msg.message, + &*voters, + msg.round, + msg.set_id, + ) { + if let Some(who) = notification.sender { + service.report(who, cost); + } + None + } else { + Some((msg, notification, service.clone())) + } + }, + _ => { + debug!(target: "afg", "Skipping unknown message type"); + return None; + } + } + }) + .map(move |(msg, mut notification, service)| { + let round = msg.round.0; + let commit = msg.message; + let finalized_number = commit.target_number; + let gossip_validator = gossip_validator.clone(); + let cb = move |outcome| match outcome { + CommitProcessingOutcome::Good => { + // if it checks out, gossip it. not accounting for + // any discrepancy between the actual ghost and the claimed + // finalized number. + gossip_validator.note_commit_finalized( + finalized_number, + |to, neighbor_msg| service.send_message( + to, + GossipMessage::::from(neighbor_msg).encode(), + ), + ); + + service.gossip_message(topic, notification.message.clone(), false); + } + CommitProcessingOutcome::Bad => { + // report peer and do not gossip. + if let Some(who) = notification.sender.take() { + service.report(who, cost::INVALID_COMMIT); + } + } + }; + + (round, commit, cb) + }) + .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))) } impl> Clone for NetworkBridge { @@ -337,6 +459,7 @@ impl> Clone for NetworkBridge { NetworkBridge { service: self.service.clone(), validator: Arc::clone(&self.validator), + neighbor_sender: self.neighbor_sender.clone(), } } } @@ -462,41 +585,86 @@ impl> Sink for OutgoingMessages } } +// checks a compact commit. returns `None` if it was bad and fn check_compact_commit( - msg: CompactCommit, + msg: &CompactCommit, voters: &VoterSet, -) -> Option> { - if msg.precommits.len() != msg.auth_data.len() || msg.precommits.is_empty() { - debug!(target: "afg", "Skipping malformed compact commit"); - return None; - } + round: Round, + set_id: SetId, +) -> Result<(), i32> { + // 4f + 1 = equivocations from f voters. + let f = voters.total_weight() - voters.threshold(); + let full_threshold = voters.total_weight() + f; - // check signatures on all contained precommits. + // check total weight is not too high. + let mut total_weight = 0; for (_, ref id) in &msg.auth_data { - if !voters.contains_key(id) { + if let Some(weight) = voters.info(id).map(|info| info.weight()) { + total_weight += weight; + if total_weight > full_threshold { + return Err(cost::MALFORMED_COMMIT); + } + } else { debug!(target: "afg", "Skipping commit containing unknown voter {}", id); - return None; + return Err(cost::MALFORMED_COMMIT); + } + } + + if total_weight < voters.threshold() { + return Err(cost::MALFORMED_COMMIT); + } + + // check signatures on all contained precommits. + for (i, (precommit, &(ref sig, ref id))) in msg.precommits.iter() + .zip(&msg.auth_data) + .enumerate() + { + use crate::communication::gossip::Misbehavior; + use grandpa::Message as GrandpaMessage; + + if let Err(()) = check_message_sig::( + &GrandpaMessage::Precommit(precommit.clone()), + id, + sig, + round.0, + set_id.0, + ) { + debug!(target: "afg", "Bad commit message signature {}", id); + telemetry!(CONSENSUS_DEBUG; "afg.bad_commit_msg_signature"; "id" => ?id); + let cost = Misbehavior::BadCommitMessage { + signatures_checked: i as i32, + blocks_loaded: 0, + equivocations_caught: 0, + }.cost(); + + return Err(cost); } } - Some(msg) + Ok(()) } + /// An output sink for commit messages. struct CommitsOut> { network: N, set_id: SetId, is_voter: bool, - _marker: ::std::marker::PhantomData, + gossip_validator: Arc>, } impl> CommitsOut { /// Create a new commit output stream. - pub(crate) fn new(network: N, set_id: u64, is_voter: bool) -> Self { + pub(crate) fn new( + network: N, + set_id: u64, + is_voter: bool, + gossip_validator: Arc>, + ) -> Self { CommitsOut { network, set_id: SetId(set_id), is_voter, - _marker: Default::default(), + gossip_validator, } } } @@ -534,6 +702,16 @@ impl> Sink for CommitsOut { }); let topic = global_topic::(self.set_id.0); + + // the gossip validator needs to be made aware of the best commit-height we know of + // before gosipping + self.gossip_validator.note_commit_finalized( + commit.target_number, + |to, neighbor| self.network.send_message( + to, + GossipMessage::::from(neighbor).encode(), + ), + ); self.network.gossip_message(topic, message.encode(), false); Ok(AsyncSink::Ready) diff --git a/core/finality-grandpa/src/communication/periodic.rs b/core/finality-grandpa/src/communication/periodic.rs new file mode 100644 index 0000000000000..c6121370421bc --- /dev/null +++ b/core/finality-grandpa/src/communication/periodic.rs @@ -0,0 +1,93 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Periodic rebroadcast of neighbor packets. + +use super::{gossip::{NeighborPacket, GossipMessage}, Network}; +use futures::prelude::*; +use futures::sync::mpsc; +use runtime_primitives::traits::{NumberFor, Block as BlockT}; +use network::PeerId; +use tokio::timer::Delay; +use log::warn; +use parity_codec::Encode; + +use std::time::{Instant, Duration}; + +// how often to rebroadcast, if no other +const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60); + +fn rebroadcast_instant() -> Instant { + Instant::now() + REBROADCAST_AFTER +} + +/// A sender used to send neighbor packets to a background job. +pub(super) type NeighborPacketSender = mpsc::UnboundedSender<(Vec, NeighborPacket>)>; + +/// Does the work of sending neighbor packets, asynchronously. +/// +/// It may rebroadcast the last neighbor packet periodically when no +/// progress is made. +pub(super) fn neighbor_packet_worker(net: N) -> ( + impl Future + Send + 'static, + NeighborPacketSender, +) where + B: BlockT, + N: Network, +{ + let mut last = None; + let (tx, mut rx) = mpsc::unbounded::<(Vec, NeighborPacket>)>(); + let mut delay = Delay::new(rebroadcast_instant()); + + let work = futures::future::poll_fn(move || { + loop { + match rx.poll().expect("unbounded receivers do not error; qed") { + Async::Ready(None) => return Ok(Async::Ready(())), + Async::Ready(Some((to, packet))) => { + // send to peers. + net.send_message(to.clone(), GossipMessage::::from(packet.clone()).encode()); + + // rebroadcasting network. + delay.reset(rebroadcast_instant()); + last = Some((to, packet)); + } + Async::NotReady => break, + } + } + + // has to be done in a loop because it needs to be polled after + // re-scheduling. + loop { + match delay.poll() { + Err(e) => { + warn!(target: "afg", "Could not rebroadcast neighbor packets: {:?}", e); + delay.reset(rebroadcast_instant()); + } + Ok(Async::Ready(())) => { + delay.reset(rebroadcast_instant()); + + if let Some((ref to, ref packet)) = last { + // send to peers. + net.send_message(to.clone(), GossipMessage::::from(packet.clone()).encode()); + } + } + Ok(Async::NotReady) => return Ok(Async::NotReady), + } + } + }); + + (work, tx) +} diff --git a/core/finality-grandpa/src/communication/tests.rs b/core/finality-grandpa/src/communication/tests.rs new file mode 100644 index 0000000000000..2ef6d280646a4 --- /dev/null +++ b/core/finality-grandpa/src/communication/tests.rs @@ -0,0 +1,385 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Tests for the communication portion of the GRANDPA crate. + +use futures::sync::mpsc; +use futures::prelude::*; +use network::consensus_gossip as network_gossip; +use network::test::{Block, Hash}; +use network_gossip::Validator; +use tokio::runtime::current_thread; +use std::sync::Arc; +use keyring::AuthorityKeyring; +use parity_codec::Encode; + +use super::gossip::{self, GossipValidator}; +use super::{AuthorityId, VoterSet, Round, SetId}; + +enum Event { + MessagesFor(Hash, mpsc::UnboundedSender), + RegisterValidator(Arc>), + GossipMessage(Hash, Vec, bool), + SendMessage(Vec, Vec), + Report(network::PeerId, i32), + Announce(Hash), +} + +#[derive(Clone)] +struct TestNetwork { + sender: mpsc::UnboundedSender, +} + +impl super::Network for TestNetwork { + type In = mpsc::UnboundedReceiver; + + /// Get a stream of messages for a specific gossip topic. + fn messages_for(&self, topic: Hash) -> Self::In { + let (tx, rx) = mpsc::unbounded(); + let _ = self.sender.unbounded_send(Event::MessagesFor(topic, tx)); + + rx + } + + /// Register a gossip validator. + fn register_validator(&self, validator: Arc>) { + let _ = self.sender.unbounded_send(Event::RegisterValidator(validator)); + } + + /// Gossip a message out to all connected peers. + /// + /// Force causes it to be sent to all peers, even if they've seen it already. + /// Only should be used in case of consensus stall. + fn gossip_message(&self, topic: Hash, data: Vec, force: bool) { + let _ = self.sender.unbounded_send(Event::GossipMessage(topic, data, force)); + } + + /// Send a message to a bunch of specific peers, even if they've seen it already. + fn send_message(&self, who: Vec, data: Vec) { + let _ = self.sender.unbounded_send(Event::SendMessage(who, data)); + } + + /// Report a peer's cost or benefit after some action. + fn report(&self, who: network::PeerId, cost_benefit: i32) { + let _ = self.sender.unbounded_send(Event::Report(who, cost_benefit)); + } + + /// Inform peers that a block with given hash should be downloaded. + fn announce(&self, block: Hash) { + let _ = self.sender.unbounded_send(Event::Announce(block)); + } +} + +struct Tester { + net_handle: super::NetworkBridge, + gossip_validator: Arc>, + events: mpsc::UnboundedReceiver, +} + +impl Tester { + fn filter_network_events(self, mut pred: F) -> impl Future + where F: FnMut(Event) -> bool + { + let mut s = Some(self); + futures::future::poll_fn(move || loop { + match s.as_mut().unwrap().events.poll().expect("concluded early") { + Async::Ready(None) => panic!("concluded early"), + Async::Ready(Some(item)) => if pred(item) { + return Ok(Async::Ready(s.take().unwrap())) + }, + Async::NotReady => return Ok(Async::NotReady), + } + }) + } +} + +// some random config (not really needed) +fn config() -> crate::Config { + crate::Config { + gossip_duration: std::time::Duration::from_millis(10), + justification_period: 256, + local_key: None, + name: None, + } +} + +// needs to run in a tokio runtime. +fn make_test_network() -> impl Future { + let (tx, rx) = mpsc::unbounded(); + let net = TestNetwork { sender: tx }; + + #[derive(Clone)] + struct Exit; + + impl Future for Exit { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + Ok(Async::NotReady) + } + } + + let (bridge, startup_work) = super::NetworkBridge::new( + net.clone(), + config(), + Exit, + ); + + startup_work.map(move |()| Tester { + gossip_validator: bridge.validator.clone(), + net_handle: bridge, + events: rx, + }) +} + +fn make_ids(keys: &[AuthorityKeyring]) -> Vec<(AuthorityId, u64)> { + keys.iter() + .map(|key| AuthorityId(key.to_raw_public())) + .map(|id| (id, 1)) + .collect() +} + +struct NoopContext; + +impl network_gossip::ValidatorContext for NoopContext { + fn broadcast_topic(&mut self, _: Hash, _: bool) { } + fn broadcast_message(&mut self, _: Hash, _: Vec, _: bool) { } + fn send_message(&mut self, _: &network::PeerId, _: Vec) { } + fn send_topic(&mut self, _: &network::PeerId, _: Hash, _: bool) { } +} + +#[test] +fn good_commit_leads_to_relay() { + let private = [AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie]; + let public = make_ids(&private[..]); + let voter_set = Arc::new(public.iter().cloned().collect::>()); + + let round = 0; + let set_id = 1; + + let commit = { + let target_hash: Hash = [1; 32].into(); + let target_number = 500; + + let precommit = grandpa::Precommit { target_hash: target_hash.clone(), target_number }; + let payload = super::localized_payload( + round, set_id, &grandpa::Message::Precommit(precommit.clone()) + ); + + let mut precommits = Vec::new(); + let mut auth_data = Vec::new(); + + for (i, key) in private.iter().enumerate() { + precommits.push(precommit.clone()); + + let signature = key.sign(&payload[..]); + auth_data.push((signature, public[i].0.clone())) + } + + grandpa::CompactCommit { + target_hash, + target_number, + precommits, + auth_data, + } + }; + + let encoded_commit = gossip::GossipMessage::::Commit(gossip::FullCommitMessage { + round: Round(round), + set_id: SetId(set_id), + message: commit, + }).encode(); + + let id = network::PeerId::random(); + let global_topic = super::global_topic::(set_id); + + let test = make_test_network() + .and_then(move |tester| { + // register a peer. + tester.gossip_validator.new_peer(&mut NoopContext, &id, network::config::Roles::FULL); + Ok((tester, id)) + }) + .and_then(move |(tester, id)| { + // start round, dispatch commit, and wait for broadcast. + let (commits_in, _) = tester.net_handle.global_communication(SetId(1), voter_set, false); + + { + let (action, _) = tester.gossip_validator.do_validate(&id, &encoded_commit[..]); + match action { + gossip::Action::ProcessAndDiscard(t, _) => assert_eq!(t, global_topic), + _ => panic!("wrong expected outcome from initial commit validation"), + } + } + + let commit_to_send = encoded_commit.clone(); + + // asking for global communication will cause the test network + // to send us an event asking us for a stream. use it to + // send a message. + let sender_id = id.clone(); + let send_message = tester.filter_network_events(move |event| match event { + Event::MessagesFor(topic, sender) => { + if topic != global_topic { return false } + let _ = sender.unbounded_send(network_gossip::TopicNotification { + message: commit_to_send.clone(), + sender: Some(sender_id.clone()), + }); + + true + } + _ => false, + }); + + // when the commit comes in, we'll tell the callback it was good. + let handle_commit = commits_in.into_future() + .map(|(item, _)| { + let (_, _, mut callback) = item.unwrap(); + (callback)(super::CommitProcessingOutcome::Good); + }) + .map_err(|_| panic!("could not process commit")); + + // once the message is sent and commit is "handled" we should have + // a repropagation event coming from the network. + send_message.join(handle_commit).and_then(move |(tester, ())| { + tester.filter_network_events(move |event| match event { + Event::GossipMessage(topic, data, false) => { + if topic == global_topic && data == encoded_commit { + true + } else { + panic!("Trying to gossip something strange") + } + } + _ => false, + }) + }) + .map_err(|_| panic!("could not watch for gossip message")) + .map(|_| ()) + }); + + current_thread::block_on_all(test).unwrap(); +} + +#[test] +fn bad_commit_leads_to_report() { + let private = [AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie]; + let public = make_ids(&private[..]); + let voter_set = Arc::new(public.iter().cloned().collect::>()); + + let round = 0; + let set_id = 1; + + let commit = { + let target_hash: Hash = [1; 32].into(); + let target_number = 500; + + let precommit = grandpa::Precommit { target_hash: target_hash.clone(), target_number }; + let payload = super::localized_payload( + round, set_id, &grandpa::Message::Precommit(precommit.clone()) + ); + + let mut precommits = Vec::new(); + let mut auth_data = Vec::new(); + + for (i, key) in private.iter().enumerate() { + precommits.push(precommit.clone()); + + let signature = key.sign(&payload[..]); + auth_data.push((signature, public[i].0.clone())) + } + + grandpa::CompactCommit { + target_hash, + target_number, + precommits, + auth_data, + } + }; + + let encoded_commit = gossip::GossipMessage::::Commit(gossip::FullCommitMessage { + round: Round(round), + set_id: SetId(set_id), + message: commit, + }).encode(); + + let id = network::PeerId::random(); + let global_topic = super::global_topic::(set_id); + + let test = make_test_network() + .and_then(move |tester| { + // register a peer. + tester.gossip_validator.new_peer(&mut NoopContext, &id, network::config::Roles::FULL); + Ok((tester, id)) + }) + .and_then(move |(tester, id)| { + // start round, dispatch commit, and wait for broadcast. + let (commits_in, _) = tester.net_handle.global_communication(SetId(1), voter_set, false); + + { + let (action, _) = tester.gossip_validator.do_validate(&id, &encoded_commit[..]); + match action { + gossip::Action::ProcessAndDiscard(t, _) => assert_eq!(t, global_topic), + _ => panic!("wrong expected outcome from initial commit validation"), + } + } + + let commit_to_send = encoded_commit.clone(); + + // asking for global communication will cause the test network + // to send us an event asking us for a stream. use it to + // send a message. + let sender_id = id.clone(); + let send_message = tester.filter_network_events(move |event| match event { + Event::MessagesFor(topic, sender) => { + if topic != global_topic { return false } + let _ = sender.unbounded_send(network_gossip::TopicNotification { + message: commit_to_send.clone(), + sender: Some(sender_id.clone()), + }); + + true + } + _ => false, + }); + + // when the commit comes in, we'll tell the callback it was good. + let handle_commit = commits_in.into_future() + .map(|(item, _)| { + let (_, _, mut callback) = item.unwrap(); + (callback)(super::CommitProcessingOutcome::Bad); + }) + .map_err(|_| panic!("could not process commit")); + + // once the message is sent and commit is "handled" we should have + // a report event coming from the network. + send_message.join(handle_commit).and_then(move |(tester, ())| { + tester.filter_network_events(move |event| match event { + Event::Report(who, cost_benefit) => { + if who == id && cost_benefit == super::cost::INVALID_COMMIT { + true + } else { + panic!("reported unknown peer or unexpected cost"); + } + } + _ => false, + }) + }) + .map_err(|_| panic!("could not watch for peer report")) + .map(|_| ()) + }); + + current_thread::block_on_all(test).unwrap(); +} diff --git a/core/finality-grandpa/src/environment.rs b/core/finality-grandpa/src/environment.rs index 5e099a92c2373..387673dea33b6 100644 --- a/core/finality-grandpa/src/environment.rs +++ b/core/finality-grandpa/src/environment.rs @@ -642,7 +642,7 @@ impl, N, RA> voter::Environment, N, RA> voter::Environment Self::Timer { diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index 83db670ac73c2..56b66e7026d6b 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -337,42 +337,7 @@ pub fn block_import, RA, PRA>( )) } -fn global_communication, I, O>( - commits_in: I, - commits_out: O, -) -> ( - impl Stream< - Item = voter::CommunicationIn, AuthoritySignature, AuthorityId>, - Error = CommandOrError>, - >, - impl Sink< - SinkItem = voter::CommunicationOut, AuthoritySignature, AuthorityId>, - SinkError = CommandOrError>, - >, -) where - I: Stream< - Item = (u64, ::grandpa::CompactCommit, AuthoritySignature, AuthorityId>), - Error = CommandOrError>, - >, - O: Sink< - SinkItem = (u64, ::grandpa::Commit, AuthoritySignature, AuthorityId>), - SinkError = CommandOrError>, - >, -{ - let global_in = commits_in.map(|(round, commit)| { - voter::CommunicationIn::Commit(round, commit, voter::Callback::Blank) - }); - - // NOTE: eventually this will also handle catch-up requests - let global_out = commits_out.with(|global| match global { - voter::CommunicationOut::Commit(round, commit) => Ok((round, commit)), - _ => unimplemented!(), - }); - - (global_in, global_out) -} - -fn committer_communication, B, E, N, RA>( +fn global_communication, B, E, N, RA>( local_key: Option<&Arc>, set_id: u64, voters: &Arc>, @@ -380,11 +345,11 @@ fn committer_communication, B, E, N, RA>( network: &NetworkBridge, ) -> ( impl Stream< - Item = (u64, ::grandpa::CompactCommit, AuthoritySignature, AuthorityId>), + Item = voter::CommunicationIn, AuthoritySignature, AuthorityId>, Error = CommandOrError>, >, impl Sink< - SinkItem = (u64, ::grandpa::Commit, AuthoritySignature, AuthorityId>), + SinkItem = voter::CommunicationOut, AuthoritySignature, AuthorityId>, SinkError = CommandOrError>, >, ) where @@ -395,6 +360,7 @@ fn committer_communication, B, E, N, RA>( NumberFor: BlockNumberOps, DigestItemFor: DigestItem, { + let is_voter = local_key .map(|pair| voters.contains_key(&pair.public().into())) .unwrap_or(false); @@ -413,10 +379,26 @@ fn committer_communication, B, E, N, RA>( commit_in, ); - let commit_in = commit_in.map_err(Into::into); - let commit_out = commit_out.sink_map_err(Into::into); + let commits_in = commit_in.map_err(CommandOrError::from); + let commits_out = commit_out.sink_map_err(CommandOrError::from); + + let global_in = commits_in.map(|(round, commit, mut callback)| { + let callback = voter::Callback::Work(Box::new(move |outcome| match outcome { + voter::CommitProcessingOutcome::Good(_) => + callback(communication::CommitProcessingOutcome::Good), + voter::CommitProcessingOutcome::Bad(_) => + callback(communication::CommitProcessingOutcome::Bad), + })); + voter::CommunicationIn::Commit(round, commit, callback) + }); + + // NOTE: eventually this will also handle catch-up requests + let global_out = commits_out.with(|global| match global { + voter::CommunicationOut::Commit(round, commit) => Ok((round, commit)), + _ => unimplemented!(), + }); - (commit_in, commit_out) + (global_in, global_out) } /// Register the finality tracker inherent data provider (which is used by @@ -456,7 +438,7 @@ pub fn run_grandpa, N, RA>( link: LinkHalf, network: N, inherent_data_providers: InherentDataProviders, - on_exit: impl Future + Send + 'static, + on_exit: impl Future + Clone + Send + 'static, ) -> ::client::error::Result + Send + 'static> where Block::Hash: Ord, B: Backend + 'static, @@ -470,7 +452,7 @@ pub fn run_grandpa, N, RA>( { use futures::future::{self, Loop as FutureLoop}; - let network = NetworkBridge::new(network, config.clone()); + let (network, network_startup) = NetworkBridge::new(network, config.clone(), on_exit.clone()); let LinkHalf { client, @@ -538,7 +520,7 @@ pub fn run_grandpa, N, RA>( chain_info.chain.finalized_number, ); - let (commit_in, commit_out) = committer_communication( + let global_comms = global_communication( config.local_key.as_ref(), env.set_id, &env.voters, @@ -546,11 +528,6 @@ pub fn run_grandpa, N, RA>( &network, ); - let global_comms = global_communication::( - commit_in, - commit_out, - ); - let voters = (*env.voters).clone(); let last_completed_round = completed_rounds.last(); @@ -675,5 +652,7 @@ pub fn run_grandpa, N, RA>( telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e); }); + let voter_work = network_startup.and_then(move |()| voter_work); + Ok(voter_work.select(on_exit).then(|_| Ok(()))) } diff --git a/core/finality-grandpa/src/tests.rs b/core/finality-grandpa/src/tests.rs index eea03a34a2dee..55c0d57f3a5c2 100644 --- a/core/finality-grandpa/src/tests.rs +++ b/core/finality-grandpa/src/tests.rs @@ -204,11 +204,27 @@ impl Network for MessageRouting { }) } + fn report(&self, _who: network::PeerId, _cost_benefit: i32) { + + } + fn announce(&self, _block: Hash) { } } +#[derive(Clone)] +struct Exit; + +impl Future for Exit { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + Ok(Async::NotReady) + } +} + #[derive(Default, Clone)] struct TestApi { genesis_authorities: Vec<(AuthorityId, u64)>, @@ -402,7 +418,7 @@ fn run_to_completion_with( link, MessageRouting::new(net.clone(), peer_id), InherentDataProviders::new(), - futures::empty(), + Exit, ).expect("all in order with client and network"); assert_send(&voter); @@ -503,7 +519,7 @@ fn finalize_3_voters_1_observer() { link, MessageRouting::new(net.clone(), peer_id), InherentDataProviders::new(), - futures::empty(), + Exit, ).expect("all in order with client and network"); runtime.spawn(voter); @@ -665,7 +681,7 @@ fn transition_3_voters_twice_1_observer() { link, MessageRouting::new(net.clone(), peer_id), InherentDataProviders::new(), - futures::empty(), + Exit, ).expect("all in order with client and network"); runtime.spawn(voter); @@ -1065,7 +1081,7 @@ fn voter_persists_its_votes() { link, MessageRouting::new(net.clone(), 0), InherentDataProviders::new(), - futures::empty(), + Exit, ).expect("all in order with client and network"); let voter = future::poll_fn(move || { @@ -1112,7 +1128,8 @@ fn voter_persists_its_votes() { name: Some(format!("peer#{}", 1)), }; let routing = MessageRouting::new(net.clone(), 1); - let network = communication::NetworkBridge::new(routing, config.clone()); + let (network, routing_work) = communication::NetworkBridge::new(routing, config.clone(), Exit); + runtime.block_on(routing_work).unwrap(); let (round_rx, round_tx) = network.round_communication( communication::Round(1), diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index 4b867c18c8e2b..2efb827d378d0 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -258,13 +258,13 @@ pub(crate) type UntilVoteTargetImported = UntilImported { - inner: Arc<(AtomicUsize, Mutex)>>)>, +pub(crate) struct BlockCommitMessage { + inner: Arc<(AtomicUsize, Mutex, U)>>)>, target_number: NumberFor, } -impl BlockUntilImported for BlockCommitMessage { - type Blocked = (u64, CompactCommit); +impl BlockUntilImported for BlockCommitMessage { + type Blocked = (u64, CompactCommit, U); fn schedule_wait( input: Self::Blocked, @@ -400,11 +400,11 @@ impl BlockUntilImported for BlockCommitMessage { /// A stream which gates off incoming commit messages until all referenced /// block hashes have been imported. -pub(crate) type UntilCommitBlocksImported = UntilImported< +pub(crate) type UntilCommitBlocksImported = UntilImported< Block, Status, I, - BlockCommitMessage, + BlockCommitMessage, >; #[cfg(test)] @@ -507,7 +507,7 @@ mod tests { commit_rx.map_err(|_| panic!("should never error")), ); - commit_tx.unbounded_send((0, unknown_commit.clone())).unwrap(); + commit_tx.unbounded_send((0, unknown_commit.clone(), ())).unwrap(); let inner_chain_state = chain_state.clone(); let work = until_imported @@ -527,7 +527,7 @@ mod tests { }); let mut runtime = Runtime::new().unwrap(); - assert_eq!(runtime.block_on(work).map_err(|(e, _)| e).unwrap().0, Some((0, unknown_commit))); + assert_eq!(runtime.block_on(work).map_err(|(e, _)| e).unwrap().0, Some((0, unknown_commit, ()))); } #[test] @@ -567,11 +567,11 @@ mod tests { commit_rx.map_err(|_| panic!("should never error")), ); - commit_tx.unbounded_send((0, known_commit.clone())).unwrap(); + commit_tx.unbounded_send((0, known_commit.clone(), ())).unwrap(); let work = until_imported.into_future(); let mut runtime = Runtime::new().unwrap(); - assert_eq!(runtime.block_on(work).map_err(|(e, _)| e).unwrap().0, Some((0, known_commit))); + assert_eq!(runtime.block_on(work).map_err(|(e, _)| e).unwrap().0, Some((0, known_commit, ()))); } } diff --git a/core/network/src/lib.rs b/core/network/src/lib.rs index 0200494517879..f07013004f6d6 100644 --- a/core/network/src/lib.rs +++ b/core/network/src/lib.rs @@ -38,7 +38,10 @@ pub mod specialization; pub mod test; pub use chain::Client as ClientHandle; -pub use service::{Service, FetchFuture, TransactionPool, ManageNetwork, NetworkMsg, SyncProvider, ExHashT}; +pub use service::{ + Service, FetchFuture, TransactionPool, ManageNetwork, NetworkMsg, + SyncProvider, ExHashT, ReportHandle, +}; pub use protocol::{ProtocolStatus, PeerInfo, Context}; pub use sync::{Status as SyncStatus, SyncState}; pub use network_libp2p::{ diff --git a/core/network/src/service.rs b/core/network/src/service.rs index 10367123dba67..d7d2da494c848 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -45,7 +45,6 @@ pub use network_libp2p::PeerId; /// Type that represents fetch completion future. pub type FetchFuture = oneshot::Receiver>; - /// Sync status pub trait SyncProvider: Send + Sync { /// Get a stream of sync statuses. @@ -129,6 +128,20 @@ impl> Link for NetworkLink { } } +/// A cloneable handle for reporting cost/benefits of peers. +#[derive(Clone)] +pub struct ReportHandle { + inner: PeersetHandle, // wraps it so we don't have to worry about breaking API. +} + +impl ReportHandle { + /// Report a given peer as either beneficial (+) or costly (-) according to the + /// given scalar. + pub fn report_peer(&self, who: PeerId, cost_benefit: i32) { + self.inner.report_peer(who, cost_benefit); + } +} + /// Substrate network service. Handles network IO and manages connectivity. pub struct Service> { /// Sinks to propagate status updates. @@ -268,6 +281,17 @@ impl> Service { )); } + /// Return a cloneable handle for reporting peers' benefits or misbehavior. + pub fn report_handle(&self) -> ReportHandle { + ReportHandle { inner: self.peerset.clone() } + } + + /// Report a given peer as either beneficial (+) or costly (-) according to the + /// given scalar. + pub fn report_peer(&self, who: PeerId, cost_benefit: i32) { + self.peerset.report_peer(who, cost_benefit); + } + /// Execute a closure with the chain-specific network specialization. pub fn with_spec(&self, f: F) where F: FnOnce(&mut S, &mut Context) + Send + 'static