Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(logging): add a tracing span per JSON-RPC call #722

Merged
merged 19 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/http-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1.16", features = ["time"] }
tracing = "0.1"
tracing-futures = "0.2.5"

[dev-dependencies]
jsonrpsee-test-utils = { path = "../../test-utils" }
Expand Down
39 changes: 33 additions & 6 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ use crate::transport::HttpTransportClient;
use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Response};
use async_trait::async_trait;
use jsonrpsee_core::client::{CertificateStore, ClientT, IdKind, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::tracing::RpcTracing;
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::error::CallError;
use rustc_hash::FxHashMap;
use serde::de::DeserializeOwned;
use tracing_futures::Instrument;

/// Http Client Builder.
#[derive(Debug)]
Expand All @@ -44,6 +46,7 @@ pub struct HttpClientBuilder {
max_concurrent_requests: usize,
certificate_store: CertificateStore,
id_kind: IdKind,
max_log_length: u32,
}

impl HttpClientBuilder {
Expand Down Expand Up @@ -77,10 +80,19 @@ impl HttpClientBuilder {
self
}

/// Max length for logging for requests and responses in number characters.
///
/// Logs bigger than this limit will be truncated.
pub fn set_max_logging_length(mut self, max: u32) -> Self {
self.max_log_length = max;
self
}

/// Build the HTTP client with target to connect to.
pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient, Error> {
let transport = HttpTransportClient::new(target, self.max_request_body_size, self.certificate_store)
.map_err(|e| Error::Transport(e.into()))?;
let transport =
HttpTransportClient::new(target, self.max_request_body_size, self.certificate_store, self.max_log_length)
.map_err(|e| Error::Transport(e.into()))?;
Ok(HttpClient {
transport,
id_manager: Arc::new(RequestIdManager::new(self.max_concurrent_requests, self.id_kind)),
Expand All @@ -97,6 +109,7 @@ impl Default for HttpClientBuilder {
max_concurrent_requests: 256,
certificate_store: CertificateStore::Native,
id_kind: IdKind::Number,
max_log_length: 4096,
}
}
}
Expand All @@ -115,8 +128,13 @@ pub struct HttpClient {
#[async_trait]
impl ClientT for HttpClient {
async fn notification<'a>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<(), Error> {
let notif = NotificationSer::new(method, params);
let fut = self.transport.send(serde_json::to_string(&notif).map_err(Error::ParseError)?);
let trace = RpcTracing::notification(method);
let _enter = trace.span().enter();

let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?;

let fut = self.transport.send(notif).in_current_span();

match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(ok)) => Ok(ok),
Err(_) => Err(Error::RequestTimeout),
Expand All @@ -132,8 +150,12 @@ impl ClientT for HttpClient {
let guard = self.id_manager.next_request_id()?;
let id = guard.inner();
let request = RequestSer::new(&id, method, params);
let trace = RpcTracing::method_call(method);
let _enter = trace.span().enter();

let raw = serde_json::to_string(&request).map_err(Error::ParseError)?;

let fut = self.transport.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?);
let fut = self.transport.send_and_read_body(raw).in_current_span();
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => {
Expand Down Expand Up @@ -165,6 +187,8 @@ impl ClientT for HttpClient {
{
let guard = self.id_manager.next_request_ids(batch.len())?;
let ids: Vec<Id> = guard.inner();
let trace = RpcTracing::batch();
let _enter = trace.span().enter();

let mut batch_request = Vec::with_capacity(batch.len());
// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
Expand All @@ -177,7 +201,10 @@ impl ClientT for HttpClient {
request_set.insert(&ids[pos], pos);
}

let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
let fut = self
.transport
.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?)
.in_current_span();

let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Expand Down
32 changes: 22 additions & 10 deletions client/http-client/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use hyper::Uri;
use jsonrpsee_core::client::CertificateStore;
use jsonrpsee_core::error::GenericTransportError;
use jsonrpsee_core::http_helpers;
use jsonrpsee_core::tracing::{rx_log_from_bytes, tx_log_from_str};
use thiserror::Error;

const CONTENT_TYPE_JSON: &str = "application/json";
Expand Down Expand Up @@ -43,6 +44,10 @@ pub struct HttpTransportClient {
client: HyperClient,
/// Configurable max request body size
max_request_body_size: u32,
/// Max length for logging for requests and responses
///
/// Logs bigger than this limit will be truncated.
max_log_length: u32,
}

impl HttpTransportClient {
Expand All @@ -51,6 +56,7 @@ impl HttpTransportClient {
target: impl AsRef<str>,
max_request_body_size: u32,
cert_store: CertificateStore,
max_log_length: u32,
) -> Result<Self, Error> {
let target: Uri = target.as_ref().parse().map_err(|e| Error::Url(format!("Invalid URL: {}", e)))?;
if target.port_u16().is_none() {
Expand Down Expand Up @@ -84,11 +90,11 @@ impl HttpTransportClient {
return Err(Error::Url(err.into()));
}
};
Ok(Self { target, client, max_request_body_size })
Ok(Self { target, client, max_request_body_size, max_log_length })
}

async fn inner_send(&self, body: String) -> Result<hyper::Response<hyper::Body>, Error> {
tracing::debug!("send: {}", body);
tx_log_from_str(&body, self.max_log_length);

if body.len() > self.max_request_body_size as usize {
return Err(Error::RequestTooLarge);
Expand All @@ -113,12 +119,16 @@ impl HttpTransportClient {
let response = self.inner_send(body).await?;
let (parts, body) = response.into_parts();
let (body, _) = http_helpers::read_body(&parts.headers, body, self.max_request_body_size).await?;

rx_log_from_bytes(&body, self.max_log_length);

Ok(body)
}

/// Send serialized message without reading the HTTP message body.
pub(crate) async fn send(&self, body: String) -> Result<(), Error> {
let _ = self.inner_send(body).await?;

Ok(())
}
}
Expand Down Expand Up @@ -188,36 +198,37 @@ mod tests {

#[test]
fn invalid_http_url_rejected() {
let err = HttpTransportClient::new("ws://localhost:9933", 80, CertificateStore::Native).unwrap_err();
let err = HttpTransportClient::new("ws://localhost:9933", 80, CertificateStore::Native, 80).unwrap_err();
assert!(matches!(err, Error::Url(_)));
}

#[cfg(feature = "tls")]
#[test]
fn https_works() {
let client = HttpTransportClient::new("https://localhost:9933", 80, CertificateStore::Native).unwrap();
let client = HttpTransportClient::new("https://localhost:9933", 80, CertificateStore::Native, 80).unwrap();
assert_target(&client, "localhost", "https", "/", 9933, 80);
}

#[cfg(not(feature = "tls"))]
#[test]
fn https_fails_without_tls_feature() {
let err = HttpTransportClient::new("https://localhost:9933", 80, CertificateStore::Native).unwrap_err();
let err = HttpTransportClient::new("https://localhost:9933", 80, CertificateStore::Native, 80).unwrap_err();
assert!(matches!(err, Error::Url(_)));
}

#[test]
fn faulty_port() {
let err = HttpTransportClient::new("http://localhost:-43", 80, CertificateStore::Native).unwrap_err();
let err = HttpTransportClient::new("http://localhost:-43", 80, CertificateStore::Native, 80).unwrap_err();
assert!(matches!(err, Error::Url(_)));
let err = HttpTransportClient::new("http://localhost:-99999", 80, CertificateStore::Native).unwrap_err();
let err = HttpTransportClient::new("http://localhost:-99999", 80, CertificateStore::Native, 80).unwrap_err();
assert!(matches!(err, Error::Url(_)));
}

#[test]
fn url_with_path_works() {
let client =
HttpTransportClient::new("http://localhost:9944/my-special-path", 1337, CertificateStore::Native).unwrap();
HttpTransportClient::new("http://localhost:9944/my-special-path", 1337, CertificateStore::Native, 80)
.unwrap();
assert_target(&client, "localhost", "http", "/my-special-path", 9944, 1337);
}

Expand All @@ -227,6 +238,7 @@ mod tests {
"http://127.0.0.1:9999/my?name1=value1&name2=value2",
u32::MAX,
CertificateStore::WebPki,
80,
)
.unwrap();
assert_target(&client, "127.0.0.1", "http", "/my?name1=value1&name2=value2", 9999, u32::MAX);
Expand All @@ -235,14 +247,14 @@ mod tests {
#[test]
fn url_with_fragment_is_ignored() {
let client =
HttpTransportClient::new("http://127.0.0.1:9944/my.htm#ignore", 999, CertificateStore::Native).unwrap();
HttpTransportClient::new("http://127.0.0.1:9944/my.htm#ignore", 999, CertificateStore::Native, 80).unwrap();
assert_target(&client, "127.0.0.1", "http", "/my.htm", 9944, 999);
}

#[tokio::test]
async fn request_limit_works() {
let eighty_bytes_limit = 80;
let client = HttpTransportClient::new("http://localhost:9933", 80, CertificateStore::WebPki).unwrap();
let client = HttpTransportClient::new("http://localhost:9933", 80, CertificateStore::WebPki, 99).unwrap();
assert_eq!(client.max_request_body_size, eighty_bytes_limit);

let body = "a".repeat(81);
Expand Down
2 changes: 1 addition & 1 deletion client/transport/src/ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl TransportSenderT for Sender {
/// Sends out a request. Returns a `Future` that finishes when the request has been
/// successfully sent.
async fn send(&mut self, body: String) -> Result<(), Self::Error> {
tracing::debug!("send: {}", body);
tracing::trace!("send: {}", body);
self.inner.send_text(body).await?;
self.inner.flush().await?;
Ok(())
Expand Down
8 changes: 4 additions & 4 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ jsonrpsee-types = { path = "../types", version = "0.14.0" }
thiserror = "1"
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1", features = ["raw_value"] }
tracing = "0.1"

# optional deps
arrayvec = { version = "0.7.1", optional = true }
async-channel = { version = "1.6", optional = true }
async-lock = { version = "2.4", optional = true }
futures-util = { version = "0.3.14", default-features = false, optional = true }
hyper = { version = "0.14.10", default-features = false, features = ["stream"], optional = true }
tracing = { version = "0.1", optional = true }
tracing-futures = { version = "0.2", optional = true }
rustc-hash = { version = "1", optional = true }
rand = { version = "0.8", optional = true }
soketto = { version = "0.7.1", optional = true }
Expand All @@ -42,7 +43,6 @@ server = [
"futures-util/alloc",
"globset",
"rustc-hash/std",
"tracing",
"parking_lot",
"rand",
"tokio/rt",
Expand All @@ -58,16 +58,16 @@ async-client = [
"tokio/macros",
"tokio/rt",
"tokio/sync",
"tracing",
"tracing-futures",
"futures-timer",
]
async-wasm-client = [
"async-lock",
"client",
"wasm-bindgen-futures",
"rustc-hash/std",
"tracing-futures",
"futures-timer/wasm-bindgen",
"tracing",
]

[dev-dependencies]
Expand Down
Loading