From 544b9330c3e68243b5fe32dc775116f7b69d5b49 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 3 Oct 2024 12:23:11 +0200 Subject: [PATCH 1/9] refactor(iroh-net): Keep connection name, remove connection count These are two cleanups in the relay client: - The `relay::Client` hands out a connection object when asked to connect. This `Conn` was imported with rename to `RelayClient` which was a bit confusing as this was already the relay client. It is now renamed to `RelayConn` which makes a lot more sense. The related builder struct etc are renamed to match. - The `relay::Client` had a counter for the number of connections made to the relay. That seems fun, but was entirely unused. If this is a useful thing to have it should probably be a counter metric instead but let's not add anything that no one is using. Removing this makes a lot of APIs a bit simpler and removes some state tracking. --- iroh-net/src/magicsock/relay_actor.rs | 7 +- iroh-net/src/relay/client.rs | 85 +++++++++++------------- iroh-net/src/relay/server.rs | 12 ++-- iroh-net/src/relay/server/http_server.rs | 2 +- 4 files changed, 48 insertions(+), 58 deletions(-) diff --git a/iroh-net/src/magicsock/relay_actor.rs b/iroh-net/src/magicsock/relay_actor.rs index 0a7b6c8fda..f7b1dd879b 100644 --- a/iroh-net/src/magicsock/relay_actor.rs +++ b/iroh-net/src/magicsock/relay_actor.rs @@ -165,10 +165,7 @@ impl ActiveRelay { Ok(()) } - async fn handle_relay_msg( - &mut self, - msg: Result<(ReceivedMessage, usize), ClientError>, - ) -> ReadResult { + async fn handle_relay_msg(&mut self, msg: Result) -> ReadResult { match msg { Err(err) => { warn!("recv error {:?}", err); @@ -200,7 +197,7 @@ impl ActiveRelay { None => ReadResult::Break, } } - Ok((msg, _conn_gen)) => { + Ok(msg) => { // reset self.backoff.reset(); let now = Instant::now(); diff --git a/iroh-net/src/relay/client.rs b/iroh-net/src/relay/client.rs index 3b276a8329..a7020d1eda 100644 --- a/iroh-net/src/relay/client.rs +++ b/iroh-net/src/relay/client.rs @@ -28,8 +28,8 @@ use tracing::{debug, error, event, info_span, trace, warn, Instrument, Level}; use url::Url; use conn::{ - Conn as RelayClient, ConnBuilder as RelayClientBuilder, ConnReader, - ConnReceiver as RelayClientReceiver, ConnWriter, ReceivedMessage, + Conn as RelayConn, ConnBuilder as RelayConnBuilder, ConnReader, + ConnReceiver as RelayConnReceiver, ConnWriter, ReceivedMessage, }; use streams::{downcast_upgrade, MaybeTlsStream, ProxyStream}; @@ -140,7 +140,7 @@ pub struct Client { #[derive(Debug)] enum ActorMessage { - Connect(oneshot::Sender>), + Connect(oneshot::Sender>), NotePreferred(bool), LocalAddr(oneshot::Sender, ClientError>>), Ping(oneshot::Sender>), @@ -154,7 +154,7 @@ enum ActorMessage { /// Receiving end of a [`Client`]. #[derive(Debug)] pub struct ClientReceiver { - msg_receiver: mpsc::Receiver>, + msg_receiver: mpsc::Receiver>, } #[derive(derive_more::Debug)] @@ -162,11 +162,10 @@ struct Actor { secret_key: SecretKey, can_ack_pings: bool, is_preferred: bool, - relay_client: Option<(RelayClient, RelayClientReceiver)>, + relay_conn: Option<(RelayConn, RelayConnReceiver)>, is_closed: bool, #[debug("address family selector callback")] address_family_selector: Option BoxFuture + Send + Sync + 'static>>, - conn_gen: usize, url: RelayUrl, protocol: Protocol, #[debug("TlsConnector")] @@ -334,10 +333,9 @@ impl ClientBuilder { secret_key: key, can_ack_pings: self.can_ack_pings, is_preferred: self.is_preferred, - relay_client: None, + relay_conn: None, is_closed: false, address_family_selector: self.address_family_selector, - conn_gen: 0, pings: PingTracker::default(), ping_tasks: Default::default(), url: self.url, @@ -371,9 +369,8 @@ impl ClientBuilder { } impl ClientReceiver { - /// Reads a message from the server. Returns the message and the `conn_get`, or the number of - /// re-connections this Client has ever made - pub async fn recv(&mut self) -> Option> { + /// Reads a message from the server. + pub async fn recv(&mut self) -> Option> { self.msg_receiver.recv().await } } @@ -405,7 +402,7 @@ impl Client { /// /// If there is already an active relay connection, returns the already /// connected [`crate::relay::RelayConn`]. - pub async fn connect(&self) -> Result<(RelayClient, usize), ClientError> { + pub async fn connect(&self) -> Result { self.send_actor(ActorMessage::Connect).await } @@ -475,7 +472,7 @@ impl Actor { async fn run( mut self, mut inbox: mpsc::Receiver, - msg_sender: mpsc::Sender>, + msg_sender: mpsc::Sender>, ) { // Add an initial connection attempt. if let Err(err) = self.connect("initial connect").await { @@ -485,7 +482,7 @@ impl Actor { loop { tokio::select! { res = self.recv_detail() => { - if let Ok((ReceivedMessage::Pong(ping), _)) = res { + if let Ok(ReceivedMessage::Pong(ping)) = res { match self.pings.unregister(ping, "pong") { Some(chan) => { if chan.send(()).is_err() { @@ -503,7 +500,7 @@ impl Actor { Some(msg) = inbox.recv() => { match msg { ActorMessage::Connect(s) => { - let res = self.connect("actor msg").await.map(|(client, _, count)| (client, count)); + let res = self.connect("actor msg").await.map(|(client, _)| (client)); s.send(res).ok(); }, ActorMessage::NotePreferred(is_preferred) => { @@ -549,46 +546,52 @@ impl Actor { } } + /// Returns a connection to the relay. + /// + /// If the client is currently connected the existing connection is returned, otherwise + /// a new connection is made. + /// + /// Returns: + /// - A clonable connection object which can send to DISCO messages to the relay. + /// - A reference to a channel receiving DISCO messages from the relay. async fn connect( &mut self, why: &'static str, - ) -> Result<(RelayClient, &'_ mut RelayClientReceiver, usize), ClientError> { + ) -> Result<(RelayConn, &'_ mut RelayConnReceiver), ClientError> { debug!( "connect: {}, current client {}", why, - self.relay_client.is_some() + self.relay_conn.is_some() ); if self.is_closed { return Err(ClientError::Closed); } async move { - if self.relay_client.is_none() { + if self.relay_conn.is_none() { trace!("no connection, trying to connect"); let (relay_client, receiver) = tokio::time::timeout(CONNECT_TIMEOUT, self.connect_0()) .await .map_err(|_| ClientError::ConnectTimeout)??; - self.relay_client = Some((relay_client.clone(), receiver)); - self.next_conn(); + self.relay_conn = Some((relay_client.clone(), receiver)); } else { trace!("already had connection"); } - let count = self.current_conn(); - let (relay_client, receiver) = self - .relay_client + let (conn, receiver) = self + .relay_conn .as_mut() .map(|(c, r)| (c.clone(), r)) .expect("just checked"); - Ok((relay_client, receiver, count)) + Ok((conn, receiver)) } .instrument(info_span!("connect")) .await } - async fn connect_0(&self) -> Result<(RelayClient, RelayClientReceiver), ClientError> { + async fn connect_0(&self) -> Result<(RelayConn, RelayConnReceiver), ClientError> { let (reader, writer, local_addr) = match self.protocol { Protocol::Websocket => { let (reader, writer) = self.connect_ws().await?; @@ -602,7 +605,7 @@ impl Actor { }; let (relay_client, receiver) = - RelayClientBuilder::new(self.secret_key.clone(), local_addr, reader, writer) + RelayConnBuilder::new(self.secret_key.clone(), local_addr, reader, writer) .build() .await .map_err(|e| ClientError::Build(e.to_string()))?; @@ -732,7 +735,7 @@ impl Actor { // only send the preference if we already have a connection let res = { - if let Some((ref client, _)) = self.relay_client { + if let Some((ref client, _)) = self.relay_conn { client.note_preferred(is_preferred).await } else { return; @@ -749,7 +752,7 @@ impl Actor { if self.is_closed { return None; } - if let Some((ref client, _)) = self.relay_client { + if let Some((ref client, _)) = self.relay_conn { client.local_addr() } else { None @@ -757,7 +760,7 @@ impl Actor { } async fn ping(&mut self, s: oneshot::Sender>) { - let connect_res = self.connect("ping").await.map(|(c, _, _)| c); + let connect_res = self.connect("ping").await.map(|(c, _)| c); let (ping, recv) = self.pings.register(); trace!("ping: {}", hex::encode(ping)); @@ -784,7 +787,7 @@ impl Actor { async fn send(&mut self, dst_key: PublicKey, b: Bytes) -> Result<(), ClientError> { trace!(dst = %dst_key.fmt_short(), len = b.len(), "send"); - let (client, _, _) = self.connect("send").await?; + let (client, _) = self.connect("send").await?; if client.send(dst_key, b).await.is_err() { self.close_for_reconnect().await; return Err(ClientError::Send); @@ -795,7 +798,7 @@ impl Actor { async fn send_pong(&mut self, data: [u8; 8]) -> Result<(), ClientError> { debug!("send_pong"); if self.can_ack_pings { - let (client, _, _) = self.connect("send_pong").await?; + let (client, _) = self.connect("send_pong").await?; if client.send_pong(data).await.is_err() { self.close_for_reconnect().await; return Err(ClientError::Send); @@ -817,16 +820,7 @@ impl Actor { if self.is_closed { return false; } - self.relay_client.is_some() - } - - fn current_conn(&self) -> usize { - self.conn_gen - } - - fn next_conn(&mut self) -> usize { - self.conn_gen = self.conn_gen.wrapping_add(1); - self.conn_gen + self.relay_conn.is_some() } fn tls_servername(&self) -> Option { @@ -987,13 +981,12 @@ impl Actor { } } - async fn recv_detail(&mut self) -> Result<(ReceivedMessage, usize), ClientError> { - if let Some((_client, client_receiver)) = self.relay_client.as_mut() { + async fn recv_detail(&mut self) -> Result { + if let Some((_client, client_receiver)) = self.relay_conn.as_mut() { trace!("recv_detail tick"); match client_receiver.recv().await { Ok(msg) => { - let current_gen = self.current_conn(); - return Ok((msg, current_gen)); + return Ok(msg); } Err(e) => { self.close_for_reconnect().await; @@ -1012,7 +1005,7 @@ impl Actor { /// requires a connection, it will call `connect`. async fn close_for_reconnect(&mut self) { debug!("close for reconnect"); - if let Some((client, _)) = self.relay_client.take() { + if let Some((client, _)) = self.relay_conn.take() { client.close().await } } diff --git a/iroh-net/src/relay/server.rs b/iroh-net/src/relay/server.rs index 9bc54192dc..27ffdc07ce 100644 --- a/iroh-net/src/relay/server.rs +++ b/iroh-net/src/relay/server.rs @@ -863,7 +863,7 @@ mod tests { let msg = Bytes::from("hello, b"); client_a.send(b_key, msg.clone()).await.unwrap(); - let (res, _) = client_b_receiver.recv().await.unwrap().unwrap(); + let res = client_b_receiver.recv().await.unwrap().unwrap(); if let ReceivedMessage::ReceivedPacket { source, data } = res { assert_eq!(a_key, source); assert_eq!(msg, data); @@ -875,7 +875,7 @@ mod tests { let msg = Bytes::from("howdy, a"); client_b.send(a_key, msg.clone()).await.unwrap(); - let (res, _) = client_a_receiver.recv().await.unwrap().unwrap(); + let res = client_a_receiver.recv().await.unwrap().unwrap(); if let ReceivedMessage::ReceivedPacket { source, data } = res { assert_eq!(b_key, source); assert_eq!(msg, data); @@ -931,7 +931,7 @@ mod tests { let msg = Bytes::from("hello, b"); client_a.send(b_key, msg.clone()).await.unwrap(); - let (res, _) = client_b_receiver.recv().await.unwrap().unwrap(); + let res = client_b_receiver.recv().await.unwrap().unwrap(); if let ReceivedMessage::ReceivedPacket { source, data } = res { assert_eq!(a_key, source); assert_eq!(msg, data); @@ -943,7 +943,7 @@ mod tests { let msg = Bytes::from("howdy, a"); client_b.send(a_key, msg.clone()).await.unwrap(); - let (res, _) = client_a_receiver.recv().await.unwrap().unwrap(); + let res = client_a_receiver.recv().await.unwrap().unwrap(); if let ReceivedMessage::ReceivedPacket { source, data } = res { assert_eq!(b_key, source); assert_eq!(msg, data); @@ -998,7 +998,7 @@ mod tests { let msg = Bytes::from("hello, b"); client_a.send(b_key, msg.clone()).await.unwrap(); - let (res, _) = client_b_receiver.recv().await.unwrap().unwrap(); + let res = client_b_receiver.recv().await.unwrap().unwrap(); if let ReceivedMessage::ReceivedPacket { source, data } = res { assert_eq!(a_key, source); assert_eq!(msg, data); @@ -1010,7 +1010,7 @@ mod tests { let msg = Bytes::from("howdy, a"); client_b.send(a_key, msg.clone()).await.unwrap(); - let (res, _) = client_a_receiver.recv().await.unwrap().unwrap(); + let res = client_a_receiver.recv().await.unwrap().unwrap(); if let ReceivedMessage::ReceivedPacket { source, data } = res { assert_eq!(b_key, source); assert_eq!(msg, data); diff --git a/iroh-net/src/relay/server/http_server.rs b/iroh-net/src/relay/server/http_server.rs index 91787e31c2..a52aef6870 100644 --- a/iroh-net/src/relay/server/http_server.rs +++ b/iroh-net/src/relay/server/http_server.rs @@ -821,7 +821,7 @@ mod tests { info!("client {:?} `recv` error {e}", key.public()); return; } - Some(Ok((msg, _))) => { + Some(Ok(msg)) => { info!("got message on {:?}: {msg:?}", key.public()); if let ReceivedMessage::ReceivedPacket { source, data } = msg { received_msg_s From b1ea4b2f9f39207941736b945fd591797241d9db Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 3 Oct 2024 13:06:53 +0200 Subject: [PATCH 2/9] small doc fixup --- iroh-net/src/relay/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-net/src/relay/client.rs b/iroh-net/src/relay/client.rs index a7020d1eda..ad1290510d 100644 --- a/iroh-net/src/relay/client.rs +++ b/iroh-net/src/relay/client.rs @@ -396,7 +396,7 @@ impl Client { } } - /// Connect to a relay Server and returns the underlying relay Client. + /// Connect to a relay Server and returns the underlying relay connection. /// /// Returns [`ClientError::Closed`] if the [`Client`] is closed. /// From 62c7421a3f0495523a487d5553eb2abe12824b69 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 3 Oct 2024 17:46:31 +0200 Subject: [PATCH 3/9] Apply suggestions from code review Co-authored-by: Divma <26765164+divagant-martian@users.noreply.github.com> --- iroh-net/src/relay/client.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/iroh-net/src/relay/client.rs b/iroh-net/src/relay/client.rs index ad1290510d..09cfec2506 100644 --- a/iroh-net/src/relay/client.rs +++ b/iroh-net/src/relay/client.rs @@ -548,11 +548,11 @@ impl Actor { /// Returns a connection to the relay. /// - /// If the client is currently connected the existing connection is returned, otherwise + /// If the client is currently connected, the existing connection is returned; otherwise, /// a new connection is made. /// /// Returns: - /// - A clonable connection object which can send to DISCO messages to the relay. + /// - A clonable connection object which can send DISCO messages to the relay. /// - A reference to a channel receiving DISCO messages from the relay. async fn connect( &mut self, @@ -982,7 +982,7 @@ impl Actor { } async fn recv_detail(&mut self) -> Result { - if let Some((_client, client_receiver)) = self.relay_conn.as_mut() { + if let Some((_conn, client_receiver)) = self.relay_conn.as_mut() { trace!("recv_detail tick"); match client_receiver.recv().await { Ok(msg) => { @@ -1005,7 +1005,7 @@ impl Actor { /// requires a connection, it will call `connect`. async fn close_for_reconnect(&mut self) { debug!("close for reconnect"); - if let Some((client, _)) = self.relay_conn.take() { + if let Some((conn, _)) = self.relay_conn.take() { client.close().await } } From 60ebe401226392eece13599697dba927dba060d4 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 3 Oct 2024 18:26:12 +0200 Subject: [PATCH 4/9] fixup some more names --- iroh-net/src/relay/client.rs | 2 +- iroh-net/src/relay/client/conn.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/iroh-net/src/relay/client.rs b/iroh-net/src/relay/client.rs index 09cfec2506..5b6ac44e08 100644 --- a/iroh-net/src/relay/client.rs +++ b/iroh-net/src/relay/client.rs @@ -1006,7 +1006,7 @@ impl Actor { async fn close_for_reconnect(&mut self) { debug!("close for reconnect"); if let Some((conn, _)) = self.relay_conn.take() { - client.close().await + conn.close().await } } } diff --git a/iroh-net/src/relay/client/conn.rs b/iroh-net/src/relay/client/conn.rs index 2df0c219cd..a8dd2f38cc 100644 --- a/iroh-net/src/relay/client/conn.rs +++ b/iroh-net/src/relay/client/conn.rs @@ -376,7 +376,7 @@ impl ConnBuilder { recv_msgs: writer_recv, } .run() - .instrument(info_span!("client.writer")), + .instrument(info_span!("conn.writer")), ); let (reader_sender, reader_recv) = mpsc::channel(PER_CLIENT_READ_QUEUE_DEPTH); @@ -412,6 +412,7 @@ impl ConnBuilder { } } } + .instrument(info_span!("conn.reader")) }); let conn = Conn { From b38cf2ff1e6812ccf8f10ee575b72db83574904f Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 3 Oct 2024 18:38:57 +0200 Subject: [PATCH 5/9] don't rename anything --- iroh-net/src/relay/client.rs | 17 +++++++---------- iroh-net/src/relay/client/conn.rs | 5 +++++ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/iroh-net/src/relay/client.rs b/iroh-net/src/relay/client.rs index 5b6ac44e08..c2fcb39c8d 100644 --- a/iroh-net/src/relay/client.rs +++ b/iroh-net/src/relay/client.rs @@ -27,10 +27,7 @@ use tokio_util::task::AbortOnDropHandle; use tracing::{debug, error, event, info_span, trace, warn, Instrument, Level}; use url::Url; -use conn::{ - Conn as RelayConn, ConnBuilder as RelayConnBuilder, ConnReader, - ConnReceiver as RelayConnReceiver, ConnWriter, ReceivedMessage, -}; +use conn::{Conn, ConnBuilder, ConnReader, ConnReceiver, ConnWriter, ReceivedMessage}; use streams::{downcast_upgrade, MaybeTlsStream, ProxyStream}; use crate::defaults::timeouts::relay::*; @@ -140,7 +137,7 @@ pub struct Client { #[derive(Debug)] enum ActorMessage { - Connect(oneshot::Sender>), + Connect(oneshot::Sender>), NotePreferred(bool), LocalAddr(oneshot::Sender, ClientError>>), Ping(oneshot::Sender>), @@ -162,7 +159,7 @@ struct Actor { secret_key: SecretKey, can_ack_pings: bool, is_preferred: bool, - relay_conn: Option<(RelayConn, RelayConnReceiver)>, + relay_conn: Option<(Conn, ConnReceiver)>, is_closed: bool, #[debug("address family selector callback")] address_family_selector: Option BoxFuture + Send + Sync + 'static>>, @@ -402,7 +399,7 @@ impl Client { /// /// If there is already an active relay connection, returns the already /// connected [`crate::relay::RelayConn`]. - pub async fn connect(&self) -> Result { + pub async fn connect(&self) -> Result { self.send_actor(ActorMessage::Connect).await } @@ -557,7 +554,7 @@ impl Actor { async fn connect( &mut self, why: &'static str, - ) -> Result<(RelayConn, &'_ mut RelayConnReceiver), ClientError> { + ) -> Result<(Conn, &'_ mut ConnReceiver), ClientError> { debug!( "connect: {}, current client {}", why, @@ -591,7 +588,7 @@ impl Actor { .await } - async fn connect_0(&self) -> Result<(RelayConn, RelayConnReceiver), ClientError> { + async fn connect_0(&self) -> Result<(Conn, ConnReceiver), ClientError> { let (reader, writer, local_addr) = match self.protocol { Protocol::Websocket => { let (reader, writer) = self.connect_ws().await?; @@ -605,7 +602,7 @@ impl Actor { }; let (relay_client, receiver) = - RelayConnBuilder::new(self.secret_key.clone(), local_addr, reader, writer) + ConnBuilder::new(self.secret_key.clone(), local_addr, reader, writer) .build() .await .map_err(|e| ClientError::Build(e.to_string()))?; diff --git a/iroh-net/src/relay/client/conn.rs b/iroh-net/src/relay/client/conn.rs index a8dd2f38cc..e0667acb3d 100644 --- a/iroh-net/src/relay/client/conn.rs +++ b/iroh-net/src/relay/client/conn.rs @@ -46,6 +46,11 @@ pub struct Conn { inner: Arc, } +/// The channel on which a relay connection sends received messages. +/// +/// The [`Conn`] to a relay is easily clonable but can only send DISCO messages to a relay +/// server. This is the counterpart which receives DISCO messages from the relay server for +/// a connection. It is not clonable. #[derive(Debug)] pub struct ConnReceiver { /// The reader channel, receiving incoming messages. From a22c2d3f9498e10a15292544b96c6f8a43827a35 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 3 Oct 2024 18:42:57 +0200 Subject: [PATCH 6/9] docs style --- iroh-net/src/relay/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-net/src/relay/client.rs b/iroh-net/src/relay/client.rs index c2fcb39c8d..8da4a23154 100644 --- a/iroh-net/src/relay/client.rs +++ b/iroh-net/src/relay/client.rs @@ -393,7 +393,7 @@ impl Client { } } - /// Connect to a relay Server and returns the underlying relay connection. + /// Connects to a relay Server and returns the underlying relay connection. /// /// Returns [`ClientError::Closed`] if the [`Client`] is closed. /// From b344622ca04448fe9079c19e05b3f98111e3f226 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 3 Oct 2024 19:21:43 +0200 Subject: [PATCH 7/9] Rename more variables to be consistent --- iroh-net/src/relay/client.rs | 47 ++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/iroh-net/src/relay/client.rs b/iroh-net/src/relay/client.rs index 8da4a23154..1dac564786 100644 --- a/iroh-net/src/relay/client.rs +++ b/iroh-net/src/relay/client.rs @@ -32,7 +32,7 @@ use streams::{downcast_upgrade, MaybeTlsStream, ProxyStream}; use crate::defaults::timeouts::relay::*; use crate::dns::{DnsResolver, ResolverExt}; -use crate::key::{PublicKey, SecretKey}; +use crate::key::{NodeId, PublicKey, SecretKey}; use crate::relay::codec::DerpCodec; use crate::relay::http::{Protocol, RELAY_PATH}; use crate::relay::RelayUrl; @@ -567,12 +567,11 @@ impl Actor { async move { if self.relay_conn.is_none() { trace!("no connection, trying to connect"); - let (relay_client, receiver) = - tokio::time::timeout(CONNECT_TIMEOUT, self.connect_0()) - .await - .map_err(|_| ClientError::ConnectTimeout)??; + let (conn, receiver) = tokio::time::timeout(CONNECT_TIMEOUT, self.connect_0()) + .await + .map_err(|_| ClientError::ConnectTimeout)??; - self.relay_conn = Some((relay_client.clone(), receiver)); + self.relay_conn = Some((conn, receiver)); } else { trace!("already had connection"); } @@ -601,14 +600,14 @@ impl Actor { } }; - let (relay_client, receiver) = + let (conn, receiver) = ConnBuilder::new(self.secret_key.clone(), local_addr, reader, writer) .build() .await .map_err(|e| ClientError::Build(e.to_string()))?; - if self.is_preferred && relay_client.note_preferred(true).await.is_err() { - relay_client.close().await; + if self.is_preferred && conn.note_preferred(true).await.is_err() { + conn.close().await; return Err(ClientError::Send); } @@ -620,7 +619,7 @@ impl Actor { ); trace!("connect_0 done"); - Ok((relay_client, receiver)) + Ok((conn, receiver)) } async fn connect_ws(&self) -> Result<(ConnReader, ConnWriter), ClientError> { @@ -732,8 +731,8 @@ impl Actor { // only send the preference if we already have a connection let res = { - if let Some((ref client, _)) = self.relay_conn { - client.note_preferred(is_preferred).await + if let Some((ref conn, _)) = self.relay_conn { + conn.note_preferred(is_preferred).await } else { return; } @@ -749,8 +748,8 @@ impl Actor { if self.is_closed { return None; } - if let Some((ref client, _)) = self.relay_conn { - client.local_addr() + if let Some((ref conn, _)) = self.relay_conn { + conn.local_addr() } else { None } @@ -763,9 +762,9 @@ impl Actor { self.ping_tasks.spawn(async move { let res = match connect_res { - Ok(client) => { + Ok(conn) => { let start = Instant::now(); - if let Err(err) = client.send_ping(ping).await { + if let Err(err) = conn.send_ping(ping).await { warn!("failed to send ping: {:?}", err); Err(ClientError::Send) } else { @@ -782,10 +781,10 @@ impl Actor { }); } - async fn send(&mut self, dst_key: PublicKey, b: Bytes) -> Result<(), ClientError> { - trace!(dst = %dst_key.fmt_short(), len = b.len(), "send"); - let (client, _) = self.connect("send").await?; - if client.send(dst_key, b).await.is_err() { + async fn send(&mut self, remote_node: NodeId, payload: Bytes) -> Result<(), ClientError> { + trace!(remote_node = %remote_node.fmt_short(), len = payload.len(), "send"); + let (conn, _) = self.connect("send").await?; + if conn.send(remote_node, payload).await.is_err() { self.close_for_reconnect().await; return Err(ClientError::Send); } @@ -795,8 +794,8 @@ impl Actor { async fn send_pong(&mut self, data: [u8; 8]) -> Result<(), ClientError> { debug!("send_pong"); if self.can_ack_pings { - let (client, _) = self.connect("send_pong").await?; - if client.send_pong(data).await.is_err() { + let (conn, _) = self.connect("send_pong").await?; + if conn.send_pong(data).await.is_err() { self.close_for_reconnect().await; return Err(ClientError::Send); } @@ -979,9 +978,9 @@ impl Actor { } async fn recv_detail(&mut self) -> Result { - if let Some((_conn, client_receiver)) = self.relay_conn.as_mut() { + if let Some((_conn, conn_receiver)) = self.relay_conn.as_mut() { trace!("recv_detail tick"); - match client_receiver.recv().await { + match conn_receiver.recv().await { Ok(msg) => { return Ok(msg); } From 4c6e223843e4897b1944cceda703772b8ea6be1f Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 3 Oct 2024 13:55:28 +0200 Subject: [PATCH 8/9] refactor(iroh-net): Optimise present nodes in ActiveRelay The ActiveRelay actor keeps track of which remote nodes are present on the relay connection so that we can optimise relay connections to remote nodes. This does two main optimisations: - There were two sets of these nodes kept, they could easily be unified. - The set is best stored in a BTreeSet since they are simple NodeIds stored in them. - Bonus: rename peer to node to match our naming convention. - Bonus: identify nodes by NodeId since this is a routing key here. --- iroh-net/src/magicsock.rs | 4 +- iroh-net/src/magicsock/relay_actor.rs | 76 +++++++++++++-------------- 2 files changed, 38 insertions(+), 42 deletions(-) diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index c2a285556b..db42f29e5e 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -606,7 +606,7 @@ impl MagicSock { let msg = RelayActorMessage::Send { url: url.clone(), contents, - peer: node, + remote_node: node, }; match self.relay_actor_sender.try_send(msg) { Ok(_) => { @@ -1193,7 +1193,7 @@ impl MagicSock { let msg = RelayActorMessage::Send { url: url.clone(), contents, - peer: node, + remote_node: node, }; match self.relay_actor_sender.try_send(msg) { Ok(_) => { diff --git a/iroh-net/src/magicsock/relay_actor.rs b/iroh-net/src/magicsock/relay_actor.rs index f7b1dd879b..8b54686134 100644 --- a/iroh-net/src/magicsock/relay_actor.rs +++ b/iroh-net/src/magicsock/relay_actor.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, HashSet}, + collections::{BTreeMap, BTreeSet}, future::Future, net::{IpAddr, SocketAddr}, sync::{atomic::Ordering, Arc}, @@ -19,7 +19,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, info, info_span, trace, warn, Instrument}; use crate::{ - key::{PublicKey, PUBLIC_KEY_LENGTH}, + key::{NodeId, PUBLIC_KEY_LENGTH}, relay::{self, client::conn::ReceivedMessage, client::ClientError, RelayUrl, MAX_PACKET_SIZE}, }; @@ -36,7 +36,7 @@ pub(super) enum RelayActorMessage { Send { url: RelayUrl, contents: RelayContents, - peer: PublicKey, + remote_node: NodeId, }, MaybeCloseRelaysOnRebind(Vec), SetHome { @@ -51,21 +51,17 @@ struct ActiveRelay { /// channel (currently even if there was no write). last_write: Instant, msg_sender: mpsc::Sender, - /// Contains optional alternate routes to use as an optimization instead of - /// contacting a peer via their home relay connection. If they sent us a message - /// on this relay connection (which should really only be on our relay - /// home connection, or what was once our home), then we remember that route here to optimistically - /// use instead of creating a new relay connection back to their home. - relay_routes: Vec, url: RelayUrl, relay_client: relay::client::Client, relay_client_receiver: relay::client::ClientReceiver, - /// The set of senders we know are present on this connection, based on - /// messages we've received from the server. - peer_present: HashSet, + /// The set of remote nodes we know are present on this relay server. + /// + /// If we receive messages from a remote node via this server it is added to this set, + /// if the server notifies us this node is gone it is removed from this set. + node_present: BTreeSet, backoff: backoff::exponential::ExponentialBackoff, last_packet_time: Option, - last_packet_src: Option, + last_packet_src: Option, } #[derive(Debug)] @@ -74,7 +70,7 @@ enum ActiveRelayMessage { GetLastWrite(oneshot::Sender), Ping(oneshot::Sender>), GetLocalAddr(oneshot::Sender>), - GetPeerRoute(PublicKey, oneshot::Sender>), + GetNodeRoute(NodeId, oneshot::Sender>), GetClient(oneshot::Sender), NotePreferred(bool), Shutdown, @@ -90,9 +86,8 @@ impl ActiveRelay { ActiveRelay { last_write: Instant::now(), msg_sender, - relay_routes: Default::default(), url, - peer_present: HashSet::new(), + node_present: BTreeSet::new(), backoff: backoff::exponential::ExponentialBackoffBuilder::new() .with_initial_interval(Duration::from_millis(10)) .with_max_interval(Duration::from_secs(5)) @@ -132,13 +127,13 @@ impl ActiveRelay { ActiveRelayMessage::NotePreferred(is_preferred) => { self.relay_client.note_preferred(is_preferred).await; } - ActiveRelayMessage::GetPeerRoute(peer, r) => { - let res = if self.relay_routes.contains(&peer) { + ActiveRelayMessage::GetNodeRoute(peer, r) => { + let client = if self.node_present.contains(&peer) { Some(self.relay_client.clone()) } else { None }; - r.send(res).ok(); + r.send(client).ok(); } ActiveRelayMessage::Shutdown => { self.relay_client.close().await.ok(); @@ -171,8 +166,7 @@ impl ActiveRelay { warn!("recv error {:?}", err); // Forget that all these peers have routes. - let peers: Vec<_> = self.peer_present.drain().collect(); - self.relay_routes.retain(|peer| !peers.contains(peer)); + self.node_present.clear(); if matches!( err, @@ -223,10 +217,7 @@ impl ActiveRelay { { // avoid map lookup w/ high throughput single peer self.last_packet_src = Some(source); - if !self.peer_present.contains(&source) { - self.peer_present.insert(source); - self.relay_routes.push(source); - } + self.node_present.insert(source); } let res = RelayReadResult { @@ -253,7 +244,7 @@ impl ActiveRelay { } relay::client::conn::ReceivedMessage::Health { .. } => ReadResult::Continue, relay::client::conn::ReceivedMessage::PeerGone(key) => { - self.relay_routes.retain(|peer| peer != &key); + self.node_present.remove(&key); ReadResult::Continue } other => { @@ -337,9 +328,9 @@ impl RelayActor { RelayActorMessage::Send { url, contents, - peer, + remote_node, } => { - self.send_relay(&url, contents, peer).await; + self.send_relay(&url, contents, remote_node).await; } RelayActorMessage::SetHome { url } => { self.note_preferred(&url).await; @@ -361,12 +352,17 @@ impl RelayActor { .await; } - async fn send_relay(&mut self, url: &RelayUrl, contents: RelayContents, peer: PublicKey) { - trace!(%url, peer = %peer.fmt_short(),len = contents.iter().map(|c| c.len()).sum::(), "sending over relay"); + async fn send_relay(&mut self, url: &RelayUrl, contents: RelayContents, remote_node: NodeId) { + trace!( + %url, + remote_node = %remote_node.fmt_short(), + len = contents.iter().map(|c| c.len()).sum::(), + "sending over relay", + ); // Relay Send - let relay_client = self.connect_relay(url, Some(&peer)).await; + let relay_client = self.connect_relay(url, Some(&remote_node)).await; for content in &contents { - trace!(%url, ?peer, "sending {}B", content.len()); + trace!(%url, ?remote_node, "sending {}B", content.len()); } let total_bytes = contents.iter().map(|c| c.len() as u64).sum::(); @@ -377,7 +373,7 @@ impl RelayActor { // But we have no guarantee that the total size of the contents including // length prefix will be smaller than the payload size. for packet in PacketizeIter::<_, PAYLAOD_SIZE>::new(contents) { - match relay_client.send(peer, packet).await { + match relay_client.send(remote_node, packet).await { Ok(_) => { inc_by!(MagicsockMetrics, send_relay, total_bytes); } @@ -414,9 +410,9 @@ impl RelayActor { async fn connect_relay( &mut self, url: &RelayUrl, - peer: Option<&PublicKey>, + remote_node: Option<&NodeId>, ) -> relay::client::Client { - debug!("connect relay {} for peer {:?}", url, peer); + debug!("connect relay {} for peer {:?}", url, remote_node); // See if we have a connection open to that relay node ID first. If so, might as // well use it. (It's a little arbitrary whether we use this one vs. the reverse route // below when we have both.) @@ -439,7 +435,7 @@ impl RelayActor { // perhaps peer's home is Frankfurt, but they dialed our home relay // node in SF to reach us, so we can reply to them using our // SF connection rather than dialing Frankfurt. - if let Some(peer) = peer { + if let Some(node) = remote_node { for url in self .active_relay .keys() @@ -449,7 +445,7 @@ impl RelayActor { { let (os, or) = oneshot::channel(); if self - .send_to_active(&url, ActiveRelayMessage::GetPeerRoute(*peer, os)) + .send_to_active(&url, ActiveRelayMessage::GetNodeRoute(*node, os)) .await { if let Ok(Some(client)) = or.await { @@ -459,8 +455,8 @@ impl RelayActor { } } - let why = if let Some(peer) = peer { - format!("{peer:?}") + let why = if let Some(node) = remote_node { + format!("{node:?}") } else { "home-keep-alive".to_string() }; @@ -678,7 +674,7 @@ impl RelayActor { #[derive(derive_more::Debug)] pub(super) struct RelayReadResult { pub(super) url: RelayUrl, - pub(super) src: PublicKey, + pub(super) src: NodeId, /// packet data #[debug(skip)] pub(super) buf: Bytes, From 275b39966dc194c565899a9939ec17739f5b417d Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 3 Oct 2024 17:50:40 +0200 Subject: [PATCH 9/9] Apply suggestions from code review Co-authored-by: Divma <26765164+divagant-martian@users.noreply.github.com> --- iroh-net/src/magicsock/relay_actor.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/iroh-net/src/magicsock/relay_actor.rs b/iroh-net/src/magicsock/relay_actor.rs index 8b54686134..1385bd8830 100644 --- a/iroh-net/src/magicsock/relay_actor.rs +++ b/iroh-net/src/magicsock/relay_actor.rs @@ -56,8 +56,8 @@ struct ActiveRelay { relay_client_receiver: relay::client::ClientReceiver, /// The set of remote nodes we know are present on this relay server. /// - /// If we receive messages from a remote node via this server it is added to this set, - /// if the server notifies us this node is gone it is removed from this set. + /// If we receive messages from a remote node via, this server it is added to this set. + /// If the server notifies us this node is gone, it is removed from this set. node_present: BTreeSet, backoff: backoff::exponential::ExponentialBackoff, last_packet_time: Option, @@ -412,7 +412,7 @@ impl RelayActor { url: &RelayUrl, remote_node: Option<&NodeId>, ) -> relay::client::Client { - debug!("connect relay {} for peer {:?}", url, remote_node); + debug!(%url, ?remote_node, "connect relay"); // See if we have a connection open to that relay node ID first. If so, might as // well use it. (It's a little arbitrary whether we use this one vs. the reverse route // below when we have both.)