Skip to content

Commit

Permalink
actively return connections to the pool
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Whitehead <cywolf@gmail.com>
  • Loading branch information
andrewwhitehead committed Sep 20, 2023
1 parent 7537214 commit 92d76a7
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 31 deletions.
7 changes: 7 additions & 0 deletions askar-storage/src/backend/db_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ impl<DB: ExtDatabase> DbSession<DB> {
DB::TransactionManager::rollback(&mut conn).await
}
.map_err(err_map!(Backend, "Error closing transaction"))?;
conn.return_to_pool().await;
} else {
warn!("Could not close out transaction: session not active");
}
Expand Down Expand Up @@ -234,6 +235,12 @@ pub enum DbSessionRef<'q, DB: ExtDatabase> {
Borrowed(&'q mut DbSession<DB>),
}

impl<DB: ExtDatabase> DbSessionRef<'_, DB> {
pub fn is_owned(&self) -> bool {
matches!(self, Self::Owned(_))
}
}

impl<'q, DB: ExtDatabase> Deref for DbSessionRef<'q, DB> {
type Target = DbSession<DB>;

Expand Down
25 changes: 18 additions & 7 deletions askar-storage/src/backend/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures_lite::{
use sqlx::{
pool::PoolConnection,
postgres::{PgPool, Postgres},
Row,
Acquire, Row,
};

use super::{
Expand Down Expand Up @@ -127,15 +127,16 @@ impl Backend for PostgresBackend {
})
.await?;
let mut conn = self.conn_pool.acquire().await?;
if let Some(pid) = sqlx::query_scalar(
let res = sqlx::query_scalar(
"INSERT INTO profiles (name, profile_key) VALUES ($1, $2)
ON CONFLICT DO NOTHING RETURNING id",
)
.bind(&name)
.bind(enc_key)
.fetch_optional(conn.as_mut())
.await?
{
.await?;
conn.return_to_pool().await;
if let Some(pid) = res {
self.key_cache
.add_profile(name.clone(), pid, Arc::new(profile_key))
.await;
Expand All @@ -158,6 +159,7 @@ impl Backend for PostgresBackend {
.fetch_one(conn.as_mut())
.await
.map_err(err_map!(Backend, "Error fetching default profile name"))?;
conn.return_to_pool().await;
Ok(profile.unwrap_or_default())
})
}
Expand All @@ -171,6 +173,7 @@ impl Backend for PostgresBackend {
.execute(conn.as_mut())
.await
.map_err(err_map!(Backend, "Error setting default profile name"))?;
conn.return_to_pool().await;
Ok(())
})
}
Expand All @@ -183,20 +186,23 @@ impl Backend for PostgresBackend {
.await
.map_err(err_map!(Backend, "Error fetching profile list"))?;
let names = rows.into_iter().flat_map(|r| r.try_get(0)).collect();
conn.return_to_pool().await;
Ok(names)
})
}

fn remove_profile(&self, name: String) -> BoxFuture<'_, Result<bool, Error>> {
Box::pin(async move {
let mut conn = self.conn_pool.acquire().await?;
Ok(sqlx::query("DELETE FROM profiles WHERE name=$1")
let ret = sqlx::query("DELETE FROM profiles WHERE name=$1")
.bind(&name)
.execute(conn.as_mut())
.await
.map_err(err_map!(Backend, "Error removing profile"))?
.rows_affected()
!= 0)
!= 0;
conn.return_to_pool().await;
Ok(ret)
})
}

Expand All @@ -209,7 +215,8 @@ impl Backend for PostgresBackend {
Box::pin(async move {
let (store_key, store_key_ref) = unblock(move || method.resolve(pass_key)).await?;
let store_key = Arc::new(store_key);
let mut txn = self.conn_pool.begin().await?;
let mut conn = self.conn_pool.acquire().await?;
let mut txn = conn.begin().await?;
let mut rows = sqlx::query("SELECT id, profile_key FROM profiles").fetch(txn.as_mut());
let mut upd_keys = BTreeMap::<ProfileId, Vec<u8>>::new();
while let Some(row) = rows.next().await {
Expand Down Expand Up @@ -247,6 +254,7 @@ impl Backend for PostgresBackend {
return Err(err_msg!(Backend, "Error updating store key"));
}
txn.commit().await?;
conn.return_to_pool().await;
self.key_cache = Arc::new(KeyCache::new(store_key));
Ok(())
})
Expand Down Expand Up @@ -784,6 +792,9 @@ fn perform_scan(
}
}
drop(rows);
if active.is_owned() {
active.close(false).await?;
}
drop(active);

if !batch.is_empty() {
Expand Down
15 changes: 11 additions & 4 deletions askar-storage/src/backend/postgres/provision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ impl PostgresStoreOptions {
recreate: bool,
) -> Result<PostgresBackend, Error> {
let conn_pool = self.create_db_pool().await?;
let mut txn = conn_pool.begin().await?;
let mut conn = conn_pool.acquire().await?;
let mut txn = conn.begin().await?;

if recreate {
// remove expected tables
Expand Down Expand Up @@ -248,6 +249,8 @@ impl PostgresStoreOptions {
self.schema.as_ref().unwrap_or(&self.username),
)
.await?;
conn.return_to_pool().await;

let mut key_cache = KeyCache::new(store_key);
key_cache.add_profile_mut(default_profile.clone(), profile_id, profile_key);

Expand Down Expand Up @@ -290,14 +293,16 @@ impl PostgresStoreOptions {
// any character except NUL is allowed in an identifier.
// double quotes must be escaped, but we just disallow those
let drop_q = format!("DROP DATABASE \"{}\"", self.name);
match admin_conn.execute(drop_q.as_str()).await {
let res = match admin_conn.execute(drop_q.as_str()).await {
Ok(_) => Ok(true),
Err(SqlxError::Database(db_err)) if db_err.code() == Some(Cow::Borrowed("3D000")) => {
// invalid catalog name is raised if the database does not exist
Ok(false)
}
Err(err) => Err(err_msg!(Backend, "Error removing database").with_cause(err)),
}
}?;
admin_conn.close().await?;
Ok(res)
}
}

Expand Down Expand Up @@ -485,14 +490,16 @@ pub(crate) async fn open_db(
} else {
return Err(err_msg!(Unsupported, "Store key not found"));
};
let mut key_cache = KeyCache::new(store_key);

let mut key_cache = KeyCache::new(store_key);
let row = sqlx::query("SELECT id, profile_key FROM profiles WHERE name = $1")
.bind(&profile)
.fetch_one(conn.as_mut())
.await?;
let profile_id = row.try_get(0)?;
let profile_key = key_cache.load_key(row.try_get(1)?).await?;
conn.return_to_pool().await;

key_cache.add_profile_mut(profile.clone(), profile_id, profile_key);

Ok(PostgresBackend::new(
Expand Down
19 changes: 15 additions & 4 deletions askar-storage/src/backend/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures_lite::{
use sqlx::{
pool::PoolConnection,
sqlite::{Sqlite, SqlitePool},
Database, Error as SqlxError, Row, TransactionManager,
Acquire, Database, Error as SqlxError, Row, TransactionManager,
};

use super::{
Expand Down Expand Up @@ -126,6 +126,7 @@ impl Backend for SqliteBackend {
.bind(enc_key)
.execute(conn.as_mut())
.await?;
conn.return_to_pool().await;
if done.rows_affected() == 0 {
return Err(err_msg!(Duplicate, "Duplicate profile name"));
}
Expand All @@ -152,6 +153,7 @@ impl Backend for SqliteBackend {
.fetch_one(conn.as_mut())
.await
.map_err(err_map!(Backend, "Error fetching default profile name"))?;
conn.return_to_pool().await;
Ok(profile.unwrap_or_default())
})
}
Expand All @@ -165,6 +167,7 @@ impl Backend for SqliteBackend {
.execute(conn.as_mut())
.await
.map_err(err_map!(Backend, "Error setting default profile name"))?;
conn.return_to_pool().await;
Ok(())
})
}
Expand All @@ -176,6 +179,7 @@ impl Backend for SqliteBackend {
.fetch_all(conn.as_mut())
.await
.map_err(err_map!(Backend, "Error fetching profile list"))?;
conn.return_to_pool().await;
let names = rows.into_iter().flat_map(|r| r.try_get(0)).collect();
Ok(names)
})
Expand All @@ -184,13 +188,15 @@ impl Backend for SqliteBackend {
fn remove_profile(&self, name: String) -> BoxFuture<'_, Result<bool, Error>> {
Box::pin(async move {
let mut conn = self.conn_pool.acquire().await?;
Ok(sqlx::query("DELETE FROM profiles WHERE name=?")
let ret = sqlx::query("DELETE FROM profiles WHERE name=?")
.bind(&name)
.execute(conn.as_mut())
.await
.map_err(err_map!(Backend, "Error removing profile"))?
.rows_affected()
!= 0)
!= 0;
conn.return_to_pool().await;
Ok(ret)
})
}

Expand All @@ -203,7 +209,8 @@ impl Backend for SqliteBackend {
Box::pin(async move {
let (store_key, store_key_ref) = unblock(move || method.resolve(pass_key)).await?;
let store_key = Arc::new(store_key);
let mut txn = self.conn_pool.begin().await?;
let mut conn = self.conn_pool.acquire().await?;
let mut txn = conn.begin().await?;
let mut rows = sqlx::query("SELECT id, profile_key FROM profiles").fetch(txn.as_mut());
let mut upd_keys = BTreeMap::<ProfileId, Vec<u8>>::new();
while let Some(row) = rows.next().await {
Expand Down Expand Up @@ -241,6 +248,7 @@ impl Backend for SqliteBackend {
return Err(err_msg!(Backend, "Error updating store key"));
}
txn.commit().await?;
conn.return_to_pool().await;
self.key_cache = Arc::new(KeyCache::new(store_key));
Ok(())
})
Expand Down Expand Up @@ -729,6 +737,9 @@ fn perform_scan(
}
}
drop(rows);
if active.is_owned() {
active.close(false).await?;
}
drop(active);

if !batch.is_empty() {
Expand Down
37 changes: 21 additions & 16 deletions askar-storage/src/backend/sqlite/provision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,23 +169,26 @@ impl SqliteStoreOptions {
.await
.map_err(err_map!(Backend, "Error creating database pool"))?;

if !recreate
&& sqlx::query_scalar::<_, i64>(
if !recreate {
let mut conn = conn_pool.acquire().await?;
let found = sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='config'",
)
.fetch_one(&conn_pool)
.fetch_one(conn.as_mut())
.await
.map_err(err_map!(Backend, "Error checking for existing store"))?
== 1
{
return open_db(
conn_pool,
Some(method),
pass_key,
profile,
self.path.to_string(),
)
.await;
== 1;
conn.return_to_pool().await;
if found {
return open_db(
conn_pool,
Some(method),
pass_key,
profile,
self.path.to_string(),
)
.await;
}
}
// else: no 'config' table, assume empty database

Expand Down Expand Up @@ -354,14 +357,15 @@ async fn init_db(
.execute(conn.as_mut())
.await.map_err(err_map!(Backend, "Error creating database tables"))?;

let mut key_cache = KeyCache::new(store_key);

let row = sqlx::query("SELECT id FROM profiles WHERE name = ?1")
.persistent(false)
.bind(profile_name)
.fetch_one(conn.as_mut())
.await
.map_err(err_map!(Backend, "Error checking for existing profile"))?;
conn.return_to_pool().await;

let mut key_cache = KeyCache::new(store_key);
key_cache.add_profile_mut(profile_name.to_string(), row.try_get(0)?, profile_key);

Ok(key_cache)
Expand Down Expand Up @@ -424,14 +428,15 @@ async fn open_db(
} else {
return Err(err_msg!(Unsupported, "Store key not found"));
};
let mut key_cache = KeyCache::new(store_key);

let mut key_cache = KeyCache::new(store_key);
let row = sqlx::query("SELECT id, profile_key FROM profiles WHERE name = ?1")
.bind(&profile)
.fetch_one(conn.as_mut())
.await?;
let profile_id = row.try_get(0)?;
let profile_key = key_cache.load_key(row.try_get(1)?).await?;
conn.return_to_pool().await;
key_cache.add_profile_mut(profile.clone(), profile_id, profile_key);

Ok(SqliteBackend::new(conn_pool, profile, key_cache, path))
Expand Down

0 comments on commit 92d76a7

Please sign in to comment.