From 00c2ce65e210c6a2e231d38b5f866a0a87db4cd5 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 21 Jun 2022 08:10:53 +0200 Subject: [PATCH] feat(logging): add `tracing span` per JSON-RPC call (#722) * less verbose logging + tracing based on method name * add tracing per rpc call * fix nits * remove unsed feature * fix build * http make logging more human friendly * unify logging format * deps: make tracing hard dependency * fix tests * fix nit * fix build * fix nits * Update core/src/client/async_client/mod.rs * Update core/Cargo.toml * Update core/src/lib.rs --- client/http-client/Cargo.toml | 1 + client/http-client/src/client.rs | 39 +++++++-- client/http-client/src/transport.rs | 32 ++++--- client/transport/src/ws/mod.rs | 2 +- core/Cargo.toml | 8 +- core/src/client/async_client/mod.rs | 128 +++++++++++++++++----------- core/src/lib.rs | 2 + core/src/server/helpers.rs | 15 +++- core/src/tracing.rs | 101 ++++++++++++++++++++++ examples/examples/http.rs | 8 +- examples/examples/ws.rs | 9 +- http-server/Cargo.toml | 1 + http-server/src/server.rs | 59 ++++++++++--- types/src/request.rs | 2 +- types/src/response.rs | 8 ++ ws-server/Cargo.toml | 1 + ws-server/src/server.rs | 37 +++++--- 17 files changed, 346 insertions(+), 107 deletions(-) create mode 100644 core/src/tracing.rs diff --git a/client/http-client/Cargo.toml b/client/http-client/Cargo.toml index fcd75aeda7..695da2cfa5 100644 --- a/client/http-client/Cargo.toml +++ b/client/http-client/Cargo.toml @@ -21,6 +21,7 @@ serde_json = "1.0" thiserror = "1.0" tokio = { version = "1.16", features = ["time"] } tracing = "0.1" +tracing-futures = "0.2.5" [dev-dependencies] jsonrpsee-test-utils = { path = "../../test-utils" } diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index 2d93f915da..deb843960f 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -31,10 +31,12 @@ use crate::transport::HttpTransportClient; use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Response}; use async_trait::async_trait; use jsonrpsee_core::client::{CertificateStore, ClientT, IdKind, RequestIdManager, Subscription, SubscriptionClientT}; +use jsonrpsee_core::tracing::RpcTracing; use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES}; use jsonrpsee_types::error::CallError; use rustc_hash::FxHashMap; use serde::de::DeserializeOwned; +use tracing_futures::Instrument; /// Http Client Builder. #[derive(Debug)] @@ -44,6 +46,7 @@ pub struct HttpClientBuilder { max_concurrent_requests: usize, certificate_store: CertificateStore, id_kind: IdKind, + max_log_length: u32, } impl HttpClientBuilder { @@ -77,10 +80,19 @@ impl HttpClientBuilder { self } + /// Max length for logging for requests and responses in number characters. + /// + /// Logs bigger than this limit will be truncated. + pub fn set_max_logging_length(mut self, max: u32) -> Self { + self.max_log_length = max; + self + } + /// Build the HTTP client with target to connect to. 
pub fn build(self, target: impl AsRef) -> Result { - let transport = HttpTransportClient::new(target, self.max_request_body_size, self.certificate_store) - .map_err(|e| Error::Transport(e.into()))?; + let transport = + HttpTransportClient::new(target, self.max_request_body_size, self.certificate_store, self.max_log_length) + .map_err(|e| Error::Transport(e.into()))?; Ok(HttpClient { transport, id_manager: Arc::new(RequestIdManager::new(self.max_concurrent_requests, self.id_kind)), @@ -97,6 +109,7 @@ impl Default for HttpClientBuilder { max_concurrent_requests: 256, certificate_store: CertificateStore::Native, id_kind: IdKind::Number, + max_log_length: 4096, } } } @@ -115,8 +128,13 @@ pub struct HttpClient { #[async_trait] impl ClientT for HttpClient { async fn notification<'a>(&self, method: &'a str, params: Option>) -> Result<(), Error> { - let notif = NotificationSer::new(method, params); - let fut = self.transport.send(serde_json::to_string(¬if).map_err(Error::ParseError)?); + let trace = RpcTracing::notification(method); + let _enter = trace.span().enter(); + + let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?; + + let fut = self.transport.send(notif).in_current_span(); + match tokio::time::timeout(self.request_timeout, fut).await { Ok(Ok(ok)) => Ok(ok), Err(_) => Err(Error::RequestTimeout), @@ -132,8 +150,12 @@ impl ClientT for HttpClient { let guard = self.id_manager.next_request_id()?; let id = guard.inner(); let request = RequestSer::new(&id, method, params); + let trace = RpcTracing::method_call(method); + let _enter = trace.span().enter(); + + let raw = serde_json::to_string(&request).map_err(Error::ParseError)?; - let fut = self.transport.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?); + let fut = self.transport.send_and_read_body(raw).in_current_span(); let body = match tokio::time::timeout(self.request_timeout, fut).await { Ok(Ok(body)) => body, Err(_e) => { @@ -165,6 +187,8 @@ impl ClientT for HttpClient { { let guard = self.id_manager.next_request_ids(batch.len())?; let ids: Vec = guard.inner(); + let trace = RpcTracing::batch(); + let _enter = trace.span().enter(); let mut batch_request = Vec::with_capacity(batch.len()); // NOTE(niklasad1): `ID` is not necessarily monotonically increasing. @@ -177,7 +201,10 @@ impl ClientT for HttpClient { request_set.insert(&ids[pos], pos); } - let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?); + let fut = self + .transport + .send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?) + .in_current_span(); let body = match tokio::time::timeout(self.request_timeout, fut).await { Ok(Ok(body)) => body, diff --git a/client/http-client/src/transport.rs b/client/http-client/src/transport.rs index 81868e25a6..f2698d0b9a 100644 --- a/client/http-client/src/transport.rs +++ b/client/http-client/src/transport.rs @@ -11,6 +11,7 @@ use hyper::Uri; use jsonrpsee_core::client::CertificateStore; use jsonrpsee_core::error::GenericTransportError; use jsonrpsee_core::http_helpers; +use jsonrpsee_core::tracing::{rx_log_from_bytes, tx_log_from_str}; use thiserror::Error; const CONTENT_TYPE_JSON: &str = "application/json"; @@ -43,6 +44,10 @@ pub struct HttpTransportClient { client: HyperClient, /// Configurable max request body size max_request_body_size: u32, + /// Max length for logging for requests and responses + /// + /// Logs bigger than this limit will be truncated. 
+ max_log_length: u32, } impl HttpTransportClient { @@ -51,6 +56,7 @@ impl HttpTransportClient { target: impl AsRef, max_request_body_size: u32, cert_store: CertificateStore, + max_log_length: u32, ) -> Result { let target: Uri = target.as_ref().parse().map_err(|e| Error::Url(format!("Invalid URL: {}", e)))?; if target.port_u16().is_none() { @@ -84,11 +90,11 @@ impl HttpTransportClient { return Err(Error::Url(err.into())); } }; - Ok(Self { target, client, max_request_body_size }) + Ok(Self { target, client, max_request_body_size, max_log_length }) } async fn inner_send(&self, body: String) -> Result, Error> { - tracing::debug!("send: {}", body); + tx_log_from_str(&body, self.max_log_length); if body.len() > self.max_request_body_size as usize { return Err(Error::RequestTooLarge); @@ -113,12 +119,16 @@ impl HttpTransportClient { let response = self.inner_send(body).await?; let (parts, body) = response.into_parts(); let (body, _) = http_helpers::read_body(&parts.headers, body, self.max_request_body_size).await?; + + rx_log_from_bytes(&body, self.max_log_length); + Ok(body) } /// Send serialized message without reading the HTTP message body. pub(crate) async fn send(&self, body: String) -> Result<(), Error> { let _ = self.inner_send(body).await?; + Ok(()) } } @@ -188,36 +198,37 @@ mod tests { #[test] fn invalid_http_url_rejected() { - let err = HttpTransportClient::new("ws://localhost:9933", 80, CertificateStore::Native).unwrap_err(); + let err = HttpTransportClient::new("ws://localhost:9933", 80, CertificateStore::Native, 80).unwrap_err(); assert!(matches!(err, Error::Url(_))); } #[cfg(feature = "tls")] #[test] fn https_works() { - let client = HttpTransportClient::new("https://localhost:9933", 80, CertificateStore::Native).unwrap(); + let client = HttpTransportClient::new("https://localhost:9933", 80, CertificateStore::Native, 80).unwrap(); assert_target(&client, "localhost", "https", "/", 9933, 80); } #[cfg(not(feature = "tls"))] #[test] fn https_fails_without_tls_feature() { - let err = HttpTransportClient::new("https://localhost:9933", 80, CertificateStore::Native).unwrap_err(); + let err = HttpTransportClient::new("https://localhost:9933", 80, CertificateStore::Native, 80).unwrap_err(); assert!(matches!(err, Error::Url(_))); } #[test] fn faulty_port() { - let err = HttpTransportClient::new("http://localhost:-43", 80, CertificateStore::Native).unwrap_err(); + let err = HttpTransportClient::new("http://localhost:-43", 80, CertificateStore::Native, 80).unwrap_err(); assert!(matches!(err, Error::Url(_))); - let err = HttpTransportClient::new("http://localhost:-99999", 80, CertificateStore::Native).unwrap_err(); + let err = HttpTransportClient::new("http://localhost:-99999", 80, CertificateStore::Native, 80).unwrap_err(); assert!(matches!(err, Error::Url(_))); } #[test] fn url_with_path_works() { let client = - HttpTransportClient::new("http://localhost:9944/my-special-path", 1337, CertificateStore::Native).unwrap(); + HttpTransportClient::new("http://localhost:9944/my-special-path", 1337, CertificateStore::Native, 80) + .unwrap(); assert_target(&client, "localhost", "http", "/my-special-path", 9944, 1337); } @@ -227,6 +238,7 @@ mod tests { "http://127.0.0.1:9999/my?name1=value1&name2=value2", u32::MAX, CertificateStore::WebPki, + 80, ) .unwrap(); assert_target(&client, "127.0.0.1", "http", "/my?name1=value1&name2=value2", 9999, u32::MAX); @@ -235,14 +247,14 @@ mod tests { #[test] fn url_with_fragment_is_ignored() { let client = - 
HttpTransportClient::new("http://127.0.0.1:9944/my.htm#ignore", 999, CertificateStore::Native).unwrap(); + HttpTransportClient::new("http://127.0.0.1:9944/my.htm#ignore", 999, CertificateStore::Native, 80).unwrap(); assert_target(&client, "127.0.0.1", "http", "/my.htm", 9944, 999); } #[tokio::test] async fn request_limit_works() { let eighty_bytes_limit = 80; - let client = HttpTransportClient::new("http://localhost:9933", 80, CertificateStore::WebPki).unwrap(); + let client = HttpTransportClient::new("http://localhost:9933", 80, CertificateStore::WebPki, 99).unwrap(); assert_eq!(client.max_request_body_size, eighty_bytes_limit); let body = "a".repeat(81); diff --git a/client/transport/src/ws/mod.rs b/client/transport/src/ws/mod.rs index 195f5aa200..51c680605d 100644 --- a/client/transport/src/ws/mod.rs +++ b/client/transport/src/ws/mod.rs @@ -191,7 +191,7 @@ impl TransportSenderT for Sender { /// Sends out a request. Returns a `Future` that finishes when the request has been /// successfully sent. async fn send(&mut self, body: String) -> Result<(), Self::Error> { - tracing::debug!("send: {}", body); + tracing::trace!("send: {}", body); self.inner.send_text(body).await?; self.inner.flush().await?; Ok(()) diff --git a/core/Cargo.toml b/core/Cargo.toml index a06a067492..cc345c96a8 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -15,6 +15,7 @@ jsonrpsee-types = { path = "../types", version = "0.14.0" } thiserror = "1" serde = { version = "1.0", default-features = false, features = ["derive"] } serde_json = { version = "1", features = ["raw_value"] } +tracing = "0.1" # optional deps arrayvec = { version = "0.7.1", optional = true } @@ -22,7 +23,7 @@ async-channel = { version = "1.6", optional = true } async-lock = { version = "2.4", optional = true } futures-util = { version = "0.3.14", default-features = false, optional = true } hyper = { version = "0.14.10", default-features = false, features = ["stream"], optional = true } -tracing = { version = "0.1", optional = true } +tracing-futures = { version = "0.2", optional = true } rustc-hash = { version = "1", optional = true } rand = { version = "0.8", optional = true } soketto = { version = "0.7.1", optional = true } @@ -42,7 +43,6 @@ server = [ "futures-util/alloc", "globset", "rustc-hash/std", - "tracing", "parking_lot", "rand", "tokio/rt", @@ -58,7 +58,7 @@ async-client = [ "tokio/macros", "tokio/rt", "tokio/sync", - "tracing", + "tracing-futures", "futures-timer", ] async-wasm-client = [ @@ -66,8 +66,8 @@ async-wasm-client = [ "client", "wasm-bindgen-futures", "rustc-hash/std", + "tracing-futures", "futures-timer/wasm-bindgen", - "tracing", ] [dev-dependencies] diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 5376364b7e..8edb7a4476 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -8,6 +8,8 @@ use crate::client::{ RegisterNotificationMessage, RequestMessage, Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT, }; +use crate::tracing::{rx_log_from_json, tx_log_from_str, RpcTracing}; + use core::time::Duration; use helpers::{ build_unsubscribe_message, call_with_timeout, process_batch_response, process_error_response, process_notification, @@ -29,6 +31,7 @@ use jsonrpsee_types::{ SubscriptionResponse, }; use serde::de::DeserializeOwned; +use tracing_futures::Instrument; use super::{FrontToBack, IdKind, RequestIdManager}; @@ -69,6 +72,7 @@ pub struct ClientBuilder { max_concurrent_requests: 
usize, max_notifs_per_subscription: usize, id_kind: IdKind, + max_log_length: u32, ping_interval: Option, } @@ -79,6 +83,7 @@ impl Default for ClientBuilder { max_concurrent_requests: 256, max_notifs_per_subscription: 1024, id_kind: IdKind::Number, + max_log_length: 4096, ping_interval: None, } } @@ -117,6 +122,13 @@ impl ClientBuilder { self } + /// Set maximum length for logging calls and responses. + /// + /// Logs bigger than this limit will be truncated. + pub fn set_max_logging_length(mut self, max: u32) -> Self { + self.max_log_length = max; + self + } /// Set the interval at which pings frames are submitted (disabled by default). /// /// Periodically submitting pings at a defined interval has mainly two benefits: @@ -159,6 +171,7 @@ impl ClientBuilder { request_timeout: self.request_timeout, error: Mutex::new(ErrorFromBack::Unread(err_rx)), id_manager: RequestIdManager::new(self.max_concurrent_requests, self.id_kind), + max_log_length: self.max_log_length, } } @@ -182,6 +195,7 @@ impl ClientBuilder { request_timeout: self.request_timeout, error: Mutex::new(ErrorFromBack::Unread(err_rx)), id_manager: RequestIdManager::new(self.max_concurrent_requests, self.id_kind), + max_log_length: self.max_log_length, } } } @@ -198,6 +212,10 @@ pub struct Client { request_timeout: Duration, /// Request ID manager. id_manager: RequestIdManager, + /// Max length for logging for requests and responses. + /// + /// Entries bigger than this limit will be truncated. + max_log_length: u32, } impl Client { @@ -228,11 +246,14 @@ impl ClientT for Client { // NOTE: we use this to guard against max number of concurrent requests. let _req_id = self.id_manager.next_request_id()?; let notif = NotificationSer::new(method, params); + let trace = RpcTracing::batch(); + let _enter = trace.span().enter(); + let raw = serde_json::to_string(¬if).map_err(Error::ParseError)?; - tracing::trace!("[frontend]: send notification: {:?}", raw); + tx_log_from_str(&raw, self.max_log_length); let mut sender = self.to_back.clone(); - let fut = sender.send(FrontToBack::Notification(raw)); + let fut = sender.send(FrontToBack::Notification(raw)).in_current_span(); match future::select(fut, Delay::new(self.request_timeout)).await { Either::Left((Ok(()), _)) => Ok(()), @@ -248,26 +269,31 @@ impl ClientT for Client { let (send_back_tx, send_back_rx) = oneshot::channel(); let guard = self.id_manager.next_request_id()?; let id = guard.inner(); + let trace = RpcTracing::method_call(method); + let _enter = trace.span().enter(); let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?; - tracing::trace!("[frontend]: send request: {:?}", raw); + tx_log_from_str(&raw, self.max_log_length); if self .to_back .clone() - .send(FrontToBack::Request(RequestMessage { raw, id, send_back: Some(send_back_tx) })) + .send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) })) .await .is_err() { return Err(self.read_error_from_backend().await); } - let res = call_with_timeout(self.request_timeout, send_back_rx).await; + let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await; let json_value = match res { Ok(Ok(v)) => v, Ok(Err(err)) => return Err(err), Err(_) => return Err(self.read_error_from_backend().await), }; + + rx_log_from_json(&Response::new(&json_value, id), self.max_log_length); + serde_json::from_value(json_value).map_err(Error::ParseError) } @@ -278,6 +304,8 @@ impl ClientT for Client { let guard = 
self.id_manager.next_request_ids(batch.len())?; let batch_ids: Vec = guard.inner(); let mut batches = Vec::with_capacity(batch.len()); + let log = RpcTracing::batch(); + let _enter = log.span().enter(); for (idx, (method, params)) in batch.into_iter().enumerate() { batches.push(RequestSer::new(&batch_ids[idx], method, params)); @@ -286,7 +314,9 @@ impl ClientT for Client { let (send_back_tx, send_back_rx) = oneshot::channel(); let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?; - tracing::trace!("[frontend]: send batch request: {:?}", raw); + + tx_log_from_str(&raw, self.max_log_length); + if self .to_back .clone() @@ -297,13 +327,15 @@ impl ClientT for Client { return Err(self.read_error_from_backend().await); } - let res = call_with_timeout(self.request_timeout, send_back_rx).await; + let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await; let json_values = match res { Ok(Ok(v)) => v, Ok(Err(err)) => return Err(err), Err(_) => return Err(self.read_error_from_backend().await), }; + rx_log_from_json(&json_values, self.max_log_length); + let values: Result<_, _> = json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect(); Ok(values?) @@ -325,18 +357,20 @@ impl SubscriptionClientT for Client { where N: DeserializeOwned, { - tracing::trace!("[frontend]: subscribe: {:?}, unsubscribe: {:?}", subscribe_method, unsubscribe_method); - if subscribe_method == unsubscribe_method { return Err(Error::SubscriptionNameConflict(unsubscribe_method.to_owned())); } let guard = self.id_manager.next_request_ids(2)?; - let mut ids: Vec = guard.inner(); + let trace = RpcTracing::method_call(subscribe_method); + let _enter = trace.span().enter(); - let raw = - serde_json::to_string(&RequestSer::new(&ids[0], subscribe_method, params)).map_err(Error::ParseError)?; + let id = ids[0].clone(); + + let raw = serde_json::to_string(&RequestSer::new(&id, subscribe_method, params)).map_err(Error::ParseError)?; + + tx_log_from_str(&raw, self.max_log_length); let (send_back_tx, send_back_rx) = oneshot::channel(); if self @@ -355,14 +389,17 @@ impl SubscriptionClientT for Client { return Err(self.read_error_from_backend().await); } - let res = call_with_timeout(self.request_timeout, send_back_rx).await; + let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await; - let (notifs_rx, id) = match res { + let (notifs_rx, sub_id) = match res { Ok(Ok(val)) => val, Ok(Err(err)) => return Err(err), Err(_) => return Err(self.read_error_from_backend().await), }; - Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(id))) + + rx_log_from_json(&Response::new(&sub_id, id), self.max_log_length); + + Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id))) } /// Subscribe to a specific method. @@ -370,8 +407,6 @@ impl SubscriptionClientT for Client { where N: DeserializeOwned, { - tracing::trace!("[frontend]: register_notification: {:?}", method); - let (send_back_tx, send_back_rx) = oneshot::channel(); if self .to_back @@ -407,17 +442,15 @@ async fn handle_backend_messages( sender: &mut S, max_notifs_per_subscription: usize, ) -> Result<(), Error> { - // Handle raw messages of form `ReceivedMessage::Bytes` (Vec) or ReceivedMessage::Data` (String). 
async fn handle_recv_message( raw: &[u8], manager: &mut RequestManager, sender: &mut S, - max_notifs_per_subscription: usize + max_notifs_per_subscription: usize, ) -> Result<(), Error> { // Single response to a request. if let Ok(single) = serde_json::from_slice::>(&raw) { - tracing::debug!("[backend]: recv method_call {:?}", single); match process_single_response(manager, single, max_notifs_per_subscription) { Ok(Some(unsub)) => { stop_subscription(sender, manager, unsub).await; @@ -428,42 +461,40 @@ async fn handle_backend_messages( } // Subscription response. else if let Ok(response) = serde_json::from_slice::>(&raw) { - tracing::debug!("[backend]: recv subscription {:?}", response); if let Err(Some(unsub)) = process_subscription_response(manager, response) { let _ = stop_subscription(sender, manager, unsub).await; } } // Subscription error response. else if let Ok(response) = serde_json::from_slice::>(&raw) { - tracing::debug!("[backend]: recv subscription closed {:?}", response); let _ = process_subscription_close_response(manager, response); } // Incoming Notification else if let Ok(notif) = serde_json::from_slice::>(&raw) { - tracing::debug!("[backend]: recv notification {:?}", notif); let _ = process_notification(manager, notif); } // Batch response. else if let Ok(batch) = serde_json::from_slice::>>(&raw) { - tracing::debug!("[backend]: recv batch {:?}", batch); if let Err(e) = process_batch_response(manager, batch) { return Err(e); } } // Error response else if let Ok(err) = serde_json::from_slice::(&raw) { - tracing::debug!("[backend]: recv error response {:?}", err); if let Err(e) = process_error_response(manager, err) { return Err(e); } } // Unparsable response else { - tracing::debug!( - "[backend]: recv unparseable message: {:?}", - serde_json::from_slice::(&raw) - ); - return Err(Error::Custom("Unparsable response".into())); + let json = serde_json::from_slice::(&raw); + + let json_str = match json { + Ok(json) => serde_json::to_string(&json).expect("valid JSON; qed"), + Err(e) => e.to_string(), + }; + + return Err(Error::Custom(format!("Unparseable message: {}", json_str))); } Ok(()) } @@ -508,7 +539,6 @@ async fn handle_frontend_messages( } Some(FrontToBack::Batch(batch)) => { - tracing::trace!("[backend]: client prepares to send batch request: {:?}", batch.raw); if let Err(send_back) = manager.insert_pending_batch(batch.ids.clone(), batch.send_back) { tracing::warn!("[backend]: batch request: {:?} already pending", batch.ids); let _ = send_back.send(Err(Error::InvalidRequestId)); @@ -522,24 +552,18 @@ async fn handle_frontend_messages( } // User called `notification` on the front-end Some(FrontToBack::Notification(notif)) => { - tracing::trace!("[backend]: client prepares to send notification: {:?}", notif); if let Err(e) = sender.send(notif).await { tracing::warn!("[backend]: client notif failed: {:?}", e); } } // User called `request` on the front-end - Some(FrontToBack::Request(request)) => { - tracing::trace!("[backend]: client prepares to send request={:?}", request); - match sender.send(request.raw).await { - Ok(_) => { - manager.insert_pending_call(request.id, request.send_back).expect("ID unused checked above; qed") - } - Err(e) => { - tracing::warn!("[backend]: client request failed: {:?}", e); - let _ = request.send_back.map(|s| s.send(Err(Error::Transport(e.into())))); - } + Some(FrontToBack::Request(request)) => match sender.send(request.raw).await { + Ok(_) => manager.insert_pending_call(request.id, request.send_back).expect("ID unused checked above; qed"), 
+ Err(e) => { + tracing::warn!("[backend]: client request failed: {:?}", e); + let _ = request.send_back.map(|s| s.send(Err(Error::Transport(e.into())))); } - } + }, // User called `subscribe` on the front-end. Some(FrontToBack::Subscribe(sub)) => match sender.send(sub.raw).await { Ok(_) => manager @@ -569,7 +593,6 @@ async fn handle_frontend_messages( } // User called `register_notification` on the front-end. Some(FrontToBack::RegisterNotification(reg)) => { - tracing::trace!("[backend] registering notification handler: {:?}", reg.method); let (subscribe_tx, subscribe_rx) = mpsc::channel(max_notifs_per_subscription); if manager.insert_notification_handler(®.method, subscribe_tx).is_ok() { @@ -580,7 +603,6 @@ async fn handle_frontend_messages( } // User dropped the notificationHandler for this method Some(FrontToBack::UnregisterNotification(method)) => { - tracing::trace!("[backend] unregistering notification handler: {:?}", method); let _ = manager.remove_notification_handler(method); } } @@ -628,7 +650,10 @@ async fn background_task( match future::select(message_fut, submit_ping).await { // Message received from the frontend. Either::Left((Either::Left((frontend_value, backend)), _)) => { - if let Err(err) = handle_frontend_messages(frontend_value, &mut manager, &mut sender, max_notifs_per_subscription).await { + if let Err(err) = + handle_frontend_messages(frontend_value, &mut manager, &mut sender, max_notifs_per_subscription) + .await + { tracing::warn!("{:?}", err); let _ = front_error.send(err); break; @@ -637,10 +662,15 @@ async fn background_task( message_fut = future::select(frontend.next(), backend); } // Message received from the backend. - Either::Left((Either::Right((backend_value, frontend)), _))=> { + Either::Left((Either::Right((backend_value, frontend)), _)) => { if let Err(err) = handle_backend_messages::( - backend_value, &mut manager, &mut sender, max_notifs_per_subscription - ).await { + backend_value, + &mut manager, + &mut sender, + max_notifs_per_subscription, + ) + .await + { tracing::warn!("{:?}", err); let _ = front_error.send(err); break; diff --git a/core/src/lib.rs b/core/src/lib.rs index c1d77de7b3..91ca4ceda0 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -55,6 +55,8 @@ cfg_client! { pub mod client; } +/// Shared tracing helpers to trace RPC calls. +pub mod tracing; pub use async_trait::async_trait; pub use error::Error; diff --git a/core/src/server/helpers.rs b/core/src/server/helpers.rs index d9587ed077..09ded6799f 100644 --- a/core/src/server/helpers.rs +++ b/core/src/server/helpers.rs @@ -27,7 +27,8 @@ use std::io; use std::sync::Arc; -use crate::Error; +use crate::tracing::tx_log_from_str; +use crate::{Error}; use futures_channel::mpsc; use futures_util::StreamExt; use jsonrpsee_types::error::{ErrorCode, ErrorObject, ErrorResponse, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG}; @@ -87,17 +88,19 @@ pub struct MethodSink { tx: mpsc::UnboundedSender, /// Max response size in bytes for a executed call. max_response_size: u32, + /// Max log length. 
+ max_log_length: u32, } impl MethodSink { /// Create a new `MethodSink` with unlimited response size pub fn new(tx: mpsc::UnboundedSender) -> Self { - MethodSink { tx, max_response_size: u32::MAX } + MethodSink { tx, max_response_size: u32::MAX, max_log_length: u32::MAX } } /// Create a new `MethodSink` with a limited response size - pub fn new_with_limit(tx: mpsc::UnboundedSender, max_response_size: u32) -> Self { - MethodSink { tx, max_response_size } + pub fn new_with_limit(tx: mpsc::UnboundedSender, max_response_size: u32, max_log_length: u32) -> Self { + MethodSink { tx, max_response_size, max_log_length } } /// Returns whether this channel is closed without needing a context. @@ -128,6 +131,8 @@ impl MethodSink { } }; + tx_log_from_str(&json, self.max_log_length); + if let Err(err) = self.send_raw(json) { tracing::warn!("Error sending response {:?}", err); false @@ -147,6 +152,8 @@ impl MethodSink { } }; + tx_log_from_str(&json, self.max_log_length); + if let Err(err) = self.send_raw(json) { tracing::warn!("Error sending response {:?}", err); } diff --git a/core/src/tracing.rs b/core/src/tracing.rs new file mode 100644 index 0000000000..eb491418a8 --- /dev/null +++ b/core/src/tracing.rs @@ -0,0 +1,101 @@ +use serde::Serialize; +use tracing::Level; + +#[derive(Debug)] +/// Wrapper over [`tracing::Span`] to trace individual method calls, notifications and similar. +pub struct RpcTracing(tracing::Span); + +impl RpcTracing { + /// Create a `method_call` tracing target. + /// + /// To enable this you need to call `RpcTracing::method_call("some_method").span().enable()`. + pub fn method_call(method: &str) -> Self { + Self(tracing::span!(tracing::Level::DEBUG, "method_call", %method)) + } + + /// Create a `notification` tracing target. + /// + /// To enable this you need to call `RpcTracing::notification("some_method").span().enable()`. + pub fn notification(method: &str) -> Self { + Self(tracing::span!(tracing::Level::DEBUG, "notification", %method)) + } + + /// Create a `batch` tracing target. + /// + /// To enable this you need to call `RpcTracing::batch().span().enable()`. + pub fn batch() -> Self { + Self(tracing::span!(tracing::Level::DEBUG, "batch")) + } + + /// Get the inner span. + pub fn span(&self) -> &tracing::Span { + &self.0 + } +} + +/// Helper for writing trace logs from str. +pub fn tx_log_from_str(s: impl AsRef, max: u32) { + if tracing::enabled!(Level::TRACE) { + let msg = truncate_at_char_boundary(s.as_ref(), max as usize); + tracing::trace!(send = msg); + } +} + +/// Helper for writing trace logs from JSON. +pub fn tx_log_from_json(s: &impl Serialize, max: u32) { + if tracing::enabled!(Level::TRACE) { + let json = serde_json::to_string(s).unwrap_or_default(); + let msg = truncate_at_char_boundary(&json, max as usize); + tracing::trace!(send = msg); + } +} + +/// Helper for writing trace logs from str. +pub fn rx_log_from_str(s: impl AsRef, max: u32) { + if tracing::enabled!(Level::TRACE) { + let msg = truncate_at_char_boundary(s.as_ref(), max as usize); + tracing::trace!(recv = msg); + } +} + +/// Helper for writing trace logs from JSON. +pub fn rx_log_from_json(s: &impl Serialize, max: u32) { + if tracing::enabled!(Level::TRACE) { + let res = serde_json::to_string(s).unwrap_or_default(); + let msg = truncate_at_char_boundary(res.as_str(), max as usize); + tracing::trace!(recv = msg); + } +} + +/// Helper for writing trace logs from bytes. 
+pub fn rx_log_from_bytes(bytes: &[u8], max: u32) { + if tracing::enabled!(Level::TRACE) { + let res = serde_json::from_slice::(bytes).unwrap_or_default(); + rx_log_from_json(&res, max); + } +} + +/// Find the next char boundary to truncate at. +fn truncate_at_char_boundary(s: &str, max: usize) -> &str { + if s.len() < max { + return s; + } + + match s.char_indices().nth(max as usize) { + None => s, + Some((idx, _)) => &s[..idx], + } +} + +#[cfg(test)] +mod tests { + use super::truncate_at_char_boundary; + + #[test] + fn truncate_at_char_boundary_works() { + assert_eq!(truncate_at_char_boundary("ボルテックス", 0), ""); + assert_eq!(truncate_at_char_boundary("ボルテックス", 4), "ボルテッ"); + assert_eq!(truncate_at_char_boundary("ボルテックス", 100), "ボルテックス"); + assert_eq!(truncate_at_char_boundary("hola-hola", 4), "hola"); + } +} diff --git a/examples/examples/http.rs b/examples/examples/http.rs index cc96fe6866..030e96356f 100644 --- a/examples/examples/http.rs +++ b/examples/examples/http.rs @@ -30,13 +30,13 @@ use jsonrpsee::core::client::ClientT; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle, RpcModule}; use jsonrpsee::rpc_params; +use tracing_subscriber::util::SubscriberInitExt; #[tokio::main] async fn main() -> anyhow::Result<()> { - tracing_subscriber::FmtSubscriber::builder() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .try_init() - .expect("setting default subscriber failed"); + let filter = tracing_subscriber::EnvFilter::try_from_default_env()? + .add_directive("jsonrpsee[method_call{name = \"say_hello\"}]=trace".parse()?); + tracing_subscriber::FmtSubscriber::builder().with_env_filter(filter).finish().try_init()?; let (server_addr, _handle) = run_server().await?; let url = format!("http://{}", server_addr); diff --git a/examples/examples/ws.rs b/examples/examples/ws.rs index 43be8fd742..7f27351c38 100644 --- a/examples/examples/ws.rs +++ b/examples/examples/ws.rs @@ -29,13 +29,14 @@ use std::net::SocketAddr; use jsonrpsee::core::client::ClientT; use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::ws_server::{RpcModule, WsServerBuilder}; +use tracing_subscriber::util::SubscriberInitExt; #[tokio::main] async fn main() -> anyhow::Result<()> { - tracing_subscriber::FmtSubscriber::builder() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .try_init() - .expect("setting default subscriber failed"); + let filter = tracing_subscriber::EnvFilter::try_from_default_env()? 
+ .add_directive("jsonrpsee[method_call{name = \"say_hello\"}]=trace".parse()?); + + tracing_subscriber::FmtSubscriber::builder().with_env_filter(filter).finish().try_init()?; let addr = run_server().await?; let url = format!("ws://{}", addr); diff --git a/http-server/Cargo.toml b/http-server/Cargo.toml index 1b1c246047..b61e1d5e06 100644 --- a/http-server/Cargo.toml +++ b/http-server/Cargo.toml @@ -16,6 +16,7 @@ futures-util = { version = "0.3.14", default-features = false } jsonrpsee-types = { path = "../types", version = "0.14.0" } jsonrpsee-core = { path = "../core", version = "0.14.0", features = ["server", "http-helpers"] } tracing = "0.1" +tracing-futures = "0.2.5" serde_json = { version = "1.0", features = ["raw_value"] } serde = "1" tokio = { version = "1.16", features = ["rt-multi-thread", "macros"] } diff --git a/http-server/src/server.rs b/http-server/src/server.rs index f0061c0f62..6850668ded 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -24,7 +24,6 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use std::cmp; use std::future::Future; use std::net::{SocketAddr, TcpListener as StdTcpListener}; use std::pin::Pin; @@ -45,11 +44,13 @@ use jsonrpsee_core::server::access_control::AccessControl; use jsonrpsee_core::server::helpers::{collect_batch_response, prepare_error, MethodSink}; use jsonrpsee_core::server::resource_limiting::Resources; use jsonrpsee_core::server::rpc_module::{MethodKind, Methods}; +use jsonrpsee_core::tracing::{rx_log_from_json, RpcTracing}; use jsonrpsee_core::TEN_MB_SIZE_BYTES; use jsonrpsee_types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG}; use jsonrpsee_types::{Id, Notification, Params, Request}; use serde_json::value::RawValue; use tokio::net::{TcpListener, ToSocketAddrs}; +use tracing_futures::Instrument; /// Builder to create JSON-RPC HTTP server. #[derive(Debug)] @@ -63,6 +64,7 @@ pub struct Builder { /// Custom tokio runtime to run the server on. tokio_runtime: Option, middleware: M, + max_log_length: u32, health_api: Option, } @@ -76,6 +78,7 @@ impl Default for Builder { resources: Resources::default(), tokio_runtime: None, middleware: (), + max_log_length: 4096, health_api: None, } } @@ -123,6 +126,7 @@ impl Builder { resources: self.resources, tokio_runtime: self.tokio_runtime, middleware, + max_log_length: self.max_log_length, health_api: self.health_api, } } @@ -235,6 +239,7 @@ impl Builder { resources: self.resources, tokio_runtime: self.tokio_runtime, middleware: self.middleware, + max_log_length: self.max_log_length, health_api: self.health_api, }) } @@ -279,6 +284,7 @@ impl Builder { resources: self.resources, tokio_runtime: self.tokio_runtime, middleware: self.middleware, + max_log_length: self.max_log_length, health_api: self.health_api, }) } @@ -314,6 +320,7 @@ impl Builder { resources: self.resources, tokio_runtime: self.tokio_runtime, middleware: self.middleware, + max_log_length: self.max_log_length, health_api: self.health_api, }) } @@ -367,6 +374,10 @@ pub struct Server { max_request_body_size: u32, /// Max response body size. max_response_body_size: u32, + /// Max length for logging for request and response + /// + /// Logs bigger than this limit will be truncated. + max_log_length: u32, /// Whether batch requests are supported by this server or not. batch_requests_supported: bool, /// Access control. 
@@ -389,6 +400,7 @@ impl Server { pub fn start(mut self, methods: impl Into) -> Result { let max_request_body_size = self.max_request_body_size; let max_response_body_size = self.max_response_body_size; + let max_log_length = self.max_log_length; let acl = self.access_control; let (tx, mut rx) = mpsc::channel(1); let listener = self.listener; @@ -477,6 +489,7 @@ impl Server { resources, max_request_body_size, max_response_body_size, + max_log_length, batch_requests_supported, ) .await?; @@ -488,7 +501,14 @@ impl Server { } Method::GET => match health_api.as_ref() { Some(health) if health.path.as_str() == request.uri().path() => { - process_health_request(health, middleware, methods, max_response_body_size).await + process_health_request( + health, + middleware, + methods, + max_response_body_size, + max_log_length, + ) + .await } _ => Ok(response::method_not_allowed()), }, @@ -556,6 +576,7 @@ async fn process_validated_request( resources: Resources, max_request_body_size: u32, max_response_body_size: u32, + max_log_length: u32, batch_requests_supported: bool, ) -> Result, HyperError> { let (parts, body) = request.into_parts(); @@ -574,7 +595,7 @@ async fn process_validated_request( // NOTE(niklasad1): it's a channel because it's needed for batch requests. let (tx, mut rx) = mpsc::unbounded::(); - let sink = MethodSink::new_with_limit(tx, max_response_body_size); + let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length); type Notif<'a> = Notification<'a, Option<&'a RawValue>>; @@ -582,6 +603,11 @@ async fn process_validated_request( if is_single { if let Ok(req) = serde_json::from_slice::(&body) { let method = req.method.as_ref(); + + let trace = RpcTracing::method_call(&req.method); + let _enter = trace.span().enter(); + + rx_log_from_json(&req, max_log_length); middleware.on_call(method); let id = req.id.clone(); @@ -607,8 +633,10 @@ async fn process_validated_request( }, MethodKind::Async(callback) => match method_callback.claim(name, &resources) { Ok(guard) => { - let result = - (callback)(id.into_owned(), params.into_owned(), sink.clone(), 0, Some(guard)).await; + let result = (callback)(id.into_owned(), params.into_owned(), sink.clone(), 0, Some(guard)) + .in_current_span() + .await; + result } Err(err) => { @@ -625,7 +653,12 @@ async fn process_validated_request( }, }; middleware.on_result(&req.method, result, request_start); - } else if let Ok(_req) = serde_json::from_slice::(&body) { + } else if let Ok(req) = serde_json::from_slice::(&body) { + let trace = RpcTracing::notification(&req.method); + let _enter = trace.span().enter(); + + rx_log_from_json(&req, max_log_length); + return Ok::<_, HyperError>(response::ok_response("".into())); } else { let (id, code) = prepare_error(&body); @@ -633,6 +666,11 @@ async fn process_validated_request( } // Batch of requests or notifications } else if let Ok(batch) = serde_json::from_slice::>(&body) { + let trace = RpcTracing::batch(); + let _enter = trace.span().enter(); + + rx_log_from_json(&batch, max_log_length); + if !batch_requests_supported { // Server was configured to not support batches. 
is_single = true; @@ -678,7 +716,7 @@ async fn process_validated_request( let callback = callback.clone(); Some(async move { - let result = (callback)(id, params, sink, 0, Some(guard)).await; + let result = (callback)(id, params, sink, 0, Some(guard)).in_current_span().await; middleware.on_result(name, result, request_start); }) } @@ -718,7 +756,7 @@ async fn process_validated_request( is_single = true; let (id, code) = prepare_error(&body); sink.send_error(id, code.into()); - } + }; // Closes the receiving half of a channel without dropping it. This prevents any further // messages from being sent on the channel. @@ -728,7 +766,7 @@ async fn process_validated_request( } else { collect_batch_response(rx).await }; - tracing::debug!("[service_fn] sending back: {:?}", &response[..cmp::min(response.len(), 1024)]); + middleware.on_response(request_start); Ok(response::ok_response(response)) } @@ -738,9 +776,10 @@ async fn process_health_request( middleware: impl Middleware, methods: Methods, max_response_body_size: u32, + max_log_length: u32, ) -> Result, HyperError> { let (tx, mut rx) = mpsc::unbounded::(); - let sink = MethodSink::new_with_limit(tx, max_response_body_size); + let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length); let request_start = middleware.on_request(); diff --git a/types/src/request.rs b/types/src/request.rs index c4836ee134..c6f6cf4357 100644 --- a/types/src/request.rs +++ b/types/src/request.rs @@ -33,7 +33,7 @@ use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; /// JSON-RPC request object as defined in the [spec](https://www.jsonrpc.org/specification#request-object). -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct Request<'a> { /// JSON-RPC version. diff --git a/types/src/response.rs b/types/src/response.rs index f364cf7995..d8c7eefb91 100644 --- a/types/src/response.rs +++ b/types/src/response.rs @@ -26,6 +26,8 @@ //! Types pertaining to JSON-RPC responses. +use std::fmt; + use crate::params::{Id, SubscriptionId, TwoPointZero}; use crate::request::Notification; use serde::{Deserialize, Serialize}; @@ -50,6 +52,12 @@ impl<'a, T> Response<'a, T> { } } +impl<'a, T: Serialize> fmt::Display for Response<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", serde_json::to_string(&self).expect("valid JSON; qed")) + } +} + /// Return value for subscriptions. 
#[derive(Serialize, Deserialize, Debug)] pub struct SubscriptionPayload<'a, T> { diff --git a/ws-server/Cargo.toml b/ws-server/Cargo.toml index 2b2b2ece2d..39d9035bd3 100644 --- a/ws-server/Cargo.toml +++ b/ws-server/Cargo.toml @@ -19,6 +19,7 @@ serde_json = { version = "1", features = ["raw_value"] } soketto = "0.7.1" tokio = { version = "1.16", features = ["net", "rt-multi-thread", "macros", "time"] } tokio-util = { version = "0.7", features = ["compat"] } +tracing-futures = "0.2.5" tokio-stream = "0.1.7" [dev-dependencies] diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 92569007c9..f4b282dddd 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -44,6 +44,7 @@ use jsonrpsee_core::server::access_control::AccessControl; use jsonrpsee_core::server::helpers::{collect_batch_response, prepare_error, BoundedSubscriptions, MethodSink}; use jsonrpsee_core::server::resource_limiting::Resources; use jsonrpsee_core::server::rpc_module::{ConnState, ConnectionId, MethodKind, Methods}; +use jsonrpsee_core::tracing::{rx_log_from_json, RpcTracing}; use jsonrpsee_core::traits::IdProvider; use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES}; use jsonrpsee_types::error::{reject_too_big_request, reject_too_many_subscriptions}; @@ -55,6 +56,7 @@ use soketto::Sender; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; use tokio_stream::wrappers::IntervalStream; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; +use tracing_futures::Instrument; /// Default maximum connections allowed. const MAX_CONNECTIONS: u64 = 100; @@ -145,7 +147,7 @@ impl Server { }, ))); - tracing::info!("Accepting new connection, {}/{}", connections.count(), self.cfg.max_connections); + tracing::info!("Accepting new connection {}/{}", connections.count(), self.cfg.max_connections); id = id.wrapping_add(1); } @@ -246,7 +248,6 @@ where Ok(()) } HandshakeResponse::Accept { conn_id, methods, resources, cfg, stop_monitor, middleware, id_provider } => { - tracing::debug!("Accepting new connection: {}", conn_id); let key = { let req = server.receive_request().await?; @@ -287,6 +288,7 @@ where resources.clone(), cfg.max_request_body_size, cfg.max_response_body_size, + cfg.max_log_length, cfg.batch_requests_supported, BoundedSubscriptions::new(cfg.max_subscriptions_per_connection), stop_monitor.clone(), @@ -311,6 +313,7 @@ async fn background_task( resources: Resources, max_request_body_size: u32, max_response_body_size: u32, + max_log_length: u32, batch_requests_supported: bool, bounded_subscriptions: BoundedSubscriptions, stop_server: StopMonitor, @@ -326,7 +329,7 @@ async fn background_task( let bounded_subscriptions2 = bounded_subscriptions.clone(); let stop_server2 = stop_server.clone(); - let sink = MethodSink::new_with_limit(tx, max_response_body_size); + let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length); middleware.on_connect(); @@ -401,7 +404,7 @@ async fn background_task( if let Err(err) = method_executors.select_with(Monitored::new(receive, &stop_server)).await { match err { MonitoredError::Selector(SokettoError::Closed) => { - tracing::debug!("WS transport error: remote peer terminated the connection: {}", conn_id); + tracing::debug!("WS transport: remote peer terminated the connection: {}", conn_id); sink.close(); break Ok(()); } @@ -425,8 +428,6 @@ async fn background_task( }; }; - tracing::debug!("recv {} bytes", data.len()); - let request_start = middleware.on_request(); let first_non_whitespace = data.iter().find(|byte| !byte.is_ascii_whitespace()); @@ 
-434,8 +435,10 @@ async fn background_task( match first_non_whitespace { Some(b'{') => { if let Ok(req) = serde_json::from_slice::(&data) { - tracing::debug!("recv method call={}", req.method); - tracing::trace!("recv: req={:?}", req); + let trace = RpcTracing::method_call(&req.method); + let _enter = trace.span().enter(); + + rx_log_from_json(&req, max_log_length); let id = req.id.clone(); let params = Params::new(req.params.map(|params| params.get())); @@ -478,7 +481,7 @@ async fn background_task( middleware.on_response(request_start); }; - method_executors.add(fut.boxed()); + method_executors.add(fut.in_current_span().boxed()); } Err(err) => { tracing::error!( @@ -544,10 +547,8 @@ async fn background_task( // request in the batch and read the results off of a new channel, `rx_batch`, and then send the // complete batch response back to the client over `tx`. let (tx_batch, mut rx_batch) = mpsc::unbounded(); - let sink_batch = MethodSink::new_with_limit(tx_batch, max_response_body_size); + let sink_batch = MethodSink::new_with_limit(tx_batch, max_response_body_size, max_log_length); if let Ok(batch) = serde_json::from_slice::>(&d) { - tracing::debug!("recv batch len={}", batch.len()); - tracing::trace!("recv: batch={:?}", batch); if !batch_requests_supported { sink.send_error( Id::Null, @@ -555,6 +556,11 @@ async fn background_task( ); middleware.on_response(request_start); } else if !batch.is_empty() { + let trace = RpcTracing::batch(); + let _enter = trace.span().enter(); + + rx_log_from_json(&batch, max_log_length); + join_all(batch.into_iter().filter_map(move |req| { let id = req.id.clone(); let params = Params::new(req.params.map(|params| params.get())); @@ -704,6 +710,10 @@ struct Settings { max_connections: u64, /// Maximum number of subscriptions per connection. max_subscriptions_per_connection: u32, + /// Max length for logging for requests and responses + /// + /// Logs bigger than this limit will be truncated. + max_log_length: u32, /// Access control based on HTTP headers access_control: AccessControl, /// Whether batch requests are supported by this server or not. @@ -719,6 +729,7 @@ impl Default for Settings { Self { max_request_body_size: TEN_MB_SIZE_BYTES, max_response_body_size: TEN_MB_SIZE_BYTES, + max_log_length: 4096, max_subscriptions_per_connection: 1024, max_connections: MAX_CONNECTIONS, batch_requests_supported: true, @@ -922,8 +933,6 @@ async fn send_ws_message( sender: &mut Sender>>>, response: String, ) -> Result<(), Error> { - tracing::debug!("send {} bytes", response.len()); - tracing::trace!("send: {}", response); sender.send_text_owned(response).await?; sender.flush().await.map_err(Into::into) }
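
Usage note (not part of the diff): a minimal sketch of how an application can consume the new per-call spans, mirroring the EnvFilter directive used in examples/examples/http.rs and examples/examples/ws.rs above. The `say_hello` method name, the anyhow error type, and the "env-filter" feature flag are assumptions taken from or implied by those examples, not part of this patch.

    use tracing_subscriber::util::SubscriberInitExt;

    fn init_tracing() -> anyhow::Result<()> {
        // `method_call`/`notification`/`batch` spans are emitted at DEBUG by RpcTracing;
        // the truncated send/recv payload logs are emitted at TRACE (default limit: 4096 chars).
        // Assumes tracing-subscriber is built with the "env-filter" feature.
        let filter = tracing_subscriber::EnvFilter::try_from_default_env()?
            // Enable TRACE output only for calls to the example `say_hello` method.
            .add_directive("jsonrpsee[method_call{name = \"say_hello\"}]=trace".parse()?);
        tracing_subscriber::FmtSubscriber::builder().with_env_filter(filter).finish().try_init()?;
        Ok(())
    }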