Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(iroh-net): Optimise present nodes in ActiveRelay #2781

Open
wants to merge 2 commits into
base: flub/relay-client-cleanup-1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ impl MagicSock {
let msg = RelayActorMessage::Send {
url: url.clone(),
contents,
peer: node,
remote_node: node,
};
match self.relay_actor_sender.try_send(msg) {
Ok(_) => {
Expand Down Expand Up @@ -1193,7 +1193,7 @@ impl MagicSock {
let msg = RelayActorMessage::Send {
url: url.clone(),
contents,
peer: node,
remote_node: node,
};
match self.relay_actor_sender.try_send(msg) {
Ok(_) => {
Expand Down
76 changes: 36 additions & 40 deletions iroh-net/src/magicsock/relay_actor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{BTreeMap, HashSet},
collections::{BTreeMap, BTreeSet},
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
future::Future,
net::{IpAddr, SocketAddr},
sync::{atomic::Ordering, Arc},
Expand All @@ -19,7 +19,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, trace, warn, Instrument};

use crate::{
key::{PublicKey, PUBLIC_KEY_LENGTH},
key::{NodeId, PUBLIC_KEY_LENGTH},
relay::{self, client::conn::ReceivedMessage, client::ClientError, RelayUrl, MAX_PACKET_SIZE},
};

Expand All @@ -36,7 +36,7 @@ pub(super) enum RelayActorMessage {
Send {
url: RelayUrl,
contents: RelayContents,
peer: PublicKey,
remote_node: NodeId,
divagant-martian marked this conversation as resolved.
Show resolved Hide resolved
},
MaybeCloseRelaysOnRebind(Vec<IpAddr>),
SetHome {
Expand All @@ -51,21 +51,17 @@ struct ActiveRelay {
/// channel (currently even if there was no write).
last_write: Instant,
msg_sender: mpsc::Sender<ActorMessage>,
/// Contains optional alternate routes to use as an optimization instead of
/// contacting a peer via their home relay connection. If they sent us a message
/// on this relay connection (which should really only be on our relay
/// home connection, or what was once our home), then we remember that route here to optimistically
/// use instead of creating a new relay connection back to their home.
relay_routes: Vec<PublicKey>,
url: RelayUrl,
relay_client: relay::client::Client,
relay_client_receiver: relay::client::ClientReceiver,
/// The set of senders we know are present on this connection, based on
/// messages we've received from the server.
peer_present: HashSet<PublicKey>,
/// The set of remote nodes we know are present on this relay server.
///
/// If we receive messages from a remote node via, this server it is added to this set.
/// If the server notifies us this node is gone, it is removed from this set.
node_present: BTreeSet<NodeId>,
backoff: backoff::exponential::ExponentialBackoff<backoff::SystemClock>,
last_packet_time: Option<Instant>,
last_packet_src: Option<PublicKey>,
last_packet_src: Option<NodeId>,
}

#[derive(Debug)]
Expand All @@ -74,7 +70,7 @@ enum ActiveRelayMessage {
GetLastWrite(oneshot::Sender<Instant>),
Ping(oneshot::Sender<Result<Duration, ClientError>>),
GetLocalAddr(oneshot::Sender<Option<SocketAddr>>),
GetPeerRoute(PublicKey, oneshot::Sender<Option<relay::client::Client>>),
GetNodeRoute(NodeId, oneshot::Sender<Option<relay::client::Client>>),
GetClient(oneshot::Sender<relay::client::Client>),
NotePreferred(bool),
Shutdown,
Expand All @@ -90,9 +86,8 @@ impl ActiveRelay {
ActiveRelay {
last_write: Instant::now(),
msg_sender,
relay_routes: Default::default(),
url,
peer_present: HashSet::new(),
node_present: BTreeSet::new(),
backoff: backoff::exponential::ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_millis(10))
.with_max_interval(Duration::from_secs(5))
Expand Down Expand Up @@ -132,13 +127,13 @@ impl ActiveRelay {
ActiveRelayMessage::NotePreferred(is_preferred) => {
self.relay_client.note_preferred(is_preferred).await;
}
ActiveRelayMessage::GetPeerRoute(peer, r) => {
let res = if self.relay_routes.contains(&peer) {
ActiveRelayMessage::GetNodeRoute(peer, r) => {
let client = if self.node_present.contains(&peer) {
Some(self.relay_client.clone())
} else {
None
};
r.send(res).ok();
r.send(client).ok();
}
ActiveRelayMessage::Shutdown => {
self.relay_client.close().await.ok();
Expand Down Expand Up @@ -171,8 +166,7 @@ impl ActiveRelay {
warn!("recv error {:?}", err);

// Forget that all these peers have routes.
let peers: Vec<_> = self.peer_present.drain().collect();
self.relay_routes.retain(|peer| !peers.contains(peer));
self.node_present.clear();

if matches!(
err,
Expand Down Expand Up @@ -223,10 +217,7 @@ impl ActiveRelay {
{
// avoid map lookup w/ high throughput single peer
self.last_packet_src = Some(source);
if !self.peer_present.contains(&source) {
self.peer_present.insert(source);
self.relay_routes.push(source);
}
self.node_present.insert(source);
}

let res = RelayReadResult {
Expand All @@ -253,7 +244,7 @@ impl ActiveRelay {
}
relay::client::conn::ReceivedMessage::Health { .. } => ReadResult::Continue,
relay::client::conn::ReceivedMessage::PeerGone(key) => {
self.relay_routes.retain(|peer| peer != &key);
self.node_present.remove(&key);
ReadResult::Continue
}
other => {
Expand Down Expand Up @@ -337,9 +328,9 @@ impl RelayActor {
RelayActorMessage::Send {
url,
contents,
peer,
remote_node,
} => {
self.send_relay(&url, contents, peer).await;
self.send_relay(&url, contents, remote_node).await;
}
RelayActorMessage::SetHome { url } => {
self.note_preferred(&url).await;
Expand All @@ -361,12 +352,17 @@ impl RelayActor {
.await;
}

async fn send_relay(&mut self, url: &RelayUrl, contents: RelayContents, peer: PublicKey) {
trace!(%url, peer = %peer.fmt_short(),len = contents.iter().map(|c| c.len()).sum::<usize>(), "sending over relay");
async fn send_relay(&mut self, url: &RelayUrl, contents: RelayContents, remote_node: NodeId) {
trace!(
%url,
remote_node = %remote_node.fmt_short(),
len = contents.iter().map(|c| c.len()).sum::<usize>(),
"sending over relay",
);
// Relay Send
let relay_client = self.connect_relay(url, Some(&peer)).await;
let relay_client = self.connect_relay(url, Some(&remote_node)).await;
for content in &contents {
trace!(%url, ?peer, "sending {}B", content.len());
trace!(%url, ?remote_node, "sending {}B", content.len());
}
let total_bytes = contents.iter().map(|c| c.len() as u64).sum::<u64>();

Expand All @@ -377,7 +373,7 @@ impl RelayActor {
// But we have no guarantee that the total size of the contents including
// length prefix will be smaller than the payload size.
for packet in PacketizeIter::<_, PAYLAOD_SIZE>::new(contents) {
match relay_client.send(peer, packet).await {
match relay_client.send(remote_node, packet).await {
Ok(_) => {
inc_by!(MagicsockMetrics, send_relay, total_bytes);
}
Expand Down Expand Up @@ -414,9 +410,9 @@ impl RelayActor {
async fn connect_relay(
&mut self,
url: &RelayUrl,
peer: Option<&PublicKey>,
remote_node: Option<&NodeId>,
) -> relay::client::Client {
debug!("connect relay {} for peer {:?}", url, peer);
debug!(%url, ?remote_node, "connect relay");
// See if we have a connection open to that relay node ID first. If so, might as
// well use it. (It's a little arbitrary whether we use this one vs. the reverse route
// below when we have both.)
Expand All @@ -439,7 +435,7 @@ impl RelayActor {
// perhaps peer's home is Frankfurt, but they dialed our home relay
// node in SF to reach us, so we can reply to them using our
// SF connection rather than dialing Frankfurt.
if let Some(peer) = peer {
if let Some(node) = remote_node {
for url in self
.active_relay
.keys()
Expand All @@ -449,7 +445,7 @@ impl RelayActor {
{
let (os, or) = oneshot::channel();
if self
.send_to_active(&url, ActiveRelayMessage::GetPeerRoute(*peer, os))
.send_to_active(&url, ActiveRelayMessage::GetNodeRoute(*node, os))
.await
{
if let Ok(Some(client)) = or.await {
Expand All @@ -459,8 +455,8 @@ impl RelayActor {
}
}

let why = if let Some(peer) = peer {
format!("{peer:?}")
let why = if let Some(node) = remote_node {
format!("{node:?}")
} else {
"home-keep-alive".to_string()
};
Expand Down Expand Up @@ -678,7 +674,7 @@ impl RelayActor {
#[derive(derive_more::Debug)]
pub(super) struct RelayReadResult {
pub(super) url: RelayUrl,
pub(super) src: PublicKey,
pub(super) src: NodeId,
/// packet data
#[debug(skip)]
pub(super) buf: Bytes,
Expand Down
Loading