Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Reduce consensus gossip spam (#1538)
Browse files Browse the repository at this point in the history
* core: keep known gossip messages for twice their expiration

* core: test expiration of known gossip messages

* core: only broadcast grandpa votes if authority in current set

* core: only broadcast grandpa commits if authority in current set
  • Loading branch information
andresilva authored and gavofyork committed Jan 23, 2019
1 parent 9040ca9 commit f608158
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 17 deletions.
8 changes: 7 additions & 1 deletion core/finality-grandpa/src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,14 +395,16 @@ pub(crate) struct CommitsOut<Block, N> {
network: N,
set_id: u64,
_marker: ::std::marker::PhantomData<Block>,
is_voter: bool,
}

impl<Block, N> CommitsOut<Block, N> {
/// Create a new commit output stream.
pub(crate) fn new(network: N, set_id: u64) -> Self {
pub(crate) fn new(network: N, set_id: u64, is_voter: bool) -> Self {
CommitsOut {
network,
set_id,
is_voter,
_marker: Default::default(),
}
}
Expand All @@ -413,6 +415,10 @@ impl<Block: BlockT, N: Network> Sink for CommitsOut<Block, N> {
type SinkError = Error;

fn start_send(&mut self, input: (u64, Commit<Block>)) -> StartSend<Self::SinkItem, Error> {
if !self.is_voter {
return Ok(AsyncSink::Ready);
}

let (round, commit) = input;
let (precommits, auth_data) = commit.precommits.into_iter()
.map(|signed| (signed.precommit, (signed.signature, signed.id)))
Expand Down
12 changes: 11 additions & 1 deletion core/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,10 +528,13 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
self.voters.clone(),
);

let local_key = self.config.local_key.as_ref()
.filter(|pair| self.voters.contains_key(&pair.public().into()));

let (out_rx, outgoing) = ::communication::outgoing_messages::<Block, _>(
round,
self.set_id,
self.config.local_key.clone(),
local_key.cloned(),
self.voters.clone(),
self.network.clone(),
);
Expand Down Expand Up @@ -1407,6 +1410,7 @@ pub fn block_import<B, E, Block: BlockT<Hash=H256>, RA, PRA>(
}

fn committer_communication<Block: BlockT<Hash=H256>, B, E, N, RA>(
local_key: Option<Arc<ed25519::Pair>>,
set_id: u64,
voters: &Arc<HashMap<Ed25519AuthorityId, u64>>,
client: &Arc<Client<B, E, Block, RA>>,
Expand Down Expand Up @@ -1442,9 +1446,14 @@ fn committer_communication<Block: BlockT<Hash=H256>, B, E, N, RA>(
commit_in,
);

let is_voter = local_key
.map(|pair| voters.contains_key(&pair.public().into()))
.unwrap_or(false);

let commit_out = ::communication::CommitsOut::<Block, _>::new(
network.clone(),
set_id,
is_voter,
);

let commit_in = commit_in.map_err(Into::into);
Expand Down Expand Up @@ -1524,6 +1533,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
);

let committer_data = committer_communication(
config.local_key.clone(),
env.set_id,
&env.voters,
&client,
Expand Down
51 changes: 36 additions & 15 deletions core/network/src/consensus_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ struct MessageEntry<B: BlockT> {
message_hash: B::Hash,
message: ConsensusMessage,
broadcast: bool,
instant: Instant,
}

/// Consensus network protocol handler. Manages statements and candidate requests.
Expand All @@ -50,6 +49,7 @@ pub struct ConsensusGossip<B: BlockT> {
live_message_sinks: HashMap<B::Hash, Vec<mpsc::UnboundedSender<ConsensusMessage>>>,
messages: Vec<MessageEntry<B>>,
known_messages: HashSet<(B::Hash, B::Hash)>,
message_times: HashMap<(B::Hash, B::Hash), Instant>,
session_start: Option<B::Hash>,
}

Expand All @@ -61,6 +61,7 @@ impl<B: BlockT> ConsensusGossip<B> {
live_message_sinks: HashMap::new(),
messages: Default::default(),
known_messages: Default::default(),
message_times: Default::default(),
session_start: None
}
}
Expand Down Expand Up @@ -155,9 +156,10 @@ impl<B: BlockT> ConsensusGossip<B> {
topic,
message_hash,
broadcast,
instant: Instant::now(),
message: get_message(),
});

self.message_times.insert((topic, message_hash), Instant::now());
}
}

Expand All @@ -174,20 +176,33 @@ impl<B: BlockT> ConsensusGossip<B> {
!sinks.is_empty()
});

let hashes = &mut self.known_messages;
let message_times = &mut self.message_times;
let known_messages = &mut self.known_messages;
let before = self.messages.len();
let now = Instant::now();

self.messages.retain(|entry| {
if entry.instant + MESSAGE_LIFETIME >= now && predicate(&entry.topic) {
true
} else {
hashes.remove(&(entry.topic, entry.message_hash));
false
}
message_times.get(&(entry.topic, entry.message_hash))
.map(|instant| *instant + MESSAGE_LIFETIME >= now && predicate(&entry.topic))
.unwrap_or(false)
});
trace!(target:"gossip", "Cleaned up {} stale messages, {} left", before - self.messages.len(), self.messages.len());

known_messages.retain(|(topic, message_hash)| {
message_times.get(&(*topic, *message_hash))
.map(|instant| *instant + (2 * MESSAGE_LIFETIME) >= now && predicate(topic))
.unwrap_or(false)
});

trace!(target:"gossip", "Cleaned up {} stale messages, {} left ({} known)",
before - self.messages.len(),
self.messages.len(),
known_messages.len(),
);

message_times.retain(|h, _| known_messages.contains(h));

for (_, ref mut peer) in self.peers.iter_mut() {
peer.known_messages.retain(|h| hashes.contains(h));
peer.known_messages.retain(|h| known_messages.contains(h));
}
}

Expand Down Expand Up @@ -318,10 +333,10 @@ mod tests {
consensus.messages.push(MessageEntry {
topic: $topic,
message_hash: $hash,
instant: $now,
message: $m,
broadcast: false,
})
});
consensus.message_times.insert(($topic, $hash), $now);
}
}

Expand All @@ -346,9 +361,15 @@ mod tests {
assert_eq!(consensus.known_messages.len(), 1);
assert!(consensus.known_messages.contains(&(best_hash, m2_hash)));

// make timestamp expired
// make timestamp expired, but the message is still kept as known
consensus.messages.clear();
push_msg!(best_hash, m2_hash, now - MESSAGE_LIFETIME, m2);
push_msg!(best_hash, m2_hash, now - MESSAGE_LIFETIME, m2.clone());
consensus.collect_garbage(|_topic| true);
assert!(consensus.messages.is_empty());
assert_eq!(consensus.known_messages.len(), 1);

// make timestamp expired past the known message lifetime
push_msg!(best_hash, m2_hash, now - (2 * MESSAGE_LIFETIME), m2);
consensus.collect_garbage(|_topic| true);
assert!(consensus.messages.is_empty());
assert!(consensus.known_messages.is_empty());
Expand Down

0 comments on commit f608158

Please sign in to comment.