Skip to content

Commit

Permalink
refactor(iroh-net): Keep connection name, remove connection count
Browse files Browse the repository at this point in the history
These are two cleanups in the relay client:

- The `relay::Client` hands out a connection object when asked to
  connect.  This `Conn` was imported with rename to `RelayClient`
  which was a bit confusing as this was already the relay client.  It
  is now renamed to `RelayConn` which makes a lot more sense.  The
  related builder struct etc are renamed to match.

- The `relay::Client` had a counter for the number of connections made
  to the relay.  That seems fun, but was entirely unused.  If this is
  a useful thing to have it should probably be a counter metric
  instead but let's not add anything that no one is using.  Removing
  this makes a lot of APIs a bit simpler and removes some state
  tracking.
  • Loading branch information
flub committed Oct 3, 2024
1 parent bd5e4fa commit 544b933
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 58 deletions.
7 changes: 2 additions & 5 deletions iroh-net/src/magicsock/relay_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,7 @@ impl ActiveRelay {
Ok(())
}

async fn handle_relay_msg(
&mut self,
msg: Result<(ReceivedMessage, usize), ClientError>,
) -> ReadResult {
async fn handle_relay_msg(&mut self, msg: Result<ReceivedMessage, ClientError>) -> ReadResult {
match msg {
Err(err) => {
warn!("recv error {:?}", err);
Expand Down Expand Up @@ -200,7 +197,7 @@ impl ActiveRelay {
None => ReadResult::Break,
}
}
Ok((msg, _conn_gen)) => {
Ok(msg) => {
// reset
self.backoff.reset();
let now = Instant::now();
Expand Down
85 changes: 39 additions & 46 deletions iroh-net/src/relay/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use tracing::{debug, error, event, info_span, trace, warn, Instrument, Level};
use url::Url;

use conn::{
Conn as RelayClient, ConnBuilder as RelayClientBuilder, ConnReader,
ConnReceiver as RelayClientReceiver, ConnWriter, ReceivedMessage,
Conn as RelayConn, ConnBuilder as RelayConnBuilder, ConnReader,
ConnReceiver as RelayConnReceiver, ConnWriter, ReceivedMessage,
};
use streams::{downcast_upgrade, MaybeTlsStream, ProxyStream};

Expand Down Expand Up @@ -140,7 +140,7 @@ pub struct Client {

#[derive(Debug)]
enum ActorMessage {
Connect(oneshot::Sender<Result<(RelayClient, usize), ClientError>>),
Connect(oneshot::Sender<Result<RelayConn, ClientError>>),
NotePreferred(bool),
LocalAddr(oneshot::Sender<Result<Option<SocketAddr>, ClientError>>),
Ping(oneshot::Sender<Result<Duration, ClientError>>),
Expand All @@ -154,19 +154,18 @@ enum ActorMessage {
/// Receiving end of a [`Client`].
#[derive(Debug)]
pub struct ClientReceiver {
msg_receiver: mpsc::Receiver<Result<(ReceivedMessage, usize), ClientError>>,
msg_receiver: mpsc::Receiver<Result<ReceivedMessage, ClientError>>,
}

#[derive(derive_more::Debug)]
struct Actor {
secret_key: SecretKey,
can_ack_pings: bool,
is_preferred: bool,
relay_client: Option<(RelayClient, RelayClientReceiver)>,
relay_conn: Option<(RelayConn, RelayConnReceiver)>,
is_closed: bool,
#[debug("address family selector callback")]
address_family_selector: Option<Box<dyn Fn() -> BoxFuture<bool> + Send + Sync + 'static>>,
conn_gen: usize,
url: RelayUrl,
protocol: Protocol,
#[debug("TlsConnector")]
Expand Down Expand Up @@ -334,10 +333,9 @@ impl ClientBuilder {
secret_key: key,
can_ack_pings: self.can_ack_pings,
is_preferred: self.is_preferred,
relay_client: None,
relay_conn: None,
is_closed: false,
address_family_selector: self.address_family_selector,
conn_gen: 0,
pings: PingTracker::default(),
ping_tasks: Default::default(),
url: self.url,
Expand Down Expand Up @@ -371,9 +369,8 @@ impl ClientBuilder {
}

impl ClientReceiver {
/// Reads a message from the server. Returns the message and the `conn_get`, or the number of
/// re-connections this Client has ever made
pub async fn recv(&mut self) -> Option<Result<(ReceivedMessage, usize), ClientError>> {
/// Reads a message from the server.
pub async fn recv(&mut self) -> Option<Result<ReceivedMessage, ClientError>> {
self.msg_receiver.recv().await
}
}
Expand Down Expand Up @@ -405,7 +402,7 @@ impl Client {
///
/// If there is already an active relay connection, returns the already
/// connected [`crate::relay::RelayConn`].
pub async fn connect(&self) -> Result<(RelayClient, usize), ClientError> {
pub async fn connect(&self) -> Result<RelayConn, ClientError> {
self.send_actor(ActorMessage::Connect).await
}

Expand Down Expand Up @@ -475,7 +472,7 @@ impl Actor {
async fn run(
mut self,
mut inbox: mpsc::Receiver<ActorMessage>,
msg_sender: mpsc::Sender<Result<(ReceivedMessage, usize), ClientError>>,
msg_sender: mpsc::Sender<Result<ReceivedMessage, ClientError>>,
) {
// Add an initial connection attempt.
if let Err(err) = self.connect("initial connect").await {
Expand All @@ -485,7 +482,7 @@ impl Actor {
loop {
tokio::select! {
res = self.recv_detail() => {
if let Ok((ReceivedMessage::Pong(ping), _)) = res {
if let Ok(ReceivedMessage::Pong(ping)) = res {
match self.pings.unregister(ping, "pong") {
Some(chan) => {
if chan.send(()).is_err() {
Expand All @@ -503,7 +500,7 @@ impl Actor {
Some(msg) = inbox.recv() => {
match msg {
ActorMessage::Connect(s) => {
let res = self.connect("actor msg").await.map(|(client, _, count)| (client, count));
let res = self.connect("actor msg").await.map(|(client, _)| (client));
s.send(res).ok();
},
ActorMessage::NotePreferred(is_preferred) => {
Expand Down Expand Up @@ -549,46 +546,52 @@ impl Actor {
}
}

/// Returns a connection to the relay.
///
/// If the client is currently connected the existing connection is returned, otherwise
/// a new connection is made.
///
/// Returns:
/// - A clonable connection object which can send to DISCO messages to the relay.
/// - A reference to a channel receiving DISCO messages from the relay.
async fn connect(
&mut self,
why: &'static str,
) -> Result<(RelayClient, &'_ mut RelayClientReceiver, usize), ClientError> {
) -> Result<(RelayConn, &'_ mut RelayConnReceiver), ClientError> {
debug!(
"connect: {}, current client {}",
why,
self.relay_client.is_some()
self.relay_conn.is_some()
);

if self.is_closed {
return Err(ClientError::Closed);
}
async move {
if self.relay_client.is_none() {
if self.relay_conn.is_none() {
trace!("no connection, trying to connect");
let (relay_client, receiver) =
tokio::time::timeout(CONNECT_TIMEOUT, self.connect_0())
.await
.map_err(|_| ClientError::ConnectTimeout)??;

self.relay_client = Some((relay_client.clone(), receiver));
self.next_conn();
self.relay_conn = Some((relay_client.clone(), receiver));
} else {
trace!("already had connection");
}
let count = self.current_conn();
let (relay_client, receiver) = self
.relay_client
let (conn, receiver) = self
.relay_conn
.as_mut()
.map(|(c, r)| (c.clone(), r))
.expect("just checked");

Ok((relay_client, receiver, count))
Ok((conn, receiver))
}
.instrument(info_span!("connect"))
.await
}

async fn connect_0(&self) -> Result<(RelayClient, RelayClientReceiver), ClientError> {
async fn connect_0(&self) -> Result<(RelayConn, RelayConnReceiver), ClientError> {
let (reader, writer, local_addr) = match self.protocol {
Protocol::Websocket => {
let (reader, writer) = self.connect_ws().await?;
Expand All @@ -602,7 +605,7 @@ impl Actor {
};

let (relay_client, receiver) =
RelayClientBuilder::new(self.secret_key.clone(), local_addr, reader, writer)
RelayConnBuilder::new(self.secret_key.clone(), local_addr, reader, writer)
.build()
.await
.map_err(|e| ClientError::Build(e.to_string()))?;
Expand Down Expand Up @@ -732,7 +735,7 @@ impl Actor {

// only send the preference if we already have a connection
let res = {
if let Some((ref client, _)) = self.relay_client {
if let Some((ref client, _)) = self.relay_conn {
client.note_preferred(is_preferred).await
} else {
return;
Expand All @@ -749,15 +752,15 @@ impl Actor {
if self.is_closed {
return None;
}
if let Some((ref client, _)) = self.relay_client {
if let Some((ref client, _)) = self.relay_conn {
client.local_addr()
} else {
None
}
}

async fn ping(&mut self, s: oneshot::Sender<Result<Duration, ClientError>>) {
let connect_res = self.connect("ping").await.map(|(c, _, _)| c);
let connect_res = self.connect("ping").await.map(|(c, _)| c);
let (ping, recv) = self.pings.register();
trace!("ping: {}", hex::encode(ping));

Expand All @@ -784,7 +787,7 @@ impl Actor {

async fn send(&mut self, dst_key: PublicKey, b: Bytes) -> Result<(), ClientError> {
trace!(dst = %dst_key.fmt_short(), len = b.len(), "send");
let (client, _, _) = self.connect("send").await?;
let (client, _) = self.connect("send").await?;
if client.send(dst_key, b).await.is_err() {
self.close_for_reconnect().await;
return Err(ClientError::Send);
Expand All @@ -795,7 +798,7 @@ impl Actor {
async fn send_pong(&mut self, data: [u8; 8]) -> Result<(), ClientError> {
debug!("send_pong");
if self.can_ack_pings {
let (client, _, _) = self.connect("send_pong").await?;
let (client, _) = self.connect("send_pong").await?;
if client.send_pong(data).await.is_err() {
self.close_for_reconnect().await;
return Err(ClientError::Send);
Expand All @@ -817,16 +820,7 @@ impl Actor {
if self.is_closed {
return false;
}
self.relay_client.is_some()
}

fn current_conn(&self) -> usize {
self.conn_gen
}

fn next_conn(&mut self) -> usize {
self.conn_gen = self.conn_gen.wrapping_add(1);
self.conn_gen
self.relay_conn.is_some()
}

fn tls_servername(&self) -> Option<rustls::pki_types::ServerName> {
Expand Down Expand Up @@ -987,13 +981,12 @@ impl Actor {
}
}

async fn recv_detail(&mut self) -> Result<(ReceivedMessage, usize), ClientError> {
if let Some((_client, client_receiver)) = self.relay_client.as_mut() {
async fn recv_detail(&mut self) -> Result<ReceivedMessage, ClientError> {
if let Some((_client, client_receiver)) = self.relay_conn.as_mut() {
trace!("recv_detail tick");
match client_receiver.recv().await {
Ok(msg) => {
let current_gen = self.current_conn();
return Ok((msg, current_gen));
return Ok(msg);
}
Err(e) => {
self.close_for_reconnect().await;
Expand All @@ -1012,7 +1005,7 @@ impl Actor {
/// requires a connection, it will call `connect`.
async fn close_for_reconnect(&mut self) {
debug!("close for reconnect");
if let Some((client, _)) = self.relay_client.take() {
if let Some((client, _)) = self.relay_conn.take() {
client.close().await
}
}
Expand Down
12 changes: 6 additions & 6 deletions iroh-net/src/relay/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ mod tests {
let msg = Bytes::from("hello, b");
client_a.send(b_key, msg.clone()).await.unwrap();

let (res, _) = client_b_receiver.recv().await.unwrap().unwrap();
let res = client_b_receiver.recv().await.unwrap().unwrap();
if let ReceivedMessage::ReceivedPacket { source, data } = res {
assert_eq!(a_key, source);
assert_eq!(msg, data);
Expand All @@ -875,7 +875,7 @@ mod tests {
let msg = Bytes::from("howdy, a");
client_b.send(a_key, msg.clone()).await.unwrap();

let (res, _) = client_a_receiver.recv().await.unwrap().unwrap();
let res = client_a_receiver.recv().await.unwrap().unwrap();
if let ReceivedMessage::ReceivedPacket { source, data } = res {
assert_eq!(b_key, source);
assert_eq!(msg, data);
Expand Down Expand Up @@ -931,7 +931,7 @@ mod tests {
let msg = Bytes::from("hello, b");
client_a.send(b_key, msg.clone()).await.unwrap();

let (res, _) = client_b_receiver.recv().await.unwrap().unwrap();
let res = client_b_receiver.recv().await.unwrap().unwrap();
if let ReceivedMessage::ReceivedPacket { source, data } = res {
assert_eq!(a_key, source);
assert_eq!(msg, data);
Expand All @@ -943,7 +943,7 @@ mod tests {
let msg = Bytes::from("howdy, a");
client_b.send(a_key, msg.clone()).await.unwrap();

let (res, _) = client_a_receiver.recv().await.unwrap().unwrap();
let res = client_a_receiver.recv().await.unwrap().unwrap();
if let ReceivedMessage::ReceivedPacket { source, data } = res {
assert_eq!(b_key, source);
assert_eq!(msg, data);
Expand Down Expand Up @@ -998,7 +998,7 @@ mod tests {
let msg = Bytes::from("hello, b");
client_a.send(b_key, msg.clone()).await.unwrap();

let (res, _) = client_b_receiver.recv().await.unwrap().unwrap();
let res = client_b_receiver.recv().await.unwrap().unwrap();
if let ReceivedMessage::ReceivedPacket { source, data } = res {
assert_eq!(a_key, source);
assert_eq!(msg, data);
Expand All @@ -1010,7 +1010,7 @@ mod tests {
let msg = Bytes::from("howdy, a");
client_b.send(a_key, msg.clone()).await.unwrap();

let (res, _) = client_a_receiver.recv().await.unwrap().unwrap();
let res = client_a_receiver.recv().await.unwrap().unwrap();
if let ReceivedMessage::ReceivedPacket { source, data } = res {
assert_eq!(b_key, source);
assert_eq!(msg, data);
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/src/relay/server/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ mod tests {
info!("client {:?} `recv` error {e}", key.public());
return;
}
Some(Ok((msg, _))) => {
Some(Ok(msg)) => {
info!("got message on {:?}: {msg:?}", key.public());
if let ReceivedMessage::ReceivedPacket { source, data } = msg {
received_msg_s
Expand Down

0 comments on commit 544b933

Please sign in to comment.