From ffb529533790f01b348d9a0e6a073de8222a039d Mon Sep 17 00:00:00 2001 From: Georg Semmler Date: Fri, 18 Aug 2023 14:06:47 +0200 Subject: [PATCH 1/3] Introduce support for connection instrumentation This commit adds functionality that allows to add a relatively fine instrumentation to our connection types by providing an essentially call back based pattern for instrumentation. The implemented setup calls the provided instrumentation type with different events. This allows the instrumentation to decide on it's own which events are important and which are unimportant. It also enables to skip most of the work (like constructing the sql of an inspected query) if the event is not handled as we just pass down an opaque wrapper that can be evaluated by the instrumentation implementation. This commit includes: * A default instrumentation implementation that does nothing * A global way to set the instrumentation implementation used by new connections * A connection specific setter to change the instrumentation implementation for a specific connection * A wild card instrumentation implementation for closures that accept the event type This commit does not include any "advanced" instrumentation implementations (based on `log` or `tracing`, etc). The idea is that these live in their own crates as it is might depend on the actual use case how the different events should be handled. The implementation of `InstrumentationEvent` is decoupled form specific backend types to allow reusing the same instrumentation for different connection types. The definition of `Instrumentation` does not depend on any connection specific stuff so that it is possible to use the same implementation for `diesel-async` as well. --- diesel/Cargo.toml | 2 +- diesel/src/connection/instrumentation.rs | 304 +++++++++++++++++++ diesel/src/connection/mod.rs | 17 ++ diesel/src/connection/statement_cache.rs | 8 + diesel/src/connection/transaction_manager.rs | 35 ++- diesel/src/mysql/connection/mod.rs | 128 ++++++-- diesel/src/pg/connection/cursor.rs | 8 + diesel/src/pg/connection/mod.rs | 137 +++++++-- diesel/src/r2d2.rs | 8 + diesel/src/sqlite/connection/mod.rs | 103 +++++-- diesel/src/sqlite/connection/stmt.rs | 42 ++- diesel_derives/src/multiconnection.rs | 30 ++ diesel_derives/tests/multiconnection.rs | 5 + diesel_derives/tests/selectable.rs | 2 +- diesel_tests/tests/instrumentation.rs | 231 ++++++++++++++ diesel_tests/tests/lib.rs | 1 + 16 files changed, 976 insertions(+), 85 deletions(-) create mode 100644 diesel/src/connection/instrumentation.rs create mode 100644 diesel_tests/tests/instrumentation.rs diff --git a/diesel/Cargo.toml b/diesel/Cargo.toml index e754f195a8ae..06e99b2bbda6 100644 --- a/diesel/Cargo.toml +++ b/diesel/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "diesel" -version = "2.1.1" +version = "2.1.4" license = "MIT OR Apache-2.0" description = "A safe, extensible ORM and Query Builder for PostgreSQL, SQLite, and MySQL" readme = "README.md" diff --git a/diesel/src/connection/instrumentation.rs b/diesel/src/connection/instrumentation.rs new file mode 100644 index 000000000000..a7444425022b --- /dev/null +++ b/diesel/src/connection/instrumentation.rs @@ -0,0 +1,304 @@ +use std::fmt::Debug; +use std::fmt::Display; +use std::num::NonZeroU32; +use std::ops::DerefMut; + +static GLOBAL_INSTRUMENTATION: std::sync::RwLock Option>> = + std::sync::RwLock::new(default_instrumentation); + +pub trait DebugQuery: Debug + Display {} + +impl DebugQuery for crate::query_builder::DebugQuery<'_, T, DB> where Self: Debug + Display {} + +/// A helper type that allows printing out str slices +/// +/// This type is necessary because it's not possible +/// to cast from a reference of a unsized type like `&str` +/// to a reference of a trait object even if that +/// type implements all necessary traits +#[diesel_derives::__diesel_public_if( + feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes" +)] +pub(crate) struct StrQueryHelper<'a> { + s: &'a str, +} + +impl<'a> StrQueryHelper<'a> { + /// Construct a new `StrQueryHelper` + #[diesel_derives::__diesel_public_if( + feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes" + )] + pub(crate) fn new(s: &'a str) -> Self { + Self { s } + } +} + +impl Debug for StrQueryHelper<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Debug::fmt(self.s, f) + } +} + +impl Display for StrQueryHelper<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.s, f) + } +} + +impl DebugQuery for StrQueryHelper<'_> {} + +/// This enum describes possible connection events +/// that can be handled by an [`Instrumentation`] implementation +/// +/// Some fields might contain sensitive information, like login +/// details for the database. +/// +/// Diesel does not guarantee that future versions will +/// emit the same events in the same order or timing. +/// In addition the output of the [`Debug`] and [`Display`] +/// implementation of the enum itself and any of its fields +/// is not guarantee to be stable. +// +// This types is carefully designed +// to avoid any potential overhead by +// taking references for all things +// and by not performing any additional +// work until required. +#[derive(Debug)] +#[non_exhaustive] +pub enum InstrumentationEvent<'a> { + /// An event emitted by before starting + /// establishing a new connection + #[non_exhaustive] + StartEstablishConnection { + /// The database url the connection + /// tries to connect to + /// + /// This might contain sensitive information + /// like the database password + url: &'a str, + }, + /// An event emitted after establishing a + /// new connection + #[non_exhaustive] + FinishEstablishConnection { + /// The database url the connection + /// tries is connected to + /// + /// This might contain sensitive information + /// like the database password + url: &'a str, + /// An optional error if the connection failed + error: Option<&'a crate::result::ConnectionError>, + }, + /// An event that is emitted before executing + /// a query + #[non_exhaustive] + StartQuery { + /// A opaque representation of the query + /// + /// This type implements [`Debug`] and [`Display`], + /// but should be considered otherwise as opaque. + /// + /// The exact output of the [`Debug`] and [`Display`] + /// implementation is not considered as part of the + /// stable API. + query: &'a dyn DebugQuery, + }, + /// An event that is emitted when a query + /// is cached in the connection internal + /// prepared statement cache + #[non_exhaustive] + CacheQuery { + /// SQL string of the cached query + sql: &'a str, + }, + /// An event that is emitted after executing + /// a query + #[non_exhaustive] + FinishQuery { + /// A opaque representation of the query + /// + /// This type implements [`Debug`] and [`Display`], + /// but should be considered otherwise as opaque. + /// + /// The exact output of the [`Debug`] and [`Display`] + /// implementation is not considered as part of the + /// stable API. + query: &'a dyn DebugQuery, + /// An optional error if the connection failed + error: Option<&'a crate::result::Error>, + }, + /// An event that is emitted while + /// starting a new transaction + #[non_exhaustive] + BeginTransaction { + /// Transaction level of the newly started + /// transaction + depth: NonZeroU32, + }, + /// An event that is emitted while + /// committing a transaction + #[non_exhaustive] + CommitTransaction { + /// Transaction level of the to be committed + /// transaction + depth: NonZeroU32, + }, + /// An event that is emitted while + /// rolling back a transaction + #[non_exhaustive] + RollbackTransaction { + /// Transaction level of the to be rolled + /// back transaction + depth: NonZeroU32, + }, +} + +// these constructors exist to +// keep `#[non_exhaustive]` on all the variants +// and to gate the constructors on the unstable feature +impl<'a> InstrumentationEvent<'a> { + /// Create a new `InstrumentationEvent::StartEstablishConnection` event + #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] + pub fn start_establish_connection(url: &'a str) -> Self { + Self::StartEstablishConnection { url } + } + + /// Create a new `InstrumentationEvent::FinishEstablishConnection` event + #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] + pub fn finish_establish_connection( + url: &'a str, + error: Option<&'a crate::result::ConnectionError>, + ) -> Self { + Self::FinishEstablishConnection { url, error } + } + + /// Create a new `InstrumentationEvent::StartQuery` event + #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] + pub fn start_query(query: &'a dyn DebugQuery) -> Self { + Self::StartQuery { query } + } + + /// Create a new `InstrumentationEvent::CacheQuery` event + #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] + pub fn cache_query(sql: &'a str) -> Self { + Self::CacheQuery { sql } + } + + /// Create a new `InstrumentationEvent::FinishQuery` event + #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] + pub fn finish_query( + query: &'a dyn DebugQuery, + error: Option<&'a crate::result::Error>, + ) -> Self { + Self::FinishQuery { query, error } + } + + /// Create a new `InstrumentationEvent::BeginTransaction` event + #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] + pub fn begin_transaction(depth: NonZeroU32) -> Self { + Self::BeginTransaction { depth } + } + + /// Create a new `InstrumentationEvent::RollbackTransaction` event + #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] + pub fn rollback_transaction(depth: NonZeroU32) -> Self { + Self::RollbackTransaction { depth } + } + + /// Create a new `InstrumentationEvent::CommitTransaction` event + #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] + pub fn commit_transaction(depth: NonZeroU32) -> Self { + Self::CommitTransaction { depth } + } +} + +/// A type that provides an connection `Instrumentation` +/// +/// This trait is the basic building block for logging or +/// otherwise instrumenting diesel connection types. It +/// acts as callback that receives information about certain +/// important connection states +/// +/// For simple usages this trait is implemented for closures +/// accepting a [`InstrumentationEvent`] as argument. +/// +/// More complex usages and integrations with frameworks like +/// `tracing` and `log` are supposed to be part of their own +/// crates. +pub trait Instrumentation: Send + 'static { + /// The function that is invoced for each event + fn on_connection_event(&mut self, event: InstrumentationEvent<'_>); +} + +fn default_instrumentation() -> Option> { + None +} + +/// Get an instance of the default [`Instrumentation`] +/// +/// This function is mostly useful for crates implementing +/// their own connection types +pub fn get_default_instrumentation() -> Option> { + match GLOBAL_INSTRUMENTATION.read() { + Ok(f) => (*f)(), + Err(_) => None, + } +} + +/// Set a custom constructor for the default [`Instrumentation`] +/// used by new connections +/// +/// ```rust +/// use diesel::connection::{set_default_instrumentation, Instrumentation, InstrumentationEvent}; +/// +/// // a simple logger that prints all events to stdout +/// fn simple_logger() -> Option> { +/// // we need the explicit argument type there due +/// // to bugs in rustc +/// Some(Box::new(|event: InstrumentationEvent<'_>| println!("{event:?}"))) +/// } +/// +/// set_default_instrumentation(simple_logger); +/// ``` +pub fn set_default_instrumentation( + default: fn() -> Option>, +) -> crate::QueryResult<()> { + match GLOBAL_INSTRUMENTATION.write() { + Ok(mut l) => { + *l = default; + Ok(()) + } + Err(e) => Err(crate::result::Error::DatabaseError( + crate::result::DatabaseErrorKind::Unknown, + Box::new(e.to_string()), + )), + } +} + +impl Instrumentation for F +where + F: FnMut(InstrumentationEvent<'_>) + Send + 'static, +{ + fn on_connection_event(&mut self, event: InstrumentationEvent<'_>) { + (self)(event) + } +} + +impl Instrumentation for Box { + fn on_connection_event(&mut self, event: InstrumentationEvent<'_>) { + self.deref_mut().on_connection_event(event) + } +} + +impl Instrumentation for Option +where + T: Instrumentation, +{ + fn on_connection_event(&mut self, event: InstrumentationEvent<'_>) { + if let Some(i) = self { + i.on_connection_event(event) + } + } +} diff --git a/diesel/src/connection/mod.rs b/diesel/src/connection/mod.rs index 788014c0f927..af819cad8c9a 100644 --- a/diesel/src/connection/mod.rs +++ b/diesel/src/connection/mod.rs @@ -1,5 +1,6 @@ //! Types related to database connections +pub(crate) mod instrumentation; #[cfg(all( not(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"), any(feature = "sqlite", feature = "postgres", feature = "mysql") @@ -15,6 +16,11 @@ use crate::query_builder::{Query, QueryFragment, QueryId}; use crate::result::*; use std::fmt::Debug; +#[doc(inline)] +pub use self::instrumentation::{ + get_default_instrumentation, set_default_instrumentation, Instrumentation, InstrumentationEvent, +}; +#[doc(inline)] pub use self::transaction_manager::{ AnsiTransactionManager, InTransactionStatus, TransactionDepthChange, TransactionManager, TransactionManagerStatus, ValidTransactionManagerStatus, @@ -28,6 +34,9 @@ pub(crate) use self::private::ConnectionSealed; #[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] pub use self::private::MultiConnectionHelper; +#[cfg(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes")] +pub use self::instrumentation::StrQueryHelper; + #[cfg(all( not(feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"), any(feature = "sqlite", feature = "postgres", feature = "mysql") @@ -381,6 +390,14 @@ where fn transaction_state( &mut self, ) -> &mut >::TransactionStateData; + + #[diesel_derives::__diesel_public_if( + feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes" + )] + fn instrumentation(&mut self) -> &mut dyn Instrumentation; + + /// Set a specific [`Instrumentation`] implementation for this connection + fn set_instrumentation(&mut self, instrumentation: impl Instrumentation); } /// The specific part of a [`Connection`] which actually loads data from the database diff --git a/diesel/src/connection/statement_cache.rs b/diesel/src/connection/statement_cache.rs index caf38bb9936b..2d4cd7ef4a18 100644 --- a/diesel/src/connection/statement_cache.rs +++ b/diesel/src/connection/statement_cache.rs @@ -99,9 +99,12 @@ use std::hash::Hash; use std::ops::{Deref, DerefMut}; use crate::backend::Backend; +use crate::connection::InstrumentationEvent; use crate::query_builder::*; use crate::result::QueryResult; +use super::Instrumentation; + /// A prepared statement cache #[allow(missing_debug_implementations, unreachable_pub)] #[cfg_attr( @@ -184,6 +187,7 @@ where backend: &DB, bind_types: &[DB::TypeMetadata], mut prepare_fn: F, + instrumentation: &mut dyn Instrumentation, ) -> QueryResult> where T: QueryFragment + QueryId, @@ -195,6 +199,7 @@ where backend, bind_types, &mut prepare_fn, + instrumentation, ) } @@ -206,6 +211,7 @@ where backend: &DB, bind_types: &[DB::TypeMetadata], prepare_fn: &mut dyn FnMut(&str, PrepareForCache) -> QueryResult, + instrumentation: &mut dyn Instrumentation, ) -> QueryResult> { use std::collections::hash_map::Entry::{Occupied, Vacant}; @@ -221,6 +227,8 @@ where Vacant(entry) => { let statement = { let sql = entry.key().sql(source, backend)?; + instrumentation + .on_connection_event(InstrumentationEvent::CacheQuery { sql: &sql }); prepare_fn(&sql, PrepareForCache::Yes) }; diff --git a/diesel/src/connection/transaction_manager.rs b/diesel/src/connection/transaction_manager.rs index de9ba5e7cca4..703d51098991 100644 --- a/diesel/src/connection/transaction_manager.rs +++ b/diesel/src/connection/transaction_manager.rs @@ -336,12 +336,19 @@ where fn begin_transaction(conn: &mut Conn) -> QueryResult<()> { let transaction_state = Self::get_transaction_state(conn)?; - let start_transaction_sql = match transaction_state.transaction_depth() { + let transaction_depth = transaction_state.transaction_depth(); + let start_transaction_sql = match transaction_depth { None => Cow::from("BEGIN"), Some(transaction_depth) => { Cow::from(format!("SAVEPOINT diesel_savepoint_{transaction_depth}")) } }; + let depth = transaction_depth + .and_then(|d| d.checked_add(1)) + .unwrap_or(NonZeroU32::new(1).expect("It's not 0")); + conn.instrumentation().on_connection_event( + super::instrumentation::InstrumentationEvent::BeginTransaction { depth }, + ); conn.batch_execute(&start_transaction_sql)?; Self::get_transaction_state(conn)? .change_transaction_depth(TransactionDepthChange::IncreaseDepth)?; @@ -371,6 +378,12 @@ where ), None => return Err(Error::NotInTransaction), }; + let depth = transaction_state + .transaction_depth() + .expect("We know that we are in a transaction here"); + conn.instrumentation().on_connection_event( + super::instrumentation::InstrumentationEvent::RollbackTransaction { depth }, + ); match conn.batch_execute(&rollback_sql) { Ok(()) => { @@ -449,6 +462,12 @@ where false, ), }; + let depth = transaction_state + .transaction_depth() + .expect("We know that we are in a transaction here"); + conn.instrumentation().on_connection_event( + super::instrumentation::InstrumentationEvent::CommitTransaction { depth }, + ); match conn.batch_execute(&commit_sql) { Ok(()) => { match Self::get_transaction_state(conn)? @@ -500,6 +519,7 @@ mod test { // Mock connection. mod mock { use crate::connection::transaction_manager::AnsiTransactionManager; + use crate::connection::Instrumentation; use crate::connection::{ Connection, ConnectionSealed, SimpleConnection, TransactionManager, }; @@ -512,6 +532,7 @@ mod test { pub(crate) next_batch_execute_results: VecDeque>, pub(crate) top_level_requires_rollback_after_next_batch_execute: bool, transaction_state: AnsiTransactionManager, + instrumentation: Option>, } impl SimpleConnection for MockConnection { @@ -542,6 +563,7 @@ mod test { next_batch_execute_results: VecDeque::new(), top_level_requires_rollback_after_next_batch_execute: false, transaction_state: AnsiTransactionManager::default(), + instrumentation: None, }) } @@ -559,6 +581,17 @@ mod test { { &mut self.transaction_state } + + fn instrumentation(&mut self) -> &mut dyn crate::connection::Instrumentation { + &mut self.instrumentation + } + + fn set_instrumentation( + &mut self, + instrumentation: impl crate::connection::Instrumentation, + ) { + self.instrumentation = Some(Box::new(instrumentation)); + } } } diff --git a/diesel/src/mysql/connection/mod.rs b/diesel/src/mysql/connection/mod.rs index f61770e005af..011558bdaed9 100644 --- a/diesel/src/mysql/connection/mod.rs +++ b/diesel/src/mysql/connection/mod.rs @@ -8,6 +8,9 @@ use self::stmt::iterator::StatementIterator; use self::stmt::Statement; use self::url::ConnectionOptions; use super::backend::Mysql; +use crate::connection::instrumentation::DebugQuery; +use crate::connection::instrumentation::InstrumentationEvent; +use crate::connection::instrumentation::StrQueryHelper; use crate::connection::statement_cache::{MaybeCached, StatementCache}; use crate::connection::*; use crate::expression::QueryMetadata; @@ -109,6 +112,7 @@ pub struct MysqlConnection { raw_connection: RawConnection, transaction_state: AnsiTransactionManager, statement_cache: StatementCache, + instrumentation: Option>, } // mysql connection can be shared between threads according to libmysqlclients documentation @@ -117,8 +121,19 @@ unsafe impl Send for MysqlConnection {} impl SimpleConnection for MysqlConnection { fn batch_execute(&mut self, query: &str) -> QueryResult<()> { - self.raw_connection - .enable_multi_statements(|| self.raw_connection.execute(query)) + self.instrumentation + .on_connection_event(InstrumentationEvent::StartQuery { + query: &StrQueryHelper::new(query), + }); + let r = self + .raw_connection + .enable_multi_statements(|| self.raw_connection.execute(query)); + self.instrumentation + .on_connection_event(InstrumentationEvent::FinishQuery { + query: &StrQueryHelper::new(query), + error: r.as_ref().err(), + }); + r } } @@ -142,18 +157,18 @@ impl Connection for MysqlConnection { /// * `ssl_mode` expects a value defined for MySQL client command option `--ssl-mode` /// See fn establish(database_url: &str) -> ConnectionResult { - use crate::result::ConnectionError::CouldntSetupConfiguration; + let mut instrumentation = crate::connection::instrumentation::get_default_instrumentation(); + instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection { + url: database_url, + }); - let raw_connection = RawConnection::new(); - let connection_options = ConnectionOptions::parse(database_url)?; - raw_connection.connect(&connection_options)?; - let mut conn = MysqlConnection { - raw_connection, - transaction_state: AnsiTransactionManager::default(), - statement_cache: StatementCache::new(), - }; - conn.set_config_options() - .map_err(CouldntSetupConfiguration)?; + let establish_result = Self::establish_inner(database_url); + instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection { + url: database_url, + error: establish_result.as_ref().err(), + }); + let mut conn = establish_result?; + conn.instrumentation = instrumentation; Ok(conn) } @@ -163,33 +178,53 @@ impl Connection for MysqlConnection { { #[allow(unsafe_code)] // call to unsafe function update_transaction_manager_status( - prepared_query(&source, &mut self.statement_cache, &mut self.raw_connection).and_then( - |stmt| { - // we have not called result yet, so calling `execute` is - // fine - let stmt_use = unsafe { stmt.execute() }?; - Ok(stmt_use.affected_rows()) - }, - ), + prepared_query( + &source, + &mut self.statement_cache, + &mut self.raw_connection, + &mut self.instrumentation, + ) + .and_then(|stmt| { + // we have not called result yet, so calling `execute` is + // fine + let stmt_use = unsafe { stmt.execute() }?; + Ok(stmt_use.affected_rows()) + }), &mut self.transaction_state, + &mut self.instrumentation, + &crate::debug_query(source), ) } fn transaction_state(&mut self) -> &mut AnsiTransactionManager { &mut self.transaction_state } + + fn instrumentation(&mut self) -> &mut dyn Instrumentation { + &mut self.instrumentation + } + + fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) { + self.instrumentation = Some(Box::new(instrumentation)); + } } #[inline(always)] fn update_transaction_manager_status( query_result: QueryResult, transaction_manager: &mut AnsiTransactionManager, + instrumentation: &mut Option>, + query: &dyn DebugQuery, ) -> QueryResult { if let Err(Error::DatabaseError(DatabaseErrorKind::SerializationFailure, _)) = query_result { transaction_manager .status .set_requires_rollback_maybe_up_to_top_level(true) } + instrumentation.on_connection_event(InstrumentationEvent::FinishQuery { + query, + error: query_result.as_ref().err(), + }); query_result } @@ -206,14 +241,20 @@ impl LoadConnection for MysqlConnection { Self::Backend: QueryMetadata, { update_transaction_manager_status( - prepared_query(&source, &mut self.statement_cache, &mut self.raw_connection).and_then( - |stmt| { - let mut metadata = Vec::new(); - Mysql::row_metadata(&mut (), &mut metadata); - StatementIterator::from_stmt(stmt, &metadata) - }, - ), + prepared_query( + &source, + &mut self.statement_cache, + &mut self.raw_connection, + &mut self.instrumentation, + ) + .and_then(|stmt| { + let mut metadata = Vec::new(); + Mysql::row_metadata(&mut (), &mut metadata); + StatementIterator::from_stmt(stmt, &metadata) + }), &mut self.transaction_state, + &mut self.instrumentation, + &crate::debug_query(&source), ) } } @@ -247,9 +288,19 @@ fn prepared_query<'a, T: QueryFragment + QueryId>( source: &'_ T, statement_cache: &'a mut StatementCache, raw_connection: &'a mut RawConnection, + instrumentation: &mut dyn Instrumentation, ) -> QueryResult> { - let mut stmt = statement_cache - .cached_statement(source, &Mysql, &[], |sql, _| raw_connection.prepare(sql))?; + instrumentation.on_connection_event(InstrumentationEvent::StartQuery { + query: &crate::debug_query(source), + }); + let mut stmt = statement_cache.cached_statement( + source, + &Mysql, + &[], + |sql, _| raw_connection.prepare(sql), + instrumentation, + )?; + let mut bind_collector = RawBytesBindCollector::new(); source.collect_binds(&mut bind_collector, &mut (), &Mysql)?; let binds = bind_collector @@ -268,6 +319,23 @@ impl MysqlConnection { crate::sql_query("SET character_set_results = 'utf8mb4'").execute(self)?; Ok(()) } + + fn establish_inner(database_url: &str) -> Result { + use crate::ConnectionError::CouldntSetupConfiguration; + + let raw_connection = RawConnection::new(); + let connection_options = ConnectionOptions::parse(database_url)?; + raw_connection.connect(&connection_options)?; + let mut conn = MysqlConnection { + raw_connection, + transaction_state: AnsiTransactionManager::default(), + statement_cache: StatementCache::new(), + instrumentation: None, + }; + conn.set_config_options() + .map_err(CouldntSetupConfiguration)?; + Ok(conn) + } } #[cfg(test)] diff --git a/diesel/src/pg/connection/cursor.rs b/diesel/src/pg/connection/cursor.rs index 4e77dffdec84..678c8812b26c 100644 --- a/diesel/src/pg/connection/cursor.rs +++ b/diesel/src/pg/connection/cursor.rs @@ -1,3 +1,5 @@ +use crate::connection::instrumentation::StrQueryHelper; + use super::raw::RawConnection; use super::result::PgResult; use super::row::PgRow; @@ -87,6 +89,9 @@ impl Iterator for RowByRowCursor<'_> { let get_next_result = super::update_transaction_manager_status( self.conn.raw_connection.get_next_result(), self.conn, + // todo + &StrQueryHelper::new(""), + false, ); match get_next_result { Ok(Some(res)) => { @@ -120,6 +125,9 @@ impl Drop for RowByRowCursor<'_> { let res = super::update_transaction_manager_status( self.conn.raw_connection.get_next_result(), self.conn, + // todo + &StrQueryHelper::new(""), + false, ); if matches!(res, Err(_) | Ok(None)) { break; diff --git a/diesel/src/pg/connection/mod.rs b/diesel/src/pg/connection/mod.rs index bcf9b18f1cfe..40773d08b8c9 100644 --- a/diesel/src/pg/connection/mod.rs +++ b/diesel/src/pg/connection/mod.rs @@ -9,6 +9,9 @@ use self::private::ConnectionAndTransactionManager; use self::raw::{PgTransactionStatus, RawConnection}; use self::result::PgResult; use self::stmt::Statement; +use crate::connection::instrumentation::DebugQuery; +use crate::connection::instrumentation::StrQueryHelper; +use crate::connection::instrumentation::{Instrumentation, InstrumentationEvent}; use crate::connection::statement_cache::{MaybeCached, StatementCache}; use crate::connection::*; use crate::expression::QueryMetadata; @@ -20,6 +23,7 @@ use crate::result::ConnectionError::CouldntSetupConfiguration; use crate::result::*; use crate::RunQueryDsl; use std::ffi::CString; +use std::fmt::Debug; use std::os::raw as libc; /// The connection string expected by `PgConnection::establish` @@ -126,11 +130,16 @@ unsafe impl Send for PgConnection {} impl SimpleConnection for PgConnection { #[allow(unsafe_code)] // use of unsafe function fn batch_execute(&mut self, query: &str) -> QueryResult<()> { - let query = CString::new(query)?; + self.connection_and_transaction_manager + .instrumentation + .on_connection_event(InstrumentationEvent::StartQuery { + query: &StrQueryHelper::new(query), + }); + let c_query = CString::new(query)?; let inner_result = unsafe { self.connection_and_transaction_manager .raw_connection - .exec(query.as_ptr()) + .exec(c_query.as_ptr()) }; update_transaction_manager_status( inner_result.and_then(|raw_result| { @@ -140,6 +149,8 @@ impl SimpleConnection for PgConnection { ) }), &mut self.connection_and_transaction_manager, + &StrQueryHelper::new(query), + true, )?; Ok(()) } @@ -158,11 +169,16 @@ impl Connection for PgConnection { type TransactionManager = AnsiTransactionManager; fn establish(database_url: &str) -> ConnectionResult { - RawConnection::establish(database_url).and_then(|raw_conn| { + let mut instrumentation = crate::connection::instrumentation::get_default_instrumentation(); + instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection { + url: database_url, + }); + let r = RawConnection::establish(database_url).and_then(|raw_conn| { let mut conn = PgConnection { connection_and_transaction_manager: ConnectionAndTransactionManager { raw_connection: raw_conn, transaction_state: AnsiTransactionManager::default(), + instrumentation: None, }, statement_cache: StatementCache::new(), metadata_cache: PgMetadataCache::new(), @@ -170,14 +186,22 @@ impl Connection for PgConnection { conn.set_config_options() .map_err(CouldntSetupConfiguration)?; Ok(conn) - }) + }); + instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection { + url: database_url, + error: r.as_ref().err(), + }); + let mut conn = r?; + conn.connection_and_transaction_manager.instrumentation = instrumentation; + Ok(conn) } + fn execute_returning_count(&mut self, source: &T) -> QueryResult where T: QueryFragment + QueryId, { update_transaction_manager_status( - self.with_prepared_query(source, |query, params, conn| { + self.with_prepared_query(source, true, |query, params, conn| { let res = query .execute(&mut conn.raw_connection, ¶ms, false) .map(|r| r.rows_affected()); @@ -187,6 +211,8 @@ impl Connection for PgConnection { res }), &mut self.connection_and_transaction_manager, + &crate::debug_query(source), + true, ) } @@ -196,6 +222,14 @@ impl Connection for PgConnection { { &mut self.connection_and_transaction_manager.transaction_state } + + fn instrumentation(&mut self) -> &mut dyn Instrumentation { + &mut self.connection_and_transaction_manager.instrumentation + } + + fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) { + self.connection_and_transaction_manager.instrumentation = Some(Box::new(instrumentation)); + } } impl LoadConnection for PgConnection @@ -213,11 +247,16 @@ where T: Query + QueryFragment + QueryId + 'query, Self::Backend: QueryMetadata, { - self.with_prepared_query(&source, |stmt, params, conn| { + self.with_prepared_query(&source, false, |stmt, params, conn| { use self::private::PgLoadingMode; let result = stmt.execute(&mut conn.raw_connection, ¶ms, Self::USE_ROW_BY_ROW_MODE); - let result = update_transaction_manager_status(result, conn)?; - Self::get_cursor(conn, result) + let result = update_transaction_manager_status( + result, + conn, + &crate::debug_query(&source), + false, + )?; + Self::get_cursor(conn, result, &source) }) } } @@ -232,6 +271,8 @@ impl GetPgMetadataCache for PgConnection { fn update_transaction_manager_status( query_result: QueryResult, conn: &mut ConnectionAndTransactionManager, + source: &dyn DebugQuery, + final_call: bool, ) -> QueryResult { /// avoid monomorphizing for every result type - this part will not be inlined fn non_generic_inner(conn: &mut ConnectionAndTransactionManager, is_err: bool) { @@ -281,6 +322,19 @@ fn update_transaction_manager_status( } } non_generic_inner(conn, query_result.is_err()); + if let Err(ref e) = query_result { + conn.instrumentation + .on_connection_event(InstrumentationEvent::FinishQuery { + query: source, + error: Some(e), + }); + } else if final_call { + conn.instrumentation + .on_connection_event(InstrumentationEvent::FinishQuery { + query: source, + error: None, + }); + } query_result } @@ -342,12 +396,18 @@ impl PgConnection { fn with_prepared_query<'conn, T: QueryFragment + QueryId, R>( &'conn mut self, source: &'_ T, + execute_returning_count: bool, f: impl FnOnce( MaybeCached<'_, Statement>, Vec>>, &'conn mut ConnectionAndTransactionManager, ) -> QueryResult, ) -> QueryResult { + self.connection_and_transaction_manager + .instrumentation + .on_connection_event(InstrumentationEvent::StartQuery { + query: &crate::debug_query(source), + }); let mut bind_collector = RawBytesBindCollector::::new(); source.collect_binds(&mut bind_collector, self, &Pg)?; let binds = bind_collector.binds; @@ -356,14 +416,30 @@ impl PgConnection { let cache_len = self.statement_cache.len(); let cache = &mut self.statement_cache; let conn = &mut self.connection_and_transaction_manager.raw_connection; - let query = cache.cached_statement(source, &Pg, &metadata, |sql, _| { - let query_name = if source.is_safe_to_cache_prepared(&Pg)? { - Some(format!("__diesel_stmt_{cache_len}")) - } else { - None - }; - Statement::prepare(conn, sql, query_name.as_deref(), &metadata) - }); + let query = cache.cached_statement( + source, + &Pg, + &metadata, + |sql, _| { + let query_name = if source.is_safe_to_cache_prepared(&Pg)? { + Some(format!("__diesel_stmt_{cache_len}")) + } else { + None + }; + Statement::prepare(conn, sql, query_name.as_deref(), &metadata) + }, + &mut self.connection_and_transaction_manager.instrumentation, + ); + if !execute_returning_count { + if let Err(ref e) = query { + self.connection_and_transaction_manager + .instrumentation + .on_connection_event(InstrumentationEvent::FinishQuery { + query: &crate::debug_query(&source), + error: Some(e), + }); + } + } f(query?, binds, &mut self.connection_and_transaction_manager) } @@ -387,6 +463,7 @@ mod private { pub struct ConnectionAndTransactionManager { pub(super) raw_connection: RawConnection, pub(super) transaction_state: AnsiTransactionManager, + pub(super) instrumentation: Option>, } pub trait PgLoadingMode { @@ -394,10 +471,11 @@ mod private { type Cursor<'conn, 'query>: Iterator>>; type Row<'conn, 'query>: crate::row::Row<'conn, Pg>; - fn get_cursor<'query>( - raw_connection: &mut ConnectionAndTransactionManager, + fn get_cursor<'conn, 'query>( + raw_connection: &'conn mut ConnectionAndTransactionManager, result: PgResult, - ) -> QueryResult>; + source: &dyn QueryFragment, + ) -> QueryResult>; } impl PgLoadingMode for PgConnection { @@ -405,11 +483,17 @@ mod private { type Cursor<'conn, 'query> = Cursor; type Row<'conn, 'query> = self::row::PgRow; - fn get_cursor<'query>( - conn: &mut ConnectionAndTransactionManager, + fn get_cursor<'conn, 'query>( + conn: &'conn mut ConnectionAndTransactionManager, result: PgResult, - ) -> QueryResult> { - update_transaction_manager_status(Cursor::new(result, &mut conn.raw_connection), conn) + source: &dyn QueryFragment, + ) -> QueryResult> { + update_transaction_manager_status( + Cursor::new(result, &mut conn.raw_connection), + conn, + &crate::debug_query(&source), + true, + ) } } @@ -418,10 +502,11 @@ mod private { type Cursor<'conn, 'query> = RowByRowCursor<'conn>; type Row<'conn, 'query> = self::row::PgRow; - fn get_cursor<'query>( - raw_connection: &mut ConnectionAndTransactionManager, + fn get_cursor<'conn, 'query>( + raw_connection: &'conn mut ConnectionAndTransactionManager, result: PgResult, - ) -> QueryResult> { + _source: &dyn QueryFragment, + ) -> QueryResult> { Ok(RowByRowCursor::new(result, raw_connection)) } } diff --git a/diesel/src/r2d2.rs b/diesel/src/r2d2.rs index 523baf80bbe1..79b8d0431e58 100644 --- a/diesel/src/r2d2.rs +++ b/diesel/src/r2d2.rs @@ -248,6 +248,14 @@ where ) -> &mut >::TransactionStateData { (**self).transaction_state() } + + fn instrumentation(&mut self) -> &mut dyn crate::connection::Instrumentation { + (**self).instrumentation() + } + + fn set_instrumentation(&mut self, instrumentation: impl crate::connection::Instrumentation) { + (**self).set_instrumentation(instrumentation) + } } impl LoadConnection for PooledConnection diff --git a/diesel/src/sqlite/connection/mod.rs b/diesel/src/sqlite/connection/mod.rs index e21234086b86..2549897d35ec 100644 --- a/diesel/src/sqlite/connection/mod.rs +++ b/diesel/src/sqlite/connection/mod.rs @@ -20,6 +20,8 @@ use self::raw::RawConnection; use self::statement_iterator::*; use self::stmt::{Statement, StatementUse}; use super::SqliteAggregateFunction; +use crate::connection::instrumentation::InstrumentationEvent; +use crate::connection::instrumentation::StrQueryHelper; use crate::connection::statement_cache::StatementCache; use crate::connection::*; use crate::deserialize::{FromSqlRow, StaticallySizedRow}; @@ -122,6 +124,7 @@ pub struct SqliteConnection { statement_cache: StatementCache, raw_connection: RawConnection, transaction_state: AnsiTransactionManager, + instrumentation: Option>, } // This relies on the invariant that RawConnection or Statement are never @@ -132,7 +135,17 @@ unsafe impl Send for SqliteConnection {} impl SimpleConnection for SqliteConnection { fn batch_execute(&mut self, query: &str) -> QueryResult<()> { - self.raw_connection.exec(query) + self.instrumentation + .on_connection_event(InstrumentationEvent::StartQuery { + query: &StrQueryHelper::new(query), + }); + let resp = self.raw_connection.exec(query); + self.instrumentation + .on_connection_event(InstrumentationEvent::FinishQuery { + query: &StrQueryHelper::new(query), + error: resp.as_ref().err(), + }); + resp } } @@ -149,16 +162,18 @@ impl Connection for SqliteConnection { /// If the database does not exist, this method will try to /// create a new database and then establish a connection to it. fn establish(database_url: &str) -> ConnectionResult { - use crate::result::ConnectionError::CouldntSetupConfiguration; - - let raw_connection = RawConnection::establish(database_url)?; - let conn = Self { - statement_cache: StatementCache::new(), - raw_connection, - transaction_state: AnsiTransactionManager::default(), - }; - conn.register_diesel_sql_functions() - .map_err(CouldntSetupConfiguration)?; + let mut instrumentation = crate::connection::instrumentation::get_default_instrumentation(); + instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection { + url: database_url, + }); + + let establish_result = Self::establish_inner(database_url); + instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection { + url: database_url, + error: establish_result.as_ref().err(), + }); + let mut conn = establish_result?; + conn.instrumentation = instrumentation; Ok(conn) } @@ -167,9 +182,9 @@ impl Connection for SqliteConnection { T: QueryFragment + QueryId, { let statement_use = self.prepared_query(source)?; - statement_use.run()?; - - Ok(self.raw_connection.rows_affected_by_last_query()) + statement_use + .run() + .map(|_| self.raw_connection.rows_affected_by_last_query()) } fn transaction_state(&mut self) -> &mut AnsiTransactionManager @@ -178,6 +193,14 @@ impl Connection for SqliteConnection { { &mut self.transaction_state } + + fn instrumentation(&mut self) -> &mut dyn Instrumentation { + &mut self.instrumentation + } + + fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) { + self.instrumentation = Some(Box::new(instrumentation)); + } } impl LoadConnection for SqliteConnection { @@ -192,9 +215,9 @@ impl LoadConnection for SqliteConnection { T: Query + QueryFragment + QueryId + 'query, Self::Backend: QueryMetadata, { - let statement_use = self.prepared_query(source)?; + let statement = self.prepared_query(source)?; - Ok(StatementIterator::new(statement_use)) + Ok(StatementIterator::new(statement)) } } @@ -302,17 +325,39 @@ impl SqliteConnection { } } - fn prepared_query<'a, 'b, T>(&'a mut self, source: T) -> QueryResult> + fn prepared_query<'conn, 'query, T>( + &'conn mut self, + source: T, + ) -> QueryResult> where - T: QueryFragment + QueryId + 'b, + T: QueryFragment + QueryId + 'query, { + self.instrumentation + .on_connection_event(InstrumentationEvent::StartQuery { + query: &crate::debug_query(&source), + }); let raw_connection = &self.raw_connection; let cache = &mut self.statement_cache; - let statement = cache.cached_statement(&source, &Sqlite, &[], |sql, is_cached| { - Statement::prepare(raw_connection, sql, is_cached) - })?; + let statement = match cache.cached_statement( + &source, + &Sqlite, + &[], + |sql, is_cached| Statement::prepare(raw_connection, sql, is_cached), + &mut self.instrumentation, + ) { + Ok(statement) => statement, + Err(e) => { + self.instrumentation + .on_connection_event(InstrumentationEvent::FinishQuery { + query: &crate::debug_query(&source), + error: Some(&e), + }); + + return Err(e); + } + }; - StatementUse::bind(statement, source) + StatementUse::bind(statement, source, &mut self.instrumentation) } #[doc(hidden)] @@ -477,6 +522,20 @@ impl SqliteConnection { }, ) } + + fn establish_inner(database_url: &str) -> Result { + use crate::result::ConnectionError::CouldntSetupConfiguration; + let raw_connection = RawConnection::establish(database_url)?; + let conn = Self { + statement_cache: StatementCache::new(), + raw_connection, + transaction_state: AnsiTransactionManager::default(), + instrumentation: None, + }; + conn.register_diesel_sql_functions() + .map_err(CouldntSetupConfiguration)?; + Ok(conn) + } } fn error_message(err_code: libc::c_int) -> &'static str { diff --git a/diesel/src/sqlite/connection/stmt.rs b/diesel/src/sqlite/connection/stmt.rs index a5ba556690c1..07aee05bce60 100644 --- a/diesel/src/sqlite/connection/stmt.rs +++ b/diesel/src/sqlite/connection/stmt.rs @@ -3,6 +3,7 @@ use super::bind_collector::{InternalSqliteBindValue, SqliteBindCollector}; use super::raw::RawConnection; use super::sqlite_value::OwnedSqliteValue; use crate::connection::statement_cache::{MaybeCached, PrepareForCache}; +use crate::connection::Instrumentation; use crate::query_builder::{QueryFragment, QueryId}; use crate::result::Error::DatabaseError; use crate::result::*; @@ -235,12 +236,15 @@ struct BoundStatement<'stmt, 'query> { // contained in the query itself. We use NonNull to // communicate that this is a shared buffer binds_to_free: Vec<(i32, Option>)>, + instrumentation: &'stmt mut dyn Instrumentation, + has_error: bool, } impl<'stmt, 'query> BoundStatement<'stmt, 'query> { fn bind( statement: MaybeCached<'stmt, Statement>, query: T, + instrumentation: &'stmt mut dyn Instrumentation, ) -> QueryResult> where T: QueryFragment + QueryId + 'query, @@ -259,6 +263,8 @@ impl<'stmt, 'query> BoundStatement<'stmt, 'query> { statement, query: None, binds_to_free: Vec::new(), + instrumentation, + has_error: false, }; ret.bind_buffers(binds)?; @@ -322,6 +328,20 @@ impl<'stmt, 'query> BoundStatement<'stmt, 'query> { } Ok(()) } + + fn finish_query_with_error(mut self, e: &Error) { + self.has_error = true; + if let Some(q) = self.query { + // it's safe to get a reference from this ptr as it's guaranteed to not be null + let q = unsafe { q.as_ref() }; + self.instrumentation.on_connection_event( + crate::connection::InstrumentationEvent::FinishQuery { + query: &crate::debug_query(&q), + error: Some(e), + }, + ); + } + } } impl<'stmt, 'query> Drop for BoundStatement<'stmt, 'query> { @@ -353,11 +373,20 @@ impl<'stmt, 'query> Drop for BoundStatement<'stmt, 'query> { } if let Some(query) = self.query { - unsafe { + let query = unsafe { // Constructing the `Box` here is safe as we // got the pointer from a box + it is guaranteed to be not null. - std::mem::drop(Box::from_raw(query.as_ptr())); + Box::from_raw(query.as_ptr()) + }; + if !self.has_error { + self.instrumentation.on_connection_event( + crate::connection::InstrumentationEvent::FinishQuery { + query: &crate::debug_query(&query), + error: None, + }, + ); } + std::mem::drop(query); self.query = None; } } @@ -373,23 +402,28 @@ impl<'stmt, 'query> StatementUse<'stmt, 'query> { pub(super) fn bind( statement: MaybeCached<'stmt, Statement>, query: T, + instrumentation: &'stmt mut dyn Instrumentation, ) -> QueryResult> where T: QueryFragment + QueryId + 'query, { Ok(Self { - statement: BoundStatement::bind(statement, query)?, + statement: BoundStatement::bind(statement, query, instrumentation)?, column_names: OnceCell::new(), }) } pub(super) fn run(mut self) -> QueryResult<()> { - unsafe { + let r = unsafe { // This is safe as we pass `first_step = true` // and we consume the statement so nobody could // access the columns later on anyway. self.step(true).map(|_| ()) + }; + if let Err(ref e) = r { + self.statement.finish_query_with_error(e); } + r } // This function is marked as unsafe incorrectly passing `false` to `first_step` diff --git a/diesel_derives/src/multiconnection.rs b/diesel_derives/src/multiconnection.rs index d32c142c53c2..b4ba74214bda 100644 --- a/diesel_derives/src/multiconnection.rs +++ b/diesel_derives/src/multiconnection.rs @@ -109,6 +109,24 @@ fn generate_connection_impl( } }); + let instrumentation_impl = connection_types.iter().map(|c| { + let variant_ident = c.name; + quote::quote! { + #ident::#variant_ident(conn) => { + diesel::connection::Connection::set_instrumentation(conn, instrumentation); + } + } + }); + + let get_instrumentation_impl = connection_types.iter().map(|c| { + let variant_ident = c.name; + quote::quote! { + #ident::#variant_ident(conn) => { + diesel::connection::Connection::instrumentation(conn) + } + } + }); + let establish_impls = connection_types.iter().map(|c| { let ident = c.name; let ty = c.ty; @@ -326,6 +344,18 @@ fn generate_connection_impl( ) -> &mut >::TransactionStateData { self } + + fn instrumentation(&mut self) -> &mut dyn diesel::connection::Instrumentation { + match self { + #(#get_instrumentation_impl,)* + } + } + + fn set_instrumentation(&mut self, instrumentation: impl diesel::connection::Instrumentation) { + match self { + #(#instrumentation_impl,)* + } + } } impl LoadConnection for MultiConnection diff --git a/diesel_derives/tests/multiconnection.rs b/diesel_derives/tests/multiconnection.rs index 51c93836d116..96474a6d9606 100644 --- a/diesel_derives/tests/multiconnection.rs +++ b/diesel_derives/tests/multiconnection.rs @@ -1,4 +1,5 @@ use crate::schema::users; +use diesel::connection::Instrumentation; use diesel::prelude::*; #[derive(diesel::MultiConnection)] @@ -21,6 +22,10 @@ pub struct User { fn check_queries_work() { let mut conn = establish_connection(); + // checks that this trait is implemented + conn.set_instrumentation(None::>); + let _ = conn.instrumentation(); + diesel::sql_query( "CREATE TEMPORARY TABLE users(\ id INTEGER NOT NULL PRIMARY KEY, \ diff --git a/diesel_derives/tests/selectable.rs b/diesel_derives/tests/selectable.rs index 4b151f7fc2af..240fdccaa482 100644 --- a/diesel_derives/tests/selectable.rs +++ b/diesel_derives/tests/selectable.rs @@ -1,6 +1,6 @@ use std::marker::PhantomData; -use diesel::deserialize::FromSql; +use diesel::deserialize::{FromSql, FromSqlRow}; use diesel::sql_types::Text; use diesel::*; diff --git a/diesel_tests/tests/instrumentation.rs b/diesel_tests/tests/instrumentation.rs new file mode 100644 index 000000000000..73077dd9fcda --- /dev/null +++ b/diesel_tests/tests/instrumentation.rs @@ -0,0 +1,231 @@ +use crate::schema::users; +use crate::schema::TestConnection; +use diesel::connection::DefaultLoadingMode; +use diesel::connection::InstrumentationEvent; +use diesel::connection::LoadConnection; +use diesel::connection::SimpleConnection; +use diesel::query_builder::AsQuery; +use diesel::Connection; +use diesel::QueryResult; +use std::num::NonZeroU32; +use std::sync::Arc; +use std::sync::Mutex; + +use crate::schema::connection_with_sean_and_tess_in_users_table; + +#[derive(Debug, PartialEq)] +enum Event { + StartQuery { query: String }, + CacheQuery { sql: String }, + FinishQuery { query: String, error: Option<()> }, + BeginTransaction { depth: NonZeroU32 }, + CommitTransaction { depth: NonZeroU32 }, + RollbackTransaction { depth: NonZeroU32 }, +} + +impl From> for Event { + fn from(value: InstrumentationEvent<'_>) -> Self { + match value { + InstrumentationEvent::StartEstablishConnection { .. } => unreachable!(), + InstrumentationEvent::FinishEstablishConnection { .. } => unreachable!(), + InstrumentationEvent::StartQuery { query, .. } => Event::StartQuery { + query: query.to_string(), + }, + InstrumentationEvent::CacheQuery { sql, .. } => Event::CacheQuery { + sql: sql.to_owned(), + }, + InstrumentationEvent::FinishQuery { query, error, .. } => Event::FinishQuery { + query: query.to_string(), + error: error.map(|_| ()), + }, + InstrumentationEvent::BeginTransaction { depth, .. } => { + Event::BeginTransaction { depth } + } + InstrumentationEvent::CommitTransaction { depth, .. } => { + Event::CommitTransaction { depth } + } + InstrumentationEvent::RollbackTransaction { depth, .. } => { + Event::RollbackTransaction { depth } + } + _ => unreachable!(), + } + } +} + +fn setup_test_case() -> (Arc>>, TestConnection) { + let events = Arc::new(Mutex::new(Vec::::new())); + let events_to_check = events.clone(); + let mut conn = connection_with_sean_and_tess_in_users_table(); + conn.set_instrumentation(move |event: InstrumentationEvent<'_>| { + events.lock().unwrap().push(event.into()); + }); + assert_eq!(events_to_check.lock().unwrap().len(), 0); + (events_to_check, conn) +} + +#[test] +fn check_events_are_emitted_for_batch_execute() { + let (events_to_check, mut conn) = setup_test_case(); + conn.batch_execute("select 1").unwrap(); + + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 2); + assert_eq!( + events[0], + Event::StartQuery { + query: String::from("select 1") + } + ); + assert_eq!( + events[1], + Event::FinishQuery { + query: String::from("select 1"), + error: None, + } + ); +} + +#[test] +fn check_events_are_emitted_for_execute_returning_count() { + let (events_to_check, mut conn) = setup_test_case(); + conn.execute_returning_count(&users::table.as_query()) + .unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 3, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::CacheQuery { .. }); + assert_matches!(events[2], Event::FinishQuery { .. }); +} + +#[test] +fn check_events_are_emitted_for_load() { + let (events_to_check, mut conn) = setup_test_case(); + LoadConnection::::load(&mut conn, users::table.as_query()).unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 3, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::CacheQuery { .. }); + assert_matches!(events[2], Event::FinishQuery { .. }); +} + +#[test] +fn check_events_are_emitted_for_execute_returning_count_does_not_contain_cache_for_uncached_queries( +) { + let (events_to_check, mut conn) = setup_test_case(); + conn.execute_returning_count(&diesel::sql_query("select 1")) + .unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 2, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::FinishQuery { .. }); +} + +#[test] +fn check_events_are_emitted_for_load_does_not_contain_cache_for_uncached_queries() { + let (events_to_check, mut conn) = setup_test_case(); + LoadConnection::::load(&mut conn, diesel::sql_query("select 1")).unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 2, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::FinishQuery { .. }); +} + +#[test] +fn check_events_are_emitted_for_execute_returning_count_does_contain_error_for_failures() { + let (events_to_check, mut conn) = setup_test_case(); + let _ = conn.execute_returning_count(&diesel::sql_query("invalid")); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 2, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::FinishQuery { error: Some(_), .. }); +} + +#[test] +fn check_events_are_emitted_for_load_does_contain_error_for_failures() { + let (events_to_check, mut conn) = setup_test_case(); + let _ = LoadConnection::::load(&mut conn, diesel::sql_query("invalid")); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 2, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::FinishQuery { error: Some(_), .. }); +} + +#[test] +fn check_events_are_emitted_for_execute_returning_count_repeat_does_not_repeat_cache() { + let (events_to_check, mut conn) = setup_test_case(); + conn.execute_returning_count(&users::table.as_query()) + .unwrap(); + conn.execute_returning_count(&users::table.as_query()) + .unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 5, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::CacheQuery { .. }); + assert_matches!(events[2], Event::FinishQuery { .. }); + assert_matches!(events[3], Event::StartQuery { .. }); + assert_matches!(events[4], Event::FinishQuery { .. }); +} + +#[test] +fn check_events_are_emitted_for_load_repeat_does_not_repeat_cache() { + let (events_to_check, mut conn) = setup_test_case(); + LoadConnection::::load(&mut conn, users::table.as_query()).unwrap(); + LoadConnection::::load(&mut conn, users::table.as_query()).unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 5, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::CacheQuery { .. }); + assert_matches!(events[2], Event::FinishQuery { .. }); + assert_matches!(events[3], Event::StartQuery { .. }); + assert_matches!(events[4], Event::FinishQuery { .. }); +} + +#[test] +fn check_events_transaction() { + let (events_to_check, mut conn) = setup_test_case(); + conn.transaction(|_conn| QueryResult::Ok(())).unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 6, "{:?}", events); + assert_matches!(events[0], Event::BeginTransaction { .. }); + assert_matches!(events[1], Event::StartQuery { .. }); + assert_matches!(events[2], Event::FinishQuery { .. }); + assert_matches!(events[3], Event::CommitTransaction { .. }); + assert_matches!(events[4], Event::StartQuery { .. }); + assert_matches!(events[5], Event::FinishQuery { .. }); +} + +#[test] +fn check_events_transaction_error() { + let (events_to_check, mut conn) = setup_test_case(); + let _ = conn + .transaction(|_conn| QueryResult::<()>::Err(diesel::result::Error::RollbackTransaction)); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 6, "{:?}", events); + assert_matches!(events[0], Event::BeginTransaction { .. }); + assert_matches!(events[1], Event::StartQuery { .. }); + assert_matches!(events[2], Event::FinishQuery { .. }); + assert_matches!(events[3], Event::RollbackTransaction { .. }); + assert_matches!(events[4], Event::StartQuery { .. }); + assert_matches!(events[5], Event::FinishQuery { .. }); +} + +#[test] +fn check_events_transaction_nested() { + let (events_to_check, mut conn) = setup_test_case(); + conn.transaction(|conn| conn.transaction(|_conn| QueryResult::Ok(()))) + .unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 12, "{:?}", events); + assert_matches!(events[0], Event::BeginTransaction { .. }); + assert_matches!(events[1], Event::StartQuery { .. }); + assert_matches!(events[2], Event::FinishQuery { .. }); + assert_matches!(events[3], Event::BeginTransaction { .. }); + assert_matches!(events[4], Event::StartQuery { .. }); + assert_matches!(events[5], Event::FinishQuery { .. }); + assert_matches!(events[6], Event::CommitTransaction { .. }); + assert_matches!(events[7], Event::StartQuery { .. }); + assert_matches!(events[8], Event::FinishQuery { .. }); + assert_matches!(events[9], Event::CommitTransaction { .. }); + assert_matches!(events[10], Event::StartQuery { .. }); + assert_matches!(events[11], Event::FinishQuery { .. }); +} diff --git a/diesel_tests/tests/lib.rs b/diesel_tests/tests/lib.rs index e4963a855de3..81c78f73186c 100644 --- a/diesel_tests/tests/lib.rs +++ b/diesel_tests/tests/lib.rs @@ -28,6 +28,7 @@ mod group_by; mod having; mod insert; mod insert_from_select; +mod instrumentation; mod internal_details; mod joins; mod limit_offset; From 789af8db7c79c1ab7ea78099a82023162db45de5 Mon Sep 17 00:00:00 2001 From: Georg Semmler Date: Fri, 15 Dec 2023 13:30:59 +0100 Subject: [PATCH 2/3] Test and fix the instrumentation implementation for the `PgRowByRowMode` --- diesel/src/connection/instrumentation.rs | 29 +++++++++----- diesel/src/connection/mod.rs | 3 +- diesel/src/connection/transaction_manager.rs | 10 +++-- diesel/src/pg/connection/cursor.rs | 35 +++++++++++------ diesel/src/pg/connection/mod.rs | 34 ++++++++++------ diesel_tests/tests/instrumentation.rs | 41 ++++++++++++++++++++ 6 files changed, 114 insertions(+), 38 deletions(-) diff --git a/diesel/src/connection/instrumentation.rs b/diesel/src/connection/instrumentation.rs index a7444425022b..206456132b5e 100644 --- a/diesel/src/connection/instrumentation.rs +++ b/diesel/src/connection/instrumentation.rs @@ -4,8 +4,12 @@ use std::num::NonZeroU32; use std::ops::DerefMut; static GLOBAL_INSTRUMENTATION: std::sync::RwLock Option>> = - std::sync::RwLock::new(default_instrumentation); + std::sync::RwLock::new(|| None); +/// A helper trait for opaque query representations +/// which allows to get a `Display` and `Debug` +/// representation of the underlying type without +/// exposing type specific details pub trait DebugQuery: Debug + Display {} impl DebugQuery for crate::query_builder::DebugQuery<'_, T, DB> where Self: Debug + Display {} @@ -19,16 +23,22 @@ impl DebugQuery for crate::query_builder::DebugQuery<'_, T, DB> where Sel #[diesel_derives::__diesel_public_if( feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes" )] -pub(crate) struct StrQueryHelper<'a> { - s: &'a str, +pub(crate) struct StrQueryHelper<'query> { + s: &'query str, } -impl<'a> StrQueryHelper<'a> { +impl<'query> StrQueryHelper<'query> { /// Construct a new `StrQueryHelper` #[diesel_derives::__diesel_public_if( feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes" )] - pub(crate) fn new(s: &'a str) -> Self { + #[cfg(any( + feature = "postgres", + feature = "sqlite", + feature = "mysql", + feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes" + ))] + pub(crate) fn new(s: &'query str) -> Self { Self { s } } } @@ -64,6 +74,11 @@ impl DebugQuery for StrQueryHelper<'_> {} // taking references for all things // and by not performing any additional // work until required. +// In addition it's carefully designed +// not to be dependent on the actual backend +// type, as that makes it easier to to reuse +// `Instrumentation` implementations in +// different a different context #[derive(Debug)] #[non_exhaustive] pub enum InstrumentationEvent<'a> { @@ -232,10 +247,6 @@ pub trait Instrumentation: Send + 'static { fn on_connection_event(&mut self, event: InstrumentationEvent<'_>); } -fn default_instrumentation() -> Option> { - None -} - /// Get an instance of the default [`Instrumentation`] /// /// This function is mostly useful for crates implementing diff --git a/diesel/src/connection/mod.rs b/diesel/src/connection/mod.rs index af819cad8c9a..a0fe95c6963b 100644 --- a/diesel/src/connection/mod.rs +++ b/diesel/src/connection/mod.rs @@ -18,7 +18,8 @@ use std::fmt::Debug; #[doc(inline)] pub use self::instrumentation::{ - get_default_instrumentation, set_default_instrumentation, Instrumentation, InstrumentationEvent, + get_default_instrumentation, set_default_instrumentation, DebugQuery, Instrumentation, + InstrumentationEvent, }; #[doc(inline)] pub use self::transaction_manager::{ diff --git a/diesel/src/connection/transaction_manager.rs b/diesel/src/connection/transaction_manager.rs index 703d51098991..bad7846e83a8 100644 --- a/diesel/src/connection/transaction_manager.rs +++ b/diesel/src/connection/transaction_manager.rs @@ -343,11 +343,13 @@ where Cow::from(format!("SAVEPOINT diesel_savepoint_{transaction_depth}")) } }; - let depth = transaction_depth - .and_then(|d| d.checked_add(1)) - .unwrap_or(NonZeroU32::new(1).expect("It's not 0")); conn.instrumentation().on_connection_event( - super::instrumentation::InstrumentationEvent::BeginTransaction { depth }, + super::instrumentation::InstrumentationEvent::BeginTransaction { + depth: NonZeroU32::new( + transaction_depth.map_or(0, NonZeroU32::get).wrapping_add(1), + ) + .expect("Transaction depth is too large"), + }, ); conn.batch_execute(&start_transaction_sql)?; Self::get_transaction_state(conn)? diff --git a/diesel/src/pg/connection/cursor.rs b/diesel/src/pg/connection/cursor.rs index 678c8812b26c..2a72138fd294 100644 --- a/diesel/src/pg/connection/cursor.rs +++ b/diesel/src/pg/connection/cursor.rs @@ -1,8 +1,9 @@ -use crate::connection::instrumentation::StrQueryHelper; - use super::raw::RawConnection; use super::result::PgResult; use super::row::PgRow; +use crate::connection::Instrumentation; +use crate::pg::Pg; +use crate::query_builder::QueryFragment; use std::rc::Rc; #[allow(missing_debug_implementations)] @@ -62,26 +63,29 @@ impl Iterator for Cursor { /// The type returned by various [`Connection`] methods. /// Acts as an iterator over `T`. #[allow(missing_debug_implementations)] -pub struct RowByRowCursor<'a> { +pub struct RowByRowCursor<'conn, 'query> { first_row: bool, db_result: Rc, - conn: &'a mut super::ConnectionAndTransactionManager, + conn: &'conn mut super::ConnectionAndTransactionManager, + query: Box + 'query>, } -impl<'a> RowByRowCursor<'a> { +impl<'conn, 'query> RowByRowCursor<'conn, 'query> { pub(super) fn new( db_result: PgResult, - conn: &'a mut super::ConnectionAndTransactionManager, + conn: &'conn mut super::ConnectionAndTransactionManager, + query: Box + 'query>, ) -> Self { RowByRowCursor { first_row: true, db_result: Rc::new(db_result), conn, + query, } } } -impl Iterator for RowByRowCursor<'_> { +impl<'conn, 'query> Iterator for RowByRowCursor<'conn, 'query> { type Item = crate::QueryResult; fn next(&mut self) -> Option { @@ -89,8 +93,7 @@ impl Iterator for RowByRowCursor<'_> { let get_next_result = super::update_transaction_manager_status( self.conn.raw_connection.get_next_result(), self.conn, - // todo - &StrQueryHelper::new(""), + &crate::debug_query(&self.query), false, ); match get_next_result { @@ -119,17 +122,25 @@ impl Iterator for RowByRowCursor<'_> { } } -impl Drop for RowByRowCursor<'_> { +impl<'conn, 'query> Drop for RowByRowCursor<'conn, 'query> { fn drop(&mut self) { loop { let res = super::update_transaction_manager_status( self.conn.raw_connection.get_next_result(), self.conn, - // todo - &StrQueryHelper::new(""), + &crate::debug_query(&self.query), false, ); if matches!(res, Err(_) | Ok(None)) { + // the error case is handled in update_transaction_manager_status + if res.is_ok() { + self.conn.instrumentation.on_connection_event( + crate::connection::InstrumentationEvent::FinishQuery { + query: &crate::debug_query(&self.query), + error: None, + }, + ); + } break; } } diff --git a/diesel/src/pg/connection/mod.rs b/diesel/src/pg/connection/mod.rs index 40773d08b8c9..aa1bec8a55e7 100644 --- a/diesel/src/pg/connection/mod.rs +++ b/diesel/src/pg/connection/mod.rs @@ -201,7 +201,7 @@ impl Connection for PgConnection { T: QueryFragment + QueryId, { update_transaction_manager_status( - self.with_prepared_query(source, true, |query, params, conn| { + self.with_prepared_query(source, true, |query, params, conn, _source| { let res = query .execute(&mut conn.raw_connection, ¶ms, false) .map(|r| r.rows_affected()); @@ -247,7 +247,7 @@ where T: Query + QueryFragment + QueryId + 'query, Self::Backend: QueryMetadata, { - self.with_prepared_query(&source, false, |stmt, params, conn| { + self.with_prepared_query(source, false, |stmt, params, conn, source| { use self::private::PgLoadingMode; let result = stmt.execute(&mut conn.raw_connection, ¶ms, Self::USE_ROW_BY_ROW_MODE); let result = update_transaction_manager_status( @@ -256,7 +256,7 @@ where &crate::debug_query(&source), false, )?; - Self::get_cursor(conn, result, &source) + Self::get_cursor(conn, result, source) }) } } @@ -395,18 +395,19 @@ impl PgConnection { fn with_prepared_query<'conn, T: QueryFragment + QueryId, R>( &'conn mut self, - source: &'_ T, + source: T, execute_returning_count: bool, f: impl FnOnce( MaybeCached<'_, Statement>, Vec>>, &'conn mut ConnectionAndTransactionManager, + T, ) -> QueryResult, ) -> QueryResult { self.connection_and_transaction_manager .instrumentation .on_connection_event(InstrumentationEvent::StartQuery { - query: &crate::debug_query(source), + query: &crate::debug_query(&source), }); let mut bind_collector = RawBytesBindCollector::::new(); source.collect_binds(&mut bind_collector, self, &Pg)?; @@ -417,7 +418,7 @@ impl PgConnection { let cache = &mut self.statement_cache; let conn = &mut self.connection_and_transaction_manager.raw_connection; let query = cache.cached_statement( - source, + &source, &Pg, &metadata, |sql, _| { @@ -441,7 +442,12 @@ impl PgConnection { } } - f(query?, binds, &mut self.connection_and_transaction_manager) + f( + query?, + binds, + &mut self.connection_and_transaction_manager, + source, + ) } fn set_config_options(&mut self) -> QueryResult<()> { @@ -474,7 +480,7 @@ mod private { fn get_cursor<'conn, 'query>( raw_connection: &'conn mut ConnectionAndTransactionManager, result: PgResult, - source: &dyn QueryFragment, + source: impl QueryFragment + 'query, ) -> QueryResult>; } @@ -486,7 +492,7 @@ mod private { fn get_cursor<'conn, 'query>( conn: &'conn mut ConnectionAndTransactionManager, result: PgResult, - source: &dyn QueryFragment, + source: impl QueryFragment + 'query, ) -> QueryResult> { update_transaction_manager_status( Cursor::new(result, &mut conn.raw_connection), @@ -499,15 +505,19 @@ mod private { impl PgLoadingMode for PgConnection { const USE_ROW_BY_ROW_MODE: bool = true; - type Cursor<'conn, 'query> = RowByRowCursor<'conn>; + type Cursor<'conn, 'query> = RowByRowCursor<'conn, 'query>; type Row<'conn, 'query> = self::row::PgRow; fn get_cursor<'conn, 'query>( raw_connection: &'conn mut ConnectionAndTransactionManager, result: PgResult, - _source: &dyn QueryFragment, + source: impl QueryFragment + 'query, ) -> QueryResult> { - Ok(RowByRowCursor::new(result, raw_connection)) + Ok(RowByRowCursor::new( + result, + raw_connection, + Box::new(source), + )) } } } diff --git a/diesel_tests/tests/instrumentation.rs b/diesel_tests/tests/instrumentation.rs index 73077dd9fcda..350128293c2e 100644 --- a/diesel_tests/tests/instrumentation.rs +++ b/diesel_tests/tests/instrumentation.rs @@ -229,3 +229,44 @@ fn check_events_transaction_nested() { assert_matches!(events[10], Event::StartQuery { .. }); assert_matches!(events[11], Event::FinishQuery { .. }); } + +#[cfg(feature = "postgres")] +#[test] +fn check_events_are_emitted_for_load_pg_row_by_row() { + use diesel::pg::PgRowByRowLoadingMode; + + let (events_to_check, mut conn) = setup_test_case(); + LoadConnection::::load(&mut conn, users::table.as_query()).unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 3, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::CacheQuery { .. }); + assert_matches!(events[2], Event::FinishQuery { .. }); +} + +#[cfg(feature = "postgres")] +#[test] +fn check_events_are_emitted_for_load_does_not_contain_cache_for_uncached_queries_pg_row_by_row() { + use diesel::pg::PgRowByRowLoadingMode; + + let (events_to_check, mut conn) = setup_test_case(); + LoadConnection::::load(&mut conn, diesel::sql_query("select 1")) + .unwrap(); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 2, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::FinishQuery { .. }); +} + +#[cfg(feature = "postgres")] +#[test] +fn check_events_are_emitted_for_load_does_contain_error_for_failures_pg_row_by_row() { + use diesel::pg::PgRowByRowLoadingMode; + + let (events_to_check, mut conn) = setup_test_case(); + let _ = LoadConnection::::load(&mut conn, diesel::sql_query("invalid")); + let events = events_to_check.lock().unwrap(); + assert_eq!(events.len(), 2, "{:?}", events); + assert_matches!(events[0], Event::StartQuery { .. }); + assert_matches!(events[1], Event::FinishQuery { error: Some(_), .. }); +} From 7df81d9317840f6cc359421b31e8125ddee73f59 Mon Sep 17 00:00:00 2001 From: Georg Semmler Date: Fri, 15 Dec 2023 15:05:04 +0100 Subject: [PATCH 3/3] Fix one test to compile --- diesel/src/mysql/connection/bind.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/diesel/src/mysql/connection/bind.rs b/diesel/src/mysql/connection/bind.rs index 3b002d7e2a27..7632e648d991 100644 --- a/diesel/src/mysql/connection/bind.rs +++ b/diesel/src/mysql/connection/bind.rs @@ -862,6 +862,7 @@ mod tests { ), &mut conn.statement_cache, &mut conn.raw_connection, + &mut conn.instrumentation, ).unwrap(); let metadata = stmt.metadata().unwrap();