From 686d609a9418c59a89397c6e4c8ea6cd8d50e17e Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 3 Oct 2024 13:55:28 +0200 Subject: [PATCH] refactor(iroh-net): Optimise present nodes in ActiveRelay The ActiveRelay actor keeps track of which remote nodes are present on the relay connection so that we can optimise relay connections to remote nodes. This makes two main optimisations: - Two sets of these nodes were kept; they are now unified into a single set. - The set is now stored in a BTreeSet, since its elements are simple NodeIds. - Bonus: rename peer to node to match our naming convention. - Bonus: identify nodes by NodeId since this is a routing key here. --- iroh-net/src/magicsock.rs | 4 +- iroh-net/src/magicsock/relay_actor.rs | 76 +++++++++++++-------------- 2 files changed, 38 insertions(+), 42 deletions(-) diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index c2a285556bb..db42f29e5e2 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -606,7 +606,7 @@ impl MagicSock { let msg = RelayActorMessage::Send { url: url.clone(), contents, - peer: node, + remote_node: node, }; match self.relay_actor_sender.try_send(msg) { Ok(_) => { @@ -1193,7 +1193,7 @@ impl MagicSock { let msg = RelayActorMessage::Send { url: url.clone(), contents, - peer: node, + remote_node: node, }; match self.relay_actor_sender.try_send(msg) { Ok(_) => { diff --git a/iroh-net/src/magicsock/relay_actor.rs b/iroh-net/src/magicsock/relay_actor.rs index f7b1dd879b5..8b546861342 100644 --- a/iroh-net/src/magicsock/relay_actor.rs +++ b/iroh-net/src/magicsock/relay_actor.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, HashSet}, + collections::{BTreeMap, BTreeSet}, future::Future, net::{IpAddr, SocketAddr}, sync::{atomic::Ordering, Arc}, @@ -19,7 +19,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, info, info_span, trace, warn, Instrument}; use crate::{ - key::{PublicKey, PUBLIC_KEY_LENGTH}, + key::{NodeId, 
PUBLIC_KEY_LENGTH}, relay::{self, client::conn::ReceivedMessage, client::ClientError, RelayUrl, MAX_PACKET_SIZE}, }; @@ -36,7 +36,7 @@ pub(super) enum RelayActorMessage { Send { url: RelayUrl, contents: RelayContents, - peer: PublicKey, + remote_node: NodeId, }, MaybeCloseRelaysOnRebind(Vec), SetHome { @@ -51,21 +51,17 @@ struct ActiveRelay { /// channel (currently even if there was no write). last_write: Instant, msg_sender: mpsc::Sender, - /// Contains optional alternate routes to use as an optimization instead of - /// contacting a peer via their home relay connection. If they sent us a message - /// on this relay connection (which should really only be on our relay - /// home connection, or what was once our home), then we remember that route here to optimistically - /// use instead of creating a new relay connection back to their home. - relay_routes: Vec, url: RelayUrl, relay_client: relay::client::Client, relay_client_receiver: relay::client::ClientReceiver, - /// The set of senders we know are present on this connection, based on - /// messages we've received from the server. - peer_present: HashSet, + /// The set of remote nodes we know are present on this relay server. + /// + /// If we receive messages from a remote node via this server it is added to this set, + /// if the server notifies us this node is gone it is removed from this set. 
+ node_present: BTreeSet, backoff: backoff::exponential::ExponentialBackoff, last_packet_time: Option, - last_packet_src: Option, + last_packet_src: Option, } #[derive(Debug)] @@ -74,7 +70,7 @@ enum ActiveRelayMessage { GetLastWrite(oneshot::Sender), Ping(oneshot::Sender>), GetLocalAddr(oneshot::Sender>), - GetPeerRoute(PublicKey, oneshot::Sender>), + GetNodeRoute(NodeId, oneshot::Sender>), GetClient(oneshot::Sender), NotePreferred(bool), Shutdown, @@ -90,9 +86,8 @@ impl ActiveRelay { ActiveRelay { last_write: Instant::now(), msg_sender, - relay_routes: Default::default(), url, - peer_present: HashSet::new(), + node_present: BTreeSet::new(), backoff: backoff::exponential::ExponentialBackoffBuilder::new() .with_initial_interval(Duration::from_millis(10)) .with_max_interval(Duration::from_secs(5)) @@ -132,13 +127,13 @@ impl ActiveRelay { ActiveRelayMessage::NotePreferred(is_preferred) => { self.relay_client.note_preferred(is_preferred).await; } - ActiveRelayMessage::GetPeerRoute(peer, r) => { - let res = if self.relay_routes.contains(&peer) { + ActiveRelayMessage::GetNodeRoute(peer, r) => { + let client = if self.node_present.contains(&peer) { Some(self.relay_client.clone()) } else { None }; - r.send(res).ok(); + r.send(client).ok(); } ActiveRelayMessage::Shutdown => { self.relay_client.close().await.ok(); @@ -171,8 +166,7 @@ impl ActiveRelay { warn!("recv error {:?}", err); // Forget that all these peers have routes. 
- let peers: Vec<_> = self.peer_present.drain().collect(); - self.relay_routes.retain(|peer| !peers.contains(peer)); + self.node_present.clear(); if matches!( err, @@ -223,10 +217,7 @@ impl ActiveRelay { { // avoid map lookup w/ high throughput single peer self.last_packet_src = Some(source); - if !self.peer_present.contains(&source) { - self.peer_present.insert(source); - self.relay_routes.push(source); - } + self.node_present.insert(source); } let res = RelayReadResult { @@ -253,7 +244,7 @@ impl ActiveRelay { } relay::client::conn::ReceivedMessage::Health { .. } => ReadResult::Continue, relay::client::conn::ReceivedMessage::PeerGone(key) => { - self.relay_routes.retain(|peer| peer != &key); + self.node_present.remove(&key); ReadResult::Continue } other => { @@ -337,9 +328,9 @@ impl RelayActor { RelayActorMessage::Send { url, contents, - peer, + remote_node, } => { - self.send_relay(&url, contents, peer).await; + self.send_relay(&url, contents, remote_node).await; } RelayActorMessage::SetHome { url } => { self.note_preferred(&url).await; @@ -361,12 +352,17 @@ impl RelayActor { .await; } - async fn send_relay(&mut self, url: &RelayUrl, contents: RelayContents, peer: PublicKey) { - trace!(%url, peer = %peer.fmt_short(),len = contents.iter().map(|c| c.len()).sum::(), "sending over relay"); + async fn send_relay(&mut self, url: &RelayUrl, contents: RelayContents, remote_node: NodeId) { + trace!( + %url, + remote_node = %remote_node.fmt_short(), + len = contents.iter().map(|c| c.len()).sum::(), + "sending over relay", + ); // Relay Send - let relay_client = self.connect_relay(url, Some(&peer)).await; + let relay_client = self.connect_relay(url, Some(&remote_node)).await; for content in &contents { - trace!(%url, ?peer, "sending {}B", content.len()); + trace!(%url, ?remote_node, "sending {}B", content.len()); } let total_bytes = contents.iter().map(|c| c.len() as u64).sum::(); @@ -377,7 +373,7 @@ impl RelayActor { // But we have no guarantee that the total size of the 
contents including // length prefix will be smaller than the payload size. for packet in PacketizeIter::<_, PAYLAOD_SIZE>::new(contents) { - match relay_client.send(peer, packet).await { + match relay_client.send(remote_node, packet).await { Ok(_) => { inc_by!(MagicsockMetrics, send_relay, total_bytes); } @@ -414,9 +410,9 @@ impl RelayActor { async fn connect_relay( &mut self, url: &RelayUrl, - peer: Option<&PublicKey>, + remote_node: Option<&NodeId>, ) -> relay::client::Client { - debug!("connect relay {} for peer {:?}", url, peer); + debug!("connect relay {} for peer {:?}", url, remote_node); // See if we have a connection open to that relay node ID first. If so, might as // well use it. (It's a little arbitrary whether we use this one vs. the reverse route // below when we have both.) @@ -439,7 +435,7 @@ impl RelayActor { // perhaps peer's home is Frankfurt, but they dialed our home relay // node in SF to reach us, so we can reply to them using our // SF connection rather than dialing Frankfurt. - if let Some(peer) = peer { + if let Some(node) = remote_node { for url in self .active_relay .keys() @@ -449,7 +445,7 @@ impl RelayActor { { let (os, or) = oneshot::channel(); if self - .send_to_active(&url, ActiveRelayMessage::GetPeerRoute(*peer, os)) + .send_to_active(&url, ActiveRelayMessage::GetNodeRoute(*node, os)) .await { if let Ok(Some(client)) = or.await { @@ -459,8 +455,8 @@ impl RelayActor { } } - let why = if let Some(peer) = peer { - format!("{peer:?}") + let why = if let Some(node) = remote_node { + format!("{node:?}") } else { "home-keep-alive".to_string() }; @@ -678,7 +674,7 @@ impl RelayActor { #[derive(derive_more::Debug)] pub(super) struct RelayReadResult { pub(super) url: RelayUrl, - pub(super) src: PublicKey, + pub(super) src: NodeId, /// packet data #[debug(skip)] pub(super) buf: Bytes,