Skip to content

Commit

Permalink
feat: opt-in nodelay for each service
Browse files Browse the repository at this point in the history
feat: enforce nodelay for every control channel
  • Loading branch information
rapiz1 committed Jan 12, 2022
1 parent 05242dd commit dddf735
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 17 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ remote_public_key = "key_encoded_in_base64" # Optional
type = "tcp" # Optional. The protocol that needs forwarding. Possible values: ["tcp", "udp"]. Default: "tcp"
token = "whatever" # Necessary if `client.default_token` not set
local_addr = "127.0.0.1:1081" # Necessary. The address of the service that needs to be forwarded
nodelay = false # Optional. Determine whether to enable TCP_NODELAY for data transmission, if applicable, to improve the latency but decrease the bandwidth. Default: false

[client.services.service2] # Multiple services can be defined
local_addr = "127.0.0.1:1082"
Expand All @@ -133,6 +134,7 @@ remote_public_key = "key_encoded_in_base64"
type = "tcp" # Optional. Same as the client `[client.services.X.type]
token = "whatever" # Necessary if `server.default_token` not set
bind_addr = "0.0.0.0:8081" # Necessary. The address of the service is exposed at. Generally only the port needs to be change.
nodelay = false # Optional. Same as the client

[server.services.service2]
bind_addr = "0.0.0.1:8082"
Expand Down
14 changes: 11 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::protocol::{
self, read_ack, read_control_cmd, read_data_cmd, read_hello, Ack, Auth, ControlChannelCmd,
DataChannelCmd, UdpTraffic, CURRENT_PROTO_VERSION, HASH_WIDTH_IN_BYTES,
};
use crate::transport::{TcpTransport, Transport};
use crate::transport::{control_channel_socket_opts, SocketOpts, TcpTransport, Transport};
use anyhow::{anyhow, bail, Context, Result};
use backoff::ExponentialBackoff;
use bytes::{Bytes, BytesMut};
Expand Down Expand Up @@ -151,6 +151,7 @@ struct RunDataChannelArgs<T: Transport> {
remote_addr: String,
local_addr: String,
connector: Arc<T>,
socket_opts: SocketOpts,
}

async fn do_data_channel_handshake<T: Transport>(
Expand All @@ -168,11 +169,14 @@ async fn do_data_channel_handshake<T: Transport>(
let mut conn: T::Stream = backoff::future::retry_notify(
backoff,
|| async {
Ok(args
let conn = args
.connector
.connect(&args.remote_addr)
.await
.with_context(|| "Failed to connect to remote_addr")?)
.with_context(|| "Failed to connect to remote_addr")?;
T::hint(&conn, args.socket_opts);

Ok(conn)
},
|e, _| {
warn!("{:?}", e);
Expand Down Expand Up @@ -378,6 +382,7 @@ impl<T: 'static + Transport> ControlChannel<T> {
.connect(&self.remote_addr)
.await
.with_context(|| format!("Failed to connect to the server: {}", &self.remote_addr))?;
T::hint(&conn, control_channel_socket_opts());

// Send hello
debug!("Sending hello");
Expand Down Expand Up @@ -422,11 +427,14 @@ impl<T: 'static + Transport> ControlChannel<T> {

let remote_addr = self.remote_addr.clone();
let local_addr = self.service.local_addr.clone();
// Socket options for the data channel
let socket_opts = SocketOpts::from_client_cfg(&self.service);
let data_ch_args = Arc::new(RunDataChannelArgs {
session_key,
remote_addr,
local_addr,
connector: self.transport.clone(),
socket_opts,
});

loop {
Expand Down
6 changes: 6 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub struct ClientServiceConfig {
pub name: String,
pub local_addr: String,
pub token: Option<String>,
#[serde(default = "default_nodelay")]
pub nodelay: bool,
}

impl ClientServiceConfig {
Expand Down Expand Up @@ -65,6 +67,8 @@ pub struct ServerServiceConfig {
pub name: String,
pub bind_addr: String,
pub token: Option<String>,
#[serde(default = "default_nodelay")]
pub nodelay: bool,
}

impl ServerServiceConfig {
Expand Down Expand Up @@ -300,6 +304,7 @@ mod tests {
name: "foo1".into(),
bind_addr: "127.0.0.1:80".into(),
token: None,
..Default::default()
},
);

Expand Down Expand Up @@ -347,6 +352,7 @@ mod tests {
name: "foo1".into(),
local_addr: "127.0.0.1:80".into(),
token: None,
..Default::default()
},
);

Expand Down
10 changes: 8 additions & 2 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::protocol::{
self, read_auth, read_hello, Ack, ControlChannelCmd, DataChannelCmd, Hello, UdpTraffic,
HASH_WIDTH_IN_BYTES,
};
use crate::transport::{TcpTransport, Transport};
use crate::transport::{control_channel_socket_opts, SocketOpts, TcpTransport, Transport};
use anyhow::{anyhow, bail, Context, Result};
use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
Expand Down Expand Up @@ -254,6 +254,8 @@ async fn do_control_channel_handshake<T: 'static + Transport>(
) -> Result<()> {
info!("Try to handshake a control channel");

T::hint(&conn, control_channel_socket_opts());

// Generate a nonce
let mut nonce = vec![0u8; HASH_WIDTH_IN_BYTES];
rand::thread_rng().fill_bytes(&mut nonce);
Expand Down Expand Up @@ -336,6 +338,8 @@ async fn do_data_channel_handshake<T: 'static + Transport>(
let control_channels_guard = control_channels.read().await;
match control_channels_guard.get2(&nonce) {
Some(handle) => {
T::hint(&conn, SocketOpts::from_server_cfg(&handle.service));

// Send the data channel to the corresponding control channel
handle
.data_ch_tx
Expand All @@ -354,6 +358,7 @@ pub struct ControlChannelHandle<T: Transport> {
// Shutdown the control channel by dropping it
_shutdown_tx: broadcast::Sender<bool>,
data_ch_tx: mpsc::Sender<T::Stream>,
service: ServerServiceConfig,
}

impl<T> ControlChannelHandle<T>
Expand Down Expand Up @@ -426,7 +431,7 @@ where
let ch = ControlChannel::<T> {
conn,
shutdown_rx,
service,
service: service.clone(),
data_ch_req_rx,
};

Expand All @@ -443,6 +448,7 @@ where
ControlChannelHandle {
_shutdown_tx: shutdown_tx,
data_ch_tx,
service,
}
}
}
Expand Down
35 changes: 29 additions & 6 deletions src/transport/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::config::TransportConfig;
use crate::config::{ClientServiceConfig, ServerServiceConfig, TransportConfig};
use crate::helper::try_set_tcp_keepalive;
use anyhow::{Context, Result};
use async_trait::async_trait;
Expand All @@ -21,8 +21,8 @@ pub trait Transport: Debug + Send + Sync {
fn new(config: &TransportConfig) -> Result<Self>
where
Self: Sized;
/// Apply `opt` to `conn` if the underlying protocol supports
fn hint(conn: &Self::RawStream, opt: SocketOpts);
/// Apply socket options to `conn` if the underlying protocol supports
fn hint(conn: &Self::Stream, opts: SocketOpts);
/// Create a socket for incoming connections
async fn bind<T: ToSocketAddrs + Send + Sync>(&self, addr: T) -> Result<Self::Acceptor>;
/// Accept a connection
Expand All @@ -44,8 +44,9 @@ mod noise;
#[cfg(feature = "noise")]
pub use noise::NoiseTransport;

#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub struct SocketOpts {
// None means do not change
pub nodelay: Option<bool>,
pub keepalive_secs: Option<u64>,
}
Expand All @@ -60,13 +61,27 @@ impl Default for SocketOpts {
}

impl SocketOpts {
pub fn from(config: &TransportConfig) -> SocketOpts {
pub fn from_transport_cfg(cfg: &TransportConfig) -> SocketOpts {
SocketOpts {
nodelay: Some(config.nodelay),
nodelay: Some(cfg.nodelay),
..Default::default()
}
}

pub fn from_client_cfg(cfg: &ClientServiceConfig) -> SocketOpts {
SocketOpts {
nodelay: Some(cfg.nodelay),
keepalive_secs: None,
}
}

pub fn from_server_cfg(cfg: &ServerServiceConfig) -> SocketOpts {
SocketOpts {
nodelay: Some(cfg.nodelay),
keepalive_secs: None,
}
}

pub fn apply(&self, conn: &TcpStream) {
if let Some(keepalive_secs) = self.keepalive_secs {
if let Err(e) = try_set_tcp_keepalive(conn, Duration::from_secs(keepalive_secs))
Expand All @@ -86,3 +101,11 @@ impl SocketOpts {
}
}
}

/// Socket options for the control channel
pub fn control_channel_socket_opts() -> SocketOpts {
SocketOpts {
nodelay: Some(true), // Always set nodelay for the control channel
keepalive_secs: None, // None means do not change. Keepalive is set by TcpTransport
}
}
4 changes: 2 additions & 2 deletions src/transport/noise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ impl Transport for NoiseTransport {
})
}

fn hint(conn: &Self::RawStream, opt: SocketOpts) {
opt.apply(conn);
fn hint(_conn: &Self::Stream, _opt: SocketOpts) {
//FIXME: wait for snowstorm
}

async fn bind<T: ToSocketAddrs + Send + Sync>(&self, addr: T) -> Result<Self::Acceptor> {
Expand Down
4 changes: 2 additions & 2 deletions src/transport/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ impl Transport for TcpTransport {

fn new(config: &TransportConfig) -> Result<Self> {
Ok(TcpTransport {
socket_opts: SocketOpts::from(config),
socket_opts: SocketOpts::from_transport_cfg(config),
})
}

fn hint(conn: &Self::RawStream, opt: SocketOpts) {
fn hint(conn: &Self::Stream, opt: SocketOpts) {
opt.apply(conn);
}

Expand Down
4 changes: 2 additions & 2 deletions src/transport/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ impl Transport for TlsTransport {
})
}

fn hint(conn: &Self::RawStream, opt: SocketOpts) {
opt.apply(conn);
fn hint(conn: &Self::Stream, opt: SocketOpts) {
opt.apply(conn.get_ref().get_ref().get_ref());
}

async fn bind<A: ToSocketAddrs + Send + Sync>(&self, addr: A) -> Result<Self::Acceptor> {
Expand Down

0 comments on commit dddf735

Please sign in to comment.