Skip to content

Commit

Permalink
Fix OptimisticSync::remove_source not removing obsolete requests (#2368)
Browse files Browse the repository at this point in the history
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tomaka and mergify[bot] authored Jun 14, 2022
1 parent 1004195 commit 6a9b573
Showing 1 changed file with 77 additions and 5 deletions.
82 changes: 77 additions & 5 deletions src/sync/optimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use crate::{
use alloc::{
borrow::ToOwned as _,
boxed::Box,
collections::BTreeSet,
vec::{self, Vec},
};
use core::{
Expand Down Expand Up @@ -109,6 +110,18 @@ pub struct ConfigFull {
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct RequestId(u64);

impl RequestId {
/// Returns a value that compares inferior or equal to any other [`RequestId`].
pub fn min_value() -> Self {
Self(u64::min_value())
}

/// Returns a value that compares superior or equal to any other [`RequestId`].
pub fn max_value() -> Self {
Self(u64::max_value())
}
}

/// Identifier for a source in the [`OptimisticSync`].
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct SourceId(u64);
Expand Down Expand Up @@ -173,6 +186,9 @@ struct OptimisticSyncInner<TRq, TSrc, TBl> {

/// Requests that have been started but whose answers are no longer desired.
obsolete_requests: HashMap<RequestId, (SourceId, TRq), fnv::FnvBuildHasher>,

/// Same as [`OptimisticSyncInner::obsolete_requests`], but ordered differently.
obsolete_requests_by_source: BTreeSet<(SourceId, RequestId)>,
}

impl<TRq, TSrc, TBl> OptimisticSyncInner<TRq, TSrc, TBl> {
Expand All @@ -187,6 +203,14 @@ impl<TRq, TSrc, TBl> OptimisticSyncInner<TRq, TSrc, TBl> {
.obsolete_requests
.insert(request_id, (source, user_data));
debug_assert!(_was_in.is_none());
let _was_inserted = self
.obsolete_requests_by_source
.insert((source, request_id));
debug_assert!(_was_inserted);
debug_assert_eq!(
self.obsolete_requests.len(),
self.obsolete_requests_by_source.len()
);
}
}

Expand Down Expand Up @@ -273,6 +297,7 @@ impl<TRq, TSrc, TBl> OptimisticSync<TRq, TSrc, TBl> {
download_ahead_blocks: config.download_ahead_blocks,
next_request_id: RequestId(0),
obsolete_requests: HashMap::with_capacity_and_hasher(0, Default::default()),
obsolete_requests_by_source: BTreeSet::new(),
}),
}
}
Expand Down Expand Up @@ -439,12 +464,33 @@ impl<TRq, TSrc, TBl> OptimisticSync<TRq, TSrc, TBl> {
&'_ mut self,
source_id: SourceId,
) -> (TSrc, impl Iterator<Item = (RequestId, TRq)> + '_) {
// TODO: doesn't take obsolete requests into account /!\
let obsolete_requests_to_remove = self
.inner
.obsolete_requests_by_source
.range((source_id, RequestId::min_value())..=(source_id, RequestId::max_value()))
.map(|(_, id)| *id)
.collect::<Vec<_>>();
let mut obsolete_requests = Vec::with_capacity(obsolete_requests_to_remove.len());
for rq_id in obsolete_requests_to_remove {
let (_, user_data) = self.inner.obsolete_requests.remove(&rq_id).unwrap();
obsolete_requests.push((rq_id, user_data));
let _was_in = self
.inner
.obsolete_requests_by_source
.remove(&(source_id, rq_id));
debug_assert!(_was_in);
}

debug_assert_eq!(
self.inner.obsolete_requests.len(),
self.inner.obsolete_requests_by_source.len()
);

let src_user_data = self.inner.sources.remove(&source_id).unwrap().user_data;
let drain = RequestsDrain {
iter: self.inner.verification_queue.drain_source(source_id),
};
(src_user_data, drain)
(src_user_data, drain.chain(obsolete_requests))
}

/// Returns the list of sources in this state machine.
Expand All @@ -461,9 +507,8 @@ impl<TRq, TSrc, TBl> OptimisticSync<TRq, TSrc, TBl> {
pub fn source_num_ongoing_requests(&self, source_id: SourceId) -> usize {
let num_obsolete = self
.inner
.obsolete_requests
.values()
.filter(|(id, _)| *id == source_id)
.obsolete_requests_by_source
.range((source_id, RequestId::min_value())..=(source_id, RequestId::max_value()))
.count();
let num_regular = self
.inner
Expand Down Expand Up @@ -531,6 +576,15 @@ impl<TRq, TSrc, TBl> OptimisticSync<TRq, TSrc, TBl> {
self.inner
.obsolete_requests
.insert(request_id, (detail.source_id, user_data));
let _was_inserted = self
.inner
.obsolete_requests_by_source
.insert((detail.source_id, request_id));
debug_assert!(_was_inserted);
debug_assert_eq!(
self.inner.obsolete_requests.len(),
self.inner.obsolete_requests_by_source.len()
);
}
}

Expand Down Expand Up @@ -559,6 +613,15 @@ impl<TRq, TSrc, TBl> OptimisticSync<TRq, TSrc, TBl> {
) -> (TRq, FinishRequestOutcome) {
if let Some((source_id, user_data)) = self.inner.obsolete_requests.remove(&request_id) {
self.inner.obsolete_requests.shrink_to_fit();
let _was_in = self
.inner
.obsolete_requests_by_source
.remove(&(source_id, request_id));
debug_assert!(_was_in);
debug_assert_eq!(
self.inner.obsolete_requests.len(),
self.inner.obsolete_requests_by_source.len()
);
self.inner
.sources
.get_mut(&source_id)
Expand Down Expand Up @@ -592,6 +655,15 @@ impl<TRq, TSrc, TBl> OptimisticSync<TRq, TSrc, TBl> {
pub fn finish_request_failed(&mut self, request_id: RequestId) -> TRq {
if let Some((source_id, user_data)) = self.inner.obsolete_requests.remove(&request_id) {
self.inner.obsolete_requests.shrink_to_fit();
let _was_in = self
.inner
.obsolete_requests_by_source
.remove(&(source_id, request_id));
debug_assert!(_was_in);
debug_assert_eq!(
self.inner.obsolete_requests.len(),
self.inner.obsolete_requests_by_source.len()
);
self.inner
.sources
.get_mut(&source_id)
Expand Down

0 comments on commit 6a9b573

Please sign in to comment.