Skip to content

Commit

Permalink
Merge pull request blackbeam#258 from cloneable/absolute-connection-ttl
Browse files Browse the repository at this point in the history
Option to set an absolute TTL for connections
  • Loading branch information
blackbeam authored Sep 18, 2023
2 parents e9e037b + ec1a698 commit 02e47d9
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ once_cell = "1.7.2"
pem = "3.0"
percent-encoding = "2.1.0"
pin-project = "1.0.2"
rand = "0.8.5"
serde = "1"
serde_json = "1"
socket2 = "0.5.2"
Expand Down Expand Up @@ -74,7 +75,6 @@ optional = true
tempfile = "3.1.0"
socket2 = { version = "0.5.2", features = ["all"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
rand = "0.8.0"

[features]
default = [
Expand Down
8 changes: 8 additions & 0 deletions src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ struct ConnInner {
tx_status: TxStatus,
reset_upon_returning_to_a_pool: bool,
opts: Opts,
ttl_deadline: Option<Instant>,
last_io: Instant,
wait_timeout: Duration,
stmt_cache: StmtCache,
Expand Down Expand Up @@ -140,6 +141,7 @@ impl fmt::Debug for ConnInner {
impl ConnInner {
/// Constructs an empty connection.
fn empty(opts: Opts) -> ConnInner {
let ttl_deadline = opts.pool_opts().new_connection_ttl_deadline();
ConnInner {
capabilities: opts.get_capabilities(),
status: StatusFlags::empty(),
Expand All @@ -157,6 +159,7 @@ impl ConnInner {
stmt_cache: StmtCache::new(opts.stmt_cache_size()),
socket: opts.socket().map(Into::into),
opts,
ttl_deadline,
nonce: Vec::default(),
auth_plugin: AuthPlugin::MysqlNativePassword,
auth_switched: false,
Expand Down Expand Up @@ -1088,6 +1091,11 @@ impl Conn {
/// Returns true if time since last IO exceeds `wait_timeout`
/// (or `conn_ttl` if specified in opts).
fn expired(&self) -> bool {
if let Some(deadline) = self.inner.ttl_deadline {
if Instant::now() > deadline {
return true;
}
}
let ttl = self
.inner
.opts
Expand Down
51 changes: 49 additions & 2 deletions src/conn/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ struct IdlingConn {
}

impl IdlingConn {
/// Returns true when this connection has a TTL and it elapsed.
fn expired(&self) -> bool {
self.conn
.inner
.ttl_deadline
.map(|t| Instant::now() > t)
.unwrap_or_default()
}

/// Returns duration elapsed since this connection is idling.
fn elapsed(&self) -> Duration {
self.since.elapsed()
Expand Down Expand Up @@ -82,8 +91,11 @@ impl Exchange {
// Spawn the Recycler.
tokio::spawn(Recycler::new(pool_opts.clone(), inner.clone(), dropped));

// Spawn the ttl check interval if `inactive_connection_ttl` isn't `0`
if pool_opts.inactive_connection_ttl() > Duration::from_secs(0) {
// Spawn the ttl check interval if `inactive_connection_ttl` isn't `0` or
// connections have an absolute TTL.
if pool_opts.inactive_connection_ttl() > Duration::ZERO
|| pool_opts.abs_conn_ttl().is_some()
{
tokio::spawn(TtlCheckInterval::new(pool_opts, inner.clone()));
}
}
Expand Down Expand Up @@ -1012,6 +1024,41 @@ mod test {
assert_eq!(0, waitlist.queue.len());
}

#[tokio::test]
async fn check_absolute_connection_ttl() -> super::Result<()> {
let constraints = PoolConstraints::new(1, 3).unwrap();
let pool_opts = PoolOpts::default()
.with_constraints(constraints)
.with_inactive_connection_ttl(Duration::from_secs(99))
.with_ttl_check_interval(Duration::from_secs(1))
.with_abs_conn_ttl(Some(Duration::from_secs(2)));

let pool = Pool::new(get_opts().pool_opts(pool_opts));

let conn_ttl0 = pool.get_conn().await?;
sleep(Duration::from_millis(1000)).await;
let conn_ttl1 = pool.get_conn().await?;
sleep(Duration::from_millis(1000)).await;
let conn_ttl2 = pool.get_conn().await?;

drop(conn_ttl0);
drop(conn_ttl1);
drop(conn_ttl2);
assert_eq!(ex_field!(pool, exist), 3);

sleep(Duration::from_millis(1500)).await;
assert_eq!(ex_field!(pool, exist), 2);

sleep(Duration::from_millis(1000)).await;
assert_eq!(ex_field!(pool, exist), 1);

// Go even below min pool size.
sleep(Duration::from_millis(1000)).await;
assert_eq!(ex_field!(pool, exist), 0);

Ok(())
}

#[cfg(feature = "nightly")]
mod bench {
use futures_util::future::{FutureExt, TryFutureExt};
Expand Down
48 changes: 33 additions & 15 deletions src/conn/pool/ttl_check_inerval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use pin_project::pin_project;
use tokio::time::{self, Interval};

use std::{
collections::VecDeque,
future::Future,
sync::{atomic::Ordering, Arc},
};
Expand Down Expand Up @@ -46,24 +47,41 @@ impl TtlCheckInterval {

/// Perform the check.
pub fn check_ttl(&self) {
let mut exchange = self.inner.exchange.lock().unwrap();
let to_be_dropped = {
let mut exchange = self.inner.exchange.lock().unwrap();

let num_idling = exchange.available.len();
let num_to_drop = num_idling.saturating_sub(self.pool_opts.constraints().min());
let num_to_drop = exchange
.available
.len()
.saturating_sub(self.pool_opts.constraints().min());

for _ in 0..num_to_drop {
let idling_conn = exchange.available.pop_front().unwrap();
if idling_conn.elapsed() > self.pool_opts.inactive_connection_ttl() {
assert!(idling_conn.conn.inner.pool.is_none());
let inner = self.inner.clone();
tokio::spawn(idling_conn.conn.disconnect().then(move |_| {
let mut exchange = inner.exchange.lock().unwrap();
exchange.exist -= 1;
ok::<_, ()>(())
}));
} else {
exchange.available.push_back(idling_conn);
let mut to_be_dropped = Vec::<_>::with_capacity(exchange.available.len());
let mut kept_available =
VecDeque::<_>::with_capacity(self.pool_opts.constraints().max());

while let Some(conn) = exchange.available.pop_front() {
if conn.expired() {
to_be_dropped.push(conn);
} else if to_be_dropped.len() < num_to_drop
&& conn.elapsed() > self.pool_opts.inactive_connection_ttl()
{
to_be_dropped.push(conn);
} else {
kept_available.push_back(conn);
}
}
exchange.available = kept_available;
to_be_dropped
};

for idling_conn in to_be_dropped {
assert!(idling_conn.conn.inner.pool.is_none());
let inner = self.inner.clone();
tokio::spawn(idling_conn.conn.disconnect().then(move |_| {
let mut exchange = inner.exchange.lock().unwrap();
exchange.exist -= 1;
ok::<_, ()>(())
}));
}
}
}
Expand Down
126 changes: 125 additions & 1 deletion src/opts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub use native_tls_opts::ClientIdentity;
pub use rustls_opts::ClientIdentity;

use percent_encoding::percent_decode;
use rand::Rng;
use url::{Host, Url};

use std::{
Expand All @@ -26,7 +27,7 @@ use std::{
path::Path,
str::FromStr,
sync::Arc,
time::Duration,
time::{Duration, Instant},
vec,
};

Expand Down Expand Up @@ -209,6 +210,8 @@ pub struct PoolOpts {
constraints: PoolConstraints,
inactive_connection_ttl: Duration,
ttl_check_interval: Duration,
abs_conn_ttl: Option<Duration>,
abs_conn_ttl_jitter: Option<Duration>,
reset_connection: bool,
}

Expand Down Expand Up @@ -273,6 +276,49 @@ impl PoolOpts {
self.reset_connection
}

/// Sets an absolute TTL after which a connection is removed from the pool.
/// This may push the pool below the requested minimum pool size and is indepedent of the
/// idle TTL.
/// The absolute TTL is disabled by default.
/// Fractions of seconds are ignored.
pub fn with_abs_conn_ttl(mut self, ttl: Option<Duration>) -> Self {
self.abs_conn_ttl = ttl;
self
}

/// Optionally, the absolute TTL can be extended by a per-connection random amount
/// bounded by `jitter`.
/// Setting `abs_conn_ttl_jitter` without `abs_conn_ttl` has no effect.
/// Fractions of seconds are ignored.
pub fn with_abs_conn_ttl_jitter(mut self, jitter: Option<Duration>) -> Self {
self.abs_conn_ttl_jitter = jitter;
self
}

/// Returns the absolute TTL, if set.
pub fn abs_conn_ttl(&self) -> Option<Duration> {
self.abs_conn_ttl
}

/// Returns the absolute TTL's jitter bound, if set.
pub fn abs_conn_ttl_jitter(&self) -> Option<Duration> {
self.abs_conn_ttl_jitter
}

/// Returns a new deadline that's TTL (+ random jitter) in the future.
pub(crate) fn new_connection_ttl_deadline(&self) -> Option<Instant> {
if let Some(ttl) = self.abs_conn_ttl {
let jitter = if let Some(jitter) = self.abs_conn_ttl_jitter {
Duration::from_secs(rand::thread_rng().gen_range(0..=jitter.as_secs()))
} else {
Duration::ZERO
};
Some(Instant::now() + ttl + jitter)
} else {
None
}
}

/// Pool will recycle inactive connection if it is outside of the lower bound of the pool
/// and if it is idling longer than this value (defaults to
/// [`DEFAULT_INACTIVE_CONNECTION_TTL`]).
Expand Down Expand Up @@ -359,6 +405,8 @@ impl Default for PoolOpts {
constraints: DEFAULT_POOL_CONSTRAINTS,
inactive_connection_ttl: DEFAULT_INACTIVE_CONNECTION_TTL,
ttl_check_interval: DEFAULT_TTL_CHECK_INTERVAL,
abs_conn_ttl: None,
abs_conn_ttl_jitter: None,
reset_connection: true,
}
}
Expand Down Expand Up @@ -662,6 +710,49 @@ impl Opts {
self.inner.mysql_opts.conn_ttl
}

/// The pool will close a connection when this absolute TTL has elapsed.
/// Disabled by default.
///
/// Enables forced recycling and migration of connections in a guaranteed timeframe.
/// This TTL bypasses pool constraints and an idle pool can go below the min size.
///
/// # Connection URL
///
/// You can use `abs_conn_ttl` URL parameter to set this value (in seconds). E.g.
///
/// ```
/// # use mysql_async::*;
/// # use std::time::Duration;
/// # fn main() -> Result<()> {
/// let opts = Opts::from_url("mysql://localhost/db?abs_conn_ttl=86400")?;
/// assert_eq!(opts.abs_conn_ttl(), Some(Duration::from_secs(24 * 60 * 60)));
/// # Ok(()) }
/// ```
pub fn abs_conn_ttl(&self) -> Option<Duration> {
self.inner.mysql_opts.pool_opts.abs_conn_ttl
}

/// Upper bound of a random value added to the absolute TTL, if enabled.
/// Disabled by default.
///
/// Should be used to prevent connections from closing at the same time.
///
/// # Connection URL
///
/// You can use `abs_conn_ttl_jitter` URL parameter to set this value (in seconds). E.g.
///
/// ```
/// # use mysql_async::*;
/// # use std::time::Duration;
/// # fn main() -> Result<()> {
/// let opts = Opts::from_url("mysql://localhost/db?abs_conn_ttl=7200&abs_conn_ttl_jitter=3600")?;
/// assert_eq!(opts.abs_conn_ttl_jitter(), Some(Duration::from_secs(60 * 60)));
/// # Ok(()) }
/// ```
pub fn abs_conn_ttl_jitter(&self) -> Option<Duration> {
self.inner.mysql_opts.pool_opts.abs_conn_ttl_jitter
}

/// Number of prepared statements cached on the client side (per connection). Defaults to
/// [`DEFAULT_STMT_CACHE_SIZE`].
///
Expand Down Expand Up @@ -1444,6 +1535,34 @@ fn mysqlopts_from_url(url: &Url) -> std::result::Result<MysqlOpts, UrlError> {
});
}
}
} else if key == "abs_conn_ttl" {
match u64::from_str(&*value) {
Ok(value) => {
opts.pool_opts = opts
.pool_opts
.with_abs_conn_ttl(Some(Duration::from_secs(value)))
}
_ => {
return Err(UrlError::InvalidParamValue {
param: "abs_conn_ttl".into(),
value,
});
}
}
} else if key == "abs_conn_ttl_jitter" {
match u64::from_str(&*value) {
Ok(value) => {
opts.pool_opts = opts
.pool_opts
.with_abs_conn_ttl_jitter(Some(Duration::from_secs(value)))
}
_ => {
return Err(UrlError::InvalidParamValue {
param: "abs_conn_ttl_jitter".into(),
value,
});
}
}
} else if key == "tcp_keepalive" {
match u32::from_str(&*value) {
Ok(value) => opts.tcp_keepalive = Some(value),
Expand Down Expand Up @@ -1679,6 +1798,11 @@ mod test {
assert_eq!(url_opts.tcp_nodelay(), builder_opts.tcp_nodelay());
assert_eq!(url_opts.pool_opts(), builder_opts.pool_opts());
assert_eq!(url_opts.conn_ttl(), builder_opts.conn_ttl());
assert_eq!(url_opts.abs_conn_ttl(), builder_opts.abs_conn_ttl());
assert_eq!(
url_opts.abs_conn_ttl_jitter(),
builder_opts.abs_conn_ttl_jitter()
);
assert_eq!(url_opts.stmt_cache_size(), builder_opts.stmt_cache_size());
assert_eq!(url_opts.ssl_opts(), builder_opts.ssl_opts());
assert_eq!(url_opts.prefer_socket(), builder_opts.prefer_socket());
Expand Down

0 comments on commit 02e47d9

Please sign in to comment.