Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): retry more db operations #28667

Merged
merged 7 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/nx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ napi-build = '2.1.3'
assert_fs = "1.0.10"
# This is only used for unit tests
swc_ecma_dep_graph = "0.109.1"
tempfile = "3.13.0"
118 changes: 86 additions & 32 deletions packages/nx/src/native/db/connection.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,103 @@
use anyhow::Result;
use rusqlite::{Connection, Error, OptionalExtension, Params, Row, Statement};

use rusqlite::{Connection, DatabaseName, Error, OptionalExtension, Params, Row, Statement, ToSql};
use std::thread;
use std::time::{Duration, Instant};
use std::time::Duration;
use tracing::trace;

/// Wrapper around a rusqlite [`Connection`] whose methods retry operations
/// while the database reports that it is busy.
pub struct NxDbConnection {
/// The wrapped rusqlite connection.
pub conn: Connection,
}

/// Upper bound of the retry loop in `retry_db_operation_when_busy!`.
const MAX_RETRIES: u32 = 20;
/// Base backoff delay in milliseconds; the retry macro multiplies it by a
/// growing power of two on each successive retry (and caps the resulting sleep).
const RETRY_DELAY: u64 = 25;

/// Retries a database operation while SQLite reports the database as busy.
///
/// `$operation` is re-evaluated on every attempt: once initially and then up to
/// `MAX_RETRIES` more times. Between attempts the thread sleeps for
/// `RETRY_DELAY * 2^retry` milliseconds, capped at 12 seconds. The macro evaluates
/// to the first `Ok`, to the first error that is not `DatabaseBusy`, or to a
/// `DatabaseBusy` error once every retry has been used up.
///
/// This is a macro instead of a function because some database operations need to
/// take a `&mut Connection` while returning a reference, which causes lifetime
/// issues that are hard to solve with a closure-based helper. Inlining the retry
/// loop at the call site avoids them.
macro_rules! retry_db_operation_when_busy {
    ($operation:expr) => {{
        'retry: {
            for completed_retries in 0..=MAX_RETRIES {
                match $operation {
                    ok @ Ok(_) => break 'retry ok,
                    Err(Error::SqliteFailure(err, _))
                        if err.code == rusqlite::ErrorCode::DatabaseBusy =>
                    {
                        // Out of retries: give up right away instead of sleeping
                        // one more time for an attempt that will never happen.
                        if completed_retries == MAX_RETRIES {
                            break;
                        }
                        let retry = completed_retries + 1;
                        trace!("Database busy. Retrying {} of {}", retry, MAX_RETRIES);
                        // Saturating arithmetic keeps the backoff well-defined even if
                        // the constants are raised far enough to overflow a u64.
                        let backoff = Duration::from_millis(
                            RETRY_DELAY.saturating_mul(2_u64.saturating_pow(retry)),
                        );
                        thread::sleep(backoff.min(Duration::from_secs(12)));
                    }
                    // Any other error is returned to the caller unchanged.
                    other => break 'retry other,
                }
            }
            Err(Error::SqliteFailure(
                rusqlite::ffi::Error {
                    code: rusqlite::ErrorCode::DatabaseBusy,
                    extended_code: 0,
                },
                Some("Database busy. Retried maximum number of times.".to_string()),
            ))
        }
    }};
}

impl NxDbConnection {
/// Wraps an existing rusqlite connection in an `NxDbConnection`.
pub fn new(connection: Connection) -> Self {
    let conn = connection;
    Self { conn }
}

/// Executes a single SQL statement with `params`, retrying while the database is busy.
///
/// `params` must be `Clone` because every attempt is handed a fresh copy.
/// Failures are returned as `anyhow` errors that include the SQL text.
// NOTE(review): this is a diff view — the `retry_on_busy` call appears to be the removed
// line and the `retry_db_operation_when_busy!` call its replacement.
pub fn execute<P: Params + Clone>(&self, sql: &str, params: P) -> Result<usize> {
self.retry_on_busy(|conn| conn.execute(sql, params.clone()))
retry_db_operation_when_busy!(self.conn.execute(sql, params.clone()))
.map_err(|e| anyhow::anyhow!("DB execute error: \"{}\", {:?}", sql, e))
}

/// Runs `sql` through `Connection::execute_batch`, retrying while the database is busy.
///
/// Failures are returned as `anyhow` errors that include the SQL text.
// NOTE(review): this is a diff view — the `retry_on_busy` call appears to be the removed
// line and the `retry_db_operation_when_busy!` call its replacement.
pub fn execute_batch(&self, sql: &str) -> Result<()> {
self.retry_on_busy(|conn| conn.execute_batch(sql))
retry_db_operation_when_busy!(self.conn.execute_batch(sql))
.map_err(|e| anyhow::anyhow!("DB execute batch error: \"{}\", {:?}", sql, e))
}

/// Prepares `sql` into a [`Statement`], retrying while the database is busy.
///
/// Failures are returned as `anyhow` errors that include the SQL text.
// NOTE(review): this is a diff view — the `retry_on_busy` call appears to be the removed
// line and the `retry_db_operation_when_busy!` call its replacement.
pub fn prepare(&self, sql: &str) -> Result<Statement> {
self.retry_on_busy(|conn| conn.prepare(sql))
retry_db_operation_when_busy!(self.conn.prepare(sql))
.map_err(|e| anyhow::anyhow!("DB prepare error: \"{}\", {:?}", sql, e))
}

pub fn transaction<T>(
&mut self,
transaction_operation: impl Fn(&Connection) -> rusqlite::Result<T>,
) -> Result<T> {
let transaction = retry_db_operation_when_busy!(self.conn.transaction())
.map_err(|e| anyhow::anyhow!("DB transaction error: {:?}", e))?;

let result = transaction_operation(&transaction)
.map_err(|e| anyhow::anyhow!("DB transaction operation error: {:?}", e))?;

transaction
.commit()
.map_err(|e| anyhow::anyhow!("DB transaction commit error: {:?}", e))?;

Ok(result)
}

/// Runs a query expected to yield a single row and maps that row with `f`,
/// retrying while the database is busy.
///
/// `params` and `f` must be `Clone` because every attempt is handed a fresh copy.
/// The result goes through `.optional()`, so a missing row presumably surfaces as
/// `Ok(None)` rather than an error — confirm against rusqlite's `OptionalExtension`.
/// Other failures are returned as `anyhow` errors that include the SQL text.
// NOTE(review): this is a diff view — the `retry_on_busy` call and the `.map_err` right
// after it appear to be the removed lines; the macro call is their replacement.
pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<Option<T>>
where
P: Params + Clone,
F: FnOnce(&Row<'_>) -> rusqlite::Result<T> + Clone,
{
self.retry_on_busy(|conn| conn.query_row(sql, params.clone(), f.clone()).optional())
.map_err(|e| anyhow::anyhow!("DB query error: \"{}\", {:?}", sql, e))
retry_db_operation_when_busy!(self
.conn
.query_row(sql, params.clone(), f.clone())
.optional())
.map_err(|e| anyhow::anyhow!("DB query error: \"{}\", {:?}", sql, e))
}

pub fn close(self) -> rusqlite::Result<(), (Connection, Error)> {
Expand All @@ -43,33 +106,24 @@ impl NxDbConnection {
.inspect_err(|e| trace!("Error in close: {:?}", e))
}

// NOTE(review): this is a diff view — the `retry_on_busy` helper lines in this span
// (fixed 25 ms delay, 5 iterations, then one final attempt) appear to be the removed
// implementation, interleaved with the added `pragma_update` method.
#[allow(clippy::needless_lifetimes)]
fn retry_on_busy<'a, F, T>(&'a self, operation: F) -> rusqlite::Result<T>
/// Calls `Connection::pragma_update` with the given schema, pragma name and value,
/// retrying while the database is busy.
///
/// `pragma_value` must be `Clone` because every attempt is handed a fresh copy.
/// Unlike the other wrappers, this returns the raw `rusqlite::Result` and does not
/// convert the error into an `anyhow` error.
pub fn pragma_update<V>(
&self,
schema_name: Option<DatabaseName<'_>>,
pragma_name: &str,
pragma_value: V,
) -> rusqlite::Result<()>
where
F: Fn(&'a Connection) -> rusqlite::Result<T>,
V: ToSql + Clone,
{
let start = Instant::now();
let max_retries: u64 = 5;
let retry_delay = Duration::from_millis(25);

for i in 0..max_retries {
match operation(&self.conn) {
Ok(result) => return Ok(result),
Err(Error::SqliteFailure(err, _))
if err.code == rusqlite::ErrorCode::DatabaseBusy =>
{
trace!("Database busy. Retrying{}", ".".repeat(i as usize));
if start.elapsed()
>= Duration::from_millis(max_retries * retry_delay.as_millis() as u64)
{
break;
}
thread::sleep(retry_delay);
}
err @ Err(_) => return err,
}
}
retry_db_operation_when_busy!(self.conn.pragma_update(
schema_name,
pragma_name,
pragma_value.clone()
))
}

operation(&self.conn)
pub fn busy_handler(&self, callback: Option<fn(i32) -> bool>) -> Result<()> {
retry_db_operation_when_busy!(self.conn.busy_handler(callback))
.map_err(|e| anyhow::anyhow!("DB busy handler error: {:?}", e))
}
}
Loading