Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(comms): reduce length of long functions (clippy) #4065

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 122 additions & 96 deletions comms/core/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ use std::{
use log::*;
use nom::lib::std::collections::hash_map::Entry;
use tari_shutdown::ShutdownSignal;
use tokio::{sync::mpsc, task::JoinHandle, time, time::MissedTickBehavior};
use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
time,
time::MissedTickBehavior,
};
use tracing::{span, Instrument, Level};

use super::{
Expand Down Expand Up @@ -226,52 +231,9 @@ impl ConnectivityManagerActor {
},
DialPeer { node_id, reply_tx } => {
let tracing_id = tracing::Span::current().id();
let span = span!(Level::TRACE, "handle_request");
let span = span!(Level::TRACE, "handle_dial_peer");
span.follows_from(tracing_id);
async move {
match self.peer_manager.is_peer_banned(&node_id).await {
Ok(true) => {
if let Some(reply) = reply_tx {
let _result = reply.send(Err(ConnectionManagerError::PeerBanned));
}
return;
},
Ok(false) => {},
Err(err) => {
if let Some(reply) = reply_tx {
let _result = reply.send(Err(err.into()));
}
return;
},
}
match self.pool.get(&node_id) {
Some(state) if state.is_connected() => {
debug!(
target: LOG_TARGET,
"Found existing connection for peer `{}`",
node_id.short_str()
);
if let Some(reply_tx) = reply_tx {
let _result = reply_tx.send(Ok(state.connection().cloned().expect("Already checked")));
}
},
_ => {
debug!(
target: LOG_TARGET,
"No existing connection found for peer `{}`. Dialing...",
node_id.short_str()
);
if let Err(err) = self.connection_manager.send_dial_peer(node_id, reply_tx).await {
error!(
target: LOG_TARGET,
"Failed to send dial request to connection manager: {:?}", err
);
}
},
}
}
.instrument(span)
.await
self.handle_dial_peer(node_id, reply_tx).instrument(span).await;
},
SelectConnections(selection, reply) => {
let _result = reply.send(self.select_connections(selection).await);
Expand Down Expand Up @@ -323,6 +285,53 @@ impl ConnectivityManagerActor {
}
}

async fn handle_dial_peer(
&mut self,
node_id: NodeId,
reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
) {
match self.peer_manager.is_peer_banned(&node_id).await {
Ok(true) => {
if let Some(reply) = reply_tx {
let _result = reply.send(Err(ConnectionManagerError::PeerBanned));
}
return;
},
Ok(false) => {},
Err(err) => {
if let Some(reply) = reply_tx {
let _result = reply.send(Err(err.into()));
}
return;
},
}
match self.pool.get(&node_id) {
Some(state) if state.is_connected() => {
debug!(
target: LOG_TARGET,
"Found existing connection for peer `{}`",
node_id.short_str()
);
if let Some(reply_tx) = reply_tx {
let _result = reply_tx.send(Ok(state.connection().cloned().expect("Already checked")));
}
},
_ => {
debug!(
target: LOG_TARGET,
"No existing connection found for peer `{}`. Dialing...",
node_id.short_str()
);
if let Err(err) = self.connection_manager.send_dial_peer(node_id, reply_tx).await {
error!(
target: LOG_TARGET,
"Failed to send dial request to connection manager: {:?}", err
);
}
},
}
}

async fn disconnect_all(&mut self) {
let mut node_ids = Vec::with_capacity(self.pool.count_connected());
for mut state in self.pool.filter_drain(|_| true) {
Expand Down Expand Up @@ -508,57 +517,12 @@ impl ConnectivityManagerActor {
debug!(target: LOG_TARGET, "Received event: {}", event);
match event {
PeerConnected(new_conn) => {
match self.pool.get_connection(new_conn.peer_node_id()).cloned() {
Some(existing_conn) if !existing_conn.is_connected() => {
debug!(
target: LOG_TARGET,
"Tie break: Existing connection (id: {}, peer: {}, direction: {}) was not connected, \
resolving tie break by using the new connection. (New: id: {}, peer: {}, direction: {})",
existing_conn.id(),
existing_conn.peer_node_id(),
existing_conn.direction(),
new_conn.id(),
new_conn.peer_node_id(),
new_conn.direction(),
);
self.pool.remove(existing_conn.peer_node_id());
},
Some(mut existing_conn) => {
if self.tie_break_existing_connection(&existing_conn, new_conn) {
debug!(
target: LOG_TARGET,
"Tie break: Keep new connection (id: {}, peer: {}, direction: {}). Disconnect \
existing connection (id: {}, peer: {}, direction: {})",
new_conn.id(),
new_conn.peer_node_id(),
new_conn.direction(),
existing_conn.id(),
existing_conn.peer_node_id(),
existing_conn.direction(),
);

let _result = existing_conn.disconnect_silent().await;
self.pool.remove(existing_conn.peer_node_id());
} else {
debug!(
target: LOG_TARGET,
"Tie break: Keeping existing connection (id: {}, peer: {}, direction: {}). \
Disconnecting new connection (id: {}, peer: {}, direction: {})",
new_conn.id(),
new_conn.peer_node_id(),
new_conn.direction(),
existing_conn.id(),
existing_conn.peer_node_id(),
existing_conn.direction(),
);

let _result = new_conn.clone().disconnect_silent().await;
// Ignore this event - state can stay as is
return Ok(());
}
match self.handle_new_connection_tie_break(new_conn).await {
TieBreak::KeepExisting => {
// Ignore event, we discarded the new connection and keeping the current one
return Ok(());
},

_ => {},
TieBreak::UseNew | TieBreak::None => {},
}
},
PeerDisconnected(id, node_id) => {
Expand Down Expand Up @@ -647,6 +611,62 @@ impl ConnectivityManagerActor {
Ok(())
}

async fn handle_new_connection_tie_break(&mut self, new_conn: &PeerConnection) -> TieBreak {
match self.pool.get_connection(new_conn.peer_node_id()).cloned() {
Some(existing_conn) if !existing_conn.is_connected() => {
debug!(
target: LOG_TARGET,
"Tie break: Existing connection (id: {}, peer: {}, direction: {}) was not connected, resolving \
tie break by using the new connection. (New: id: {}, peer: {}, direction: {})",
existing_conn.id(),
existing_conn.peer_node_id(),
existing_conn.direction(),
new_conn.id(),
new_conn.peer_node_id(),
new_conn.direction(),
);
self.pool.remove(existing_conn.peer_node_id());
TieBreak::UseNew
},
Some(mut existing_conn) => {
if self.tie_break_existing_connection(&existing_conn, new_conn) {
debug!(
target: LOG_TARGET,
"Tie break: Keep new connection (id: {}, peer: {}, direction: {}). Disconnect existing \
connection (id: {}, peer: {}, direction: {})",
new_conn.id(),
new_conn.peer_node_id(),
new_conn.direction(),
existing_conn.id(),
existing_conn.peer_node_id(),
existing_conn.direction(),
);

let _result = existing_conn.disconnect_silent().await;
self.pool.remove(existing_conn.peer_node_id());
TieBreak::UseNew
} else {
debug!(
target: LOG_TARGET,
"Tie break: Keeping existing connection (id: {}, peer: {}, direction: {}). Disconnecting new \
connection (id: {}, peer: {}, direction: {})",
new_conn.id(),
new_conn.peer_node_id(),
new_conn.direction(),
existing_conn.id(),
existing_conn.peer_node_id(),
existing_conn.direction(),
);

let _result = new_conn.clone().disconnect_silent().await;
TieBreak::KeepExisting
}
},

None => TieBreak::None,
}
}

/// Two connections to the same peer have been created. This function deterministically determines which peer
/// connection to close. It does this by comparing our NodeId to that of the peer. This rule enables both sides to
/// agree which connection to disconnect
Expand Down Expand Up @@ -840,3 +860,9 @@ impl ConnectivityManagerActor {
}
}
}

enum TieBreak {
None,
UseNew,
KeepExisting,
}
1 change: 1 addition & 0 deletions comms/core/src/noise/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ where TSocket: AsyncRead + Unpin
impl<TSocket> NoiseSocket<TSocket>
where TSocket: AsyncWrite + Unpin
{
#[allow(clippy::too_many_lines)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT:
I would put the nice explanation you gave in the PR desc here as well.

fn poll_write_or_flush(&mut self, context: &mut Context, buf: Option<&[u8]>) -> Poll<io::Result<Option<usize>>> {
loop {
trace!(
Expand Down