From 87fe29c4abfcb28dd4cd989fb21835c956164fb3 Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Wed, 1 Mar 2023 17:49:18 -0800 Subject: [PATCH] fix(pool): close when last handle is dropped, extra check in `try_acquire` closes #1928 closes #2375 --- sqlx-core/src/connection.rs | 17 +++++++++-- sqlx-core/src/pool/inner.rs | 58 ++++++++++++++++++++++++++++--------- sqlx-core/src/pool/mod.rs | 21 ++++++++++++++ 3 files changed, 80 insertions(+), 16 deletions(-) diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index c73b23f47e..d437269157 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -17,9 +17,20 @@ pub trait Connection: Send { /// Explicitly close this database connection. /// - /// This method is **not required** for safe and consistent operation. However, it is - /// recommended to call it instead of letting a connection `drop` as the database backend - /// will be faster at cleaning up resources. + /// This notifies the database server that the connection is closing so that it can + /// free up any server-side resources in use. + /// + /// While connections can simply be dropped to clean up local resources, + /// the `Drop` handler itself cannot notify the server that the connection is being closed + /// because that may require I/O to send a termination message. That can result in a delay + /// before the server learns that the connection is gone, usually from a TCP keepalive timeout. + /// + /// Creating and dropping many connections in short order without calling `.close()` may + /// lead to errors from the database server because those senescent connections will still + /// count against any connection limit or quota that is configured. + /// + /// Therefore it is recommended to call `.close()` on a connection when you are done using it + /// and to `.await` the result to ensure the termination message is sent. 
fn close(self) -> BoxFuture<'static, Result<(), Error>>; /// Immediately close the connection without sending a graceful shutdown. diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index ceb4977998..2f92a0f623 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -81,9 +81,13 @@ impl PoolInner { self.is_closed.load(Ordering::Acquire) } - pub(super) fn close<'a>(self: &'a Arc) -> impl Future + 'a { + fn mark_closed(&self) { self.is_closed.store(true, Ordering::Release); self.on_closed.notify(usize::MAX); + } + + pub(super) fn close<'a>(self: &'a Arc) -> impl Future + 'a { + self.mark_closed(); async move { for permits in 1..=self.options.max_connections { @@ -209,6 +213,8 @@ impl PoolInner { } /// Try to atomically increment the pool size for a new connection. + /// + /// Returns `Err` if the pool is at max capacity already or is closed. pub(super) fn try_increment_size<'a>( self: &'a Arc, permit: AsyncSemaphoreReleaser<'a>, @@ -216,12 +222,16 @@ impl PoolInner { match self .size .fetch_update(Ordering::AcqRel, Ordering::Acquire, |size| { + if self.is_closed() { + return None; + } + size.checked_add(1) .filter(|size| size <= &self.options.max_connections) }) { // we successfully incremented the size Ok(_) => Ok(DecrementSizeGuard::from_permit((*self).clone(), permit)), - // the pool is at max capacity + // the pool is at max capacity or is closed Err(_) => Err(permit), } } @@ -258,7 +268,9 @@ impl PoolInner { // we can open a new connection guard } else { - // This can happen for a child pool that's at its connection limit. + // This can happen for a child pool that's at its connection limit, + // or if the pool was closed between `acquire_permit()` and + // `try_increment_size()`. 
tracing::debug!("woke but was unable to acquire idle connection or open new one; retrying"); // If so, we're likely in the current-thread runtime if it's Tokio // and so we should yield to let any spawned release_to_pool() tasks @@ -395,6 +407,8 @@ impl PoolInner { impl Drop for PoolInner { fn drop(&mut self) { + self.mark_closed(); + if let Some(parent) = &self.options.parent_pool { // Release the stolen permits. parent.0.semaphore.release(self.semaphore.permits()); @@ -461,7 +475,9 @@ async fn check_idle_conn( } fn spawn_maintenance_tasks(pool: &Arc>) { - let pool = Arc::clone(&pool); + // NOTE: use `pool_weak` for the maintenance tasks so + // they don't keep `PoolInner` from being dropped. + let pool_weak = Arc::downgrade(&pool); let period = match (pool.options.max_lifetime, pool.options.idle_timeout) { (Some(it), None) | (None, Some(it)) => it, @@ -471,7 +487,9 @@ fn spawn_maintenance_tasks(pool: &Arc>) { (None, None) => { if pool.options.min_connections > 0 { crate::rt::spawn(async move { - pool.min_connections_maintenance(None).await; + if let Some(pool) = pool_weak.upgrade() { + pool.min_connections_maintenance(None).await; + } }); } @@ -479,27 +497,41 @@ fn spawn_maintenance_tasks(pool: &Arc>) { } }; + // Immediately cancel this task if the pool is closed. + let mut close_event = pool.close_event(); + crate::rt::spawn(async move { - // Immediately cancel this task if the pool is closed. - let _ = pool - .close_event() + let _ = close_event .do_until(async { - while !pool.is_closed() { + let mut slept = false; + + // If the last handle to the pool was dropped while we were sleeping + while let Some(pool) = pool_weak.upgrade() { + if pool.is_closed() { + return; + } + + // Don't run the reaper right away: `slept` is false on the first pass, so reaping starts after the first sleep. + if slept && !pool.idle_conns.is_empty() { + do_reap(&pool).await; + } + let next_run = Instant::now() + period; pool.min_connections_maintenance(Some(next_run)).await; + // Don't hold a reference to the pool while sleeping. 
+ drop(pool); + if let Some(duration) = next_run.checked_duration_since(Instant::now()) { // `async-std` doesn't have a `sleep_until()` crate::rt::sleep(duration).await; } else { + // `next_run` is in the past, just yield. crate::rt::yield_now().await; } - // Don't run the reaper right away. - if !pool.idle_conns.is_empty() { - do_reap(&pool).await; - } + slept = true; } }) .await; diff --git a/sqlx-core/src/pool/mod.rs b/sqlx-core/src/pool/mod.rs index c5edb49158..bb89fc4ebf 100644 --- a/sqlx-core/src/pool/mod.rs +++ b/sqlx-core/src/pool/mod.rs @@ -142,6 +142,27 @@ pub use self::maybe::MaybePoolConnection; /// /// [web::Data]: https://docs.rs/actix-web/3/actix_web/web/struct.Data.html /// +/// ### Note: Drop Behavior +/// Due to a lack of async `Drop`, dropping the last `Pool` handle may not immediately clean +/// up connections by itself. The connections will be dropped locally, which is sufficient for +/// SQLite, but for client/server databases like MySQL and Postgres, that only closes the +/// client side of the connection. The server will not know the connection is closed until +/// potentially much later: this is usually dictated by the TCP keepalive timeout in the server +/// settings. +/// +/// Because the connection may not be cleaned up immediately on the server side, you may run +/// into errors regarding connection limits if you are creating and dropping many pools in short +/// order. +/// +/// We recommend calling [`.close().await`] to gracefully close the pool and its connections +/// when you are done using it. This will also wake any tasks that are waiting on an `.acquire()` +/// call, so for long-lived applications it's a good idea to call `.close()` during shutdown. +/// +/// If you're writing tests, consider using `#[sqlx::test]` which handles the lifetime of +/// the pool for you. +/// +/// [`.close().await`]: Pool::close +/// /// ### Why Use a Pool? 
/// /// A single database connection (in general) cannot be used by multiple threads simultaneously