From 8fc5e30ed1514ae908763790de741fb71c70de59 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 22 Nov 2023 16:13:40 -0700 Subject: [PATCH] fix: use correct retry iterators --- p2p/src/publisher.rs | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/p2p/src/publisher.rs b/p2p/src/publisher.rs index 7d6dda7b9..4acfa388f 100644 --- a/p2p/src/publisher.rs +++ b/p2p/src/publisher.rs @@ -349,26 +349,28 @@ impl Stream for PublisherWorker { .map(|hash| hash.to_bytes().into()) .collect(); - // Collect the keys that need to be retried up to the batch size limit. + // Collect all keys including the keys that need to be retried up to the batch size limit. // We limit the number of new keys to ensure we have room for retries. // However if the batch size is smaller than the number of retries we need to // only retry up to batch size. - let mut retries = self.retries.iter().take(self.batch_size - new_keys.len()); - let repeat_count = retries.by_ref().count() as i64; - let max_retry_count = *retries - .by_ref() - .map(|(_key, value)| value) - .max() - .unwrap_or(&0) as i64; - - // Collect all keys - let keys: Vec = retries - .map(|(key, _value)| key) - .cloned() + let mut max_retry_count = 0i64; + let mut repeat_count = 0; + let keys: Vec = self + .retries + .iter() + .take(self.batch_size - new_keys.len()) + // Use inspect to track stats about the retries + .inspect(|(_key, retry_count)| { + max_retry_count = max(max_retry_count, **retry_count as i64); + repeat_count += 1; + }) + .map(|(key, _)| key.clone()) .chain(new_keys.into_iter()) .collect(); - // Expect a response for each key + // Expect a response for each key. + // Current queries should be empty because we finished the last batch. + debug_assert!(self.current_queries.is_empty()); keys.iter().for_each(|key| { self.current_queries.insert(key.clone()); }); @@ -376,10 +378,14 @@ impl Stream for PublisherWorker { // Setup notification channel for when batch is complete let (tx, rx) = oneshot::channel(); + let retry_queue_len = self.retries.len(); // Record logs and metrics debug!( new_count, - repeat_count, max_retry_count, "starting new publish batch" + repeat_count, + max_retry_count, + retry_queue_len, + "starting new publish batch" ); self.metrics.record(&metrics::PublisherEvent::BatchStarted { new_count,