Replace sqlx::Pool with a custom implementation to work around a bug
madadam committed May 17, 2022
1 parent e863e54 commit 6454707
Showing 2 changed files with 67 additions and 58 deletions.
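The bug being worked around is launchbadge/sqlx#1869: `sqlx::Pool` could drop a connection when the `acquire` future was cancelled before it completed. With the mutex-based replacement introduced in this commit, cancelling `acquire` only abandons the wait for the lock, so the single connection stays in place. A hedged, in-crate sketch of that property (the `timeout` wrapper and the demo function are illustrative additions, not part of the change):

use tokio::time::{timeout, Duration};

// Illustration only: cancelling `acquire` (here via a timeout) must not lose
// the underlying connection. The cancelled future merely stops waiting for the
// mutex, so the connection stays inside the `Option` and a later `acquire`
// still succeeds.
async fn cancel_safety_demo(pool: &Pool) -> Result<(), sqlx::Error> {
    // May or may not complete within 1 ms; either way the connection survives.
    let _ = timeout(Duration::from_millis(1), pool.acquire()).await;

    // The connection is still available afterwards.
    let _conn = pool.acquire().await?;
    Ok(())
}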
105 changes: 67 additions & 38 deletions lib/src/db.rs
@@ -3,48 +3,98 @@ use crate::{
error::{Error, Result},
};
use sqlx::{
sqlite::{Sqlite, SqliteConnectOptions, SqliteConnection, SqlitePoolOptions},
SqlitePool,
sqlite::{Sqlite, SqliteConnectOptions, SqliteConnection},
ConnectOptions, Connection as _,
};
use std::{
borrow::Cow,
convert::Infallible,
future::Future,
ops::{Deref, DerefMut},
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
};
use tokio::{
fs,
sync::{Mutex, OwnedMutexGuard},
};
use tokio::fs;

/// Database connection pool.
///
/// NOTE: This is not a proper pool - it's just a mutex around a single connection. We use this
/// instead of `sqlx::Pool` to work around an issue where it sometimes drops connections if the
/// `acquire` future is cancelled before returning (see
/// https://github.com/launchbadge/sqlx/issues/1869 for more details).
/// Besides, we were already using only one connection*, so the full pool was overkill anyway.
///
/// *) The reason we were using only one connection was to work around concurrency issues
/// ("table locked" errors, etc.) that were too much hassle to fix.
///
/// TODO: consider switching back to `sqlx::Pool` when the connection drop issue is fixed.
#[derive(Clone)]
pub(crate) struct Pool {
inner: SqlitePool,
inner: Arc<Mutex<Option<Connection>>>,
deadlock_tracker: DeadlockTracker,
}

impl Pool {
fn new(inner: SqlitePool) -> Self {
fn new(conn: Connection) -> Self {
Self {
inner,
inner: Arc::new(Mutex::new(Some(conn))),
deadlock_tracker: DeadlockTracker::new(),
}
}

#[track_caller]
pub fn acquire(&self) -> impl Future<Output = Result<PoolConnection, sqlx::Error>> {
DeadlockGuard::try_wrap(self.inner.acquire(), self.deadlock_tracker.clone())
let lock = DeadlockGuard::wrap(
self.inner.clone().lock_owned(),
self.deadlock_tracker.clone(),
);

async move {
let guard = lock.await;

if guard.is_some() {
Ok(PoolConnection(guard))
} else {
Err(sqlx::Error::PoolClosed)
}
}
}

pub async fn close(&self) {
self.inner.close().await
if let Some(conn) = self.inner.lock().await.take() {
if let Err(error) = conn.close().await {
log::error!("Failed to close db connection: {:?}", error);
}
}
}
}

/// Database connection.
pub type Connection = SqliteConnection;

/// Pooled database connection
pub(crate) type PoolConnection = DeadlockGuard<sqlx::pool::PoolConnection<Sqlite>>;
pub(crate) struct PoolConnection(DeadlockGuard<OwnedMutexGuard<Option<Connection>>>);

impl Deref for PoolConnection {
type Target = Connection;

fn deref(&self) -> &Self::Target {
// `unwrap` is ok because we only ever create `PoolConnection` if the `Option` inside is
// `Some` and we never set it to `None` while the `PoolConnection` exists.
self.0.as_ref().unwrap()
}
}

impl DerefMut for PoolConnection {
fn deref_mut(&mut self) -> &mut Self::Target {
// `unwrap` is ok - see `Deref` impl for details.
self.0.as_mut().unwrap()
}
}

/// Database transaction
pub type Transaction<'a> = sqlx::Transaction<'a, Sqlite>;
@@ -116,41 +166,20 @@ async fn open_permanent(path: &Path, create_if_missing: bool) -> Result<Pool> {
}
}

SqlitePoolOptions::new()
// HACK: using only one connection to work around `SQLITE_BUSY` errors.
//
// TODO: After some experimentation, it seems that using `SqliteSynchronous::Normal` might
// fix those errors, but it needs more testing. Even if it works, we should avoid letting
// the test and the production code diverge too much. This means that in order to use
// multiple connections we would either have to stop using memory databases or we would
// have to enable shared cache for file databases as well. Both approaches have their
// drawbacks.
.max_connections(1)
.connect_with(
SqliteConnectOptions::new()
.filename(path)
.create_if_missing(create_if_missing),
)
SqliteConnectOptions::new()
.filename(path)
.create_if_missing(create_if_missing)
.connect()
.await
.map(Pool::new)
.map_err(Error::ConnectToDb)
}

async fn open_temporary() -> Result<Pool> {
SqlitePoolOptions::new()
// HACK: using only one connection to avoid having to use shared cache (which is
// required when using multiple connections to a memory database, but is extremely
// prone to deadlocks)
.max_connections(1)
// Never reap connections because without shared cache it would also destroy the whole
// database.
.max_lifetime(None)
.idle_timeout(None)
.connect_with(
SqliteConnectOptions::from_str(MEMORY)
.unwrap()
.shared_cache(false),
)
SqliteConnectOptions::from_str(MEMORY)
.unwrap()
.shared_cache(false)
.connect()
.await
.map(Pool::new)
.map_err(Error::ConnectToDb)
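Both open functions above now connect directly through `SqliteConnectOptions` and wrap the resulting connection in the new `Pool`. A test-style sketch of the resulting round trip, assuming it sits inside `lib/src/db.rs` where `open_temporary` is visible (the test is illustrative, not part of the commit):

#[tokio::test]
async fn single_connection_pool_round_trip() {
    let pool = open_temporary().await.unwrap();

    // The single connection is handed out behind the mutex.
    let mut conn = pool.acquire().await.unwrap();
    sqlx::query("CREATE TABLE test (id INTEGER PRIMARY KEY)")
        .execute(&mut *conn)
        .await
        .unwrap();
    drop(conn); // release the mutex so `close` can take the connection out

    pool.close().await;

    // Once closed, further acquires report `PoolClosed`.
    assert!(matches!(pool.acquire().await, Err(sqlx::Error::PoolClosed)));
}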
20 changes: 0 additions & 20 deletions lib/src/deadlock.rs
@@ -37,26 +37,6 @@ impl<T> DeadlockGuard<T> {
}
}
}

#[track_caller]
pub(crate) fn try_wrap<F, E>(
inner: F,
tracker: DeadlockTracker,
) -> impl Future<Output = Result<Self, E>>
where
F: Future<Output = Result<T, E>>,
{
let acquire = tracker.acquire();

async move {
let inner = detect_deadlock(inner, &tracker).await;

Ok(Self {
inner: inner?,
_acquire: acquire,
})
}
}
}

impl<T> Deref for DeadlockGuard<T>
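For readers outside this crate, the underlying pattern can be distilled into a self-contained example: a "pool" that is just a mutex around a single optional value, with `close` taking the value out so later acquires fail. All names below are illustrative; none of this code appears in the commit.

use std::sync::Arc;
use tokio::sync::{Mutex, OwnedMutexGuard};

#[derive(Clone)]
struct TinyPool<T> {
    inner: Arc<Mutex<Option<T>>>,
}

impl<T> TinyPool<T> {
    fn new(value: T) -> Self {
        Self {
            inner: Arc::new(Mutex::new(Some(value))),
        }
    }

    // Wait for the single slot; fail if the pool was closed in the meantime.
    async fn acquire(&self) -> Result<OwnedMutexGuard<Option<T>>, &'static str> {
        let guard = self.inner.clone().lock_owned().await;
        if guard.is_some() {
            Ok(guard)
        } else {
            Err("pool closed")
        }
    }

    // Take the value out so that subsequent acquires fail, mirroring `Pool::close`.
    async fn close(&self) -> Option<T> {
        self.inner.lock().await.take()
    }
}

#[tokio::main]
async fn main() {
    let pool = TinyPool::new(42u32);

    {
        let guard = pool.acquire().await.unwrap();
        assert_eq!(*guard, Some(42));
    } // guard dropped here, releasing the mutex

    pool.close().await;
    assert!(pool.acquire().await.is_err());
}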
