Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metrics to api and recon #208

Merged
merged 3 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 14 additions & 10 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
# Therefore may be useful in ensuring a change
# is ready to pass CI checks.

RUSTFLAGS = --cfg tokio_unstable
CARGO = RUSTFLAGS='${RUSTFLAGS}' cargo

RELEASE_LEVEL ?= minor

# ECS environment to deploy image to
Expand All @@ -24,9 +27,9 @@ all: build check-fmt check-clippy test
.PHONY: build
build:
# Build with default features
cargo build --locked --release
$(CARGO) build --locked --release
# Build with all features
cargo build --locked --release --all-features
$(CARGO) build --locked --release --all-features

# Generates api-server crate from ceramic.yaml OpenAPI spec
.PHONY: gen-api-server
Expand All @@ -49,8 +52,9 @@ check-kubo-rpc-server:
./ci-scripts/check_kubo_rpc_server.sh

.PHONY: release
release: RUSTFLAGS += -D warnings
release:
RUSTFLAGS="-D warnings" cargo build -p ceramic-one --locked --release
$(CARGO) build -p ceramic-one --locked --release

# Prepare a release PR.
.PHONY: release-pr
Expand All @@ -59,29 +63,29 @@ release-pr:

.PHONY: debug
debug:
cargo build -p ceramic-one --locked
$(CARGO) build -p ceramic-one --locked

.PHONY: test
test:
# Test with default features
cargo test --locked --release
$(CARGO) test --locked --release
# Test with all features
cargo test --locked --release --all-features
$(CARGO) test --locked --release --all-features

.PHONY: check-fmt
check-fmt:
cargo fmt --all -- --check
$(CARGO) fmt --all -- --check

.PHONY: check-clippy
check-clippy:
# Check with default features
cargo clippy --workspace --locked --release -- -D warnings --no-deps
$(CARGO) clippy --workspace --locked --release -- -D warnings --no-deps
# Check with all features
cargo clippy --workspace --locked --release --all-features -- -D warnings --no-deps
$(CARGO) clippy --workspace --locked --release --all-features -- -D warnings --no-deps

.PHONY: run
run:
RUST_LOG=ERROR,ceramic_kubo_rpc=DEBUG,ceramic_one=DEBUG cargo run --all-features --locked --release --bin ceramic-one -- daemon -b 127.0.0.1:5001
RUST_LOG=ERROR,ceramic_kubo_rpc=DEBUG,ceramic_one=DEBUG $(CARGO) run --all-features --locked --release --bin ceramic-one -- daemon -b 127.0.0.1:5001

.PHONY: publish-docker
publish-docker:
Expand Down
3 changes: 3 additions & 0 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ async-trait.workspace = true
ceramic-api-server.workspace = true
ceramic-core.workspace = true
ceramic-metadata.workspace = true
ceramic-metrics.workspace = true
futures.workspace = true
hyper.workspace = true
multibase.workspace = true
recon.workspace = true
swagger.workspace = true
tokio.workspace = true
tracing.workspace = true
prometheus-client.workspace = true
iroh-rpc-client.workspace = true

[dev-dependencies]
expect-test.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
mod metrics;
mod server;

pub use metrics::api::MetricsMiddleware;
pub use metrics::Metrics;

pub use server::Server;
85 changes: 85 additions & 0 deletions api/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
pub mod api;

use std::time::Duration;

use ceramic_api_server::{API_VERSION, BASE_PATH};
use ceramic_metrics::Recorder;
use prometheus_client::{
encoding::EncodeLabelSet,
metrics::{
counter::Counter,
family::Family,
histogram::{exponential_buckets, Histogram},
info::Info,
},
registry::Registry,
};

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct RequestLabels {
path: &'static str,
}

impl From<&Event> for RequestLabels {
fn from(value: &Event) -> Self {
Self { path: value.path }
}
}

/// Metrics for Kubo RPC API
#[derive(Clone)]
pub struct Metrics {
requests: Family<RequestLabels, Counter>,
request_durations: Family<RequestLabels, Histogram>,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct InfoLabels {
base_path: &'static str,
version: &'static str,
}

impl Metrics {
/// Register and construct Metrics
pub fn register(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("ceramic_api");

let requests = Family::<RequestLabels, Counter>::default();
sub_registry.register("requests", "Number of HTTP requests", requests.clone());

let request_durations = Family::<RequestLabels, Histogram>::new_with_constructor(|| {
Histogram::new(exponential_buckets(0.005, 2.0, 20))
});
sub_registry.register(
"request_durations",
"Duration of HTTP requests",
request_durations.clone(),
);

let info: Info<InfoLabels> = Info::new(InfoLabels {
base_path: BASE_PATH,
version: API_VERSION,
});
sub_registry.register("api", "Information about the Ceramic API", info);

Self {
requests,
request_durations,
}
}
}

pub struct Event {
pub(crate) path: &'static str,
pub(crate) duration: Duration,
}

impl Recorder<Event> for Metrics {
fn record(&self, event: &Event) {
let labels: RequestLabels = event.into();
self.requests.get_or_create(&labels).inc();
self.request_durations
.get_or_create(&labels)
.observe(event.duration.as_secs_f64());
}
}
85 changes: 85 additions & 0 deletions api/src/metrics/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use async_trait::async_trait;
use ceramic_api_server::{
models, Api, EventsPostResponse, LivenessGetResponse, SubscribeSortKeySortValueGetResponse,
VersionPostResponse,
};
use ceramic_metrics::Recorder;
use futures::Future;
use swagger::ApiError;
use tokio::time::Instant;

use crate::{metrics::Event, Metrics};

/// Implement the API and record metrics
#[derive(Clone)]
pub struct MetricsMiddleware<A: Clone> {
api: A,
metrics: Metrics,
}

impl<A: Clone> MetricsMiddleware<A> {
/// Construct a new MetricsMiddleware.
/// The metrics should have already be registered.
pub fn new(api: A, metrics: Metrics) -> Self {
Self { api, metrics }
}
// Record metrics for a given API endpoint
async fn record<T>(&self, path: &'static str, fut: impl Future<Output = T>) -> T {
let start = Instant::now();
let ret = fut.await;
let duration = start.elapsed();
let event = Event { path, duration };
self.metrics.record(&event);
ret
}
}

#[async_trait]
impl<A, C> Api<C> for MetricsMiddleware<A>
where
A: Api<C>,
A: Clone + Send + Sync,
C: Send + Sync,
{
/// Creates a new event
async fn events_post(
&self,
event: models::Event,
context: &C,
) -> Result<EventsPostResponse, ApiError> {
self.record("/events", self.api.events_post(event, context))
.await
}

/// Test the liveness of the Ceramic node
async fn liveness_get(&self, context: &C) -> Result<LivenessGetResponse, ApiError> {
self.record("/liveness", self.api.liveness_get(context))
.await
}

/// Get events for a stream
async fn subscribe_sort_key_sort_value_get(
&self,
sort_key: String,
sort_value: String,
controller: Option<String>,
stream_id: Option<String>,
offset: Option<f64>,
limit: Option<f64>,
context: &C,
) -> Result<SubscribeSortKeySortValueGetResponse, ApiError> {
self.record(
"/subscribe",
self.api.subscribe_sort_key_sort_value_get(
sort_key, sort_value, controller, stream_id, offset, limit, context,
),
)
.await
}

/// Get the version of the Ceramic node
async fn version_post(&self, context: &C) -> Result<VersionPostResponse, ApiError> {
self.record("/version", self.api.version_post(context))
.await
}
}
1 change: 1 addition & 0 deletions beetle/iroh-rpc-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ anyhow.workspace = true
async-stream.workspace = true
bytes.workspace = true
cid.workspace = true
ceramic-core.workspace = true
futures.workspace = true
iroh-rpc-types.workspace = true
iroh-util.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion kubo-rpc/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ where
.into_iter()
.map(|(k, v)| SwarmPeersPost200ResponsePeersInner {
addr: v
.get(0)
.first()
.map(|a| a.to_string())
.unwrap_or_else(|| "".to_string()),
peer: k.to_string(),
Expand Down
3 changes: 2 additions & 1 deletion one/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@ signal-hook = "0.3.17"
signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
sqlx.workspace = true
swagger.workspace = true
tokio-metrics = "0.3.1"
tokio-metrics = { version = "0.3.1", features = ["rt"] }
tokio-prometheus-client = "0.1"
tokio-util.workspace = true
tokio.workspace = true
tracing-appender = "0.2.2"
tracing-subscriber.workspace = true
tracing.workspace = true


[features]
default = []
tokio-console = ["ceramic-metrics/tokio-console"]
Expand Down
13 changes: 11 additions & 2 deletions one/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,14 +448,19 @@ impl Daemon {
let model_store = ModelStore::new(sql_pool.clone(), "model".to_string()).await?;

// Construct a recon implementation for interests.
let mut recon_interest =
Server::new(Recon::new(interest_store, InterestInterest::default()));
let recon_metrics = ceramic_metrics::MetricsHandle::register(recon::Metrics::register);
let mut recon_interest = Server::new(Recon::new(
interest_store,
InterestInterest::default(),
recon_metrics.clone(),
));

// Construct a recon implementation for models.
let mut recon_model = Server::new(Recon::new(
model_store,
// Use recon interests as the InterestProvider for recon_model
ModelInterest::new(peer_id, recon_interest.client()),
recon_metrics.clone(),
));

let recons = if opts.recon {
Expand Down Expand Up @@ -502,6 +507,10 @@ impl Daemon {
self.recon_interest.client(),
self.recon_model.client(),
);
let ceramic_metrics =
ceramic_metrics::MetricsHandle::register(ceramic_api::Metrics::register);
// Wrap server in metrics middleware
let ceramic_server = ceramic_api::MetricsMiddleware::new(ceramic_server, ceramic_metrics);
let ceramic_service = ceramic_api_server::server::MakeService::new(ceramic_server);
let ceramic_service = MakeAllowAllAuthenticator::new(ceramic_service, "");
let ceramic_service =
Expand Down
2 changes: 2 additions & 0 deletions recon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ anyhow.workspace = true
async-trait.workspace = true
asynchronous-codec = { version = "0.6.1", features = ["cbor", "json"] }
ceramic-core.workspace = true
ceramic-metrics.workspace = true
cid.workspace = true
hex = "0.4.3"
libp2p-identity.workspace = true
libp2p.workspace = true
multihash.workspace = true
prometheus-client.workspace = true
rand = "0.8.5"
serde.workspace = true
serde_ipld_dagcbor.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions recon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ pub use crate::recon::{
};
pub use client::{Client, Server};

pub use crate::metrics::Metrics;
pub use sha256a::Sha256a;

mod client;
pub mod libp2p;
mod metrics;
mod recon;
mod sha256a;
Loading