Skip to content

Commit

Permalink
builds a predicate for checking pull requests
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed Oct 12, 2020
1 parent e6ad39a commit 8772b4e
Showing 1 changed file with 26 additions and 25 deletions.
51 changes: 26 additions & 25 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
};

use rand::distributions::{Distribution, WeightedIndex};
use rand::SeedableRng;
use rand::{CryptoRng, Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use solana_sdk::sanitize::{Sanitize, SanitizeError};

Expand Down Expand Up @@ -1812,43 +1812,42 @@ impl ClusterInfo {
}
}

// Filters out pull requests from invalid addresses, and those which have
// not passed the ping-pong check. Appends ping packets for the addresses
// which need to be (re)verified.
fn filter_pull_requests(
&self,
requests: Vec<PullData>,
packets: &mut Packets,
) -> Vec<PullData> {
let mut rng = rand::thread_rng();
// Returns a predicate checking if the pull request is from a valid
// address, and if the address have responded to a ping request. Also
// appends ping packets for the addresses which need to be (re)verified.
fn check_pull_request<'a, R>(
&'a self,
now: Instant,
mut rng: &'a mut R,
packets: &'a mut Packets,
) -> impl FnMut(&PullData) -> bool + 'a
where
R: Rng + CryptoRng,
{
let mut cache = HashMap::<(Pubkey, SocketAddr), bool>::new();
let mut pingf = || Ping::new_rand(&mut rng, &self.keypair).ok();
let mut pingf = move || Ping::new_rand(&mut rng, &self.keypair).ok();
let mut ping_cache = self.ping_cache.write().unwrap();
let now = Instant::now();
let mut hard_check = |node| {
let mut hard_check = move |node| {
let (_check, ping) = ping_cache.check(now, node, &mut pingf);
if let Some(ping) = ping {
let ping = Protocol::PingMessage(ping);
let ping = Packet::from_data(&node.1, ping);
packets.packets.push(ping);
}
// TODO: For backward compatibility, this unconditionally returns
// true for now. This has to return _check, once nodes start
// true for now. It has to return _check, once nodes start
// responding to ping messages.
true
};
// Because pull-responses are sent back to packet.meta.addr() of
// incoming pull-requests, pings are also sent to request.from_addr (as
// opposed to caller.gossip address).
requests
.into_iter()
.filter(|request| {
ContactInfo::is_valid_address(&request.from_addr) && {
let node = (request.caller.pubkey(), request.from_addr);
*cache.entry(node).or_insert_with(|| hard_check(node))
}
})
.collect()
move |request| {
ContactInfo::is_valid_address(&request.from_addr) && {
let node = (request.caller.pubkey(), request.from_addr);
*cache.entry(node).or_insert_with(|| hard_check(node))
}
}
}

// Pull requests take an incoming bloom filter of contained entries from a node
Expand All @@ -1863,10 +1862,12 @@ impl ClusterInfo {
self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests)
.process_pull_requests(requests.iter().map(|r| r.caller.clone()), timestamp());
self.update_data_budget(stakes);
let mut rng = rand::thread_rng();
let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = self
.filter_pull_requests(requests, &mut packets)
let check_pull_request = self.check_pull_request(Instant::now(), &mut rng, &mut packets);
let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = requests
.into_iter()
.filter(check_pull_request)
.map(|r| ((r.caller, r.filter), r.from_addr))
.unzip();
let now = timestamp();
Expand Down

0 comments on commit 8772b4e

Please sign in to comment.