fix: simplify the publisher
Prior to this change the publisher attempted to estimate and amortize
the DHT publish load over the entire interval. Now we use a simpler AIMD
(additive increase, multiplicative decrease) method and then publish
batches without any delay. Additionally, no estimate metrics are
computed; instead there are two simple metrics: how many records remain
to be published and the amount of time before the deadline.
Operationally you can still estimate the lag from these two values, but
the code itself is simpler.
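
As an illustration of the AIMD idea mentioned above, here is a minimal sketch of a batch-size controller. It is not the code from this commit: the `AimdBatchSize` type, its field names, and the constants used in `main` are all hypothetical, and the sketch simply assumes that a successful batch grows the size by a fixed step while a failed batch shrinks it by a constant factor.

```rust
/// Hypothetical AIMD batch-size controller (illustrative only, not the
/// publisher's actual implementation).
#[derive(Debug)]
struct AimdBatchSize {
    current: usize,
    min: usize,
    max: usize,
    /// Additive step applied after a successful batch.
    increase: usize,
    /// Multiplicative factor in (0, 1) applied after a failed batch.
    decrease: f64,
}

impl AimdBatchSize {
    fn new(min: usize, max: usize, increase: usize, decrease: f64) -> Self {
        assert!(min >= 1 && min <= max);
        assert!(decrease > 0.0 && decrease < 1.0);
        Self { current: min, min, max, increase, decrease }
    }

    /// Additive increase: grow by a fixed step, capped at `max`.
    fn on_success(&mut self) {
        self.current = self.current.saturating_add(self.increase).min(self.max);
    }

    /// Multiplicative decrease: shrink by a constant factor, floored at `min`.
    fn on_failure(&mut self) {
        let reduced = (self.current as f64 * self.decrease).floor() as usize;
        self.current = reduced.max(self.min);
    }

    fn batch_size(&self) -> usize {
        self.current
    }
}

fn main() {
    // Assumed parameters: start at 1, cap at 100, +10 on success, halve on failure.
    let mut aimd = AimdBatchSize::new(1, 100, 10, 0.5);
    for _ in 0..5 {
        aimd.on_success();
    }
    println!("after 5 successes: {}", aimd.batch_size()); // prints 51
    aimd.on_failure();
    println!("after 1 failure: {}", aimd.batch_size()); // prints 25
}
```

The appeal of this scheme is that it needs no estimate of the total load: the batch size probes upward slowly and backs off quickly when sends start failing.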

Additionally, logic was added so that publishing does not start until
Kademlia has been bootstrapped.
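
The commit message notes that lag can still be estimated operationally from the two new gauges. One way to do that, sketched below, is to divide the remaining record count by the seconds left to get a required publish rate, then compare it with an observed rate. The function names and the `observed_rate` input are assumptions for illustration; the commit itself does not compute or export an observed rate.

```rust
/// Records per second needed to finish before the deadline.
/// Returns None when the remaining count is unknown or no time is left.
fn required_rate(remaining: Option<i64>, deadline_seconds: i64) -> Option<f64> {
    let remaining = remaining?;
    if deadline_seconds <= 0 {
        return None;
    }
    Some(remaining.max(0) as f64 / deadline_seconds as f64)
}

/// Ratio of estimated needed time to remaining time, given an observed
/// publish rate in records per second (a hypothetical input, e.g. derived
/// from a dashboard). Values above 1.0 suggest the publisher is behind.
fn lag_ratio(remaining: Option<i64>, deadline_seconds: i64, observed_rate: f64) -> Option<f64> {
    let required = required_rate(remaining, deadline_seconds)?;
    if observed_rate <= 0.0 {
        return None;
    }
    Some(required / observed_rate)
}

fn main() {
    // Example values only: 1200 records left, 600 seconds to the deadline,
    // and an observed rate of 4 records per second.
    let remaining = Some(1200);
    let deadline_seconds = 600;
    println!("required rate: {:?}", required_rate(remaining, deadline_seconds));
    println!("lag ratio: {:?}", lag_ratio(remaining, deadline_seconds, 4.0));
}
```

This reproduces the meaning of the removed `publisher_lag_ratio` gauge (estimated needed time over remaining time) outside the publisher, which is the trade-off the commit describes.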
nathanielc committed Nov 22, 2023
1 parent ad65ccc commit 3187c3c
Showing 3 changed files with 303 additions and 210 deletions.
31 changes: 22 additions & 9 deletions p2p/src/metrics.rs
@@ -18,7 +18,8 @@ pub struct Metrics {
     publisher_batch_max_retry_count: Gauge,

     publisher_batches_finished: Counter,
-    publisher_lag_ratio: Gauge<f64, std::sync::atomic::AtomicU64>,
+    publisher_remaining: Gauge<i64>,
+    publisher_deadline_seconds: Gauge<i64>,
     publisher_batch_size: Gauge<i64>,

     peering_connected_count: Counter,
@@ -95,8 +96,8 @@ impl Metrics {
             sub_registry
         );
         register!(
-            publisher_lag_ratio,
-            "Ratio of estimated_needed_time / remaining_time",
+            publisher_remaining,
+            "Number of records left to publish in the current interval",
             Gauge::default(),
             sub_registry
         );
@@ -106,6 +107,12 @@ impl Metrics {
             Gauge::default(),
             sub_registry
         );
+        register!(
+            publisher_deadline_seconds,
+            "Number of seconds until the end of the current interval",
+            Gauge::default(),
+            sub_registry
+        );

         register!(
             peering_connected_count,
@@ -134,8 +141,9 @@ impl Metrics {
             publisher_batch_repeat_count,
             publisher_batch_max_retry_count,
             publisher_batches_finished,
-            publisher_lag_ratio,
+            publisher_remaining,
             publisher_batch_size,
+            publisher_deadline_seconds,
             peering_connected_count,
             peering_disconnected_count,
             peering_dial_failure_count,
@@ -153,8 +161,9 @@ pub enum PublisherEvent {
     },
     BatchSendErr,
     BatchFinished {
-        batch_size: usize,
-        lag_ratio: f64,
+        batch_size: i64,
+        remaining: Option<i64>,
+        deadline_seconds: i64,
     },
 }

@@ -179,11 +188,15 @@ impl Recorder<PublisherEvent> for Metrics {
             }
             PublisherEvent::BatchFinished {
                 batch_size,
-                lag_ratio,
+                remaining,
+                deadline_seconds,
             } => {
                 self.publisher_batches_finished.inc();
-                self.publisher_lag_ratio.set(*lag_ratio);
-                self.publisher_batch_size.set(*batch_size as i64);
+                if let Some(remaining) = *remaining {
+                    self.publisher_remaining.set(remaining);
+                }
+                self.publisher_deadline_seconds.set(*deadline_seconds);
+                self.publisher_batch_size.set(*batch_size);
             }
             PublisherEvent::BatchSendErr => {
                 self.publisher_batch_send_err_count.inc();