Skip to content

Commit

Permalink
Support multi-address hosts in client
Browse files Browse the repository at this point in the history
Previously the connector mechanism would always just take the first
address and try it. If somebody wants to use rrdns that won't work
properly. This will try all of the addresses that a server has before
giving up on the server for the RETRY backoff.

This also removes an unwrap, which is a step toward addressing issue#4.
  • Loading branch information
SpamapS committed May 31, 2024
1 parent 45075bc commit 629f128
Showing 1 changed file with 115 additions and 95 deletions.
210 changes: 115 additions & 95 deletions rustygear/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ use std::fmt::Display;
* limitations under the License.
*/
use std::io::{self, ErrorKind};
use std::net::ToSocketAddrs;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use bytes::{BufMut, Bytes, BytesMut};
use futures::sink::SinkExt;
use futures::stream::StreamExt;

use tokio::net::TcpStream;
use tokio::net::{lookup_host, TcpStream};
use tokio::runtime;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{channel, Receiver, Sender};
Expand Down Expand Up @@ -334,35 +333,38 @@ impl Client {
offset
),
Some(server) => {
let server = server.clone();
let addr = server.to_socket_addrs().unwrap().next().unwrap();
trace!("really connecting: i={} addr={:?}", offset, addr);
match TcpStream::connect(addr).await {
let lookup_server = server.clone();
let mut addrs = match lookup_host(lookup_server).await {
Err(e) => {
error!(
"Couldn't connect to {} [{}], will retry after {:?}",
server, e, RECONNECT_BACKOFF
"Couldn't lookup address for server({}): {}",
server, e
);
let ctx_conn = ctx_conn.clone();
// Retry in BACKOFF seconds -- TODO: keep track and do exponential
runtime::Handle::current().spawn(async move {
sleep(RECONNECT_BACKOFF).await;
ctx_conn.send(offset).await
});
continue;
}
Ok(wholestream) => {
info!("Connected to {}", server);
let pc = PacketCodec {};
/*
if !connector_active_servers.lock().unwrap()[offset] {
Ok(addrs) => addrs,
};

// Perhaps here we should loop and return whether or not we connected to one of them
let abort_connector = loop {
let addr = match addrs.next() {
None => break false,
Some(addr) => addr,
};
trace!("really connecting: i={} addr={:?}", offset, addr);
match TcpStream::connect(addr).await {
Err(e) => {
warn!(
"Received offset of disconnected server, ignoring"
"Couldn't connect to {} with address {} [{}]",
server, addr, e
);
// try the next address if we got multiple
continue;
}
This probably should not be needed.
*/
let (mut sink, mut stream) = match tls {
Ok(wholestream) => {
info!("Connected to {}", server);
let pc = PacketCodec {};
let (mut sink, mut stream) = match tls {
#[cfg(not(feature = "tls"))]
Some(_) => unreachable!("We shouldn't have a tls config without feature = tls"),
#[cfg(feature = "tls")]
Expand Down Expand Up @@ -401,91 +403,109 @@ impl Client {
pc.framed(WrappedStream::from(wholestream)).split()
}
};
if let Some(ref client_id) = client_id {
let req = new_req(SET_CLIENT_ID, client_id.clone());
if let Err(e) = sink.send(req).await {
debug!(
if let Some(ref client_id) = client_id {
let req = new_req(SET_CLIENT_ID, client_id.clone());
if let Err(e) = sink.send(req).await {
debug!(
"Connection {:?} can't send packets. ({:?})",
sink, e
);
continue;
continue;
}
}
}
let (tx, mut rx) = channel(CLIENT_CHANNEL_BOUND_SIZE); // XXX pick a good value or const
let tx = tx.clone();
let tx2 = tx.clone();
let connserver = server.clone();
let handler = ConnHandler::new(
&client_id,
connserver.into(),
tx2,
handler_client_data.clone(),
true,
);
trace!("Inserting at {}", offset);
connector_conns
.lock()
.unwrap()
.insert(offset, handler.clone());
trace!("Inserted at {}", offset);
let reader_conns = connector_conns.clone();
let reader_ctx = ctx2.clone();
let reader = async move {
let (tx, mut rx) = channel(CLIENT_CHANNEL_BOUND_SIZE); // XXX pick a good value or const
let tx = tx.clone();
while let Some(frame) = stream.next().await {
trace!("Frame read: {:?}", frame);
let response = match frame {
Err(e) => Err(e),
Ok(frame) => {
let handler = handler.clone();
debug!("Locking handler");
let mut handler = handler;
debug!("Locked handler");
handler.call(frame)
let tx2 = tx.clone();
let connserver = server.clone();
let handler = ConnHandler::new(
&client_id,
connserver.into(),
tx2,
handler_client_data.clone(),
true,
);
trace!("Inserting at {}", offset);
connector_conns
.lock()
.unwrap()
.insert(offset, handler.clone());
trace!("Inserted at {}", offset);
let reader_conns = connector_conns.clone();
let reader_ctx = ctx2.clone();
let reader = async move {
let tx = tx.clone();
while let Some(frame) = stream.next().await {
trace!("Frame read: {:?}", frame);
let response = match frame {
Err(e) => Err(e),
Ok(frame) => {
let handler = handler.clone();
debug!("Locking handler");
let mut handler = handler;
debug!("Locked handler");
handler.call(frame)
}
};
if let Err(e) = response {
error!("conn dropped?: {}", e);
break;
}
if let Err(_) = tx.send(response.unwrap()).await
{
error!("receiver dropped")
}
};
if let Err(e) = response {
error!("conn dropped?: {}", e);
break;
}
if let Err(_) = tx.send(response.unwrap()).await {
error!("receiver dropped")
reader_conns
.lock()
.unwrap()
.get_mut(offset)
.and_then(|conn| Some(conn.set_active(false)));
if let Err(e) = reader_ctx.send(offset).await {
error!(
"Can't send to connector, aborting! {}",
e
);
}
}
reader_conns
.lock()
.unwrap()
.get_mut(offset)
.and_then(|conn| Some(conn.set_active(false)));
if let Err(e) = reader_ctx.send(offset).await {
error!("Can't send to connector, aborting! {}", e);
}
};
let writer_conns = connector_conns.clone();
let writer = async move {
while let Some(packet) = rx.recv().await {
trace!("Sending {:?}", &packet);
if let Err(_) = sink.send(packet).await {
error!("Connection ({}) dropped", offset);
writer_conns
.lock()
.unwrap()
.get_mut(offset)
.and_then(|conn| {
Some(conn.set_active(false))
});
};
let writer_conns = connector_conns.clone();
let writer = async move {
while let Some(packet) = rx.recv().await {
trace!("Sending {:?}", &packet);
if let Err(_) = sink.send(packet).await {
error!("Connection ({}) dropped", offset);
writer_conns
.lock()
.unwrap()
.get_mut(offset)
.and_then(|conn| {
Some(conn.set_active(false))
});
}
}
};
runtime::Handle::current().spawn(reader);
runtime::Handle::current().spawn(writer);
if let Err(e) = ctdtx.send(offset).await {
// Connected channel is closed, shut it all down
info!("Shutting down connector because connected channel returned error ({})", e);
break true;
}
};
runtime::Handle::current().spawn(reader);
runtime::Handle::current().spawn(writer);
if let Err(e) = ctdtx.send(offset).await {
// Connected channel is closed, shut it all down
info!("Shutting down connector because connected channel returned error ({})", e);
break;
}
}
};
if abort_connector {
break;
}
let ctx_conn = ctx_conn.clone();
error!(
"Could not connect to any addresses for {}, retrying in {:?}",
server, RECONNECT_BACKOFF
);
// Retry in BACKOFF seconds -- TODO: keep track and do exponential
runtime::Handle::current().spawn(async move {
sleep(RECONNECT_BACKOFF).await;
ctx_conn.send(offset).await
});
}
}
}
Expand Down

0 comments on commit 629f128

Please sign in to comment.