diff --git a/Cargo.lock b/Cargo.lock index 9c97ad8262..f3a7c1bf2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -761,6 +761,27 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "enum-map" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ddfe61e8040145222887d0d32a939c70c8cae681490d72fb868305e9b40ced8" +dependencies = [ + "enum-map-derive", + "serde", +] + +[[package]] +name = "enum-map-derive" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00d1c54e25a57236a790ecf051c2befbb57740c9b86c4273eac378ba84d620d6" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "env_logger" version = "0.9.0" @@ -2367,6 +2388,7 @@ dependencies = [ "dirs", "either", "encoding_rs", + "enum-map", "event-listener", "flume", "futures-channel", diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml index 96cca286fa..aeaad23b1c 100644 --- a/sqlx-core/Cargo.toml +++ b/sqlx-core/Cargo.toml @@ -62,7 +62,7 @@ all-types = [ ] bigdecimal = ["bigdecimal_", "num-bigint"] decimal = ["rust_decimal", "num-bigint"] -json = ["serde", "serde_json"] +json = ["serde", "serde_json", "enum-map/serde"] # runtimes runtime-actix-native-tls = [ @@ -185,6 +185,7 @@ hashlink = "0.8.0" indexmap = "1.6.0" hkdf = { version = "0.12.0", optional = true } event-listener = "2.5.2" +enum-map = "2.4.0" [dev-dependencies] sqlx = { version = "0.5.12", path = "..", features = ["postgres", "sqlite", "mysql"] } diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index 86c0ec5d8c..3a473b9abe 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -13,6 +13,7 @@ use std::future::Future; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; use std::sync::Arc; +use crate::pool::metrics::{AcquirePhase, PoolMetricsCollector}; use crate::pool::options::PoolConnectionMetadata; use std::time::{Duration, Instant}; @@ -178,13 +179,22 @@ impl PoolInner { return Err(Error::PoolClosed); } - let deadline = Instant::now() + self.options.acquire_timeout; + let metrics = &self.options.metrics; - sqlx_rt::timeout( + let acquire_start = Instant::now(); + + let deadline = acquire_start + self.options.acquire_timeout; + + let mut phase = AcquirePhase::Waiting; + + let res = sqlx_rt::timeout( self.options.acquire_timeout, async { loop { + phase = AcquirePhase::Waiting; + let waiting_start = Instant::now(); let permit = self.semaphore.acquire(1).await; + metrics.permit_wait_time(waiting_start.elapsed()); if self.is_closed() { return Err(Error::PoolClosed); @@ -194,7 +204,7 @@ impl PoolInner { let guard = match self.pop_idle(permit) { // Then, check that we can use it... - Ok(conn) => match check_idle_conn(conn, &self.options).await { + Ok(conn) => match check_idle_conn(conn, &self.options, &mut phase).await { // All good! Ok(live) => return Ok(live), @@ -207,24 +217,40 @@ impl PoolInner { // we can open a new connection guard } else { + // I can't imagine this occurring unless there's a race condition where + // the number of available permits can exceed the max size + // without the pool being closed. + // + // If this does happen, the safest thing to do is return to the top + // and wait for another permit. log::debug!("woke but was unable to acquire idle connection or open new one; retrying"); continue; } }; // Attempt to connect... - return self.connect(deadline, guard).await; + return self.connect(deadline, guard, &mut phase).await; } } ) .await - .map_err(|_| Error::PoolTimedOut)? 
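+            // The acquire timeout elapsed: report which phase `acquire()` was in when it fired,
+            // then surface the usual `PoolTimedOut` error.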
+ .map_err(|_| { + metrics.acquire_timed_out(phase); + Error::PoolTimedOut + })?; + + if res.is_ok() { + metrics.connection_acquired(acquire_start.elapsed()); + } + + res } pub(super) async fn connect( self: &Arc, deadline: Instant, guard: DecrementSizeGuard, + phase: &mut AcquirePhase, ) -> Result>, Error> { if self.is_closed() { return Err(Error::PoolClosed); @@ -238,16 +264,20 @@ impl PoolInner { // result here is `Result, TimeoutError>` // if this block does not return, sleep for the backoff timeout and try again + + *phase = AcquirePhase::Connecting; match sqlx_rt::timeout(timeout, self.connect_options.connect()).await { // successfully established connection Ok(Ok(mut raw)) => { - // See comment on `PoolOptions::after_connect` - let meta = PoolConnectionMetadata { - age: Duration::ZERO, - idle_for: Duration::ZERO, - }; - let res = if let Some(callback) = &self.options.after_connect { + *phase = AcquirePhase::AfterConnectCallback; + + // See comment on `PoolOptions::after_connect` + let meta = PoolConnectionMetadata { + age: Duration::ZERO, + idle_for: Duration::ZERO, + }; + callback(&mut raw, meta).await } else { Ok(()) @@ -258,6 +288,7 @@ impl PoolInner { Err(e) => { log::error!("error returned from after_connect: {:?}", e); // The connection is broken, don't try to close nicely. + *phase = AcquirePhase::ClosingInvalidConnection; let _ = raw.close_hard().await; // Fall through to the backoff. @@ -279,6 +310,8 @@ impl PoolInner { Err(_) => return Err(Error::PoolTimedOut), } + *phase = AcquirePhase::Backoff; + // If the connection is refused, wait in exponentially // increasing steps for the server to come up, // capped by a factor of the remaining time until the deadline @@ -310,7 +343,10 @@ impl PoolInner { // We skip `after_release` since the connection was never provided to user code // besides `after_connect`, if they set it. 
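+        // This is pool maintenance (`min_connections`), not a user-facing `acquire()`,
+        // so the `AcquirePhase` passed below is only a scratch value and is never reported.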
- self.release(self.connect(deadline, guard).await?); + self.release( + self.connect(deadline, guard, &mut AcquirePhase::Connecting) + .await?, + ); } Ok(()) @@ -351,16 +387,22 @@ fn is_beyond_idle_timeout(idle: &Idle, options: &PoolOptions( mut conn: Floating>, options: &PoolOptions, + phase: &mut AcquirePhase, ) -> Result>, DecrementSizeGuard> { // If the connection we pulled has expired, close the connection and // immediately create a new connection if is_beyond_max_lifetime(&conn, options) { + *phase = AcquirePhase::ClosingInvalidConnection; return Err(conn.close().await); } if options.test_before_acquire { + *phase = AcquirePhase::TestBeforeAcquire; + // Check that the connection is still live if let Err(e) = conn.ping().await { + *phase = AcquirePhase::ClosingInvalidConnection; + // an error here means the other end has hung up or we lost connectivity // either way we're fine to just discard the connection // the error itself here isn't necessarily unexpected so WARN is too strong @@ -371,16 +413,20 @@ async fn check_idle_conn( } if let Some(test) = &options.before_acquire { + *phase = AcquirePhase::BeforeAcquireCallback; + let meta = conn.metadata(); match test(&mut conn.live.raw, meta).await { Ok(false) => { // connection was rejected by user-defined hook, close nicely + *phase = AcquirePhase::ClosingInvalidConnection; return Err(conn.close().await); } Err(error) => { log::warn!("error from `before_acquire`: {}", error); // connection is broken so don't try to close nicely + *phase = AcquirePhase::ClosingInvalidConnection; return Err(conn.close_hard().await); } diff --git a/sqlx-core/src/pool/metrics/mod.rs b/sqlx-core/src/pool/metrics/mod.rs new file mode 100644 index 0000000000..3fdbb48bf0 --- /dev/null +++ b/sqlx-core/src/pool/metrics/mod.rs @@ -0,0 +1,150 @@ +//! Metrics collection utilities for [`Pool`][crate::pool::Pool]. +//! +//! + +use std::sync::Arc; +use std::time::Duration; + +// Saves a bunch of redundant links in docs. +// Just `#[cfg(doc)]` doesn't work for some reason. +#[cfg_attr(not(doc), allow(unused_imports))] +use { + crate::connection::Connection, + crate::pool::{Pool, PoolOptions}, +}; + +mod simple; + +pub use simple::{ + AcquireTimeoutsPerPhase, SimplePoolMetrics, SimplePoolMetricsSnapshot, SimpleTimingStats, +}; + +/// Describes a type that can collect metrics from [`Pool`]. +/// +/// You can set the metrics collector for a `Pool` instance using [`PoolOptions::metrics_collector`]. +/// +/// For an easy-start implementation, see [`SimplePoolMetrics`]. +/// +/// All methods on this trait have provided impls so you can override just the ones you care about. +pub trait PoolMetricsCollector: Send + Sync + 'static { + /// Record when [`Pool::acquire()`] is called. + fn acquire_called(&self) {} + + /// Record how long a [`Pool::acquire()`] call waited for a semaphore permit. + /// + /// This is the first stage of `acquire()` and gives the call the right-of-way to either + /// pop a connection from the idle queue or open a new one. + /// + /// This time is likely to increase as the pool comes under higher and higher load, + /// and will asymptotically approach the [acquire timeout][PoolOptions::acquire_timeout]. + /// + /// If `acquire()` times out while waiting for a permit, this method will not be called. + /// You will get an acquire_timed_out([AcquirePhase::Waiting]) call instead. 
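+    ///
+    /// Note: the duration reported here is wall-clock time measured around the internal
+    /// semaphore acquire, so it also includes any executor scheduling delay.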
+ /// + /// [acquire_timed_out]: Self::acquire_timed_out + fn permit_wait_time(&self, duration: Duration) { + drop(duration); + } + + /// Record when [`Pool::acquire()`] times out as governed by [`PoolOptions::acquire_timeout`]. + /// + /// `acquire()` has several internal asynchronous operations that it may time out on. + /// The given [`AcquirePhase`] tells you which one timed out. + fn acquire_timed_out(&self, phase: AcquirePhase) { + drop(phase); + } + + /// Record when a connection is successfully acquired. + fn connection_acquired(&self, total_wait: Duration) { + drop(total_wait); + } +} + +macro_rules! opt_delegate { + ($receiver:ident.$method:ident $( ( $($arg:expr),*) )?) => { + if let Some(this) = $receiver { + this.$method($( $($arg),* )?); + } + } +} + +#[doc(hidden)] +impl PoolMetricsCollector for Option> { + fn acquire_called(&self) { + opt_delegate!(self.acquire_called()); + } + + fn permit_wait_time(&self, duration: Duration) { + opt_delegate!(self.permit_wait_time(duration)); + } + + fn acquire_timed_out(&self, phase: AcquirePhase) { + opt_delegate!(self.acquire_timed_out(phase)); + } + + fn connection_acquired(&self, total_wait: Duration) { + opt_delegate!(self.connection_acquired(total_wait)); + } +} + +/// The phase that [`Pool::acquire()`] was in when it timed out. +/// +/// [`Pool::acquire()`] has several internal asynchronous operations, any of which may lead +/// to it timing out. Which phases are executed depends on multiple things: +/// +/// * The pool's configuration. +/// * If an idle connection was available or not. +/// * If there is room in the pool for a new connection. +/// +/// ### Note: Some Trait impls are Unstable +/// The `enum_map` trait impls are *not* considered part of the stable API. +/// They would not be listed in documentation if it was possible to tell the derive to hide them. +/// +/// We reserve the right to update `enum_map` to a non-compatible version if necessary. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, enum_map::Enum)] +#[cfg_attr(feature = "json", derive(serde::Serialize))] +#[non_exhaustive] +pub enum AcquirePhase { + /// Initial [`Pool::acquire()`] phase: waiting for a semaphore permit. + /// + /// A permit represents the privilege to acquire a connection, either by popping one + /// from the idle queue or opening a new one. + Waiting, + + /// `acquire()` found an idle connection. It then calls [`Connection::ping()`] on it. + /// + /// Only done if [`PoolOptions::test_before_acquire`] is `true` (enabled by default). + TestBeforeAcquire, + + /// `acquire()` found an idle connection and the `TestBeforeAcquire` phase succeeded + /// or was skipped. + /// + /// It then invokes the user-defined [`before_acquire`][PoolOptions::before_acquire] callback, if set. + BeforeAcquireCallback, + + /// `acquire()` found an idle connection but decided to close it. + /// + /// This may have happened for any of the following reasons: + /// * The connection's age exceeded [`PoolOptions::max_lifetime`]. + /// * The `TestBeforeAcquire` phase failed. + /// * The `BeforeAcquireCallback` errored or rejected the connection. + /// * A new connection was opened but the `AfterConnectCallback` phase errored. + ClosingInvalidConnection, + + /// `acquire()` either did not find an idle connection or the connection it got failed + /// the `TestBeforeAcquire` or `BeforeAcquireCallback` phase and was closed. + /// + /// It then attempted to open a new connection. + Connecting, + + /// `acquire()` successfully opened a new connection. 
+ /// + /// It then invokes the user-defined [`after_connect`][PoolOptions::after_connect] callback, if set. + AfterConnectCallback, + + /// `acquire()` failed to open a new connection or the connection failed the + /// `AfterConnectCallback` phase. + /// + /// It then waits in a backoff loop before attempting to open another connection. + Backoff, +} diff --git a/sqlx-core/src/pool/metrics/simple.rs b/sqlx-core/src/pool/metrics/simple.rs new file mode 100644 index 0000000000..4fae1ffba8 --- /dev/null +++ b/sqlx-core/src/pool/metrics/simple.rs @@ -0,0 +1,369 @@ +use std::cmp; +use std::fmt::{self, Formatter}; +use std::ops::Index; +use std::sync::atomic::{self, AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use enum_map::EnumMap; + +// Saves a bunch of redundant links in docs. +// Just `#[cfg(doc)]` doesn't work for some reason. +use crate::pool::metrics::{AcquirePhase, PoolMetricsCollector}; +#[cfg_attr(not(doc), allow(unused_imports))] +use crate::pool::{Pool, PoolOptions}; + +/// A simple but hopefully useful metrics collector for [`Pool`]. +/// +/// See [`SimplePoolMetricsSnapshot`] for the metrics collected by this implementation. +/// +/// # Example +/// This example is written for PostgreSQL and Tokio but can trivially be adapted +/// to other databases and/or async-std. +/// +/// ```no_run +/// # #[cfg(feature = "postgres")] +/// # async fn f() -> Result<(), Box> { +/// use sqlx::Executor; +/// use sqlx::postgres::PgPoolOptions; +/// use sqlx::pool::metrics::SimplePoolMetrics; +/// +/// let metrics = SimplePoolMetrics::new(); +/// +/// let pool = PgPoolOptions::new() +/// .metrics_collector(metrics.collector()) +/// .connect("postgres:// …") +/// .await?; +/// +/// tokio::spawn(async move { +/// // Post metrics every minute. +/// tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; +/// +/// // Warning: very verbose! +/// println!("current pool metrics: {:#?}", metrics.snapshot()); +/// }); +/// +/// // use `pool`... +/// +/// # Ok(()) +/// # } +/// ``` +#[derive(Clone)] +pub struct SimplePoolMetrics { + inner: Arc, +} + +/// A snapshot of metrics returned by [`SimplePoolMetrics::snapshot()`]. +/// +/// If the `json` feature is enabled, this type implements `serde::Serialize`. +#[derive(Debug, Clone)] +#[cfg_attr(feature = "json", derive(serde::Serialize))] +#[non_exhaustive] +pub struct SimplePoolMetricsSnapshot { + /// Total number of calls to [`Pool::acquire()`] when the snapshot was taken. + pub acquire_calls: u64, + + /// Statistics for the time [`Pool::acquire()`] spent in [`AcquirePhase::Waiting`]. + pub permit_wait_time: SimpleTimingStats, + + /// Statistics for the time [`Pool::acquire()`] takes to acquire a connection. + pub acquire_time: SimpleTimingStats, + + /// Total number of times [`Pool::acquire()`] timed out. + pub acquire_timeouts: u64, + + /// Total number of times [`Pool::acquire()`] timed out aggregated per [`AcquirePhase`] in which + /// the timeout occurred. + /// + /// The value type can be indexed by `AcquirePhase`. + /// + /// ```rust + /// use sqlx::pool::metrics::{AcquirePhase, SimplePoolMetrics, SimplePoolMetricsSnapshot}; + /// + /// let metrics: SimplePoolMetrics = SimplePoolMetrics::new(); + /// + /// // pass `metrics.collector()` to `PoolOptions::metrics_collector()` + /// // then construct and start using the `Pool` + /// + /// // sometime later... 
+    ///
+    /// let snapshot: SimplePoolMetricsSnapshot = metrics.snapshot();
+    ///
+    /// println!(
+    ///     "number of times the pool timed out waiting for a permit = {}",
+    ///     snapshot.acquire_timeouts_per_phase[AcquirePhase::Waiting]
+    /// );
+    /// ```
+    pub acquire_timeouts_per_phase: AcquireTimeoutsPerPhase,
+}
+
+/// The statistics for an individual [`Pool`] timing metric collected by [`SimplePoolMetrics`].
+#[derive(Debug, Clone)]
+#[cfg_attr(feature = "json", derive(serde::Serialize))]
+#[non_exhaustive]
+pub struct SimpleTimingStats {
+    /// The total count of samples collected for this metric.
+    pub sample_count: u64,
+
+    /// The minimum time for this metric. [`Duration::ZERO`] if no samples were collected.
+    pub min: Duration,
+
+    /// The average time for this metric, calculated as an [Exponential Moving Average].
+    ///
+    /// [`Duration::ZERO`] if no samples were collected.
+    ///
+    /// The EMA coefficient is set during construction of [`SimplePoolMetrics`].
+    /// See [`SimplePoolMetrics::with_ema_coefficient()`] for details.
+    ///
+    /// [Exponential Moving Average]: https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
+    pub average: Duration,
+
+    /// The maximum time for this metric. [`Duration::ZERO`] if no samples were collected.
+    pub max: Duration,
+}
+
+/// Counts of [`Pool::acquire()`] timeouts aggregated per [`AcquirePhase`] in which the timeout occurred.
+#[derive(Debug, Clone)]
+#[cfg_attr(feature = "json", derive(serde::Serialize))]
+pub struct AcquireTimeoutsPerPhase(EnumMap<AcquirePhase, u64>);
+
+#[derive(Default)]
+struct SimpleMetricsInner {
+    ema_coefficient: f64,
+    acquire_calls: AtomicU64,
+    permit_wait_time: AtomicTimingStats,
+    acquire_time: AtomicTimingStats,
+    acquire_timeouts: AtomicU64,
+    acquire_timeouts_per_phase: EnumMap<AcquirePhase, AtomicU64>,
+}
+
+#[derive(Default)]
+struct AtomicTimingStats {
+    sample_count: AtomicU64,
+    min_nanos: AtomicU64,
+    average_nanos: AtomicU64,
+    max_nanos: AtomicU64,
+}
+
+impl SimplePoolMetrics {
+    /// Construct with default settings.
+    pub fn new() -> SimplePoolMetrics {
+        // Arbitrarily chosen, but should give decent metrics.
+        // See the table in the docs below for details.
+        Self::with_ema_coefficient(0.01)
+    }
+
+    /// Construct with the given coefficient for calculating [Exponential Moving Averages].
+    ///
+    /// `ema_coefficient` is the factor `α` in the following formula:
+    ///
+    /// `new_average = α * sample + (1 - α) * old_average`
+    ///
+    /// Essentially, it determines how much new samples influence the average. A smaller coefficient
+    /// produces a more stable but more slowly moving average, a larger coefficient produces
+    /// a quickly moving but chaotic average.
+    ///
+    /// The following table shows how much each sample contributes to the average
+    /// for some arbitrary coefficients, where the Nth sample is the latest:
+    ///
+    // Got kinda nerd sniped calculating this table, tbh.
+    // I was trying to demonstrate how quickly each coefficient makes old samples irrelevant.
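+    // (Each row below is roughly α * (1 - α)^k: the weight the moving average gives to the sample k steps back.)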
+ /// | α = | 0.01 | 0.05 | 0.1 | 0.2 | 0.25 | 0.5 | + /// |-------|-------|-------|----------|---------|--------|---------| + /// | N | 1% | 5% | 10% | 20% | 25% | 50% | + /// | N-1 | 0.99% | 4.75% | 9% | 16% | 18.75% | 25% | + /// | N-2 | 0.98% | 4.51% | 8.1% | 12.8% | 14.06% | 12.5% | + /// | N-3 | 0.97% | 4.29% | 7.29% | 10.24% | 10.54% | 6.25% | + /// | N-4 | 0.96% | 4.07% | 6.56% | 8.19% | 7.91% | 3.125% | + /// | ⋮ | | ⋮ | ⋮ | ⋮ | ⋮ | ⋮ | + /// | N-10 | 0.90% | 2.99% | 3.4% | 2.15% | 1.41% | 0.049% | + /// | ⋮ | | ⋮ | ⋮ | ⋮ | ⋮ | ⋮ | + /// | N-20 | 0.82% | 1.79% | 1.22% | 0.23% | 0.079% | 4.8 ppb | + /// | ⋮ | | ⋮ | ⋮ | ⋮ | ⋮ | ⋮ | + /// | N-100 | 0.36% | 0.03% | 26.5 ppb | 0.4 ppt | <1 ppt | <1 ppt | + /// + /// For coefficients greater than ~0.19, the N-100th sample contributes less than + /// one part per trillion to the average. + /// Greater than ~0.13, less than one part per billion. + /// Greater than ~0.6, less than one part per million. + /// + /// ### Panics + /// If `ema_coefficient` is outside the range `(0, 1)` or is non-normal. + /// + /// A coefficient of zero causes the average to never change. + /// A coefficient of one causes the average to always be equal to the last sample. + /// In either case, it's no longer an average. + /// + /// [Exponential Moving Averages]: https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average + pub fn with_ema_coefficient(ema_coefficient: f64) -> Self { + assert!(ema_coefficient.is_normal()); + assert!(ema_coefficient > 0.0); + assert!(ema_coefficient < 1.0); + + SimplePoolMetrics { + inner: Arc::new(SimpleMetricsInner { + ema_coefficient, + acquire_calls: AtomicU64::new(0), + permit_wait_time: AtomicTimingStats::default(), + acquire_time: AtomicTimingStats::default(), + acquire_timeouts: AtomicU64::new(0), + acquire_timeouts_per_phase: EnumMap::default(), + }), + } + } + + /// Get the collector instance to pass to [`PoolOptions::metrics_collector()`]. + pub fn collector(&self) -> Arc { + self.inner.clone() + } + + /// Get the current count of calls to [`Pool::acquire()`]. + /// + /// If you want to inspect multiple statistics at once, + /// [`.snapshot()`][Self::snapshot] is more efficient. + pub fn acquire_calls(&self) -> u64 { + self.inner.acquire_calls.load(Ordering::Acquire) + } + + /// Get the current statistics for the time [`Pool::acquire()`] spends in [`AcquirePhase::Waiting`]. + /// + /// If you want to inspect multiple statistics at once, + /// [`.snapshot()`][Self::snapshot] is more efficient. + pub fn permit_wait_time(&self) -> SimpleTimingStats { + atomic::fence(Ordering::Acquire); + self.inner.permit_wait_time.get() + } + + /// Get the current statistics for the total time [`Pool::acquire()`] takes to get a connection. + /// + /// If you want to inspect multiple statistics at once, + /// [`.snapshot()`][Self::snapshot] is more efficient. + pub fn acquire_time(&self) -> SimpleTimingStats { + atomic::fence(Ordering::Acquire); + self.inner.acquire_time.get() + } + + /// Load the current values for all metrics. + /// + /// More efficient than calling individual getters. 
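+    ///
+    /// With the `json` feature enabled, the returned snapshot also implements `serde::Serialize`,
+    /// so it can be shipped to an external metrics sink. A minimal sketch (assumes `serde_json`
+    /// is available in your crate):
+    ///
+    /// ```rust,ignore
+    /// use sqlx::pool::metrics::SimplePoolMetrics;
+    ///
+    /// let metrics = SimplePoolMetrics::new();
+    /// let json = serde_json::to_string_pretty(&metrics.snapshot()).unwrap();
+    /// println!("{}", json);
+    /// ```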
+ pub fn snapshot(&self) -> SimplePoolMetricsSnapshot { + use Ordering::*; + + atomic::fence(Acquire); + + SimplePoolMetricsSnapshot { + acquire_calls: self.inner.acquire_calls.load(Relaxed), + permit_wait_time: self.inner.permit_wait_time.get(), + acquire_time: self.inner.acquire_time.get(), + acquire_timeouts: self.inner.acquire_timeouts.load(Relaxed), + acquire_timeouts_per_phase: AcquireTimeoutsPerPhase( + self.inner + .acquire_timeouts_per_phase + .iter() + .map(|(phase, count)| (phase, count.load(Relaxed))) + .collect(), + ), + } + } +} + +/// Debug-prints the current metrics as determined by [`Self::snapshot()`]. +impl fmt::Debug for SimplePoolMetrics { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("SimplePoolMetrics") + .field("current", &self.snapshot()) + .finish() + } +} + +impl PoolMetricsCollector for SimpleMetricsInner { + fn acquire_called(&self) { + self.acquire_calls.fetch_add(1, Ordering::AcqRel); + } + + fn permit_wait_time(&self, duration: Duration) { + self.permit_wait_time.update(self.ema_coefficient, duration) + } + + fn acquire_timed_out(&self, phase: AcquirePhase) { + self.acquire_timeouts.fetch_add(1, Ordering::AcqRel); + self.acquire_timeouts_per_phase[phase].fetch_add(1, Ordering::AcqRel); + } + + fn connection_acquired(&self, total_wait: Duration) { + self.acquire_time.update(self.ema_coefficient, total_wait); + } +} + +impl AtomicTimingStats { + fn update(&self, ema_coefficient: f64, time_sample: Duration) { + use Ordering::*; + + // If your application triggers this assert then either an `.elapsed()` call overflowed or + // you somehow kept it running for ~585 years, so congratulate yourself on a job well done. + let nanos: u64 = time_sample + .as_nanos() + .try_into() + .expect("BUG: `duration` is too large!"); + + // Since this is just collecting some statistics, consistency isn't *too* important. + // We use relaxed orderings for all internal updates and just emit a single fence to + // get some semblance of synchronization. + atomic::fence(Acquire); + + self.sample_count.fetch_add(1, Relaxed); + + let _ = self.min_nanos.fetch_update(Relaxed, Relaxed, |prev| { + if prev == 0 { + // If our minimum is exactly zero, then we likely haven't collected any samples yet. + return Some(nanos); + } + + Some(cmp::min(prev, nanos)) + }); + + let _ = self + .average_nanos + .fetch_update(Relaxed, Relaxed, |average| { + if average == 0 { + // If we don't have an average, just use our first sample. + return Some(nanos); + } + + // Exponential Moving Average algorithm + Some( + ((nanos as f64 * ema_coefficient) + (average as f64 * (1.0 - ema_coefficient))) + as u64, + ) + }); + + let _ = self + .max_nanos + .fetch_update(Relaxed, Relaxed, |prev| Some(cmp::max(prev, nanos))); + + // Suggest that our update be published to main memory. + atomic::fence(Release); + } + + /// Assumes an atomic fence is issued first. 
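+    /// (The caller's `Acquire` fence pairs with the `Release` fence at the end of `update()`.)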
+ fn get(&self) -> SimpleTimingStats { + use Ordering::*; + + SimpleTimingStats { + sample_count: self.sample_count.load(Relaxed), + min: Duration::from_nanos(self.min_nanos.load(Relaxed)), + average: Duration::from_nanos(self.average_nanos.load(Relaxed)), + max: Duration::from_nanos(self.max_nanos.load(Relaxed)), + } + } +} + +impl Index for AcquireTimeoutsPerPhase { + type Output = u64; + + fn index(&self, index: AcquirePhase) -> &u64 { + &self.0[index] + } +} diff --git a/sqlx-core/src/pool/mod.rs b/sqlx-core/src/pool/mod.rs index 28a9e1217a..02217210dd 100644 --- a/sqlx-core/src/pool/mod.rs +++ b/sqlx-core/src/pool/mod.rs @@ -89,6 +89,8 @@ mod connection; mod inner; mod options; +pub mod metrics; + pub use self::connection::PoolConnection; pub(crate) use self::maybe::MaybePoolConnection; pub use self::options::{PoolConnectionMetadata, PoolOptions}; diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs index 3e951cb292..66d0865475 100644 --- a/sqlx-core/src/pool/options.rs +++ b/sqlx-core/src/pool/options.rs @@ -2,9 +2,11 @@ use crate::connection::Connection; use crate::database::Database; use crate::error::Error; use crate::pool::inner::PoolInner; +use crate::pool::metrics::PoolMetricsCollector; use crate::pool::Pool; use futures_core::future::BoxFuture; use std::fmt::{self, Debug, Formatter}; +use std::sync::Arc; use std::time::{Duration, Instant}; /// Configuration options for [`Pool`][super::Pool]. @@ -72,6 +74,7 @@ pub struct PoolOptions { + Sync, >, >, + pub(crate) metrics: Option>, pub(crate) max_connections: u32, pub(crate) acquire_timeout: Duration, pub(crate) min_connections: u32, @@ -115,6 +118,7 @@ impl PoolOptions { after_connect: None, before_acquire: None, after_release: None, + metrics: None, test_before_acquire: true, // A production application will want to set a higher limit than this. max_connections: 10, @@ -256,6 +260,7 @@ impl PoolOptions { /// This example is written for PostgreSQL but can likely be adapted to other databases. /// /// ```no_run + /// # #[cfg(feature = "postgres")] /// # async fn f() -> Result<(), Box> { /// use sqlx::Executor; /// use sqlx::postgres::PgPoolOptions; @@ -310,6 +315,7 @@ impl PoolOptions { /// /// This example is written for Postgres but should be trivially adaptable to other databases. /// ```no_run + /// # #[cfg(feature = "postgres")] /// # async fn f() -> Result<(), Box> { /// use sqlx::{Connection, Executor}; /// use sqlx::postgres::PgPoolOptions; @@ -362,6 +368,7 @@ impl PoolOptions { /// which is only allowed for superusers. /// /// ```no_run + /// # #[cfg(feature = "postgres")] /// # async fn f() -> Result<(), Box> { /// use sqlx::{Connection, Executor}; /// use sqlx::postgres::PgPoolOptions; @@ -398,6 +405,18 @@ impl PoolOptions { self } + /// Hook in a custom metrics collector for the pool. + /// + /// See [`PoolMetricsCollector`] for details or [`SimplePoolMetrics`] for an easy start. + /// + /// [`SimplePoolMetrics`]: crate::pool::metrics::SimplePoolMetrics + pub fn metrics_collector(self, collector: Arc) -> Self { + Self { + metrics: Some(collector), + ..self + } + } + /// Create a new pool from this `PoolOptions` and immediately open at least one connection. /// /// This ensures the configuration is correct.