diff --git a/Cargo.lock b/Cargo.lock index 280a9fd5a..de9dcde25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6951,6 +6951,7 @@ dependencies = [ "cid 0.10.1", "codespan-reporting", "expect-test", + "futures", "hex", "lalrpop", "lalrpop-util", diff --git a/one/src/lib.rs b/one/src/lib.rs index 1196e2885..af333dab7 100644 --- a/one/src/lib.rs +++ b/one/src/lib.rs @@ -20,7 +20,10 @@ use clap::{Args, Parser, Subcommand, ValueEnum}; use futures::StreamExt; use multibase::Base; use multihash::{Code, Hasher, Multihash, MultihashDigest}; -use recon::{FullInterests, Recon, ReconInterestProvider, SQLiteStore, Server, Sha256a}; +use recon::{ + FullInterests, Recon, ReconInterestProvider, SQLiteStore, Server, Sha256a, + StoreMetricsMiddleware, +}; use signal_hook::consts::signal::*; use signal_hook_tokio::Signals; use swagger::{auth::MakeAllowAllAuthenticator, EmptyContext}; @@ -288,11 +291,12 @@ pub async fn run() -> Result<()> { type InterestStore = SQLiteStore; type InterestInterest = FullInterests; -type ReconInterest = Server; +type ReconInterest = + Server, InterestInterest>; type ModelStore = SQLiteStore; type ModelInterest = ReconInterestProvider; -type ReconModel = Server; +type ReconModel = Server, ModelInterest>; struct Daemon { opts: DaemonOpts, @@ -428,14 +432,22 @@ impl Daemon { let sql_db_path: PathBuf = dir.join("db.sqlite3"); let sql_pool = sql::connect(&sql_db_path).await?; + // Create recon metrics + let recon_metrics = ceramic_metrics::MetricsHandle::register(recon::Metrics::register); + // Create recon store for interests. - let interest_store = InterestStore::new(sql_pool.clone(), "interest".to_string()).await?; + let interest_store = StoreMetricsMiddleware::new( + InterestStore::new(sql_pool.clone(), "interest".to_string()).await?, + recon_metrics.clone(), + ); // Create second recon store for models. - let model_store = ModelStore::new(sql_pool.clone(), "model".to_string()).await?; + let model_store = StoreMetricsMiddleware::new( + ModelStore::new(sql_pool.clone(), "model".to_string()).await?, + recon_metrics.clone(), + ); // Construct a recon implementation for interests. - let recon_metrics = ceramic_metrics::MetricsHandle::register(recon::Metrics::register); let mut recon_interest = Server::new(Recon::new( interest_store, InterestInterest::default(), diff --git a/recon/Cargo.toml b/recon/Cargo.toml index a8295bd9f..a90b64db8 100644 --- a/recon/Cargo.toml +++ b/recon/Cargo.toml @@ -28,6 +28,7 @@ sqlx.workspace = true tokio.workspace = true tracing.workspace = true void.workspace = true +futures.workspace = true [dev-dependencies] codespan-reporting = "0.11.1" diff --git a/recon/src/lib.rs b/recon/src/lib.rs index ab1bd448a..59ca594df 100644 --- a/recon/src/lib.rs +++ b/recon/src/lib.rs @@ -1,14 +1,16 @@ //! Recon is a network protocol for set reconciliation #![warn(missing_docs, missing_debug_implementations, clippy::all)] -pub use crate::recon::{ - btreestore::BTreeStore, sqlitestore::SQLiteStore, AssociativeHash, FullInterests, - InterestProvider, Key, Message, Recon, ReconInterestProvider, Response, Store, +pub use crate::{ + client::{Client, Server}, + metrics::Metrics, + recon::{ + btreestore::BTreeStore, sqlitestore::SQLiteStore, store_metrics::StoreMetricsMiddleware, + AssociativeHash, FullInterests, InterestProvider, Key, Message, Recon, + ReconInterestProvider, Response, Store, + }, + sha256a::Sha256a, }; -pub use client::{Client, Server}; - -pub use crate::metrics::Metrics; -pub use sha256a::Sha256a; mod client; pub mod libp2p; diff --git a/recon/src/metrics.rs b/recon/src/metrics.rs index 89b00ed53..baa6c9ba9 100644 --- a/recon/src/metrics.rs +++ b/recon/src/metrics.rs @@ -1,10 +1,33 @@ +use std::time::Duration; + use ceramic_metrics::{register, Recorder}; -use prometheus_client::{metrics::counter::Counter, registry::Registry}; +use prometheus_client::{ + encoding::EncodeLabelSet, + metrics::{ + counter::Counter, + family::Family, + histogram::{exponential_buckets, Histogram}, + }, + registry::Registry, +}; /// Metrics for Recon P2P events #[derive(Debug, Clone)] pub struct Metrics { key_insert_count: Counter, + + store_query_durations: Family, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +struct QueryLabels { + name: &'static str, +} + +impl From<&StoreQuery> for QueryLabels { + fn from(value: &StoreQuery) -> Self { + Self { name: value.name } + } } impl Metrics { @@ -19,7 +42,19 @@ impl Metrics { sub_registry ); - Self { key_insert_count } + register!( + store_query_durations, + "Durations of store queries in seconds", + Family::::new_with_constructor(|| { + Histogram::new(exponential_buckets(0.005, 2.0, 20)) + }), + sub_registry + ); + + Self { + key_insert_count, + store_query_durations, + } } } @@ -31,3 +66,18 @@ impl Recorder for Metrics { self.key_insert_count.inc(); } } + +#[derive(Debug)] +pub struct StoreQuery { + pub(crate) name: &'static str, + pub(crate) duration: Duration, +} + +impl Recorder for Metrics { + fn record(&self, event: &StoreQuery) { + let labels: QueryLabels = event.into(); + self.store_query_durations + .get_or_create(&labels) + .observe(event.duration.as_secs_f64()); + } +} diff --git a/recon/src/recon.rs b/recon/src/recon.rs index a1443b4c9..94c4e677f 100644 --- a/recon/src/recon.rs +++ b/recon/src/recon.rs @@ -1,5 +1,6 @@ pub mod btreestore; pub mod sqlitestore; +pub mod store_metrics; #[cfg(test)] pub mod tests; diff --git a/recon/src/recon/store_metrics.rs b/recon/src/recon/store_metrics.rs new file mode 100644 index 000000000..a2f4638e5 --- /dev/null +++ b/recon/src/recon/store_metrics.rs @@ -0,0 +1,166 @@ +use anyhow::Result; +use async_trait::async_trait; +use ceramic_metrics::Recorder; +use futures::Future; +use tokio::time::Instant; + +use crate::{metrics::StoreQuery, recon::HashCount, AssociativeHash, Key, Metrics, Store}; + +/// Implement the Store and record metrics +#[derive(Debug)] +pub struct StoreMetricsMiddleware { + store: S, + metrics: Metrics, +} + +impl StoreMetricsMiddleware { + /// Construct a new StoreMetricsMiddleware. + /// The metrics should have already be registered. + pub fn new(store: S, metrics: Metrics) -> Self { + Self { store, metrics } + } + // Record metrics for a given API endpoint + async fn record(metrics: Metrics, name: &'static str, fut: impl Future) -> T { + let start = Instant::now(); + let ret = fut.await; + let duration = start.elapsed(); + let event = StoreQuery { name, duration }; + metrics.record(&event); + ret + } +} + +#[async_trait] +impl Store for StoreMetricsMiddleware +where + S: Store + Send, + K: Key, + H: AssociativeHash, +{ + type Key = K; + type Hash = H; + + async fn insert(&mut self, key: &Self::Key) -> Result { + StoreMetricsMiddleware::::record(self.metrics.clone(), "insert", self.store.insert(key)) + .await + } + async fn insert_many<'a, I>(&mut self, keys: I) -> Result + where + I: Iterator + Send, + { + StoreMetricsMiddleware::::record( + self.metrics.clone(), + "insert_many", + self.store.insert_many(keys), + ) + .await + } + + async fn hash_range( + &mut self, + left_fencepost: &Self::Key, + right_fencepost: &Self::Key, + ) -> Result> { + StoreMetricsMiddleware::::record( + self.metrics.clone(), + "hash_range", + self.store.hash_range(left_fencepost, right_fencepost), + ) + .await + } + + async fn range( + &mut self, + left_fencepost: &Self::Key, + right_fencepost: &Self::Key, + offset: usize, + limit: usize, + ) -> Result + Send + 'static>> { + StoreMetricsMiddleware::::record( + self.metrics.clone(), + "range", + self.store + .range(left_fencepost, right_fencepost, offset, limit), + ) + .await + } + + async fn full_range(&mut self) -> Result + Send + 'static>> { + StoreMetricsMiddleware::::record( + self.metrics.clone(), + "full_range", + self.store.full_range(), + ) + .await + } + + async fn middle( + &mut self, + left_fencepost: &Self::Key, + right_fencepost: &Self::Key, + ) -> Result> { + StoreMetricsMiddleware::::record( + self.metrics.clone(), + "middle", + self.store.middle(left_fencepost, right_fencepost), + ) + .await + } + async fn count( + &mut self, + left_fencepost: &Self::Key, + right_fencepost: &Self::Key, + ) -> Result { + StoreMetricsMiddleware::::record( + self.metrics.clone(), + "count", + self.store.count(left_fencepost, right_fencepost), + ) + .await + } + async fn first( + &mut self, + left_fencepost: &Self::Key, + right_fencepost: &Self::Key, + ) -> Result> { + StoreMetricsMiddleware::::record( + self.metrics.clone(), + "first", + self.store.first(left_fencepost, right_fencepost), + ) + .await + } + async fn last( + &mut self, + left_fencepost: &Self::Key, + right_fencepost: &Self::Key, + ) -> Result> { + StoreMetricsMiddleware::::record( + self.metrics.clone(), + "last", + self.store.last(left_fencepost, right_fencepost), + ) + .await + } + + async fn first_and_last( + &mut self, + left_fencepost: &Self::Key, + right_fencepost: &Self::Key, + ) -> Result> { + StoreMetricsMiddleware::::record( + self.metrics.clone(), + "first_and_last", + self.store.first_and_last(left_fencepost, right_fencepost), + ) + .await + } + + async fn len(&mut self) -> Result { + StoreMetricsMiddleware::::record(self.metrics.clone(), "len", self.store.len()).await + } + async fn is_empty(&mut self) -> Result { + StoreMetricsMiddleware::::record(self.metrics.clone(), "is_empty", self.store.is_empty()) + .await + } +} diff --git a/recon/src/recon/tests.rs b/recon/src/recon/tests.rs index dc3655b6c..f4faaf86d 100644 --- a/recon/src/recon/tests.rs +++ b/recon/src/recon/tests.rs @@ -638,6 +638,11 @@ fn hello() { value: 0, phantom: PhantomData, }, + store_query_durations: Family { + metrics: RwLock { + data: {}, + }, + }, }, } "#]] @@ -770,6 +775,11 @@ fn test_parse_recon() { value: 0, phantom: PhantomData, }, + store_query_durations: Family { + metrics: RwLock { + data: {}, + }, + }, }, }, dog: Recon { @@ -860,6 +870,11 @@ fn test_parse_recon() { value: 0, phantom: PhantomData, }, + store_query_durations: Family { + metrics: RwLock { + data: {}, + }, + }, }, }, iterations: [ @@ -1086,6 +1101,11 @@ dog: [] value: 0, phantom: PhantomData, }, + store_query_durations: Family { + metrics: RwLock { + data: {}, + }, + }, }, }, dog: Recon { @@ -1109,6 +1129,11 @@ dog: [] value: 0, phantom: PhantomData, }, + store_query_durations: Family { + metrics: RwLock { + data: {}, + }, + }, }, }, iterations: [