diff --git a/crates/sui-config/src/p2p.rs b/crates/sui-config/src/p2p.rs index dc2007be7a921..9197bb688b2ce 100644 --- a/crates/sui-config/src/p2p.rs +++ b/crates/sui-config/src/p2p.rs @@ -1,7 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::net::SocketAddr; +use std::{net::SocketAddr, time::Duration}; use multiaddr::Multiaddr; use serde::{Deserialize, Serialize}; @@ -46,3 +46,73 @@ pub struct SeedPeer { pub peer_id: Option, pub address: Multiaddr, } + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +pub struct StateSyncConfig { + /// Query peers for their latest checkpoint every interval period. + /// + /// If unspecified, this will default to `5,000` milliseconds. + #[serde(skip_serializing_if = "Option::is_none")] + pub interval_period_ms: Option, + + /// Size of the StateSync actor's mailbox. + /// + /// If unspecified, this will default to `128`. + #[serde(skip_serializing_if = "Option::is_none")] + pub mailbox_capacity: Option, + + /// Size of the broadcast channel used for notifying other systems of newly sync'ed checkpoints. + /// + /// If unspecified, this will default to `128`. + #[serde(skip_serializing_if = "Option::is_none")] + pub synced_checkpoint_broadcast_channel_capacity: Option, + + /// Set the upper bound on the number of checkpoint headers to be downloaded concurrently. + /// + /// If unspecified, this will default to `100`. + #[serde(skip_serializing_if = "Option::is_none")] + pub checkpoint_header_download_concurrency: Option, + + /// Set the upper bound on the number of transactions to be downloaded concurrently from a + /// single checkpoint. + /// + /// If unspecified, this will default to `100`. 
+ #[serde(skip_serializing_if = "Option::is_none")] + pub transaction_download_concurrency: Option, +} + +impl StateSyncConfig { + pub fn interval_period(&self) -> Duration { + const INTERVAL_PERIOD_MS: u64 = 5_000; // 5 seconds + + Duration::from_millis(self.interval_period_ms.unwrap_or(INTERVAL_PERIOD_MS)) + } + + pub fn mailbox_capacity(&self) -> usize { + const MAILBOX_CAPACITY: usize = 128; + + self.mailbox_capacity.unwrap_or(MAILBOX_CAPACITY) + } + + pub fn synced_checkpoint_broadcast_channel_capacity(&self) -> usize { + const SYNCED_CHECKPOINT_BROADCAST_CHANNEL_CAPACITY: usize = 128; + + self.synced_checkpoint_broadcast_channel_capacity + .unwrap_or(SYNCED_CHECKPOINT_BROADCAST_CHANNEL_CAPACITY) + } + + pub fn checkpoint_header_download_concurrency(&self) -> usize { + const CHECKPOINT_HEADER_DOWNLOAD_CONCURRENCY: usize = 100; + + self.checkpoint_header_download_concurrency + .unwrap_or(CHECKPOINT_HEADER_DOWNLOAD_CONCURRENCY) + } + + pub fn transaction_download_concurrency(&self) -> usize { + const TRANSACTION_DOWNLOAD_CONCURRENCY: usize = 100; + + self.transaction_download_concurrency + .unwrap_or(TRANSACTION_DOWNLOAD_CONCURRENCY) + } +} diff --git a/crates/sui-network/src/state_sync/builder.rs b/crates/sui-network/src/state_sync/builder.rs index d2afeefe165f3..631b1afe839be 100644 --- a/crates/sui-network/src/state_sync/builder.rs +++ b/crates/sui-network/src/state_sync/builder.rs @@ -5,6 +5,7 @@ use std::{ collections::HashMap, sync::{Arc, RwLock}, }; +use sui_config::p2p::StateSyncConfig; use sui_types::messages_checkpoint::VerifiedCheckpoint; use tap::Pipe; use tokio::{ @@ -20,19 +21,30 @@ use sui_types::storage::WriteStore; pub struct Builder { store: Option, - // config: Option, + config: Option, } impl Builder<()> { #[allow(clippy::new_without_default)] pub fn new() -> Self { - Self { store: None } + Self { + store: None, + config: None, + } } } impl Builder { - pub fn checkpoint_store(self, store: NewStore) -> Builder { - Builder { store: Some(store) 
} + pub fn store(self, store: NewStore) -> Builder { + Builder { + store: Some(store), + config: self.config, + } + } + + pub fn config(mut self, config: StateSyncConfig) -> Self { + self.config = Some(config); + self } } @@ -46,11 +58,13 @@ where } pub(super) fn build_internal(self) -> (UnstartedStateSync, Server) { - let Builder { store } = self; + let Builder { store, config } = self; let store = store.unwrap(); + let config = config.unwrap_or_default(); - let (sender, mailbox) = mpsc::channel(128); - let (checkpoint_event_sender, _reciever) = tokio::sync::broadcast::channel(128); + let (sender, mailbox) = mpsc::channel(config.mailbox_capacity()); + let (checkpoint_event_sender, _reciever) = + broadcast::channel(config.synced_checkpoint_broadcast_channel_capacity()); let weak_sender = sender.downgrade(); let handle = Handle { sender, @@ -72,6 +86,7 @@ where ( UnstartedStateSync { + config, handle, mailbox, store, @@ -84,6 +99,7 @@ where } pub struct UnstartedStateSync { + pub(super) config: StateSyncConfig, pub(super) handle: Handle, pub(super) mailbox: mpsc::Receiver, pub(super) store: S, @@ -97,6 +113,7 @@ where { pub(super) fn build(self, network: anemo::Network) -> (StateSyncEventLoop, Handle) { let Self { + config, handle, mailbox, store, @@ -106,6 +123,7 @@ where ( StateSyncEventLoop { + config, mailbox, weak_sender: handle.sender.downgrade(), tasks: JoinSet::new(), diff --git a/crates/sui-network/src/state_sync/mod.rs b/crates/sui-network/src/state_sync/mod.rs index db50a46197d37..86ae8ff9776b8 100644 --- a/crates/sui-network/src/state_sync/mod.rs +++ b/crates/sui-network/src/state_sync/mod.rs @@ -59,6 +59,7 @@ use std::{ sync::{Arc, RwLock}, time::Duration, }; +use sui_config::p2p::StateSyncConfig; use sui_types::{ base_types::ExecutionDigests, message_envelope::Message, @@ -210,6 +211,8 @@ enum StateSyncMessage { } struct StateSyncEventLoop { + config: StateSyncConfig, + mailbox: mpsc::Receiver, /// Weak reference to our own mailbox weak_sender: 
mpsc::WeakSender, @@ -235,7 +238,7 @@ where pub async fn start(mut self) { info!("State-Synchronizer started"); - let mut interval = tokio::time::interval(Duration::from_secs(60 * 10)); + let mut interval = tokio::time::interval(self.config.interval_period()); let mut peer_events = { let (subscriber, peers) = self.network.subscribe(); for peer_id in peers { @@ -419,6 +422,7 @@ where self.network.clone(), self.store.clone(), self.peer_heights.clone(), + self.config.checkpoint_header_download_concurrency(), // The if condition should ensure that this is Some highest_known_checkpoint.unwrap(), ) @@ -455,6 +459,7 @@ where self.peer_heights.clone(), self.weak_sender.clone(), self.checkpoint_event_sender.clone(), + self.config.transaction_download_concurrency(), // The if condition should ensure that this is Some highest_verified_checkpoint.unwrap(), ); @@ -583,6 +588,7 @@ async fn sync_to_checkpoint( network: anemo::Network, store: S, peer_heights: Arc>, + checkpoint_header_download_concurrency: usize, checkpoint: Checkpoint, ) -> Result<()> { let mut current = store.get_highest_verified_checkpoint(); @@ -662,7 +668,7 @@ async fn sync_to_checkpoint( } }) .pipe(futures::stream::iter) - .buffered(20); + .buffered(checkpoint_header_download_concurrency); while let Some((maybe_checkpoint, next)) = request_stream.next().await { // Verify the checkpoint @@ -725,6 +731,7 @@ async fn sync_checkpoint_contents( peer_heights: Arc>, sender: mpsc::WeakSender, checkpoint_event_sender: broadcast::Sender, + transaction_download_concurrency: usize, target_checkpoint: VerifiedCheckpoint, ) { let mut highest_synced = None; @@ -743,6 +750,7 @@ async fn sync_checkpoint_contents( network.clone(), &store, peer_heights.clone(), + transaction_download_concurrency, checkpoint, ) .await @@ -773,6 +781,7 @@ async fn sync_one_checkpoint_contents( network: anemo::Network, store: S, peer_heights: Arc>, + transaction_download_concurrency: usize, checkpoint: VerifiedCheckpoint, ) -> Result { let mut 
rng = ::from_entropy(); @@ -800,7 +809,7 @@ async fn sync_one_checkpoint_contents( .into_iter() .map(|digests| get_transaction_and_effects(peers.clone(), store.clone(), digests)) .pipe(futures::stream::iter) - .buffer_unordered(100); + .buffer_unordered(transaction_download_concurrency); while let Some(result) = stream.next().await { result?; diff --git a/crates/sui-network/src/state_sync/tests.rs b/crates/sui-network/src/state_sync/tests.rs index bb28d3d90bef6..c231d0a783ff3 100644 --- a/crates/sui-network/src/state_sync/tests.rs +++ b/crates/sui-network/src/state_sync/tests.rs @@ -39,7 +39,7 @@ async fn server_push_checkpoint() { .. }, server, - ) = Builder::new().checkpoint_store(store).build_internal(); + ) = Builder::new().store(store).build_internal(); let peer_id = PeerId([9; 32]); // fake PeerId let checkpoint = ordered_checkpoints[0].inner().to_owned(); @@ -78,7 +78,7 @@ async fn server_push_checkpoint() { #[tokio::test] async fn server_get_checkpoint() { let (builder, server) = Builder::new() - .checkpoint_store(SharedInMemoryStore::default()) + .store(SharedInMemoryStore::default()) .build_internal(); // Requests for checkpoints that aren't in the server's store @@ -146,14 +146,10 @@ async fn isolated_sync_job() { let committee = CommitteeFixture::generate(rand::rngs::OsRng, 0, 4); // Build and connect two nodes - let (builder, server) = Builder::new() - .checkpoint_store(SharedInMemoryStore::default()) - .build(); + let (builder, server) = Builder::new().store(SharedInMemoryStore::default()).build(); let network_1 = build_network(|router| router.add_rpc_service(server)); let (mut event_loop_1, _handle_1) = builder.build(network_1.clone()); - let (builder, server) = Builder::new() - .checkpoint_store(SharedInMemoryStore::default()) - .build(); + let (builder, server) = Builder::new().store(SharedInMemoryStore::default()).build(); let network_2 = build_network(|router| router.add_rpc_service(server)); let (event_loop_2, _handle_2) = 
builder.build(network_2.clone()); network_1.connect(network_2.local_addr()).await.unwrap(); @@ -230,14 +226,10 @@ async fn sync_with_checkpoints_being_inserted() { let committee = CommitteeFixture::generate(rand::rngs::OsRng, 0, 4); // Build and connect two nodes - let (builder, server) = Builder::new() - .checkpoint_store(SharedInMemoryStore::default()) - .build(); + let (builder, server) = Builder::new().store(SharedInMemoryStore::default()).build(); let network_1 = build_network(|router| router.add_rpc_service(server)); let (event_loop_1, handle_1) = builder.build(network_1.clone()); - let (builder, server) = Builder::new() - .checkpoint_store(SharedInMemoryStore::default()) - .build(); + let (builder, server) = Builder::new().store(SharedInMemoryStore::default()).build(); let network_2 = build_network(|router| router.add_rpc_service(server)); let (event_loop_2, handle_2) = builder.build(network_2.clone()); network_1.connect(network_2.local_addr()).await.unwrap();