From 2ff6de65a0bf44202c012d3cdd955e8496240621 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Thu, 21 Dec 2023 14:03:25 -0700 Subject: [PATCH 1/3] feat: add metrics to api and recon Add metrics to ceramic-api and recon crates. Additionally some small build fixes for tokio --- Cargo.lock | 6 +++ Makefile | 22 ++++---- api/Cargo.toml | 3 ++ api/src/lib.rs | 4 ++ api/src/metrics.rs | 85 +++++++++++++++++++++++++++++++ api/src/metrics/api.rs | 85 +++++++++++++++++++++++++++++++ beetle/iroh-rpc-client/Cargo.toml | 1 + one/Cargo.toml | 3 +- one/src/lib.rs | 13 ++++- recon/Cargo.toml | 2 + recon/src/lib.rs | 2 + recon/src/libp2p.rs | 5 -- recon/src/libp2p/protocol.rs | 10 ++-- recon/src/libp2p/tests.rs | 5 +- recon/src/metrics.rs | 33 ++++++++++++ recon/src/recon.rs | 19 ++++--- recon/src/recon/parser.lalrpop | 12 ++++- recon/src/recon/tests.rs | 48 +++++++++++++++-- 18 files changed, 322 insertions(+), 36 deletions(-) create mode 100644 api/src/metrics.rs create mode 100644 api/src/metrics/api.rs create mode 100644 recon/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 71659df31..ae3a06444 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1069,11 +1069,14 @@ dependencies = [ "ceramic-api-server", "ceramic-core", "ceramic-metadata", + "ceramic-metrics", "expect-test", "futures", "hyper", + "iroh-rpc-client", "mockall", "multibase 0.9.1", + "prometheus-client", "recon", "swagger", "tokio", @@ -3830,6 +3833,7 @@ dependencies = [ "anyhow", "async-stream", "bytes 1.5.0", + "ceramic-core", "cid 0.10.1", "futures", "iroh-rpc-types", @@ -6984,6 +6988,7 @@ dependencies = [ "async-trait", "asynchronous-codec 0.6.2", "ceramic-core", + "ceramic-metrics", "cid 0.10.1", "codespan-reporting", "expect-test", @@ -6995,6 +7000,7 @@ dependencies = [ "libp2p-swarm-test", "multihash 0.18.1", "pretty", + "prometheus-client", "quickcheck", "rand 0.8.5", "regex", diff --git a/Makefile b/Makefile index a1fb4fe45..df7679bbb 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,8 @@ # Therefore may be useful in ensuring a change # is ready to pass CI checks. +CARGO = RUSTFLAGS='--cfg tokio_unstable' cargo + RELEASE_LEVEL ?= minor # ECS environment to deploy image to @@ -24,9 +26,9 @@ all: build check-fmt check-clippy test .PHONY: build build: # Build with default features - cargo build --locked --release + $(CARGO) build --locked --release # Build with all features - cargo build --locked --release --all-features + $(CARGO) build --locked --release --all-features # Generates api-server crate from ceramic.yaml OpenAPI spec .PHONY: gen-api-server @@ -50,7 +52,7 @@ check-kubo-rpc-server: .PHONY: release release: - RUSTFLAGS="-D warnings" cargo build -p ceramic-one --locked --release + RUSTFLAGS="-D warnings" $(CARGO) build -p ceramic-one --locked --release # Prepare a release PR. .PHONY: release-pr @@ -59,29 +61,29 @@ release-pr: .PHONY: debug debug: - cargo build -p ceramic-one --locked + $(CARGO) build -p ceramic-one --locked .PHONY: test test: # Test with default features - cargo test --locked --release + $(CARGO) test --locked --release # Test with all features - cargo test --locked --release --all-features + $(CARGO) test --locked --release --all-features .PHONY: check-fmt check-fmt: - cargo fmt --all -- --check + $(CARGO) fmt --all -- --check .PHONY: check-clippy check-clippy: # Check with default features - cargo clippy --workspace --locked --release -- -D warnings --no-deps + $(CARGO) clippy --workspace --locked --release -- -D warnings --no-deps # Check with all features - cargo clippy --workspace --locked --release --all-features -- -D warnings --no-deps + $(CARGO) clippy --workspace --locked --release --all-features -- -D warnings --no-deps .PHONY: run run: - RUST_LOG=ERROR,ceramic_kubo_rpc=DEBUG,ceramic_one=DEBUG cargo run --all-features --locked --release --bin ceramic-one -- daemon -b 127.0.0.1:5001 + RUST_LOG=ERROR,ceramic_kubo_rpc=DEBUG,ceramic_one=DEBUG $(CARGO) run --all-features --locked --release --bin ceramic-one -- daemon -b 127.0.0.1:5001 .PHONY: publish-docker publish-docker: diff --git a/api/Cargo.toml b/api/Cargo.toml index 5411f646e..83d8c64d1 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -14,6 +14,7 @@ async-trait.workspace = true ceramic-api-server.workspace = true ceramic-core.workspace = true ceramic-metadata.workspace = true +ceramic-metrics.workspace = true futures.workspace = true hyper.workspace = true multibase.workspace = true @@ -21,6 +22,8 @@ recon.workspace = true swagger.workspace = true tokio.workspace = true tracing.workspace = true +prometheus-client.workspace = true +iroh-rpc-client.workspace = true [dev-dependencies] expect-test.workspace = true diff --git a/api/src/lib.rs b/api/src/lib.rs index d42811d96..db271b9da 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -1,3 +1,7 @@ +mod metrics; mod server; +pub use metrics::api::MetricsMiddleware; +pub use metrics::Metrics; + pub use server::Server; diff --git a/api/src/metrics.rs b/api/src/metrics.rs new file mode 100644 index 000000000..67d1b35dd --- /dev/null +++ b/api/src/metrics.rs @@ -0,0 +1,85 @@ +pub mod api; + +use std::time::Duration; + +use ceramic_api_server::{API_VERSION, BASE_PATH}; +use ceramic_metrics::Recorder; +use prometheus_client::{ + encoding::EncodeLabelSet, + metrics::{ + counter::Counter, + family::Family, + histogram::{exponential_buckets, Histogram}, + info::Info, + }, + registry::Registry, +}; + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +struct RequestLabels { + path: &'static str, +} + +impl From<&Event> for RequestLabels { + fn from(value: &Event) -> Self { + Self { path: value.path } + } +} + +/// Metrics for Kubo RPC API +#[derive(Clone)] +pub struct Metrics { + requests: Family, + request_durations: Family, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +struct InfoLabels { + base_path: &'static str, + version: &'static str, +} + +impl Metrics { + /// Register and construct Metrics + pub fn register(registry: &mut Registry) -> Self { + let sub_registry = registry.sub_registry_with_prefix("ceramic_api"); + + let requests = Family::::default(); + sub_registry.register("requests", "Number of HTTP requests", requests.clone()); + + let request_durations = Family::::new_with_constructor(|| { + Histogram::new(exponential_buckets(0.005, 2.0, 20)) + }); + sub_registry.register( + "request_durations", + "Duration of HTTP requests", + request_durations.clone(), + ); + + let info: Info = Info::new(InfoLabels { + base_path: BASE_PATH, + version: API_VERSION, + }); + sub_registry.register("api", "Information about the Ceramic API", info); + + Self { + requests, + request_durations, + } + } +} + +pub struct Event { + pub(crate) path: &'static str, + pub(crate) duration: Duration, +} + +impl Recorder for Metrics { + fn record(&self, event: &Event) { + let labels: RequestLabels = event.into(); + self.requests.get_or_create(&labels).inc(); + self.request_durations + .get_or_create(&labels) + .observe(event.duration.as_secs_f64()); + } +} diff --git a/api/src/metrics/api.rs b/api/src/metrics/api.rs new file mode 100644 index 000000000..0b05616f0 --- /dev/null +++ b/api/src/metrics/api.rs @@ -0,0 +1,85 @@ +use async_trait::async_trait; +use ceramic_api_server::{ + models, Api, EventsPostResponse, LivenessGetResponse, SubscribeSortKeySortValueGetResponse, + VersionPostResponse, +}; +use ceramic_metrics::Recorder; +use futures::Future; +use swagger::ApiError; +use tokio::time::Instant; + +use crate::{metrics::Event, Metrics}; + +/// Implement the API and record metrics +#[derive(Clone)] +pub struct MetricsMiddleware { + api: A, + metrics: Metrics, +} + +impl MetricsMiddleware { + /// Construct a new MetricsMiddleware. + /// The metrics should have already be registered. + pub fn new(api: A, metrics: Metrics) -> Self { + Self { api, metrics } + } + // Record metrics for a given API endpoint + async fn record(&self, path: &'static str, fut: impl Future) -> T { + let start = Instant::now(); + let ret = fut.await; + let duration = start.elapsed(); + let event = Event { path, duration }; + self.metrics.record(&event); + ret + } +} + +#[async_trait] +impl Api for MetricsMiddleware +where + A: Api, + A: Clone + Send + Sync, + C: Send + Sync, +{ + /// Creates a new event + async fn events_post( + &self, + event: models::Event, + context: &C, + ) -> Result { + self.record("/events", self.api.events_post(event, context)) + .await + } + + /// Test the liveness of the Ceramic node + async fn liveness_get(&self, context: &C) -> Result { + self.record("/liveness", self.api.liveness_get(context)) + .await + } + + /// Get events for a stream + async fn subscribe_sort_key_sort_value_get( + &self, + sort_key: String, + sort_value: String, + controller: Option, + stream_id: Option, + offset: Option, + limit: Option, + context: &C, + ) -> Result { + self.record( + "/subscribe", + self.api.subscribe_sort_key_sort_value_get( + sort_key, sort_value, controller, stream_id, offset, limit, context, + ), + ) + .await + } + + /// Get the version of the Ceramic node + async fn version_post(&self, context: &C) -> Result { + self.record("/version", self.api.version_post(context)) + .await + } +} diff --git a/beetle/iroh-rpc-client/Cargo.toml b/beetle/iroh-rpc-client/Cargo.toml index d38b0d2c9..dc42df144 100644 --- a/beetle/iroh-rpc-client/Cargo.toml +++ b/beetle/iroh-rpc-client/Cargo.toml @@ -13,6 +13,7 @@ anyhow.workspace = true async-stream.workspace = true bytes.workspace = true cid.workspace = true +ceramic-core.workspace = true futures.workspace = true iroh-rpc-types.workspace = true iroh-util.workspace = true diff --git a/one/Cargo.toml b/one/Cargo.toml index 26812e1c9..51504a456 100644 --- a/one/Cargo.toml +++ b/one/Cargo.toml @@ -45,7 +45,7 @@ signal-hook = "0.3.17" signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] } sqlx.workspace = true swagger.workspace = true -tokio-metrics = "0.3.1" +tokio-metrics = { version = "0.3.1", features = ["rt"] } tokio-prometheus-client = "0.1" tokio-util.workspace = true tokio.workspace = true @@ -53,6 +53,7 @@ tracing-appender = "0.2.2" tracing-subscriber.workspace = true tracing.workspace = true + [features] default = [] tokio-console = ["ceramic-metrics/tokio-console"] diff --git a/one/src/lib.rs b/one/src/lib.rs index a996ff44c..301ea1025 100644 --- a/one/src/lib.rs +++ b/one/src/lib.rs @@ -448,14 +448,19 @@ impl Daemon { let model_store = ModelStore::new(sql_pool.clone(), "model".to_string()).await?; // Construct a recon implementation for interests. - let mut recon_interest = - Server::new(Recon::new(interest_store, InterestInterest::default())); + let recon_metrics = ceramic_metrics::MetricsHandle::register(recon::Metrics::register); + let mut recon_interest = Server::new(Recon::new( + interest_store, + InterestInterest::default(), + recon_metrics.clone(), + )); // Construct a recon implementation for models. let mut recon_model = Server::new(Recon::new( model_store, // Use recon interests as the InterestProvider for recon_model ModelInterest::new(peer_id, recon_interest.client()), + recon_metrics.clone(), )); let recons = if opts.recon { @@ -502,6 +507,10 @@ impl Daemon { self.recon_interest.client(), self.recon_model.client(), ); + let ceramic_metrics = + ceramic_metrics::MetricsHandle::register(ceramic_api::Metrics::register); + // Wrap server in metrics middleware + let ceramic_server = ceramic_api::MetricsMiddleware::new(ceramic_server, ceramic_metrics); let ceramic_service = ceramic_api_server::server::MakeService::new(ceramic_server); let ceramic_service = MakeAllowAllAuthenticator::new(ceramic_service, ""); let ceramic_service = diff --git a/recon/Cargo.toml b/recon/Cargo.toml index 1309d11c9..0025be278 100644 --- a/recon/Cargo.toml +++ b/recon/Cargo.toml @@ -13,11 +13,13 @@ anyhow.workspace = true async-trait.workspace = true asynchronous-codec = { version = "0.6.1", features = ["cbor", "json"] } ceramic-core.workspace = true +ceramic-metrics.workspace = true cid.workspace = true hex = "0.4.3" libp2p-identity.workspace = true libp2p.workspace = true multihash.workspace = true +prometheus-client.workspace = true rand = "0.8.5" serde.workspace = true serde_ipld_dagcbor.workspace = true diff --git a/recon/src/lib.rs b/recon/src/lib.rs index 027a320b4..ab1bd448a 100644 --- a/recon/src/lib.rs +++ b/recon/src/lib.rs @@ -7,9 +7,11 @@ pub use crate::recon::{ }; pub use client::{Client, Server}; +pub use crate::metrics::Metrics; pub use sha256a::Sha256a; mod client; pub mod libp2p; +mod metrics; mod recon; mod sha256a; diff --git a/recon/src/libp2p.rs b/recon/src/libp2p.rs index c84255b42..998842df6 100644 --- a/recon/src/libp2p.rs +++ b/recon/src/libp2p.rs @@ -76,11 +76,6 @@ pub trait Recon: Clone + Send + Sync + 'static { } } -// Implement the Recon trait using crate::recon::Recon -// -// NOTE: We use a std::sync::Mutex because we are not doing any async -// logic within Recon itself, all async logic exists outside its scope. -// We should use a tokio::sync::Mutex if we introduce any async logic into Recon. #[async_trait] impl Recon for Client where diff --git a/recon/src/libp2p/protocol.rs b/recon/src/libp2p/protocol.rs index c26046982..159383544 100644 --- a/recon/src/libp2p/protocol.rs +++ b/recon/src/libp2p/protocol.rs @@ -6,7 +6,7 @@ use libp2p::{ }; use libp2p_identity::PeerId; use serde::{Deserialize, Serialize}; -use tracing::{debug, trace}; +use tracing::{debug, info, trace}; use crate::{ libp2p::{stream_set::StreamSet, Recon}, @@ -27,7 +27,7 @@ pub async fn initiate_synchronize( recon: R, stream: S, ) -> Result { - debug!("start synchronize"); + info!("initiate_synchronize"); let codec = CborCodec::, Envelope>::new(); let mut framed = Framed::new(stream, codec); @@ -52,7 +52,7 @@ pub async fn initiate_synchronize( } framed.close().await?; debug!( - "finished synchronize, number of keys {}", + "finished initiate_synchronize, number of keys {}", recon.len().await? ); Ok(stream_set) @@ -68,7 +68,7 @@ pub async fn accept_synchronize( recon: R, stream: S, ) -> Result { - debug!("accept_synchronize_interests"); + info!("accept_synchronize"); let codec = CborCodec::, Envelope>::new(); let mut framed = Framed::new(stream, codec); @@ -89,7 +89,7 @@ pub async fn accept_synchronize( } framed.close().await?; debug!( - "finished synchronize, number of keys {}", + "finished accept_synchronize, number of keys {}", recon.len().await? ); Ok(stream_set) diff --git a/recon/src/libp2p/tests.rs b/recon/src/libp2p/tests.rs index 98f63c123..d9bebc276 100644 --- a/recon/src/libp2p/tests.rs +++ b/recon/src/libp2p/tests.rs @@ -27,6 +27,7 @@ use libp2p::{ swarm::{Swarm, SwarmEvent}, }; use libp2p_swarm_test::SwarmExt; +use prometheus_client::registry::Registry; use quickcheck::QuickCheck; use rand::{thread_rng, Rng}; use std::{num::NonZeroU8, time::Duration}; @@ -37,7 +38,7 @@ use tracing_test::traced_test; use crate::{ libp2p::{stream_set::StreamSet, Behaviour, Config, Event, PeerEvent, PeerStatus}, recon::{FullInterests, ReconInterestProvider}, - BTreeStore, Client, Recon, Server, Sha256a, + BTreeStore, Client, Metrics, Recon, Server, Sha256a, }; type InterestStore = BTreeStore; @@ -77,6 +78,7 @@ fn build_swarm(runtime: &Runtime, name: &str, config: Config) -> SwarmTest { .into(), ), FullInterests::default(), + Metrics::register(&mut Registry::default()), )); let mut model = Server::new(ReconModel::new( @@ -114,6 +116,7 @@ fn build_swarm(runtime: &Runtime, name: &str, config: Config) -> SwarmTest { .into(), ), ModelInterest::new(peer_id, interest.client()), + Metrics::register(&mut Registry::default()), )); let b = Behaviour::new(interest.client(), model.client(), config.clone()); runtime.spawn(interest.run()); diff --git a/recon/src/metrics.rs b/recon/src/metrics.rs new file mode 100644 index 000000000..89b00ed53 --- /dev/null +++ b/recon/src/metrics.rs @@ -0,0 +1,33 @@ +use ceramic_metrics::{register, Recorder}; +use prometheus_client::{metrics::counter::Counter, registry::Registry}; + +/// Metrics for Recon P2P events +#[derive(Debug, Clone)] +pub struct Metrics { + key_insert_count: Counter, +} + +impl Metrics { + /// Register and construct Metrics + pub fn register(registry: &mut Registry) -> Self { + let sub_registry = registry.sub_registry_with_prefix("recon"); + + register!( + key_insert_count, + "Number times a new key is inserted into the datastore", + Counter::default(), + sub_registry + ); + + Self { key_insert_count } + } +} + +#[derive(Debug)] +pub struct KeyInsertEvent; + +impl Recorder for Metrics { + fn record(&self, _event: &KeyInsertEvent) { + self.key_insert_count.inc(); + } +} diff --git a/recon/src/recon.rs b/recon/src/recon.rs index 2a42f865a..3594a913a 100644 --- a/recon/src/recon.rs +++ b/recon/src/recon.rs @@ -8,10 +8,11 @@ use std::{fmt::Display, marker::PhantomData}; use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; use ceramic_core::{EventId, Interest, PeerId, RangeOpen}; +use ceramic_metrics::Recorder; use serde::{Deserialize, Serialize}; use tracing::{debug, instrument, trace}; -use crate::{Client, Sha256a}; +use crate::{metrics::KeyInsertEvent, Client, Metrics, Sha256a}; /// Recon is a protocol for set reconciliation via a message passing paradigm. /// An initial message can be created and then messages are exchanged between two Recon instances @@ -34,6 +35,7 @@ where { interests: I, store: S, + metrics: Metrics, } impl Recon @@ -44,8 +46,12 @@ where I: InterestProvider, { /// Construct a new Recon instance. - pub fn new(store: S, interests: I) -> Self { - Self { store, interests } + pub fn new(store: S, interests: I, metrics: Metrics) -> Self { + Self { + store, + interests, + metrics, + } } /// Construct a message to send as the first message. @@ -192,6 +198,9 @@ where /// Returns true if the key did not previously exist. pub async fn insert(&mut self, key: &K) -> Result { let new_key = self.store.insert(key).await?; + if new_key { + self.metrics.record(&KeyInsertEvent); + } Ok(new_key) } @@ -278,7 +287,6 @@ pub trait Store: std::fmt::Debug { /// An exact middle is not necessary but performance will be better with a better approximation. /// /// The default implementation will count all elements and then find the middle. - #[instrument(skip(self), ret)] async fn middle( &mut self, left_fencepost: &Self::Key, @@ -295,7 +303,6 @@ pub trait Store: std::fmt::Debug { } } /// Return the number of keys within the range. - #[instrument(skip(self), ret)] async fn count( &mut self, left_fencepost: &Self::Key, @@ -307,7 +314,6 @@ pub trait Store: std::fmt::Debug { .count()) } /// Return the first key within the range. - #[instrument(skip(self), ret)] async fn first( &mut self, left_fencepost: &Self::Key, @@ -319,7 +325,6 @@ pub trait Store: std::fmt::Debug { .next()) } /// Return the last key within the range. - #[instrument(skip(self), ret)] async fn last( &mut self, left_fencepost: &Self::Key, diff --git a/recon/src/recon/parser.lalrpop b/recon/src/recon/parser.lalrpop index ed5c72207..d4fadf896 100644 --- a/recon/src/recon/parser.lalrpop +++ b/recon/src/recon/parser.lalrpop @@ -12,10 +12,18 @@ pub Record : Record = { }, }; Cat: ReconMemoryBytes = { - "cat:" => ReconMemoryBytes::new(BTreeStore::from_set(set), interests.unwrap_or_else(|| FixedInterests::full())), + "cat:" => ReconMemoryBytes::new( + BTreeStore::from_set(set), + interests.unwrap_or_else(|| FixedInterests::full()), + Metrics::register(&mut Registry::default()), + ), }; Dog: ReconMemoryBytes = { - "dog:" => ReconMemoryBytes::new(BTreeStore::from_set(set), interests.unwrap_or_else(|| FixedInterests::full())), + "dog:" => ReconMemoryBytes::new( + BTreeStore::from_set(set), + interests.unwrap_or_else(|| FixedInterests::full()), + Metrics::register(&mut Registry::default()), + ), }; Interests: FixedInterests = { diff --git a/recon/src/recon/tests.rs b/recon/src/recon/tests.rs index e74203568..ffaad1deb 100644 --- a/recon/src/recon/tests.rs +++ b/recon/src/recon/tests.rs @@ -6,6 +6,7 @@ lalrpop_util::lalrpop_mod!( use anyhow::Result; use async_trait::async_trait; use ceramic_core::{Bytes, RangeOpen}; +use prometheus_client::registry::Registry; use std::collections::BTreeSet; use std::fmt::Display; use tracing_test::traced_test; @@ -21,7 +22,7 @@ use pretty::{Arena, DocAllocator, DocBuilder, Pretty}; use crate::{ recon::{FullInterests, InterestProvider}, - AssociativeHash, BTreeStore, Key, Message, Recon, Sha256a, Store, + AssociativeHash, BTreeStore, Key, Message, Metrics, Recon, Sha256a, Store, }; type Set = BTreeSet; @@ -379,7 +380,11 @@ impl TryFrom<(Option, Vec)> for MessageData { #[tokio::test] async fn word_lists() { async fn recon_from_string(s: &str) -> ReconBytes { - let mut r = ReconBytes::new(BTreeStore::default(), FullInterests::default()); + let mut r = ReconBytes::new( + BTreeStore::default(), + FullInterests::default(), + Metrics::register(&mut Registry::default()), + ); for key in s.split([' ', '\n']).map(|s| s.to_string()) { if !s.is_empty() { r.insert(&key.as_bytes().into()).await.unwrap(); @@ -422,7 +427,11 @@ async fn word_lists() { } // We are using a FullInterest so we can assume there is only ever one message per exchange. - let mut local = ReconBytes::new(BTreeStore::default(), FullInterests::default()); + let mut local = ReconBytes::new( + BTreeStore::default(), + FullInterests::default(), + Metrics::register(&mut Registry::default()), + ); async fn sync(local: &mut ReconBytes, peers: &mut [ReconBytes]) { for j in 0..3 { for (i, peer) in peers.iter_mut().enumerate() { @@ -526,6 +535,7 @@ async fn response_is_synchronized() { Bytes::from("n"), ])), FullInterests::default(), + Metrics::register(&mut Registry::default()), ); let mut x = ReconMemoryBytes::new( BTreeStore::from_set(BTreeSet::from_iter([ @@ -535,6 +545,7 @@ async fn response_is_synchronized() { Bytes::from("n"), ])), FullInterests::default(), + Metrics::register(&mut Registry::default()), ); let response = x .process_messages(&a.initial_messages().await.unwrap()) @@ -561,6 +572,7 @@ fn hello() { Bytes::from("world"), ])), FullInterests::default(), + Metrics::register(&mut Registry::default()), ); expect![[r#" Recon { @@ -601,6 +613,12 @@ fn hello() { }, }, }, + metrics: Metrics { + key_insert_count: Counter { + value: 0, + phantom: PhantomData, + }, + }, } "#]] .assert_debug_eq(&other_hash) @@ -727,6 +745,12 @@ fn test_parse_recon() { }, }, }, + metrics: Metrics { + key_insert_count: Counter { + value: 0, + phantom: PhantomData, + }, + }, }, dog: Recon { interests: FixedInterests( @@ -811,6 +835,12 @@ fn test_parse_recon() { }, }, }, + metrics: Metrics { + key_insert_count: Counter { + value: 0, + phantom: PhantomData, + }, + }, }, iterations: [ Iteration { @@ -1031,6 +1061,12 @@ dog: [] }, }, }, + metrics: Metrics { + key_insert_count: Counter { + value: 0, + phantom: PhantomData, + }, + }, }, dog: Recon { interests: FixedInterests( @@ -1048,6 +1084,12 @@ dog: [] store: BTreeStore { keys: {}, }, + metrics: Metrics { + key_insert_count: Counter { + value: 0, + phantom: PhantomData, + }, + }, }, iterations: [ Iteration { From c70009187cd1e6fbad382e4789ef97437443d34b Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Mon, 8 Jan 2024 12:41:20 -0700 Subject: [PATCH 2/3] fix: update rustflags handling in makefile --- Makefile | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index df7679bbb..48acda448 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,8 @@ # Therefore may be useful in ensuring a change # is ready to pass CI checks. -CARGO = RUSTFLAGS='--cfg tokio_unstable' cargo +RUSTFLAGS = --cfg tokio_unstable +CARGO = RUSTFLAGS='${RUSTFLAGS}' cargo RELEASE_LEVEL ?= minor @@ -51,8 +52,9 @@ check-kubo-rpc-server: ./ci-scripts/check_kubo_rpc_server.sh .PHONY: release +release: RUSTFLAGS += -D warnings release: - RUSTFLAGS="-D warnings" $(CARGO) build -p ceramic-one --locked --release + $(CARGO) build -p ceramic-one --locked --release # Prepare a release PR. .PHONY: release-pr From 5870ee28d85e68c5d5e66631fdde189f93102431 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 9 Jan 2024 08:26:42 -0700 Subject: [PATCH 3/3] fix: update new clippy warnings --- kubo-rpc/src/http.rs | 2 +- recon/src/recon/sqlitestore.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kubo-rpc/src/http.rs b/kubo-rpc/src/http.rs index b48b63566..e4fd35608 100644 --- a/kubo-rpc/src/http.rs +++ b/kubo-rpc/src/http.rs @@ -547,7 +547,7 @@ where .into_iter() .map(|(k, v)| SwarmPeersPost200ResponsePeersInner { addr: v - .get(0) + .first() .map(|a| a.to_string()) .unwrap_or_else(|| "".to_string()), peer: k.to_string(), diff --git a/recon/src/recon/sqlitestore.rs b/recon/src/recon/sqlitestore.rs index 20b04fbd2..56e57da20 100644 --- a/recon/src/recon/sqlitestore.rs +++ b/recon/src/recon/sqlitestore.rs @@ -253,7 +253,7 @@ where .bind(right_fencepost.as_bytes()) .fetch_all(&self.pool) .await?; - Ok(rows.get(0).map(|row| { + Ok(rows.first().map(|row| { let bytes: Vec = row.get(0); K::from(bytes) })) @@ -286,7 +286,7 @@ where .bind(right_fencepost.as_bytes()) .fetch_all(&self.pool) .await?; - Ok(rows.get(0).map(|row| { + Ok(rows.first().map(|row| { let bytes: Vec = row.get(0); K::from(bytes) })) @@ -332,7 +332,7 @@ where .bind(right_fencepost.as_bytes()) .fetch_all(&self.pool) .await?; - if let Some(row) = rows.get(0) { + if let Some(row) = rows.first() { let first = K::from(row.get(0)); let last = K::from(row.get(1)); Ok(Some((first, last)))