Skip to content

Commit

Permalink
refactor(iroh-net): Keep connection name, remove connection count (#2779
Browse files Browse the repository at this point in the history
)

## Description

These are two cleanups in the relay client:

- The `relay::Client` hands out a connection object when asked to
  connect.  This `Conn` was imported with rename to `RelayClient`
  which was a bit confusing as this was already the relay client.  It
  is now left at it's original name which makes a lot more sense.  The
  related builder struct etc are renamed to match.

- The `relay::Client` had a counter for the number of connections made
  to the relay.  That seems fun, but was entirely unused.  If this is
  a useful thing to have it should probably be a counter metric
  instead but let's not add anything that no one is using.  Removing
  this makes a lot of APIs a bit simpler and removes some state
  tracking.

## Breaking Changes

None hopefully, please let this all  be internal APIs.

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [x] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- [x] Tests if relevant.
- ~~[ ] All breaking changes documented.~~

---------

Co-authored-by: Divma <26765164+divagant-martian@users.noreply.github.com>
  • Loading branch information
flub and divagant-martian authored Oct 3, 2024
1 parent bd5e4fa commit 6b1186f
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 81 deletions.
7 changes: 2 additions & 5 deletions iroh-net/src/magicsock/relay_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,7 @@ impl ActiveRelay {
Ok(())
}

async fn handle_relay_msg(
&mut self,
msg: Result<(ReceivedMessage, usize), ClientError>,
) -> ReadResult {
async fn handle_relay_msg(&mut self, msg: Result<ReceivedMessage, ClientError>) -> ReadResult {
match msg {
Err(err) => {
warn!("recv error {:?}", err);
Expand Down Expand Up @@ -200,7 +197,7 @@ impl ActiveRelay {
None => ReadResult::Break,
}
}
Ok((msg, _conn_gen)) => {
Ok(msg) => {
// reset
self.backoff.reset();
let now = Instant::now();
Expand Down
125 changes: 57 additions & 68 deletions iroh-net/src/relay/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,12 @@ use tokio_util::task::AbortOnDropHandle;
use tracing::{debug, error, event, info_span, trace, warn, Instrument, Level};
use url::Url;

use conn::{
Conn as RelayClient, ConnBuilder as RelayClientBuilder, ConnReader,
ConnReceiver as RelayClientReceiver, ConnWriter, ReceivedMessage,
};
use conn::{Conn, ConnBuilder, ConnReader, ConnReceiver, ConnWriter, ReceivedMessage};
use streams::{downcast_upgrade, MaybeTlsStream, ProxyStream};

use crate::defaults::timeouts::relay::*;
use crate::dns::{DnsResolver, ResolverExt};
use crate::key::{PublicKey, SecretKey};
use crate::key::{NodeId, PublicKey, SecretKey};
use crate::relay::codec::DerpCodec;
use crate::relay::http::{Protocol, RELAY_PATH};
use crate::relay::RelayUrl;
Expand Down Expand Up @@ -140,7 +137,7 @@ pub struct Client {

#[derive(Debug)]
enum ActorMessage {
Connect(oneshot::Sender<Result<(RelayClient, usize), ClientError>>),
Connect(oneshot::Sender<Result<Conn, ClientError>>),
NotePreferred(bool),
LocalAddr(oneshot::Sender<Result<Option<SocketAddr>, ClientError>>),
Ping(oneshot::Sender<Result<Duration, ClientError>>),
Expand All @@ -154,19 +151,18 @@ enum ActorMessage {
/// Receiving end of a [`Client`].
#[derive(Debug)]
pub struct ClientReceiver {
msg_receiver: mpsc::Receiver<Result<(ReceivedMessage, usize), ClientError>>,
msg_receiver: mpsc::Receiver<Result<ReceivedMessage, ClientError>>,
}

#[derive(derive_more::Debug)]
struct Actor {
secret_key: SecretKey,
can_ack_pings: bool,
is_preferred: bool,
relay_client: Option<(RelayClient, RelayClientReceiver)>,
relay_conn: Option<(Conn, ConnReceiver)>,
is_closed: bool,
#[debug("address family selector callback")]
address_family_selector: Option<Box<dyn Fn() -> BoxFuture<bool> + Send + Sync + 'static>>,
conn_gen: usize,
url: RelayUrl,
protocol: Protocol,
#[debug("TlsConnector")]
Expand Down Expand Up @@ -334,10 +330,9 @@ impl ClientBuilder {
secret_key: key,
can_ack_pings: self.can_ack_pings,
is_preferred: self.is_preferred,
relay_client: None,
relay_conn: None,
is_closed: false,
address_family_selector: self.address_family_selector,
conn_gen: 0,
pings: PingTracker::default(),
ping_tasks: Default::default(),
url: self.url,
Expand Down Expand Up @@ -371,9 +366,8 @@ impl ClientBuilder {
}

impl ClientReceiver {
/// Reads a message from the server. Returns the message and the `conn_get`, or the number of
/// re-connections this Client has ever made
pub async fn recv(&mut self) -> Option<Result<(ReceivedMessage, usize), ClientError>> {
/// Reads a message from the server.
pub async fn recv(&mut self) -> Option<Result<ReceivedMessage, ClientError>> {
self.msg_receiver.recv().await
}
}
Expand All @@ -399,13 +393,13 @@ impl Client {
}
}

/// Connect to a relay Server and returns the underlying relay Client.
/// Connects to a relay Server and returns the underlying relay connection.
///
/// Returns [`ClientError::Closed`] if the [`Client`] is closed.
///
/// If there is already an active relay connection, returns the already
/// connected [`crate::relay::RelayConn`].
pub async fn connect(&self) -> Result<(RelayClient, usize), ClientError> {
pub async fn connect(&self) -> Result<Conn, ClientError> {
self.send_actor(ActorMessage::Connect).await
}

Expand Down Expand Up @@ -475,7 +469,7 @@ impl Actor {
async fn run(
mut self,
mut inbox: mpsc::Receiver<ActorMessage>,
msg_sender: mpsc::Sender<Result<(ReceivedMessage, usize), ClientError>>,
msg_sender: mpsc::Sender<Result<ReceivedMessage, ClientError>>,
) {
// Add an initial connection attempt.
if let Err(err) = self.connect("initial connect").await {
Expand All @@ -485,7 +479,7 @@ impl Actor {
loop {
tokio::select! {
res = self.recv_detail() => {
if let Ok((ReceivedMessage::Pong(ping), _)) = res {
if let Ok(ReceivedMessage::Pong(ping)) = res {
match self.pings.unregister(ping, "pong") {
Some(chan) => {
if chan.send(()).is_err() {
Expand All @@ -503,7 +497,7 @@ impl Actor {
Some(msg) = inbox.recv() => {
match msg {
ActorMessage::Connect(s) => {
let res = self.connect("actor msg").await.map(|(client, _, count)| (client, count));
let res = self.connect("actor msg").await.map(|(client, _)| (client));
s.send(res).ok();
},
ActorMessage::NotePreferred(is_preferred) => {
Expand Down Expand Up @@ -549,46 +543,51 @@ impl Actor {
}
}

/// Returns a connection to the relay.
///
/// If the client is currently connected, the existing connection is returned; otherwise,
/// a new connection is made.
///
/// Returns:
/// - A clonable connection object which can send DISCO messages to the relay.
/// - A reference to a channel receiving DISCO messages from the relay.
async fn connect(
&mut self,
why: &'static str,
) -> Result<(RelayClient, &'_ mut RelayClientReceiver, usize), ClientError> {
) -> Result<(Conn, &'_ mut ConnReceiver), ClientError> {
debug!(
"connect: {}, current client {}",
why,
self.relay_client.is_some()
self.relay_conn.is_some()
);

if self.is_closed {
return Err(ClientError::Closed);
}
async move {
if self.relay_client.is_none() {
if self.relay_conn.is_none() {
trace!("no connection, trying to connect");
let (relay_client, receiver) =
tokio::time::timeout(CONNECT_TIMEOUT, self.connect_0())
.await
.map_err(|_| ClientError::ConnectTimeout)??;
let (conn, receiver) = tokio::time::timeout(CONNECT_TIMEOUT, self.connect_0())
.await
.map_err(|_| ClientError::ConnectTimeout)??;

self.relay_client = Some((relay_client.clone(), receiver));
self.next_conn();
self.relay_conn = Some((conn, receiver));
} else {
trace!("already had connection");
}
let count = self.current_conn();
let (relay_client, receiver) = self
.relay_client
let (conn, receiver) = self
.relay_conn
.as_mut()
.map(|(c, r)| (c.clone(), r))
.expect("just checked");

Ok((relay_client, receiver, count))
Ok((conn, receiver))
}
.instrument(info_span!("connect"))
.await
}

async fn connect_0(&self) -> Result<(RelayClient, RelayClientReceiver), ClientError> {
async fn connect_0(&self) -> Result<(Conn, ConnReceiver), ClientError> {
let (reader, writer, local_addr) = match self.protocol {
Protocol::Websocket => {
let (reader, writer) = self.connect_ws().await?;
Expand All @@ -601,14 +600,14 @@ impl Actor {
}
};

let (relay_client, receiver) =
RelayClientBuilder::new(self.secret_key.clone(), local_addr, reader, writer)
let (conn, receiver) =
ConnBuilder::new(self.secret_key.clone(), local_addr, reader, writer)
.build()
.await
.map_err(|e| ClientError::Build(e.to_string()))?;

if self.is_preferred && relay_client.note_preferred(true).await.is_err() {
relay_client.close().await;
if self.is_preferred && conn.note_preferred(true).await.is_err() {
conn.close().await;
return Err(ClientError::Send);
}

Expand All @@ -620,7 +619,7 @@ impl Actor {
);

trace!("connect_0 done");
Ok((relay_client, receiver))
Ok((conn, receiver))
}

async fn connect_ws(&self) -> Result<(ConnReader, ConnWriter), ClientError> {
Expand Down Expand Up @@ -732,8 +731,8 @@ impl Actor {

// only send the preference if we already have a connection
let res = {
if let Some((ref client, _)) = self.relay_client {
client.note_preferred(is_preferred).await
if let Some((ref conn, _)) = self.relay_conn {
conn.note_preferred(is_preferred).await
} else {
return;
}
Expand All @@ -749,23 +748,23 @@ impl Actor {
if self.is_closed {
return None;
}
if let Some((ref client, _)) = self.relay_client {
client.local_addr()
if let Some((ref conn, _)) = self.relay_conn {
conn.local_addr()
} else {
None
}
}

async fn ping(&mut self, s: oneshot::Sender<Result<Duration, ClientError>>) {
let connect_res = self.connect("ping").await.map(|(c, _, _)| c);
let connect_res = self.connect("ping").await.map(|(c, _)| c);
let (ping, recv) = self.pings.register();
trace!("ping: {}", hex::encode(ping));

self.ping_tasks.spawn(async move {
let res = match connect_res {
Ok(client) => {
Ok(conn) => {
let start = Instant::now();
if let Err(err) = client.send_ping(ping).await {
if let Err(err) = conn.send_ping(ping).await {
warn!("failed to send ping: {:?}", err);
Err(ClientError::Send)
} else {
Expand All @@ -782,10 +781,10 @@ impl Actor {
});
}

async fn send(&mut self, dst_key: PublicKey, b: Bytes) -> Result<(), ClientError> {
trace!(dst = %dst_key.fmt_short(), len = b.len(), "send");
let (client, _, _) = self.connect("send").await?;
if client.send(dst_key, b).await.is_err() {
async fn send(&mut self, remote_node: NodeId, payload: Bytes) -> Result<(), ClientError> {
trace!(remote_node = %remote_node.fmt_short(), len = payload.len(), "send");
let (conn, _) = self.connect("send").await?;
if conn.send(remote_node, payload).await.is_err() {
self.close_for_reconnect().await;
return Err(ClientError::Send);
}
Expand All @@ -795,8 +794,8 @@ impl Actor {
async fn send_pong(&mut self, data: [u8; 8]) -> Result<(), ClientError> {
debug!("send_pong");
if self.can_ack_pings {
let (client, _, _) = self.connect("send_pong").await?;
if client.send_pong(data).await.is_err() {
let (conn, _) = self.connect("send_pong").await?;
if conn.send_pong(data).await.is_err() {
self.close_for_reconnect().await;
return Err(ClientError::Send);
}
Expand All @@ -817,16 +816,7 @@ impl Actor {
if self.is_closed {
return false;
}
self.relay_client.is_some()
}

fn current_conn(&self) -> usize {
self.conn_gen
}

fn next_conn(&mut self) -> usize {
self.conn_gen = self.conn_gen.wrapping_add(1);
self.conn_gen
self.relay_conn.is_some()
}

fn tls_servername(&self) -> Option<rustls::pki_types::ServerName> {
Expand Down Expand Up @@ -987,13 +977,12 @@ impl Actor {
}
}

async fn recv_detail(&mut self) -> Result<(ReceivedMessage, usize), ClientError> {
if let Some((_client, client_receiver)) = self.relay_client.as_mut() {
async fn recv_detail(&mut self) -> Result<ReceivedMessage, ClientError> {
if let Some((_conn, conn_receiver)) = self.relay_conn.as_mut() {
trace!("recv_detail tick");
match client_receiver.recv().await {
match conn_receiver.recv().await {
Ok(msg) => {
let current_gen = self.current_conn();
return Ok((msg, current_gen));
return Ok(msg);
}
Err(e) => {
self.close_for_reconnect().await;
Expand All @@ -1012,8 +1001,8 @@ impl Actor {
/// requires a connection, it will call `connect`.
async fn close_for_reconnect(&mut self) {
debug!("close for reconnect");
if let Some((client, _)) = self.relay_client.take() {
client.close().await
if let Some((conn, _)) = self.relay_conn.take() {
conn.close().await
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion iroh-net/src/relay/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ pub struct Conn {
inner: Arc<ConnTasks>,
}

/// The channel on which a relay connection sends received messages.
///
/// The [`Conn`] to a relay is easily clonable but can only send DISCO messages to a relay
/// server. This is the counterpart which receives DISCO messages from the relay server for
/// a connection. It is not clonable.
#[derive(Debug)]
pub struct ConnReceiver {
/// The reader channel, receiving incoming messages.
Expand Down Expand Up @@ -376,7 +381,7 @@ impl ConnBuilder {
recv_msgs: writer_recv,
}
.run()
.instrument(info_span!("client.writer")),
.instrument(info_span!("conn.writer")),
);

let (reader_sender, reader_recv) = mpsc::channel(PER_CLIENT_READ_QUEUE_DEPTH);
Expand Down Expand Up @@ -412,6 +417,7 @@ impl ConnBuilder {
}
}
}
.instrument(info_span!("conn.reader"))
});

let conn = Conn {
Expand Down
Loading

0 comments on commit 6b1186f

Please sign in to comment.