diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index c2a285556b..db42f29e5e 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -606,7 +606,7 @@ impl MagicSock { let msg = RelayActorMessage::Send { url: url.clone(), contents, - peer: node, + remote_node: node, }; match self.relay_actor_sender.try_send(msg) { Ok(_) => { @@ -1193,7 +1193,7 @@ impl MagicSock { let msg = RelayActorMessage::Send { url: url.clone(), contents, - peer: node, + remote_node: node, }; match self.relay_actor_sender.try_send(msg) { Ok(_) => { diff --git a/iroh-net/src/magicsock/relay_actor.rs b/iroh-net/src/magicsock/relay_actor.rs index 0a7b6c8fda..1385bd8830 100644 --- a/iroh-net/src/magicsock/relay_actor.rs +++ b/iroh-net/src/magicsock/relay_actor.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, HashSet}, + collections::{BTreeMap, BTreeSet}, future::Future, net::{IpAddr, SocketAddr}, sync::{atomic::Ordering, Arc}, @@ -19,7 +19,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, info, info_span, trace, warn, Instrument}; use crate::{ - key::{PublicKey, PUBLIC_KEY_LENGTH}, + key::{NodeId, PUBLIC_KEY_LENGTH}, relay::{self, client::conn::ReceivedMessage, client::ClientError, RelayUrl, MAX_PACKET_SIZE}, }; @@ -36,7 +36,7 @@ pub(super) enum RelayActorMessage { Send { url: RelayUrl, contents: RelayContents, - peer: PublicKey, + remote_node: NodeId, }, MaybeCloseRelaysOnRebind(Vec), SetHome { @@ -51,21 +51,17 @@ struct ActiveRelay { /// channel (currently even if there was no write). last_write: Instant, msg_sender: mpsc::Sender, - /// Contains optional alternate routes to use as an optimization instead of - /// contacting a peer via their home relay connection. If they sent us a message - /// on this relay connection (which should really only be on our relay - /// home connection, or what was once our home), then we remember that route here to optimistically - /// use instead of creating a new relay connection back to their home. - relay_routes: Vec, url: RelayUrl, relay_client: relay::client::Client, relay_client_receiver: relay::client::ClientReceiver, - /// The set of senders we know are present on this connection, based on - /// messages we've received from the server. - peer_present: HashSet, + /// The set of remote nodes we know are present on this relay server. + /// + /// If we receive messages from a remote node via, this server it is added to this set. + /// If the server notifies us this node is gone, it is removed from this set. + node_present: BTreeSet, backoff: backoff::exponential::ExponentialBackoff, last_packet_time: Option, - last_packet_src: Option, + last_packet_src: Option, } #[derive(Debug)] @@ -74,7 +70,7 @@ enum ActiveRelayMessage { GetLastWrite(oneshot::Sender), Ping(oneshot::Sender>), GetLocalAddr(oneshot::Sender>), - GetPeerRoute(PublicKey, oneshot::Sender>), + GetNodeRoute(NodeId, oneshot::Sender>), GetClient(oneshot::Sender), NotePreferred(bool), Shutdown, @@ -90,9 +86,8 @@ impl ActiveRelay { ActiveRelay { last_write: Instant::now(), msg_sender, - relay_routes: Default::default(), url, - peer_present: HashSet::new(), + node_present: BTreeSet::new(), backoff: backoff::exponential::ExponentialBackoffBuilder::new() .with_initial_interval(Duration::from_millis(10)) .with_max_interval(Duration::from_secs(5)) @@ -132,13 +127,13 @@ impl ActiveRelay { ActiveRelayMessage::NotePreferred(is_preferred) => { self.relay_client.note_preferred(is_preferred).await; } - ActiveRelayMessage::GetPeerRoute(peer, r) => { - let res = if self.relay_routes.contains(&peer) { + ActiveRelayMessage::GetNodeRoute(peer, r) => { + let client = if self.node_present.contains(&peer) { Some(self.relay_client.clone()) } else { None }; - r.send(res).ok(); + r.send(client).ok(); } ActiveRelayMessage::Shutdown => { self.relay_client.close().await.ok(); @@ -165,17 +160,13 @@ impl ActiveRelay { Ok(()) } - async fn handle_relay_msg( - &mut self, - msg: Result<(ReceivedMessage, usize), ClientError>, - ) -> ReadResult { + async fn handle_relay_msg(&mut self, msg: Result) -> ReadResult { match msg { Err(err) => { warn!("recv error {:?}", err); // Forget that all these peers have routes. - let peers: Vec<_> = self.peer_present.drain().collect(); - self.relay_routes.retain(|peer| !peers.contains(peer)); + self.node_present.clear(); if matches!( err, @@ -200,7 +191,7 @@ impl ActiveRelay { None => ReadResult::Break, } } - Ok((msg, _conn_gen)) => { + Ok(msg) => { // reset self.backoff.reset(); let now = Instant::now(); @@ -226,10 +217,7 @@ impl ActiveRelay { { // avoid map lookup w/ high throughput single peer self.last_packet_src = Some(source); - if !self.peer_present.contains(&source) { - self.peer_present.insert(source); - self.relay_routes.push(source); - } + self.node_present.insert(source); } let res = RelayReadResult { @@ -256,7 +244,7 @@ impl ActiveRelay { } relay::client::conn::ReceivedMessage::Health { .. } => ReadResult::Continue, relay::client::conn::ReceivedMessage::PeerGone(key) => { - self.relay_routes.retain(|peer| peer != &key); + self.node_present.remove(&key); ReadResult::Continue } other => { @@ -340,9 +328,9 @@ impl RelayActor { RelayActorMessage::Send { url, contents, - peer, + remote_node, } => { - self.send_relay(&url, contents, peer).await; + self.send_relay(&url, contents, remote_node).await; } RelayActorMessage::SetHome { url } => { self.note_preferred(&url).await; @@ -364,12 +352,17 @@ impl RelayActor { .await; } - async fn send_relay(&mut self, url: &RelayUrl, contents: RelayContents, peer: PublicKey) { - trace!(%url, peer = %peer.fmt_short(),len = contents.iter().map(|c| c.len()).sum::(), "sending over relay"); + async fn send_relay(&mut self, url: &RelayUrl, contents: RelayContents, remote_node: NodeId) { + trace!( + %url, + remote_node = %remote_node.fmt_short(), + len = contents.iter().map(|c| c.len()).sum::(), + "sending over relay", + ); // Relay Send - let relay_client = self.connect_relay(url, Some(&peer)).await; + let relay_client = self.connect_relay(url, Some(&remote_node)).await; for content in &contents { - trace!(%url, ?peer, "sending {}B", content.len()); + trace!(%url, ?remote_node, "sending {}B", content.len()); } let total_bytes = contents.iter().map(|c| c.len() as u64).sum::(); @@ -380,7 +373,7 @@ impl RelayActor { // But we have no guarantee that the total size of the contents including // length prefix will be smaller than the payload size. for packet in PacketizeIter::<_, PAYLAOD_SIZE>::new(contents) { - match relay_client.send(peer, packet).await { + match relay_client.send(remote_node, packet).await { Ok(_) => { inc_by!(MagicsockMetrics, send_relay, total_bytes); } @@ -417,9 +410,9 @@ impl RelayActor { async fn connect_relay( &mut self, url: &RelayUrl, - peer: Option<&PublicKey>, + remote_node: Option<&NodeId>, ) -> relay::client::Client { - debug!("connect relay {} for peer {:?}", url, peer); + debug!(%url, ?remote_node, "connect relay"); // See if we have a connection open to that relay node ID first. If so, might as // well use it. (It's a little arbitrary whether we use this one vs. the reverse route // below when we have both.) @@ -442,7 +435,7 @@ impl RelayActor { // perhaps peer's home is Frankfurt, but they dialed our home relay // node in SF to reach us, so we can reply to them using our // SF connection rather than dialing Frankfurt. - if let Some(peer) = peer { + if let Some(node) = remote_node { for url in self .active_relay .keys() @@ -452,7 +445,7 @@ impl RelayActor { { let (os, or) = oneshot::channel(); if self - .send_to_active(&url, ActiveRelayMessage::GetPeerRoute(*peer, os)) + .send_to_active(&url, ActiveRelayMessage::GetNodeRoute(*node, os)) .await { if let Ok(Some(client)) = or.await { @@ -462,8 +455,8 @@ impl RelayActor { } } - let why = if let Some(peer) = peer { - format!("{peer:?}") + let why = if let Some(node) = remote_node { + format!("{node:?}") } else { "home-keep-alive".to_string() }; @@ -681,7 +674,7 @@ impl RelayActor { #[derive(derive_more::Debug)] pub(super) struct RelayReadResult { pub(super) url: RelayUrl, - pub(super) src: PublicKey, + pub(super) src: NodeId, /// packet data #[debug(skip)] pub(super) buf: Bytes, diff --git a/iroh-net/src/relay/client.rs b/iroh-net/src/relay/client.rs index 3b276a8329..1dac564786 100644 --- a/iroh-net/src/relay/client.rs +++ b/iroh-net/src/relay/client.rs @@ -27,15 +27,12 @@ use tokio_util::task::AbortOnDropHandle; use tracing::{debug, error, event, info_span, trace, warn, Instrument, Level}; use url::Url; -use conn::{ - Conn as RelayClient, ConnBuilder as RelayClientBuilder, ConnReader, - ConnReceiver as RelayClientReceiver, ConnWriter, ReceivedMessage, -}; +use conn::{Conn, ConnBuilder, ConnReader, ConnReceiver, ConnWriter, ReceivedMessage}; use streams::{downcast_upgrade, MaybeTlsStream, ProxyStream}; use crate::defaults::timeouts::relay::*; use crate::dns::{DnsResolver, ResolverExt}; -use crate::key::{PublicKey, SecretKey}; +use crate::key::{NodeId, PublicKey, SecretKey}; use crate::relay::codec::DerpCodec; use crate::relay::http::{Protocol, RELAY_PATH}; use crate::relay::RelayUrl; @@ -140,7 +137,7 @@ pub struct Client { #[derive(Debug)] enum ActorMessage { - Connect(oneshot::Sender>), + Connect(oneshot::Sender>), NotePreferred(bool), LocalAddr(oneshot::Sender, ClientError>>), Ping(oneshot::Sender>), @@ -154,7 +151,7 @@ enum ActorMessage { /// Receiving end of a [`Client`]. #[derive(Debug)] pub struct ClientReceiver { - msg_receiver: mpsc::Receiver>, + msg_receiver: mpsc::Receiver>, } #[derive(derive_more::Debug)] @@ -162,11 +159,10 @@ struct Actor { secret_key: SecretKey, can_ack_pings: bool, is_preferred: bool, - relay_client: Option<(RelayClient, RelayClientReceiver)>, + relay_conn: Option<(Conn, ConnReceiver)>, is_closed: bool, #[debug("address family selector callback")] address_family_selector: Option BoxFuture + Send + Sync + 'static>>, - conn_gen: usize, url: RelayUrl, protocol: Protocol, #[debug("TlsConnector")] @@ -334,10 +330,9 @@ impl ClientBuilder { secret_key: key, can_ack_pings: self.can_ack_pings, is_preferred: self.is_preferred, - relay_client: None, + relay_conn: None, is_closed: false, address_family_selector: self.address_family_selector, - conn_gen: 0, pings: PingTracker::default(), ping_tasks: Default::default(), url: self.url, @@ -371,9 +366,8 @@ impl ClientBuilder { } impl ClientReceiver { - /// Reads a message from the server. Returns the message and the `conn_get`, or the number of - /// re-connections this Client has ever made - pub async fn recv(&mut self) -> Option> { + /// Reads a message from the server. + pub async fn recv(&mut self) -> Option> { self.msg_receiver.recv().await } } @@ -399,13 +393,13 @@ impl Client { } } - /// Connect to a relay Server and returns the underlying relay Client. + /// Connects to a relay Server and returns the underlying relay connection. /// /// Returns [`ClientError::Closed`] if the [`Client`] is closed. /// /// If there is already an active relay connection, returns the already /// connected [`crate::relay::RelayConn`]. - pub async fn connect(&self) -> Result<(RelayClient, usize), ClientError> { + pub async fn connect(&self) -> Result { self.send_actor(ActorMessage::Connect).await } @@ -475,7 +469,7 @@ impl Actor { async fn run( mut self, mut inbox: mpsc::Receiver, - msg_sender: mpsc::Sender>, + msg_sender: mpsc::Sender>, ) { // Add an initial connection attempt. if let Err(err) = self.connect("initial connect").await { @@ -485,7 +479,7 @@ impl Actor { loop { tokio::select! { res = self.recv_detail() => { - if let Ok((ReceivedMessage::Pong(ping), _)) = res { + if let Ok(ReceivedMessage::Pong(ping)) = res { match self.pings.unregister(ping, "pong") { Some(chan) => { if chan.send(()).is_err() { @@ -503,7 +497,7 @@ impl Actor { Some(msg) = inbox.recv() => { match msg { ActorMessage::Connect(s) => { - let res = self.connect("actor msg").await.map(|(client, _, count)| (client, count)); + let res = self.connect("actor msg").await.map(|(client, _)| (client)); s.send(res).ok(); }, ActorMessage::NotePreferred(is_preferred) => { @@ -549,46 +543,51 @@ impl Actor { } } + /// Returns a connection to the relay. + /// + /// If the client is currently connected, the existing connection is returned; otherwise, + /// a new connection is made. + /// + /// Returns: + /// - A clonable connection object which can send DISCO messages to the relay. + /// - A reference to a channel receiving DISCO messages from the relay. async fn connect( &mut self, why: &'static str, - ) -> Result<(RelayClient, &'_ mut RelayClientReceiver, usize), ClientError> { + ) -> Result<(Conn, &'_ mut ConnReceiver), ClientError> { debug!( "connect: {}, current client {}", why, - self.relay_client.is_some() + self.relay_conn.is_some() ); if self.is_closed { return Err(ClientError::Closed); } async move { - if self.relay_client.is_none() { + if self.relay_conn.is_none() { trace!("no connection, trying to connect"); - let (relay_client, receiver) = - tokio::time::timeout(CONNECT_TIMEOUT, self.connect_0()) - .await - .map_err(|_| ClientError::ConnectTimeout)??; + let (conn, receiver) = tokio::time::timeout(CONNECT_TIMEOUT, self.connect_0()) + .await + .map_err(|_| ClientError::ConnectTimeout)??; - self.relay_client = Some((relay_client.clone(), receiver)); - self.next_conn(); + self.relay_conn = Some((conn, receiver)); } else { trace!("already had connection"); } - let count = self.current_conn(); - let (relay_client, receiver) = self - .relay_client + let (conn, receiver) = self + .relay_conn .as_mut() .map(|(c, r)| (c.clone(), r)) .expect("just checked"); - Ok((relay_client, receiver, count)) + Ok((conn, receiver)) } .instrument(info_span!("connect")) .await } - async fn connect_0(&self) -> Result<(RelayClient, RelayClientReceiver), ClientError> { + async fn connect_0(&self) -> Result<(Conn, ConnReceiver), ClientError> { let (reader, writer, local_addr) = match self.protocol { Protocol::Websocket => { let (reader, writer) = self.connect_ws().await?; @@ -601,14 +600,14 @@ impl Actor { } }; - let (relay_client, receiver) = - RelayClientBuilder::new(self.secret_key.clone(), local_addr, reader, writer) + let (conn, receiver) = + ConnBuilder::new(self.secret_key.clone(), local_addr, reader, writer) .build() .await .map_err(|e| ClientError::Build(e.to_string()))?; - if self.is_preferred && relay_client.note_preferred(true).await.is_err() { - relay_client.close().await; + if self.is_preferred && conn.note_preferred(true).await.is_err() { + conn.close().await; return Err(ClientError::Send); } @@ -620,7 +619,7 @@ impl Actor { ); trace!("connect_0 done"); - Ok((relay_client, receiver)) + Ok((conn, receiver)) } async fn connect_ws(&self) -> Result<(ConnReader, ConnWriter), ClientError> { @@ -732,8 +731,8 @@ impl Actor { // only send the preference if we already have a connection let res = { - if let Some((ref client, _)) = self.relay_client { - client.note_preferred(is_preferred).await + if let Some((ref conn, _)) = self.relay_conn { + conn.note_preferred(is_preferred).await } else { return; } @@ -749,23 +748,23 @@ impl Actor { if self.is_closed { return None; } - if let Some((ref client, _)) = self.relay_client { - client.local_addr() + if let Some((ref conn, _)) = self.relay_conn { + conn.local_addr() } else { None } } async fn ping(&mut self, s: oneshot::Sender>) { - let connect_res = self.connect("ping").await.map(|(c, _, _)| c); + let connect_res = self.connect("ping").await.map(|(c, _)| c); let (ping, recv) = self.pings.register(); trace!("ping: {}", hex::encode(ping)); self.ping_tasks.spawn(async move { let res = match connect_res { - Ok(client) => { + Ok(conn) => { let start = Instant::now(); - if let Err(err) = client.send_ping(ping).await { + if let Err(err) = conn.send_ping(ping).await { warn!("failed to send ping: {:?}", err); Err(ClientError::Send) } else { @@ -782,10 +781,10 @@ impl Actor { }); } - async fn send(&mut self, dst_key: PublicKey, b: Bytes) -> Result<(), ClientError> { - trace!(dst = %dst_key.fmt_short(), len = b.len(), "send"); - let (client, _, _) = self.connect("send").await?; - if client.send(dst_key, b).await.is_err() { + async fn send(&mut self, remote_node: NodeId, payload: Bytes) -> Result<(), ClientError> { + trace!(remote_node = %remote_node.fmt_short(), len = payload.len(), "send"); + let (conn, _) = self.connect("send").await?; + if conn.send(remote_node, payload).await.is_err() { self.close_for_reconnect().await; return Err(ClientError::Send); } @@ -795,8 +794,8 @@ impl Actor { async fn send_pong(&mut self, data: [u8; 8]) -> Result<(), ClientError> { debug!("send_pong"); if self.can_ack_pings { - let (client, _, _) = self.connect("send_pong").await?; - if client.send_pong(data).await.is_err() { + let (conn, _) = self.connect("send_pong").await?; + if conn.send_pong(data).await.is_err() { self.close_for_reconnect().await; return Err(ClientError::Send); } @@ -817,16 +816,7 @@ impl Actor { if self.is_closed { return false; } - self.relay_client.is_some() - } - - fn current_conn(&self) -> usize { - self.conn_gen - } - - fn next_conn(&mut self) -> usize { - self.conn_gen = self.conn_gen.wrapping_add(1); - self.conn_gen + self.relay_conn.is_some() } fn tls_servername(&self) -> Option { @@ -987,13 +977,12 @@ impl Actor { } } - async fn recv_detail(&mut self) -> Result<(ReceivedMessage, usize), ClientError> { - if let Some((_client, client_receiver)) = self.relay_client.as_mut() { + async fn recv_detail(&mut self) -> Result { + if let Some((_conn, conn_receiver)) = self.relay_conn.as_mut() { trace!("recv_detail tick"); - match client_receiver.recv().await { + match conn_receiver.recv().await { Ok(msg) => { - let current_gen = self.current_conn(); - return Ok((msg, current_gen)); + return Ok(msg); } Err(e) => { self.close_for_reconnect().await; @@ -1012,8 +1001,8 @@ impl Actor { /// requires a connection, it will call `connect`. async fn close_for_reconnect(&mut self) { debug!("close for reconnect"); - if let Some((client, _)) = self.relay_client.take() { - client.close().await + if let Some((conn, _)) = self.relay_conn.take() { + conn.close().await } } } diff --git a/iroh-net/src/relay/client/conn.rs b/iroh-net/src/relay/client/conn.rs index 2df0c219cd..e0667acb3d 100644 --- a/iroh-net/src/relay/client/conn.rs +++ b/iroh-net/src/relay/client/conn.rs @@ -46,6 +46,11 @@ pub struct Conn { inner: Arc, } +/// The channel on which a relay connection sends received messages. +/// +/// The [`Conn`] to a relay is easily clonable but can only send DISCO messages to a relay +/// server. This is the counterpart which receives DISCO messages from the relay server for +/// a connection. It is not clonable. #[derive(Debug)] pub struct ConnReceiver { /// The reader channel, receiving incoming messages. @@ -376,7 +381,7 @@ impl ConnBuilder { recv_msgs: writer_recv, } .run() - .instrument(info_span!("client.writer")), + .instrument(info_span!("conn.writer")), ); let (reader_sender, reader_recv) = mpsc::channel(PER_CLIENT_READ_QUEUE_DEPTH); @@ -412,6 +417,7 @@ impl ConnBuilder { } } } + .instrument(info_span!("conn.reader")) }); let conn = Conn { diff --git a/iroh-net/src/relay/server.rs b/iroh-net/src/relay/server.rs index 9bc54192dc..27ffdc07ce 100644 --- a/iroh-net/src/relay/server.rs +++ b/iroh-net/src/relay/server.rs @@ -863,7 +863,7 @@ mod tests { let msg = Bytes::from("hello, b"); client_a.send(b_key, msg.clone()).await.unwrap(); - let (res, _) = client_b_receiver.recv().await.unwrap().unwrap(); + let res = client_b_receiver.recv().await.unwrap().unwrap(); if let ReceivedMessage::ReceivedPacket { source, data } = res { assert_eq!(a_key, source); assert_eq!(msg, data); @@ -875,7 +875,7 @@ mod tests { let msg = Bytes::from("howdy, a"); client_b.send(a_key, msg.clone()).await.unwrap(); - let (res, _) = client_a_receiver.recv().await.unwrap().unwrap(); + let res = client_a_receiver.recv().await.unwrap().unwrap(); if let ReceivedMessage::ReceivedPacket { source, data } = res { assert_eq!(b_key, source); assert_eq!(msg, data); @@ -931,7 +931,7 @@ mod tests { let msg = Bytes::from("hello, b"); client_a.send(b_key, msg.clone()).await.unwrap(); - let (res, _) = client_b_receiver.recv().await.unwrap().unwrap(); + let res = client_b_receiver.recv().await.unwrap().unwrap(); if let ReceivedMessage::ReceivedPacket { source, data } = res { assert_eq!(a_key, source); assert_eq!(msg, data); @@ -943,7 +943,7 @@ mod tests { let msg = Bytes::from("howdy, a"); client_b.send(a_key, msg.clone()).await.unwrap(); - let (res, _) = client_a_receiver.recv().await.unwrap().unwrap(); + let res = client_a_receiver.recv().await.unwrap().unwrap(); if let ReceivedMessage::ReceivedPacket { source, data } = res { assert_eq!(b_key, source); assert_eq!(msg, data); @@ -998,7 +998,7 @@ mod tests { let msg = Bytes::from("hello, b"); client_a.send(b_key, msg.clone()).await.unwrap(); - let (res, _) = client_b_receiver.recv().await.unwrap().unwrap(); + let res = client_b_receiver.recv().await.unwrap().unwrap(); if let ReceivedMessage::ReceivedPacket { source, data } = res { assert_eq!(a_key, source); assert_eq!(msg, data); @@ -1010,7 +1010,7 @@ mod tests { let msg = Bytes::from("howdy, a"); client_b.send(a_key, msg.clone()).await.unwrap(); - let (res, _) = client_a_receiver.recv().await.unwrap().unwrap(); + let res = client_a_receiver.recv().await.unwrap().unwrap(); if let ReceivedMessage::ReceivedPacket { source, data } = res { assert_eq!(b_key, source); assert_eq!(msg, data); diff --git a/iroh-net/src/relay/server/http_server.rs b/iroh-net/src/relay/server/http_server.rs index 91787e31c2..a52aef6870 100644 --- a/iroh-net/src/relay/server/http_server.rs +++ b/iroh-net/src/relay/server/http_server.rs @@ -821,7 +821,7 @@ mod tests { info!("client {:?} `recv` error {e}", key.public()); return; } - Some(Ok((msg, _))) => { + Some(Ok(msg)) => { info!("got message on {:?}: {msg:?}", key.public()); if let ReceivedMessage::ReceivedPacket { source, data } = msg { received_msg_s