Skip to content

Commit

Permalink
Improve max_lifetime handling (#3065)
Browse files Browse the repository at this point in the history
* Check max lifetime in return_to_pool, not on acquire

* Improve checks in background maintenance task

* add tests

* adjust test to fix
  • Loading branch information
mirek26 authored Mar 5, 2024
1 parent 27a4991 commit b4f6596
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 49 deletions.
9 changes: 8 additions & 1 deletion sqlx-core/src/pool/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;

use super::inner::{DecrementSizeGuard, PoolInner};
use super::inner::{is_beyond_max_lifetime, DecrementSizeGuard, PoolInner};
use crate::pool::options::PoolConnectionMetadata;
use std::future::Future;

Expand Down Expand Up @@ -239,6 +239,13 @@ impl<DB: Database> Floating<DB, Live<DB>> {
return false;
}

// If the connection is beyond max lifetime, close the connection and
// immediately create a new connection
if is_beyond_max_lifetime(&self.inner, &self.guard.pool.options) {
self.close().await;
return false;
}

if let Some(test) = &self.guard.pool.options.after_release {
let meta = self.metadata();
match (test)(&mut self.inner.raw, meta).await {
Expand Down
64 changes: 22 additions & 42 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl<DB: Database> PoolInner<DB> {
}

pub(super) fn release(&self, floating: Floating<DB, Live<DB>>) {
// `options.after_release` is invoked by `PoolConnection::release_to_pool()`.
// `options.after_release` and other checks are in `PoolConnection::return_to_pool()`.

let Floating { inner: idle, guard } = floating.into_idle();

Expand Down Expand Up @@ -273,7 +273,7 @@ impl<DB: Database> PoolInner<DB> {
// `try_increment_size()`.
tracing::debug!("woke but was unable to acquire idle connection or open new one; retrying");
// If so, we're likely in the current-thread runtime if it's Tokio
// and so we should yield to let any spawned release_to_pool() tasks
// and so we should yield to let any spawned return_to_pool() tasks
// execute.
crate::rt::yield_now().await;
continue;
Expand Down Expand Up @@ -417,7 +417,10 @@ impl<DB: Database> Drop for PoolInner<DB> {
}

/// Returns `true` if `options.max_lifetime` is set and the connection has exceeded it, `false` otherwise.
fn is_beyond_max_lifetime<DB: Database>(live: &Live<DB>, options: &PoolOptions<DB>) -> bool {
pub(super) fn is_beyond_max_lifetime<DB: Database>(
live: &Live<DB>,
options: &PoolOptions<DB>,
) -> bool {
options
.max_lifetime
.map_or(false, |max| live.created_at.elapsed() > max)
Expand All @@ -434,12 +437,6 @@ async fn check_idle_conn<DB: Database>(
mut conn: Floating<DB, Idle<DB>>,
options: &PoolOptions<DB>,
) -> Result<Floating<DB, Live<DB>>, DecrementSizeGuard<DB>> {
// If the connection we pulled has expired, close the connection and
// immediately create a new connection
if is_beyond_max_lifetime(&conn, options) {
return Err(conn.close().await);
}

if options.test_before_acquire {
// Check that the connection is still live
if let Err(error) = conn.ping().await {
Expand Down Expand Up @@ -503,22 +500,30 @@ fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
crate::rt::spawn(async move {
let _ = close_event
.do_until(async {
let mut slept = true;

// If the last handle to the pool was dropped while we were sleeping
while let Some(pool) = pool_weak.upgrade() {
if pool.is_closed() {
return;
}

// Don't run the reaper right away.
if slept && !pool.idle_conns.is_empty() {
do_reap(&pool).await;
}

let next_run = Instant::now() + period;

pool.min_connections_maintenance(Some(next_run)).await;
// Go over all idle connections, check for idleness and lifetime,
// and if we have fewer than min_connections after reaping a connection,
// open a new one immediately. Note that other connections may be popped from
// the queue in the meantime - that's fine, there is no harm in checking more
for _ in 0..pool.num_idle() {
if let Some(conn) = pool.try_acquire() {
if is_beyond_idle_timeout(&conn, &pool.options)
|| is_beyond_max_lifetime(&conn, &pool.options)
{
let _ = conn.close().await;
pool.min_connections_maintenance(Some(next_run)).await;
} else {
pool.release(conn.into_live());
}
}
}

// Don't hold a reference to the pool while sleeping.
drop(pool);
Expand All @@ -530,37 +535,12 @@ fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
// `next_run` is in the past, just yield.
crate::rt::yield_now().await;
}

slept = true;
}
})
.await;
});
}

/// Close idle connections that have outlived `idle_timeout` or `max_lifetime`,
/// without shrinking the pool below `options.min_connections`.
async fn do_reap<DB: Database>(pool: &Arc<PoolInner<DB>>) {
    // Upper bound on how many connections we may close in this pass:
    // current size minus the configured minimum.
    let reap_budget = pool.size().saturating_sub(pool.options.min_connections);

    let mut expired = Vec::new();
    let mut healthy = Vec::new();

    // Only inspect connections currently waiting in the idle queue;
    // `try_acquire` returning `None` just means the queue ran dry early.
    for _ in 0..reap_budget {
        if let Some(conn) = pool.try_acquire() {
            let past_deadline = is_beyond_idle_timeout(&conn, &pool.options)
                || is_beyond_max_lifetime(&conn, &pool.options);
            if past_deadline {
                expired.push(conn);
            } else {
                healthy.push(conn);
            }
        }
    }

    // Return valid connections to the pool first.
    for conn in healthy {
        pool.release(conn.into_live());
    }

    // Then close the expired ones; close errors are ignored here.
    for conn in expired {
        let _ = conn.close().await;
    }
}

/// RAII guard returned by `Pool::try_increment_size()` and others.
///
/// Will decrement the pool size if dropped, to avoid semantically "leaking" connections
Expand Down
74 changes: 68 additions & 6 deletions tests/any/pool.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use sqlx::any::{AnyConnectOptions, AnyPoolOptions};
use sqlx::Executor;
use std::sync::atomic::AtomicI32;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
atomic::{AtomicI32, AtomicUsize, Ordering},
Arc, Mutex,
};
use std::time::Duration;

Expand Down Expand Up @@ -116,7 +115,7 @@ async fn test_pool_callbacks() -> anyhow::Result<()> {
CREATE TEMPORARY TABLE conn_stats(
id int primary key,
before_acquire_calls int default 0,
after_release_calls int default 0
after_release_calls int default 0
);
INSERT INTO conn_stats(id) VALUES ({});
"#,
Expand All @@ -137,7 +136,7 @@ async fn test_pool_callbacks() -> anyhow::Result<()> {
// MySQL and MariaDB don't support UPDATE ... RETURNING
sqlx::query(
r#"
UPDATE conn_stats
UPDATE conn_stats
SET before_acquire_calls = before_acquire_calls + 1
"#,
)
Expand All @@ -161,7 +160,7 @@ async fn test_pool_callbacks() -> anyhow::Result<()> {
Box::pin(async move {
sqlx::query(
r#"
UPDATE conn_stats
UPDATE conn_stats
SET after_release_calls = after_release_calls + 1
"#,
)
Expand Down Expand Up @@ -216,3 +215,66 @@ async fn test_pool_callbacks() -> anyhow::Result<()> {

Ok(())
}

#[sqlx_macros::test]
async fn test_connection_maintenance() -> anyhow::Result<()> {
    sqlx::any::install_default_drivers();
    sqlx_test::setup_if_needed();
    let conn_options: AnyConnectOptions = std::env::var("DATABASE_URL")?.parse()?;

    // Record the metadata handed to `before_acquire` so the test can later
    // inspect the age of whatever connection the pool hands out.
    let last_meta = Arc::new(Mutex::new(None));
    let meta_sink = Arc::clone(&last_meta);
    let pool = AnyPoolOptions::new()
        .max_lifetime(Duration::from_millis(400))
        .min_connections(3)
        .before_acquire(move |_conn, _meta| {
            *meta_sink.lock().unwrap() = Some(_meta);
            Box::pin(async { Ok(true) })
        })
        .connect_lazy_with(conn_options);

    // Open 5 connections, then return them all to the pool.
    let mut held = Vec::with_capacity(5);
    for _ in 0..5 {
        held.push(pool.acquire().await?);
    }
    assert_eq!(pool.size(), 5);
    assert_eq!(pool.num_idle(), 0);
    for mut conn in held {
        conn.return_to_pool().await;
    }

    assert_eq!(pool.size(), 5);
    assert_eq!(pool.num_idle(), 5);

    // Wait for at least two iterations of the maintenance task.
    sqlx_core::rt::sleep(Duration::from_secs(1)).await;

    // Existing connections should have been closed due to max lifetime
    // and the pool should have reopened min_connections new ones.
    // One connection might be in the process of being replaced so we assert 2-3.
    let size = pool.size();
    assert!((2..=3).contains(&size), "pool.size() = {}", size);

    for _ in 0..2 {
        // Check that the connection was both acquired from the pool AND is new.
        let _ = pool.acquire().await.expect("failed to acquire connection");
        let meta = last_meta
            .lock()
            .unwrap()
            .take()
            .expect("expected a connection from the pool");
        assert!(
            meta.age < Duration::from_secs(1),
            "expected a fresh connection (age {:?})",
            meta.age
        );
    }

    Ok(())
}

1 comment on commit b4f6596

@maxcountryman
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There appears to be some kind of connection leak bug in 0.7.4 related to pools. I wonder if this change might be the culprit.

Please sign in to comment.