diff --git a/Cargo.lock b/Cargo.lock index d244d10cbf870..fe2b57ecf0c4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1611,8 +1611,10 @@ dependencies = [ "aptos-time-service", "aptos-types", "async-trait", + "axum", "bcs 0.1.4 (git+https://github.com/aptos-labs/bcs.git?rev=d31fab9d81748e2594be5cd5cdf845786a30562d)", "bytes 1.2.1", + "dashmap", "futures", "futures-util", "hex", @@ -3048,37 +3050,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" -[[package]] -name = "axum" -version = "0.5.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9e3356844c4d6a6d6467b8da2cffb4a2820be256f50a3a386c9d152bab31043" -dependencies = [ - "async-trait", - "axum-core 0.2.8", - "bitflags", - "bytes 1.2.1", - "futures-util", - "http", - "http-body", - "hyper", - "itoa 1.0.3", - "matchit 0.5.0", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "serde 1.0.149", - "serde_json", - "serde_urlencoded", - "sync_wrapper", - "tokio", - "tower", - "tower-http", - "tower-layer", - "tower-service", -] - [[package]] name = "axum" version = "0.6.1" @@ -3086,7 +3057,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08b108ad2665fa3f6e6a517c3d80ec3e77d224c47d605167aefaa5d7ef97fa48" dependencies = [ "async-trait", - "axum-core 0.3.0", + "axum-core", "bitflags", "bytes 1.2.1", "futures-util", @@ -3094,36 +3065,24 @@ dependencies = [ "http-body", "hyper", "itoa 1.0.3", - "matchit 0.7.0", + "matchit", "memchr", "mime", "percent-encoding", "pin-project-lite", "rustversion", "serde 1.0.149", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", "sync_wrapper", + "tokio", "tower", "tower-http", "tower-layer", "tower-service", ] -[[package]] -name = "axum-core" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9f0c0a60006f2a293d82d571f635042a72edf927539b7685bd62d361963839b" -dependencies = [ - "async-trait", - "bytes 1.2.1", - "futures-util", - "http", - "http-body", - "mime", - "tower-layer", - "tower-service", -] - [[package]] name = "axum-core" version = "0.3.0" @@ -3145,7 +3104,7 @@ dependencies = [ name = "axum-test" version = "0.1.0" dependencies = [ - "axum 0.5.16", + "axum", "tokio", ] @@ -5273,9 +5232,9 @@ checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" [[package]] name = "httparse" -version = "1.7.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "496ce29bb5a52785b44e0f7ca2847ae0bb839c9bd28f69acac9b99d461c0c04c" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" [[package]] name = "httpdate" @@ -5334,9 +5293,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.20" +version = "0.14.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac" +checksum = "034711faac9d2166cb1baf1a2fb0b60b1f277f8492fd72176c17f3515e1abd3c" dependencies = [ "bytes 1.2.1", "futures-channel", @@ -6125,12 +6084,6 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" -[[package]] -name = "matchit" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" - [[package]] name = "matchit" version = "0.7.0" @@ -8898,6 +8851,15 @@ dependencies = [ "serde 1.0.149", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b04f22b563c91331a10074bda3dd5492e3cc39d56bd557e91c0af42b6c7341" +dependencies = [ + "serde 1.0.149", +] + [[package]] name = "serde_regex" version = "1.1.0" @@ -9234,9 +9196,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.4.4" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd" dependencies = [ "libc", "winapi 0.3.9", @@ -9828,7 +9790,7 @@ checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" dependencies = [ "async-stream", "async-trait", - "axum 0.6.1", + "axum", "base64 0.13.0", "bytes 1.2.1", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index 6121bc0b81cf0..3cfd1a2ba2302 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -284,7 +284,7 @@ assert_approx_eq = "1.1.0" assert_unordered = "0.1.1" async-stream = "0.3" async-trait = "0.1.53" -axum = "0.5.16" +axum = "0.6.1" base64 = "0.13.0" backtrace = "0.3.58" bcs = { git = "https://github.com/aptos-labs/bcs.git", rev = "d31fab9d81748e2594be5cd5cdf845786a30562d" } @@ -342,7 +342,7 @@ hkdf = "0.10.0" hostname = "0.3.1" http = "0.2.3" httpmock = "0.6" -hyper = { version = "0.14.18", features = ["full"] } +hyper = { version = "0.14.23", features = ["full"] } hyper-tls = "0.5.0" include_dir = { version = "0.7.2", features = ["glob"] } indicatif = "0.15.0" diff --git a/config/src/config/mod.rs b/config/src/config/mod.rs index 913a89217559c..1de45b857acd3 100644 --- a/config/src/config/mod.rs +++ b/config/src/config/mod.rs @@ -434,10 +434,12 @@ impl NodeConfig { if let Some(network) = self.validator_network.as_mut() { network.listen_address = crate::utils::get_available_port_in_multiaddr(true); + network.randomize_ports(); } for network in self.full_node_networks.iter_mut() { network.listen_address = crate::utils::get_available_port_in_multiaddr(true); + network.randomize_ports(); } } diff --git a/config/src/config/network_config.rs b/config/src/config/network_config.rs index 02ca660881cd1..cb3f87c1edea1 100644 --- a/config/src/config/network_config.rs +++ b/config/src/config/network_config.rs @@ -54,6 +54,8 @@ pub const INBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with pub const INBOUND_TCP_TX_BUFFER_SIZE: u32 = 512 * 1024; // 1MB use a bigger spoon pub const OUTBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency pub const OUTBOUND_TCP_TX_BUFFER_SIZE: u32 = 1024 * 1024; // 1MB use a bigger spoon +pub const ENABLE_APTOS_NETPERF_CLIENT: bool = true; +pub const DEFAULT_APTOS_NETPERF_CLIENT_PORT: u16 = 9107; #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[serde(default, deny_unknown_fields)] @@ -80,6 +82,7 @@ pub struct NetworkConfig { // Select this to enforce that both peers should authenticate each other, otherwise // authentication only occurs for outgoing connections. pub mutual_authentication: bool, + pub netperf_client_port: Option, pub network_id: NetworkId, pub runtime_threads: Option, pub inbound_rx_buffer_size_bytes: Option, @@ -120,7 +123,19 @@ impl Default for NetworkConfig { } } +fn netperf_client_port(enabled: bool) -> Option { + if enabled { + Some(DEFAULT_APTOS_NETPERF_CLIENT_PORT) + } else { + None + } +} + impl NetworkConfig { + pub fn randomize_ports(&mut self) { + self.netperf_client_port = Some(utils::get_available_port()); + } + pub fn network_with_id(network_id: NetworkId) -> NetworkConfig { let mutual_authentication = network_id.is_validator_network(); let mut config = Self { @@ -129,6 +144,7 @@ impl NetworkConfig { identity: Identity::None, listen_address: "/ip4/0.0.0.0/tcp/6180".parse().unwrap(), mutual_authentication, + netperf_client_port: netperf_client_port(ENABLE_APTOS_NETPERF_CLIENT), network_id, runtime_threads: None, seed_addrs: HashMap::new(), diff --git a/consensus/src/payload_manager.rs b/consensus/src/payload_manager.rs index a4fedfed23696..43f19180bd68f 100644 --- a/consensus/src/payload_manager.rs +++ b/consensus/src/payload_manager.rs @@ -18,6 +18,7 @@ use tokio::sync::oneshot; /// Responsible to extract the transactions out of the payload and notify QuorumStore about commits. /// If QuorumStore is enabled, has to ask BatchReader for the transaction behind the proofs of availability in the payload. +#[allow(dead_code)] pub enum PayloadManager { DirectMempool, InQuorumStore(BatchReader, Mutex>), diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs index c069ded6adae3..04bc2dba370ba 100644 --- a/consensus/src/state_computer.rs +++ b/consensus/src/state_computer.rs @@ -37,10 +37,12 @@ type NotificationType = ( Vec, ); +#[allow(dead_code)] type CommitType = (u64, Round, Vec); /// Basic communication with the Execution module; /// implements StateComputer traits. +#[allow(dead_code)] pub struct ExecutionProxy { executor: Arc, txn_notifier: Arc, diff --git a/crates/channel/src/message_queues.rs b/crates/channel/src/message_queues.rs index 02f95f851b14a..8456184b9a217 100644 --- a/crates/channel/src/message_queues.rs +++ b/crates/channel/src/message_queues.rs @@ -123,6 +123,7 @@ impl PerKeyQueue { // For example, many of our queues have a max capacity of 1024. To // handle a single rpc from a transient peer, we would end up // allocating ~ 96 b * 1024 ~ 64 Kib per queue. + //TODO: Reconsider this. Maybe true for VFN but not Validators .or_insert_with(|| VecDeque::with_capacity(1)); // Add the key to our round-robin queue if it's not already there diff --git a/network/Cargo.toml b/network/Cargo.toml index b929dcb9370c8..cee422cd566ae 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -33,8 +33,10 @@ aptos-short-hex-str = { workspace = true } aptos-time-service = { workspace = true } aptos-types = { workspace = true } async-trait = { workspace = true } +axum = { workspace = true } bcs = { workspace = true } bytes = { workspace = true } +dashmap = { worksapce = true } futures = { workspace = true } futures-util = { workspace = true } hex = { workspace = true } diff --git a/network/builder/src/builder.rs b/network/builder/src/builder.rs index 1775d54c1a543..681fa908a2d3d 100644 --- a/network/builder/src/builder.rs +++ b/network/builder/src/builder.rs @@ -24,7 +24,10 @@ use aptos_infallible::RwLock; use aptos_logger::prelude::*; use aptos_netcore::transport::tcp::TCPBufferCfg; use aptos_network::{ - application::storage::PeerMetadataStorage, + application::{ + netperf::{builder::NetPerfBuilder, NetPerf}, + storage::PeerMetadataStorage, + }, connectivity_manager::{builder::ConnectivityManagerBuilder, ConnectivityRequest}, constants::MAX_MESSAGE_SIZE, logging::NetworkSchema, @@ -70,6 +73,7 @@ pub struct NetworkBuilder { health_checker_builder: Option, peer_manager_builder: PeerManagerBuilder, peer_metadata_storage: Arc, + netperf_builder: Option, } impl NetworkBuilder { @@ -125,6 +129,7 @@ impl NetworkBuilder { health_checker_builder: None, peer_manager_builder, peer_metadata_storage, + netperf_builder: None, } } @@ -226,6 +231,10 @@ impl NetworkBuilder { config.ping_failures_tolerated, ); + if let Some(netperf_port) = config.netperf_client_port { + network_builder.add_network_perf(netperf_port); + } + // Always add a connectivity manager to keep track of known peers let seeds = merge_seeds(config); @@ -315,6 +324,16 @@ impl NetworkBuilder { ); } + if let Some(netperf_builder) = self.netperf_builder.as_mut() { + if self.network_context.role() == RoleType::Validator { + netperf_builder.start(executor); + debug!( + NetworkSchema::new(&self.network_context), + "{} Started Aptos NetPerf", self.network_context + ); + } + } + if let Some(discovery_listeners) = self.discovery_listeners.take() { discovery_listeners .into_iter() @@ -426,6 +445,25 @@ impl NetworkBuilder { .push(listener); } + /// Add a Aptos NetPerf to the network. + fn add_network_perf(&mut self, netperf_port: u16) -> &mut Self { + let (netperf_tx, netperf_rx) = + self.add_client_and_service(&NetPerf::network_endpoint_config()); + self.netperf_builder = Some(NetPerfBuilder::new( + self.network_context(), + self.peer_metadata_storage.clone(), + Arc::new(netperf_tx), + netperf_rx, + netperf_port, + )); + debug!( + NetworkSchema::new(&self.network_context), + "{} Created Aptos NetPerf", self.network_context + ); + + self + } + /// Add a HealthChecker to the network. fn add_connection_monitoring( &mut self, diff --git a/network/src/application/mod.rs b/network/src/application/mod.rs index 316eeadbfcda4..dc36f7973545d 100644 --- a/network/src/application/mod.rs +++ b/network/src/application/mod.rs @@ -3,6 +3,7 @@ pub mod error; pub mod interface; +pub mod netperf; pub mod storage; pub mod types; diff --git a/network/src/application/netperf/builder.rs b/network/src/application/netperf/builder.rs new file mode 100644 index 0000000000000..b61ccff9c83be --- /dev/null +++ b/network/src/application/netperf/builder.rs @@ -0,0 +1,45 @@ +// Copyright (c) Aptos +// SPDX-License-Identifier: Apache-2.0 + +use crate::application::{ + netperf::{ + interface::{NetPerfNetworkEvents, NetPerfNetworkSender}, + NetPerf, + }, + storage::PeerMetadataStorage, +}; +use aptos_config::network_id::NetworkContext; +use aptos_logger::prelude::*; +use std::sync::Arc; +use tokio::runtime::Handle; + +pub struct NetPerfBuilder { + service: Option, +} + +impl NetPerfBuilder { + pub fn new( + network_context: NetworkContext, + peer_metadata_storage: Arc, + network_tx: Arc, + network_rx: NetPerfNetworkEvents, + netperf_port: u16, + ) -> Self { + let service = NetPerf::new( + network_context, + peer_metadata_storage, + network_tx, + network_rx, + netperf_port, + ); + Self { + service: Some(service), + } + } + + pub fn start(&mut self, executor: &Handle) { + if let Some(service) = self.service.take() { + spawn_named!("[Network] NetPerf", executor, service.start()); + } + } +} diff --git a/network/src/application/netperf/interface.rs b/network/src/application/netperf/interface.rs new file mode 100644 index 0000000000000..917bab5cb6619 --- /dev/null +++ b/network/src/application/netperf/interface.rs @@ -0,0 +1,52 @@ +// Copyright (c) Aptos +// SPDX-License-Identifier: Apache-2.0 +// +use crate::protocols::network::{NetworkEvents, NetworkSender}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct NetPerfPayload { + byte: Vec, +} + +impl NetPerfPayload { + pub fn new(mut len: usize) -> Self { + let mut v = Vec::with_capacity(len); + while len > 0 { + v.push(0); + len -= 1; + } + NetPerfPayload { byte: v } + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum NetPerfMsg { + BlockOfBytes(NetPerfPayload), +} +/// The interface from Network to NetPerf layer. +/// +/// `NetPerfNetworkEvents` is a `Stream` of `PeerManagerNotification` where the +/// raw `Bytes` rpc messages are deserialized into +/// `NetPerfMsg` types. `NetPerfNetworkEvents` is a thin wrapper +/// around an `channel::Receiver`. +pub type NetPerfNetworkEvents = NetworkEvents; +/* +impl Stream for HealthCheckNetworkInterface { + type Item = Event; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().receiver).poll_next(cx) + } +} + + */ +/// The interface from NetPerf to Networking layer. +/// +/// This is a thin wrapper around a `NetworkSender`, so it is +/// easy to clone and send off to a separate task. For example, the rpc requests +/// return Futures that encapsulate the whole flow, from sending the request to +/// remote, to finally receiving the response and deserializing. It therefore +/// makes the most sense to make the rpc call on a separate async task, which +/// requires the `NetPerfNetworkSender` to be `Clone` and `Send`. +pub type NetPerfNetworkSender = NetworkSender; diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs new file mode 100644 index 0000000000000..29c3be36ce7fb --- /dev/null +++ b/network/src/application/netperf/mod.rs @@ -0,0 +1,332 @@ +// Copyright (c) Aptos +// SPDX-License-Identifier: Apache-2.0 + +//! Network Load Generator +//! +//! NetPerf is used to stress the network layer to gouge potential performance capabilities +//! and simplify network-related performance profiling and debugging +//! + +use crate::{ + application::{ + netperf::interface::{ + NetPerfMsg, NetPerfNetworkEvents, NetPerfNetworkSender, NetPerfPayload, + }, + storage::PeerMetadataStorage, + }, + constants::NETWORK_CHANNEL_SIZE, + counters, + logging::NetworkSchema, + protocols::network::{Event, NetworkApplicationConfig}, + transport::ConnectionMetadata, + ProtocolId, +}; +use aptos_channels::{aptos_channel, message_queues::QueueStyle}; +use aptos_config::network_id::NetworkContext; +use aptos_logger::prelude::*; +use aptos_types::PeerId; +use axum::{ + extract::Query, + http::StatusCode, + response::IntoResponse, + routing::{get, post}, + Extension, Json, Router, +}; +use dashmap::DashMap; +use futures::StreamExt; +use futures_util::stream::FuturesUnordered; +use serde::Serialize; +use std::{ + fs::OpenOptions, + sync::{atomic::AtomicBool, Arc}, + time::Duration, +}; +use tokio::sync::mpsc::{Receiver, Sender}; + +pub mod builder; +mod interface; + +const NETPERF_COMMAND_CHANNEL_SIZE: usize = 1024; +const NETPERF_DEFAULT_MSG_SIZE: usize = 64 * 1024; +const NETPERF_DEFAULT_DURTAION_SEC: u64 = 10; + +pub struct NetPerf { + network_context: NetworkContext, + peers: Arc, + peer_list: Arc>, //with capacity and hasher + sender: Arc, + events: NetPerfNetworkEvents, + netperf_port: u16, +} + +struct PeerNetPerfStat {} + +impl PeerNetPerfStat { + pub fn new(_md: ConnectionMetadata) -> Self { + PeerNetPerfStat {} + } +} + +#[allow(dead_code)] +#[derive(Clone)] +struct NetPerfState { + peers: Arc, + peer_list: Arc>, //with capacity and hasher + sender: Arc, + tx: Sender, +} + +impl NetPerf { + pub fn new( + network_context: NetworkContext, + peers: Arc, + sender: Arc, + events: NetPerfNetworkEvents, + netperf_port: u16, + ) -> Self { + NetPerf { + network_context, + peers, + peer_list: Arc::new(DashMap::with_capacity(128)), + sender, + events, + netperf_port, + } + } + + /// Configuration for the network endpoints to support NetPerf. + pub fn network_endpoint_config() -> NetworkApplicationConfig { + NetworkApplicationConfig::client_and_service( + [ + ProtocolId::NetPerfDirectSendCompressed, + ProtocolId::NetPerfRpcCompressed, + ], + aptos_channel::Config::new(NETWORK_CHANNEL_SIZE) + .queue_style(QueueStyle::FIFO) + .counters(&counters::PENDING_NET_PERF_NETWORK_EVENTS), + ) + } + + fn net_perf_state(&self, sender: Sender) -> NetPerfState { + NetPerfState { + peers: self.peers.clone(), + sender: self.sender.clone(), + peer_list: self.peer_list.clone(), + tx: sender, + } + } + + async fn start(mut self) { + let port = preferred_axum_port(self.netperf_port); + let (tx, rx) = tokio::sync::mpsc::channel::(NETPERF_COMMAND_CHANNEL_SIZE); + + info!( + NetworkSchema::new(&self.network_context), + "{} NetPerf Event Listener started", self.network_context, + ); + + spawn_named!( + "NetPerf Axum", + start_axum(self.net_perf_state(tx.clone()), port) + ); + spawn_named!( + "NetPerf EventHandler", + netperf_comp_handler(self.net_perf_state(tx.clone()), rx) + ); + + loop { + futures::select! { + maybe_event = self.events.next() => { + // Shutdown the NetPerf when this network instance shuts + // down. This happens when the `PeerManager` drops. + let event = match maybe_event { + Some(event) => event, + None => break, + }; + + match event { + Event::NewPeer(metadata) => { + self.peer_list.insert( + metadata.remote_peer_id, + PeerNetPerfStat::new(metadata) + ); + } + Event::LostPeer(metadata) => { + self.peer_list.remove( + &metadata.remote_peer_id + ); + } + Event::Message(_peer_id, msg) => { + match msg { + NetPerfMsg::BlockOfBytes(_bytes) => { + /* maybe add dedicated counters? but network_application_{out/in}bound_traffic + * seems to have us coverred + * */ + } + + } + + } + _ => {/* Currently ignore all*/} + } + } + } + } + warn!( + NetworkSchema::new(&self.network_context), + "{} NetPerf event listener terminated", self.network_context + ); + } +} +#[allow(dead_code)] +#[derive(Clone)] +enum NetPerfCommands { + Broadcast, +} + +fn preferred_axum_port(netperf_port: u16) -> u16 { + if netperf_port != 9107 { + let _ = OpenOptions::new() + .write(true) + .create_new(true) + .open("/tmp/9107.tmp"); + + let _ = OpenOptions::new() + .write(true) + .create(true) + .open(format!("/tmp/{}.tmp", netperf_port)); + } + netperf_port +} + +async fn start_axum(state: NetPerfState, netperf_port: u16) { + let app = Router::new() + .route("/", get(usage_handler)) + .route("/peers", get(get_peers).layer(Extension(state.clone()))) + .route( + "/command", + post(parse_query).layer(Extension(state.clone())), + ); + + let addr = std::net::SocketAddr::from(([0, 0, 0, 0], netperf_port)); + + // run it with hyper on netperf_port + axum::Server::bind(&addr) + .serve(app.into_make_service()) + .await + .unwrap(); +} + +async fn usage_handler() -> &'static str { + "Usage: curl 127.0.0.01:9107/peers" +} + +#[derive(Serialize)] +struct PeerList { + peers: Vec, +} + +impl PeerList { + pub fn new(len: usize) -> Self { + PeerList { + peers: Vec::with_capacity(len), + } + } +} + +async fn get_peers(Extension(state): Extension) -> Json { + let mut out = PeerList::new(state.peer_list.len()); + + let connected = state.peer_list.iter(); + + for peer in connected { + out.peers.push(peer.key().to_owned()); + } + + Json(out) +} + +async fn parse_query( + Extension(state): Extension, + Query(_params): Query>, +) -> impl IntoResponse { + //TODO(AlexM): Extract size and duration from _params + let size = NETPERF_DEFAULT_MSG_SIZE; + let duration = Duration::from_secs(NETPERF_DEFAULT_DURTAION_SEC); + + spawn_named!( + "[NetPerf] Brodcast Task", + netperf_broadcast(state, size, duration) + ); + + StatusCode::OK +} + +async fn netperf_comp_handler(state: NetPerfState, mut rx: Receiver) { + let mut rpc_handlers = FuturesUnordered::new(); + + loop { + tokio::select! { + opt_cmd = rx.recv() => { + match opt_cmd { + Some(_cmd) => { + let msg = NetPerfMsg::BlockOfBytes(NetPerfPayload::new(64 * 1024)); + + for peer in state.peer_list.iter() { + //TODO(AlexM): Yet another Alloc + Copy OPs. + // Best use Refs - Just ARC + rpc_handlers.push(state.sender.send_rpc( + peer.key().to_owned(), + ProtocolId::NetPerfRpcCompressed, + msg.clone(), + Duration::from_secs(5), + )); + } + } + None => break, + } + } + _res = rpc_handlers.select_next_some() => {} + } + } +} + +async fn netperf_broadcast(state: NetPerfState, size: usize, duration: Duration) { + let mut should_yield = false; + let msg = NetPerfMsg::BlockOfBytes(NetPerfPayload::new(size)); + let done = Arc::new(AtomicBool::new(false)); + let stop = done.clone(); + + tokio::spawn(async move { + tokio::time::sleep(duration).await; + stop.store(true, std::sync::atomic::Ordering::Relaxed); + }); + + loop { + /* TODO(AlexM): + * 1. Fine grained controll with send_to. + * Its interesting to see which of the validator queus gets filled. + * 2. msg.clone() is a disaster + * */ + let rc = state.sender.send_to_many( + state.peer_list.iter().map(|entry| entry.key().to_owned()), + ProtocolId::NetPerfDirectSendCompressed, + msg.clone(), + ); + if rc.is_err() { + should_yield = true + } //else update peer counters + /* maybe add dedicated counters? but network_application_{out/in}bound_traffic + * seems to have us coverred + * */ + + if done.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + if should_yield { + tokio::task::yield_now().await; + should_yield = false; + } + } + info!("Broadcast Op Finished"); +} diff --git a/network/src/counters.rs b/network/src/counters.rs index 61cd10dcfd65a..6fded6bafc3a3 100644 --- a/network/src/counters.rs +++ b/network/src/counters.rs @@ -326,6 +326,16 @@ pub static PENDING_NETWORK_REQUESTS: Lazy = Lazy::new(|| { .unwrap() }); +/// Counter of pending network events to Health Checker. +pub static PENDING_NET_PERF_NETWORK_EVENTS: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "aptos_network_pending_netperf_events", + "Number of pending netperf events by state", + &["state"] + ) + .unwrap() +}); + /// Counter of pending network events to Health Checker. pub static PENDING_HEALTH_CHECKER_NETWORK_EVENTS: Lazy = Lazy::new(|| { register_int_counter_vec!( diff --git a/network/src/protocols/network/mod.rs b/network/src/protocols/network/mod.rs index c4ba7d15b6140..47447875454aa 100644 --- a/network/src/protocols/network/mod.rs +++ b/network/src/protocols/network/mod.rs @@ -338,6 +338,7 @@ impl NetworkSender { timeout: Duration, ) -> Result { // serialize request + //TODO(AlexM): In Bcast next line repeats 100 Times, ergo, Bcast should be a Network function. let req_data = protocol.to_bytes(&req_msg)?.into(); let res_data = self .peer_mgr_reqs_tx diff --git a/network/src/protocols/wire/handshake/v1/mod.rs b/network/src/protocols/wire/handshake/v1/mod.rs index 69e08a5bde0cb..0c29484d9f835 100644 --- a/network/src/protocols/wire/handshake/v1/mod.rs +++ b/network/src/protocols/wire/handshake/v1/mod.rs @@ -57,6 +57,8 @@ pub enum ProtocolId { PeerMonitoringServiceRpc = 10, ConsensusRpcCompressed = 11, ConsensusDirectSendCompressed = 12, + NetPerfRpcCompressed = 13, + NetPerfDirectSendCompressed = 14, } /// The encoding types for Protocols @@ -83,6 +85,8 @@ impl ProtocolId { PeerMonitoringServiceRpc => "PeerMonitoringServiceRpc", ConsensusRpcCompressed => "ConsensusRpcCompressed", ConsensusDirectSendCompressed => "ConsensusDirectSendCompressed", + NetPerfRpcCompressed => "NetPerfRpcCompressed", + NetPerfDirectSendCompressed => "NetPerfDirectSendCompressed", } } @@ -101,6 +105,8 @@ impl ProtocolId { ProtocolId::PeerMonitoringServiceRpc, ProtocolId::ConsensusRpcCompressed, ProtocolId::ConsensusDirectSendCompressed, + ProtocolId::NetPerfRpcCompressed, + ProtocolId::NetPerfDirectSendCompressed, ] } @@ -111,6 +117,9 @@ impl ProtocolId { ProtocolId::ConsensusDirectSendCompressed | ProtocolId::ConsensusRpcCompressed => { Encoding::CompressedBcs(RECURSION_LIMIT) }, + ProtocolId::NetPerfDirectSendCompressed | ProtocolId::NetPerfRpcCompressed => { + Encoding::CompressedBcs(RECURSION_LIMIT) + }, ProtocolId::MempoolDirectSend => Encoding::CompressedBcs(USER_INPUT_RECURSION_LIMIT), ProtocolId::MempoolRpc => Encoding::Bcs(USER_INPUT_RECURSION_LIMIT), _ => Encoding::Bcs(RECURSION_LIMIT), diff --git a/testsuite/smoke-test/src/network.rs b/testsuite/smoke-test/src/network.rs index de477018b1feb..54df6fc4ddbf2 100644 --- a/testsuite/smoke-test/src/network.rs +++ b/testsuite/smoke-test/src/network.rs @@ -24,7 +24,7 @@ use std::{ #[tokio::test] async fn test_connection_limiting() { - let mut swarm = new_local_swarm_with_aptos(1).await; + let mut swarm = new_local_swarm_with_aptos(5).await; let version = swarm.versions().max().unwrap(); let validator_peer_id = swarm.validators().next().unwrap().peer_id(); @@ -130,6 +130,9 @@ async fn test_connection_limiting() { .unwrap() .unwrap_or(0) ); + while std::path::Path::new("/tmp/9107.tmp").exists() { + tokio::time::sleep(Duration::from_secs(1)).await; + } } #[tokio::test]