From 63b745e8c3cd7f00b91397abca6121da34b0b388 Mon Sep 17 00:00:00 2001 From: Jonathan Cammisuli Date: Wed, 30 Oct 2024 11:45:34 -0400 Subject: [PATCH] fix(core): retry more db operations (#28667) --- Cargo.lock | 33 +++-- packages/nx/Cargo.toml | 1 + packages/nx/src/native/db/connection.rs | 118 ++++++++++----- packages/nx/src/native/db/initialize.rs | 143 +++++++++++++------ packages/nx/src/native/db/mod.rs | 28 ++-- packages/nx/src/native/tasks/details.rs | 22 +-- packages/nx/src/native/tasks/task_history.rs | 39 ++--- 7 files changed, 265 insertions(+), 119 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a4da77495d984..b5eb986c90b0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -518,9 +518,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.0.1" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" +checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" [[package]] name = "filedescriptor" @@ -1168,9 +1168,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.155" +version = "0.2.161" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" [[package]] name = "libloading" @@ -1195,9 +1195,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" [[package]] name = "lock_api" @@ -1546,6 +1546,7 @@ dependencies = [ "swc_ecma_dep_graph", "swc_ecma_parser", "swc_ecma_visit", + "tempfile", "thiserror", "tracing", "tracing-subscriber", @@ -1969,9 +1970,9 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] name = "rustix" -version = "0.38.32" +version = "0.38.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89" +checksum = "aa260229e6538e52293eeb577aabd09945a09d6d9cc0fc550ed7529056c2e32a" dependencies = [ "bitflags 2.6.0", "errno", @@ -2420,14 +2421,15 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "tempfile" -version = "3.10.1" +version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" +checksum = "f0f2c9fc62d0beef6951ccffd757e241266a2c833136efbe35af6cd2567dca5b" dependencies = [ "cfg-if", "fastrand", + "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2876,6 +2878,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-targets" version = "0.48.5" diff --git a/packages/nx/Cargo.toml b/packages/nx/Cargo.toml index 9780242dbb297..3ac80adb1e445 100644 --- a/packages/nx/Cargo.toml +++ b/packages/nx/Cargo.toml @@ -74,3 +74,4 @@ napi-build = '2.1.3' assert_fs = "1.0.10" # This is only used for unit tests swc_ecma_dep_graph = "0.109.1" +tempfile = "3.13.0" diff --git a/packages/nx/src/native/db/connection.rs b/packages/nx/src/native/db/connection.rs index 3d7de42530874..3c96657f279fd 100644 --- a/packages/nx/src/native/db/connection.rs +++ b/packages/nx/src/native/db/connection.rs @@ -1,40 +1,103 @@ use anyhow::Result; -use rusqlite::{Connection, Error, OptionalExtension, Params, Row, Statement}; + +use rusqlite::{Connection, DatabaseName, Error, OptionalExtension, Params, Row, Statement, ToSql}; use std::thread; -use std::time::{Duration, Instant}; +use std::time::Duration; use tracing::trace; pub struct NxDbConnection { pub conn: Connection, } +const MAX_RETRIES: u32 = 20; +const RETRY_DELAY: u64 = 25; + +/// macro for handling the db when its busy +/// This is a macro instead of a function because some database operations need to take a &mut Connection, while returning a reference +/// This causes some quite complex lifetime issues that are quite hard to solve +/// +/// Using a macro inlines the retry operation where it was called, and the lifetime issues are avoided +macro_rules! retry_db_operation_when_busy { + ($operation:expr) => {{ + let connection = 'retry: { + for i in 1..MAX_RETRIES { + match $operation { + r @ Ok(_) => break 'retry r, + Err(Error::SqliteFailure(err, _)) + if err.code == rusqlite::ErrorCode::DatabaseBusy => + { + trace!("Database busy. Retrying {} of {}", i, MAX_RETRIES); + let sleep = Duration::from_millis(RETRY_DELAY * 2_u64.pow(i)); + let max_sleep = Duration::from_secs(12); + if (sleep >= max_sleep) { + thread::sleep(max_sleep); + } else { + thread::sleep(sleep); + } + } + err => break 'retry err, + }; + } + break 'retry Err(Error::SqliteFailure( + rusqlite::ffi::Error { + code: rusqlite::ErrorCode::DatabaseBusy, + extended_code: 0, + }, + Some("Database busy. Retried maximum number of times.".to_string()), + )); + }; + + connection + }}; +} + impl NxDbConnection { pub fn new(connection: Connection) -> Self { Self { conn: connection } } pub fn execute(&self, sql: &str, params: P) -> Result { - self.retry_on_busy(|conn| conn.execute(sql, params.clone())) + retry_db_operation_when_busy!(self.conn.execute(sql, params.clone())) .map_err(|e| anyhow::anyhow!("DB execute error: \"{}\", {:?}", sql, e)) } pub fn execute_batch(&self, sql: &str) -> Result<()> { - self.retry_on_busy(|conn| conn.execute_batch(sql)) + retry_db_operation_when_busy!(self.conn.execute_batch(sql)) .map_err(|e| anyhow::anyhow!("DB execute batch error: \"{}\", {:?}", sql, e)) } pub fn prepare(&self, sql: &str) -> Result { - self.retry_on_busy(|conn| conn.prepare(sql)) + retry_db_operation_when_busy!(self.conn.prepare(sql)) .map_err(|e| anyhow::anyhow!("DB prepare error: \"{}\", {:?}", sql, e)) } + pub fn transaction( + &mut self, + transaction_operation: impl Fn(&Connection) -> rusqlite::Result, + ) -> Result { + let transaction = retry_db_operation_when_busy!(self.conn.transaction()) + .map_err(|e| anyhow::anyhow!("DB transaction error: {:?}", e))?; + + let result = transaction_operation(&transaction) + .map_err(|e| anyhow::anyhow!("DB transaction operation error: {:?}", e))?; + + transaction + .commit() + .map_err(|e| anyhow::anyhow!("DB transaction commit error: {:?}", e))?; + + Ok(result) + } + pub fn query_row(&self, sql: &str, params: P, f: F) -> Result> where P: Params + Clone, F: FnOnce(&Row<'_>) -> rusqlite::Result + Clone, { - self.retry_on_busy(|conn| conn.query_row(sql, params.clone(), f.clone()).optional()) - .map_err(|e| anyhow::anyhow!("DB query error: \"{}\", {:?}", sql, e)) + retry_db_operation_when_busy!(self + .conn + .query_row(sql, params.clone(), f.clone()) + .optional()) + .map_err(|e| anyhow::anyhow!("DB query error: \"{}\", {:?}", sql, e)) } pub fn close(self) -> rusqlite::Result<(), (Connection, Error)> { @@ -43,33 +106,24 @@ impl NxDbConnection { .inspect_err(|e| trace!("Error in close: {:?}", e)) } - #[allow(clippy::needless_lifetimes)] - fn retry_on_busy<'a, F, T>(&'a self, operation: F) -> rusqlite::Result + pub fn pragma_update( + &self, + schema_name: Option>, + pragma_name: &str, + pragma_value: V, + ) -> rusqlite::Result<()> where - F: Fn(&'a Connection) -> rusqlite::Result, + V: ToSql + Clone, { - let start = Instant::now(); - let max_retries: u64 = 5; - let retry_delay = Duration::from_millis(25); - - for i in 0..max_retries { - match operation(&self.conn) { - Ok(result) => return Ok(result), - Err(Error::SqliteFailure(err, _)) - if err.code == rusqlite::ErrorCode::DatabaseBusy => - { - trace!("Database busy. Retrying{}", ".".repeat(i as usize)); - if start.elapsed() - >= Duration::from_millis(max_retries * retry_delay.as_millis() as u64) - { - break; - } - thread::sleep(retry_delay); - } - err @ Err(_) => return err, - } - } + retry_db_operation_when_busy!(self.conn.pragma_update( + schema_name, + pragma_name, + pragma_value.clone() + )) + } - operation(&self.conn) + pub fn busy_handler(&self, callback: Option bool>) -> Result<()> { + retry_db_operation_when_busy!(self.conn.busy_handler(callback)) + .map_err(|e| anyhow::anyhow!("DB busy handler error: {:?}", e)) } } diff --git a/packages/nx/src/native/db/initialize.rs b/packages/nx/src/native/db/initialize.rs index bacc2b5c0e67c..5396e962fe83b 100644 --- a/packages/nx/src/native/db/initialize.rs +++ b/packages/nx/src/native/db/initialize.rs @@ -1,9 +1,9 @@ +use crate::native::db::connection::NxDbConnection; +use fs4::fs_std::FileExt; +use rusqlite::{Connection, OpenFlags}; use std::fs::{remove_file, File}; use std::path::{Path, PathBuf}; use tracing::{debug, trace}; -use rusqlite::{Connection, OpenFlags}; -use fs4::fs_std::FileExt; -use crate::native::db::connection::NxDbConnection; pub(super) struct LockFile { file: File, @@ -36,8 +36,8 @@ pub(super) fn create_lock_file(db_path: &Path) -> anyhow::Result { }) } -pub(super) fn initialize_db(nx_version: String, db_path: &PathBuf) -> anyhow::Result { - let c = create_connection(db_path)?; +pub(super) fn initialize_db(nx_version: String, db_path: &Path) -> anyhow::Result { + let mut c = open_database_connection(db_path)?; trace!( "Checking if current existing database is compatible with Nx {}", @@ -56,53 +56,49 @@ pub(super) fn initialize_db(nx_version: String, db_path: &PathBuf) -> anyhow::Re trace!("Database is compatible with Nx {}", nx_version); c } - // If there is no version, it means that this database is new - Ok(None) => { - trace!("Recording Nx Version: {}", nx_version); - c.execute( - "INSERT OR REPLACE INTO metadata (key, value) VALUES ('NX_VERSION', ?)", - [nx_version], - )?; + // If there is no metadata, it means that this database is new + Err(s) if s.to_string().contains("metadata") => { + configure_database(&c)?; + create_metadata_table(&mut c, &nx_version)?; c } - _ => { + check @ _ => { + trace!("Incompatible database because: {:?}", check); trace!("Disconnecting from existing incompatible database"); c.close().map_err(|(_, error)| anyhow::Error::from(error))?; trace!("Removing existing incompatible database"); remove_file(db_path)?; - trace!("Creating a new connection to a new database"); - create_connection(db_path)? + trace!("Initializing a new database"); + initialize_db(nx_version, db_path)? } }; Ok(c) } -fn create_connection(db_path: &PathBuf) -> anyhow::Result { - match open_database_connection(db_path) { - Ok(connection) => { - configure_database(&connection)?; - create_metadata_table(&connection)?; - Ok(connection) - } - err @ Err(_) => err, - } -} +fn create_metadata_table(c: &mut NxDbConnection, nx_version: &str) -> anyhow::Result<()> { + debug!("Creating table for metadata"); + c.transaction(|conn| { + conn.execute( + "CREATE TABLE metadata ( + key TEXT NOT NULL PRIMARY KEY, + value TEXT NOT NULL + )", + [], + )?; + trace!("Recording Nx Version: {}", nx_version); + conn.execute( + "INSERT INTO metadata (key, value) VALUES ('NX_VERSION', ?)", + [nx_version], + )?; + Ok(()) + })?; -fn create_metadata_table(c: &NxDbConnection) -> anyhow::Result<()> { - debug!("Creating table for metadata if it does not exist"); - c.execute( - "CREATE TABLE IF NOT EXISTS metadata ( - key TEXT NOT NULL PRIMARY KEY, - value TEXT NOT NULL - )", - [], - )?; Ok(()) } -fn open_database_connection(db_path: &PathBuf) -> anyhow::Result { +fn open_database_connection(db_path: &Path) -> anyhow::Result { let conn = if cfg!(target_family = "unix") && ci_info::is_ci() { trace!("Opening connection with unix-dotfile"); Connection::open_with_flags_and_vfs( @@ -110,7 +106,7 @@ fn open_database_connection(db_path: &PathBuf) -> anyhow::Result OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_URI - | OpenFlags::SQLITE_OPEN_NO_MUTEX, + | OpenFlags::SQLITE_OPEN_FULL_MUTEX, "unix-dotfile", ) } else { @@ -129,16 +125,83 @@ fn open_database_connection(db_path: &PathBuf) -> anyhow::Result fn configure_database(connection: &NxDbConnection) -> anyhow::Result<()> { connection - .conn .pragma_update(None, "journal_mode", "WAL") .map_err(|e| anyhow::anyhow!("Unable to set journal_mode: {:?}", e))?; connection - .conn .pragma_update(None, "synchronous", "NORMAL") .map_err(|e| anyhow::anyhow!("Unable to set synchronous: {:?}", e))?; connection - .conn - .busy_handler(Some(|tries| tries < 6)) + .busy_handler(Some(|tries| tries <= 12)) .map_err(|e| anyhow::anyhow!("Unable to set busy handler: {:?}", e))?; Ok(()) } + +#[cfg(test)] +mod tests { + + use crate::native::logger::enable_logger; + + use super::*; + + #[test] + fn initialize_db_creates_new_db() -> anyhow::Result<()> { + let temp_dir = tempfile::tempdir()?; + let db_path = temp_dir.path().join("test.db"); + + let _ = initialize_db("1.0.0".to_string(), &db_path)?; + + let conn = Connection::open(&db_path)?; + let version: String = conn.query_row( + "SELECT value FROM metadata WHERE key='NX_VERSION'", + [], + |row| row.get(0), + )?; + + assert_eq!(version, "1.0.0"); + Ok(()) + } + + #[test] + fn initialize_db_reuses_compatible_db() -> anyhow::Result<()> { + let temp_dir = tempfile::tempdir()?; + let db_path = temp_dir.path().join("test.db"); + + // Create initial db + let _ = initialize_db("1.0.0".to_string(), &db_path)?; + + // Try to initialize again with same version + let _ = initialize_db("1.0.0".to_string(), &db_path)?; + + let conn = Connection::open(&db_path)?; + let version: String = conn.query_row( + "SELECT value FROM metadata WHERE key='NX_VERSION'", + [], + |row| row.get(0), + )?; + + assert_eq!(version, "1.0.0"); + Ok(()) + } + + #[test] + fn initialize_db_recreates_incompatible_db() -> anyhow::Result<()> { + enable_logger(); + let temp_dir = tempfile::tempdir()?; + let db_path = temp_dir.path().join("test.db"); + // + // Create initial db + let _ = initialize_db("1.0.0".to_string(), &db_path)?; + + // Try to initialize with different version + let conn = initialize_db("2.0.0".to_string(), &db_path)?; + + let version: Option = conn.query_row( + "SELECT value FROM metadata WHERE key='NX_VERSION'", + [], + |row| row.get(0), + )?; + + assert_eq!(version.unwrap(), "2.0.0"); + Ok(()) + } +} diff --git a/packages/nx/src/native/db/mod.rs b/packages/nx/src/native/db/mod.rs index 5438a42cfe414..ea4be4e03a136 100644 --- a/packages/nx/src/native/db/mod.rs +++ b/packages/nx/src/native/db/mod.rs @@ -1,8 +1,9 @@ pub mod connection; mod initialize; -use crate::native::db::connection::NxDbConnection; +use crate::native::logger::enable_logger; use crate::native::machine_id::get_machine_id; +use crate::native::{db::connection::NxDbConnection, hasher::hash}; use napi::bindgen_prelude::External; use std::fs::create_dir_all; use std::path::PathBuf; @@ -15,18 +16,27 @@ pub fn connect_to_nx_db( nx_version: String, db_name: Option, ) -> anyhow::Result> { + enable_logger(); let cache_dir_buf = PathBuf::from(cache_dir); - let db_path = cache_dir_buf.join(format!("{}.db", db_name.unwrap_or_else(get_machine_id))); + let mut db_file_name = db_name.unwrap_or_else(get_machine_id); + + if db_file_name.is_empty() { + trace!("Invalid db file name, using fallback name"); + db_file_name = hash(b"machine"); + } + + let db_path = cache_dir_buf.join(format!("{}.db", db_file_name)); create_dir_all(cache_dir_buf)?; - let _ = trace_span!("process", id = process::id()).entered(); - trace!("Creating connection to {:?}", db_path); - let lock_file = initialize::create_lock_file(&db_path)?; + trace_span!("process", id = process::id()).in_scope(|| { + trace!("Creating connection to {:?}", db_path); + let lock_file = initialize::create_lock_file(&db_path)?; - let c = initialize::initialize_db(nx_version, &db_path) - .inspect_err(|_| initialize::unlock_file(&lock_file))?; + let c = initialize::initialize_db(nx_version, &db_path) + .inspect_err(|_| initialize::unlock_file(&lock_file))?; - initialize::unlock_file(&lock_file); + initialize::unlock_file(&lock_file); - Ok(External::new(c)) + Ok(External::new(c)) + }) } diff --git a/packages/nx/src/native/tasks/details.rs b/packages/nx/src/native/tasks/details.rs index 70d0c05919867..ac972f90d6be6 100644 --- a/packages/nx/src/native/tasks/details.rs +++ b/packages/nx/src/native/tasks/details.rs @@ -1,9 +1,10 @@ use crate::native::db::connection::NxDbConnection; use napi::bindgen_prelude::*; use rusqlite::params; +use tracing::trace; #[napi(object)] -#[derive(Default, Clone)] +#[derive(Default, Clone, Debug)] pub struct HashedTask { pub hash: String, pub project: String, @@ -42,14 +43,17 @@ impl TaskDetails { } #[napi] - pub fn record_task_details(&self, tasks: Vec) -> anyhow::Result<()> { - for task in tasks.iter() { - self.db.execute( - "INSERT OR REPLACE INTO task_details (hash, project, target, configuration) - VALUES (?1, ?2, ?3, ?4)", - params![task.hash, task.project, task.target, task.configuration], - )?; - } + pub fn record_task_details(&mut self, tasks: Vec) -> anyhow::Result<()> { + trace!("Recording task details"); + self.db.transaction(|conn| { + let mut stmt = conn.prepare("INSERT OR REPLACE INTO task_details (hash, project, target, configuration) VALUES (?1, ?2, ?3, ?4)")?; + for task in tasks.iter() { + stmt.execute( + params![task.hash, task.project, task.target, task.configuration], + )?; + } + Ok(()) + })?; Ok(()) } diff --git a/packages/nx/src/native/tasks/task_history.rs b/packages/nx/src/native/tasks/task_history.rs index 5ee0c53904e16..9dba3b31378b4 100644 --- a/packages/nx/src/native/tasks/task_history.rs +++ b/packages/nx/src/native/tasks/task_history.rs @@ -5,6 +5,7 @@ use rusqlite::vtab::array; use rusqlite::{params, types::Value}; use std::collections::HashMap; use std::rc::Rc; +use tracing::trace; #[napi(object)] pub struct TaskRun { @@ -54,24 +55,26 @@ impl NxTaskHistory { } #[napi] - pub fn record_task_runs(&self, task_runs: Vec) -> anyhow::Result<()> { - for task_run in task_runs.iter() { - self.db - .execute( - " - INSERT INTO task_history - (hash, status, code, start, end) - VALUES (?1, ?2, ?3, ?4, ?5)", - params![ - task_run.hash, - task_run.status, - task_run.code, - task_run.start, - task_run.end - ], - ) - .map_err(anyhow::Error::from)?; - } + pub fn record_task_runs(&mut self, task_runs: Vec) -> anyhow::Result<()> { + trace!("Recording task runs"); + self.db.transaction(|conn| { + let mut stmt = conn.prepare( + "INSERT OR REPLACE INTO task_history + (hash, status, code, start, end) + VALUES (?1, ?2, ?3, ?4, ?5)", + )?; + for task_run in task_runs.iter() { + stmt.execute(params![ + task_run.hash, + task_run.status, + task_run.code, + task_run.start, + task_run.end + ]) + .inspect_err(|e| trace!("Error trying to insert {:?}: {:?}", &task_run.hash, e))?; + } + Ok(()) + })?; Ok(()) }