From c42b0729423d9f10e17fc65f9000a7835a974bff Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Thu, 22 Dec 2022 13:48:29 +0200 Subject: [PATCH 01/23] [Network/AptosPerf] Initial commit --- network/builder/src/builder.rs | 5 +++++ network/src/application/mod.rs | 1 + network/src/application/netperf/mod.rs | 30 ++++++++++++++++++++++++++ 3 files changed, 36 insertions(+) create mode 100644 network/src/application/netperf/mod.rs diff --git a/network/builder/src/builder.rs b/network/builder/src/builder.rs index 1775d54c1a543..899e403da0e37 100644 --- a/network/builder/src/builder.rs +++ b/network/builder/src/builder.rs @@ -426,6 +426,11 @@ impl NetworkBuilder { .push(listener); } + /// Add a Aptos NetPerf to the network. + fn add_network_perf(&mut self) -> &mut Self { + self + } + /// Add a HealthChecker to the network. fn add_connection_monitoring( &mut self, diff --git a/network/src/application/mod.rs b/network/src/application/mod.rs index 316eeadbfcda4..dc36f7973545d 100644 --- a/network/src/application/mod.rs +++ b/network/src/application/mod.rs @@ -3,6 +3,7 @@ pub mod error; pub mod interface; +pub mod netperf; pub mod storage; pub mod types; diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs new file mode 100644 index 0000000000000..61fd21ba19def --- /dev/null +++ b/network/src/application/netperf/mod.rs @@ -0,0 +1,30 @@ +// Copyright (c) Aptos +// SPDX-License-Identifier: Apache-2.0 + +//! Network stresser +//! +//! NetPerf is used to stress the network laayer to gouge potential performance capabilities and ease +//! network realted performance profiling and debugging +//! + +use crate::application::storage::PeerMetadataStorage; +use aptos_config::network_id::{NetworkContext, PeerNetworkId}; +use std::sync::Arc; + +pub struct NetPerf { + network_context: NetworkContext, + peers: Arc, +} + +impl NetPerf { + pub fn new( + network_context: NetworkContext, + peers: std::sync::Arc, + ) -> Self { + NetPerf { + network_context, + peers, + } + } + pub async fn start(mut self) {} +} From 25bfcfc586b2cb9f54ef24a9d81d9d8d024ea7fd Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Sun, 25 Dec 2022 13:11:11 +0200 Subject: [PATCH 02/23] [Network] AptosPerf Interface --- network/builder/src/builder.rs | 17 ++++++ network/src/application/netperf/mod.rs | 58 ++++++++++++++++++- .../src/protocols/wire/handshake/v1/mod.rs | 9 +++ 3 files changed, 83 insertions(+), 1 deletion(-) diff --git a/network/builder/src/builder.rs b/network/builder/src/builder.rs index 899e403da0e37..0cd592aa126a8 100644 --- a/network/builder/src/builder.rs +++ b/network/builder/src/builder.rs @@ -24,6 +24,7 @@ use aptos_infallible::RwLock; use aptos_logger::prelude::*; use aptos_netcore::transport::tcp::TCPBufferCfg; use aptos_network::{ + application::netperf::NetPerf, application::storage::PeerMetadataStorage, connectivity_manager::{builder::ConnectivityManagerBuilder, ConnectivityRequest}, constants::MAX_MESSAGE_SIZE, @@ -70,6 +71,7 @@ pub struct NetworkBuilder { health_checker_builder: Option, peer_manager_builder: PeerManagerBuilder, peer_metadata_storage: Arc, + aptos_netperf: bool, } impl NetworkBuilder { @@ -93,6 +95,7 @@ impl NetworkBuilder { inbound_rate_limit_config: Option, outbound_rate_limit_config: Option, tcp_buffer_cfg: TCPBufferCfg, + aptos_netperf: bool, ) -> Self { // A network cannot exist without a PeerManager // TODO: construct this in create and pass it to new() as a parameter. The complication is manual construction of NetworkBuilder in various tests. @@ -125,6 +128,7 @@ impl NetworkBuilder { health_checker_builder: None, peer_manager_builder, peer_metadata_storage, + aptos_netperf, } } @@ -157,6 +161,7 @@ impl NetworkBuilder { None, None, TCPBufferCfg::default(), + false, ); builder.add_connectivity_manager( @@ -181,6 +186,7 @@ impl NetworkBuilder { time_service: TimeService, mut reconfig_subscription_service: Option<&mut EventSubscriptionService>, peer_metadata_storage: Arc, + aptos_netperf: bool, ) -> NetworkBuilder { let peer_id = config.peer_id(); let identity_key = config.identity_key(); @@ -218,6 +224,7 @@ impl NetworkBuilder { config.outbound_rx_buffer_size_bytes, config.outbound_tx_buffer_size_bytes, ), + aptos_netperf, ); network_builder.add_connection_monitoring( @@ -226,6 +233,10 @@ impl NetworkBuilder { config.ping_failures_tolerated, ); + if (aptos_netperf == true) { + network_builder.add_network_perf(); + } + // Always add a connectivity manager to keep track of known peers let seeds = merge_seeds(config); @@ -428,6 +439,12 @@ impl NetworkBuilder { /// Add a Aptos NetPerf to the network. fn add_network_perf(&mut self) -> &mut Self { + let (netperf_tx, netperf_rx) = self.add_p2p_service(&NetPerf::network_endpoint_config()); + debug!( + NetworkSchema::new(&self.network_context), + "{} Created Aptos NetPerf", self.network_context + ); + self } diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index 61fd21ba19def..8e926cadbc285 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -8,23 +8,79 @@ //! use crate::application::storage::PeerMetadataStorage; +use crate::{ + application::interface::NetworkInterface, + constants::NETWORK_CHANNEL_SIZE, + counters, + error::NetworkError, + logging::NetworkSchema, + peer_manager::{ConnectionRequestSender, PeerManagerRequestSender}, + protocols::{ + network::{ + AppConfig, ApplicationNetworkSender, Event, NetworkEvents, NetworkSender, + NewNetworkSender, + }, + rpc::error::RpcError, + }, + ProtocolId, +}; +use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::network_id::{NetworkContext, PeerNetworkId}; use std::sync::Arc; +//Interface + +pub enum NetPerfMsg { + BlockOfBytes64K, +} +/// The interface from Network to NetPerf layer. +/// +/// `NetPerfNetworkEvents` is a `Stream` of `PeerManagerNotification` where the +/// raw `Bytes` rpc messages are deserialized into +/// `NetPerfMsg` types. `NetPerfNetworkEvents` is a thin wrapper +/// around an `channel::Receiver`. +pub type NetPerfNetworkEvents = NetworkEvents; + +/// The interface from NetPerf to Networking layer. +/// +/// This is a thin wrapper around a `NetworkSender`, so it is +/// easy to clone and send off to a separate task. For example, the rpc requests +/// return Futures that encapsulate the whole flow, from sending the request to +/// remote, to finally receiving the response and deserializing. It therefore +/// makes the most sense to make the rpc call on a separate async task, which +/// requires the `NetPerfNetworkSender` to be `Clone` and `Send`. +pub type NetPerfNetworkSender = NetworkSender; + +//Interface End pub struct NetPerf { network_context: NetworkContext, peers: Arc, + sender: Arc, + events: NetPerfNetworkEvents, } impl NetPerf { pub fn new( network_context: NetworkContext, - peers: std::sync::Arc, + peers: Arc, + sender: Arc, + events: NetPerfNetworkEvents, ) -> Self { NetPerf { network_context, peers, + sender, + events, } } + + /// Configuration for the network endpoints to support NetPerf. + pub fn network_endpoint_config() -> AppConfig { + AppConfig::p2p( + [ProtocolId::NetPerfRpcCompressed], + aptos_channel::Config::new(NETWORK_CHANNEL_SIZE).queue_style(QueueStyle::LIFO), + ) + } + pub async fn start(mut self) {} } diff --git a/network/src/protocols/wire/handshake/v1/mod.rs b/network/src/protocols/wire/handshake/v1/mod.rs index 69e08a5bde0cb..0c29484d9f835 100644 --- a/network/src/protocols/wire/handshake/v1/mod.rs +++ b/network/src/protocols/wire/handshake/v1/mod.rs @@ -57,6 +57,8 @@ pub enum ProtocolId { PeerMonitoringServiceRpc = 10, ConsensusRpcCompressed = 11, ConsensusDirectSendCompressed = 12, + NetPerfRpcCompressed = 13, + NetPerfDirectSendCompressed = 14, } /// The encoding types for Protocols @@ -83,6 +85,8 @@ impl ProtocolId { PeerMonitoringServiceRpc => "PeerMonitoringServiceRpc", ConsensusRpcCompressed => "ConsensusRpcCompressed", ConsensusDirectSendCompressed => "ConsensusDirectSendCompressed", + NetPerfRpcCompressed => "NetPerfRpcCompressed", + NetPerfDirectSendCompressed => "NetPerfDirectSendCompressed", } } @@ -101,6 +105,8 @@ impl ProtocolId { ProtocolId::PeerMonitoringServiceRpc, ProtocolId::ConsensusRpcCompressed, ProtocolId::ConsensusDirectSendCompressed, + ProtocolId::NetPerfRpcCompressed, + ProtocolId::NetPerfDirectSendCompressed, ] } @@ -111,6 +117,9 @@ impl ProtocolId { ProtocolId::ConsensusDirectSendCompressed | ProtocolId::ConsensusRpcCompressed => { Encoding::CompressedBcs(RECURSION_LIMIT) }, + ProtocolId::NetPerfDirectSendCompressed | ProtocolId::NetPerfRpcCompressed => { + Encoding::CompressedBcs(RECURSION_LIMIT) + }, ProtocolId::MempoolDirectSend => Encoding::CompressedBcs(USER_INPUT_RECURSION_LIMIT), ProtocolId::MempoolRpc => Encoding::Bcs(USER_INPUT_RECURSION_LIMIT), _ => Encoding::Bcs(RECURSION_LIMIT), From 18399ec2effbed1c574d8e23258ccc4e07258a1f Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Sun, 25 Dec 2022 14:54:53 +0200 Subject: [PATCH 03/23] [Network] NetPerf separate interface and builder --- config/src/config/network_config.rs | 3 ++ network/builder/src/builder.rs | 26 ++++++++---- network/src/application/netperf/builder.rs | 43 ++++++++++++++++++++ network/src/application/netperf/interface.rs | 42 +++++++++++++++++++ network/src/application/netperf/mod.rs | 32 +++------------ 5 files changed, 113 insertions(+), 33 deletions(-) create mode 100644 network/src/application/netperf/builder.rs create mode 100644 network/src/application/netperf/interface.rs diff --git a/config/src/config/network_config.rs b/config/src/config/network_config.rs index 02ca660881cd1..9bbfc54470f65 100644 --- a/config/src/config/network_config.rs +++ b/config/src/config/network_config.rs @@ -54,6 +54,7 @@ pub const INBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with pub const INBOUND_TCP_TX_BUFFER_SIZE: u32 = 512 * 1024; // 1MB use a bigger spoon pub const OUTBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency pub const OUTBOUND_TCP_TX_BUFFER_SIZE: u32 = 1024 * 1024; // 1MB use a bigger spoon +pub const ENABLE_APTOS_NETPERF_CLIENT: bool = true; #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[serde(default, deny_unknown_fields)] @@ -80,6 +81,7 @@ pub struct NetworkConfig { // Select this to enforce that both peers should authenticate each other, otherwise // authentication only occurs for outgoing connections. pub mutual_authentication: bool, + pub enable_netperf_client: bool, pub network_id: NetworkId, pub runtime_threads: Option, pub inbound_rx_buffer_size_bytes: Option, @@ -129,6 +131,7 @@ impl NetworkConfig { identity: Identity::None, listen_address: "/ip4/0.0.0.0/tcp/6180".parse().unwrap(), mutual_authentication, + enable_netperf_client: ENABLE_APTOS_NETPERF_CLIENT, network_id, runtime_threads: None, seed_addrs: HashMap::new(), diff --git a/network/builder/src/builder.rs b/network/builder/src/builder.rs index 0cd592aa126a8..094e01e45ce41 100644 --- a/network/builder/src/builder.rs +++ b/network/builder/src/builder.rs @@ -23,6 +23,8 @@ use aptos_event_notifications::{EventSubscriptionService, ReconfigNotificationLi use aptos_infallible::RwLock; use aptos_logger::prelude::*; use aptos_netcore::transport::tcp::TCPBufferCfg; +use aptos_network::application::netperf::builder::NetPerfBuilder; +use aptos_network::protocols::wire::handshake::v1::ProtocolId::NetPerfDirectSendCompressed; use aptos_network::{ application::netperf::NetPerf, application::storage::PeerMetadataStorage, @@ -71,7 +73,7 @@ pub struct NetworkBuilder { health_checker_builder: Option, peer_manager_builder: PeerManagerBuilder, peer_metadata_storage: Arc, - aptos_netperf: bool, + netperf_builder: Option, } impl NetworkBuilder { @@ -95,7 +97,6 @@ impl NetworkBuilder { inbound_rate_limit_config: Option, outbound_rate_limit_config: Option, tcp_buffer_cfg: TCPBufferCfg, - aptos_netperf: bool, ) -> Self { // A network cannot exist without a PeerManager // TODO: construct this in create and pass it to new() as a parameter. The complication is manual construction of NetworkBuilder in various tests. @@ -128,7 +129,7 @@ impl NetworkBuilder { health_checker_builder: None, peer_manager_builder, peer_metadata_storage, - aptos_netperf, + netperf_builder: None, } } @@ -161,7 +162,6 @@ impl NetworkBuilder { None, None, TCPBufferCfg::default(), - false, ); builder.add_connectivity_manager( @@ -186,7 +186,6 @@ impl NetworkBuilder { time_service: TimeService, mut reconfig_subscription_service: Option<&mut EventSubscriptionService>, peer_metadata_storage: Arc, - aptos_netperf: bool, ) -> NetworkBuilder { let peer_id = config.peer_id(); let identity_key = config.identity_key(); @@ -224,7 +223,6 @@ impl NetworkBuilder { config.outbound_rx_buffer_size_bytes, config.outbound_tx_buffer_size_bytes, ), - aptos_netperf, ); network_builder.add_connection_monitoring( @@ -233,7 +231,7 @@ impl NetworkBuilder { config.ping_failures_tolerated, ); - if (aptos_netperf == true) { + if config.enable_netperf_client == true { network_builder.add_network_perf(); } @@ -326,6 +324,14 @@ impl NetworkBuilder { ); } + if let Some(netperf_builder) = self.netperf_builder.as_mut() { + netperf_builder.start(executor); + debug!( + NetworkSchema::new(&self.network_context), + "{} Started Aptos NetPerf", self.network_context + ); + } + if let Some(discovery_listeners) = self.discovery_listeners.take() { discovery_listeners .into_iter() @@ -440,6 +446,12 @@ impl NetworkBuilder { /// Add a Aptos NetPerf to the network. fn add_network_perf(&mut self) -> &mut Self { let (netperf_tx, netperf_rx) = self.add_p2p_service(&NetPerf::network_endpoint_config()); + self.netperf_builder = Some(NetPerfBuilder::new( + self.network_context(), + self.peer_metadata_storage.clone(), + Arc::new(netperf_tx), + netperf_rx, + )); debug!( NetworkSchema::new(&self.network_context), "{} Created Aptos NetPerf", self.network_context diff --git a/network/src/application/netperf/builder.rs b/network/src/application/netperf/builder.rs new file mode 100644 index 0000000000000..d33f3ffbae70a --- /dev/null +++ b/network/src/application/netperf/builder.rs @@ -0,0 +1,43 @@ +// Copyright (c) Aptos +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + application::netperf::{ + interface::{NetPerfNetworkEvents, NetPerfNetworkSender}, + NetPerf, + }, + application::storage::PeerMetadataStorage, +}; +use aptos_config::network_id::NetworkContext; +use aptos_logger::prelude::*; +use std::sync::Arc; +use tokio::runtime::Handle; + +pub struct NetPerfBuilder { + service: Option, +} + +impl NetPerfBuilder { + pub fn new( + network_context: NetworkContext, + peer_metadata_storage: Arc, + network_tx: Arc, + network_rx: NetPerfNetworkEvents, + ) -> Self { + let service = NetPerf::new( + network_context, + peer_metadata_storage, + network_tx, + network_rx, + ); + Self { + service: Some(service), + } + } + + pub fn start(&mut self, executor: &Handle) { + if let Some(service) = self.service.take() { + spawn_named!("[Network] NetPerf", executor, service.start()); + } + } +} diff --git a/network/src/application/netperf/interface.rs b/network/src/application/netperf/interface.rs new file mode 100644 index 0000000000000..02b89c0148af3 --- /dev/null +++ b/network/src/application/netperf/interface.rs @@ -0,0 +1,42 @@ +// Copyright (c) Aptos +// SPDX-License-Identifier: Apache-2.0 +// +use crate::{ + application::interface::NetworkInterface, + constants::NETWORK_CHANNEL_SIZE, + counters, + error::NetworkError, + logging::NetworkSchema, + peer_manager::{ConnectionRequestSender, PeerManagerRequestSender}, + protocols::{ + network::{ + AppConfig, ApplicationNetworkSender, Event, NetworkEvents, NetworkSender, + NewNetworkSender, + }, + rpc::error::RpcError, + }, + ProtocolId, +}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum NetPerfMsg { + BlockOfBytes64K, +} +/// The interface from Network to NetPerf layer. +/// +/// `NetPerfNetworkEvents` is a `Stream` of `PeerManagerNotification` where the +/// raw `Bytes` rpc messages are deserialized into +/// `NetPerfMsg` types. `NetPerfNetworkEvents` is a thin wrapper +/// around an `channel::Receiver`. +pub type NetPerfNetworkEvents = NetworkEvents; + +/// The interface from NetPerf to Networking layer. +/// +/// This is a thin wrapper around a `NetworkSender`, so it is +/// easy to clone and send off to a separate task. For example, the rpc requests +/// return Futures that encapsulate the whole flow, from sending the request to +/// remote, to finally receiving the response and deserializing. It therefore +/// makes the most sense to make the rpc call on a separate async task, which +/// requires the `NetPerfNetworkSender` to be `Clone` and `Send`. +pub type NetPerfNetworkSender = NetworkSender; diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index 8e926cadbc285..7a6991fc16b31 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -1,15 +1,15 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 -//! Network stresser +//! Network Load Generator //! -//! NetPerf is used to stress the network laayer to gouge potential performance capabilities and ease -//! network realted performance profiling and debugging +//! NetPerf is used to stress the network layer to gouge potential performance capabilities +//! and simplify network-related performance profiling and debugging //! use crate::application::storage::PeerMetadataStorage; use crate::{ - application::interface::NetworkInterface, + application::netperf::interface::{NetPerfNetworkEvents, NetPerfNetworkSender}, constants::NETWORK_CHANNEL_SIZE, counters, error::NetworkError, @@ -28,28 +28,8 @@ use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::network_id::{NetworkContext, PeerNetworkId}; use std::sync::Arc; -//Interface - -pub enum NetPerfMsg { - BlockOfBytes64K, -} -/// The interface from Network to NetPerf layer. -/// -/// `NetPerfNetworkEvents` is a `Stream` of `PeerManagerNotification` where the -/// raw `Bytes` rpc messages are deserialized into -/// `NetPerfMsg` types. `NetPerfNetworkEvents` is a thin wrapper -/// around an `channel::Receiver`. -pub type NetPerfNetworkEvents = NetworkEvents; - -/// The interface from NetPerf to Networking layer. -/// -/// This is a thin wrapper around a `NetworkSender`, so it is -/// easy to clone and send off to a separate task. For example, the rpc requests -/// return Futures that encapsulate the whole flow, from sending the request to -/// remote, to finally receiving the response and deserializing. It therefore -/// makes the most sense to make the rpc call on a separate async task, which -/// requires the `NetPerfNetworkSender` to be `Clone` and `Send`. -pub type NetPerfNetworkSender = NetworkSender; +pub mod builder; +mod interface; //Interface End pub struct NetPerf { From cf4bbb251c5478fa805b198d1f064b74b63264ca Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Sun, 25 Dec 2022 18:25:39 +0200 Subject: [PATCH 04/23] [Netowrk] AptosPerf axum router --- Cargo.lock | 87 +++++++------------------- Cargo.toml | 4 +- network/Cargo.toml | 1 + network/src/application/netperf/mod.rs | 32 +++++++++- 4 files changed, 57 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d244d10cbf870..a8c6a31b1baad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1611,6 +1611,7 @@ dependencies = [ "aptos-time-service", "aptos-types", "async-trait", + "axum", "bcs 0.1.4 (git+https://github.com/aptos-labs/bcs.git?rev=d31fab9d81748e2594be5cd5cdf845786a30562d)", "bytes 1.2.1", "futures", @@ -3048,37 +3049,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" -[[package]] -name = "axum" -version = "0.5.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9e3356844c4d6a6d6467b8da2cffb4a2820be256f50a3a386c9d152bab31043" -dependencies = [ - "async-trait", - "axum-core 0.2.8", - "bitflags", - "bytes 1.2.1", - "futures-util", - "http", - "http-body", - "hyper", - "itoa 1.0.3", - "matchit 0.5.0", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "serde 1.0.149", - "serde_json", - "serde_urlencoded", - "sync_wrapper", - "tokio", - "tower", - "tower-http", - "tower-layer", - "tower-service", -] - [[package]] name = "axum" version = "0.6.1" @@ -3086,7 +3056,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08b108ad2665fa3f6e6a517c3d80ec3e77d224c47d605167aefaa5d7ef97fa48" dependencies = [ "async-trait", - "axum-core 0.3.0", + "axum-core", "bitflags", "bytes 1.2.1", "futures-util", @@ -3094,36 +3064,24 @@ dependencies = [ "http-body", "hyper", "itoa 1.0.3", - "matchit 0.7.0", + "matchit", "memchr", "mime", "percent-encoding", "pin-project-lite", "rustversion", "serde 1.0.149", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", "sync_wrapper", + "tokio", "tower", "tower-http", "tower-layer", "tower-service", ] -[[package]] -name = "axum-core" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9f0c0a60006f2a293d82d571f635042a72edf927539b7685bd62d361963839b" -dependencies = [ - "async-trait", - "bytes 1.2.1", - "futures-util", - "http", - "http-body", - "mime", - "tower-layer", - "tower-service", -] - [[package]] name = "axum-core" version = "0.3.0" @@ -3145,7 +3103,7 @@ dependencies = [ name = "axum-test" version = "0.1.0" dependencies = [ - "axum 0.5.16", + "axum", "tokio", ] @@ -5273,9 +5231,9 @@ checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" [[package]] name = "httparse" -version = "1.7.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "496ce29bb5a52785b44e0f7ca2847ae0bb839c9bd28f69acac9b99d461c0c04c" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" [[package]] name = "httpdate" @@ -5334,9 +5292,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.20" +version = "0.14.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac" +checksum = "034711faac9d2166cb1baf1a2fb0b60b1f277f8492fd72176c17f3515e1abd3c" dependencies = [ "bytes 1.2.1", "futures-channel", @@ -6125,12 +6083,6 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" -[[package]] -name = "matchit" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" - [[package]] name = "matchit" version = "0.7.0" @@ -8898,6 +8850,15 @@ dependencies = [ "serde 1.0.149", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b04f22b563c91331a10074bda3dd5492e3cc39d56bd557e91c0af42b6c7341" +dependencies = [ + "serde 1.0.149", +] + [[package]] name = "serde_regex" version = "1.1.0" @@ -9234,9 +9195,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.4.4" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd" dependencies = [ "libc", "winapi 0.3.9", @@ -9828,7 +9789,7 @@ checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" dependencies = [ "async-stream", "async-trait", - "axum 0.6.1", + "axum", "base64 0.13.0", "bytes 1.2.1", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index 6121bc0b81cf0..3cfd1a2ba2302 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -284,7 +284,7 @@ assert_approx_eq = "1.1.0" assert_unordered = "0.1.1" async-stream = "0.3" async-trait = "0.1.53" -axum = "0.5.16" +axum = "0.6.1" base64 = "0.13.0" backtrace = "0.3.58" bcs = { git = "https://github.com/aptos-labs/bcs.git", rev = "d31fab9d81748e2594be5cd5cdf845786a30562d" } @@ -342,7 +342,7 @@ hkdf = "0.10.0" hostname = "0.3.1" http = "0.2.3" httpmock = "0.6" -hyper = { version = "0.14.18", features = ["full"] } +hyper = { version = "0.14.23", features = ["full"] } hyper-tls = "0.5.0" include_dir = { version = "0.7.2", features = ["glob"] } indicatif = "0.15.0" diff --git a/network/Cargo.toml b/network/Cargo.toml index b929dcb9370c8..ffdad5712b6c7 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -33,6 +33,7 @@ aptos-short-hex-str = { workspace = true } aptos-time-service = { workspace = true } aptos-types = { workspace = true } async-trait = { workspace = true } +axum = { workspace = true } bcs = { workspace = true } bytes = { workspace = true } futures = { workspace = true } diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index 7a6991fc16b31..358886575bd14 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -26,12 +26,12 @@ use crate::{ }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::network_id::{NetworkContext, PeerNetworkId}; +use axum::{routing::get, Router}; use std::sync::Arc; pub mod builder; mod interface; -//Interface End pub struct NetPerf { network_context: NetworkContext, peers: Arc, @@ -39,6 +39,11 @@ pub struct NetPerf { events: NetPerfNetworkEvents, } +struct NetPerfState { + peers: Arc, + sender: Arc, +} + impl NetPerf { pub fn new( network_context: NetworkContext, @@ -62,5 +67,28 @@ impl NetPerf { ) } - pub async fn start(mut self) {} + pub async fn start(mut self) { + let state = NetPerfState { + peers: self.peers.clone(), + sender: self.sender.clone(), + }; + + let app = Router::new() + .route("/", get(usage_handler)) + .route("/peers", get(get_peers)); + + // run it with hyper on localhost:9107 + axum::Server::bind(&"0.0.0.0:9107".parse().unwrap()) + .serve(app.into_make_service()) + .await + .unwrap(); + } +} + +async fn usage_handler() -> &'static str { + "Usage: curl 127.0.0.01:9107/peers" +} + +async fn get_peers() -> &'static str { + "Usage: curl 127.0.0.01:9107/peers" } From 391e398b637a4eb6cdad5e3cbcb77d0bdad908f3 Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Sun, 25 Dec 2022 21:33:04 +0200 Subject: [PATCH 05/23] [Network] NetPerf DashMap for peers and custom hasher --- network/src/application/netperf/interface.rs | 9 ++ network/src/application/netperf/mod.rs | 142 ++++++++++++++++++- 2 files changed, 147 insertions(+), 4 deletions(-) diff --git a/network/src/application/netperf/interface.rs b/network/src/application/netperf/interface.rs index 02b89c0148af3..137939d120852 100644 --- a/network/src/application/netperf/interface.rs +++ b/network/src/application/netperf/interface.rs @@ -30,7 +30,16 @@ pub enum NetPerfMsg { /// `NetPerfMsg` types. `NetPerfNetworkEvents` is a thin wrapper /// around an `channel::Receiver`. pub type NetPerfNetworkEvents = NetworkEvents; +/* +impl Stream for HealthCheckNetworkInterface { + type Item = Event; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().receiver).poll_next(cx) + } +} + + */ /// The interface from NetPerf to Networking layer. /// /// This is a thin wrapper around a `NetworkSender`, so it is diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index 358886575bd14..0fb64a5fd25e4 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -26,7 +26,8 @@ use crate::{ }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::network_id::{NetworkContext, PeerNetworkId}; -use axum::{routing::get, Router}; +use aptos_logger::prelude::*; +use axum::{routing::get, Extension, Router}; use std::sync::Arc; pub mod builder; @@ -39,8 +40,10 @@ pub struct NetPerf { events: NetPerfNetworkEvents, } +#[derive(Clone)] struct NetPerfState { peers: Arc, + peer_list: DashMap, //with capacity and hasher sender: Arc, } @@ -67,7 +70,114 @@ impl NetPerf { ) } - pub async fn start(mut self) { + async fn event_handler(self) { + info!( + NetworkSchema::new(&self.network_context), + "{} NetPerf Event Listener started", self.network_context + ); + + loop { + futures::select! { + maybe_event = self.events.next() => { + // Shutdown the NetPerf when this network instance shuts + // down. This happens when the `PeerManager` drops. + let event = match maybe_event { + Some(event) => event, + None => break, + }; + + match event { + Event::NewPeer(metadata) => { + self.network_interface.app_data().insert( + metadata.remote_peer_id, + HealthCheckData::new(self.round) + ); + } + Event::LostPeer(metadata) => { + self.network_interface.app_data().remove( + &metadata.remote_peer_id + ); + } + /* + Event::RpcRequest(peer_id, msg, protocol, res_tx) => { + match msg { + NetPerfMsg::Ping(ping) => self.handle_ping_request(peer_id, ping, protocol, res_tx), + _ => { + warn!( + SecurityEvent::InvalidNetPerfMsg, + NetworkSchema::new(&self.network_context).remote_peer(&peer_id), + rpc_message = msg, + "{} Unexpected RPC message from {}", + self.network_context, + peer_id + ); + debug_assert!(false, "Unexpected rpc request"); + } + }; + } + Event::Message(peer_id, msg) => { + error!( + SecurityEvent::InvalidNetworkEventHC, + NetworkSchema::new(&self.network_context).remote_peer(&peer_id), + "{} Unexpected direct send from {} msg {:?}", + self.network_context, + peer_id, + msg, + ); + debug_assert!(false, "Unexpected network event"); + } + */ + } + } + _ = ticker.select_next_some() => { + self.round += 1; + let connected = self.network_interface.connected_peers(); + if connected.is_empty() { + trace!( + NetworkSchema::new(&self.network_context), + round = self.round, + "{} No connected peer to ping round: {}", + self.network_context, + self.round + ); + continue + } + + for peer_id in connected { + let nonce = self.rng.gen::(); + trace!( + NetworkSchema::new(&self.network_context), + round = self.round, + "{} Will ping: {} for round: {} nonce: {}", + self.network_context, + peer_id.short_str(), + self.round, + nonce + ); + + tick_handlers.push(Self::ping_peer( + self.network_context, + self.network_interface.sender(), + peer_id, + self.round, + nonce, + self.ping_timeout, + )); + } + } + res = tick_handlers.select_next_some() => { + let (peer_id, round, nonce, ping_result) = res; + self.handle_ping_response(peer_id, round, nonce, ping_result).await; + } + } + } + warn!( + NetworkSchema::new(&self.network_context), + "{} NetPerf event listener terminated", self.network_context + ); + } + + pub async fn start(self) { let state = NetPerfState { peers: self.peers.clone(), sender: self.sender.clone(), @@ -75,7 +185,7 @@ impl NetPerf { let app = Router::new() .route("/", get(usage_handler)) - .route("/peers", get(get_peers)); + .route("/peers", get(get_peers).layer(Extension(state))); // run it with hyper on localhost:9107 axum::Server::bind(&"0.0.0.0:9107".parse().unwrap()) @@ -89,6 +199,30 @@ async fn usage_handler() -> &'static str { "Usage: curl 127.0.0.01:9107/peers" } -async fn get_peers() -> &'static str { +//#TODO: Json output +async fn get_peers(Extension(state): Extension) -> &'static str { + let connected = self.network_interface.connected_peers(); + if connected.is_empty() { + trace!( + NetworkSchema::new(&self.network_context), + round = self.round, + "{} No connected peer to ping round: {}", + self.network_context, + self.round + ); + } + + for peer_id in connected { + let nonce = self.rng.gen::(); + trace!( + NetworkSchema::new(&self.network_context), + round = self.round, + "{} Will ping: {} for round: {} nonce: {}", + self.network_context, + peer_id.short_str(), + self.round, + nonce + ); + } "Usage: curl 127.0.0.01:9107/peers" } From b904a3815d56b2e5d496c430dac3e64c66b2b3f1 Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Mon, 26 Dec 2022 18:21:05 +0200 Subject: [PATCH 06/23] [Network] AptosPerf event loop --- Cargo.lock | 1 + network/Cargo.toml | 1 + network/src/application/netperf/mod.rs | 181 +++++++++---------------- 3 files changed, 69 insertions(+), 114 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a8c6a31b1baad..fe2b57ecf0c4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1614,6 +1614,7 @@ dependencies = [ "axum", "bcs 0.1.4 (git+https://github.com/aptos-labs/bcs.git?rev=d31fab9d81748e2594be5cd5cdf845786a30562d)", "bytes 1.2.1", + "dashmap", "futures", "futures-util", "hex", diff --git a/network/Cargo.toml b/network/Cargo.toml index ffdad5712b6c7..73ad30dad4c24 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -36,6 +36,7 @@ async-trait = { workspace = true } axum = { workspace = true } bcs = { workspace = true } bytes = { workspace = true } +dashmap = {worksapce = true } futures = { workspace = true } futures-util = { workspace = true } hex = { workspace = true } diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index 0fb64a5fd25e4..20c25fd18b592 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -8,6 +8,7 @@ //! use crate::application::storage::PeerMetadataStorage; +use crate::transport::ConnectionMetadata; use crate::{ application::netperf::interface::{NetPerfNetworkEvents, NetPerfNetworkSender}, constants::NETWORK_CHANNEL_SIZE, @@ -27,7 +28,12 @@ use crate::{ use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::network_id::{NetworkContext, PeerNetworkId}; use aptos_logger::prelude::*; -use axum::{routing::get, Extension, Router}; +use aptos_types::account_address::AccountAddress; +use aptos_types::PeerId; +use axum::{routing::get, Extension, Json, Router}; +use dashmap::DashMap; +use futures::StreamExt; +use serde::Serialize; use std::sync::Arc; pub mod builder; @@ -36,14 +42,23 @@ mod interface; pub struct NetPerf { network_context: NetworkContext, peers: Arc, + peer_list: Arc>, //with capacity and hasher sender: Arc, events: NetPerfNetworkEvents, } +struct PeerNetPerfStat {} + +impl PeerNetPerfStat { + pub fn new(_md: ConnectionMetadata) -> Self { + PeerNetPerfStat {} + } +} + #[derive(Clone)] struct NetPerfState { - peers: Arc, - peer_list: DashMap, //with capacity and hasher + peers: Arc, //TODO: DO I need this? + peer_list: Arc>, //with capacity and hasher sender: Arc, } @@ -57,6 +72,7 @@ impl NetPerf { NetPerf { network_context, peers, + peer_list: Arc::new(DashMap::with_capacity(128)), sender, events, } @@ -70,12 +86,22 @@ impl NetPerf { ) } - async fn event_handler(self) { + fn net_perf_state(&self) -> NetPerfState { + NetPerfState { + peers: self.peers.clone(), + sender: self.sender.clone(), + peer_list: self.peer_list.clone(), + } + } + + async fn start(mut self) { info!( NetworkSchema::new(&self.network_context), "{} NetPerf Event Listener started", self.network_context ); + spawn_named!("NetPerf Axum", start_axum(self.net_perf_state())); + loop { futures::select! { maybe_event = self.events.next() => { @@ -88,87 +114,19 @@ impl NetPerf { match event { Event::NewPeer(metadata) => { - self.network_interface.app_data().insert( + self.peer_list.insert( metadata.remote_peer_id, - HealthCheckData::new(self.round) + PeerNetPerfStat::new(metadata) ); } Event::LostPeer(metadata) => { - self.network_interface.app_data().remove( + self.peer_list.remove( &metadata.remote_peer_id ); } - /* - Event::RpcRequest(peer_id, msg, protocol, res_tx) => { - match msg { - NetPerfMsg::Ping(ping) => self.handle_ping_request(peer_id, ping, protocol, res_tx), - _ => { - warn!( - SecurityEvent::InvalidNetPerfMsg, - NetworkSchema::new(&self.network_context).remote_peer(&peer_id), - rpc_message = msg, - "{} Unexpected RPC message from {}", - self.network_context, - peer_id - ); - debug_assert!(false, "Unexpected rpc request"); - } - }; - } - Event::Message(peer_id, msg) => { - error!( - SecurityEvent::InvalidNetworkEventHC, - NetworkSchema::new(&self.network_context).remote_peer(&peer_id), - "{} Unexpected direct send from {} msg {:?}", - self.network_context, - peer_id, - msg, - ); - debug_assert!(false, "Unexpected network event"); - } - */ - } - } - _ = ticker.select_next_some() => { - self.round += 1; - let connected = self.network_interface.connected_peers(); - if connected.is_empty() { - trace!( - NetworkSchema::new(&self.network_context), - round = self.round, - "{} No connected peer to ping round: {}", - self.network_context, - self.round - ); - continue - } - - for peer_id in connected { - let nonce = self.rng.gen::(); - trace!( - NetworkSchema::new(&self.network_context), - round = self.round, - "{} Will ping: {} for round: {} nonce: {}", - self.network_context, - peer_id.short_str(), - self.round, - nonce - ); - - tick_handlers.push(Self::ping_peer( - self.network_context, - self.network_interface.sender(), - peer_id, - self.round, - nonce, - self.ping_timeout, - )); + _ => {/* Currently ignore all*/} } } - res = tick_handlers.select_next_some() => { - let (peer_id, round, nonce, ping_result) = res; - self.handle_ping_response(peer_id, round, nonce, ping_result).await; - } } } warn!( @@ -176,53 +134,48 @@ impl NetPerf { "{} NetPerf event listener terminated", self.network_context ); } +} - pub async fn start(self) { - let state = NetPerfState { - peers: self.peers.clone(), - sender: self.sender.clone(), - }; - - let app = Router::new() - .route("/", get(usage_handler)) - .route("/peers", get(get_peers).layer(Extension(state))); +async fn start_axum(state: NetPerfState) { + let app = Router::new() + .route("/", get(usage_handler)) + .route("/peers", get(get_peers).layer(Extension(state))); - // run it with hyper on localhost:9107 - axum::Server::bind(&"0.0.0.0:9107".parse().unwrap()) - .serve(app.into_make_service()) - .await - .unwrap(); - } + // run it with hyper on localhost:9107 + axum::Server::bind(&"0.0.0.0:9107".parse().unwrap()) + .serve(app.into_make_service()) + .await + .unwrap(); } async fn usage_handler() -> &'static str { "Usage: curl 127.0.0.01:9107/peers" } -//#TODO: Json output -async fn get_peers(Extension(state): Extension) -> &'static str { - let connected = self.network_interface.connected_peers(); - if connected.is_empty() { - trace!( - NetworkSchema::new(&self.network_context), - round = self.round, - "{} No connected peer to ping round: {}", - self.network_context, - self.round - ); +#[derive(Serialize)] +struct PeerList { + len: usize, + peers: Vec, +} + +impl PeerList { + pub fn new(len: usize) -> Self { + PeerList { + len, + peers: Vec::with_capacity(len), + } } +} - for peer_id in connected { - let nonce = self.rng.gen::(); - trace!( - NetworkSchema::new(&self.network_context), - round = self.round, - "{} Will ping: {} for round: {} nonce: {}", - self.network_context, - peer_id.short_str(), - self.round, - nonce - ); +async fn get_peers(Extension(state): Extension) -> Json { + let mut out = PeerList::new(state.peer_list.len()); + + let connected = state.peer_list.iter(); + + for peer in connected { + //I hate cloning, but lets worry about lifetimes later + out.peers.push(peer.key().to_owned()); } - "Usage: curl 127.0.0.01:9107/peers" + + Json(out) } From ddba1c0078f552a69bc131effbcff020ec54dcbf Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Mon, 26 Dec 2022 20:16:52 +0200 Subject: [PATCH 07/23] [Netowrk] AptosPerf randomize_port for smoketest --- config/src/config/mod.rs | 2 ++ config/src/config/network_config.rs | 17 +++++++++++++++-- network/builder/src/builder.rs | 7 ++++--- network/src/application/netperf/builder.rs | 2 ++ network/src/application/netperf/mod.rs | 17 ++++++++++++----- 5 files changed, 35 insertions(+), 10 deletions(-) diff --git a/config/src/config/mod.rs b/config/src/config/mod.rs index 913a89217559c..1de45b857acd3 100644 --- a/config/src/config/mod.rs +++ b/config/src/config/mod.rs @@ -434,10 +434,12 @@ impl NodeConfig { if let Some(network) = self.validator_network.as_mut() { network.listen_address = crate::utils::get_available_port_in_multiaddr(true); + network.randomize_ports(); } for network in self.full_node_networks.iter_mut() { network.listen_address = crate::utils::get_available_port_in_multiaddr(true); + network.randomize_ports(); } } diff --git a/config/src/config/network_config.rs b/config/src/config/network_config.rs index 9bbfc54470f65..cb3f87c1edea1 100644 --- a/config/src/config/network_config.rs +++ b/config/src/config/network_config.rs @@ -55,6 +55,7 @@ pub const INBOUND_TCP_TX_BUFFER_SIZE: u32 = 512 * 1024; // 1MB use a bigger spoo pub const OUTBOUND_TCP_RX_BUFFER_SIZE: u32 = 3 * 1024 * 1024; // 3MB ~6MB/s with 500ms latency pub const OUTBOUND_TCP_TX_BUFFER_SIZE: u32 = 1024 * 1024; // 1MB use a bigger spoon pub const ENABLE_APTOS_NETPERF_CLIENT: bool = true; +pub const DEFAULT_APTOS_NETPERF_CLIENT_PORT: u16 = 9107; #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[serde(default, deny_unknown_fields)] @@ -81,7 +82,7 @@ pub struct NetworkConfig { // Select this to enforce that both peers should authenticate each other, otherwise // authentication only occurs for outgoing connections. pub mutual_authentication: bool, - pub enable_netperf_client: bool, + pub netperf_client_port: Option, pub network_id: NetworkId, pub runtime_threads: Option, pub inbound_rx_buffer_size_bytes: Option, @@ -122,7 +123,19 @@ impl Default for NetworkConfig { } } +fn netperf_client_port(enabled: bool) -> Option { + if enabled { + Some(DEFAULT_APTOS_NETPERF_CLIENT_PORT) + } else { + None + } +} + impl NetworkConfig { + pub fn randomize_ports(&mut self) { + self.netperf_client_port = Some(utils::get_available_port()); + } + pub fn network_with_id(network_id: NetworkId) -> NetworkConfig { let mutual_authentication = network_id.is_validator_network(); let mut config = Self { @@ -131,7 +144,7 @@ impl NetworkConfig { identity: Identity::None, listen_address: "/ip4/0.0.0.0/tcp/6180".parse().unwrap(), mutual_authentication, - enable_netperf_client: ENABLE_APTOS_NETPERF_CLIENT, + netperf_client_port: netperf_client_port(ENABLE_APTOS_NETPERF_CLIENT), network_id, runtime_threads: None, seed_addrs: HashMap::new(), diff --git a/network/builder/src/builder.rs b/network/builder/src/builder.rs index 094e01e45ce41..f45c71520384e 100644 --- a/network/builder/src/builder.rs +++ b/network/builder/src/builder.rs @@ -231,8 +231,8 @@ impl NetworkBuilder { config.ping_failures_tolerated, ); - if config.enable_netperf_client == true { - network_builder.add_network_perf(); + if let Some(netperf_port) = config.netperf_client_port { + network_builder.add_network_perf(netperf_port); } // Always add a connectivity manager to keep track of known peers @@ -444,13 +444,14 @@ impl NetworkBuilder { } /// Add a Aptos NetPerf to the network. - fn add_network_perf(&mut self) -> &mut Self { + fn add_network_perf(&mut self, netperf_port: u16) -> &mut Self { let (netperf_tx, netperf_rx) = self.add_p2p_service(&NetPerf::network_endpoint_config()); self.netperf_builder = Some(NetPerfBuilder::new( self.network_context(), self.peer_metadata_storage.clone(), Arc::new(netperf_tx), netperf_rx, + netperf_port, )); debug!( NetworkSchema::new(&self.network_context), diff --git a/network/src/application/netperf/builder.rs b/network/src/application/netperf/builder.rs index d33f3ffbae70a..b42f89b2c38c0 100644 --- a/network/src/application/netperf/builder.rs +++ b/network/src/application/netperf/builder.rs @@ -23,12 +23,14 @@ impl NetPerfBuilder { peer_metadata_storage: Arc, network_tx: Arc, network_rx: NetPerfNetworkEvents, + netperf_port: u16, ) -> Self { let service = NetPerf::new( network_context, peer_metadata_storage, network_tx, network_rx, + netperf_port, ); Self { service: Some(service), diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index 20c25fd18b592..5d893b440a5a1 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -45,6 +45,7 @@ pub struct NetPerf { peer_list: Arc>, //with capacity and hasher sender: Arc, events: NetPerfNetworkEvents, + netperf_port: u16, } struct PeerNetPerfStat {} @@ -68,6 +69,7 @@ impl NetPerf { peers: Arc, sender: Arc, events: NetPerfNetworkEvents, + netperf_port: u16, ) -> Self { NetPerf { network_context, @@ -75,6 +77,7 @@ impl NetPerf { peer_list: Arc::new(DashMap::with_capacity(128)), sender, events, + netperf_port, } } @@ -100,7 +103,10 @@ impl NetPerf { "{} NetPerf Event Listener started", self.network_context ); - spawn_named!("NetPerf Axum", start_axum(self.net_perf_state())); + spawn_named!( + "NetPerf Axum", + start_axum(self.net_perf_state(), self.netperf_port) + ); loop { futures::select! { @@ -136,13 +142,15 @@ impl NetPerf { } } -async fn start_axum(state: NetPerfState) { +async fn start_axum(state: NetPerfState, netperf_port: u16) { let app = Router::new() .route("/", get(usage_handler)) .route("/peers", get(get_peers).layer(Extension(state))); - // run it with hyper on localhost:9107 - axum::Server::bind(&"0.0.0.0:9107".parse().unwrap()) + let addr = std::net::SocketAddr::from(([0, 0, 0, 0], netperf_port)); + + // run it with hyper on netperf_port + axum::Server::bind(&addr) .serve(app.into_make_service()) .await .unwrap(); @@ -173,7 +181,6 @@ async fn get_peers(Extension(state): Extension) -> Json let connected = state.peer_list.iter(); for peer in connected { - //I hate cloning, but lets worry about lifetimes later out.peers.push(peer.key().to_owned()); } From 35e53cb901f03660800310e3909e0214148096e6 Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Mon, 26 Dec 2022 21:04:02 +0200 Subject: [PATCH 08/23] [Network] prefferred port hack --- network/builder/src/builder.rs | 12 +++++++----- network/src/application/netperf/mod.rs | 24 +++++++++++++++++++----- testsuite/smoke-test/src/network.rs | 5 ++++- 3 files changed, 30 insertions(+), 11 deletions(-) diff --git a/network/builder/src/builder.rs b/network/builder/src/builder.rs index f45c71520384e..98db04c4ba7a7 100644 --- a/network/builder/src/builder.rs +++ b/network/builder/src/builder.rs @@ -325,11 +325,13 @@ impl NetworkBuilder { } if let Some(netperf_builder) = self.netperf_builder.as_mut() { - netperf_builder.start(executor); - debug!( - NetworkSchema::new(&self.network_context), - "{} Started Aptos NetPerf", self.network_context - ); + if self.network_context.role() == RoleType::Validator { + netperf_builder.start(executor); + debug!( + NetworkSchema::new(&self.network_context), + "{} Started Aptos NetPerf", self.network_context + ); + } } if let Some(discovery_listeners) = self.discovery_listeners.take() { diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index 5d893b440a5a1..e2ee5e337ceb7 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -34,6 +34,7 @@ use axum::{routing::get, Extension, Json, Router}; use dashmap::DashMap; use futures::StreamExt; use serde::Serialize; +use std::fs::OpenOptions; use std::sync::Arc; pub mod builder; @@ -98,15 +99,13 @@ impl NetPerf { } async fn start(mut self) { + let port = preferred_axum_port(self.netperf_port); info!( NetworkSchema::new(&self.network_context), - "{} NetPerf Event Listener started", self.network_context + "{} NetPerf Event Listener started", self.network_context, ); - spawn_named!( - "NetPerf Axum", - start_axum(self.net_perf_state(), self.netperf_port) - ); + spawn_named!("NetPerf Axum", start_axum(self.net_perf_state(), port)); loop { futures::select! { @@ -142,6 +141,21 @@ impl NetPerf { } } +fn preferred_axum_port(netperf_port: u16) -> u16 { + if netperf_port != 9107 { + let _ = OpenOptions::new() + .write(true) + .create_new(true) + .open("/tmp/9107.tmp"); + + let _ = OpenOptions::new() + .write(true) + .create(true) + .open(format!("/tmp/{}.tmp", netperf_port)); + } + return netperf_port; +} + async fn start_axum(state: NetPerfState, netperf_port: u16) { let app = Router::new() .route("/", get(usage_handler)) diff --git a/testsuite/smoke-test/src/network.rs b/testsuite/smoke-test/src/network.rs index de477018b1feb..54df6fc4ddbf2 100644 --- a/testsuite/smoke-test/src/network.rs +++ b/testsuite/smoke-test/src/network.rs @@ -24,7 +24,7 @@ use std::{ #[tokio::test] async fn test_connection_limiting() { - let mut swarm = new_local_swarm_with_aptos(1).await; + let mut swarm = new_local_swarm_with_aptos(5).await; let version = swarm.versions().max().unwrap(); let validator_peer_id = swarm.validators().next().unwrap().peer_id(); @@ -130,6 +130,9 @@ async fn test_connection_limiting() { .unwrap() .unwrap_or(0) ); + while std::path::Path::new("/tmp/9107.tmp").exists() { + tokio::time::sleep(Duration::from_secs(1)).await; + } } #[tokio::test] From 9ab01846f904297141f6c730f8cf9b7403766768 Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Tue, 27 Dec 2022 21:11:50 +0200 Subject: [PATCH 09/23] [Network] AptosPerf axum post infra --- network/src/application/netperf/mod.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index e2ee5e337ceb7..a3f53aeef90c9 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -30,7 +30,13 @@ use aptos_config::network_id::{NetworkContext, PeerNetworkId}; use aptos_logger::prelude::*; use aptos_types::account_address::AccountAddress; use aptos_types::PeerId; -use axum::{routing::get, Extension, Json, Router}; +use axum::{ + extract::Query, + http::StatusCode, + response::IntoResponse, + routing::{get, post}, + Extension, Json, Router, +}; use dashmap::DashMap; use futures::StreamExt; use serde::Serialize; @@ -159,7 +165,11 @@ fn preferred_axum_port(netperf_port: u16) -> u16 { async fn start_axum(state: NetPerfState, netperf_port: u16) { let app = Router::new() .route("/", get(usage_handler)) - .route("/peers", get(get_peers).layer(Extension(state))); + .route("/peers", get(get_peers).layer(Extension(state.clone()))) + .route( + "/command", + post(parse_query).layer(Extension(state.clone())), + ); let addr = std::net::SocketAddr::from(([0, 0, 0, 0], netperf_port)); @@ -200,3 +210,10 @@ async fn get_peers(Extension(state): Extension) -> Json Json(out) } + +async fn parse_query( + Extension(state): Extension, + Query(params): Query>, +) -> impl IntoResponse { + StatusCode::OK +} From b47b998215fe72dd3a340de6fc2ae3a7775853be Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Wed, 28 Dec 2022 22:38:43 +0200 Subject: [PATCH 10/23] [AptosPerf] Command interface --- crates/channel/src/message_queues.rs | 1 + network/src/application/netperf/mod.rs | 67 ++++++++++++++++++++++++-- network/src/protocols/network/mod.rs | 1 + 3 files changed, 64 insertions(+), 5 deletions(-) diff --git a/crates/channel/src/message_queues.rs b/crates/channel/src/message_queues.rs index 02f95f851b14a..8456184b9a217 100644 --- a/crates/channel/src/message_queues.rs +++ b/crates/channel/src/message_queues.rs @@ -123,6 +123,7 @@ impl PerKeyQueue { // For example, many of our queues have a max capacity of 1024. To // handle a single rpc from a transient peer, we would end up // allocating ~ 96 b * 1024 ~ 64 Kib per queue. + //TODO: Reconsider this. Maybe true for VFN but not Validators .or_insert_with(|| VecDeque::with_capacity(1)); // Add the key to our round-robin queue if it's not already there diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index a3f53aeef90c9..97e2fd3c9c5dc 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -10,7 +10,7 @@ use crate::application::storage::PeerMetadataStorage; use crate::transport::ConnectionMetadata; use crate::{ - application::netperf::interface::{NetPerfNetworkEvents, NetPerfNetworkSender}, + application::netperf::interface::{NetPerfMsg::*, NetPerfNetworkEvents, NetPerfNetworkSender}, constants::NETWORK_CHANNEL_SIZE, counters, error::NetworkError, @@ -29,6 +29,7 @@ use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::network_id::{NetworkContext, PeerNetworkId}; use aptos_logger::prelude::*; use aptos_types::account_address::AccountAddress; +use aptos_types::network_address::ParseError::NetworkLayerMissing; use aptos_types::PeerId; use axum::{ extract::Query, @@ -39,13 +40,18 @@ use axum::{ }; use dashmap::DashMap; use futures::StreamExt; +use futures_util::stream::FuturesUnordered; use serde::Serialize; use std::fs::OpenOptions; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; +use tokio::spawn; +use tokio::sync::mpsc::{Receiver, Sender}; pub mod builder; mod interface; +const NETPERF_COMMAND_CHANNEL_SIZE: usize = 1024; + pub struct NetPerf { network_context: NetworkContext, peers: Arc, @@ -68,6 +74,7 @@ struct NetPerfState { peers: Arc, //TODO: DO I need this? peer_list: Arc>, //with capacity and hasher sender: Arc, + tx: Sender, } impl NetPerf { @@ -92,26 +99,37 @@ impl NetPerf { pub fn network_endpoint_config() -> AppConfig { AppConfig::p2p( [ProtocolId::NetPerfRpcCompressed], - aptos_channel::Config::new(NETWORK_CHANNEL_SIZE).queue_style(QueueStyle::LIFO), + aptos_channel::Config::new(NETWORK_CHANNEL_SIZE).queue_style(QueueStyle::FIFO), ) } - fn net_perf_state(&self) -> NetPerfState { + fn net_perf_state(&self, sender: Sender) -> NetPerfState { NetPerfState { peers: self.peers.clone(), sender: self.sender.clone(), peer_list: self.peer_list.clone(), + tx: sender, } } async fn start(mut self) { let port = preferred_axum_port(self.netperf_port); + let (tx, mut rx) = + tokio::sync::mpsc::channel::(NETPERF_COMMAND_CHANNEL_SIZE); + info!( NetworkSchema::new(&self.network_context), "{} NetPerf Event Listener started", self.network_context, ); - spawn_named!("NetPerf Axum", start_axum(self.net_perf_state(), port)); + spawn_named!( + "NetPerf Axum", + start_axum(self.net_perf_state(tx.clone()), port) + ); + spawn_named!( + "NetPerf EventHandler", + netperf_comp_handler(self.net_perf_state(tx.clone()), rx) + ); loop { futures::select! { @@ -146,6 +164,10 @@ impl NetPerf { ); } } +#[derive(Clone)] +enum NetPerfCommands { + Broadcast, +} fn preferred_axum_port(netperf_port: u16) -> u16 { if netperf_port != 9107 { @@ -215,5 +237,40 @@ async fn parse_query( Extension(state): Extension, Query(params): Query>, ) -> impl IntoResponse { + spawn_named!("[NetPerf] Broadcast Task", netperf_broadcast(state.clone())); + StatusCode::OK } + +async fn netperf_comp_handler(state: NetPerfState, mut rx: Receiver) { + let mut rpc_handlers = FuturesUnordered::new(); + + loop { + tokio::select! { + opt_cmd = rx.recv() => { + match opt_cmd { + Some(cmd) => { + for peer in state.peer_list.iter() { + //TODO(AlexM): Yet another Alloc + Copy OPs. + // Best use Refs - Just ARC + rpc_handlers.push(state.sender.send_rpc( + peer.key().to_owned(), + ProtocolId::NetPerfRpcCompressed, + BlockOfBytes64K, + Duration::from_secs(5), + )); + } + } + None => break, + } + } + res = rpc_handlers.select_next_some() => {} + } + } +} + +async fn netperf_broadcast(state: NetPerfState) { + loop { + let _ = state.tx.send(NetPerfCommands::Broadcast).await; + } +} diff --git a/network/src/protocols/network/mod.rs b/network/src/protocols/network/mod.rs index c4ba7d15b6140..47447875454aa 100644 --- a/network/src/protocols/network/mod.rs +++ b/network/src/protocols/network/mod.rs @@ -338,6 +338,7 @@ impl NetworkSender { timeout: Duration, ) -> Result { // serialize request + //TODO(AlexM): In Bcast next line repeats 100 Times, ergo, Bcast should be a Network function. let req_data = protocol.to_bytes(&req_msg)?.into(); let res_data = self .peer_mgr_reqs_tx From 56347b65aa41d8c93959e4dfc0e04c91ada8d13d Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Tue, 3 Jan 2023 16:07:16 +0200 Subject: [PATCH 11/23] [ApotosPerf] NEtPerfPayload --- network/src/application/netperf/interface.rs | 14 +++++++++++++- network/src/application/netperf/mod.rs | 7 +++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/network/src/application/netperf/interface.rs b/network/src/application/netperf/interface.rs index 137939d120852..bb81ae988871e 100644 --- a/network/src/application/netperf/interface.rs +++ b/network/src/application/netperf/interface.rs @@ -19,9 +19,21 @@ use crate::{ }; use serde::{Deserialize, Serialize}; +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct NetPerfPayload { + byte: Vec, +} + +impl NetPerfPayload { + pub fn new(len :usize) -> Self { + let v = Vec::with_capacity(len); + NetPerfPayload { byte: v } + } +} + #[derive(Clone, Debug, Deserialize, Serialize)] pub enum NetPerfMsg { - BlockOfBytes64K, + BlockOfBytes(NetPerfPayload), } /// The interface from Network to NetPerf layer. /// diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index 97e2fd3c9c5dc..c4af7f4fa6716 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -10,7 +10,7 @@ use crate::application::storage::PeerMetadataStorage; use crate::transport::ConnectionMetadata; use crate::{ - application::netperf::interface::{NetPerfMsg::*, NetPerfNetworkEvents, NetPerfNetworkSender}, + application::netperf::interface::{NetPerfMsg::*, NetPerfNetworkEvents, NetPerfNetworkSender, NetPerfPayload}, constants::NETWORK_CHANNEL_SIZE, counters, error::NetworkError, @@ -46,6 +46,7 @@ use std::fs::OpenOptions; use std::{sync::Arc, time::Duration}; use tokio::spawn; use tokio::sync::mpsc::{Receiver, Sender}; +use crate::application::netperf::interface::NetPerfMsg; pub mod builder; mod interface; @@ -250,13 +251,15 @@ async fn netperf_comp_handler(state: NetPerfState, mut rx: Receiver { match opt_cmd { Some(cmd) => { + let msg = NetPerfMsg::BlockOfBytes(NetPerfPayload::new(64 * 1024)); + for peer in state.peer_list.iter() { //TODO(AlexM): Yet another Alloc + Copy OPs. // Best use Refs - Just ARC rpc_handlers.push(state.sender.send_rpc( peer.key().to_owned(), ProtocolId::NetPerfRpcCompressed, - BlockOfBytes64K, + msg.clone(), Duration::from_secs(5), )); } From 88fcbaaa327c6b4571447e49199347810c5fcc26 Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Tue, 3 Jan 2023 16:31:34 +0200 Subject: [PATCH 12/23] [AptosPerf] DirectSend --- network/src/application/netperf/mod.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index c4af7f4fa6716..3d70610139109 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -99,7 +99,7 @@ impl NetPerf { /// Configuration for the network endpoints to support NetPerf. pub fn network_endpoint_config() -> AppConfig { AppConfig::p2p( - [ProtocolId::NetPerfRpcCompressed], + [ProtocolId::NetPerfDirectSendCompressed, ProtocolId::NetPerfRpcCompressed], aptos_channel::Config::new(NETWORK_CHANNEL_SIZE).queue_style(QueueStyle::FIFO), ) } @@ -273,7 +273,13 @@ async fn netperf_comp_handler(state: NetPerfState, mut rx: Receiver Date: Tue, 3 Jan 2023 18:01:02 +0200 Subject: [PATCH 13/23] rebase --- network/builder/src/builder.rs | 3 +- network/src/application/netperf/interface.rs | 11 +------ network/src/application/netperf/mod.rs | 31 ++++++++------------ 3 files changed, 15 insertions(+), 30 deletions(-) diff --git a/network/builder/src/builder.rs b/network/builder/src/builder.rs index 98db04c4ba7a7..40baf67a8f077 100644 --- a/network/builder/src/builder.rs +++ b/network/builder/src/builder.rs @@ -24,7 +24,6 @@ use aptos_infallible::RwLock; use aptos_logger::prelude::*; use aptos_netcore::transport::tcp::TCPBufferCfg; use aptos_network::application::netperf::builder::NetPerfBuilder; -use aptos_network::protocols::wire::handshake::v1::ProtocolId::NetPerfDirectSendCompressed; use aptos_network::{ application::netperf::NetPerf, application::storage::PeerMetadataStorage, @@ -447,7 +446,7 @@ impl NetworkBuilder { /// Add a Aptos NetPerf to the network. fn add_network_perf(&mut self, netperf_port: u16) -> &mut Self { - let (netperf_tx, netperf_rx) = self.add_p2p_service(&NetPerf::network_endpoint_config()); + let (netperf_tx, netperf_rx) = self.add_client_and_service(&NetPerf::network_endpoint_config()); self.netperf_builder = Some(NetPerfBuilder::new( self.network_context(), self.peer_metadata_storage.clone(), diff --git a/network/src/application/netperf/interface.rs b/network/src/application/netperf/interface.rs index bb81ae988871e..95d8ca919ce18 100644 --- a/network/src/application/netperf/interface.rs +++ b/network/src/application/netperf/interface.rs @@ -2,20 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 // use crate::{ - application::interface::NetworkInterface, - constants::NETWORK_CHANNEL_SIZE, - counters, - error::NetworkError, - logging::NetworkSchema, - peer_manager::{ConnectionRequestSender, PeerManagerRequestSender}, protocols::{ network::{ - AppConfig, ApplicationNetworkSender, Event, NetworkEvents, NetworkSender, - NewNetworkSender, + NetworkEvents, NetworkSender, }, - rpc::error::RpcError, }, - ProtocolId, }; use serde::{Deserialize, Serialize}; diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index 3d70610139109..8d47f01663b02 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -10,26 +10,19 @@ use crate::application::storage::PeerMetadataStorage; use crate::transport::ConnectionMetadata; use crate::{ - application::netperf::interface::{NetPerfMsg::*, NetPerfNetworkEvents, NetPerfNetworkSender, NetPerfPayload}, + application::netperf::interface::{NetPerfNetworkEvents, NetPerfNetworkSender, NetPerfPayload}, constants::NETWORK_CHANNEL_SIZE, - counters, - error::NetworkError, logging::NetworkSchema, - peer_manager::{ConnectionRequestSender, PeerManagerRequestSender}, protocols::{ network::{ - AppConfig, ApplicationNetworkSender, Event, NetworkEvents, NetworkSender, - NewNetworkSender, + Event, }, - rpc::error::RpcError, }, ProtocolId, }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; -use aptos_config::network_id::{NetworkContext, PeerNetworkId}; +use aptos_config::network_id::{NetworkContext}; use aptos_logger::prelude::*; -use aptos_types::account_address::AccountAddress; -use aptos_types::network_address::ParseError::NetworkLayerMissing; use aptos_types::PeerId; use axum::{ extract::Query, @@ -44,9 +37,9 @@ use futures_util::stream::FuturesUnordered; use serde::Serialize; use std::fs::OpenOptions; use std::{sync::Arc, time::Duration}; -use tokio::spawn; use tokio::sync::mpsc::{Receiver, Sender}; use crate::application::netperf::interface::NetPerfMsg; +use crate::protocols::network::NetworkApplicationConfig; pub mod builder; mod interface; @@ -70,6 +63,7 @@ impl PeerNetPerfStat { } } +#[allow(dead_code)] #[derive(Clone)] struct NetPerfState { peers: Arc, //TODO: DO I need this? @@ -97,8 +91,8 @@ impl NetPerf { } /// Configuration for the network endpoints to support NetPerf. - pub fn network_endpoint_config() -> AppConfig { - AppConfig::p2p( + pub fn network_endpoint_config() -> NetworkApplicationConfig { + NetworkApplicationConfig::client_and_service( [ProtocolId::NetPerfDirectSendCompressed, ProtocolId::NetPerfRpcCompressed], aptos_channel::Config::new(NETWORK_CHANNEL_SIZE).queue_style(QueueStyle::FIFO), ) @@ -115,7 +109,7 @@ impl NetPerf { async fn start(mut self) { let port = preferred_axum_port(self.netperf_port); - let (tx, mut rx) = + let (tx, rx) = tokio::sync::mpsc::channel::(NETPERF_COMMAND_CHANNEL_SIZE); info!( @@ -165,6 +159,7 @@ impl NetPerf { ); } } +#[allow(dead_code)] #[derive(Clone)] enum NetPerfCommands { Broadcast, @@ -236,7 +231,7 @@ async fn get_peers(Extension(state): Extension) -> Json async fn parse_query( Extension(state): Extension, - Query(params): Query>, + Query(_params): Query>, ) -> impl IntoResponse { spawn_named!("[NetPerf] Broadcast Task", netperf_broadcast(state.clone())); @@ -250,7 +245,7 @@ async fn netperf_comp_handler(state: NetPerfState, mut rx: Receiver { match opt_cmd { - Some(cmd) => { + Some(_cmd) => { let msg = NetPerfMsg::BlockOfBytes(NetPerfPayload::new(64 * 1024)); for peer in state.peer_list.iter() { @@ -267,7 +262,7 @@ async fn netperf_comp_handler(state: NetPerfState, mut rx: Receiver break, } } - res = rpc_handlers.select_next_some() => {} + _res = rpc_handlers.select_next_some() => {} } } } @@ -276,7 +271,7 @@ async fn netperf_broadcast(state: NetPerfState) { let msg = NetPerfMsg::BlockOfBytes(NetPerfPayload::new(64 * 1024)); for peer in state.peer_list.iter() { - state.sender.send_to( + let _rc = state.sender.send_to( peer.key().to_owned(), ProtocolId::NetPerfDirectSendCompressed, msg.clone(), From 9ea60f2466b47f2b1cd6bf58fde6681254a981ac Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Tue, 3 Jan 2023 18:03:54 +0200 Subject: [PATCH 14/23] supress unused warnings on QS code --- consensus/src/payload_manager.rs | 1 + consensus/src/state_computer.rs | 2 ++ 2 files changed, 3 insertions(+) diff --git a/consensus/src/payload_manager.rs b/consensus/src/payload_manager.rs index a4fedfed23696..43f19180bd68f 100644 --- a/consensus/src/payload_manager.rs +++ b/consensus/src/payload_manager.rs @@ -18,6 +18,7 @@ use tokio::sync::oneshot; /// Responsible to extract the transactions out of the payload and notify QuorumStore about commits. /// If QuorumStore is enabled, has to ask BatchReader for the transaction behind the proofs of availability in the payload. +#[allow(dead_code)] pub enum PayloadManager { DirectMempool, InQuorumStore(BatchReader, Mutex>), diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs index c069ded6adae3..04bc2dba370ba 100644 --- a/consensus/src/state_computer.rs +++ b/consensus/src/state_computer.rs @@ -37,10 +37,12 @@ type NotificationType = ( Vec, ); +#[allow(dead_code)] type CommitType = (u64, Round, Vec); /// Basic communication with the Execution module; /// implements StateComputer traits. +#[allow(dead_code)] pub struct ExecutionProxy { executor: Arc, txn_notifier: Arc, From 4e789b626c618b142ca317c62f005510b4e1420e Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Tue, 3 Jan 2023 18:50:19 +0200 Subject: [PATCH 15/23] broadcast_loop --- network/src/application/netperf/mod.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index 8d47f01663b02..eb5eb4a4c189c 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -268,13 +268,25 @@ async fn netperf_comp_handler(state: NetPerfState, mut rx: Receiver Date: Tue, 3 Jan 2023 19:18:01 +0200 Subject: [PATCH 16/23] [AptosPerf] RX side --- network/src/application/netperf/mod.rs | 30 +++++++++++++++----------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index eb5eb4a4c189c..10ea537bcf824 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -7,21 +7,19 @@ //! and simplify network-related performance profiling and debugging //! +use crate::application::netperf::interface::NetPerfMsg; use crate::application::storage::PeerMetadataStorage; +use crate::protocols::network::NetworkApplicationConfig; use crate::transport::ConnectionMetadata; use crate::{ application::netperf::interface::{NetPerfNetworkEvents, NetPerfNetworkSender, NetPerfPayload}, constants::NETWORK_CHANNEL_SIZE, logging::NetworkSchema, - protocols::{ - network::{ - Event, - }, - }, + protocols::network::Event, ProtocolId, }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; -use aptos_config::network_id::{NetworkContext}; +use aptos_config::network_id::NetworkContext; use aptos_logger::prelude::*; use aptos_types::PeerId; use axum::{ @@ -38,8 +36,6 @@ use serde::Serialize; use std::fs::OpenOptions; use std::{sync::Arc, time::Duration}; use tokio::sync::mpsc::{Receiver, Sender}; -use crate::application::netperf::interface::NetPerfMsg; -use crate::protocols::network::NetworkApplicationConfig; pub mod builder; mod interface; @@ -93,7 +89,10 @@ impl NetPerf { /// Configuration for the network endpoints to support NetPerf. pub fn network_endpoint_config() -> NetworkApplicationConfig { NetworkApplicationConfig::client_and_service( - [ProtocolId::NetPerfDirectSendCompressed, ProtocolId::NetPerfRpcCompressed], + [ + ProtocolId::NetPerfDirectSendCompressed, + ProtocolId::NetPerfRpcCompressed, + ], aptos_channel::Config::new(NETWORK_CHANNEL_SIZE).queue_style(QueueStyle::FIFO), ) } @@ -109,8 +108,7 @@ impl NetPerf { async fn start(mut self) { let port = preferred_axum_port(self.netperf_port); - let (tx, rx) = - tokio::sync::mpsc::channel::(NETPERF_COMMAND_CHANNEL_SIZE); + let (tx, rx) = tokio::sync::mpsc::channel::(NETPERF_COMMAND_CHANNEL_SIZE); info!( NetworkSchema::new(&self.network_context), @@ -148,6 +146,14 @@ impl NetPerf { &metadata.remote_peer_id ); } + Event::Message(peer_id, msg) => { + match msg { + NetPerfMsg::BlockOfBytes(bytes) => {} + _ => {} + + } + + } _ => {/* Currently ignore all*/} } } @@ -282,7 +288,7 @@ async fn netperf_broadcast(state: NetPerfState) { ); if let Err(_) = rc { should_yield = true - }//else update peer counters + } //else update peer counters } if should_yield == true { break; From 81668c21cd216d40726e84cdbdc3fdfa47ad63d7 Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Tue, 3 Jan 2023 19:48:02 +0200 Subject: [PATCH 17/23] [APtosPerf] use send_to_many --- network/src/application/netperf/mod.rs | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index 10ea537bcf824..003f8de4e9026 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -239,7 +239,7 @@ async fn parse_query( Extension(state): Extension, Query(_params): Query>, ) -> impl IntoResponse { - spawn_named!("[NetPerf] Broadcast Task", netperf_broadcast(state.clone())); + spawn_named!("[NetPerf] Brodcast Task", netperf_broadcast(state.clone())); StatusCode::OK } @@ -278,18 +278,17 @@ async fn netperf_broadcast(state: NetPerfState) { let msg = NetPerfMsg::BlockOfBytes(NetPerfPayload::new(64 * 1024)); loop { - for peer in state.peer_list.iter() { - /* TODO(AlexM): This current implementation has a redundant Copy - consider Bytes*/ - let rc = state.sender.send_to( - peer.key().to_owned(), - ProtocolId::NetPerfDirectSendCompressed, - msg.clone(), - ); - if let Err(_) = rc { - should_yield = true - } //else update peer counters - } + /* TODO(AlexM): Better Fine grained controll with send_to. + * Its interesting to see which of the validr queus gets full. + * */ + let rc = state.sender.send_to_many( + state.peer_list.iter().map(|entry| entry.key().to_owned()), + ProtocolId::NetPerfDirectSendCompressed, + msg.clone(), + ); + if let Err(_) = rc { + should_yield = true + } //else update peer counters if should_yield == true { break; } From e51e3261480cc526060f13517254c552e173b275 Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Tue, 3 Jan 2023 19:53:25 +0200 Subject: [PATCH 18/23] redundant len in PeerList --- network/src/application/netperf/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index 003f8de4e9026..5158dd78e1752 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -210,14 +210,12 @@ async fn usage_handler() -> &'static str { #[derive(Serialize)] struct PeerList { - len: usize, peers: Vec, } impl PeerList { pub fn new(len: usize) -> Self { PeerList { - len, peers: Vec::with_capacity(len), } } From 14831d75fe8074cbb6e57b0b2e433fb47ea7c2c5 Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Tue, 3 Jan 2023 20:13:06 +0200 Subject: [PATCH 19/23] [AptosPerf] set size + comments --- network/src/application/netperf/interface.rs | 15 ++++++--------- network/src/application/netperf/mod.rs | 19 ++++++++++++++----- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/network/src/application/netperf/interface.rs b/network/src/application/netperf/interface.rs index 95d8ca919ce18..c941f1fb264c3 100644 --- a/network/src/application/netperf/interface.rs +++ b/network/src/application/netperf/interface.rs @@ -1,13 +1,7 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 // -use crate::{ - protocols::{ - network::{ - NetworkEvents, NetworkSender, - }, - }, -}; +use crate::protocols::network::{NetworkEvents, NetworkSender}; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, Serialize)] @@ -16,8 +10,11 @@ pub struct NetPerfPayload { } impl NetPerfPayload { - pub fn new(len :usize) -> Self { - let v = Vec::with_capacity(len); + pub fn new(len: usize) -> Self { + let mut v = Vec::with_capacity(len); + unsafe { + v.set_len(len); + } NetPerfPayload { byte: v } } } diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index 5158dd78e1752..ff1227db7218f 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -146,10 +146,13 @@ impl NetPerf { &metadata.remote_peer_id ); } - Event::Message(peer_id, msg) => { + Event::Message(_peer_id, msg) => { match msg { - NetPerfMsg::BlockOfBytes(bytes) => {} - _ => {} + NetPerfMsg::BlockOfBytes(_bytes) => { + /* maybe add dedicated counters? but network_application_{out/in}bound_traffic + * seems to have us coverred + * */ + } } @@ -276,8 +279,10 @@ async fn netperf_broadcast(state: NetPerfState) { let msg = NetPerfMsg::BlockOfBytes(NetPerfPayload::new(64 * 1024)); loop { - /* TODO(AlexM): Better Fine grained controll with send_to. - * Its interesting to see which of the validr queus gets full. + /* TODO(AlexM): + * 1. Fine grained controll with send_to. + * Its interesting to see which of the validator queus gets filled. + * 2. msg.clone() is a disaster * */ let rc = state.sender.send_to_many( state.peer_list.iter().map(|entry| entry.key().to_owned()), @@ -287,6 +292,10 @@ async fn netperf_broadcast(state: NetPerfState) { if let Err(_) = rc { should_yield = true } //else update peer counters + /* maybe add dedicated counters? but network_application_{out/in}bound_traffic + * seems to have us coverred + * */ + if should_yield == true { break; } From 68abf570bd5db94e202b29016989a8ef0e1500fb Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Tue, 3 Jan 2023 20:26:48 +0200 Subject: [PATCH 20/23] [AptosPerf] adding COUNTERS --- network/src/application/netperf/mod.rs | 7 +++++-- network/src/counters.rs | 22 +++++++++++++++------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index ff1227db7218f..f80e457f81219 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -14,6 +14,7 @@ use crate::transport::ConnectionMetadata; use crate::{ application::netperf::interface::{NetPerfNetworkEvents, NetPerfNetworkSender, NetPerfPayload}, constants::NETWORK_CHANNEL_SIZE, + counters, logging::NetworkSchema, protocols::network::Event, ProtocolId, @@ -62,7 +63,7 @@ impl PeerNetPerfStat { #[allow(dead_code)] #[derive(Clone)] struct NetPerfState { - peers: Arc, //TODO: DO I need this? + peers: Arc, peer_list: Arc>, //with capacity and hasher sender: Arc, tx: Sender, @@ -93,7 +94,9 @@ impl NetPerf { ProtocolId::NetPerfDirectSendCompressed, ProtocolId::NetPerfRpcCompressed, ], - aptos_channel::Config::new(NETWORK_CHANNEL_SIZE).queue_style(QueueStyle::FIFO), + aptos_channel::Config::new(NETWORK_CHANNEL_SIZE) + .queue_style(QueueStyle::FIFO) + .counters(&counters::PENDING_NET_PERF_NETWORK_EVENTS), ) } diff --git a/network/src/counters.rs b/network/src/counters.rs index 61cd10dcfd65a..8334fb340e6f9 100644 --- a/network/src/counters.rs +++ b/network/src/counters.rs @@ -156,13 +156,11 @@ pub static APTOS_NETWORK_DISCOVERY_NOTES: Lazy = Lazy::new(|| { }); pub static APTOS_NETWORK_RPC_MESSAGES: Lazy = Lazy::new(|| { - register_int_counter_vec!("aptos_network_rpc_messages", "Number of RPC messages", &[ - "role_type", - "network_id", - "peer_id", - "type", - "state" - ]) + register_int_counter_vec!( + "aptos_network_rpc_messages", + "Number of RPC messages", + &["role_type", "network_id", "peer_id", "type", "state"] + ) .unwrap() }); @@ -326,6 +324,16 @@ pub static PENDING_NETWORK_REQUESTS: Lazy = Lazy::new(|| { .unwrap() }); +/// Counter of pending network events to Health Checker. +pub static PENDING_NET_PERF_NETWORK_EVENTS: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "aptos_network_pending_netperf_events", + "Number of pending netperf events by state", + &["state"] + ) + .unwrap() +}); + /// Counter of pending network events to Health Checker. pub static PENDING_HEALTH_CHECKER_NETWORK_EVENTS: Lazy = Lazy::new(|| { register_int_counter_vec!( From 441387f59d24ed2f679f45ce599d67e1cd98e6c8 Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Tue, 3 Jan 2023 20:52:36 +0200 Subject: [PATCH 21/23] [AptosPerf] Consts and timer --- network/src/application/netperf/mod.rs | 29 ++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index f80e457f81219..1b33b84dac78a 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -35,6 +35,7 @@ use futures::StreamExt; use futures_util::stream::FuturesUnordered; use serde::Serialize; use std::fs::OpenOptions; +use std::sync::atomic::AtomicBool; use std::{sync::Arc, time::Duration}; use tokio::sync::mpsc::{Receiver, Sender}; @@ -42,6 +43,8 @@ pub mod builder; mod interface; const NETPERF_COMMAND_CHANNEL_SIZE: usize = 1024; +const NETPERF_DEFAULT_MSG_SIZE: usize = 64 * 1024; +const NETPERF_DEFAULT_DURTAION_SEC: u64 = 10; pub struct NetPerf { network_context: NetworkContext, @@ -243,7 +246,14 @@ async fn parse_query( Extension(state): Extension, Query(_params): Query>, ) -> impl IntoResponse { - spawn_named!("[NetPerf] Brodcast Task", netperf_broadcast(state.clone())); + //TODO(AlexM): Extract size and duration from _params + let size = NETPERF_DEFAULT_MSG_SIZE; + let duration = Duration::from_secs(NETPERF_DEFAULT_DURTAION_SEC); + + spawn_named!( + "[NetPerf] Brodcast Task", + netperf_broadcast(state.clone(), size, duration) + ); StatusCode::OK } @@ -277,9 +287,16 @@ async fn netperf_comp_handler(state: NetPerfState, mut rx: Receiver Date: Tue, 3 Jan 2023 21:20:47 +0200 Subject: [PATCH 22/23] [netperf minor fixups] --- network/src/application/netperf/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index 1b33b84dac78a..257fcd1b4c047 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -192,7 +192,7 @@ fn preferred_axum_port(netperf_port: u16) -> u16 { .create(true) .open(format!("/tmp/{}.tmp", netperf_port)); } - return netperf_port; + netperf_port } async fn start_axum(state: NetPerfState, netperf_port: u16) { @@ -252,7 +252,7 @@ async fn parse_query( spawn_named!( "[NetPerf] Brodcast Task", - netperf_broadcast(state.clone(), size, duration) + netperf_broadcast(state, size, duration) ); StatusCode::OK @@ -309,7 +309,7 @@ async fn netperf_broadcast(state: NetPerfState, size: usize, duration: Duration) ProtocolId::NetPerfDirectSendCompressed, msg.clone(), ); - if let Err(_) = rc { + if rc.is_err() { should_yield = true } //else update peer counters /* maybe add dedicated counters? but network_application_{out/in}bound_traffic @@ -319,7 +319,7 @@ async fn netperf_broadcast(state: NetPerfState, size: usize, duration: Duration) if done.load(std::sync::atomic::Ordering::Relaxed) { break; } - if should_yield == true { + if should_yield { tokio::task::yield_now().await; should_yield = false; } From 39777f6448640b91dd925374d41532025f25a23d Mon Sep 17 00:00:00 2001 From: Alex Markuze Date: Tue, 3 Jan 2023 21:55:01 +0200 Subject: [PATCH 23/23] lint fixups --- network/Cargo.toml | 2 +- network/builder/src/builder.rs | 10 +++++---- network/src/application/netperf/builder.rs | 6 +++--- network/src/application/netperf/interface.rs | 7 ++++--- network/src/application/netperf/mod.rs | 22 ++++++++++++-------- network/src/counters.rs | 12 ++++++----- 6 files changed, 34 insertions(+), 25 deletions(-) diff --git a/network/Cargo.toml b/network/Cargo.toml index 73ad30dad4c24..cee422cd566ae 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -36,7 +36,7 @@ async-trait = { workspace = true } axum = { workspace = true } bcs = { workspace = true } bytes = { workspace = true } -dashmap = {worksapce = true } +dashmap = { worksapce = true } futures = { workspace = true } futures-util = { workspace = true } hex = { workspace = true } diff --git a/network/builder/src/builder.rs b/network/builder/src/builder.rs index 40baf67a8f077..681fa908a2d3d 100644 --- a/network/builder/src/builder.rs +++ b/network/builder/src/builder.rs @@ -23,10 +23,11 @@ use aptos_event_notifications::{EventSubscriptionService, ReconfigNotificationLi use aptos_infallible::RwLock; use aptos_logger::prelude::*; use aptos_netcore::transport::tcp::TCPBufferCfg; -use aptos_network::application::netperf::builder::NetPerfBuilder; use aptos_network::{ - application::netperf::NetPerf, - application::storage::PeerMetadataStorage, + application::{ + netperf::{builder::NetPerfBuilder, NetPerf}, + storage::PeerMetadataStorage, + }, connectivity_manager::{builder::ConnectivityManagerBuilder, ConnectivityRequest}, constants::MAX_MESSAGE_SIZE, logging::NetworkSchema, @@ -446,7 +447,8 @@ impl NetworkBuilder { /// Add a Aptos NetPerf to the network. fn add_network_perf(&mut self, netperf_port: u16) -> &mut Self { - let (netperf_tx, netperf_rx) = self.add_client_and_service(&NetPerf::network_endpoint_config()); + let (netperf_tx, netperf_rx) = + self.add_client_and_service(&NetPerf::network_endpoint_config()); self.netperf_builder = Some(NetPerfBuilder::new( self.network_context(), self.peer_metadata_storage.clone(), diff --git a/network/src/application/netperf/builder.rs b/network/src/application/netperf/builder.rs index b42f89b2c38c0..b61ccff9c83be 100644 --- a/network/src/application/netperf/builder.rs +++ b/network/src/application/netperf/builder.rs @@ -1,12 +1,12 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 -use crate::{ - application::netperf::{ +use crate::application::{ + netperf::{ interface::{NetPerfNetworkEvents, NetPerfNetworkSender}, NetPerf, }, - application::storage::PeerMetadataStorage, + storage::PeerMetadataStorage, }; use aptos_config::network_id::NetworkContext; use aptos_logger::prelude::*; diff --git a/network/src/application/netperf/interface.rs b/network/src/application/netperf/interface.rs index c941f1fb264c3..917bab5cb6619 100644 --- a/network/src/application/netperf/interface.rs +++ b/network/src/application/netperf/interface.rs @@ -10,10 +10,11 @@ pub struct NetPerfPayload { } impl NetPerfPayload { - pub fn new(len: usize) -> Self { + pub fn new(mut len: usize) -> Self { let mut v = Vec::with_capacity(len); - unsafe { - v.set_len(len); + while len > 0 { + v.push(0); + len -= 1; } NetPerfPayload { byte: v } } diff --git a/network/src/application/netperf/mod.rs b/network/src/application/netperf/mod.rs index 257fcd1b4c047..29c3be36ce7fb 100644 --- a/network/src/application/netperf/mod.rs +++ b/network/src/application/netperf/mod.rs @@ -7,16 +7,18 @@ //! and simplify network-related performance profiling and debugging //! -use crate::application::netperf::interface::NetPerfMsg; -use crate::application::storage::PeerMetadataStorage; -use crate::protocols::network::NetworkApplicationConfig; -use crate::transport::ConnectionMetadata; use crate::{ - application::netperf::interface::{NetPerfNetworkEvents, NetPerfNetworkSender, NetPerfPayload}, + application::{ + netperf::interface::{ + NetPerfMsg, NetPerfNetworkEvents, NetPerfNetworkSender, NetPerfPayload, + }, + storage::PeerMetadataStorage, + }, constants::NETWORK_CHANNEL_SIZE, counters, logging::NetworkSchema, - protocols::network::Event, + protocols::network::{Event, NetworkApplicationConfig}, + transport::ConnectionMetadata, ProtocolId, }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; @@ -34,9 +36,11 @@ use dashmap::DashMap; use futures::StreamExt; use futures_util::stream::FuturesUnordered; use serde::Serialize; -use std::fs::OpenOptions; -use std::sync::atomic::AtomicBool; -use std::{sync::Arc, time::Duration}; +use std::{ + fs::OpenOptions, + sync::{atomic::AtomicBool, Arc}, + time::Duration, +}; use tokio::sync::mpsc::{Receiver, Sender}; pub mod builder; diff --git a/network/src/counters.rs b/network/src/counters.rs index 8334fb340e6f9..6fded6bafc3a3 100644 --- a/network/src/counters.rs +++ b/network/src/counters.rs @@ -156,11 +156,13 @@ pub static APTOS_NETWORK_DISCOVERY_NOTES: Lazy = Lazy::new(|| { }); pub static APTOS_NETWORK_RPC_MESSAGES: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "aptos_network_rpc_messages", - "Number of RPC messages", - &["role_type", "network_id", "peer_id", "type", "state"] - ) + register_int_counter_vec!("aptos_network_rpc_messages", "Number of RPC messages", &[ + "role_type", + "network_id", + "peer_id", + "type", + "state" + ]) .unwrap() });