doc(node): Add doc comments to iota-network/state_sync mod (#1645)
* doc(node): Add doc comments to state_sync mod

* fix: Fix typo

* fix: Make comments in StateSyncMessage doc comments

* fix: Capitalize the first character in comments

---------

Co-authored-by: muXxer <mux3r@web.de>
jkrvivian and muXxer authored Aug 13, 2024
1 parent eff71c2 commit d56a3e5
Showing 3 changed files with 83 additions and 26 deletions.
85 changes: 66 additions & 19 deletions crates/iota-network/src/state_sync/mod.rs
@@ -185,10 +185,10 @@ impl PeerHeights {
.filter(|(_peer_id, info)| info.on_same_chain_as_us)
}

- // Returns a bool that indicates if the update was done successfully.
- //
- // This will return false if the given peer doesn't have an entry or is not on
- // the same chain as us
+ /// Returns a bool that indicates if the update was done successfully.
+ ///
+ /// This will return false if the given peer doesn't have an entry or is not
+ /// on the same chain as us.
pub fn update_peer_info(
&mut self,
peer_id: PeerId,
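
The contract documented above can be sketched with simplified types (a plain HashMap standing in for PeerHeights; not the crate's real code):

use std::collections::HashMap;

struct Info {
    on_same_chain_as_us: bool,
    height: u64,
}

// The update succeeds only for a known peer that is on our chain.
fn update_peer_info(peers: &mut HashMap<u32, Info>, peer_id: u32, height: u64) -> bool {
    match peers.get_mut(&peer_id) {
        Some(info) if info.on_same_chain_as_us => {
            info.height = height;
            true
        }
        _ => false, // unknown peer, or on a different chain
    }
}

fn main() {
    let mut peers = HashMap::new();
    peers.insert(7, Info { on_same_chain_as_us: true, height: 3 });
    assert!(update_peer_info(&mut peers, 7, 5));
    assert!(!update_peer_info(&mut peers, 8, 5)); // no entry: returns false
}
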
@@ -357,13 +357,16 @@ impl Iterator for PeerBalancer {

#[derive(Clone, Debug)]
enum StateSyncMessage {
+ /// Node will send this to StateSyncEventLoop in order to kick off the state
+ /// sync process.
StartSyncJob,
- // Validators will send this to the StateSyncEventLoop in order to kick off notifying our
- // peers of the new checkpoint.
+ /// Validators will send this to the StateSyncEventLoop in order to kick off
+ /// notifying our peers of the new checkpoint.
VerifiedCheckpoint(Box<VerifiedCheckpoint>),
- // Notification that the checkpoint content sync task will send to the event loop in the event
- // it was able to successfully sync a checkpoint's contents. If multiple checkpoints were
- // synced at the same time, only the highest checkpoint is sent.
+ /// Notification that the checkpoint content sync task will send to the
+ /// event loop in the event it was able to successfully sync a
+ /// checkpoint's contents. If multiple checkpoints were synced at the
+ /// same time, only the highest checkpoint is sent.
SyncedCheckpoint(Box<VerifiedCheckpoint>),
}
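
A minimal tokio sketch of the mailbox pattern these variants drive; VerifiedCheckpoint is a stand-in type here, not the crate's real one:

use tokio::sync::mpsc;

#[derive(Debug)]
struct VerifiedCheckpoint; // stand-in for illustration

#[derive(Debug)]
enum StateSyncMessage {
    StartSyncJob,
    VerifiedCheckpoint(Box<VerifiedCheckpoint>),
    SyncedCheckpoint(Box<VerifiedCheckpoint>),
}

#[tokio::main]
async fn main() {
    let (mailbox_tx, mut mailbox_rx) = mpsc::channel::<StateSyncMessage>(128);

    // Some component of the node kicks off syncing...
    mailbox_tx.send(StateSyncMessage::StartSyncJob).await.unwrap();
    drop(mailbox_tx);

    // ...and the event loop drains its mailbox until every sender is gone.
    while let Some(message) = mailbox_rx.recv().await {
        match message {
            StateSyncMessage::StartSyncJob => println!("start summary sync"),
            StateSyncMessage::VerifiedCheckpoint(_) => println!("notify peers"),
            StateSyncMessage::SyncedCheckpoint(_) => println!("record synced height"),
        }
    }
}
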

@@ -374,6 +377,7 @@ struct StateSyncEventLoop<S> {
/// Weak reference to our own mailbox
weak_sender: mpsc::WeakSender<StateSyncMessage>,

+ /// A set of all spawned tasks.
tasks: JoinSet<()>,
sync_checkpoint_summaries_task: Option<AbortHandle>,
sync_checkpoint_contents_task: Option<AbortHandle>,
@@ -460,9 +464,12 @@ where
loop {
tokio::select! {
now = interval.tick() => {
+ // Query the latest checkpoint of connected peers that are on the
+ // same chain as us, and check whether download_limit_layer needs to be pruned.
self.handle_tick(now.into_std());
},
maybe_message = self.mailbox.recv() => {
+ // Handle StateSyncMessage.
// Once all handles to our mailbox have been dropped this
// will yield `None` and we can terminate the event loop
if let Some(message) = maybe_message {
@@ -472,31 +479,36 @@ where
}
},
peer_event = peer_events.recv() => {
+ // Handle new and closed peer connections.
self.handle_peer_event(peer_event);
},
+ // Resolve the spawned tasks.
Some(task_result) = self.tasks.join_next() => {
match task_result {
Ok(()) => {},
Err(e) => {
if e.is_cancelled() {
- // avoid crashing on ungraceful shutdown
+ // Avoid crashing on ungraceful shutdown
} else if e.is_panic() {
- // propagate panics.
+ // Propagate panics.
std::panic::resume_unwind(e.into_panic());
} else {
panic!("task failed: {e}");
}
},
};

+ // The sync_checkpoint_contents task is expected to run indefinitely.
if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
panic!("sync_checkpoint_contents task unexpectedly terminated")
}

+ // Clean up sync_checkpoint_summaries_task if it's finished.
if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
self.sync_checkpoint_summaries_task = None;
}

+ // The sync_checkpoint_from_archive task is expected to run indefinitely.
if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
panic!("sync_checkpoint_from_archive task unexpectedly terminated")
}
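
The loop documented by these comments follows the standard tokio select! shape; a condensed, self-contained sketch with the real handlers stubbed out:

use std::time::Duration;
use tokio::{sync::mpsc, task::JoinSet, time};

async fn event_loop(mut mailbox: mpsc::Receiver<u64>) {
    let mut interval = time::interval(Duration::from_secs(5));
    let mut tasks: JoinSet<()> = JoinSet::new();
    tasks.spawn(async {}); // placeholder so join_next has something to yield

    loop {
        tokio::select! {
            _now = interval.tick() => {
                // Periodic work: query peers' latest checkpoints, prune caches.
            },
            maybe_message = mailbox.recv() => {
                // `None` means every sender was dropped: terminate the loop.
                let Some(_message) = maybe_message else { break };
                // Dispatch the StateSyncMessage here.
            },
            Some(task_result) = tasks.join_next() => {
                match task_result {
                    Ok(()) => {}
                    Err(e) if e.is_cancelled() => {} // ungraceful shutdown is fine
                    Err(e) if e.is_panic() => std::panic::resume_unwind(e.into_panic()),
                    Err(e) => panic!("task failed: {e}"),
                }
            },
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    drop(tx); // no senders, so the loop exits on its first mailbox poll
    event_loop(rx).await;
}
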
@@ -589,7 +601,7 @@ where
.collect::<Vec<_>>();
}

- // If this is the last checkpoint of a epoch, we need to make sure
+ // If this is the last checkpoint of an epoch, we need to make sure
// new committee is in store before we verify newer checkpoints in next epoch.
// This could happen before this validator's reconfiguration finishes, because
// state sync does not reconfig.
@@ -618,6 +630,7 @@ where
// We don't care if no one is listening as this is a broadcast channel
let _ = self.checkpoint_event_sender.send(checkpoint.clone());

+ // Notify connected peers of the new checkpoint
self.spawn_notify_peers_of_checkpoint(checkpoint);
}
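
The "we don't care if no one is listening" behavior relies on tokio broadcast semantics, where send only errors when there are no receivers; a tiny sketch:

use tokio::sync::broadcast;

fn main() {
    let (checkpoint_event_sender, receiver) = broadcast::channel::<u64>(16);
    drop(receiver);

    // send() errors only when no receiver exists; state sync ignores that.
    let _ = checkpoint_event_sender.send(42);
}
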

@@ -677,6 +690,9 @@ where
}
}

+ /// Starts syncing checkpoint summaries if there are peers that have a
+ /// higher known checkpoint than us. Only one sync task is allowed to
+ /// run at a time.
fn maybe_start_checkpoint_summary_sync_task(&mut self) {
// Only run one sync task at a time
if self.sync_checkpoint_summaries_task.is_some() {
@@ -700,7 +716,7 @@ where
.as_ref()
.map(|x| x.sequence_number())
{
- // start sync job
+ // Start sync job
let task = sync_to_checkpoint(
self.network.clone(),
self.store.clone(),
@@ -723,6 +739,10 @@ where
}
}

+ /// Triggers the checkpoint contents sync task if
+ /// highest_verified_checkpoint > highest_synced_checkpoint and there
+ /// are peers that have highest_known_checkpoint >
+ /// highest_synced_checkpoint.
fn maybe_trigger_checkpoint_contents_sync_task(
&mut self,
target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
@@ -738,7 +758,7 @@ where

if highest_verified_checkpoint.sequence_number()
> highest_synced_checkpoint.sequence_number()
- // skip if we aren't connected to any peers that can help
+ // Skip if we aren't connected to any peers that can help
&& self
.peer_heights
.read()
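
A hypothetical condensation of the trigger condition documented above, with heights reduced to plain u64 and a tokio watch channel as the target cursor:

use tokio::sync::watch;

fn maybe_trigger_contents_sync(
    highest_verified: u64,
    highest_synced: u64,
    best_peer_known: Option<u64>,
    target_sequence_channel: &watch::Sender<u64>,
) {
    // Skip if we aren't connected to any peers that can help.
    let peer_can_help = best_peer_known.is_some_and(|h| h > highest_synced);
    if highest_verified > highest_synced && peer_can_help {
        let _ = target_sequence_channel.send(highest_verified);
    }
}

fn main() {
    let (tx, rx) = watch::channel(0u64);
    maybe_trigger_contents_sync(10, 7, Some(12), &tx);
    assert_eq!(*rx.borrow(), 10);

    maybe_trigger_contents_sync(10, 10, Some(12), &tx); // nothing to do
    assert_eq!(*rx.borrow(), 10);
}
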
@@ -768,6 +788,8 @@ where
}
}

+ /// Sends a notification of the new checkpoint to all connected peers that are
+ /// on the same chain as us.
async fn notify_peers_of_checkpoint(
network: anemo::Network,
peer_heights: Arc<RwLock<PeerHeights>>,
@@ -777,6 +799,7 @@ async fn notify_peers_of_checkpoint(
let futs = peer_heights
.read()
.unwrap()
+ // Filter out any peers who are not on the same chain as us
.peers_on_same_chain()
// Filter out any peers who we know already have a checkpoint higher than this one
.filter_map(|(peer_id, info)| {
@@ -793,6 +816,8 @@
futures::future::join_all(futs).await;
}

+ /// Queries a peer for their latest PeerStateSyncInfo and
+ /// keeps the updated info in PeerHeights.
async fn get_latest_from_peer(
our_genesis_checkpoint_digest: CheckpointDigest,
peer: anemo::Peer,
@@ -891,7 +916,7 @@ async fn query_peer_for_latest_info(
};

// Then we try the old query
- // TODO: remove this once the new feature stabilizes
+ // TODO: Remove this once the new feature stabilizes
let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
let response = client
.get_checkpoint_summary(request)
@@ -907,6 +932,10 @@
}
}

+ /// Queries and updates the latest checkpoint of connected peers that are on
+ /// the same chain as us. If the received highest checkpoint of any peer is
+ /// higher than our current one, we start syncing via
+ /// StateSyncMessage::StartSyncJob.
async fn query_peers_for_their_latest_checkpoint(
network: anemo::Network,
peer_heights: Arc<RwLock<PeerHeights>>,
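
A sketch of the query-then-kick-off flow query_peers_for_their_latest_checkpoint describes, with the per-peer network call stubbed out and heights as plain u64:

use futures::future::join_all;

// Stand-in for a network round-trip asking one peer for its latest height.
async fn peer_latest_checkpoint(height: u64) -> Option<u64> {
    Some(height)
}

#[tokio::main]
async fn main() {
    let our_highest = 90u64;
    let peer_heights = [88u64, 95, 93];

    let replies = join_all(peer_heights.iter().map(|&h| peer_latest_checkpoint(h))).await;
    let highest_peer = replies.into_iter().flatten().max();

    if highest_peer.is_some_and(|h| h > our_highest) {
        // In the real module this sends StateSyncMessage::StartSyncJob.
        println!("a peer is ahead of us; kick off a sync job");
    }
}
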
@@ -959,6 +988,10 @@
}
}

+ /// Queries connected peers for checkpoints from sequence
+ /// current+1 to the target. The received checkpoints are verified and
+ /// stored in the store. Checkpoints in the temporary store (peer_heights)
+ /// are cleaned up after syncing.
async fn sync_to_checkpoint<S>(
network: anemo::Network,
store: S,
@@ -990,7 +1023,7 @@
peer_heights.clone(),
PeerCheckpointRequestType::Summary,
);
- // range of the next sequence_numbers to fetch
+ // Range of the next sequence_numbers to fetch
let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
..=*checkpoint.sequence_number())
.map(|next| {
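
The "current+1 to the target" request range, as a runnable sketch; the checked_add mirrors the overflow guard visible in the diff:

fn request_range(current: u64, target: u64) -> std::ops::RangeInclusive<u64> {
    // checked_add guards against the (theoretical) sequence-number overflow.
    current.checked_add(1).unwrap()..=target
}

fn main() {
    let next_requests: Vec<u64> = request_range(100, 103).collect();
    assert_eq!(next_requests, vec![101, 102, 103]);
}
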
@@ -1019,7 +1052,7 @@
.and_then(Response::into_inner)
.tap_none(|| trace!("peer unable to help sync"))
{
- // peer didn't give us a checkpoint with the height that we requested
+ // Peer didn't give us a checkpoint with the height that we requested
if *checkpoint.sequence_number() != next {
tracing::debug!(
"peer returned checkpoint with wrong sequence number: expected {next}, got {}",
@@ -1028,7 +1061,7 @@
continue;
}

- // peer gave us a checkpoint whose digest does not match pinned digest
+ // Peer gave us a checkpoint whose digest does not match pinned digest
let checkpoint_digest = checkpoint.digest();
if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
checkpoint.sequence_number(),
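
A simplified model of the pinned-digest check above: the pinned list is sorted by sequence number, so binary_search_by_key locates a pin if one exists (digests reduced to raw byte arrays):

type Digest = [u8; 32];

fn digest_matches_pin(pinned: &[(u64, Digest)], seq: u64, digest: Digest) -> bool {
    match pinned.binary_search_by_key(&seq, |(s, _)| *s) {
        Ok(i) => pinned[i].1 == digest, // pinned height: digest must match
        Err(_) => true,                 // not pinned: nothing to enforce
    }
}

fn main() {
    let pinned = [(5u64, [1u8; 32]), (9, [2u8; 32])];
    assert!(digest_matches_pin(&pinned, 9, [2u8; 32]));
    assert!(!digest_matches_pin(&pinned, 9, [3u8; 32]));
    assert!(digest_matches_pin(&pinned, 7, [0u8; 32]));
}
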
@@ -1120,6 +1153,9 @@ where
Ok(())
}

+ /// Syncs checkpoint contents from one of the archive_readers if the
+ /// highest_synced_checkpoint < lowest_checkpoint among peers. The requested
+ /// checkpoint range is from highest_synced_checkpoint+1 to lowest_checkpoint.
async fn sync_checkpoint_contents_from_archive<S>(
network: anemo::Network,
archive_readers: ArchiveReaderBalancer,
@@ -1129,6 +1165,7 @@
S: WriteStore + Clone + Send + Sync + 'static,
{
loop {
+ // Get connected peers that are on the same chain as us
let peers: Vec<_> = peer_heights
.read()
.unwrap()
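
A sketch of the archive-fallback decision these comments describe; whether the upper bound is inclusive is glossed over here, so treat the boundary handling as illustrative:

fn archive_range(highest_synced: u64, peer_lowest: &[u64]) -> Option<std::ops::Range<u64>> {
    // The gap below what peers can still serve must come from an archive.
    let lowest_among_peers = peer_lowest.iter().copied().min()?;
    (highest_synced + 1 < lowest_among_peers)
        .then(|| highest_synced + 1..lowest_among_peers)
}

fn main() {
    assert_eq!(archive_range(10, &[50, 60]), Some(11..50));
    assert_eq!(archive_range(49, &[50, 60]), None); // no gap, peers suffice
}
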
@@ -1187,6 +1224,11 @@ async fn sync_checkpoint_contents_from_archive<S>(
}
}

+ /// Syncs checkpoint contents from peers if the target sequence cursor, which
+ /// is changed via target_sequence_channel, is greater than the current one.
+ /// The requested checkpoint range is from current_sequence+1 to
+ /// target_sequence_cursor. It also periodically notifies the peers of our
+ /// latest synced checkpoint.
async fn sync_checkpoint_contents<S>(
network: anemo::Network,
store: S,
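
The watch-channel handshake described above, reduced to a runnable sketch: the sender ratchets the target cursor forward and the long-running task drains up to it:

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(0u64);
    let worker = tokio::spawn(async move {
        let mut current = 0u64;
        // Wake whenever the target cursor moves past our position.
        while rx.changed().await.is_ok() {
            let target = *rx.borrow_and_update();
            while current < target {
                current += 1;
                // Sync checkpoint contents for `current` here.
            }
        }
        current
    });
    tx.send(3).unwrap();
    drop(tx); // Closing the channel ends the worker loop.
    assert_eq!(worker.await.unwrap(), 3);
}
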
@@ -1262,7 +1304,7 @@ async fn sync_checkpoint_contents<S>(
},
}

- // Start new tasks up to configured concurrency limits.
+ // Start syncing tasks up to configured concurrency limits.
while current_sequence < target_sequence_cursor
&& checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
{
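
A sketch of the bounded-concurrency loop this comment describes, using a JoinSet capped at a stand-in for checkpoint_content_download_concurrency; the download itself is stubbed:

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let concurrency_limit = 4usize;
    let mut current = 0u64;
    let target = 10u64;
    let mut tasks: JoinSet<u64> = JoinSet::new();

    loop {
        // Start new tasks up to the configured concurrency limit.
        while current < target && tasks.len() < concurrency_limit {
            current += 1;
            let seq = current;
            tasks.spawn(async move { seq /* download contents for seq */ });
        }
        match tasks.join_next().await {
            Some(done) => println!("synced {}", done.unwrap()),
            None => break, // Nothing in flight and nothing left to start.
        }
    }
}
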
@@ -1306,6 +1348,8 @@
}

#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
+ /// Requests the contents of a single checkpoint from peers if the store does
+ /// not already have them.
async fn sync_one_checkpoint_contents<S>(
network: anemo::Network,
store: S,
@@ -1354,6 +1398,9 @@ where
}

#[instrument(level = "debug", skip_all)]
+ /// Requests the full checkpoint contents from peers if the store does not
+ /// already have them. Requests are sent to peers one by one until the
+ /// contents are successfully retrieved.
async fn get_full_checkpoint_contents<S>(
peers: PeerBalancer,
store: S,
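
The one-peer-at-a-time retrieval documented for get_full_checkpoint_contents, reduced to a synchronous sketch with a stubbed fetch:

// Stand-in for one attempt against a single peer.
fn fetch_contents_from(peer: &str) -> Option<Vec<u8>> {
    (peer == "peer-b").then(|| vec![1, 2, 3])
}

fn main() {
    // The real code walks peers in PeerBalancer order; any iterator works here.
    let peers = ["peer-a", "peer-b", "peer-c"];
    let contents = peers.iter().find_map(|p| fetch_contents_from(p));
    assert_eq!(contents, Some(vec![1, 2, 3])); // stop at the first peer that answers
}
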
8 changes: 8 additions & 0 deletions crates/iota-network/src/state_sync/server.rs
@@ -47,6 +47,9 @@ impl<S> StateSync for Server<S>
where
S: WriteStore + Send + Sync + 'static,
{
+ /// Pushes a checkpoint summary to the server.
+ /// If the checkpoint is higher than the highest verified checkpoint, it
+ /// will notify the event loop to potentially sync it.
async fn push_checkpoint_summary(
&self,
request: Request<Checkpoint>,
@@ -83,6 +86,8 @@ where
Ok(Response::new(()))
}

+ /// Gets a checkpoint summary by digest or sequence number, or the
+ /// latest one if neither is given.
async fn get_checkpoint_summary(
&self,
request: Request<GetCheckpointSummaryRequest>,
@@ -104,6 +109,8 @@ where
Ok(Response::new(checkpoint))
}

+ /// Gets the highest synced checkpoint and the lowest available checkpoint
+ /// of the node.
async fn get_checkpoint_availability(
&self,
_request: Request<()>,
@@ -124,6 +131,7 @@ where
}))
}

+ /// Gets the contents of a checkpoint.
async fn get_checkpoint_contents(
&self,
request: Request<CheckpointContentsDigest>,
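
A hypothetical model of the availability exchange above; the struct and field names are illustrative, not the crate's actual response type:

// Illustrative shape: the server reports what range it can serve.
struct CheckpointAvailability {
    highest_synced_checkpoint: u64,
    lowest_available_checkpoint: u64,
}

fn can_serve(avail: &CheckpointAvailability, seq: u64) -> bool {
    (avail.lowest_available_checkpoint..=avail.highest_synced_checkpoint).contains(&seq)
}

fn main() {
    let avail = CheckpointAvailability {
        highest_synced_checkpoint: 120,
        lowest_available_checkpoint: 40,
    };
    assert!(can_serve(&avail, 100));
    assert!(!can_serve(&avail, 20));  // pruned below the low watermark
    assert!(!can_serve(&avail, 200)); // not yet synced
}
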
16 changes: 9 additions & 7 deletions crates/iota-network/src/state_sync/tests.rs
@@ -30,6 +30,8 @@ use crate::{
};

#[tokio::test]
+ // Test that the server stores the pushed checkpoint summary and triggers the
+ // sync job.
async fn server_push_checkpoint() {
let committee = CommitteeFixture::generate(rand::rngs::OsRng, 0, 4);
let (ordered_checkpoints, _, _sequence_number_to_digest, _checkpoints) =
@@ -186,7 +188,7 @@ async fn server_get_checkpoint() {
#[tokio::test]
async fn isolated_sync_job() {
let committee = CommitteeFixture::generate(rand::rngs::OsRng, 0, 4);
- // build mock data
+ // Build mock data
let (ordered_checkpoints, _, sequence_number_to_digest, checkpoints) =
committee.make_empty_checkpoints(100, None);

@@ -271,7 +273,7 @@
#[tokio::test]
async fn test_state_sync_using_archive() -> anyhow::Result<()> {
let committee = CommitteeFixture::generate(rand::rngs::OsRng, 0, 4);
- // build mock data
+ // Build mock data
let (ordered_checkpoints, _, sequence_number_to_digest, checkpoints) =
committee.make_empty_checkpoints(100, None);
// Initialize archive store with all checkpoints
@@ -455,7 +457,7 @@
async fn sync_with_checkpoints_being_inserted() {
telemetry_subscribers::init_for_testing();
let committee = CommitteeFixture::generate(rand::rngs::OsRng, 0, 4);
- // build mock data
+ // Build mock data
let (ordered_checkpoints, _contents, sequence_number_to_digest, checkpoints) =
committee.make_empty_checkpoints(4, None);

@@ -480,10 +482,10 @@
committee.committee().to_owned(),
);

- // get handles to each node's stores
+ // Get handles to each node's stores
let store_1 = event_loop_1.store.clone();
let store_2 = event_loop_2.store.clone();
- // make sure that node_1 knows about node_2
+ // Make sure that node_1 knows about node_2
event_loop_1.peer_heights.write().unwrap().peers.insert(
network_2.peer_id(),
PeerStateSyncInfo {
@@ -585,7 +587,7 @@
async fn sync_with_checkpoints_watermark() {
telemetry_subscribers::init_for_testing();
let committee = CommitteeFixture::generate(rand::rngs::OsRng, 0, 4);
- // build mock data
+ // Build mock data
let (ordered_checkpoints, contents, _sequence_number_to_digest, _checkpoints) =
committee.make_random_checkpoints(4, None);
let last_checkpoint_seq = *ordered_checkpoints
@@ -614,7 +616,7 @@
committee.committee().to_owned(),
);

- // get handles to each node's stores
+ // Get handles to each node's stores
let store_1 = event_loop_1.store.clone();
let store_2 = event_loop_2.store.clone();
let peer_id_1 = network_1.peer_id();
