Skip to content

Commit

Permalink
refactor(iroh-net): Optimise present nodes in ActiveRelay
Browse files Browse the repository at this point in the history
The ActiveRelay actor keeps track of which remote nodes are present
on the relay connection so that we can optimise relay connections to
remote nodes.  This does two main optimisations:

- There were two sets of these nodes kept, they could easily be
  unified.

- The set is best stored in a BTreeSet since they are simple
  NodeIds stored in them.

- Bonus: rename peer to node to match our naming convention.

- Bonus: identify nodes by NodeId since this is a routing key here.
  • Loading branch information
flub committed Oct 3, 2024
1 parent b1ea4b2 commit 686d609
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 42 deletions.
4 changes: 2 additions & 2 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ impl MagicSock {
let msg = RelayActorMessage::Send {
url: url.clone(),
contents,
peer: node,
remote_node: node,
};
match self.relay_actor_sender.try_send(msg) {
Ok(_) => {
Expand Down Expand Up @@ -1193,7 +1193,7 @@ impl MagicSock {
let msg = RelayActorMessage::Send {
url: url.clone(),
contents,
peer: node,
remote_node: node,
};
match self.relay_actor_sender.try_send(msg) {
Ok(_) => {
Expand Down
76 changes: 36 additions & 40 deletions iroh-net/src/magicsock/relay_actor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{BTreeMap, HashSet},
collections::{BTreeMap, BTreeSet},
future::Future,
net::{IpAddr, SocketAddr},
sync::{atomic::Ordering, Arc},
Expand All @@ -19,7 +19,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, trace, warn, Instrument};

use crate::{
key::{PublicKey, PUBLIC_KEY_LENGTH},
key::{NodeId, PUBLIC_KEY_LENGTH},
relay::{self, client::conn::ReceivedMessage, client::ClientError, RelayUrl, MAX_PACKET_SIZE},
};

Expand All @@ -36,7 +36,7 @@ pub(super) enum RelayActorMessage {
Send {
url: RelayUrl,
contents: RelayContents,
peer: PublicKey,
remote_node: NodeId,
},
MaybeCloseRelaysOnRebind(Vec<IpAddr>),
SetHome {
Expand All @@ -51,21 +51,17 @@ struct ActiveRelay {
/// channel (currently even if there was no write).
last_write: Instant,
msg_sender: mpsc::Sender<ActorMessage>,
/// Contains optional alternate routes to use as an optimization instead of
/// contacting a peer via their home relay connection. If they sent us a message
/// on this relay connection (which should really only be on our relay
/// home connection, or what was once our home), then we remember that route here to optimistically
/// use instead of creating a new relay connection back to their home.
relay_routes: Vec<PublicKey>,
url: RelayUrl,
relay_client: relay::client::Client,
relay_client_receiver: relay::client::ClientReceiver,
/// The set of senders we know are present on this connection, based on
/// messages we've received from the server.
peer_present: HashSet<PublicKey>,
/// The set of remote nodes we know are present on this relay server.
///
/// If we receive messages from a remote node via this server it is added to this set,
/// if the server notifies us this node is gone it is removed from this set.
node_present: BTreeSet<NodeId>,
backoff: backoff::exponential::ExponentialBackoff<backoff::SystemClock>,
last_packet_time: Option<Instant>,
last_packet_src: Option<PublicKey>,
last_packet_src: Option<NodeId>,
}

#[derive(Debug)]
Expand All @@ -74,7 +70,7 @@ enum ActiveRelayMessage {
GetLastWrite(oneshot::Sender<Instant>),
Ping(oneshot::Sender<Result<Duration, ClientError>>),
GetLocalAddr(oneshot::Sender<Option<SocketAddr>>),
GetPeerRoute(PublicKey, oneshot::Sender<Option<relay::client::Client>>),
GetNodeRoute(NodeId, oneshot::Sender<Option<relay::client::Client>>),
GetClient(oneshot::Sender<relay::client::Client>),
NotePreferred(bool),
Shutdown,
Expand All @@ -90,9 +86,8 @@ impl ActiveRelay {
ActiveRelay {
last_write: Instant::now(),
msg_sender,
relay_routes: Default::default(),
url,
peer_present: HashSet::new(),
node_present: BTreeSet::new(),
backoff: backoff::exponential::ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_millis(10))
.with_max_interval(Duration::from_secs(5))
Expand Down Expand Up @@ -132,13 +127,13 @@ impl ActiveRelay {
ActiveRelayMessage::NotePreferred(is_preferred) => {
self.relay_client.note_preferred(is_preferred).await;
}
ActiveRelayMessage::GetPeerRoute(peer, r) => {
let res = if self.relay_routes.contains(&peer) {
ActiveRelayMessage::GetNodeRoute(peer, r) => {
let client = if self.node_present.contains(&peer) {
Some(self.relay_client.clone())
} else {
None
};
r.send(res).ok();
r.send(client).ok();
}
ActiveRelayMessage::Shutdown => {
self.relay_client.close().await.ok();
Expand Down Expand Up @@ -171,8 +166,7 @@ impl ActiveRelay {
warn!("recv error {:?}", err);

// Forget that all these peers have routes.
let peers: Vec<_> = self.peer_present.drain().collect();
self.relay_routes.retain(|peer| !peers.contains(peer));
self.node_present.clear();

if matches!(
err,
Expand Down Expand Up @@ -223,10 +217,7 @@ impl ActiveRelay {
{
// avoid map lookup w/ high throughput single peer
self.last_packet_src = Some(source);
if !self.peer_present.contains(&source) {
self.peer_present.insert(source);
self.relay_routes.push(source);
}
self.node_present.insert(source);
}

let res = RelayReadResult {
Expand All @@ -253,7 +244,7 @@ impl ActiveRelay {
}
relay::client::conn::ReceivedMessage::Health { .. } => ReadResult::Continue,
relay::client::conn::ReceivedMessage::PeerGone(key) => {
self.relay_routes.retain(|peer| peer != &key);
self.node_present.remove(&key);
ReadResult::Continue
}
other => {
Expand Down Expand Up @@ -337,9 +328,9 @@ impl RelayActor {
RelayActorMessage::Send {
url,
contents,
peer,
remote_node,
} => {
self.send_relay(&url, contents, peer).await;
self.send_relay(&url, contents, remote_node).await;
}
RelayActorMessage::SetHome { url } => {
self.note_preferred(&url).await;
Expand All @@ -361,12 +352,17 @@ impl RelayActor {
.await;
}

async fn send_relay(&mut self, url: &RelayUrl, contents: RelayContents, peer: PublicKey) {
trace!(%url, peer = %peer.fmt_short(),len = contents.iter().map(|c| c.len()).sum::<usize>(), "sending over relay");
async fn send_relay(&mut self, url: &RelayUrl, contents: RelayContents, remote_node: NodeId) {
trace!(
%url,
remote_node = %remote_node.fmt_short(),
len = contents.iter().map(|c| c.len()).sum::<usize>(),
"sending over relay",
);
// Relay Send
let relay_client = self.connect_relay(url, Some(&peer)).await;
let relay_client = self.connect_relay(url, Some(&remote_node)).await;
for content in &contents {
trace!(%url, ?peer, "sending {}B", content.len());
trace!(%url, ?remote_node, "sending {}B", content.len());
}
let total_bytes = contents.iter().map(|c| c.len() as u64).sum::<u64>();

Expand All @@ -377,7 +373,7 @@ impl RelayActor {
// But we have no guarantee that the total size of the contents including
// length prefix will be smaller than the payload size.
for packet in PacketizeIter::<_, PAYLAOD_SIZE>::new(contents) {
match relay_client.send(peer, packet).await {
match relay_client.send(remote_node, packet).await {
Ok(_) => {
inc_by!(MagicsockMetrics, send_relay, total_bytes);
}
Expand Down Expand Up @@ -414,9 +410,9 @@ impl RelayActor {
async fn connect_relay(
&mut self,
url: &RelayUrl,
peer: Option<&PublicKey>,
remote_node: Option<&NodeId>,
) -> relay::client::Client {
debug!("connect relay {} for peer {:?}", url, peer);
debug!("connect relay {} for peer {:?}", url, remote_node);
// See if we have a connection open to that relay node ID first. If so, might as
// well use it. (It's a little arbitrary whether we use this one vs. the reverse route
// below when we have both.)
Expand All @@ -439,7 +435,7 @@ impl RelayActor {
// perhaps peer's home is Frankfurt, but they dialed our home relay
// node in SF to reach us, so we can reply to them using our
// SF connection rather than dialing Frankfurt.
if let Some(peer) = peer {
if let Some(node) = remote_node {
for url in self
.active_relay
.keys()
Expand All @@ -449,7 +445,7 @@ impl RelayActor {
{
let (os, or) = oneshot::channel();
if self
.send_to_active(&url, ActiveRelayMessage::GetPeerRoute(*peer, os))
.send_to_active(&url, ActiveRelayMessage::GetNodeRoute(*node, os))
.await
{
if let Ok(Some(client)) = or.await {
Expand All @@ -459,8 +455,8 @@ impl RelayActor {
}
}

let why = if let Some(peer) = peer {
format!("{peer:?}")
let why = if let Some(node) = remote_node {
format!("{node:?}")
} else {
"home-keep-alive".to_string()
};
Expand Down Expand Up @@ -678,7 +674,7 @@ impl RelayActor {
#[derive(derive_more::Debug)]
pub(super) struct RelayReadResult {
pub(super) url: RelayUrl,
pub(super) src: PublicKey,
pub(super) src: NodeId,
/// packet data
#[debug(skip)]
pub(super) buf: Bytes,
Expand Down

0 comments on commit 686d609

Please sign in to comment.