Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

generate_pull_response optimization #11597

Merged
merged 1 commit into from
Aug 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
//! A value is updated to a new version if the labels match, and the value
//! wallclock is later, or the value hash is greater.

use crate::crds_gossip_pull::CrdsFilter;
use crate::crds_value::{CrdsValue, CrdsValueLabel};
use bincode::serialize;
use indexmap::map::IndexMap;
Expand All @@ -37,6 +38,8 @@ pub struct Crds {
/// Stores the map of labels and values
pub table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
pub num_inserts: usize,

pub masks: IndexMap<CrdsValueLabel, u64>,
}

#[derive(PartialEq, Debug)]
Expand Down Expand Up @@ -86,6 +89,7 @@ impl Default for Crds {
Crds {
table: IndexMap::new(),
num_inserts: 0,
masks: IndexMap::new(),
}
}
}
Expand Down Expand Up @@ -126,6 +130,10 @@ impl Crds {
.map(|current| new_value > *current)
.unwrap_or(true);
if do_insert {
self.masks.insert(
label.clone(),
CrdsFilter::hash_as_u64(&new_value.value_hash),
);
let old = self.table.insert(label, new_value);
self.num_inserts += 1;
Ok(old)
Expand Down Expand Up @@ -193,6 +201,7 @@ impl Crds {

pub fn remove(&mut self, key: &CrdsValueLabel) {
self.table.swap_remove(key);
self.masks.swap_remove(key);
}
}

Expand Down
45 changes: 32 additions & 13 deletions core/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,18 @@ impl CrdsFilter {
// for small ratios this can result in a negative number, ensure it returns 0 instead
((num_items / max_items).log2().ceil()).max(0.0) as u32
}
fn hash_as_u64(item: &Hash) -> u64 {
pub fn hash_as_u64(item: &Hash) -> u64 {
let arr = item.as_ref();
let mut accum = 0;
for (i, val) in arr.iter().enumerate().take(8) {
accum |= (u64::from(*val)) << (i * 8) as u64;
}
accum
}
pub fn test_mask_u64(&self, item: u64, ones: u64) -> bool {
let bits = item | ones;
bits == self.mask
}
pub fn test_mask(&self, item: &Hash) -> bool {
// only consider the highest mask_bits bits from the hash and set the rest to 1.
let ones = (!0u64).checked_shr(self.mask_bits).unwrap_or(!0u64);
Expand All @@ -116,6 +120,9 @@ impl CrdsFilter {
}
self.filter.contains(item)
}
pub fn filter_contains(&self, item: &Hash) -> bool {
self.filter.contains(item)
}
}

#[derive(Default)]
Expand Down Expand Up @@ -395,18 +402,30 @@ impl CrdsGossipPull {
return ret;
}
let mut total_skipped = 0;
for v in crds.table.values() {
recent.iter().enumerate().for_each(|(i, (caller, filter))| {
//skip values that are too new
if v.value.wallclock() > caller.wallclock().checked_add(jitter).unwrap_or_else(|| 0)
{
total_skipped += 1;
return;
}
if !filter.contains(&v.value_hash) {
ret[i].push(v.value.clone());
}
});
let mask_ones: Vec<_> = recent
.iter()
.map(|(_caller, filter)| (!0u64).checked_shr(filter.mask_bits).unwrap_or(!0u64))
.collect();
for (label, mask) in crds.masks.iter() {
recent.iter().zip(mask_ones.iter()).enumerate().for_each(
|(i, ((caller, filter), mask_ones))| {
if filter.test_mask_u64(*mask, *mask_ones) {
let item = crds.table.get(label).unwrap();

//skip values that are too new
if item.value.wallclock()
> caller.wallclock().checked_add(jitter).unwrap_or_else(|| 0)
{
total_skipped += 1;
return;
}

if !filter.filter_contains(&item.value_hash) {
ret[i].push(item.value.clone());
}
}
},
);
}
inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped);
ret
Expand Down