Skip to content

Commit

Permalink
WIP feat: implement metrics-gathering support for Pool
Browse files Browse the repository at this point in the history
TODO: tests
  • Loading branch information
abonander committed Jun 16, 2022
1 parent cd78d7d commit 0f6b547
Show file tree
Hide file tree
Showing 7 changed files with 622 additions and 13 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ all-types = [
]
bigdecimal = ["bigdecimal_", "num-bigint"]
decimal = ["rust_decimal", "num-bigint"]
json = ["serde", "serde_json"]
json = ["serde", "serde_json", "enum-map/serde"]

# runtimes
runtime-actix-native-tls = [
Expand Down Expand Up @@ -185,6 +185,7 @@ hashlink = "0.8.0"
indexmap = "1.6.0"
hkdf = { version = "0.12.0", optional = true }
event-listener = "2.5.2"
enum-map = "2.4.0"

[dev-dependencies]
sqlx = { version = "0.5.12", path = "..", features = ["postgres", "sqlite", "mysql"] }
Expand Down
70 changes: 58 additions & 12 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;

use crate::pool::metrics::{AcquirePhase, PoolMetricsCollector};
use crate::pool::options::PoolConnectionMetadata;
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -178,13 +179,22 @@ impl<DB: Database> PoolInner<DB> {
return Err(Error::PoolClosed);
}

let deadline = Instant::now() + self.options.acquire_timeout;
let metrics = &self.options.metrics;

sqlx_rt::timeout(
let acquire_start = Instant::now();

let deadline = acquire_start + self.options.acquire_timeout;

let mut phase = AcquirePhase::Waiting;

let res = sqlx_rt::timeout(
self.options.acquire_timeout,
async {
loop {
phase = AcquirePhase::Waiting;
let waiting_start = Instant::now();
let permit = self.semaphore.acquire(1).await;
metrics.permit_wait_time(waiting_start.elapsed());

if self.is_closed() {
return Err(Error::PoolClosed);
Expand All @@ -194,7 +204,7 @@ impl<DB: Database> PoolInner<DB> {
let guard = match self.pop_idle(permit) {

// Then, check that we can use it...
Ok(conn) => match check_idle_conn(conn, &self.options).await {
Ok(conn) => match check_idle_conn(conn, &self.options, &mut phase).await {

// All good!
Ok(live) => return Ok(live),
Expand All @@ -207,24 +217,40 @@ impl<DB: Database> PoolInner<DB> {
// we can open a new connection
guard
} else {
// I can't imagine this occurring unless there's a race condition where
// the number of available permits can exceed the max size
// without the pool being closed.
//
// If this does happen, the safest thing to do is return to the top
// and wait for another permit.
log::debug!("woke but was unable to acquire idle connection or open new one; retrying");
continue;
}
};

// Attempt to connect...
return self.connect(deadline, guard).await;
return self.connect(deadline, guard, &mut phase).await;
}
}
)
.await
.map_err(|_| Error::PoolTimedOut)?
.map_err(|_| {
metrics.acquire_timed_out(phase);
Error::PoolTimedOut
})?;

if res.is_ok() {
metrics.connection_acquired(acquire_start.elapsed());
}

res
}

pub(super) async fn connect(
self: &Arc<Self>,
deadline: Instant,
guard: DecrementSizeGuard<DB>,
phase: &mut AcquirePhase,
) -> Result<Floating<DB, Live<DB>>, Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
Expand All @@ -238,16 +264,20 @@ impl<DB: Database> PoolInner<DB> {

// result here is `Result<Result<C, Error>, TimeoutError>`
// if this block does not return, sleep for the backoff timeout and try again

*phase = AcquirePhase::Connecting;
match sqlx_rt::timeout(timeout, self.connect_options.connect()).await {
// successfully established connection
Ok(Ok(mut raw)) => {
// See comment on `PoolOptions::after_connect`
let meta = PoolConnectionMetadata {
age: Duration::ZERO,
idle_for: Duration::ZERO,
};

let res = if let Some(callback) = &self.options.after_connect {
*phase = AcquirePhase::AfterConnectCallback;

// See comment on `PoolOptions::after_connect`
let meta = PoolConnectionMetadata {
age: Duration::ZERO,
idle_for: Duration::ZERO,
};

callback(&mut raw, meta).await
} else {
Ok(())
Expand All @@ -258,6 +288,7 @@ impl<DB: Database> PoolInner<DB> {
Err(e) => {
log::error!("error returned from after_connect: {:?}", e);
// The connection is broken, don't try to close nicely.
*phase = AcquirePhase::ClosingInvalidConnection;
let _ = raw.close_hard().await;

// Fall through to the backoff.
Expand All @@ -279,6 +310,8 @@ impl<DB: Database> PoolInner<DB> {
Err(_) => return Err(Error::PoolTimedOut),
}

*phase = AcquirePhase::Backoff;

// If the connection is refused, wait in exponentially
// increasing steps for the server to come up,
// capped by a factor of the remaining time until the deadline
Expand Down Expand Up @@ -310,7 +343,10 @@ impl<DB: Database> PoolInner<DB> {

// We skip `after_release` since the connection was never provided to user code
// besides `after_connect`, if they set it.
self.release(self.connect(deadline, guard).await?);
self.release(
self.connect(deadline, guard, &mut AcquirePhase::Connecting)
.await?,
);
}

Ok(())
Expand Down Expand Up @@ -351,16 +387,22 @@ fn is_beyond_idle_timeout<DB: Database>(idle: &Idle<DB>, options: &PoolOptions<D
async fn check_idle_conn<DB: Database>(
mut conn: Floating<DB, Idle<DB>>,
options: &PoolOptions<DB>,
phase: &mut AcquirePhase,
) -> Result<Floating<DB, Live<DB>>, DecrementSizeGuard<DB>> {
// If the connection we pulled has expired, close the connection and
// immediately create a new connection
if is_beyond_max_lifetime(&conn, options) {
*phase = AcquirePhase::ClosingInvalidConnection;
return Err(conn.close().await);
}

if options.test_before_acquire {
*phase = AcquirePhase::TestBeforeAcquire;

// Check that the connection is still live
if let Err(e) = conn.ping().await {
*phase = AcquirePhase::ClosingInvalidConnection;

// an error here means the other end has hung up or we lost connectivity
// either way we're fine to just discard the connection
// the error itself here isn't necessarily unexpected so WARN is too strong
Expand All @@ -371,16 +413,20 @@ async fn check_idle_conn<DB: Database>(
}

if let Some(test) = &options.before_acquire {
*phase = AcquirePhase::BeforeAcquireCallback;

let meta = conn.metadata();
match test(&mut conn.live.raw, meta).await {
Ok(false) => {
// connection was rejected by user-defined hook, close nicely
*phase = AcquirePhase::ClosingInvalidConnection;
return Err(conn.close().await);
}

Err(error) => {
log::warn!("error from `before_acquire`: {}", error);
// connection is broken so don't try to close nicely
*phase = AcquirePhase::ClosingInvalidConnection;
return Err(conn.close_hard().await);
}

Expand Down
150 changes: 150 additions & 0 deletions sqlx-core/src/pool/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//! Metrics collection utilities for [`Pool`][crate::pool::Pool].
//!
//!
use std::sync::Arc;
use std::time::Duration;

// Saves a bunch of redundant links in docs.
// Just `#[cfg(doc)]` doesn't work for some reason.
#[cfg_attr(not(doc), allow(unused_imports))]
use {
crate::connection::Connection,
crate::pool::{Pool, PoolOptions},
};

mod simple;

pub use simple::{
AcquireTimeoutsPerPhase, SimplePoolMetrics, SimplePoolMetricsSnapshot, SimpleTimingStats,
};

/// Describes a type that can collect metrics from [`Pool`].
///
/// You can set the metrics collector for a `Pool` instance using [`PoolOptions::metrics_collector`].
///
/// For an easy-start implementation, see [`SimplePoolMetrics`].
///
/// All methods on this trait have provided impls so you can override just the ones you care about.
pub trait PoolMetricsCollector: Send + Sync + 'static {
/// Record when [`Pool::acquire()`] is called.
fn acquire_called(&self) {}

/// Record how long a [`Pool::acquire()`] call waited for a semaphore permit.
///
/// This is the first stage of `acquire()` and gives the call the right-of-way to either
/// pop a connection from the idle queue or open a new one.
///
/// This time is likely to increase as the pool comes under higher and higher load,
/// and will asymptotically approach the [acquire timeout][PoolOptions::acquire_timeout].
///
/// If `acquire()` times out while waiting for a permit, this method will not be called.
/// You will get an <code>acquire_timed_out([AcquirePhase::Waiting])</code> call instead.
///
/// [acquire_timed_out]: Self::acquire_timed_out
fn permit_wait_time(&self, duration: Duration) {
drop(duration);
}

/// Record when [`Pool::acquire()`] times out as governed by [`PoolOptions::acquire_timeout`].
///
/// `acquire()` has several internal asynchronous operations that it may time out on.
/// The given [`AcquirePhase`] tells you which one timed out.
fn acquire_timed_out(&self, phase: AcquirePhase) {
drop(phase);
}

/// Record when a connection is successfully acquired.
fn connection_acquired(&self, total_wait: Duration) {
drop(total_wait);
}
}

macro_rules! opt_delegate {
($receiver:ident.$method:ident $( ( $($arg:expr),*) )?) => {
if let Some(this) = $receiver {
this.$method($( $($arg),* )?);
}
}
}

#[doc(hidden)]
impl PoolMetricsCollector for Option<Arc<dyn PoolMetricsCollector>> {
fn acquire_called(&self) {
opt_delegate!(self.acquire_called());
}

fn permit_wait_time(&self, duration: Duration) {
opt_delegate!(self.permit_wait_time(duration));
}

fn acquire_timed_out(&self, phase: AcquirePhase) {
opt_delegate!(self.acquire_timed_out(phase));
}

fn connection_acquired(&self, total_wait: Duration) {
opt_delegate!(self.connection_acquired(total_wait));
}
}

/// The phase that [`Pool::acquire()`] was in when it timed out.
///
/// [`Pool::acquire()`] has several internal asynchronous operations, any of which may lead
/// to it timing out. Which phases are executed depends on multiple things:
///
/// * The pool's configuration.
/// * If an idle connection was available or not.
/// * If there is room in the pool for a new connection.
///
/// ### Note: Some Trait impls are Unstable
/// The `enum_map` trait impls are *not* considered part of the stable API.
/// They would not be listed in documentation if it was possible to tell the derive to hide them.
///
/// We reserve the right to update `enum_map` to a non-compatible version if necessary.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, enum_map::Enum)]
#[cfg_attr(feature = "json", derive(serde::Serialize))]
#[non_exhaustive]
pub enum AcquirePhase {
/// Initial [`Pool::acquire()`] phase: waiting for a semaphore permit.
///
/// A permit represents the privilege to acquire a connection, either by popping one
/// from the idle queue or opening a new one.
Waiting,

/// `acquire()` found an idle connection. It then calls [`Connection::ping()`] on it.
///
/// Only done if [`PoolOptions::test_before_acquire`] is `true` (enabled by default).
TestBeforeAcquire,

/// `acquire()` found an idle connection and the `TestBeforeAcquire` phase succeeded
/// or was skipped.
///
/// It then invokes the user-defined [`before_acquire`][PoolOptions::before_acquire] callback, if set.
BeforeAcquireCallback,

/// `acquire()` found an idle connection but decided to close it.
///
/// This may have happened for any of the following reasons:
/// * The connection's age exceeded [`PoolOptions::max_lifetime`].
/// * The `TestBeforeAcquire` phase failed.
/// * The `BeforeAcquireCallback` errored or rejected the connection.
/// * A new connection was opened but the `AfterConnectCallback` phase errored.
ClosingInvalidConnection,

/// `acquire()` either did not find an idle connection or the connection it got failed
/// the `TestBeforeAcquire` or `BeforeAcquireCallback` phase and was closed.
///
/// It then attempted to open a new connection.
Connecting,

/// `acquire()` successfully opened a new connection.
///
/// It then invokes the user-defined [`after_connect`][PoolOptions::after_connect] callback, if set.
AfterConnectCallback,

/// `acquire()` failed to open a new connection or the connection failed the
/// `AfterConnectCallback` phase.
///
/// It then waits in a backoff loop before attempting to open another connection.
Backoff,
}
Loading

0 comments on commit 0f6b547

Please sign in to comment.