Skip to content

Commit

Permalink
feat: add recon store metrics (#221)
Browse files Browse the repository at this point in the history
* feat: add recon store metrics

* fix: fix clippy issues
  • Loading branch information
nathanielc authored Jan 22, 2024
1 parent 9098c3f commit 4a55ed9
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 18 additions & 6 deletions one/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use clap::{Args, Parser, Subcommand, ValueEnum};
use futures::StreamExt;
use multibase::Base;
use multihash::{Code, Hasher, Multihash, MultihashDigest};
use recon::{FullInterests, Recon, ReconInterestProvider, SQLiteStore, Server, Sha256a};
use recon::{
FullInterests, Recon, ReconInterestProvider, SQLiteStore, Server, Sha256a,
StoreMetricsMiddleware,
};
use signal_hook::consts::signal::*;
use signal_hook_tokio::Signals;
use swagger::{auth::MakeAllowAllAuthenticator, EmptyContext};
Expand Down Expand Up @@ -287,11 +290,12 @@ pub async fn run() -> Result<()> {

type InterestStore = SQLiteStore<Interest, Sha256a>;
type InterestInterest = FullInterests<Interest>;
type ReconInterest = Server<Interest, Sha256a, InterestStore, InterestInterest>;
type ReconInterest =
Server<Interest, Sha256a, StoreMetricsMiddleware<InterestStore>, InterestInterest>;

type ModelStore = SQLiteStore<EventId, Sha256a>;
type ModelInterest = ReconInterestProvider<Sha256a>;
type ReconModel = Server<EventId, Sha256a, ModelStore, ModelInterest>;
type ReconModel = Server<EventId, Sha256a, StoreMetricsMiddleware<ModelStore>, ModelInterest>;

struct Daemon {
opts: DaemonOpts,
Expand Down Expand Up @@ -427,14 +431,22 @@ impl Daemon {
let sql_db_path: PathBuf = dir.join("db.sqlite3");
let sql_pool = SqlitePool::connect(&sql_db_path).await?;

// Create recon metrics
let recon_metrics = ceramic_metrics::MetricsHandle::register(recon::Metrics::register);

// Create recon store for interests.
let interest_store = InterestStore::new(sql_pool.clone(), "interest".to_string()).await?;
let interest_store = StoreMetricsMiddleware::new(
InterestStore::new(sql_pool.clone(), "interest".to_string()).await?,
recon_metrics.clone(),
);

// Create second recon store for models.
let model_store = ModelStore::new(sql_pool.clone(), "model".to_string()).await?;
let model_store = StoreMetricsMiddleware::new(
ModelStore::new(sql_pool.clone(), "model".to_string()).await?,
recon_metrics.clone(),
);

// Construct a recon implementation for interests.
let recon_metrics = ceramic_metrics::MetricsHandle::register(recon::Metrics::register);
let mut recon_interest = Server::new(Recon::new(
interest_store,
InterestInterest::default(),
Expand Down
1 change: 1 addition & 0 deletions recon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ sqlx.workspace = true
tokio.workspace = true
tracing.workspace = true
void.workspace = true
futures.workspace = true

[dev-dependencies]
codespan-reporting = "0.11.1"
Expand Down
16 changes: 9 additions & 7 deletions recon/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
//! Recon is a network protocol for set reconciliation
#![warn(missing_docs, missing_debug_implementations, clippy::all)]

pub use crate::recon::{
btreestore::BTreeStore, sqlitestore::SQLiteStore, AssociativeHash, FullInterests,
InterestProvider, Key, Message, Recon, ReconInterestProvider, Response, Store,
pub use crate::{
client::{Client, Server},
metrics::Metrics,
recon::{
btreestore::BTreeStore, sqlitestore::SQLiteStore, store_metrics::StoreMetricsMiddleware,
AssociativeHash, FullInterests, InterestProvider, Key, Message, Recon,
ReconInterestProvider, Response, Store,
},
sha256a::Sha256a,
};
pub use client::{Client, Server};

pub use crate::metrics::Metrics;
pub use sha256a::Sha256a;

mod client;
pub mod libp2p;
Expand Down
54 changes: 52 additions & 2 deletions recon/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,33 @@
use std::time::Duration;

use ceramic_metrics::{register, Recorder};
use prometheus_client::{metrics::counter::Counter, registry::Registry};
use prometheus_client::{
encoding::EncodeLabelSet,
metrics::{
counter::Counter,
family::Family,
histogram::{exponential_buckets, Histogram},
},
registry::Registry,
};

/// Metrics for Recon P2P events
#[derive(Debug, Clone)]
pub struct Metrics {
key_insert_count: Counter,

store_query_durations: Family<QueryLabels, Histogram>,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct QueryLabels {
name: &'static str,
}

impl From<&StoreQuery> for QueryLabels {
fn from(value: &StoreQuery) -> Self {
Self { name: value.name }
}
}

impl Metrics {
Expand All @@ -19,7 +42,19 @@ impl Metrics {
sub_registry
);

Self { key_insert_count }
register!(
store_query_durations,
"Durations of store queries in seconds",
Family::<QueryLabels, Histogram>::new_with_constructor(|| {
Histogram::new(exponential_buckets(0.005, 2.0, 20))
}),
sub_registry
);

Self {
key_insert_count,
store_query_durations,
}
}
}

Expand All @@ -31,3 +66,18 @@ impl Recorder<KeyInsertEvent> for Metrics {
self.key_insert_count.inc();
}
}

#[derive(Debug)]
pub struct StoreQuery {
pub(crate) name: &'static str,
pub(crate) duration: Duration,
}

impl Recorder<StoreQuery> for Metrics {
fn record(&self, event: &StoreQuery) {
let labels: QueryLabels = event.into();
self.store_query_durations
.get_or_create(&labels)
.observe(event.duration.as_secs_f64());
}
}
1 change: 1 addition & 0 deletions recon/src/recon.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod btreestore;
pub mod sqlitestore;
pub mod store_metrics;
#[cfg(test)]
pub mod tests;

Expand Down
166 changes: 166 additions & 0 deletions recon/src/recon/store_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
use anyhow::Result;
use async_trait::async_trait;
use ceramic_metrics::Recorder;
use futures::Future;
use tokio::time::Instant;

use crate::{metrics::StoreQuery, recon::HashCount, AssociativeHash, Key, Metrics, Store};

/// Implement the Store and record metrics
#[derive(Debug)]
pub struct StoreMetricsMiddleware<S> {
store: S,
metrics: Metrics,
}

impl<S> StoreMetricsMiddleware<S> {
/// Construct a new StoreMetricsMiddleware.
/// The metrics should have already be registered.
pub fn new(store: S, metrics: Metrics) -> Self {
Self { store, metrics }
}
// Record metrics for a given API endpoint
async fn record<T>(metrics: Metrics, name: &'static str, fut: impl Future<Output = T>) -> T {
let start = Instant::now();
let ret = fut.await;
let duration = start.elapsed();
let event = StoreQuery { name, duration };
metrics.record(&event);
ret
}
}

#[async_trait]
impl<S, K, H> Store for StoreMetricsMiddleware<S>
where
S: Store<Key = K, Hash = H> + Send,
K: Key,
H: AssociativeHash,
{
type Key = K;
type Hash = H;

async fn insert(&mut self, key: &Self::Key) -> Result<bool> {
StoreMetricsMiddleware::<S>::record(self.metrics.clone(), "insert", self.store.insert(key))
.await
}
async fn insert_many<'a, I>(&mut self, keys: I) -> Result<bool>
where
I: Iterator<Item = &'a Self::Key> + Send,
{
StoreMetricsMiddleware::<S>::record(
self.metrics.clone(),
"insert_many",
self.store.insert_many(keys),
)
.await
}

async fn hash_range(
&mut self,
left_fencepost: &Self::Key,
right_fencepost: &Self::Key,
) -> Result<HashCount<Self::Hash>> {
StoreMetricsMiddleware::<S>::record(
self.metrics.clone(),
"hash_range",
self.store.hash_range(left_fencepost, right_fencepost),
)
.await
}

async fn range(
&mut self,
left_fencepost: &Self::Key,
right_fencepost: &Self::Key,
offset: usize,
limit: usize,
) -> Result<Box<dyn Iterator<Item = Self::Key> + Send + 'static>> {
StoreMetricsMiddleware::<S>::record(
self.metrics.clone(),
"range",
self.store
.range(left_fencepost, right_fencepost, offset, limit),
)
.await
}

async fn full_range(&mut self) -> Result<Box<dyn Iterator<Item = Self::Key> + Send + 'static>> {
StoreMetricsMiddleware::<S>::record(
self.metrics.clone(),
"full_range",
self.store.full_range(),
)
.await
}

async fn middle(
&mut self,
left_fencepost: &Self::Key,
right_fencepost: &Self::Key,
) -> Result<Option<Self::Key>> {
StoreMetricsMiddleware::<S>::record(
self.metrics.clone(),
"middle",
self.store.middle(left_fencepost, right_fencepost),
)
.await
}
async fn count(
&mut self,
left_fencepost: &Self::Key,
right_fencepost: &Self::Key,
) -> Result<usize> {
StoreMetricsMiddleware::<S>::record(
self.metrics.clone(),
"count",
self.store.count(left_fencepost, right_fencepost),
)
.await
}
async fn first(
&mut self,
left_fencepost: &Self::Key,
right_fencepost: &Self::Key,
) -> Result<Option<Self::Key>> {
StoreMetricsMiddleware::<S>::record(
self.metrics.clone(),
"first",
self.store.first(left_fencepost, right_fencepost),
)
.await
}
async fn last(
&mut self,
left_fencepost: &Self::Key,
right_fencepost: &Self::Key,
) -> Result<Option<Self::Key>> {
StoreMetricsMiddleware::<S>::record(
self.metrics.clone(),
"last",
self.store.last(left_fencepost, right_fencepost),
)
.await
}

async fn first_and_last(
&mut self,
left_fencepost: &Self::Key,
right_fencepost: &Self::Key,
) -> Result<Option<(Self::Key, Self::Key)>> {
StoreMetricsMiddleware::<S>::record(
self.metrics.clone(),
"first_and_last",
self.store.first_and_last(left_fencepost, right_fencepost),
)
.await
}

async fn len(&mut self) -> Result<usize> {
StoreMetricsMiddleware::<S>::record(self.metrics.clone(), "len", self.store.len()).await
}
async fn is_empty(&mut self) -> Result<bool> {
StoreMetricsMiddleware::<S>::record(self.metrics.clone(), "is_empty", self.store.is_empty())
.await
}
}
Loading

0 comments on commit 4a55ed9

Please sign in to comment.