Skip to content

Commit

Permalink
fix(pool): close when last handle is dropped, extra check in `try_acq…
Browse files Browse the repository at this point in the history
…uire`

closes #1928
closes #2375
  • Loading branch information
abonander committed Mar 2, 2023
1 parent c17c59f commit 1fb1945
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 16 deletions.
17 changes: 14 additions & 3 deletions sqlx-core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,20 @@ pub trait Connection: Send {

/// Explicitly close this database connection.
///
/// This method is **not required** for safe and consistent operation. However, it is
/// recommended to call it instead of letting a connection `drop` as the database backend
/// will be faster at cleaning up resources.
/// This notifies the database server that the connection is closing so that it can
/// free up any server-side resources in use.
///
/// While connections can simply be dropped to clean up local resources,
/// the `Drop` handler itself cannot notify the server that the connection is being closed
/// because that may require I/O to send a termination message. That can result in a delay
/// before the server learns that the connection is gone, usually from a TCP keepalive timeout.
///
/// Creating and dropping many connections in short order without calling `.close()` may
/// lead to errors from the database server because those senescent connections will still
/// count against any connection limit or quota that is configured.
///
/// Therefore it is recommended to call `.close()` on a connection when you are done using it
/// and to `.await` the result to ensure the termination message is sent.
fn close(self) -> BoxFuture<'static, Result<(), Error>>;

/// Immediately close the connection without sending a graceful shutdown.
Expand Down
58 changes: 45 additions & 13 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,13 @@ impl<DB: Database> PoolInner<DB> {
self.is_closed.load(Ordering::Acquire)
}

pub(super) fn close<'a>(self: &'a Arc<Self>) -> impl Future<Output = ()> + 'a {
fn mark_closed(&self) {
self.is_closed.store(true, Ordering::Release);
self.on_closed.notify(usize::MAX);
}

pub(super) fn close<'a>(self: &'a Arc<Self>) -> impl Future<Output = ()> + 'a {
self.mark_closed();

async move {
for permits in 1..=self.options.max_connections {
Expand Down Expand Up @@ -209,19 +213,25 @@ impl<DB: Database> PoolInner<DB> {
}

/// Try to atomically increment the pool size for a new connection.
///
/// Returns `Err` if the pool is at max capacity already or is closed.
pub(super) fn try_increment_size<'a>(
self: &'a Arc<Self>,
permit: AsyncSemaphoreReleaser<'a>,
) -> Result<DecrementSizeGuard<DB>, AsyncSemaphoreReleaser<'a>> {
match self
.size
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |size| {
if self.is_closed() {
return None;
}

size.checked_add(1)
.filter(|size| size <= &self.options.max_connections)
}) {
// we successfully incremented the size
Ok(_) => Ok(DecrementSizeGuard::from_permit((*self).clone(), permit)),
// the pool is at max capacity
// the pool is at max capacity or is closed
Err(_) => Err(permit),
}
}
Expand Down Expand Up @@ -258,7 +268,9 @@ impl<DB: Database> PoolInner<DB> {
// we can open a new connection
guard
} else {
// This can happen for a child pool that's at its connection limit.
// This can happen for a child pool that's at its connection limit,
// or if the pool was closed between `acquire_permit()` and
// `try_increment_size()`.
tracing::debug!("woke but was unable to acquire idle connection or open new one; retrying");
// If so, we're likely in the current-thread runtime if it's Tokio
// and so we should yield to let any spawned release_to_pool() tasks
Expand Down Expand Up @@ -395,6 +407,8 @@ impl<DB: Database> PoolInner<DB> {

impl<DB: Database> Drop for PoolInner<DB> {
fn drop(&mut self) {
self.mark_closed();

if let Some(parent) = &self.options.parent_pool {
// Release the stolen permits.
parent.0.semaphore.release(self.semaphore.permits());
Expand Down Expand Up @@ -461,7 +475,9 @@ async fn check_idle_conn<DB: Database>(
}

fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
let pool = Arc::clone(&pool);
// NOTE: use `pool_weak` for the maintenance tasks so
// they don't keep `PoolInner` from being dropped.
let pool_weak = Arc::downgrade(&pool);

let period = match (pool.options.max_lifetime, pool.options.idle_timeout) {
(Some(it), None) | (None, Some(it)) => it,
Expand All @@ -471,35 +487,51 @@ fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
(None, None) => {
if pool.options.min_connections > 0 {
crate::rt::spawn(async move {
pool.min_connections_maintenance(None).await;
if let Some(pool) = pool_weak.upgrade() {
pool.min_connections_maintenance(None).await;
}
});
}

return;
}
};

// Immediately cancel this task if the pool is closed.
let mut close_event = pool.close_event();

crate::rt::spawn(async move {
// Immediately cancel this task if the pool is closed.
let _ = pool
.close_event()
let _ = close_event
.do_until(async {
while !pool.is_closed() {
let mut slept = true;

// If the last handle to the pool was dropped while we were sleeping
while let Some(pool) = pool_weak.upgrade() {
if pool.is_closed() {
return;
}

// Don't run the reaper right away.
if slept && !pool.idle_conns.is_empty() {
do_reap(&pool).await;
}

let next_run = Instant::now() + period;

pool.min_connections_maintenance(Some(next_run)).await;

// Don't hold a reference to the pool while sleeping.
drop(pool);

if let Some(duration) = next_run.checked_duration_since(Instant::now()) {
// `async-std` doesn't have a `sleep_until()`
crate::rt::sleep(duration).await;
} else {
// `next_run` is in the past, just yield.
crate::rt::yield_now().await;
}

// Don't run the reaper right away.
if !pool.idle_conns.is_empty() {
do_reap(&pool).await;
}
slept = true;
}
})
.await;
Expand Down
21 changes: 21 additions & 0 deletions sqlx-core/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,27 @@ pub use self::maybe::MaybePoolConnection;
///
/// [web::Data]: https://docs.rs/actix-web/3/actix_web/web/struct.Data.html
///
/// ### Note: Drop Behavior
/// Due to a lack of async `Drop`, dropping the last `Pool` handle may not immediately clean
/// up connections by itself. The connections will be dropped locally, which is sufficient for
/// SQLite, but for client/server databases like MySQL and Postgres, that only closes the
/// client side of the connection. The server will not know the connection is closed until
/// potentially much later: this is usually dictated by the TCP keepalive timeout in the server
/// settings.
///
/// Because the connection may not be cleaned up immediately on the server side, you may run
/// into errors regarding connection limits if you are creating and dropping many pools in short
/// order.
///
/// We recommend calling [`.close().await`] to gracefully close the pool and its connections
/// when you are done using it. This will also wake any tasks that are waiting on an `.acquire()`
/// call, so for long-lived applications it's a good idea to call `.close()` during shutdown.
///
/// If you're writing tests, consider using `#[sqlx::test]` which handles the lifetime of
/// the pool for you.
///
/// [`.close().await`]: Pool::close
///
/// ### Why Use a Pool?
///
/// A single database connection (in general) cannot be used by multiple threads simultaneously
Expand Down

0 comments on commit 1fb1945

Please sign in to comment.