diff --git a/crates/iota-network/src/state_sync/mod.rs b/crates/iota-network/src/state_sync/mod.rs
index b54d29d6152..4b56869836f 100644
--- a/crates/iota-network/src/state_sync/mod.rs
+++ b/crates/iota-network/src/state_sync/mod.rs
@@ -185,10 +185,10 @@ impl PeerHeights {
             .filter(|(_peer_id, info)| info.on_same_chain_as_us)
     }
 
-    // Returns a bool that indicates if the update was done successfully.
-    //
-    // This will return false if the given peer doesn't have an entry or is not on
-    // the same chain as us
+    /// Returns a bool that indicates if the update was done successfully.
+    ///
+    /// This will return false if the given peer doesn't have an entry or is not
+    /// on the same chain as us
     pub fn update_peer_info(
         &mut self,
         peer_id: PeerId,
@@ -357,13 +357,16 @@ impl Iterator for PeerBalancer {
 
 #[derive(Clone, Debug)]
 enum StateSyncMessage {
+    /// Node will send this to StateSyncEventLoop in order to kick off the state
+    /// sync process.
     StartSyncJob,
-    // Validators will send this to the StateSyncEventLoop in order to kick off notifying our
-    // peers of the new checkpoint.
+    /// Validators will send this to the StateSyncEventLoop in order to kick off
+    /// notifying our peers of the new checkpoint.
     VerifiedCheckpoint(Box<VerifiedCheckpoint>),
-    // Notification that the checkpoint content sync task will send to the event loop in the event
-    // it was able to successfully sync a checkpoint's contents. If multiple checkpoints were
-    // synced at the same time, only the highest checkpoint is sent.
+    /// Notification that the checkpoint content sync task will send to the
+    /// event loop in the event it was able to successfully sync a
+    /// checkpoint's contents. If multiple checkpoints were synced at the
+    /// same time, only the highest checkpoint is sent.
     SyncedCheckpoint(Box<VerifiedCheckpoint>),
 }
 
@@ -374,6 +377,7 @@ struct StateSyncEventLoop<S> {
     /// Weak reference to our own mailbox
     weak_sender: mpsc::WeakSender<StateSyncMessage>,
 
+    /// A set of all spawned tasks.
    tasks: JoinSet<()>,
    sync_checkpoint_summaries_task: Option<AbortHandle>,
    sync_checkpoint_contents_task: Option<AbortHandle>,
@@ -460,9 +464,12 @@ where
         loop {
             tokio::select! {
                 now = interval.tick() => {
+                    // Query the latest checkpoint of connected peers that are on the
+                    // same chain as us, and check whether download_limit_layer needs to be pruned.
                     self.handle_tick(now.into_std());
                 },
                 maybe_message = self.mailbox.recv() => {
+                    // Handle StateSyncMessage.
                     // Once all handles to our mailbox have been dropped this
                     // will yield `None` and we can terminate the event loop
                     if let Some(message) = maybe_message {
@@ -472,16 +479,18 @@ where
                     }
                 },
                 peer_event = peer_events.recv() => {
+                    // Handle new and closed peer connections.
                     self.handle_peer_event(peer_event);
                 },
+                // Resolve the spawned tasks.
                 Some(task_result) = self.tasks.join_next() => {
                     match task_result {
                         Ok(()) => {},
                         Err(e) => {
                             if e.is_cancelled() {
-                                // avoid crashing on ungraceful shutdown
+                                // Avoid crashing on ungraceful shutdown
                             } else if e.is_panic() {
-                                // propagate panics.
+                                // Propagate panics.
                                 std::panic::resume_unwind(e.into_panic());
                             } else {
                                 panic!("task failed: {e}");
@@ -489,14 +498,17 @@ where
                         },
                     };
 
+                    // The sync_checkpoint_contents task is expected to run indefinitely.
                     if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
                         panic!("sync_checkpoint_contents task unexpectedly terminated")
                     }
 
+                    // Clean up sync_checkpoint_summaries_task if it's finished.
                     if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
                         self.sync_checkpoint_summaries_task = None;
                     }
 
+                    // The sync_checkpoint_from_archive task is expected to run indefinitely.
                     if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
                         panic!("sync_checkpoint_from_archive task unexpectedly terminated")
                     }
@@ -589,7 +601,7 @@ where
                 .collect::<Vec<_>>();
         }
 
-        // If this is the last checkpoint of a epoch, we need to make sure
+        // If this is the last checkpoint of an epoch, we need to make sure
         // new committee is in store before we verify newer checkpoints in next epoch.
         // This could happen before this validator's reconfiguration finishes, because
         // state sync does not reconfig.
@@ -618,6 +630,7 @@ where
         // We don't care if no one is listening as this is a broadcast channel
         let _ = self.checkpoint_event_sender.send(checkpoint.clone());
 
+        // Notify connected peers of the new checkpoint
        self.spawn_notify_peers_of_checkpoint(checkpoint);
    }
 
@@ -677,6 +690,9 @@ where
         }
     }
 
+    /// Starts syncing checkpoint summaries if there are peers that have a
+    /// higher known checkpoint than us. Only one sync task is allowed to
+    /// run at a time.
    fn maybe_start_checkpoint_summary_sync_task(&mut self) {
        // Only run one sync task at a time
        if self.sync_checkpoint_summaries_task.is_some() {
@@ -700,7 +716,7 @@ where
             .as_ref()
             .map(|x| x.sequence_number())
         {
-            // start sync job
+            // Start sync job
            let task = sync_to_checkpoint(
                self.network.clone(),
                self.store.clone(),
@@ -723,6 +739,10 @@ where
         }
     }
 
+    /// Triggers the checkpoint contents sync task if
+    /// highest_verified_checkpoint > highest_synced_checkpoint and there
+    /// are peers that have highest_known_checkpoint >
+    /// highest_synced_checkpoint.
    fn maybe_trigger_checkpoint_contents_sync_task(
        &mut self,
        target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
@@ -738,7 +758,7 @@ where
 
         if highest_verified_checkpoint.sequence_number()
             > highest_synced_checkpoint.sequence_number()
-            // skip if we aren't connected to any peers that can help
+            // Skip if we aren't connected to any peers that can help
             && self
                 .peer_heights
                 .read()
@@ -768,6 +788,8 @@ where
     }
 }
 
+/// Sends a notification of the new checkpoint to all connected peers that are
+/// on the same chain as us.
 async fn notify_peers_of_checkpoint(
     network: anemo::Network,
     peer_heights: Arc<RwLock<PeerHeights>>,
@@ -777,6 +799,7 @@ async fn notify_peers_of_checkpoint(
     let futs = peer_heights
         .read()
         .unwrap()
+        // Filter out any peers who are not on the same chain as us
         .peers_on_same_chain()
         // Filter out any peers who we know already have a checkpoint higher than this one
         .filter_map(|(peer_id, info)| {
@@ -793,6 +816,8 @@ async fn notify_peers_of_checkpoint(
     futures::future::join_all(futs).await;
 }
 
+/// Queries a peer for their latest PeerStateSyncInfo and
+/// keeps the updated info in PeerHeights.
 async fn get_latest_from_peer(
     our_genesis_checkpoint_digest: CheckpointDigest,
     peer: anemo::Peer,
@@ -891,7 +916,7 @@ async fn query_peer_for_latest_info(
     };
 
     // Then we try the old query
-    // TODO: remove this once the new feature stabilizes
+    // TODO: Remove this once the new feature stabilizes
     let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
     let response = client
         .get_checkpoint_summary(request)
@@ -907,6 +932,10 @@ async fn query_peer_for_latest_info(
     }
 }
 
+/// Queries and updates the latest checkpoint of connected peers that are on the
+/// same chain as us. If the received highest checkpoint of any peer is higher
+/// than the current one, we will start syncing via
+/// StateSyncMessage::StartSyncJob.
 async fn query_peers_for_their_latest_checkpoint(
     network: anemo::Network,
     peer_heights: Arc<RwLock<PeerHeights>>,
@@ -959,6 +988,10 @@ async fn query_peers_for_their_latest_checkpoint(
     }
 }
 
+/// Queries connected peers for checkpoints from sequence
+/// current+1 to the target. The received checkpoints will be verified and
+/// stored in the store. Checkpoints in the temporary store (peer_heights) will
+/// be cleaned up after syncing.
 async fn sync_to_checkpoint<S>(
     network: anemo::Network,
     store: S,
@@ -990,7 +1023,7 @@ where
         peer_heights.clone(),
         PeerCheckpointRequestType::Summary,
     );
-    // range of the next sequence_numbers to fetch
+    // Range of the next sequence_numbers to fetch
     let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
         ..=*checkpoint.sequence_number())
         .map(|next| {
@@ -1019,7 +1052,7 @@ where
                 .and_then(Response::into_inner)
                 .tap_none(|| trace!("peer unable to help sync"))
             {
-                // peer didn't give us a checkpoint with the height that we requested
+                // Peer didn't give us a checkpoint with the height that we requested
                 if *checkpoint.sequence_number() != next {
                     tracing::debug!(
                         "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
@@ -1028,7 +1061,7 @@ where
                     continue;
                 }
 
-                // peer gave us a checkpoint whose digest does not match pinned digest
+                // Peer gave us a checkpoint whose digest does not match pinned digest
                 let checkpoint_digest = checkpoint.digest();
                 if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
                     checkpoint.sequence_number(),
@@ -1120,6 +1153,9 @@ where
     Ok(())
 }
 
+/// Syncs checkpoint contents from one of the archive_readers if the
+/// highest_synced_checkpoint < lowest_checkpoint among peers. The requested
+/// checkpoint range is from highest_synced_checkpoint+1 to lowest_checkpoint.
 async fn sync_checkpoint_contents_from_archive<S>(
     network: anemo::Network,
     archive_readers: ArchiveReaderBalancer,
@@ -1129,6 +1165,7 @@ async fn sync_checkpoint_contents_from_archive<S>(
     S: WriteStore + Clone + Send + Sync + 'static,
 {
     loop {
+        // Get connected peers that are on the same chain as us
         let peers: Vec<_> = peer_heights
             .read()
             .unwrap()
@@ -1187,6 +1224,11 @@ async fn sync_checkpoint_contents_from_archive<S>(
     }
 }
 
+/// Syncs checkpoint contents from peers if the target sequence cursor, which is
+/// changed via target_sequence_channel, is greater than the current one. The
+/// requested checkpoint range is from current_sequence+1 to
+/// target_sequence_cursor. It will also periodically notify the peers of our
+/// latest synced checkpoint.
 async fn sync_checkpoint_contents<S>(
     network: anemo::Network,
     store: S,
@@ -1262,7 +1304,7 @@ async fn sync_checkpoint_contents<S>(
             },
         }
 
-        // Start new tasks up to configured concurrency limits.
+        // Start syncing tasks up to configured concurrency limits.
         while current_sequence < target_sequence_cursor
             && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
         {
@@ -1306,6 +1348,8 @@ async fn sync_checkpoint_contents<S>(
 }
 
 #[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
+/// Requests the contents of a single checkpoint from peers if the store does
+/// not already have it.
 async fn sync_one_checkpoint_contents<S>(
     network: anemo::Network,
     store: S,
@@ -1354,6 +1398,9 @@ where
 }
 
 #[instrument(level = "debug", skip_all)]
+/// Requests the full checkpoint contents from peers if the store does not
+/// already have it. Requests are sent to peers one by one, until the contents
+/// are successfully retrieved.
 async fn get_full_checkpoint_contents<S>(
     peers: PeerBalancer,
     store: S,
diff --git a/crates/iota-network/src/state_sync/server.rs b/crates/iota-network/src/state_sync/server.rs
index 1a24487aa36..9d75afef580 100644
--- a/crates/iota-network/src/state_sync/server.rs
+++ b/crates/iota-network/src/state_sync/server.rs
@@ -47,6 +47,9 @@ impl<S> StateSync for Server<S>
 where
     S: WriteStore + Send + Sync + 'static,
 {
+    /// Pushes a checkpoint summary to the server.
+    /// If the checkpoint is higher than the highest verified checkpoint, it
+    /// will notify the event loop to potentially sync it.
    async fn push_checkpoint_summary(
        &self,
        request: Request,
@@ -83,6 +86,8 @@ where
         Ok(Response::new(()))
     }
 
+    /// Gets a checkpoint summary by digest or sequence number, or gets the
+    /// latest one.
    async fn get_checkpoint_summary(
        &self,
        request: Request,
@@ -104,6 +109,8 @@ where
         Ok(Response::new(checkpoint))
     }
 
+    /// Gets the highest synced checkpoint and the lowest available checkpoint
+    /// of the node.
    async fn get_checkpoint_availability(
        &self,
        _request: Request<()>,
@@ -124,6 +131,7 @@ where
         }))
     }
 
+    /// Gets the contents of a checkpoint.
    async fn get_checkpoint_contents(
        &self,
        request: Request,
diff --git a/crates/iota-network/src/state_sync/tests.rs b/crates/iota-network/src/state_sync/tests.rs
index b378447c18b..18e729a9f0d 100644
--- a/crates/iota-network/src/state_sync/tests.rs
+++ b/crates/iota-network/src/state_sync/tests.rs
@@ -30,6 +30,8 @@ use crate::{
 };
 
 #[tokio::test]
+// Test that the server stores the pushed checkpoint summary and triggers the
+// sync job.
 async fn server_push_checkpoint() {
     let committee = CommitteeFixture::generate(rand::rngs::OsRng, 0, 4);
     let (ordered_checkpoints, _, _sequence_number_to_digest, _checkpoints) =
@@ -186,7 +188,7 @@ async fn server_get_checkpoint() {
 #[tokio::test]
 async fn isolated_sync_job() {
     let committee = CommitteeFixture::generate(rand::rngs::OsRng, 0, 4);
-    // build mock data
+    // Build mock data
     let (ordered_checkpoints, _, sequence_number_to_digest, checkpoints) =
         committee.make_empty_checkpoints(100, None);
 
@@ -271,7 +273,7 @@ async fn isolated_sync_job() {
 #[tokio::test]
 async fn test_state_sync_using_archive() -> anyhow::Result<()> {
     let committee = CommitteeFixture::generate(rand::rngs::OsRng, 0, 4);
-    // build mock data
+    // Build mock data
     let (ordered_checkpoints, _, sequence_number_to_digest, checkpoints) =
         committee.make_empty_checkpoints(100, None);
     // Initialize archive store with all checkpoints
@@ -455,7 +457,7 @@ async fn test_state_sync_using_archive() -> anyhow::Result<()> {
 async fn sync_with_checkpoints_being_inserted() {
     telemetry_subscribers::init_for_testing();
     let committee = CommitteeFixture::generate(rand::rngs::OsRng, 0, 4);
-    // build mock data
+    // Build mock data
     let (ordered_checkpoints, _contents, sequence_number_to_digest, checkpoints) =
         committee.make_empty_checkpoints(4, None);
 
@@ -480,10 +482,10 @@ async fn sync_with_checkpoints_being_inserted() {
         committee.committee().to_owned(),
     );
 
-    // get handles to each node's stores
+    // Get handles to each node's stores
     let store_1 = event_loop_1.store.clone();
     let store_2 = event_loop_2.store.clone();
-    // make sure that node_1 knows about node_2
+    // Make sure that node_1 knows about node_2
     event_loop_1.peer_heights.write().unwrap().peers.insert(
         network_2.peer_id(),
         PeerStateSyncInfo {
@@ -585,7 +587,7 @@ async fn sync_with_checkpoints_being_inserted() {
 async fn sync_with_checkpoints_watermark() {
     telemetry_subscribers::init_for_testing();
     let committee = CommitteeFixture::generate(rand::rngs::OsRng, 0, 4);
-    // build mock data
+    // Build mock data
     let (ordered_checkpoints, contents, _sequence_number_to_digest, _checkpoints) =
         committee.make_random_checkpoints(4, None);
     let last_checkpoint_seq = *ordered_checkpoints
@@ -614,7 +616,7 @@ async fn sync_with_checkpoints_watermark() {
         committee.committee().to_owned(),
     );
 
-    // get handles to each node's stores
+    // Get handles to each node's stores
     let store_1 = event_loop_1.store.clone();
     let store_2 = event_loop_2.store.clone();
     let peer_id_1 = network_1.peer_id();