Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: nextest cleanup race condition #3334

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ mac_address = { workspace = true, optional = true }
uuid = { workspace = true, optional = true }

async-io = { version = "1.9.0", optional = true }
base64 = { version = "0.22.0", default-features = false, features = ["std"] }
bytes = "1.1.0"
chrono = { version = "0.4.34", default-features = false, features = ["clock"], optional = true }
crc = { version = "3", optional = true }
Expand Down
15 changes: 14 additions & 1 deletion sqlx-core/src/testing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::time::Duration;

use futures_core::future::BoxFuture;

use base64::{engine::general_purpose::URL_SAFE, Engine as _};
pub use fixtures::FixtureSnapshot;
use sha2::{Digest, Sha512};

use crate::connection::{ConnectOptions, Connection};
use crate::database::Database;
Expand Down Expand Up @@ -41,6 +43,17 @@ pub trait TestSupport: Database {
/// This snapshot can then be used to generate test fixtures.
fn snapshot(conn: &mut Self::Connection)
-> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>>;

/// Generate a unique database name for the given test path.
fn db_name(args: &TestArgs) -> String {
let mut hasher = Sha512::new();
hasher.update(args.test_path.as_bytes());
let hash = hasher.finalize();
let hash = URL_SAFE.encode(&hash[..39]);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's important to use - to _ here, since - is not a valid character for SQL identifiers.

let db_name = format!("_sqlx_test_{}", hash).replace("-", "_");

I've had errors related to this

let db_name = format!("_sqlx_test_{}", hash);
debug_assert!(db_name.len() == 63);
db_name
}
}

pub struct TestFixture {
Expand Down Expand Up @@ -217,7 +230,7 @@ where
let res = test_fn(test_context.pool_opts, test_context.connect_opts).await;

if res.is_success() {
if let Err(e) = DB::cleanup_test(&test_context.db_name).await {
if let Err(e) = DB::cleanup_test(&DB::db_name(&args)).await {
eprintln!(
"failed to delete database {:?}: {}",
test_context.db_name, e
Expand Down
183 changes: 70 additions & 113 deletions sqlx-mysql/src/testing/mod.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,25 @@
use std::fmt::Write;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, SystemTime};
use std::time::Duration;

use futures_core::future::BoxFuture;

use once_cell::sync::OnceCell;

use crate::connection::Connection;
use sqlx_core::connection::Connection;
use sqlx_core::query_builder::QueryBuilder;
use sqlx_core::query_scalar::query_scalar;
use std::fmt::Write;

use crate::error::Error;
use crate::executor::Executor;
use crate::pool::{Pool, PoolOptions};
use crate::query::query;
use crate::query_builder::QueryBuilder;
use crate::query_scalar::query_scalar;
use crate::{MySql, MySqlConnectOptions, MySqlConnection};

pub(crate) use sqlx_core::testing::*;

// Using a blocking `OnceCell` here because the critical sections are short.
static MASTER_POOL: OnceCell<Pool<MySql>> = OnceCell::new();
// Automatically delete any databases created before the start of the test binary.
static DO_CLEANUP: AtomicBool = AtomicBool::new(true);

impl TestSupport for MySql {
fn test_context(args: &TestArgs) -> BoxFuture<'_, Result<TestContext<Self>, Error>> {
Expand All @@ -38,17 +34,7 @@ impl TestSupport for MySql {
.acquire()
.await?;

let db_id = db_id(db_name);

conn.execute(&format!("drop database if exists {db_name};")[..])
.await?;

query("delete from _sqlx_test_databases where db_id = ?")
.bind(db_id)
.execute(&mut *conn)
.await?;

Ok(())
do_cleanup(&mut conn, db_name).await
})
}

Expand All @@ -58,13 +44,55 @@ impl TestSupport for MySql {

let mut conn = MySqlConnection::connect(&url).await?;

let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
let delete_db_names: Vec<String> =
query_scalar("select db_name from _sqlx_test_databases")
.fetch_all(&mut conn)
.await?;

if delete_db_names.is_empty() {
return Ok(None);
}

let mut deleted_db_names = Vec::with_capacity(delete_db_names.len());

let mut command = String::new();

for db_name in &delete_db_names {
command.clear();

let db_name = format!("_sqlx_test_database_{db_name}");

writeln!(command, "drop database if exists {db_name:?};").ok();
match conn.execute(&*command).await {
Ok(_deleted) => {
deleted_db_names.push(db_name);
}
// Assume a database error just means the DB is still in use.
Err(Error::Database(dbe)) => {
eprintln!("could not clean test database {db_name:?}: {dbe}")
}
// Bubble up other errors
Err(e) => return Err(e),
}
}

if deleted_db_names.is_empty() {
return Ok(None);
}

let mut query =
QueryBuilder::new("delete from _sqlx_test_databases where db_name in (");

let mut separated = query.separated(",");

for db_name in &deleted_db_names {
separated.push_bind(db_name);
}

query.push(")").build().execute(&mut conn).await?;

let num_deleted = do_cleanup(&mut conn, now).await?;
let _ = conn.close().await;
Ok(Some(num_deleted))
Ok(Some(delete_db_names.len()))
})
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder to check this trait implementation for proper quoting of the database name. It doesn't look like it does it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bonega don't forget this. Even with URL_SAFE the database name still needs to be quoted because it may contain dashes.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked and it seems to work.

A test in examples/postgres/axum-social-with-tests has the test_path: comment::test_create_comment
Which becomes the database name of _sqlx_test_hsAQWz-87IRR7sjnVDeMcHtoDLU3QyT6yWizSbWKFvNwoBt6Q60I

It is deleted correctly even though it contains dashes.

I see now that you're referencing the mysql implementation which I haven't started yet.
I will try and do it and the sqlite today.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to update that I am fighting with running the tests on (both my branch and main)

Expand Down Expand Up @@ -117,42 +145,27 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<MySql>, Error> {
conn.execute(
r#"
create table if not exists _sqlx_test_databases (
db_id bigint unsigned primary key auto_increment,
db_name text primary key,
test_path text not null,
created_at timestamp not null default current_timestamp
);
"#,
)
.await?;

// Record the current time _before_ we acquire the `DO_CLEANUP` permit. This
// prevents the first test thread from accidentally deleting new test dbs
// created by other test threads if we're a bit slow.
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();

// Only run cleanup if the test binary just started.
if DO_CLEANUP.swap(false, Ordering::SeqCst) {
do_cleanup(&mut conn, now).await?;
}
let db_name = MySql::db_name(args);
do_cleanup(&mut conn, &db_name).await?;

query("insert into _sqlx_test_databases(test_path) values (?)")
query("insert into _sqlx_test_databases(db_name, test_path) values (?, ?)")
.bind(&db_name)
.bind(args.test_path)
.execute(&mut *conn)
.await?;

// MySQL doesn't have `INSERT ... RETURNING`
let new_db_id: u64 = query_scalar("select last_insert_id()")
.fetch_one(&mut *conn)
.await?;

let new_db_name = db_name(new_db_id);

conn.execute(&format!("create database {new_db_name}")[..])
conn.execute(&format!("create database {db_name:?}")[..])
.await?;

eprintln!("created database {new_db_name}");
eprintln!("created database {db_name}");

Ok(TestContext {
pool_opts: PoolOptions::new()
Expand All @@ -167,74 +180,18 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<MySql>, Error> {
.connect_options()
.deref()
.clone()
.database(&new_db_name),
db_name: new_db_name,
.database(&db_name),
db_name,
})
}

async fn do_cleanup(conn: &mut MySqlConnection, created_before: Duration) -> Result<usize, Error> {
// since SystemTime is not monotonic we added a little margin here to avoid race conditions with other threads
let created_before_as_secs = created_before.as_secs() - 2;
let delete_db_ids: Vec<u64> = query_scalar(
"select db_id from _sqlx_test_databases \
where created_at < from_unixtime(?)",
)
.bind(created_before_as_secs)
.fetch_all(&mut *conn)
.await?;

if delete_db_ids.is_empty() {
return Ok(0);
}

let mut deleted_db_ids = Vec::with_capacity(delete_db_ids.len());

let mut command = String::new();

for db_id in delete_db_ids {
command.clear();

let db_name = db_name(db_id);

writeln!(command, "drop database if exists {db_name}").ok();
match conn.execute(&*command).await {
Ok(_deleted) => {
deleted_db_ids.push(db_id);
}
// Assume a database error just means the DB is still in use.
Err(Error::Database(dbe)) => {
eprintln!("could not clean test database {db_id:?}: {dbe}")
}
// Bubble up other errors
Err(e) => return Err(e),
}
}

let mut query = QueryBuilder::new("delete from _sqlx_test_databases where db_id in (");

let mut separated = query.separated(",");

for db_id in &deleted_db_ids {
separated.push_bind(db_id);
}

query.push(")").build().execute(&mut *conn).await?;

Ok(deleted_db_ids.len())
}

fn db_name(id: u64) -> String {
format!("_sqlx_test_database_{id}")
}

fn db_id(name: &str) -> u64 {
name.trim_start_matches("_sqlx_test_database_")
.parse()
.unwrap_or_else(|_1| panic!("failed to parse ID from database name {name:?}"))
}
async fn do_cleanup(conn: &mut MySqlConnection, db_name: &str) -> Result<(), Error> {
let delete_db_command = format!("drop database if exists {db_name:?};");
conn.execute(&*delete_db_command).await?;
query("delete from _sqlx_test.databases where db_name = $1::text")
.bind(db_name)
.execute(&mut *conn)
.await?;

#[test]
fn test_db_name_id() {
assert_eq!(db_name(12345), "_sqlx_test_database_12345");
assert_eq!(db_id("_sqlx_test_database_12345"), 12345);
Ok(())
}
Loading