Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Cache known votes in gossip (#227)
Browse files Browse the repository at this point in the history
* Implement known messages cache.

* Add tests.

* Appease clippy.

* More clippy

Co-authored-by: adoerr <0xad@gmx.net>
  • Loading branch information
tomusdrw and adoerr authored Jul 5, 2021
1 parent e08e656 commit b0e0cdb
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 35 deletions.
3 changes: 2 additions & 1 deletion client/beefy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"

[dependencies]
fnv = "1.0.6"
futures = "0.3"
hex = "0.4"
log = "0.4"
Expand Down Expand Up @@ -35,4 +36,4 @@ beefy-primitives = { path = "../beefy-primitives" }
[dev-dependencies]
sc-network-test = { git = "https://github.com/paritytech/substrate", branch = "master" }

beefy-test = { path = "../beefy-test" }
beefy-test = { path = "../beefy-test" }
199 changes: 165 additions & 34 deletions client/beefy/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
use codec::{Decode, Encode};
use log::{debug, trace};
use parking_lot::RwLock;
use std::collections::BTreeMap;

use sc_network::PeerId;
use sc_network_gossip::{MessageIntent, ValidationResult, Validator, ValidatorContext};

use sp_core::hashing::twox_64;
use sp_runtime::traits::{Block, Hash, Header, NumberFor};

use beefy_primitives::{
Expand All @@ -41,6 +43,11 @@ where
<<B::Header as Header>::Hashing as Hash>::hash(b"beefy")
}

/// A type that represents hash of the message.
pub type MessageHash = [u8; 8];

type KnownVotes<B> = BTreeMap<NumberFor<B>, fnv::FnvHashSet<MessageHash>>;

/// BEEFY gossip validator
///
/// Validate BEEFY gossip messages and limit the number of live BEEFY voting rounds.
Expand All @@ -54,7 +61,7 @@ where
B: Block,
{
topic: B::Hash,
live_rounds: RwLock<Vec<NumberFor<B>>>,
known_votes: RwLock<KnownVotes<B>>,
}

impl<B> GossipValidator<B>
Expand All @@ -64,7 +71,7 @@ where
pub fn new() -> GossipValidator<B> {
GossipValidator {
topic: topic::<B>(),
live_rounds: RwLock::new(Vec::new()),
known_votes: RwLock::new(BTreeMap::new()),
}
}

Expand All @@ -77,19 +84,34 @@ where
pub(crate) fn note_round(&self, round: NumberFor<B>) {
trace!(target: "beefy", "🥩 About to note round #{}", round);

let mut live = self.live_rounds.write();
let mut live = self.known_votes.write();

if let Some(idx) = live.binary_search(&round).err() {
live.insert(idx, round);
#[allow(clippy::map_entry)]
if !live.contains_key(&round) {
live.insert(round, Default::default());
}

if live.len() > MAX_LIVE_GOSSIP_ROUNDS {
let _ = live.remove(0);
let to_remove = live.iter().next().map(|x| x.0).copied();
if let Some(first) = to_remove {
live.remove(&first);
}
}
}

fn is_live(live_rounds: &[NumberFor<B>], round: NumberFor<B>) -> bool {
live_rounds.binary_search(&round).is_ok()
fn add_known(known_votes: &mut KnownVotes<B>, round: &NumberFor<B>, hash: MessageHash) {
known_votes.get_mut(round).map(|known| known.insert(hash));
}

fn is_live(known_votes: &KnownVotes<B>, round: &NumberFor<B>) -> bool {
known_votes.contains_key(round)
}

fn is_known(known_votes: &KnownVotes<B>, round: &NumberFor<B>, hash: &MessageHash) -> bool {
known_votes
.get(round)
.map(|known| known.contains(hash))
.unwrap_or(false)
}
}

Expand All @@ -104,7 +126,26 @@ where
mut data: &[u8],
) -> ValidationResult<B::Hash> {
if let Ok(msg) = VoteMessage::<MmrRootHash, NumberFor<B>, Public, Signature>::decode(&mut data) {
let msg_hash = twox_64(data);
let round = msg.commitment.block_number;

// Verify general usefulness of the message.
// We are going to discard old votes right away (without verification)
// Also we keep track of already received votes to avoid verifying duplicates.
{
let known_votes = self.known_votes.read();

if !GossipValidator::<B>::is_live(&known_votes, &round) {
return ValidationResult::Discard;
}

if GossipValidator::<B>::is_known(&known_votes, &round, &msg_hash) {
return ValidationResult::ProcessAndKeep(self.topic);
}
}

if BeefyKeystore::verify(&msg.id, &msg.signature, &msg.commitment.encode()) {
GossipValidator::<B>::add_known(&mut *self.known_votes.write(), &round, msg_hash);
return ValidationResult::ProcessAndKeep(self.topic);
} else {
// TODO: report peer
Expand All @@ -116,33 +157,35 @@ where
}

fn message_expired<'a>(&'a self) -> Box<dyn FnMut(B::Hash, &[u8]) -> bool + 'a> {
let live_rounds = self.live_rounds.read();
let known_votes = self.known_votes.read();
Box::new(move |_topic, mut data| {
let msg = match VoteMessage::<MmrRootHash, NumberFor<B>, Public, Signature>::decode(&mut data) {
Ok(vote) => vote,
Err(_) => return true,
};

let expired = !GossipValidator::<B>::is_live(&live_rounds, msg.commitment.block_number);
let round = msg.commitment.block_number;
let expired = !GossipValidator::<B>::is_live(&known_votes, &round);

trace!(target: "beefy", "🥩 Message for round #{} expired: {}", msg.commitment.block_number, expired);
trace!(target: "beefy", "🥩 Message for round #{} expired: {}", round, expired);

expired
})
}

#[allow(clippy::type_complexity)]
fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &B::Hash, &[u8]) -> bool + 'a> {
let live_rounds = self.live_rounds.read();
let known_votes = self.known_votes.read();
Box::new(move |_who, _intent, _topic, mut data| {
let msg = match VoteMessage::<MmrRootHash, NumberFor<B>, Public, Signature>::decode(&mut data) {
Ok(vote) => vote,
Err(_) => return true,
};

let allowed = GossipValidator::<B>::is_live(&live_rounds, msg.commitment.block_number);
let round = msg.commitment.block_number;
let allowed = GossipValidator::<B>::is_live(&known_votes, &round);

trace!(target: "beefy", "🥩 Message for round #{} allowed: {}", msg.commitment.block_number, allowed);
trace!(target: "beefy", "🥩 Message for round #{} allowed: {}", round, allowed);

allowed
})
Expand All @@ -151,32 +194,38 @@ where

#[cfg(test)]
mod tests {
use super::{GossipValidator, MAX_LIVE_GOSSIP_ROUNDS};
use crate::keystore::BeefyKeystore;

use super::*;
use beefy_primitives::{crypto::Signature, Commitment, MmrRootHash, VoteMessage, KEY_TYPE};
use beefy_test::Keyring;
use sc_keystore::LocalKeystore;
use sc_network_test::Block;
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};

#[test]
fn note_round_works() {
let gv = GossipValidator::<Block>::new();

gv.note_round(1u64);

let live = gv.live_rounds.read();
assert!(GossipValidator::<Block>::is_live(&live, 1u64));
let live = gv.known_votes.read();
assert!(GossipValidator::<Block>::is_live(&live, &1u64));

drop(live);

gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);

let live = gv.live_rounds.read();
let live = gv.known_votes.read();

assert_eq!(live.len(), MAX_LIVE_GOSSIP_ROUNDS);

assert!(!GossipValidator::<Block>::is_live(&live, 1u64));
assert!(GossipValidator::<Block>::is_live(&live, 3u64));
assert!(GossipValidator::<Block>::is_live(&live, 7u64));
assert!(GossipValidator::<Block>::is_live(&live, 10u64));
assert!(!GossipValidator::<Block>::is_live(&live, &1u64));
assert!(GossipValidator::<Block>::is_live(&live, &3u64));
assert!(GossipValidator::<Block>::is_live(&live, &7u64));
assert!(GossipValidator::<Block>::is_live(&live, &10u64));
}

#[test]
Expand All @@ -188,12 +237,12 @@ mod tests {
gv.note_round(10u64);
gv.note_round(1u64);

let live = gv.live_rounds.read();
let live = gv.known_votes.read();

assert_eq!(live.len(), MAX_LIVE_GOSSIP_ROUNDS);

assert!(GossipValidator::<Block>::is_live(&live, 3u64));
assert!(!GossipValidator::<Block>::is_live(&live, 1u64));
assert!(GossipValidator::<Block>::is_live(&live, &3u64));
assert!(!GossipValidator::<Block>::is_live(&live, &1u64));

drop(live);

Expand All @@ -202,13 +251,13 @@ mod tests {
gv.note_round(20u64);
gv.note_round(2u64);

let live = gv.live_rounds.read();
let live = gv.known_votes.read();

assert_eq!(live.len(), MAX_LIVE_GOSSIP_ROUNDS);

assert!(GossipValidator::<Block>::is_live(&live, 15u64));
assert!(GossipValidator::<Block>::is_live(&live, 20u64));
assert!(GossipValidator::<Block>::is_live(&live, 23u64));
assert!(GossipValidator::<Block>::is_live(&live, &15u64));
assert!(GossipValidator::<Block>::is_live(&live, &20u64));
assert!(GossipValidator::<Block>::is_live(&live, &23u64));
}

#[test]
Expand All @@ -219,7 +268,7 @@ mod tests {
gv.note_round(7u64);
gv.note_round(10u64);

let live = gv.live_rounds.read();
let live = gv.known_votes.read();

assert_eq!(live.len(), MAX_LIVE_GOSSIP_ROUNDS);

Expand All @@ -228,12 +277,94 @@ mod tests {
// note round #7 again -> should not change anything
gv.note_round(7u64);

let live = gv.live_rounds.read();
let live = gv.known_votes.read();

assert_eq!(live.len(), MAX_LIVE_GOSSIP_ROUNDS);

assert!(GossipValidator::<Block>::is_live(&live, 3u64));
assert!(GossipValidator::<Block>::is_live(&live, 7u64));
assert!(GossipValidator::<Block>::is_live(&live, 10u64));
assert!(GossipValidator::<Block>::is_live(&live, &3u64));
assert!(GossipValidator::<Block>::is_live(&live, &7u64));
assert!(GossipValidator::<Block>::is_live(&live, &10u64));
}

struct TestContext;
impl<B: sp_runtime::traits::Block> ValidatorContext<B> for TestContext {
fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) {
todo!()
}

fn broadcast_message(&mut self, _topic: B::Hash, _message: Vec<u8>, _force: bool) {
todo!()
}

fn send_message(&mut self, _who: &sc_network::PeerId, _message: Vec<u8>) {
todo!()
}

fn send_topic(&mut self, _who: &sc_network::PeerId, _topic: B::Hash, _force: bool) {
todo!()
}
}

fn sign_commitment<BN: Encode, P: Encode>(who: &Keyring, commitment: &Commitment<BN, P>) -> Signature {
let store: SyncCryptoStorePtr = std::sync::Arc::new(LocalKeystore::in_memory());
SyncCryptoStore::ecdsa_generate_new(&*store, KEY_TYPE, Some(&who.to_seed())).unwrap();
let beefy_keystore: BeefyKeystore = Some(store).into();

beefy_keystore.sign(&who.public(), &commitment.encode()).unwrap()
}

#[test]
fn should_avoid_verifying_signatures_twice() {
let gv = GossipValidator::<Block>::new();
let sender = sc_network::PeerId::random();
let mut context = TestContext;

let commitment = Commitment {
payload: MmrRootHash::default(),
block_number: 3_u64,
validator_set_id: 0,
};

let signature = sign_commitment(&Keyring::Alice, &commitment);

let vote = VoteMessage {
commitment,
id: Keyring::Alice.public(),
signature,
};

gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);

// first time the cache should be populated.
let res = gv.validate(&mut context, &sender, &vote.encode());

assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
assert_eq!(
gv.known_votes
.read()
.get(&vote.commitment.block_number)
.map(|x| x.len()),
Some(1)
);

// second time we should hit the cache
let res = gv.validate(&mut context, &sender, &vote.encode());

assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));

// next we should quickly reject if the round is not live.
gv.note_round(11_u64);
gv.note_round(12_u64);

assert!(!GossipValidator::<Block>::is_live(
&*gv.known_votes.read(),
&vote.commitment.block_number
));

let res = gv.validate(&mut context, &sender, &vote.encode());

assert!(matches!(res, ValidationResult::Discard));
}
}

0 comments on commit b0e0cdb

Please sign in to comment.