Skip to content

Commit

Permalink
refactor: pool fixes and breaking changes
Browse files Browse the repository at this point in the history
* Fixed leak of `Arc<SharedPool>` in `DecrementSizeGuard::cancel()`
* Renamed `PoolOptions::connect_timeout` to `acquire_timeout` for clarity.
* Fixed `/* SQLx ping */` showing up in Postgres query logs
* Made `.close()` a regular function that returns a `Future`
* Deleted deprecated method `PoolConnection::release()`
* Document why connection might be dropped if `Pool::acquire()` is cancelled
* Added connection metadata to pool lifecycle callbacks
* Improved guarantees for `min_connections`
* Fixed `num_idle()` to not spin forever at high load
* Improved documentation across the `pool` module
  • Loading branch information
abonander committed Jun 16, 2022
1 parent 664d576 commit a2ec7e1
Show file tree
Hide file tree
Showing 15 changed files with 835 additions and 291 deletions.
2 changes: 1 addition & 1 deletion sqlx-bench/benches/pg_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn do_bench_acquire(b: &mut Bencher, concurrent: u32, fair: bool) {
let pool = sqlx_rt::block_on(
PgPoolOptions::new()
// we don't want timeouts because we want to see how the pool degrades
.connect_timeout(Duration::from_secs(3600))
.acquire_timeout(Duration::from_secs(3600))
// force the pool to start full
.min_connections(50)
.max_connections(50)
Expand Down
16 changes: 16 additions & 0 deletions sqlx-core/src/any/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,22 @@ impl Connection for AnyConnection {
}
}

fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> {
match self.0 {
#[cfg(feature = "postgres")]
AnyConnectionKind::Postgres(conn) => conn.close_hard(),

#[cfg(feature = "mysql")]
AnyConnectionKind::MySql(conn) => conn.close_hard(),

#[cfg(feature = "sqlite")]
AnyConnectionKind::Sqlite(conn) => conn.close_hard(),

#[cfg(feature = "mssql")]
AnyConnectionKind::Mssql(conn) => conn.close_hard(),
}
}

fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
delegate_to_mut!(self.ping())
}
Expand Down
6 changes: 6 additions & 0 deletions sqlx-core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ pub trait Connection: Send {
/// will be faster at cleaning up resources.
fn close(self) -> BoxFuture<'static, Result<(), Error>>;

/// Immediately close the connection without sending a graceful shutdown.
///
/// This should still at least send a TCP `FIN` frame to let the server know we're dying.
#[doc(hidden)]
fn close_hard(self) -> BoxFuture<'static, Result<(), Error>>;

/// Checks if a connection to the database is still valid.
fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>>;

Expand Down
5 changes: 5 additions & 0 deletions sqlx-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ pub trait DatabaseError: 'static + Send + Sync + StdError {
#[doc(hidden)]
fn into_error(self: Box<Self>) -> Box<dyn StdError + Send + Sync + 'static>;

#[doc(hidden)]
fn is_transient_in_connect_phase(&self) -> bool {
false
}

/// Returns the name of the constraint that triggered the error, if applicable.
/// If the error was caused by a conflict of a unique index, this will be the index name.
///
Expand Down
4 changes: 4 additions & 0 deletions sqlx-core/src/mssql/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ impl Connection for MssqlConnection {
}
}

fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> {
self.close()
}

fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
// NOTE: we do not use `SELECT 1` as that *could* interact with any ongoing transactions
self.execute("/* SQLx ping */").map_ok(|_| ()).boxed()
Expand Down
7 changes: 7 additions & 0 deletions sqlx-core/src/mysql/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ impl Connection for MySqlConnection {
})
}

fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
Box::pin(async move {
self.stream.shutdown().await?;
Ok(())
})
}

fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
self.stream.wait_until_ready().await?;
Expand Down
Loading

0 comments on commit a2ec7e1

Please sign in to comment.