diff --git a/sqlx-core/src/pool/connection.rs b/sqlx-core/src/pool/connection.rs index e73f4f2735..c1e163c704 100644 --- a/sqlx-core/src/pool/connection.rs +++ b/sqlx-core/src/pool/connection.rs @@ -9,7 +9,7 @@ use crate::connection::Connection; use crate::database::Database; use crate::error::Error; -use super::inner::{DecrementSizeGuard, PoolInner}; +use super::inner::{is_beyond_max_lifetime, DecrementSizeGuard, PoolInner}; use crate::pool::options::PoolConnectionMetadata; use std::future::Future; @@ -239,6 +239,13 @@ impl Floating> { return false; } + // If the connection is beyond max lifetime, close the connection and + // immediately create a new connection + if is_beyond_max_lifetime(&self.inner, &self.guard.pool.options) { + self.close().await; + return false; + } + if let Some(test) = &self.guard.pool.options.after_release { let meta = self.metadata(); match (test)(&mut self.inner.raw, meta).await { diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index c14e6a434b..3c265476ff 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -197,7 +197,7 @@ impl PoolInner { } pub(super) fn release(&self, floating: Floating>) { - // `options.after_release` is invoked by `PoolConnection::release_to_pool()`. + // `options.after_release` and other checks are in `PoolConnection::return_to_pool()`. let Floating { inner: idle, guard } = floating.into_idle(); @@ -273,7 +273,7 @@ impl PoolInner { // `try_increment_size()`. tracing::debug!("woke but was unable to acquire idle connection or open new one; retrying"); // If so, we're likely in the current-thread runtime if it's Tokio - // and so we should yield to let any spawned release_to_pool() tasks + // and so we should yield to let any spawned return_to_pool() tasks // execute. crate::rt::yield_now().await; continue; @@ -417,7 +417,10 @@ impl Drop for PoolInner { } /// Returns `true` if the connection has exceeded `options.max_lifetime` if set, `false` otherwise. -fn is_beyond_max_lifetime(live: &Live, options: &PoolOptions) -> bool { +pub(super) fn is_beyond_max_lifetime( + live: &Live, + options: &PoolOptions, +) -> bool { options .max_lifetime .map_or(false, |max| live.created_at.elapsed() > max) @@ -434,12 +437,6 @@ async fn check_idle_conn( mut conn: Floating>, options: &PoolOptions, ) -> Result>, DecrementSizeGuard> { - // If the connection we pulled has expired, close the connection and - // immediately create a new connection - if is_beyond_max_lifetime(&conn, options) { - return Err(conn.close().await); - } - if options.test_before_acquire { // Check that the connection is still live if let Err(error) = conn.ping().await { @@ -503,22 +500,30 @@ fn spawn_maintenance_tasks(pool: &Arc>) { crate::rt::spawn(async move { let _ = close_event .do_until(async { - let mut slept = true; - // If the last handle to the pool was dropped while we were sleeping while let Some(pool) = pool_weak.upgrade() { if pool.is_closed() { return; } - // Don't run the reaper right away. - if slept && !pool.idle_conns.is_empty() { - do_reap(&pool).await; - } - let next_run = Instant::now() + period; - pool.min_connections_maintenance(Some(next_run)).await; + // Go over all idle connections, check for idleness and lifetime, + // and if we have fewer than min_connections after reaping a connection, + // open a new one immediately. Note that other connections may be popped from + // the queue in the meantime - that's fine, there is no harm in checking more + for _ in 0..pool.num_idle() { + if let Some(conn) = pool.try_acquire() { + if is_beyond_idle_timeout(&conn, &pool.options) + || is_beyond_max_lifetime(&conn, &pool.options) + { + let _ = conn.close().await; + pool.min_connections_maintenance(Some(next_run)).await; + } else { + pool.release(conn.into_live()); + } + } + } // Don't hold a reference to the pool while sleeping. drop(pool); @@ -530,37 +535,12 @@ fn spawn_maintenance_tasks(pool: &Arc>) { // `next_run` is in the past, just yield. crate::rt::yield_now().await; } - - slept = true; } }) .await; }); } -async fn do_reap(pool: &Arc>) { - // reap at most the current size minus the minimum idle - let max_reaped = pool.size().saturating_sub(pool.options.min_connections); - - // collect connections to reap - let (reap, keep) = (0..max_reaped) - // only connections waiting in the queue - .filter_map(|_| pool.try_acquire()) - .partition::, _>(|conn| { - is_beyond_idle_timeout(conn, &pool.options) - || is_beyond_max_lifetime(conn, &pool.options) - }); - - for conn in keep { - // return valid connections to the pool first - pool.release(conn.into_live()); - } - - for conn in reap { - let _ = conn.close().await; - } -} - /// RAII guard returned by `Pool::try_increment_size()` and others. /// /// Will decrement the pool size if dropped, to avoid semantically "leaking" connections diff --git a/tests/any/pool.rs b/tests/any/pool.rs index ed36dd74b6..901258a5c9 100644 --- a/tests/any/pool.rs +++ b/tests/any/pool.rs @@ -1,9 +1,8 @@ use sqlx::any::{AnyConnectOptions, AnyPoolOptions}; use sqlx::Executor; -use std::sync::atomic::AtomicI32; use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, + atomic::{AtomicI32, AtomicUsize, Ordering}, + Arc, Mutex, }; use std::time::Duration; @@ -116,7 +115,7 @@ async fn test_pool_callbacks() -> anyhow::Result<()> { CREATE TEMPORARY TABLE conn_stats( id int primary key, before_acquire_calls int default 0, - after_release_calls int default 0 + after_release_calls int default 0 ); INSERT INTO conn_stats(id) VALUES ({}); "#, @@ -137,7 +136,7 @@ async fn test_pool_callbacks() -> anyhow::Result<()> { // MySQL and MariaDB don't support UPDATE ... RETURNING sqlx::query( r#" - UPDATE conn_stats + UPDATE conn_stats SET before_acquire_calls = before_acquire_calls + 1 "#, ) @@ -161,7 +160,7 @@ async fn test_pool_callbacks() -> anyhow::Result<()> { Box::pin(async move { sqlx::query( r#" - UPDATE conn_stats + UPDATE conn_stats SET after_release_calls = after_release_calls + 1 "#, ) @@ -216,3 +215,66 @@ async fn test_pool_callbacks() -> anyhow::Result<()> { Ok(()) } + +#[sqlx_macros::test] +async fn test_connection_maintenance() -> anyhow::Result<()> { + sqlx::any::install_default_drivers(); + sqlx_test::setup_if_needed(); + let conn_options: AnyConnectOptions = std::env::var("DATABASE_URL")?.parse()?; + + let last_meta = Arc::new(Mutex::new(None)); + let last_meta_ = last_meta.clone(); + let pool = AnyPoolOptions::new() + .max_lifetime(Duration::from_millis(400)) + .min_connections(3) + .before_acquire(move |_conn, _meta| { + *last_meta_.lock().unwrap() = Some(_meta); + Box::pin(async { Ok(true) }) + }) + .connect_lazy_with(conn_options); + + // Open and release 5 connections + let conns = vec![ + pool.acquire().await?, + pool.acquire().await?, + pool.acquire().await?, + pool.acquire().await?, + pool.acquire().await?, + ]; + assert_eq!(pool.size(), 5); + assert_eq!(pool.num_idle(), 0); + for mut conn in conns { + conn.return_to_pool().await; + } + + assert_eq!(pool.size(), 5); + assert_eq!(pool.num_idle(), 5); + + // Wait for at least two iterations of maintenance task + sqlx_core::rt::sleep(Duration::from_secs(1)).await; + + // Existing connections should have been closed due to max lifetime + // and the pool should have reopened min_connections new ones. + // One connection might be in the process of being replaced so we assert 2-3. + assert!( + pool.size() >= 2 && pool.size() <= 3, + "pool.size() = {}", + pool.size() + ); + for _ in 0..2 { + // Check that the connections was both acquired from the pool AND it's new + let _ = pool.acquire().await.expect("failed to acquire connection"); + let meta = last_meta + .lock() + .unwrap() + .take() + .expect("expected a connection from the pool"); + assert!( + meta.age < Duration::from_secs(1), + "expected a fresh connection (age {:?})", + meta.age + ); + } + + Ok(()) +}