diff --git a/sqlx-core/src/pool/mod.rs b/sqlx-core/src/pool/mod.rs index f25234cc61..5ff333880d 100644 --- a/sqlx-core/src/pool/mod.rs +++ b/sqlx-core/src/pool/mod.rs @@ -79,6 +79,11 @@ where self.0.close().await; } + /// Returns `true` if [.close()][Pool::close] has been called on the pool, `false` otherwise. + pub fn is_closed(&self) -> bool { + self.0.is_closed() + } + /// Returns the number of connections currently being managed by the pool. pub fn size(&self) -> u32 { self.0.size() diff --git a/tests/mysql.rs b/tests/mysql.rs index d2b38af9a7..981ef75ddc 100644 --- a/tests/mysql.rs +++ b/tests/mysql.rs @@ -1,5 +1,6 @@ use futures::TryStreamExt; use sqlx::{Connection as _, Executor as _, MySqlConnection, MySqlPool, Row as _}; +use std::time::Duration; #[cfg_attr(feature = "runtime-async-std", async_std::test)] #[cfg_attr(feature = "runtime-tokio", tokio::test)] @@ -121,6 +122,61 @@ async fn macro_select_from_cte_bind() -> anyhow::Result<()> { Ok(()) } +// run with `cargo test --features mysql -- --ignored --nocapture pool_smoke_test` +#[ignore] +#[cfg_attr(feature = "runtime-async-std", async_std::test)] +#[cfg_attr(feature = "runtime-tokio", tokio::test)] +async fn pool_smoke_test() -> anyhow::Result<()> { + use sqlx_core::runtime::{sleep, spawn, timeout}; + + eprintln!("starting pool"); + + let pool = MySqlPool::builder() + .connect_timeout(Duration::from_secs(5)) + .min_size(5) + .max_size(10) + .build(&dotenv::var("DATABASE_URL")?) + .await?; + + // spin up more tasks than connections available, and ensure we don't deadlock + for i in 0..20 { + let pool = pool.clone(); + spawn(async move { + loop { + if let Err(e) = sqlx::query("select 1 + 1").fetch_one(&mut &pool).await { + eprintln!("pool task {} dying due to {}", i, e); + break; + } + } + }); + } + + for _ in 0 .. 5 { + let pool = pool.clone(); + spawn(async move { + while !pool.is_closed() { + // drop acquire() futures in a hot loop + // https://github.com/launchbadge/sqlx/issues/83 + drop(pool.acquire()); + } + }); + } + + eprintln!("sleeping for 30 seconds"); + + sleep(Duration::from_secs(30)).await; + + assert_eq!(pool.size(), 10); + + eprintln!("closing pool"); + + timeout(Duration::from_secs(30), pool.close()).await?; + + eprintln!("pool closed successfully"); + + Ok(()) +} + fn url() -> anyhow::Result { Ok(dotenv::var("DATABASE_URL")?) } diff --git a/tests/postgres.rs b/tests/postgres.rs index ce105e44cc..61dc93f055 100644 --- a/tests/postgres.rs +++ b/tests/postgres.rs @@ -1,5 +1,7 @@ use futures::TryStreamExt; use sqlx::{postgres::PgConnection, Connection as _, Executor as _, Row as _}; +use sqlx_core::postgres::PgPool; +use std::time::Duration; #[cfg_attr(feature = "runtime-async-std", async_std::test)] #[cfg_attr(feature = "runtime-tokio", tokio::test)] @@ -68,6 +70,61 @@ async fn it_remains_stable_issue_30() -> anyhow::Result<()> { Ok(()) } +// run with `cargo test --features postgres -- --ignored --nocapture pool_smoke_test` +#[ignore] +#[cfg_attr(feature = "runtime-async-std", async_std::test)] +#[cfg_attr(feature = "runtime-tokio", tokio::test)] +async fn pool_smoke_test() -> anyhow::Result<()> { + use sqlx_core::runtime::{sleep, spawn, timeout}; + + eprintln!("starting pool"); + + let pool = PgPool::builder() + .connect_timeout(Duration::from_secs(5)) + .min_size(5) + .max_size(10) + .build(&dotenv::var("DATABASE_URL")?) + .await?; + + // spin up more tasks than connections available, and ensure we don't deadlock + for i in 0..20 { + let pool = pool.clone(); + spawn(async move { + loop { + if let Err(e) = sqlx::query("select 1 + 1").fetch_one(&mut &pool).await { + eprintln!("pool task {} dying due to {}", i, e); + break; + } + } + }); + } + + for _ in 0 .. 5 { + let pool = pool.clone(); + spawn(async move { + while !pool.is_closed() { + // drop acquire() futures in a hot loop + // https://github.com/launchbadge/sqlx/issues/83 + drop(pool.acquire()); + } + }); + } + + eprintln!("sleeping for 30 seconds"); + + sleep(Duration::from_secs(30)).await; + + assert_eq!(pool.size(), 10); + + eprintln!("closing pool"); + + timeout(Duration::from_secs(30), pool.close()).await?; + + eprintln!("pool closed successfully"); + + Ok(()) +} + async fn connect() -> anyhow::Result { let _ = dotenv::dotenv(); let _ = env_logger::try_init();