Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

markuze/aptos perf #5998

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 25 additions & 63 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ assert_approx_eq = "1.1.0"
assert_unordered = "0.1.1"
async-stream = "0.3"
async-trait = "0.1.53"
axum = "0.5.16"
axum = "0.6.1"
base64 = "0.13.0"
backtrace = "0.3.58"
bcs = { git = "https://github.com/aptos-labs/bcs.git", rev = "d31fab9d81748e2594be5cd5cdf845786a30562d" }
Expand Down Expand Up @@ -342,7 +342,7 @@ hkdf = "0.10.0"
hostname = "0.3.1"
http = "0.2.3"
httpmock = "0.6"
hyper = { version = "0.14.18", features = ["full"] }
hyper = { version = "0.14.23", features = ["full"] }
hyper-tls = "0.5.0"
include_dir = { version = "0.7.2", features = ["glob"] }
indicatif = "0.15.0"
Expand Down
2 changes: 2 additions & 0 deletions config/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,12 @@ impl NodeConfig {

if let Some(network) = self.validator_network.as_mut() {
network.listen_address = crate::utils::get_available_port_in_multiaddr(true);
network.randomize_ports();
}

for network in self.full_node_networks.iter_mut() {
network.listen_address = crate::utils::get_available_port_in_multiaddr(true);
network.randomize_ports();
}
}

Expand Down
16 changes: 16 additions & 0 deletions config/src/config/network_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub const INBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with
pub const INBOUND_TCP_TX_BUFFER_SIZE: u32 = 512 * 1024; // 1MB use a bigger spoon
pub const OUTBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency
pub const OUTBOUND_TCP_TX_BUFFER_SIZE: u32 = 1024 * 1024; // 1MB use a bigger spoon
pub const ENABLE_APTOS_NETPERF_CLIENT: bool = true;
pub const DEFAULT_APTOS_NETPERF_CLIENT_PORT: u16 = 9107;

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
Expand All @@ -80,6 +82,7 @@ pub struct NetworkConfig {
// Select this to enforce that both peers should authenticate each other, otherwise
// authentication only occurs for outgoing connections.
pub mutual_authentication: bool,
pub netperf_client_port: Option<u16>,
pub network_id: NetworkId,
pub runtime_threads: Option<usize>,
pub inbound_rx_buffer_size_bytes: Option<u32>,
Expand Down Expand Up @@ -120,7 +123,19 @@ impl Default for NetworkConfig {
}
}

fn netperf_client_port(enabled: bool) -> Option<u16> {
if enabled {
Some(DEFAULT_APTOS_NETPERF_CLIENT_PORT)
} else {
None
}
}

impl NetworkConfig {
pub fn randomize_ports(&mut self) {
self.netperf_client_port = Some(utils::get_available_port());
}

pub fn network_with_id(network_id: NetworkId) -> NetworkConfig {
let mutual_authentication = network_id.is_validator_network();
let mut config = Self {
Expand All @@ -129,6 +144,7 @@ impl NetworkConfig {
identity: Identity::None,
listen_address: "/ip4/0.0.0.0/tcp/6180".parse().unwrap(),
mutual_authentication,
netperf_client_port: netperf_client_port(ENABLE_APTOS_NETPERF_CLIENT),
network_id,
runtime_threads: None,
seed_addrs: HashMap::new(),
Expand Down
1 change: 1 addition & 0 deletions consensus/src/payload_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tokio::sync::oneshot;

/// Responsible to extract the transactions out of the payload and notify QuorumStore about commits.
/// If QuorumStore is enabled, has to ask BatchReader for the transaction behind the proofs of availability in the payload.
#[allow(dead_code)]
pub enum PayloadManager {
DirectMempool,
InQuorumStore(BatchReader, Mutex<Sender<PayloadRequest>>),
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ type NotificationType = (
Vec<ContractEvent>,
);

#[allow(dead_code)]
type CommitType = (u64, Round, Vec<Payload>);

/// Basic communication with the Execution module;
/// implements StateComputer traits.
#[allow(dead_code)]
pub struct ExecutionProxy {
executor: Arc<dyn BlockExecutorTrait>,
txn_notifier: Arc<dyn TxnNotifier>,
Expand Down
1 change: 1 addition & 0 deletions crates/channel/src/message_queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl<K: Eq + Hash + Clone, T> PerKeyQueue<K, T> {
// For example, many of our queues have a max capacity of 1024. To
// handle a single rpc from a transient peer, we would end up
// allocating ~ 96 b * 1024 ~ 64 Kib per queue.
//TODO: Reconsider this. Maybe true for VFN but not Validators
.or_insert_with(|| VecDeque::with_capacity(1));

// Add the key to our round-robin queue if it's not already there
Expand Down
2 changes: 2 additions & 0 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ aptos-short-hex-str = { workspace = true }
aptos-time-service = { workspace = true }
aptos-types = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true }
bcs = { workspace = true }
bytes = { workspace = true }
dashmap = { worksapce = true }
futures = { workspace = true }
futures-util = { workspace = true }
hex = { workspace = true }
Expand Down
40 changes: 39 additions & 1 deletion network/builder/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use aptos_infallible::RwLock;
use aptos_logger::prelude::*;
use aptos_netcore::transport::tcp::TCPBufferCfg;
use aptos_network::{
application::storage::PeerMetadataStorage,
application::{
netperf::{builder::NetPerfBuilder, NetPerf},
storage::PeerMetadataStorage,
},
connectivity_manager::{builder::ConnectivityManagerBuilder, ConnectivityRequest},
constants::MAX_MESSAGE_SIZE,
logging::NetworkSchema,
Expand Down Expand Up @@ -70,6 +73,7 @@ pub struct NetworkBuilder {
health_checker_builder: Option<HealthCheckerBuilder>,
peer_manager_builder: PeerManagerBuilder,
peer_metadata_storage: Arc<PeerMetadataStorage>,
netperf_builder: Option<NetPerfBuilder>,
}

impl NetworkBuilder {
Expand Down Expand Up @@ -125,6 +129,7 @@ impl NetworkBuilder {
health_checker_builder: None,
peer_manager_builder,
peer_metadata_storage,
netperf_builder: None,
}
}

Expand Down Expand Up @@ -226,6 +231,10 @@ impl NetworkBuilder {
config.ping_failures_tolerated,
);

if let Some(netperf_port) = config.netperf_client_port {
network_builder.add_network_perf(netperf_port);
}

// Always add a connectivity manager to keep track of known peers
let seeds = merge_seeds(config);

Expand Down Expand Up @@ -315,6 +324,16 @@ impl NetworkBuilder {
);
}

if let Some(netperf_builder) = self.netperf_builder.as_mut() {
if self.network_context.role() == RoleType::Validator {
netperf_builder.start(executor);
debug!(
NetworkSchema::new(&self.network_context),
"{} Started Aptos NetPerf", self.network_context
);
}
}

if let Some(discovery_listeners) = self.discovery_listeners.take() {
discovery_listeners
.into_iter()
Expand Down Expand Up @@ -426,6 +445,25 @@ impl NetworkBuilder {
.push(listener);
}

/// Add a Aptos NetPerf to the network.
fn add_network_perf(&mut self, netperf_port: u16) -> &mut Self {
let (netperf_tx, netperf_rx) =
self.add_client_and_service(&NetPerf::network_endpoint_config());
self.netperf_builder = Some(NetPerfBuilder::new(
self.network_context(),
self.peer_metadata_storage.clone(),
Arc::new(netperf_tx),
netperf_rx,
netperf_port,
));
debug!(
NetworkSchema::new(&self.network_context),
"{} Created Aptos NetPerf", self.network_context
);

self
}

/// Add a HealthChecker to the network.
fn add_connection_monitoring(
&mut self,
Expand Down
1 change: 1 addition & 0 deletions network/src/application/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

pub mod error;
pub mod interface;
pub mod netperf;
pub mod storage;
pub mod types;

Expand Down
Loading