Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc server: add prometheus label is_rate_limited #3504

Merged
merged 8 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions prdoc/pr_3504.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: add prometheus label "is_rate_limited" to rpc calls

doc:
- audience: Node Operator
description: |
This PR adds a label "is_rate_limited" to the prometheus metrics "substrate_rpc_calls_time" and "substrate_rpc_calls_finished"
that can be used to distinguish rate-limited RPC calls from other RPC calls. This is useful because rate-limited
RPC calls may take tens of seconds.

crates: [ ]
1 change: 0 additions & 1 deletion substrate/client/rpc-servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,4 @@ tower = { version = "0.4.13", features = ["util"] }
http = "0.2.8"
hyper = "0.14.27"
futures = "0.3.29"
pin-project = "1.1.3"
governor = "0.6.0"
25 changes: 17 additions & 8 deletions substrate/client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub use jsonrpsee::{
},
server::{middleware::rpc::RpcServiceBuilder, BatchRequestConfig},
};
pub use middleware::{MetricsLayer, RateLimitLayer, RpcMetrics};
pub use middleware::{Metrics, MiddlewareLayer, RpcMetrics};

const MEGABYTE: u32 = 1024 * 1024;

Expand Down Expand Up @@ -173,13 +173,22 @@ where
let is_websocket = ws::is_upgrade_request(&req);
let transport_label = if is_websocket { "ws" } else { "http" };

let metrics = metrics.map(|m| MetricsLayer::new(m, transport_label));
let rate_limit = rate_limit.map(|r| RateLimitLayer::per_minute(r));
let middleware_layer = match (metrics, rate_limit) {
(None, None) => None,
(Some(metrics), None) => Some(
MiddlewareLayer::new().with_metrics(Metrics::new(metrics, transport_label)),
),
(None, Some(rate_limit)) =>
Some(MiddlewareLayer::new().with_rate_limit_per_minute(rate_limit)),
(Some(metrics), Some(rate_limit)) => Some(
MiddlewareLayer::new()
.with_metrics(Metrics::new(metrics, transport_label))
.with_rate_limit_per_minute(rate_limit),
),
};
Comment on lines +176 to +188
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be possible to do something like this instead to simplify a bit?:

let mut middleware_layer = MiddlewareLayer::new();

if let Some(metrics) = metrics {
    middleware_layer = middleware_layer.with_metrics(Metrics::new(metrics, transport_label));
}
if let Some(rate_limit) = rate_limit {
    middleware_layer = middleware_layer.with_rate_limit_per_minute(rate_limit);
}

Copy link
Member Author

@niklasad1 niklasad1 Feb 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible but it doesn't make sense to enable middleware if it's empty that's why I did it like that i.e, to box the future when it's a no-op middleware.


// NOTE: The metrics needs to run first to include rate-limited calls in the
// metrics.
let rpc_middleware =
RpcServiceBuilder::new().option_layer(metrics.clone()).option_layer(rate_limit);
RpcServiceBuilder::new().option_layer(middleware_layer.clone());

let mut svc =
service_builder.set_rpc_middleware(rpc_middleware).build(methods, stop_handle);
Expand All @@ -191,9 +200,9 @@ where
// Spawn a task to handle when the connection is closed.
tokio_handle.spawn(async move {
let now = std::time::Instant::now();
metrics.as_ref().map(|m| m.ws_connect());
middleware_layer.as_ref().map(|m| m.ws_connect());
on_disconnect.await;
metrics.as_ref().map(|m| m.ws_disconnect(now));
middleware_layer.as_ref().map(|m| m.ws_disconnect(now));
});
}

Expand Down
196 changes: 71 additions & 125 deletions substrate/client/rpc-servers/src/middleware/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,9 @@

//! RPC middleware to collect prometheus metrics on RPC calls.

use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Instant,
};
use std::time::Instant;

use jsonrpsee::{server::middleware::rpc::RpcServiceT, types::Request, MethodResponse};
use pin_project::pin_project;
use jsonrpsee::{types::Request, MethodResponse};
use prometheus_endpoint::{
register, Counter, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry,
U64,
Expand Down Expand Up @@ -77,7 +71,7 @@ impl RpcMetrics {
"Total time [μs] of processed RPC calls",
)
.buckets(HISTOGRAM_BUCKETS.to_vec()),
&["protocol", "method"],
&["protocol", "method", "is_rate_limited"],
)?,
metrics_registry,
)?,
Expand All @@ -97,7 +91,7 @@ impl RpcMetrics {
"substrate_rpc_calls_finished",
"Number of processed RPC calls (unique un-batched requests)",
),
&["protocol", "method", "is_error"],
&["protocol", "method", "is_error", "is_rate_limited"],
)?,
metrics_registry,
)?,
Expand Down Expand Up @@ -144,138 +138,90 @@ impl RpcMetrics {
self.ws_sessions_closed.as_ref().map(|counter| counter.inc());
self.ws_sessions_time.with_label_values(&["ws"]).observe(micros as _);
}
}

/// Metrics layer.
#[derive(Clone)]
pub struct MetricsLayer {
inner: RpcMetrics,
transport_label: &'static str,
}

impl MetricsLayer {
/// Create a new [`MetricsLayer`].
pub fn new(metrics: RpcMetrics, transport_label: &'static str) -> Self {
Self { inner: metrics, transport_label }
}

pub(crate) fn ws_connect(&self) {
self.inner.ws_connect();
}

pub(crate) fn ws_disconnect(&self, now: Instant) {
self.inner.ws_disconnect(now)
}
}

impl<S> tower::Layer<S> for MetricsLayer {
type Service = Metrics<S>;

fn layer(&self, inner: S) -> Self::Service {
Metrics::new(inner, self.inner.clone(), self.transport_label)
}
}

/// Metrics middleware.
#[derive(Clone)]
pub struct Metrics<S> {
service: S,
metrics: RpcMetrics,
transport_label: &'static str,
}

impl<S> Metrics<S> {
/// Create a new metrics middleware.
pub fn new(service: S, metrics: RpcMetrics, transport_label: &'static str) -> Metrics<S> {
Metrics { service, metrics, transport_label }
}
}

impl<'a, S> RpcServiceT<'a> for Metrics<S>
where
S: Send + Sync + RpcServiceT<'a>,
{
type Future = ResponseFuture<'a, S::Future>;

fn call(&self, req: Request<'a>) -> Self::Future {
let now = Instant::now();

pub(crate) fn on_call(&self, req: &Request, transport_label: &'static str) {
log::trace!(
target: "rpc_metrics",
"[{}] on_call name={} params={:?}",
self.transport_label,
"[{transport_label}] on_call name={} params={:?}",
req.method_name(),
req.params(),
);
self.metrics
.calls_started
.with_label_values(&[self.transport_label, req.method_name()])

self.calls_started
.with_label_values(&[transport_label, req.method_name()])
.inc();
}

ResponseFuture {
fut: self.service.call(req.clone()),
metrics: self.metrics.clone(),
req,
now,
transport_label: self.transport_label,
}
pub(crate) fn on_response(
&self,
req: &Request,
rp: &MethodResponse,
is_rate_limited: bool,
transport_label: &'static str,
now: Instant,
) {
log::trace!(target: "rpc_metrics", "[{transport_label}] on_response started_at={:?}", now);
log::trace!(target: "rpc_metrics::extra", "[{transport_label}] result={}", rp.as_result());

let micros = now.elapsed().as_micros();
log::debug!(
target: "rpc_metrics",
"[{transport_label}] {} call took {} μs",
req.method_name(),
micros,
);
self.calls_time
.with_label_values(&[
transport_label,
req.method_name(),
if is_rate_limited { "true" } else { "false" },
])
.observe(micros as _);
self.calls_finished
.with_label_values(&[
transport_label,
req.method_name(),
// the label "is_error", so `success` should be regarded as false
// and vice-versa to be registrered correctly.
if rp.is_success() { "false" } else { "true" },
if is_rate_limited { "true" } else { "false" },
])
.inc();
}
}

/// Response future for metrics.
#[pin_project]
pub struct ResponseFuture<'a, F> {
#[pin]
fut: F,
metrics: RpcMetrics,
req: Request<'a>,
now: Instant,
transport_label: &'static str,
/// Metrics with transport label.
///
/// A cheap-to-clone pairing of the shared [`RpcMetrics`] handle with the
/// transport label of the connection being measured ("ws" or "http"),
/// so every sample recorded through this wrapper is attributed to the
/// correct transport without passing the label at each call site.
#[derive(Clone, Debug)]
pub struct Metrics {
// Shared prometheus metrics registered once per server.
pub(crate) inner: RpcMetrics,
// Transport this connection uses; set once at connection setup
// (callers pass "ws" or "http") and used as a label value on every metric.
pub(crate) transport_label: &'static str,
}

impl<'a, F> std::fmt::Debug for ResponseFuture<'a, F> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("ResponseFuture")
impl Metrics {
/// Create a new [`Metrics`].
pub fn new(metrics: RpcMetrics, transport_label: &'static str) -> Self {
Self { inner: metrics, transport_label }
}
}

impl<'a, F: Future<Output = MethodResponse>> Future for ResponseFuture<'a, F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
pub(crate) fn ws_connect(&self) {
self.inner.ws_connect();
}

let res = this.fut.poll(cx);
if let Poll::Ready(rp) = &res {
let method_name = this.req.method_name();
let transport_label = &this.transport_label;
let now = this.now;
let metrics = &this.metrics;
pub(crate) fn ws_disconnect(&self, now: Instant) {
self.inner.ws_disconnect(now)
}

log::trace!(target: "rpc_metrics", "[{transport_label}] on_response started_at={:?}", now);
log::trace!(target: "rpc_metrics::extra", "[{transport_label}] result={:?}", rp);
pub(crate) fn on_call(&self, req: &Request) {
self.inner.on_call(req, self.transport_label)
}

let micros = now.elapsed().as_micros();
log::debug!(
target: "rpc_metrics",
"[{transport_label}] {method_name} call took {} μs",
micros,
);
metrics
.calls_time
.with_label_values(&[transport_label, method_name])
.observe(micros as _);
metrics
.calls_finished
.with_label_values(&[
transport_label,
method_name,
// the label "is_error", so `success` should be regarded as false
// and vice-versa to be registrered correctly.
if rp.is_success() { "false" } else { "true" },
])
.inc();
}
res
pub(crate) fn on_response(
&self,
req: &Request,
rp: &MethodResponse,
is_rate_limited: bool,
now: Instant,
) {
self.inner.on_response(req, rp, is_rate_limited, self.transport_label, now)
}
}
Loading
Loading