Skip to content

Commit

Permalink
add smoke test for Pool to both Postgres and MySQL tests
Browse files Browse the repository at this point in the history
add `Pool::is_closed()`
  • Loading branch information
abonander committed Jan 23, 2020
1 parent a7f0d7d commit 252da8c
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 0 deletions.
5 changes: 5 additions & 0 deletions sqlx-core/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ where
self.0.close().await;
}

/// Returns `true` if [.close()][Pool::close] has been called on the pool, `false` otherwise.
pub fn is_closed(&self) -> bool {
self.0.is_closed()
}

/// Returns the number of connections currently being managed by the pool.
pub fn size(&self) -> u32 {
self.0.size()
Expand Down
56 changes: 56 additions & 0 deletions tests/mysql.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use futures::TryStreamExt;
use sqlx::{Connection as _, Executor as _, MySqlConnection, MySqlPool, Row as _};
use std::time::Duration;

#[cfg_attr(feature = "runtime-async-std", async_std::test)]
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
Expand Down Expand Up @@ -121,6 +122,61 @@ async fn macro_select_from_cte_bind() -> anyhow::Result<()> {
Ok(())
}

// run with `cargo test --features mysql -- --ignored --nocapture pool_smoke_test`
#[ignore]
#[cfg_attr(feature = "runtime-async-std", async_std::test)]
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
async fn pool_smoke_test() -> anyhow::Result<()> {
use sqlx_core::runtime::{sleep, spawn, timeout};

eprintln!("starting pool");

let pool = MySqlPool::builder()
.connect_timeout(Duration::from_secs(5))
.min_size(5)
.max_size(10)
.build(&dotenv::var("DATABASE_URL")?)
.await?;

// spin up more tasks than connections available, and ensure we don't deadlock
for i in 0..20 {
let pool = pool.clone();
spawn(async move {
loop {
if let Err(e) = sqlx::query("select 1 + 1").fetch_one(&mut &pool).await {
eprintln!("pool task {} dying due to {}", i, e);
break;
}
}
});
}

for _ in 0 .. 5 {
let pool = pool.clone();
spawn(async move {
while !pool.is_closed() {
// drop acquire() futures in a hot loop
// https://github.com/launchbadge/sqlx/issues/83
drop(pool.acquire());
}
});
}

eprintln!("sleeping for 30 seconds");

sleep(Duration::from_secs(30)).await;

assert_eq!(pool.size(), 10);

eprintln!("closing pool");

timeout(Duration::from_secs(30), pool.close()).await?;

eprintln!("pool closed successfully");

Ok(())
}

fn url() -> anyhow::Result<String> {
Ok(dotenv::var("DATABASE_URL")?)
}
Expand Down
57 changes: 57 additions & 0 deletions tests/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use futures::TryStreamExt;
use sqlx::{postgres::PgConnection, Connection as _, Executor as _, Row as _};
use sqlx_core::postgres::PgPool;
use std::time::Duration;

#[cfg_attr(feature = "runtime-async-std", async_std::test)]
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
Expand Down Expand Up @@ -68,6 +70,61 @@ async fn it_remains_stable_issue_30() -> anyhow::Result<()> {
Ok(())
}

// run with `cargo test --features postgres -- --ignored --nocapture pool_smoke_test`
#[ignore]
#[cfg_attr(feature = "runtime-async-std", async_std::test)]
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
async fn pool_smoke_test() -> anyhow::Result<()> {
use sqlx_core::runtime::{sleep, spawn, timeout};

eprintln!("starting pool");

let pool = PgPool::builder()
.connect_timeout(Duration::from_secs(5))
.min_size(5)
.max_size(10)
.build(&dotenv::var("DATABASE_URL")?)
.await?;

// spin up more tasks than connections available, and ensure we don't deadlock
for i in 0..20 {
let pool = pool.clone();
spawn(async move {
loop {
if let Err(e) = sqlx::query("select 1 + 1").fetch_one(&mut &pool).await {
eprintln!("pool task {} dying due to {}", i, e);
break;
}
}
});
}

for _ in 0 .. 5 {
let pool = pool.clone();
spawn(async move {
while !pool.is_closed() {
// drop acquire() futures in a hot loop
// https://github.com/launchbadge/sqlx/issues/83
drop(pool.acquire());
}
});
}

eprintln!("sleeping for 30 seconds");

sleep(Duration::from_secs(30)).await;

assert_eq!(pool.size(), 10);

eprintln!("closing pool");

timeout(Duration::from_secs(30), pool.close()).await?;

eprintln!("pool closed successfully");

Ok(())
}

async fn connect() -> anyhow::Result<PgConnection> {
let _ = dotenv::dotenv();
let _ = env_logger::try_init();
Expand Down

0 comments on commit 252da8c

Please sign in to comment.