diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index c2a285556bb..db42f29e5e2 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -606,7 +606,7 @@ impl MagicSock { let msg = RelayActorMessage::Send { url: url.clone(), contents, - peer: node, + remote_node: node, }; match self.relay_actor_sender.try_send(msg) { Ok(_) => { @@ -1193,7 +1193,7 @@ impl MagicSock { let msg = RelayActorMessage::Send { url: url.clone(), contents, - peer: node, + remote_node: node, }; match self.relay_actor_sender.try_send(msg) { Ok(_) => { diff --git a/iroh-net/src/magicsock/relay_actor.rs b/iroh-net/src/magicsock/relay_actor.rs index f7b1dd879b5..8b546861342 100644 --- a/iroh-net/src/magicsock/relay_actor.rs +++ b/iroh-net/src/magicsock/relay_actor.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, HashSet}, + collections::{BTreeMap, BTreeSet}, future::Future, net::{IpAddr, SocketAddr}, sync::{atomic::Ordering, Arc}, @@ -19,7 +19,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, info, info_span, trace, warn, Instrument}; use crate::{ - key::{PublicKey, PUBLIC_KEY_LENGTH}, + key::{NodeId, PUBLIC_KEY_LENGTH}, relay::{self, client::conn::ReceivedMessage, client::ClientError, RelayUrl, MAX_PACKET_SIZE}, }; @@ -36,7 +36,7 @@ pub(super) enum RelayActorMessage { Send { url: RelayUrl, contents: RelayContents, - peer: PublicKey, + remote_node: NodeId, }, MaybeCloseRelaysOnRebind(Vec), SetHome { @@ -51,21 +51,17 @@ struct ActiveRelay { /// channel (currently even if there was no write). last_write: Instant, msg_sender: mpsc::Sender, - /// Contains optional alternate routes to use as an optimization instead of - /// contacting a peer via their home relay connection. If they sent us a message - /// on this relay connection (which should really only be on our relay - /// home connection, or what was once our home), then we remember that route here to optimistically - /// use instead of creating a new relay connection back to their home. - relay_routes: Vec, url: RelayUrl, relay_client: relay::client::Client, relay_client_receiver: relay::client::ClientReceiver, - /// The set of senders we know are present on this connection, based on - /// messages we've received from the server. - peer_present: HashSet, + /// The set of remote nodes we know are present on this relay server. + /// + /// If we receive messages from a remote node via this server it is added to this set, + /// if the server notifies us this node is gone it is removed from this set. + node_present: BTreeSet, backoff: backoff::exponential::ExponentialBackoff, last_packet_time: Option, - last_packet_src: Option, + last_packet_src: Option, } #[derive(Debug)] @@ -74,7 +70,7 @@ enum ActiveRelayMessage { GetLastWrite(oneshot::Sender), Ping(oneshot::Sender>), GetLocalAddr(oneshot::Sender>), - GetPeerRoute(PublicKey, oneshot::Sender>), + GetNodeRoute(NodeId, oneshot::Sender>), GetClient(oneshot::Sender), NotePreferred(bool), Shutdown, @@ -90,9 +86,8 @@ impl ActiveRelay { ActiveRelay { last_write: Instant::now(), msg_sender, - relay_routes: Default::default(), url, - peer_present: HashSet::new(), + node_present: BTreeSet::new(), backoff: backoff::exponential::ExponentialBackoffBuilder::new() .with_initial_interval(Duration::from_millis(10)) .with_max_interval(Duration::from_secs(5)) @@ -132,13 +127,13 @@ impl ActiveRelay { ActiveRelayMessage::NotePreferred(is_preferred) => { self.relay_client.note_preferred(is_preferred).await; } - ActiveRelayMessage::GetPeerRoute(peer, r) => { - let res = if self.relay_routes.contains(&peer) { + ActiveRelayMessage::GetNodeRoute(peer, r) => { + let client = if self.node_present.contains(&peer) { Some(self.relay_client.clone()) } else { None }; - r.send(res).ok(); + r.send(client).ok(); } ActiveRelayMessage::Shutdown => { self.relay_client.close().await.ok(); @@ -171,8 +166,7 @@ impl ActiveRelay { warn!("recv error {:?}", err); // Forget that all these peers have routes. - let peers: Vec<_> = self.peer_present.drain().collect(); - self.relay_routes.retain(|peer| !peers.contains(peer)); + self.node_present.clear(); if matches!( err, @@ -223,10 +217,7 @@ impl ActiveRelay { { // avoid map lookup w/ high throughput single peer self.last_packet_src = Some(source); - if !self.peer_present.contains(&source) { - self.peer_present.insert(source); - self.relay_routes.push(source); - } + self.node_present.insert(source); } let res = RelayReadResult { @@ -253,7 +244,7 @@ impl ActiveRelay { } relay::client::conn::ReceivedMessage::Health { .. } => ReadResult::Continue, relay::client::conn::ReceivedMessage::PeerGone(key) => { - self.relay_routes.retain(|peer| peer != &key); + self.node_present.remove(&key); ReadResult::Continue } other => { @@ -337,9 +328,9 @@ impl RelayActor { RelayActorMessage::Send { url, contents, - peer, + remote_node, } => { - self.send_relay(&url, contents, peer).await; + self.send_relay(&url, contents, remote_node).await; } RelayActorMessage::SetHome { url } => { self.note_preferred(&url).await; @@ -361,12 +352,17 @@ impl RelayActor { .await; } - async fn send_relay(&mut self, url: &RelayUrl, contents: RelayContents, peer: PublicKey) { - trace!(%url, peer = %peer.fmt_short(),len = contents.iter().map(|c| c.len()).sum::(), "sending over relay"); + async fn send_relay(&mut self, url: &RelayUrl, contents: RelayContents, remote_node: NodeId) { + trace!( + %url, + remote_node = %remote_node.fmt_short(), + len = contents.iter().map(|c| c.len()).sum::(), + "sending over relay", + ); // Relay Send - let relay_client = self.connect_relay(url, Some(&peer)).await; + let relay_client = self.connect_relay(url, Some(&remote_node)).await; for content in &contents { - trace!(%url, ?peer, "sending {}B", content.len()); + trace!(%url, ?remote_node, "sending {}B", content.len()); } let total_bytes = contents.iter().map(|c| c.len() as u64).sum::(); @@ -377,7 +373,7 @@ impl RelayActor { // But we have no guarantee that the total size of the contents including // length prefix will be smaller than the payload size. for packet in PacketizeIter::<_, PAYLAOD_SIZE>::new(contents) { - match relay_client.send(peer, packet).await { + match relay_client.send(remote_node, packet).await { Ok(_) => { inc_by!(MagicsockMetrics, send_relay, total_bytes); } @@ -414,9 +410,9 @@ impl RelayActor { async fn connect_relay( &mut self, url: &RelayUrl, - peer: Option<&PublicKey>, + remote_node: Option<&NodeId>, ) -> relay::client::Client { - debug!("connect relay {} for peer {:?}", url, peer); + debug!("connect relay {} for peer {:?}", url, remote_node); // See if we have a connection open to that relay node ID first. If so, might as // well use it. (It's a little arbitrary whether we use this one vs. the reverse route // below when we have both.) @@ -439,7 +435,7 @@ impl RelayActor { // perhaps peer's home is Frankfurt, but they dialed our home relay // node in SF to reach us, so we can reply to them using our // SF connection rather than dialing Frankfurt. - if let Some(peer) = peer { + if let Some(node) = remote_node { for url in self .active_relay .keys() @@ -449,7 +445,7 @@ impl RelayActor { { let (os, or) = oneshot::channel(); if self - .send_to_active(&url, ActiveRelayMessage::GetPeerRoute(*peer, os)) + .send_to_active(&url, ActiveRelayMessage::GetNodeRoute(*node, os)) .await { if let Ok(Some(client)) = or.await { @@ -459,8 +455,8 @@ impl RelayActor { } } - let why = if let Some(peer) = peer { - format!("{peer:?}") + let why = if let Some(node) = remote_node { + format!("{node:?}") } else { "home-keep-alive".to_string() }; @@ -678,7 +674,7 @@ impl RelayActor { #[derive(derive_more::Debug)] pub(super) struct RelayReadResult { pub(super) url: RelayUrl, - pub(super) src: PublicKey, + pub(super) src: NodeId, /// packet data #[debug(skip)] pub(super) buf: Bytes,