Skip to content

Commit

Permalink
[Network] Add metric for connection operations.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Nov 4, 2024
1 parent c8a1eab commit 157ed9d
Show file tree
Hide file tree
Showing 15 changed files with 130 additions and 43 deletions.
17 changes: 14 additions & 3 deletions network/framework/src/application/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use crate::{
application::{error::Error, storage::PeersAndMetadata},
peer::DisconnectReason,
protocols::{
network::{Message, NetworkEvents, NetworkSender},
wire::handshake::v1::{ProtocolId, ProtocolIdSet},
Expand Down Expand Up @@ -39,7 +40,11 @@ pub trait NetworkClientInterface<Message: NetworkMessageTrait>: Clone + Send + S
/// Requests that the network connection for the specified peer
/// is disconnected.
// TODO: support disconnect reasons.
async fn disconnect_from_peer(&self, _peer: PeerNetworkId) -> Result<(), Error>;
async fn disconnect_from_peer(
&self,
_peer: PeerNetworkId,
_disconnect_reason: DisconnectReason,
) -> Result<(), Error>;

/// Returns a list of available peers (i.e., those that are
/// currently connected and support the relevant protocols
Expand Down Expand Up @@ -196,9 +201,15 @@ impl<Message: NetworkMessageTrait> NetworkClientInterface<Message> for NetworkCl
unimplemented!("Adding peers to discovery is not yet supported!");
}

async fn disconnect_from_peer(&self, peer: PeerNetworkId) -> Result<(), Error> {
async fn disconnect_from_peer(
&self,
peer: PeerNetworkId,
disconnect_reason: DisconnectReason,
) -> Result<(), Error> {
let network_sender = self.get_sender_for_network_id(&peer.network_id())?;
Ok(network_sender.disconnect_peer(peer.peer_id()).await?)
Ok(network_sender
.disconnect_peer(peer.peer_id(), disconnect_reason)
.await?)
}

fn get_available_peers(&self) -> Result<Vec<PeerNetworkId>, Error> {
Expand Down
7 changes: 5 additions & 2 deletions network/framework/src/connectivity_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
application::storage::PeersAndMetadata,
counters,
logging::NetworkSchema,
peer::DisconnectReason,
peer_manager::{self, conn_notifs_channel, ConnectionRequestSender, PeerManagerError},
transport::ConnectionMetadata,
};
Expand Down Expand Up @@ -511,8 +512,10 @@ where
stale_peer.short_str()
);

if let Err(disconnect_error) =
self.connection_reqs_tx.disconnect_peer(stale_peer).await
if let Err(disconnect_error) = self
.connection_reqs_tx
.disconnect_peer(stale_peer, DisconnectReason::StaleConnection)
.await
{
info!(
NetworkSchema::new(&self.network_context)
Expand Down
2 changes: 1 addition & 1 deletion network/framework/src/connectivity_manager/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl TestHarness {
info!("Waiting to receive disconnect request");
let success = result.is_ok();
match self.connection_reqs_rx.next().await.unwrap() {
ConnectionRequest::DisconnectPeer(p, result_tx) => {
ConnectionRequest::DisconnectPeer(p, _, result_tx) => {
assert_eq!(peer_id, p);
result_tx.send(result).unwrap();
},
Expand Down
26 changes: 26 additions & 0 deletions network/framework/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ pub const SUCCEEDED_LABEL: &str = "succeeded";
pub const FAILED_LABEL: &str = "failed";
pub const UNKNOWN_LABEL: &str = "unknown";

// Connection operation labels
pub const DIAL_LABEL: &str = "dial";
pub const DIAL_PEER_LABEL: &str = "dial_peer";
pub const DISCONNECT_LABEL: &str = "disconnect";

// Direction labels
pub const INBOUND_LABEL: &str = "inbound";
pub const OUTBOUND_LABEL: &str = "outbound";
Expand Down Expand Up @@ -139,6 +144,27 @@ pub fn pending_connection_upgrades(
])
}

/// A simple counter for tracking network connection operations
pub static APTOS_NETWORK_CONNECTION_OPERATIONS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"aptos_network_connection_operations",
"Counter for tracking connection operations",
&["network_id", "operation", "label"]
)
.unwrap()
});

/// Updates the network connection operation metrics with the given operation and label
pub fn update_network_connection_operation_metrics(
network_context: &NetworkContext,
operation: String,
label: String,
) {
APTOS_NETWORK_CONNECTION_OPERATIONS
.with_label_values(&[network_context.network_id().as_str(), &operation, &label])
.inc();
}

pub static APTOS_NETWORK_CONNECTION_UPGRADE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"aptos_network_connection_upgrade_time_seconds",
Expand Down
1 change: 0 additions & 1 deletion network/framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,5 @@ pub mod fuzzing;
#[cfg(any(test, feature = "testing", feature = "fuzzing"))]
pub mod testutils;

pub type DisconnectReason = peer::DisconnectReason;
pub type ConnectivityRequest = connectivity_manager::ConnectivityRequest;
pub type ProtocolId = protocols::wire::handshake::v1::ProtocolId;
40 changes: 24 additions & 16 deletions network/framework/src/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,25 +71,33 @@ pub enum PeerRequest {
SendDirectSend(Message),
}

/// The reason for closing a connection.
///
/// For example, if the remote peer closed the connection or the connection was
/// lost, the disconnect reason will be `ConnectionLost`. In contrast, if the
/// [`PeerManager`](crate::peer_manager::PeerManager) requested us to close this
/// connection, then the disconnect reason will be `Requested`.
/// The reason for closing a network connection
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
pub enum DisconnectReason {
Requested,
ConnectionLost,
ConnectionClosed, // The connection was gracefully closed (e.g., by the peer)
InputOutputError, // An I/O error occurred on the connection (e.g., when reading messages)
NetworkHealthCheckFailure, // The connection failed the network health check (e.g., pings)
RequestedByPeerManager, // The peer manager requested the connection to be closed
StaleConnection, // The connection is stale (e.g., when a validator leaves the validator set)
}

impl DisconnectReason {
/// Returns a string label for the disconnect reason
pub fn get_label(&self) -> String {
let label = match self {
DisconnectReason::ConnectionClosed => "ConnectionClosed",
DisconnectReason::InputOutputError => "InputOutputError",
DisconnectReason::NetworkHealthCheckFailure => "NetworkHealthCheckFailure",
DisconnectReason::RequestedByPeerManager => "RequestedByPeerManager",
DisconnectReason::StaleConnection => "StaleConnection",
};
label.to_string()
}
}

impl fmt::Display for DisconnectReason {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self {
DisconnectReason::Requested => "Requested",
DisconnectReason::ConnectionLost => "ConnectionLost",
};
write!(f, "{}", s)
write!(f, "{}", self.get_label())
}
}

Expand Down Expand Up @@ -237,7 +245,7 @@ where
Some(request) => self.handle_outbound_request(request, &mut write_reqs_tx),
// The PeerManager is requesting this connection to close
// by dropping the corresponding peer_reqs_tx handle.
None => self.shutdown(DisconnectReason::Requested),
None => self.shutdown(DisconnectReason::RequestedByPeerManager),
}
},
// Handle a new inbound MultiplexMessage that we've just read off
Expand All @@ -258,7 +266,7 @@ where
}
},
// The socket was gracefully closed by the remote peer.
None => self.shutdown(DisconnectReason::ConnectionLost),
None => self.shutdown(DisconnectReason::ConnectionClosed),
}
},
// Drive the queue of pending inbound rpcs. When one is fulfilled
Expand Down Expand Up @@ -580,7 +588,7 @@ where
},
ReadError::IoError(_) => {
// IoErrors are mostly unrecoverable so just close the connection.
self.shutdown(DisconnectReason::ConnectionLost);
self.shutdown(DisconnectReason::InputOutputError);
return Err(err.into());
},
},
Expand Down
12 changes: 6 additions & 6 deletions network/framework/src/peer/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,13 +368,13 @@ fn peers_send_message_concurrent() {
// Check that we received both shutdown events
assert_disconnected_event(
remote_peer_id_a,
DisconnectReason::Requested,
DisconnectReason::RequestedByPeerManager,
&mut connection_notifs_rx_a,
)
.await;
assert_disconnected_event(
remote_peer_id_b,
DisconnectReason::ConnectionLost,
DisconnectReason::ConnectionClosed,
&mut connection_notifs_rx_b,
)
.await;
Expand Down Expand Up @@ -905,7 +905,7 @@ fn peer_disconnect_request() {
drop(peer_handle);
assert_disconnected_event(
remote_peer_id,
DisconnectReason::Requested,
DisconnectReason::RequestedByPeerManager,
&mut connection_notifs_rx,
)
.await;
Expand All @@ -932,7 +932,7 @@ fn peer_disconnect_connection_lost() {
connection.close().await.unwrap();
assert_disconnected_event(
remote_peer_id,
DisconnectReason::ConnectionLost,
DisconnectReason::ConnectionClosed,
&mut connection_notifs_rx,
)
.await;
Expand Down Expand Up @@ -1019,13 +1019,13 @@ fn peers_send_multiplex() {
// Check that we received both shutdown events
assert_disconnected_event(
remote_peer_id_a,
DisconnectReason::Requested,
DisconnectReason::RequestedByPeerManager,
&mut connection_notifs_rx_a,
)
.await;
assert_disconnected_event(
remote_peer_id_b,
DisconnectReason::ConnectionLost,
DisconnectReason::ConnectionClosed,
&mut connection_notifs_rx_b,
)
.await;
Expand Down
17 changes: 16 additions & 1 deletion network/framework/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,11 +454,26 @@ where
);
}
} else {
// Update the connection dial metrics
counters::update_network_connection_operation_metrics(
&self.network_context,
counters::DIAL_LABEL.into(),
counters::DIAL_PEER_LABEL.into(),
);

// Send a transport request to dial the peer
let request = TransportRequest::DialPeer(requested_peer_id, addr, response_tx);
self.transport_reqs_tx.send(request).await.unwrap();
};
},
ConnectionRequest::DisconnectPeer(peer_id, resp_tx) => {
ConnectionRequest::DisconnectPeer(peer_id, disconnect_reason, resp_tx) => {
// Update the connection disconnect metrics
counters::update_network_connection_operation_metrics(
&self.network_context,
counters::DISCONNECT_LABEL.into(),
disconnect_reason.get_label(),
);

// Send a CloseConnection request to Peer and drop the send end of the
// PeerRequest channel.
if let Some((conn_metadata, sender)) = self.active_peers.remove(&peer_id) {
Expand Down
13 changes: 10 additions & 3 deletions network/framework/src/peer_manager/senders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
peer::DisconnectReason,
peer_manager::{types::PeerManagerRequest, ConnectionRequest, PeerManagerError},
protocols::{
direct_send::Message,
Expand Down Expand Up @@ -125,10 +126,16 @@ impl ConnectionRequestSender {
oneshot_rx.await?
}

pub async fn disconnect_peer(&self, peer: PeerId) -> Result<(), PeerManagerError> {
pub async fn disconnect_peer(
&self,
peer: PeerId,
disconnect_reason: DisconnectReason,
) -> Result<(), PeerManagerError> {
let (oneshot_tx, oneshot_rx) = oneshot::channel();
self.inner
.push(peer, ConnectionRequest::DisconnectPeer(peer, oneshot_tx))?;
self.inner.push(
peer,
ConnectionRequest::DisconnectPeer(peer, disconnect_reason, oneshot_tx),
)?;
oneshot_rx.await?
}
}
9 changes: 5 additions & 4 deletions network/framework/src/peer_manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async fn check_correct_connection_is_live(
assert_peer_disconnected_event(
expected_peer_id,
dropped_connection_origin,
DisconnectReason::Requested,
DisconnectReason::RequestedByPeerManager,
peer_manager,
)
.await;
Expand All @@ -218,7 +218,7 @@ async fn check_correct_connection_is_live(
assert_peer_disconnected_event(
expected_peer_id,
live_connection_origin,
DisconnectReason::ConnectionLost,
DisconnectReason::ConnectionClosed,
peer_manager,
)
.await;
Expand Down Expand Up @@ -580,7 +580,7 @@ fn peer_manager_simultaneous_dial_disconnect_event() {
ProtocolIdSet::mock(),
PeerRole::Unknown,
),
DisconnectReason::ConnectionLost,
DisconnectReason::ConnectionClosed,
);
peer_manager.handle_connection_event(event);
// The active connection should still remain.
Expand Down Expand Up @@ -621,6 +621,7 @@ fn test_dial_disconnect() {
peer_manager
.handle_outbound_connection_request(ConnectionRequest::DisconnectPeer(
ids[0],
DisconnectReason::ConnectionClosed,
disconnect_resp_tx,
))
.await;
Expand All @@ -636,7 +637,7 @@ fn test_dial_disconnect() {
ProtocolIdSet::mock(),
PeerRole::Unknown,
),
DisconnectReason::Requested,
DisconnectReason::RequestedByPeerManager,
);
peer_manager.handle_connection_event(event);

Expand Down
1 change: 1 addition & 0 deletions network/framework/src/peer_manager/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub enum ConnectionRequest {
),
DisconnectPeer(
PeerId,
DisconnectReason,
#[serde(skip)] oneshot::Sender<Result<(), PeerManagerError>>,
),
}
Expand Down
9 changes: 7 additions & 2 deletions network/framework/src/protocols/health_checker/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
error::Error, interface::NetworkClientInterface, metadata::ConnectionState,
storage::PeersAndMetadata,
},
peer::DisconnectReason,
protocols::{
health_checker::{HealthCheckerMsg, HealthCheckerNetworkEvents},
network::Event,
Expand Down Expand Up @@ -62,12 +63,16 @@ impl<NetworkClient: NetworkClientInterface<HealthCheckerMsg>>

/// Disconnect a peer, and keep track of the associated state
/// Note: This removes the peer outright for now until we add GCing, and historical state management
pub async fn disconnect_peer(&mut self, peer_network_id: PeerNetworkId) -> Result<(), Error> {
pub async fn disconnect_peer(
&mut self,
peer_network_id: PeerNetworkId,
disconnect_reason: DisconnectReason,
) -> Result<(), Error> {
// Possibly already disconnected, but try anyways
let _ = self.update_connection_state(peer_network_id, ConnectionState::Disconnecting);
let result = self
.network_client
.disconnect_from_peer(peer_network_id)
.disconnect_from_peer(peer_network_id, disconnect_reason)
.await;
let peer_id = peer_network_id.peer_id();
if result.is_ok() {
Expand Down
6 changes: 5 additions & 1 deletion network/framework/src/protocols/health_checker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
constants::NETWORK_CHANNEL_SIZE,
counters,
logging::NetworkSchema,
peer::DisconnectReason,
peer_manager::ConnectionNotification,
protocols::{
health_checker::interface::HealthCheckNetworkInterface,
Expand Down Expand Up @@ -372,7 +373,10 @@ impl<NetworkClient: NetworkClientInterface<HealthCheckerMsg> + Unpin> HealthChec
PeerNetworkId::new(self.network_context.network_id(), peer_id);
if let Err(err) = timeout(
Duration::from_millis(50),
self.network_interface.disconnect_peer(peer_network_id),
self.network_interface.disconnect_peer(
peer_network_id,
DisconnectReason::NetworkHealthCheckFailure,
),
)
.await
{
Expand Down
Loading

0 comments on commit 157ed9d

Please sign in to comment.