Skip to content

Commit

Permalink
fix: use correct retry iterators
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Nov 22, 2023
1 parent 230c4b2 commit 8fc5e30
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions p2p/src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,37 +349,43 @@ impl Stream for PublisherWorker {
.map(|hash| hash.to_bytes().into())
.collect();

// Collect the keys that need to be retried up to the batch size limit.
// Collect all keys including the keys that need to be retried up to the batch size limit.
// We limit the number of new keys to ensure we have room for retries.
// However if the batch size is smaller than the number of retries we need to
// only retry up to batch size.
let mut retries = self.retries.iter().take(self.batch_size - new_keys.len());
let repeat_count = retries.by_ref().count() as i64;
let max_retry_count = *retries
.by_ref()
.map(|(_key, value)| value)
.max()
.unwrap_or(&0) as i64;

// Collect all keys
let keys: Vec<Key> = retries
.map(|(key, _value)| key)
.cloned()
let mut max_retry_count = 0i64;
let mut repeat_count = 0;
let keys: Vec<Key> = self
.retries
.iter()
.take(self.batch_size - new_keys.len())
// Use inspect to track stats about the retries
.inspect(|(_key, retry_count)| {
max_retry_count = max(max_retry_count, **retry_count as i64);
repeat_count += 1;
})
.map(|(key, _)| key.clone())
.chain(new_keys.into_iter())
.collect();

// Expect a response for each key
// Expect a response for each key.
// Current queries should be empty because we finished the last batch.
debug_assert!(self.current_queries.is_empty());
keys.iter().for_each(|key| {
self.current_queries.insert(key.clone());
});

// Setup notification channel for when batch is complete
let (tx, rx) = oneshot::channel();

let retry_queue_len = self.retries.len();
// Record logs and metrics
debug!(
new_count,
repeat_count, max_retry_count, "starting new publish batch"
repeat_count,
max_retry_count,
retry_queue_len,
"starting new publish batch"
);
self.metrics.record(&metrics::PublisherEvent::BatchStarted {
new_count,
Expand Down

0 comments on commit 8fc5e30

Please sign in to comment.