fix(pool): close when last handle is dropped, extra check in try_acquire #2376

Merged 1 commit on Mar 2, 2023
17 changes: 14 additions & 3 deletions sqlx-core/src/connection.rs
@@ -17,9 +17,20 @@ pub trait Connection: Send {

/// Explicitly close this database connection.
///
/// This method is **not required** for safe and consistent operation. However, it is
/// recommended to call it instead of letting a connection `drop` as the database backend
/// will be faster at cleaning up resources.
/// This notifies the database server that the connection is closing so that it can
/// free up any server-side resources in use.
///
/// While connections can simply be dropped to clean up local resources,
/// the `Drop` handler itself cannot notify the server that the connection is being closed
/// because that may require I/O to send a termination message. That can result in a delay
/// before the server learns that the connection is gone, usually from a TCP keepalive timeout.
///
/// Creating and dropping many connections in short order without calling `.close()` may
/// lead to errors from the database server because those senescent connections will still
/// count against any connection limit or quota that is configured.
///
/// Therefore it is recommended to call `.close()` on a connection when you are done using it
/// and to `.await` the result to ensure the termination message is sent.
fn close(self) -> BoxFuture<'static, Result<(), Error>>;

/// Immediately close the connection without sending a graceful shutdown.
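
To make the new guidance concrete, here is a minimal usage sketch. It assumes the Postgres driver and the Tokio runtime, and the connection URL is a placeholder; it is not part of this diff.

use sqlx::{Connection, PgConnection};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // Open a single connection (placeholder URL).
    let mut conn = PgConnection::connect("postgres://localhost/mydb").await?;

    // ... run queries with `&mut conn` ...

    // Explicitly close and await the result so the server receives the
    // termination message, instead of relying on `Drop` alone.
    conn.close().await?;

    Ok(())
}
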
58 changes: 45 additions & 13 deletions sqlx-core/src/pool/inner.rs
@@ -81,9 +81,13 @@ impl<DB: Database> PoolInner<DB> {
self.is_closed.load(Ordering::Acquire)
}

pub(super) fn close<'a>(self: &'a Arc<Self>) -> impl Future<Output = ()> + 'a {
fn mark_closed(&self) {
self.is_closed.store(true, Ordering::Release);
self.on_closed.notify(usize::MAX);
}

pub(super) fn close<'a>(self: &'a Arc<Self>) -> impl Future<Output = ()> + 'a {
self.mark_closed();

async move {
for permits in 1..=self.options.max_connections {
@@ -209,19 +213,25 @@ impl<DB: Database> PoolInner<DB> {
}

/// Try to atomically increment the pool size for a new connection.
///
/// Returns `Err` if the pool is at max capacity already or is closed.
pub(super) fn try_increment_size<'a>(
self: &'a Arc<Self>,
permit: AsyncSemaphoreReleaser<'a>,
) -> Result<DecrementSizeGuard<DB>, AsyncSemaphoreReleaser<'a>> {
match self
.size
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |size| {
if self.is_closed() {
return None;
}

size.checked_add(1)
.filter(|size| size <= &self.options.max_connections)
}) {
// we successfully incremented the size
Ok(_) => Ok(DecrementSizeGuard::from_permit((*self).clone(), permit)),
// the pool is at max capacity
// the pool is at max capacity or is closed
Err(_) => Err(permit),
}
}
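
The `fetch_update` above retries its closure until the compare-and-swap succeeds, so the capacity check and the new is-closed check are atomic with the increment; returning `None` aborts the update and the caller gets its semaphore permit back. A standalone sketch of that pattern with simplified types (an illustration, not the pool's actual code):

use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};

struct SizeCounter {
    size: AtomicU32,
    max: u32,
    closed: AtomicBool,
}

impl SizeCounter {
    /// Atomically increment `size`; returns the previous value on success,
    /// or the current value as `Err` if the counter is full or closed.
    fn try_increment(&self) -> Result<u32, u32> {
        self.size
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |size| {
                if self.closed.load(Ordering::Acquire) {
                    return None; // never grow once closed
                }
                // `None` if incrementing would exceed `max`.
                size.checked_add(1).filter(|new| *new <= self.max)
            })
    }
}
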
@@ -258,7 +268,9 @@ impl<DB: Database> PoolInner<DB> {
// we can open a new connection
guard
} else {
// This can happen for a child pool that's at its connection limit.
// This can happen for a child pool that's at its connection limit,
// or if the pool was closed between `acquire_permit()` and
// `try_increment_size()`.
tracing::debug!("woke but was unable to acquire idle connection or open new one; retrying");
// If so, we're likely in the current-thread runtime if it's Tokio
// and so we should yield to let any spawned release_to_pool() tasks
@@ -395,6 +407,8 @@ impl<DB: Database> PoolInner<DB> {

impl<DB: Database> Drop for PoolInner<DB> {
fn drop(&mut self) {
self.mark_closed();

if let Some(parent) = &self.options.parent_pool {
// Release the stolen permits.
parent.0.semaphore.release(self.semaphore.permits());
@@ -461,7 +475,9 @@ async fn check_idle_conn<DB: Database>(
}

fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
let pool = Arc::clone(&pool);
// NOTE: use `pool_weak` for the maintenance tasks so
// they don't keep `PoolInner` from being dropped.
let pool_weak = Arc::downgrade(&pool);

let period = match (pool.options.max_lifetime, pool.options.idle_timeout) {
(Some(it), None) | (None, Some(it)) => it,
@@ -471,35 +487,51 @@ fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
(None, None) => {
if pool.options.min_connections > 0 {
crate::rt::spawn(async move {
pool.min_connections_maintenance(None).await;
if let Some(pool) = pool_weak.upgrade() {
pool.min_connections_maintenance(None).await;
}
});
}

return;
}
};

// Immediately cancel this task if the pool is closed.
let mut close_event = pool.close_event();

crate::rt::spawn(async move {
// Immediately cancel this task if the pool is closed.
let _ = pool
.close_event()
let _ = close_event
.do_until(async {
while !pool.is_closed() {
let mut slept = true;

// If the last handle to the pool was dropped while we were sleeping
while let Some(pool) = pool_weak.upgrade() {
if pool.is_closed() {
return;
}

// Don't run the reaper right away.
if slept && !pool.idle_conns.is_empty() {
do_reap(&pool).await;
}

let next_run = Instant::now() + period;

pool.min_connections_maintenance(Some(next_run)).await;

// Don't hold a reference to the pool while sleeping.
drop(pool);

if let Some(duration) = next_run.checked_duration_since(Instant::now()) {
// `async-std` doesn't have a `sleep_until()`
crate::rt::sleep(duration).await;
} else {
// `next_run` is in the past, just yield.
crate::rt::yield_now().await;
}

// Don't run the reaper right away.
if !pool.idle_conns.is_empty() {
do_reap(&pool).await;
}
slept = true;
}
})
.await;
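
The maintenance-task change is an instance of a general pattern: a background task holds only a `Weak` reference to the shared state, upgrades it once per iteration, and exits when every strong handle is gone. A self-contained sketch of that pattern under Tokio (simplified state, not the sqlx code itself):

use std::sync::{Arc, Weak};
use std::time::Duration;

struct Shared {
    name: &'static str,
}

fn spawn_maintenance(shared: &Arc<Shared>) {
    // Hold only a `Weak` so this task cannot keep `Shared` alive by itself.
    let weak: Weak<Shared> = Arc::downgrade(shared);

    tokio::spawn(async move {
        loop {
            // Upgrade for the duration of one pass only.
            let Some(shared) = weak.upgrade() else {
                // The last strong handle was dropped; stop the task.
                break;
            };
            println!("maintaining {}", shared.name);

            // Don't hold a strong reference while sleeping.
            drop(shared);
            tokio::time::sleep(Duration::from_secs(30)).await;
        }
    });
}

Dropping the strong reference before sleeping is what lets `Drop for PoolInner` run even while a sleep is still in flight, mirroring the "Don't hold a reference to the pool while sleeping" comment in the diff.
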
21 changes: 21 additions & 0 deletions sqlx-core/src/pool/mod.rs
@@ -142,6 +142,27 @@ pub use self::maybe::MaybePoolConnection.
///
/// [web::Data]: https://docs.rs/actix-web/3/actix_web/web/struct.Data.html
///
/// ### Note: Drop Behavior
/// Due to a lack of async `Drop`, dropping the last `Pool` handle may not immediately clean
/// up connections by itself. The connections will be dropped locally, which is sufficient for
/// SQLite, but for client/server databases like MySQL and Postgres, that only closes the
/// client side of the connection. The server will not know the connection is closed until
/// potentially much later: this is usually dictated by the TCP keepalive timeout in the server
/// settings.
///
/// Because the connection may not be cleaned up immediately on the server side, you may run
/// into errors regarding connection limits if you are creating and dropping many pools in short
/// order.
///
/// We recommend calling [`.close().await`] to gracefully close the pool and its connections
/// when you are done using it. This will also wake any tasks that are waiting on an `.acquire()`
/// call, so for long-lived applications it's a good idea to call `.close()` during shutdown.
///
/// If you're writing tests, consider using `#[sqlx::test]` which handles the lifetime of
/// the pool for you.
///
/// [`.close().await`]: Pool::close
///
/// ### Why Use a Pool?
///
/// A single database connection (in general) cannot be used by multiple threads simultaneously
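
As a concrete companion to the shutdown guidance in the new "Drop Behavior" note, a minimal sketch assuming the Postgres driver with the Tokio runtime and a placeholder URL:

use sqlx::postgres::PgPoolOptions;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect("postgres://localhost/mydb")
        .await?;

    // ... hand out clones of `pool` while the application runs ...

    // Gracefully close on shutdown: the connections send their termination
    // messages and any tasks waiting in `acquire()` are woken.
    pool.close().await;

    Ok(())
}
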