Skip to content

Commit

Permalink
Replace consensus number notification Atomic with async watch (#603)
Browse files Browse the repository at this point in the history
* Replace consensus number notification Atomic with async watch

* Rename & review comments

* fmt

* Change names + header gc

Co-authored-by: George Danezis <george@danez.is>
  • Loading branch information
gdanezis and George Danezis authored Aug 2, 2022
1 parent cf3ddb6 commit fdf685a
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 132 deletions.
61 changes: 35 additions & 26 deletions narwhal/primary/src/certificate_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use futures::{
stream::{futures_unordered::FuturesUnordered, StreamExt as _},
};
use once_cell::sync::OnceCell;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use std::sync::Arc;
use store::Store;
use tokio::{
sync::{
Expand Down Expand Up @@ -43,8 +40,8 @@ pub struct CertificateWaiter {
committee: Committee,
/// The persistent storage.
store: Store<CertificateDigest, Certificate>,
/// The current consensus round (used for cleanup).
consensus_round: Arc<AtomicU64>,
/// Receiver for signal of round change
rx_consensus_round_updates: watch::Receiver<u64>,
/// The depth of the garbage collector.
gc_depth: Round,
/// Watch channel notifying of epoch changes, it is only used for cleanup.
Expand All @@ -67,7 +64,7 @@ impl CertificateWaiter {
pub fn spawn(
committee: Committee,
store: Store<CertificateDigest, Certificate>,
consensus_round: Arc<AtomicU64>,
rx_consensus_round_updates: watch::Receiver<u64>,
gc_depth: Round,
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
rx_synchronizer: Receiver<Certificate>,
Expand All @@ -78,7 +75,7 @@ impl CertificateWaiter {
Self {
committee,
store,
consensus_round,
rx_consensus_round_updates,
gc_depth,
rx_reconfigure,
rx_synchronizer,
Expand Down Expand Up @@ -115,8 +112,12 @@ impl CertificateWaiter {

let timer = sleep(Duration::from_millis(GC_RESOLUTION));
tokio::pin!(timer);
let mut attempt_garbage_collection;

loop {
// Initially set to not garbage collect
attempt_garbage_collection = false;

tokio::select! {
Some(certificate) = self.rx_synchronizer.recv() => {
if certificate.epoch() < self.committee.epoch() {
Expand Down Expand Up @@ -178,27 +179,35 @@ impl CertificateWaiter {

// Reschedule the timer.
timer.as_mut().reset(Instant::now() + Duration::from_millis(GC_RESOLUTION));
attempt_garbage_collection = true;
},

Ok(()) = self.rx_consensus_round_updates.changed() => {
attempt_garbage_collection = true;
}

}

// Cleanup internal state. Deliver the certificates waiting on garbage collected ancestors.
let round = self.consensus_round.load(Ordering::Relaxed);
if round > self.gc_depth {
let gc_round = round - self.gc_depth;

self.pending.retain(|_digest, (r, once_cancel)| {
if *r <= gc_round {
// note: this send can fail, harmlessly, if the certificate has been delivered (`notify_read`)
// and the present code path fires before the corresponding `waiting` item is unpacked above.
let _ = once_cancel
.take()
.expect("This should be protected by a write lock")
.send(());
false
} else {
true
}
});
// Either upon time-out or round change
if attempt_garbage_collection {
let round = *self.rx_consensus_round_updates.borrow();
if round > self.gc_depth {
let gc_round = round - self.gc_depth;

self.pending.retain(|_digest, (r, once_cancel)| {
if *r <= gc_round {
// note: this send can fail, harmlessly, if the certificate has been delivered (`notify_read`)
// and the present code path fires before the corresponding `waiting` item is unpacked above.
let _ = once_cancel
.take()
.expect("This should be protected by a write lock")
.send(());
false
} else {
true
}
});
}
}

self.update_metrics();
Expand Down
54 changes: 28 additions & 26 deletions narwhal/primary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ use crypto::{Hash as _, PublicKey, Signature, SignatureService};
use network::{CancelOnDropHandler, MessageResult, PrimaryNetwork, ReliableNetwork};
use std::{
collections::{HashMap, HashSet},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
sync::Arc,
time::Instant,
};
use store::Store;
Expand Down Expand Up @@ -51,8 +48,8 @@ pub struct Core {
synchronizer: Synchronizer,
/// Service to sign headers.
signature_service: SignatureService<Signature>,
/// The current consensus round (used for cleanup).
consensus_round: Arc<AtomicU64>,
/// Get a signal when the round changes
rx_consensus_round_updates: watch::Receiver<u64>,
/// The depth of the garbage collector.
gc_depth: Round,

Expand Down Expand Up @@ -101,7 +98,7 @@ impl Core {
certificate_store: Store<CertificateDigest, Certificate>,
synchronizer: Synchronizer,
signature_service: SignatureService<Signature>,
consensus_round: Arc<AtomicU64>,
rx_consensus_round_updates: watch::Receiver<u64>,
gc_depth: Round,
rx_committee: watch::Receiver<ReconfigureNotification>,
rx_primaries: Receiver<PrimaryMessage>,
Expand All @@ -121,7 +118,7 @@ impl Core {
certificate_store,
synchronizer,
signature_service,
consensus_round,
rx_consensus_round_updates,
gc_depth,
rx_reconfigure: rx_committee,
rx_primaries,
Expand Down Expand Up @@ -602,6 +599,29 @@ impl Core {
ReconfigureNotification::Shutdown => return
}
}

// Check whether the consensus round has changed, to clean up structures
Ok(()) = self.rx_consensus_round_updates.changed() => {
let round = *self.rx_consensus_round_updates.borrow();
if round > self.gc_depth {
let now = Instant::now();

let gc_round = round - self.gc_depth;
self.last_voted.retain(|k, _| k > &gc_round);
self.processing.retain(|k, _| k > &gc_round);
self.certificates_aggregators.retain(|k, _| k > &gc_round);
self.cancel_handlers.retain(|k, _| k > &gc_round);
self.gc_round = gc_round;

self.metrics
.gc_core_latency
.with_label_values(&[&self.committee.epoch.to_string()])
.observe(now.elapsed().as_secs_f64());
}

Ok(())
}

};
match result {
Ok(()) => (),
Expand All @@ -614,24 +634,6 @@ impl Core {
Err(e) => warn!("{e}"),
}

// Cleanup internal state.
let round = self.consensus_round.load(Ordering::Relaxed);
if round > self.gc_depth {
let now = Instant::now();

let gc_round = round - self.gc_depth;
self.last_voted.retain(|k, _| k > &gc_round);
self.processing.retain(|k, _| k > &gc_round);
self.certificates_aggregators.retain(|k, _| k > &gc_round);
self.cancel_handlers.retain(|k, _| k > &gc_round);
self.gc_round = gc_round;

self.metrics
.gc_core_latency
.with_label_values(&[&self.committee.epoch.to_string()])
.observe(now.elapsed().as_secs_f64());
}

self.metrics
.core_cancel_handlers_total
.with_label_values(&[&self.committee.epoch.to_string()])
Expand Down
84 changes: 45 additions & 39 deletions narwhal/primary/src/header_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ use network::{LuckyNetwork, PrimaryNetwork, PrimaryToWorkerNetwork, UnreliableNe
use serde::{de::DeserializeOwned, Serialize};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use store::Store;
Expand Down Expand Up @@ -62,8 +59,8 @@ pub struct HeaderWaiter {
certificate_store: Store<CertificateDigest, Certificate>,
/// The persistent storage for payload markers from workers.
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
/// The current consensus round (used for cleanup).
consensus_round: Arc<AtomicU64>,
/// A watch channel receiver to get consensus round updates.
rx_consensus_round_updates: watch::Receiver<u64>,
/// The depth of the garbage collector.
gc_depth: Round,
/// The delay to wait before re-trying sync requests.
Expand Down Expand Up @@ -101,7 +98,7 @@ impl HeaderWaiter {
committee: Committee,
certificate_store: Store<CertificateDigest, Certificate>,
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
consensus_round: Arc<AtomicU64>,
rx_consensus_round_updates: watch::Receiver<u64>,
gc_depth: Round,
sync_retry_delay: Duration,
sync_retry_nodes: usize,
Expand All @@ -118,7 +115,7 @@ impl HeaderWaiter {
committee,
certificate_store,
payload_store,
consensus_round,
rx_consensus_round_updates,
gc_depth,
sync_retry_delay,
sync_retry_nodes,
Expand Down Expand Up @@ -176,6 +173,8 @@ impl HeaderWaiter {
tokio::pin!(timer);

loop {
let mut attempt_garbage_collection = false;

tokio::select! {
Some(message) = self.rx_synchronizer.recv() => {
match message {
Expand Down Expand Up @@ -332,39 +331,46 @@ impl HeaderWaiter {
},
ReconfigureNotification::Shutdown => return
}
}
},

// Check for a new consensus round number
Ok(()) = self.rx_consensus_round_updates.changed() => {
attempt_garbage_collection = true;
},

}

// Cleanup internal state.
let round = self.consensus_round.load(Ordering::Relaxed);
if round > self.gc_depth {
let now = Instant::now();

let mut gc_round = round - self.gc_depth;

// Cancel expired `notify_read`s, keep the rest in the map
// TODO: replace with `drain_filter` once that API stabilizes
self.pending = self
.pending
.drain()
.flat_map(|(digest, (r, handler))| {
if r <= gc_round {
// note: this send can fail, harmlessly, if the certificate has been delivered (`notify_read`)
// and the present code path fires before the corresponding `waiting` item is unpacked above.
let _ = handler.send(());
None
} else {
Some((digest, (r, handler)))
}
})
.collect();
self.batch_requests.retain(|_, r| r > &mut gc_round);
self.parent_requests.retain(|_, (r, _)| r > &mut gc_round);

self.metrics
.gc_header_waiter_latency
.with_label_values(&[&self.committee.epoch.to_string()])
.observe(now.elapsed().as_secs_f64());
if attempt_garbage_collection {
let round = *self.rx_consensus_round_updates.borrow();
if round > self.gc_depth {
let now = Instant::now();

let mut gc_round = round - self.gc_depth;

// Cancel expired `notify_read`s, keep the rest in the map
// TODO: replace with `drain_filter` once that API stabilizes
self.pending = self
.pending
.drain()
.flat_map(|(digest, (r, handler))| {
if r <= gc_round {
// note: this send can fail, harmlessly, if the certificate has been delivered (`notify_read`)
// and the present code path fires before the corresponding `waiting` item is unpacked above.
let _ = handler.send(());
None
} else {
Some((digest, (r, handler)))
}
})
.collect();
self.batch_requests.retain(|_, r| r > &mut gc_round);
self.parent_requests.retain(|_, (r, _)| r > &mut gc_round);

self.metrics
.gc_header_waiter_latency
.with_label_values(&[&self.committee.epoch.to_string()])
.observe(now.elapsed().as_secs_f64());
}
}

// measure the pending & parent elements
Expand Down
Loading

0 comments on commit fdf685a

Please sign in to comment.