Skip to content

Commit

Permalink
feat(logging): add tracing span per JSON-RPC call (#722)
Browse files Browse the repository at this point in the history
* less verbose logging + tracing based on method name

* add tracing per rpc call

* fix nits

* remove unsed feature

* fix build

* http make logging more human friendly

* unify logging format

* deps: make tracing hard dependency

* fix tests

* fix nit

* fix build

* fix nits

* Update core/src/client/async_client/mod.rs

* Update core/Cargo.toml

* Update core/src/lib.rs
  • Loading branch information
niklasad1 authored Jun 21, 2022
1 parent 01577da commit 00c2ce6
Show file tree
Hide file tree
Showing 17 changed files with 346 additions and 107 deletions.
1 change: 1 addition & 0 deletions client/http-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1.16", features = ["time"] }
tracing = "0.1"
tracing-futures = "0.2.5"

[dev-dependencies]
jsonrpsee-test-utils = { path = "../../test-utils" }
Expand Down
39 changes: 33 additions & 6 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ use crate::transport::HttpTransportClient;
use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Response};
use async_trait::async_trait;
use jsonrpsee_core::client::{CertificateStore, ClientT, IdKind, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::tracing::RpcTracing;
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::error::CallError;
use rustc_hash::FxHashMap;
use serde::de::DeserializeOwned;
use tracing_futures::Instrument;

/// Http Client Builder.
#[derive(Debug)]
Expand All @@ -44,6 +46,7 @@ pub struct HttpClientBuilder {
max_concurrent_requests: usize,
certificate_store: CertificateStore,
id_kind: IdKind,
max_log_length: u32,
}

impl HttpClientBuilder {
Expand Down Expand Up @@ -77,10 +80,19 @@ impl HttpClientBuilder {
self
}

/// Max length for logging for requests and responses in number characters.
///
/// Logs bigger than this limit will be truncated.
pub fn set_max_logging_length(mut self, max: u32) -> Self {
self.max_log_length = max;
self
}

/// Build the HTTP client with target to connect to.
pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient, Error> {
let transport = HttpTransportClient::new(target, self.max_request_body_size, self.certificate_store)
.map_err(|e| Error::Transport(e.into()))?;
let transport =
HttpTransportClient::new(target, self.max_request_body_size, self.certificate_store, self.max_log_length)
.map_err(|e| Error::Transport(e.into()))?;
Ok(HttpClient {
transport,
id_manager: Arc::new(RequestIdManager::new(self.max_concurrent_requests, self.id_kind)),
Expand All @@ -97,6 +109,7 @@ impl Default for HttpClientBuilder {
max_concurrent_requests: 256,
certificate_store: CertificateStore::Native,
id_kind: IdKind::Number,
max_log_length: 4096,
}
}
}
Expand All @@ -115,8 +128,13 @@ pub struct HttpClient {
#[async_trait]
impl ClientT for HttpClient {
async fn notification<'a>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<(), Error> {
let notif = NotificationSer::new(method, params);
let fut = self.transport.send(serde_json::to_string(&notif).map_err(Error::ParseError)?);
let trace = RpcTracing::notification(method);
let _enter = trace.span().enter();

let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?;

let fut = self.transport.send(notif).in_current_span();

match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(ok)) => Ok(ok),
Err(_) => Err(Error::RequestTimeout),
Expand All @@ -132,8 +150,12 @@ impl ClientT for HttpClient {
let guard = self.id_manager.next_request_id()?;
let id = guard.inner();
let request = RequestSer::new(&id, method, params);
let trace = RpcTracing::method_call(method);
let _enter = trace.span().enter();

let raw = serde_json::to_string(&request).map_err(Error::ParseError)?;

let fut = self.transport.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?);
let fut = self.transport.send_and_read_body(raw).in_current_span();
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => {
Expand Down Expand Up @@ -165,6 +187,8 @@ impl ClientT for HttpClient {
{
let guard = self.id_manager.next_request_ids(batch.len())?;
let ids: Vec<Id> = guard.inner();
let trace = RpcTracing::batch();
let _enter = trace.span().enter();

let mut batch_request = Vec::with_capacity(batch.len());
// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
Expand All @@ -177,7 +201,10 @@ impl ClientT for HttpClient {
request_set.insert(&ids[pos], pos);
}

let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
let fut = self
.transport
.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?)
.in_current_span();

let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Expand Down
32 changes: 22 additions & 10 deletions client/http-client/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use hyper::Uri;
use jsonrpsee_core::client::CertificateStore;
use jsonrpsee_core::error::GenericTransportError;
use jsonrpsee_core::http_helpers;
use jsonrpsee_core::tracing::{rx_log_from_bytes, tx_log_from_str};
use thiserror::Error;

const CONTENT_TYPE_JSON: &str = "application/json";
Expand Down Expand Up @@ -43,6 +44,10 @@ pub struct HttpTransportClient {
client: HyperClient,
/// Configurable max request body size
max_request_body_size: u32,
/// Max length for logging for requests and responses
///
/// Logs bigger than this limit will be truncated.
max_log_length: u32,
}

impl HttpTransportClient {
Expand All @@ -51,6 +56,7 @@ impl HttpTransportClient {
target: impl AsRef<str>,
max_request_body_size: u32,
cert_store: CertificateStore,
max_log_length: u32,
) -> Result<Self, Error> {
let target: Uri = target.as_ref().parse().map_err(|e| Error::Url(format!("Invalid URL: {}", e)))?;
if target.port_u16().is_none() {
Expand Down Expand Up @@ -84,11 +90,11 @@ impl HttpTransportClient {
return Err(Error::Url(err.into()));
}
};
Ok(Self { target, client, max_request_body_size })
Ok(Self { target, client, max_request_body_size, max_log_length })
}

async fn inner_send(&self, body: String) -> Result<hyper::Response<hyper::Body>, Error> {
tracing::debug!("send: {}", body);
tx_log_from_str(&body, self.max_log_length);

if body.len() > self.max_request_body_size as usize {
return Err(Error::RequestTooLarge);
Expand All @@ -113,12 +119,16 @@ impl HttpTransportClient {
let response = self.inner_send(body).await?;
let (parts, body) = response.into_parts();
let (body, _) = http_helpers::read_body(&parts.headers, body, self.max_request_body_size).await?;

rx_log_from_bytes(&body, self.max_log_length);

Ok(body)
}

/// Send serialized message without reading the HTTP message body.
pub(crate) async fn send(&self, body: String) -> Result<(), Error> {
let _ = self.inner_send(body).await?;

Ok(())
}
}
Expand Down Expand Up @@ -188,36 +198,37 @@ mod tests {

#[test]
fn invalid_http_url_rejected() {
let err = HttpTransportClient::new("ws://localhost:9933", 80, CertificateStore::Native).unwrap_err();
let err = HttpTransportClient::new("ws://localhost:9933", 80, CertificateStore::Native, 80).unwrap_err();
assert!(matches!(err, Error::Url(_)));
}

#[cfg(feature = "tls")]
#[test]
fn https_works() {
let client = HttpTransportClient::new("https://localhost:9933", 80, CertificateStore::Native).unwrap();
let client = HttpTransportClient::new("https://localhost:9933", 80, CertificateStore::Native, 80).unwrap();
assert_target(&client, "localhost", "https", "/", 9933, 80);
}

#[cfg(not(feature = "tls"))]
#[test]
fn https_fails_without_tls_feature() {
let err = HttpTransportClient::new("https://localhost:9933", 80, CertificateStore::Native).unwrap_err();
let err = HttpTransportClient::new("https://localhost:9933", 80, CertificateStore::Native, 80).unwrap_err();
assert!(matches!(err, Error::Url(_)));
}

#[test]
fn faulty_port() {
let err = HttpTransportClient::new("http://localhost:-43", 80, CertificateStore::Native).unwrap_err();
let err = HttpTransportClient::new("http://localhost:-43", 80, CertificateStore::Native, 80).unwrap_err();
assert!(matches!(err, Error::Url(_)));
let err = HttpTransportClient::new("http://localhost:-99999", 80, CertificateStore::Native).unwrap_err();
let err = HttpTransportClient::new("http://localhost:-99999", 80, CertificateStore::Native, 80).unwrap_err();
assert!(matches!(err, Error::Url(_)));
}

#[test]
fn url_with_path_works() {
let client =
HttpTransportClient::new("http://localhost:9944/my-special-path", 1337, CertificateStore::Native).unwrap();
HttpTransportClient::new("http://localhost:9944/my-special-path", 1337, CertificateStore::Native, 80)
.unwrap();
assert_target(&client, "localhost", "http", "/my-special-path", 9944, 1337);
}

Expand All @@ -227,6 +238,7 @@ mod tests {
"http://127.0.0.1:9999/my?name1=value1&name2=value2",
u32::MAX,
CertificateStore::WebPki,
80,
)
.unwrap();
assert_target(&client, "127.0.0.1", "http", "/my?name1=value1&name2=value2", 9999, u32::MAX);
Expand All @@ -235,14 +247,14 @@ mod tests {
#[test]
fn url_with_fragment_is_ignored() {
let client =
HttpTransportClient::new("http://127.0.0.1:9944/my.htm#ignore", 999, CertificateStore::Native).unwrap();
HttpTransportClient::new("http://127.0.0.1:9944/my.htm#ignore", 999, CertificateStore::Native, 80).unwrap();
assert_target(&client, "127.0.0.1", "http", "/my.htm", 9944, 999);
}

#[tokio::test]
async fn request_limit_works() {
let eighty_bytes_limit = 80;
let client = HttpTransportClient::new("http://localhost:9933", 80, CertificateStore::WebPki).unwrap();
let client = HttpTransportClient::new("http://localhost:9933", 80, CertificateStore::WebPki, 99).unwrap();
assert_eq!(client.max_request_body_size, eighty_bytes_limit);

let body = "a".repeat(81);
Expand Down
2 changes: 1 addition & 1 deletion client/transport/src/ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl TransportSenderT for Sender {
/// Sends out a request. Returns a `Future` that finishes when the request has been
/// successfully sent.
async fn send(&mut self, body: String) -> Result<(), Self::Error> {
tracing::debug!("send: {}", body);
tracing::trace!("send: {}", body);
self.inner.send_text(body).await?;
self.inner.flush().await?;
Ok(())
Expand Down
8 changes: 4 additions & 4 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ jsonrpsee-types = { path = "../types", version = "0.14.0" }
thiserror = "1"
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1", features = ["raw_value"] }
tracing = "0.1"

# optional deps
arrayvec = { version = "0.7.1", optional = true }
async-channel = { version = "1.6", optional = true }
async-lock = { version = "2.4", optional = true }
futures-util = { version = "0.3.14", default-features = false, optional = true }
hyper = { version = "0.14.10", default-features = false, features = ["stream"], optional = true }
tracing = { version = "0.1", optional = true }
tracing-futures = { version = "0.2", optional = true }
rustc-hash = { version = "1", optional = true }
rand = { version = "0.8", optional = true }
soketto = { version = "0.7.1", optional = true }
Expand All @@ -42,7 +43,6 @@ server = [
"futures-util/alloc",
"globset",
"rustc-hash/std",
"tracing",
"parking_lot",
"rand",
"tokio/rt",
Expand All @@ -58,16 +58,16 @@ async-client = [
"tokio/macros",
"tokio/rt",
"tokio/sync",
"tracing",
"tracing-futures",
"futures-timer",
]
async-wasm-client = [
"async-lock",
"client",
"wasm-bindgen-futures",
"rustc-hash/std",
"tracing-futures",
"futures-timer/wasm-bindgen",
"tracing",
]

[dev-dependencies]
Expand Down
Loading

0 comments on commit 00c2ce6

Please sign in to comment.