Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add set_connect_options method to Pool #2088

Merged
merged 4 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ rand_xoshiro = "0.6.0"
hex = "0.4.3"
tempdir = "0.3.7"
# Needed to test SQLCipher
libsqlite3-sys = { version = "0.24", features = ["bundled-sqlcipher"] }
libsqlite3-sys = { version = "0.25", features = ["bundled-sqlcipher"] }

#
# Any
Expand Down
7 changes: 6 additions & 1 deletion sqlx-core/src/mysql/testing/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fmt::Write;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
Expand Down Expand Up @@ -152,7 +153,11 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<MySql>, Error> {
// Close connections ASAP if left in the idle queue.
.idle_timeout(Some(Duration::from_secs(1)))
.parent(master_pool.clone()),
connect_opts: master_pool.connect_options().clone().database(&new_db_name),
connect_opts: master_pool
.connect_options()
.deref()
.clone()
.database(&new_db_name),
db_name: new_db_name,
})
}
Expand Down
16 changes: 12 additions & 4 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures_intrusive::sync::{Semaphore, SemaphoreReleaser};
use std::cmp;
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::task::Poll;

use crate::pool::options::PoolConnectionMetadata;
Expand All @@ -20,7 +20,7 @@ use futures_util::FutureExt;
use std::time::{Duration, Instant};

pub(crate) struct PoolInner<DB: Database> {
pub(super) connect_options: <DB::Connection as Connection>::Options,
pub(super) connect_options: RwLock<Arc<<DB::Connection as Connection>::Options>>,
pub(super) idle_conns: ArrayQueue<Idle<DB>>,
pub(super) semaphore: Semaphore,
pub(super) size: AtomicU32,
Expand All @@ -47,7 +47,7 @@ impl<DB: Database> PoolInner<DB> {
};

let pool = Self {
connect_options,
connect_options: RwLock::new(Arc::new(connect_options)),
idle_conns: ArrayQueue::new(capacity),
semaphore: Semaphore::new(options.fair, semaphore_capacity),
size: AtomicU32::new(0),
Expand Down Expand Up @@ -292,9 +292,17 @@ impl<DB: Database> PoolInner<DB> {
loop {
let timeout = deadline_as_timeout::<DB>(deadline)?;

// clone the connect options arc so it can be used without holding the RwLockReadGuard
// across an async await point
let connect_options = self
.connect_options
.read()
.expect("write-lock holder panicked")
.clone();

// result here is `Result<Result<C, Error>, TimeoutError>`
// if this block does not return, sleep for the backoff timeout and try again
match sqlx_rt::timeout(timeout, self.connect_options.connect()).await {
match sqlx_rt::timeout(timeout, connect_options.connect()).await {
// successfully established connection
Ok(Ok(mut raw)) => {
// See comment on `PoolOptions::after_connect`
Expand Down
30 changes: 26 additions & 4 deletions sqlx-core/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ use futures_core::FusedFuture;
use futures_util::FutureExt;
use std::fmt;
use std::future::Future;
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -489,9 +490,26 @@ impl<DB: Database> Pool<DB> {
self.0.num_idle()
}

/// Get the connection options for this pool
pub fn connect_options(&self) -> &<DB::Connection as Connection>::Options {
&self.0.connect_options
/// Gets a clone of the connection options for this pool
pub fn connect_options(&self) -> Arc<<DB::Connection as Connection>::Options> {
self.0
.connect_options
.read()
.expect("write-lock holder panicked")
.clone()
}

/// Updates the connection options this pool will use when opening any future connections. Any
/// existing open connection in the pool will be left as-is.
pub fn set_connect_options(&self, connect_options: <DB::Connection as Connection>::Options) {
// technically write() could also panic if the current thread already holds the lock,
// but because this method can't be re-entered by the same thread that shouldn't be a problem
let mut guard = self
.0
.connect_options
.write()
.expect("write-lock holder panicked");
std::mem::swap(guard.deref_mut(), &mut Arc::new(connect_options));
moatra marked this conversation as resolved.
Show resolved Hide resolved
}

/// Get the options for this pool
Expand All @@ -514,7 +532,11 @@ impl Pool<Any> {
///
/// Determined by the connection URL.
pub fn any_kind(&self) -> AnyKind {
self.0.connect_options.kind()
self.0
.connect_options
.read()
.expect("write-lock holder panicked")
.kind()
}
}

Expand Down
7 changes: 6 additions & 1 deletion sqlx-core/src/postgres/testing/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fmt::Write;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
Expand Down Expand Up @@ -159,7 +160,11 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<Postgres>, Error> {
// Close connections ASAP if left in the idle queue.
.idle_timeout(Some(Duration::from_secs(1)))
.parent(master_pool.clone()),
connect_opts: master_pool.connect_options().clone().database(&new_db_name),
connect_opts: master_pool
.connect_options()
.deref()
.clone()
.database(&new_db_name),
db_name: new_db_name,
})
}
Expand Down
4 changes: 2 additions & 2 deletions sqlx-core/src/sqlite/connection/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ impl BranchStateHash {
let mut cur = vec![];
for (k, v) in &st.p {
match v {
CursorDataType::Normal(hm) => {
for (i, col) in hm {
CursorDataType::Normal { cols, .. } => {
for (i, col) in cols {
cur.push((*k, *i, Some(col.clone())));
}
}
Expand Down