Skip to content

Commit

Permalink
Merge 686d609 into b1ea4b2
Browse files Browse the repository at this point in the history
  • Loading branch information
flub authored Oct 3, 2024
2 parents b1ea4b2 + 686d609 commit c0c2416
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 42 deletions.
4 changes: 2 additions & 2 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ impl MagicSock {
let msg = RelayActorMessage::Send {
url: url.clone(),
contents,
peer: node,
remote_node: node,
};
match self.relay_actor_sender.try_send(msg) {
Ok(_) => {
Expand Down Expand Up @@ -1193,7 +1193,7 @@ impl MagicSock {
let msg = RelayActorMessage::Send {
url: url.clone(),
contents,
peer: node,
remote_node: node,
};
match self.relay_actor_sender.try_send(msg) {
Ok(_) => {
Expand Down
76 changes: 36 additions & 40 deletions iroh-net/src/magicsock/relay_actor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{BTreeMap, HashSet},
collections::{BTreeMap, BTreeSet},
future::Future,
net::{IpAddr, SocketAddr},
sync::{atomic::Ordering, Arc},
Expand All @@ -19,7 +19,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, trace, warn, Instrument};

use crate::{
key::{PublicKey, PUBLIC_KEY_LENGTH},
key::{NodeId, PUBLIC_KEY_LENGTH},
relay::{self, client::conn::ReceivedMessage, client::ClientError, RelayUrl, MAX_PACKET_SIZE},
};

Expand All @@ -36,7 +36,7 @@ pub(super) enum RelayActorMessage {
Send {
url: RelayUrl,
contents: RelayContents,
peer: PublicKey,
remote_node: NodeId,
},
MaybeCloseRelaysOnRebind(Vec<IpAddr>),
SetHome {
Expand All @@ -51,21 +51,17 @@ struct ActiveRelay {
/// channel (currently even if there was no write).
last_write: Instant,
msg_sender: mpsc::Sender<ActorMessage>,
/// Contains optional alternate routes to use as an optimization instead of
/// contacting a peer via their home relay connection. If they sent us a message
/// on this relay connection (which should really only be on our relay
/// home connection, or what was once our home), then we remember that route here to optimistically
/// use instead of creating a new relay connection back to their home.
relay_routes: Vec<PublicKey>,
url: RelayUrl,
relay_client: relay::client::Client,
relay_client_receiver: relay::client::ClientReceiver,
/// The set of senders we know are present on this connection, based on
/// messages we've received from the server.
peer_present: HashSet<PublicKey>,
/// The set of remote nodes we know are present on this relay server.
///
/// If we receive messages from a remote node via this server it is added to this set,
/// if the server notifies us this node is gone it is removed from this set.
node_present: BTreeSet<NodeId>,
backoff: backoff::exponential::ExponentialBackoff<backoff::SystemClock>,
last_packet_time: Option<Instant>,
last_packet_src: Option<PublicKey>,
last_packet_src: Option<NodeId>,
}

#[derive(Debug)]
Expand All @@ -74,7 +70,7 @@ enum ActiveRelayMessage {
GetLastWrite(oneshot::Sender<Instant>),
Ping(oneshot::Sender<Result<Duration, ClientError>>),
GetLocalAddr(oneshot::Sender<Option<SocketAddr>>),
GetPeerRoute(PublicKey, oneshot::Sender<Option<relay::client::Client>>),
GetNodeRoute(NodeId, oneshot::Sender<Option<relay::client::Client>>),
GetClient(oneshot::Sender<relay::client::Client>),
NotePreferred(bool),
Shutdown,
Expand All @@ -90,9 +86,8 @@ impl ActiveRelay {
ActiveRelay {
last_write: Instant::now(),
msg_sender,
relay_routes: Default::default(),
url,
peer_present: HashSet::new(),
node_present: BTreeSet::new(),
backoff: backoff::exponential::ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_millis(10))
.with_max_interval(Duration::from_secs(5))
Expand Down Expand Up @@ -132,13 +127,13 @@ impl ActiveRelay {
ActiveRelayMessage::NotePreferred(is_preferred) => {
self.relay_client.note_preferred(is_preferred).await;
}
ActiveRelayMessage::GetPeerRoute(peer, r) => {
let res = if self.relay_routes.contains(&peer) {
ActiveRelayMessage::GetNodeRoute(peer, r) => {
let client = if self.node_present.contains(&peer) {
Some(self.relay_client.clone())
} else {
None
};
r.send(res).ok();
r.send(client).ok();
}
ActiveRelayMessage::Shutdown => {
self.relay_client.close().await.ok();
Expand Down Expand Up @@ -171,8 +166,7 @@ impl ActiveRelay {
warn!("recv error {:?}", err);

// Forget that all these peers have routes.
let peers: Vec<_> = self.peer_present.drain().collect();
self.relay_routes.retain(|peer| !peers.contains(peer));
self.node_present.clear();

if matches!(
err,
Expand Down Expand Up @@ -223,10 +217,7 @@ impl ActiveRelay {
{
// avoid map lookup w/ high throughput single peer
self.last_packet_src = Some(source);
if !self.peer_present.contains(&source) {
self.peer_present.insert(source);
self.relay_routes.push(source);
}
self.node_present.insert(source);
}

let res = RelayReadResult {
Expand All @@ -253,7 +244,7 @@ impl ActiveRelay {
}
relay::client::conn::ReceivedMessage::Health { .. } => ReadResult::Continue,
relay::client::conn::ReceivedMessage::PeerGone(key) => {
self.relay_routes.retain(|peer| peer != &key);
self.node_present.remove(&key);
ReadResult::Continue
}
other => {
Expand Down Expand Up @@ -337,9 +328,9 @@ impl RelayActor {
RelayActorMessage::Send {
url,
contents,
peer,
remote_node,
} => {
self.send_relay(&url, contents, peer).await;
self.send_relay(&url, contents, remote_node).await;
}
RelayActorMessage::SetHome { url } => {
self.note_preferred(&url).await;
Expand All @@ -361,12 +352,17 @@ impl RelayActor {
.await;
}

async fn send_relay(&mut self, url: &RelayUrl, contents: RelayContents, peer: PublicKey) {
trace!(%url, peer = %peer.fmt_short(),len = contents.iter().map(|c| c.len()).sum::<usize>(), "sending over relay");
async fn send_relay(&mut self, url: &RelayUrl, contents: RelayContents, remote_node: NodeId) {
trace!(
%url,
remote_node = %remote_node.fmt_short(),
len = contents.iter().map(|c| c.len()).sum::<usize>(),
"sending over relay",
);
// Relay Send
let relay_client = self.connect_relay(url, Some(&peer)).await;
let relay_client = self.connect_relay(url, Some(&remote_node)).await;
for content in &contents {
trace!(%url, ?peer, "sending {}B", content.len());
trace!(%url, ?remote_node, "sending {}B", content.len());
}
let total_bytes = contents.iter().map(|c| c.len() as u64).sum::<u64>();

Expand All @@ -377,7 +373,7 @@ impl RelayActor {
// But we have no guarantee that the total size of the contents including
// length prefix will be smaller than the payload size.
for packet in PacketizeIter::<_, PAYLAOD_SIZE>::new(contents) {
match relay_client.send(peer, packet).await {
match relay_client.send(remote_node, packet).await {
Ok(_) => {
inc_by!(MagicsockMetrics, send_relay, total_bytes);
}
Expand Down Expand Up @@ -414,9 +410,9 @@ impl RelayActor {
async fn connect_relay(
&mut self,
url: &RelayUrl,
peer: Option<&PublicKey>,
remote_node: Option<&NodeId>,
) -> relay::client::Client {
debug!("connect relay {} for peer {:?}", url, peer);
debug!("connect relay {} for peer {:?}", url, remote_node);
// See if we have a connection open to that relay node ID first. If so, might as
// well use it. (It's a little arbitrary whether we use this one vs. the reverse route
// below when we have both.)
Expand All @@ -439,7 +435,7 @@ impl RelayActor {
// perhaps peer's home is Frankfurt, but they dialed our home relay
// node in SF to reach us, so we can reply to them using our
// SF connection rather than dialing Frankfurt.
if let Some(peer) = peer {
if let Some(node) = remote_node {
for url in self
.active_relay
.keys()
Expand All @@ -449,7 +445,7 @@ impl RelayActor {
{
let (os, or) = oneshot::channel();
if self
.send_to_active(&url, ActiveRelayMessage::GetPeerRoute(*peer, os))
.send_to_active(&url, ActiveRelayMessage::GetNodeRoute(*node, os))
.await
{
if let Ok(Some(client)) = or.await {
Expand All @@ -459,8 +455,8 @@ impl RelayActor {
}
}

let why = if let Some(peer) = peer {
format!("{peer:?}")
let why = if let Some(node) = remote_node {
format!("{node:?}")
} else {
"home-keep-alive".to_string()
};
Expand Down Expand Up @@ -678,7 +674,7 @@ impl RelayActor {
#[derive(derive_more::Debug)]
pub(super) struct RelayReadResult {
pub(super) url: RelayUrl,
pub(super) src: PublicKey,
pub(super) src: NodeId,
/// packet data
#[debug(skip)]
pub(super) buf: Bytes,
Expand Down

0 comments on commit c0c2416

Please sign in to comment.