From bcd466ff61cb267bc9fa3e20fcd8e1792f20c07b Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 3 Aug 2022 17:27:19 +0200 Subject: [PATCH 1/8] tracing: use instrument macro --- client/http-client/src/client.rs | 139 +++++++-------- core/src/client/async_client/mod.rs | 266 +++++++++++++--------------- http-server/src/server.rs | 114 ++++++------ ws-server/src/server.rs | 55 +++--- 4 files changed, 265 insertions(+), 309 deletions(-) diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index 541459c03e..b8b92f77bf 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -32,12 +32,11 @@ use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Re use async_trait::async_trait; use hyper::http::HeaderMap; use jsonrpsee_core::client::{CertificateStore, ClientT, IdKind, RequestIdManager, Subscription, SubscriptionClientT}; -use jsonrpsee_core::tracing::RpcTracing; use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES}; use jsonrpsee_types::error::CallError; use rustc_hash::FxHashMap; use serde::de::DeserializeOwned; -use tracing_futures::Instrument; +use tracing::instrument; /// Http Client Builder. /// @@ -166,24 +165,21 @@ pub struct HttpClient { #[async_trait] impl ClientT for HttpClient { + #[instrument(name = "notification", skip(self, params))] async fn notification<'a>(&self, method: &'a str, params: Option>) -> Result<(), Error> { - let trace = RpcTracing::notification(method); - async { - let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?; + let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?; - let fut = self.transport.send(notif); + let fut = self.transport.send(notif); - match tokio::time::timeout(self.request_timeout, fut).await { - Ok(Ok(ok)) => Ok(ok), - Err(_) => Err(Error::RequestTimeout), - Ok(Err(e)) => Err(Error::Transport(e.into())), - } + match tokio::time::timeout(self.request_timeout, fut).await { + Ok(Ok(ok)) => Ok(ok), + Err(_) => Err(Error::RequestTimeout), + Ok(Err(e)) => Err(Error::Transport(e.into())), } - .instrument(trace.into_span()) - .await } /// Perform a request towards the server. + #[instrument(name = "method_call", skip(self, params))] async fn request<'a, R>(&self, method: &'a str, params: Option>) -> Result where R: DeserializeOwned, @@ -191,94 +187,84 @@ impl ClientT for HttpClient { let guard = self.id_manager.next_request_id()?; let id = guard.inner(); let request = RequestSer::new(&id, method, params); - let trace = RpcTracing::method_call(method); - - async { - let raw = serde_json::to_string(&request).map_err(Error::ParseError)?; - - let fut = self.transport.send_and_read_body(raw); - let body = match tokio::time::timeout(self.request_timeout, fut).await { - Ok(Ok(body)) => body, - Err(_e) => { - return Err(Error::RequestTimeout); - } - Ok(Err(e)) => { - return Err(Error::Transport(e.into())); - } - }; + let raw = serde_json::to_string(&request).map_err(Error::ParseError)?; - let response: Response<_> = match serde_json::from_slice(&body) { - Ok(response) => response, - Err(_) => { - let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?; - return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned()))); - } - }; + let fut = self.transport.send_and_read_body(raw); + let body = match tokio::time::timeout(self.request_timeout, fut).await { + Ok(Ok(body)) => body, + Err(_e) => { + return Err(Error::RequestTimeout); + } + Ok(Err(e)) => { + return Err(Error::Transport(e.into())); + } + }; - if response.id == id { - Ok(response.result) - } else { - Err(Error::InvalidRequestId) + let response: Response<_> = match serde_json::from_slice(&body) { + Ok(response) => response, + Err(_) => { + let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?; + return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned()))); } + }; + + if response.id == id { + Ok(response.result) + } else { + Err(Error::InvalidRequestId) } - .instrument(trace.into_span()) - .await } + #[instrument(name = "batch", skip(self, batch))] async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option>)>) -> Result, Error> where R: DeserializeOwned + Default + Clone, { let guard = self.id_manager.next_request_ids(batch.len())?; let ids: Vec = guard.inner(); - let trace = RpcTracing::batch(); - async { - let mut batch_request = Vec::with_capacity(batch.len()); - // NOTE(niklasad1): `ID` is not necessarily monotonically increasing. - let mut ordered_requests = Vec::with_capacity(batch.len()); - let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default()); + let mut batch_request = Vec::with_capacity(batch.len()); + // NOTE(niklasad1): `ID` is not necessarily monotonically increasing. + let mut ordered_requests = Vec::with_capacity(batch.len()); + let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default()); - for (pos, (method, params)) in batch.into_iter().enumerate() { - batch_request.push(RequestSer::new(&ids[pos], method, params)); - ordered_requests.push(&ids[pos]); - request_set.insert(&ids[pos], pos); - } + for (pos, (method, params)) in batch.into_iter().enumerate() { + batch_request.push(RequestSer::new(&ids[pos], method, params)); + ordered_requests.push(&ids[pos]); + request_set.insert(&ids[pos], pos); + } - let fut = - self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?); + let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?); - let body = match tokio::time::timeout(self.request_timeout, fut).await { - Ok(Ok(body)) => body, - Err(_e) => return Err(Error::RequestTimeout), - Ok(Err(e)) => return Err(Error::Transport(e.into())), - }; + let body = match tokio::time::timeout(self.request_timeout, fut).await { + Ok(Ok(body)) => body, + Err(_e) => return Err(Error::RequestTimeout), + Ok(Err(e)) => return Err(Error::Transport(e.into())), + }; - let rps: Vec> = - serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::(&body) { - Ok(e) => Error::Call(CallError::Custom(e.error_object().clone().into_owned())), - Err(e) => Error::ParseError(e), - })?; + let rps: Vec> = + serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::(&body) { + Ok(e) => Error::Call(CallError::Custom(e.error_object().clone().into_owned())), + Err(e) => Error::ParseError(e), + })?; - // NOTE: `R::default` is placeholder and will be replaced in loop below. - let mut responses = vec![R::default(); ordered_requests.len()]; - for rp in rps { - let pos = match request_set.get(&rp.id) { - Some(pos) => *pos, - None => return Err(Error::InvalidRequestId), - }; - responses[pos] = rp.result - } - Ok(responses) + // NOTE: `R::default` is placeholder and will be replaced in loop below. + let mut responses = vec![R::default(); ordered_requests.len()]; + for rp in rps { + let pos = match request_set.get(&rp.id) { + Some(pos) => *pos, + None => return Err(Error::InvalidRequestId), + }; + responses[pos] = rp.result } - .instrument(trace.into_span()) - .await + Ok(responses) } } #[async_trait] impl SubscriptionClientT for HttpClient { /// Send a subscription request to the server. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`]. + #[instrument(name = "subscription", skip(self, _params))] async fn subscribe<'a, N>( &self, _subscribe_method: &'a str, @@ -292,6 +278,7 @@ impl SubscriptionClientT for HttpClient { } /// Subscribe to a specific method. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`]. + #[instrument(name = "subscribe_method", skip(self))] async fn subscribe_to_method<'a, N>(&self, _method: &'a str) -> Result, Error> where N: DeserializeOwned, diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index cdf07f6a27..5e26271ac1 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -8,7 +8,7 @@ use crate::client::{ RegisterNotificationMessage, RequestMessage, Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT, }; -use crate::tracing::{rx_log_from_json, tx_log_from_str, RpcTracing}; +use crate::tracing::{rx_log_from_json, tx_log_from_str}; use core::time::Duration; use helpers::{ @@ -31,7 +31,7 @@ use jsonrpsee_types::{ SubscriptionResponse, }; use serde::de::DeserializeOwned; -use tracing_futures::Instrument; +use tracing::instrument; use super::{FrontToBack, IdKind, RequestIdManager}; @@ -242,105 +242,98 @@ impl Drop for Client { #[async_trait] impl ClientT for Client { + #[instrument(name = "notification", skip(self, params))] async fn notification<'a>(&self, method: &'a str, params: Option>) -> Result<(), Error> { - // NOTE: we use this to guard against max number of concurrent requests. - let _req_id = self.id_manager.next_request_id()?; - let notif = NotificationSer::new(method, params); - let trace = RpcTracing::batch(); - - async { - let raw = serde_json::to_string(¬if).map_err(Error::ParseError)?; - tx_log_from_str(&raw, self.max_log_length); - - let mut sender = self.to_back.clone(); - let fut = sender.send(FrontToBack::Notification(raw)); - - match future::select(fut, Delay::new(self.request_timeout)).await { - Either::Left((Ok(()), _)) => Ok(()), - Either::Left((Err(_), _)) => Err(self.read_error_from_backend().await), - Either::Right((_, _)) => Err(Error::RequestTimeout), - } - }.instrument(trace.into_span()).await - } + // NOTE: we use this to guard against max number of concurrent requests. + let _req_id = self.id_manager.next_request_id()?; + let notif = NotificationSer::new(method, params); + let raw = serde_json::to_string(¬if).map_err(Error::ParseError)?; + tx_log_from_str(&raw, self.max_log_length); + + let mut sender = self.to_back.clone(); + let fut = sender.send(FrontToBack::Notification(raw)); + + match future::select(fut, Delay::new(self.request_timeout)).await { + Either::Left((Ok(()), _)) => Ok(()), + Either::Left((Err(_), _)) => Err(self.read_error_from_backend().await), + Either::Right((_, _)) => Err(Error::RequestTimeout), + } + } + + #[instrument(name = "method_call", skip(self, params))] async fn request<'a, R>(&self, method: &'a str, params: Option>) -> Result where R: DeserializeOwned, - { - let (send_back_tx, send_back_rx) = oneshot::channel(); - let guard = self.id_manager.next_request_id()?; - let id = guard.inner(); - let trace = RpcTracing::method_call(method); - - async { - let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?; - tx_log_from_str(&raw, self.max_log_length); - - if self - .to_back - .clone() - .send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) })) - .await - .is_err() - { - return Err(self.read_error_from_backend().await); - } - - let res = call_with_timeout(self.request_timeout, send_back_rx).await; - let json_value = match res { - Ok(Ok(v)) => v, - Ok(Err(err)) => return Err(err), - Err(_) => return Err(self.read_error_from_backend().await), - }; - - rx_log_from_json(&Response::new(&json_value, id), self.max_log_length); - - serde_json::from_value(json_value).map_err(Error::ParseError) - }.instrument(trace.into_span()).await - } + { + let (send_back_tx, send_back_rx) = oneshot::channel(); + let guard = self.id_manager.next_request_id()?; + let id = guard.inner(); + + let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?; + tx_log_from_str(&raw, self.max_log_length); + + if self + .to_back + .clone() + .send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) })) + .await + .is_err() + { + return Err(self.read_error_from_backend().await); + } + + let json_value = match call_with_timeout(self.request_timeout, send_back_rx).await { + Ok(Ok(v)) => v, + Ok(Err(err)) => return Err(err), + Err(_) => return Err(self.read_error_from_backend().await), + }; + rx_log_from_json(&Response::new(&json_value, id), self.max_log_length); + + serde_json::from_value(json_value).map_err(Error::ParseError) + } + + #[instrument(name = "batch", skip(self, batch))] async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option>)>) -> Result, Error> where R: DeserializeOwned + Default + Clone, - { - let trace = RpcTracing::batch(); - async { - let guard = self.id_manager.next_request_ids(batch.len())?; - let batch_ids: Vec = guard.inner(); - let mut batches = Vec::with_capacity(batch.len()); - for (idx, (method, params)) in batch.into_iter().enumerate() { - batches.push(RequestSer::new(&batch_ids[idx], method, params)); - } - - let (send_back_tx, send_back_rx) = oneshot::channel(); - - let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?; - - tx_log_from_str(&raw, self.max_log_length); - - if self - .to_back - .clone() - .send(FrontToBack::Batch(BatchMessage { raw, ids: batch_ids, send_back: send_back_tx })) - .await - .is_err() - { - return Err(self.read_error_from_backend().await); - } - - let res = call_with_timeout(self.request_timeout, send_back_rx).await; - let json_values = match res { - Ok(Ok(v)) => v, - Ok(Err(err)) => return Err(err), - Err(_) => return Err(self.read_error_from_backend().await), - }; - - rx_log_from_json(&json_values, self.max_log_length); - - let values: Result<_, _> = - json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect(); - Ok(values?) - }.instrument(trace.into_span()).await + { + let guard = self.id_manager.next_request_ids(batch.len())?; + let batch_ids: Vec = guard.inner(); + let mut batches = Vec::with_capacity(batch.len()); + for (idx, (method, params)) in batch.into_iter().enumerate() { + batches.push(RequestSer::new(&batch_ids[idx], method, params)); + } + + let (send_back_tx, send_back_rx) = oneshot::channel(); + + let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?; + + tx_log_from_str(&raw, self.max_log_length); + + if self + .to_back + .clone() + .send(FrontToBack::Batch(BatchMessage { raw, ids: batch_ids, send_back: send_back_tx })) + .await + .is_err() + { + return Err(self.read_error_from_backend().await); + } + + let res = call_with_timeout(self.request_timeout, send_back_rx).await; + let json_values = match res { + Ok(Ok(v)) => v, + Ok(Err(err)) => return Err(err), + Err(_) => return Err(self.read_error_from_backend().await), + }; + + rx_log_from_json(&json_values, self.max_log_length); + + let values: Result<_, _> = + json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect(); + Ok(values?) } } @@ -350,6 +343,7 @@ impl SubscriptionClientT for Client { /// /// The `subscribe_method` and `params` are used to ask for the subscription towards the /// server. The `unsubscribe_method` is used to close the subscription. + #[instrument(name = "subscription", skip(self, params))] async fn subscribe<'a, N>( &self, subscribe_method: &'a str, @@ -358,54 +352,50 @@ impl SubscriptionClientT for Client { ) -> Result, Error> where N: DeserializeOwned, - { - if subscribe_method == unsubscribe_method { - return Err(Error::SubscriptionNameConflict(unsubscribe_method.to_owned())); - } - - let guard = self.id_manager.next_request_ids(2)?; - let mut ids: Vec = guard.inner(); - let trace = RpcTracing::method_call(subscribe_method); - - async { - let id = ids[0].clone(); - - let raw = serde_json::to_string(&RequestSer::new(&id, subscribe_method, params)).map_err(Error::ParseError)?; - - tx_log_from_str(&raw, self.max_log_length); - - let (send_back_tx, send_back_rx) = oneshot::channel(); - if self - .to_back - .clone() - .send(FrontToBack::Subscribe(SubscriptionMessage { - raw, - subscribe_id: ids.swap_remove(0), - unsubscribe_id: ids.swap_remove(0), - unsubscribe_method: unsubscribe_method.to_owned(), - send_back: send_back_tx, - })) - .await - .is_err() - { - return Err(self.read_error_from_backend().await); - } - - let res = call_with_timeout(self.request_timeout, send_back_rx).await; - - let (notifs_rx, sub_id) = match res { - Ok(Ok(val)) => val, - Ok(Err(err)) => return Err(err), - Err(_) => return Err(self.read_error_from_backend().await), - }; - - rx_log_from_json(&Response::new(&sub_id, id), self.max_log_length); - - Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id))) - }.instrument(trace.into_span()).await - } + { + if subscribe_method == unsubscribe_method { + return Err(Error::SubscriptionNameConflict(unsubscribe_method.to_owned())); + } + + let guard = self.id_manager.next_request_ids(2)?; + let mut ids: Vec = guard.inner(); + + let id = ids[0].clone(); + + let raw = serde_json::to_string(&RequestSer::new(&id, subscribe_method, params)).map_err(Error::ParseError)?; + + tx_log_from_str(&raw, self.max_log_length); + + let (send_back_tx, send_back_rx) = oneshot::channel(); + if self + .to_back + .clone() + .send(FrontToBack::Subscribe(SubscriptionMessage { + raw, + subscribe_id: ids.swap_remove(0), + unsubscribe_id: ids.swap_remove(0), + unsubscribe_method: unsubscribe_method.to_owned(), + send_back: send_back_tx, + })) + .await + .is_err() + { + return Err(self.read_error_from_backend().await); + } + + let (notifs_rx, sub_id) = match call_with_timeout(self.request_timeout, send_back_rx).await { + Ok(Ok(val)) => val, + Ok(Err(err)) => return Err(err), + Err(_) => return Err(self.read_error_from_backend().await), + }; + + rx_log_from_json(&Response::new(&sub_id, id), self.max_log_length); + + Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id))) + } /// Subscribe to a specific method. + #[instrument(name = "subscribe_method", skip(self))] async fn subscribe_to_method<'a, N>(&self, method: &'a str) -> Result, Error> where N: DeserializeOwned, diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 0f57b302fe..bb78eaaaec 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -53,6 +53,7 @@ use jsonrpsee_types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, use jsonrpsee_types::{Id, Notification, Params, Request}; use serde_json::value::RawValue; use tokio::net::{TcpListener, ToSocketAddrs}; +use tracing::instrument; use tracing_futures::Instrument; type Notif<'a> = Notification<'a, Option<&'a RawValue>>; @@ -677,6 +678,7 @@ async fn process_validated_request( } } +#[instrument(name = "health_api", skip(middleware, methods, max_response_body_size, request_start, max_log_length))] async fn process_health_request( health_api: &HealthApi, middleware: M, @@ -685,44 +687,37 @@ async fn process_health_request( request_start: M::Instant, max_log_length: u32, ) -> Result, HyperError> { - let trace = RpcTracing::method_call(&health_api.method); - async { - tx_log_from_str("HTTP health API", max_log_length); - let response = match methods.method_with_name(&health_api.method) { - None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)), - Some((_name, method_callback)) => match method_callback.inner() { - MethodKind::Sync(callback) => { - (callback)(Id::Number(0), Params::new(None), max_response_body_size as usize) - } - MethodKind::Async(callback) => { - (callback)(Id::Number(0), Params::new(None), 0, max_response_body_size as usize, None).await - } - MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => { - MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError)) - } - }, - }; - - rx_log_from_str(&response.result, max_log_length); - middleware.on_result(&health_api.method, response.success, request_start); - middleware.on_response(&response.result, request_start); - - if response.success { - #[derive(serde::Deserialize)] - struct RpcPayload<'a> { - #[serde(borrow)] - result: &'a serde_json::value::RawValue, + tx_log_from_str("HTTP health API", max_log_length); + let response = match methods.method_with_name(&health_api.method) { + None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)), + Some((_name, method_callback)) => match method_callback.inner() { + MethodKind::Sync(callback) => (callback)(Id::Number(0), Params::new(None), max_response_body_size as usize), + MethodKind::Async(callback) => { + (callback)(Id::Number(0), Params::new(None), 0, max_response_body_size as usize, None).await } + MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => { + MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError)) + } + }, + }; - let payload: RpcPayload = serde_json::from_str(&response.result) - .expect("valid JSON-RPC response must have a result field and be valid JSON; qed"); - Ok(response::ok_response(payload.result.to_string())) - } else { - Ok(response::internal_error()) + rx_log_from_str(&response.result, max_log_length); + middleware.on_result(&health_api.method, response.success, request_start); + middleware.on_response(&response.result, request_start); + + if response.success { + #[derive(serde::Deserialize)] + struct RpcPayload<'a> { + #[serde(borrow)] + result: &'a serde_json::value::RawValue, } + + let payload: RpcPayload = serde_json::from_str(&response.result) + .expect("valid JSON-RPC response must have a result field and be valid JSON; qed"); + Ok(response::ok_response(payload.result.to_string())) + } else { + Ok(response::internal_error()) } - .instrument(trace.into_span()) - .await } #[derive(Debug, Clone)] @@ -753,6 +748,7 @@ struct Call<'a, M: Middleware> { // Batch responses must be sent back as a single message so we read the results from each // request in the batch and read the results off of a new channel, `rx_batch`, and then send the // complete batch response back to the client over `tx`. +#[instrument(name = "batch", skip(b))] async fn process_batch_request(b: Batch<'_, M>) -> BatchResponse where M: Middleware, @@ -765,26 +761,21 @@ where let batch_stream = futures_util::stream::iter(batch); - let trace = RpcTracing::batch(); - return async { - let batch_response = batch_stream - .try_fold( - BatchResponseBuilder::new_with_limit(max_response_size as usize), - |batch_response, (req, call)| async move { - let params = Params::new(req.params.map(|params| params.get())); - let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await; - batch_response.append(&response) - }, - ) - .await; - - match batch_response { - Ok(batch) => batch.finish(), - Err(batch_err) => batch_err, - } - } - .instrument(trace.into_span()) - .await; + let batch_response = batch_stream + .try_fold( + BatchResponseBuilder::new_with_limit(max_response_size as usize), + |batch_response, (req, call)| async move { + let params = Params::new(req.params.map(|params| params.get())); + let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await; + batch_response.append(&response) + }, + ) + .await; + + return match batch_response { + Ok(batch) => batch.finish(), + Err(batch_err) => batch_err, + }; } if let Ok(batch) = serde_json::from_slice::>(&data) { @@ -805,15 +796,12 @@ where async fn process_single_request(data: Vec, call: CallData<'_, M>) -> MethodResponse { if let Ok(req) = serde_json::from_slice::(&data) { let trace = RpcTracing::method_call(&req.method); - async { - rx_log_from_json(&req, call.max_log_length); - let params = Params::new(req.params.map(|params| params.get())); - let name = &req.method; - let id = req.id; - execute_call(Call { name, params, id, call }).await - } - .instrument(trace.into_span()) - .await + rx_log_from_json(&req, call.max_log_length); + let params = Params::new(req.params.map(|params| params.get())); + let name = &req.method; + let id = req.id; + + execute_call(Call { name, params, id, call }).instrument(trace.into_span()).await } else if let Ok(req) = serde_json::from_slice::(&data) { let trace = RpcTracing::notification(&req.method); let span = trace.into_span(); diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 74e261919e..30bb320cc2 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -62,6 +62,7 @@ use soketto::Sender; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; use tokio_stream::wrappers::IntervalStream; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; +use tracing::instrument; use tracing_futures::Instrument; /// Default maximum connections allowed. @@ -849,6 +850,7 @@ impl MethodResult { // Batch responses must be sent back as a single message so we read the results from each // request in the batch and read the results off of a new channel, `rx_batch`, and then send the // complete batch response back to the client over `tx`. +#[instrument(name = "batch", skip(b))] async fn process_batch_request(b: Batch<'_, M>) -> BatchResponse where M: Middleware, @@ -860,29 +862,23 @@ where let batch = batch.into_iter().map(|req| Ok((req, call.clone()))); let batch_stream = futures_util::stream::iter(batch); - let trace = RpcTracing::batch(); - - return async { - let max_response_size = call.max_response_body_size; - - let batch_response = batch_stream - .try_fold( - BatchResponseBuilder::new_with_limit(max_response_size as usize), - |batch_response, (req, call)| async move { - let params = Params::new(req.params.map(|params| params.get())); - let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await; - batch_response.append(response.as_inner()) - }, - ) - .await; - - match batch_response { - Ok(batch) => batch.finish(), - Err(batch_err) => batch_err, - } + let max_response_size = call.max_response_body_size; + + let batch_response = batch_stream + .try_fold( + BatchResponseBuilder::new_with_limit(max_response_size as usize), + |batch_response, (req, call)| async move { + let params = Params::new(req.params.map(|params| params.get())); + let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await; + batch_response.append(response.as_inner()) + }, + ) + .await; + + match batch_response { + Ok(batch) => batch.finish(), + Err(batch_err) => batch_err, } - .instrument(trace.into_span()) - .await; } else { BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest)) }; @@ -895,18 +891,13 @@ where async fn process_single_request(data: Vec, call: CallData<'_, M>) -> MethodResult { if let Ok(req) = serde_json::from_slice::(&data) { let trace = RpcTracing::method_call(&req.method); + rx_log_from_json(&req, call.max_log_length); - async { - rx_log_from_json(&req, call.max_log_length); + let params = Params::new(req.params.map(|params| params.get())); + let name = &req.method; + let id = req.id; - let params = Params::new(req.params.map(|params| params.get())); - let name = &req.method; - let id = req.id; - - execute_call(Call { name, params, id, call }).await - } - .instrument(trace.into_span()) - .await + execute_call(Call { name, params, id, call }).instrument(trace.into_span()).await } else { let (id, code) = prepare_error(&data); MethodResult::SendAndMiddleware(MethodResponse::error(id, ErrorObject::from(code))) From f1833665a2ad4f3966be80af0604b7c81147f543 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 16 Aug 2022 17:14:51 +0200 Subject: [PATCH 2/8] fix merge nit --- core/src/client/async_client/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 5e26271ac1..aa21b09748 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -331,9 +331,7 @@ impl ClientT for Client { rx_log_from_json(&json_values, self.max_log_length); - let values: Result<_, _> = - json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect(); - Ok(values?) + json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect() } } From 342fdf4218bf2d8e7db036ad40f80c8619db968f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 16 Aug 2022 17:15:38 +0200 Subject: [PATCH 3/8] cargo fmt --- http-server/src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 7a7b1f7d9d..4775746756 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -55,9 +55,9 @@ use jsonrpsee_types::{Id, Notification, Params, Request}; use serde_json::value::RawValue; use std::error::Error as StdError; use tokio::net::{TcpListener, ToSocketAddrs}; -use tracing::instrument; use tower::layer::util::Identity; use tower::Layer; +use tracing::instrument; use tracing_futures::Instrument; type Notif<'a> = Notification<'a, Option<&'a RawValue>>; From 198a7144ba9865754ee220f2f8fd8f2105dd7837 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 15 Sep 2022 15:35:03 +0200 Subject: [PATCH 4/8] tracing span in TRACE only --- client/http-client/src/client.rs | 10 +++++----- core/src/client/async_client/mod.rs | 10 +++++----- core/src/tracing.rs | 6 +++--- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index cf613e76dc..0766bc6437 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -167,7 +167,7 @@ pub struct HttpClient { #[async_trait] impl ClientT for HttpClient { - #[instrument(name = "notification", skip(self, params))] + #[instrument(name = "notification", skip(self, params), level = "trace")] async fn notification(&self, method: &str, params: Params) -> Result<(), Error> where Params: ToRpcParams + Send, @@ -186,7 +186,7 @@ impl ClientT for HttpClient { /// Perform a request towards the server. - #[instrument(name = "method_call", skip(self, params))] + #[instrument(name = "method_call", skip(self, params), level = "trace")] async fn request(&self, method: &str, params: Params) -> Result where R: DeserializeOwned, @@ -225,7 +225,7 @@ impl ClientT for HttpClient { } } - #[instrument(name = "batch", skip(self, batch))] + #[instrument(name = "batch", skip(self, batch), level = "trace")] async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result, Error> where R: DeserializeOwned + Default + Clone, @@ -275,7 +275,7 @@ impl ClientT for HttpClient { #[async_trait] impl SubscriptionClientT for HttpClient { /// Send a subscription request to the server. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`]. - #[instrument(name = "subscription", skip(self, _params))] + #[instrument(name = "subscription", skip(self, _params), level = "trace")] async fn subscribe<'a, N, Params>( &self, _subscribe_method: &'a str, @@ -290,7 +290,7 @@ impl SubscriptionClientT for HttpClient { } /// Subscribe to a specific method. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`]. - #[instrument(name = "subscribe_method", skip(self))] + #[instrument(name = "subscribe_method", skip(self), level = "trace")] async fn subscribe_to_method<'a, N>(&self, _method: &'a str) -> Result, Error> where N: DeserializeOwned, diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 632ffffe6a..d8b01049e3 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -275,7 +275,7 @@ impl Drop for Client { #[async_trait] impl ClientT for Client { - #[instrument(name = "notification", skip(self, params))] + #[instrument(name = "notification", skip(self, params), level = "trace")] async fn notification(&self, method: &str, params: Params) -> Result<(), Error> where Params: ToRpcParams + Send, @@ -298,7 +298,7 @@ impl ClientT for Client { } } - #[instrument(name = "method_call", skip(self, params))] + #[instrument(name = "method_call", skip(self, params), level = "trace")] async fn request(&self, method: &str, params: Params) -> Result where R: DeserializeOwned, @@ -333,7 +333,7 @@ impl ClientT for Client { serde_json::from_value(json_value).map_err(Error::ParseError) } - #[instrument(name = "batch", skip(self, batch))] + #[instrument(name = "batch", skip(self, batch), level = "trace")] async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result, Error> where @@ -382,7 +382,7 @@ impl SubscriptionClientT for Client { /// /// The `subscribe_method` and `params` are used to ask for the subscription towards the /// server. The `unsubscribe_method` is used to close the subscription. - #[instrument(name = "subscription", skip(self, params))] + #[instrument(name = "subscription", skip(self, params), level = "trace")] async fn subscribe<'a, Notif, Params>( &self, subscribe_method: &'a str, @@ -435,7 +435,7 @@ impl SubscriptionClientT for Client { } /// Subscribe to a specific method. - #[instrument(name = "subscribe_method", skip(self))] + #[instrument(name = "subscribe_method", skip(self), level = "trace")] async fn subscribe_to_method<'a, N>(&self, method: &'a str) -> Result, Error> where N: DeserializeOwned, diff --git a/core/src/tracing.rs b/core/src/tracing.rs index 6f9312a4ae..1da514fddf 100644 --- a/core/src/tracing.rs +++ b/core/src/tracing.rs @@ -10,21 +10,21 @@ impl RpcTracing { /// /// To enable this you need to call `RpcTracing::method_call("some_method").span().enable()`. pub fn method_call(method: &str) -> Self { - Self(tracing::span!(tracing::Level::DEBUG, "method_call", %method)) + Self(tracing::span!(tracing::Level::TRACE, "method_call", %method)) } /// Create a `notification` tracing target. /// /// To enable this you need to call `RpcTracing::notification("some_method").span().enable()`. pub fn notification(method: &str) -> Self { - Self(tracing::span!(tracing::Level::DEBUG, "notification", %method)) + Self(tracing::span!(tracing::Level::TRACE, "notification", %method)) } /// Create a `batch` tracing target. /// /// To enable this you need to call `RpcTracing::batch().span().enable()`. pub fn batch() -> Self { - Self(tracing::span!(tracing::Level::DEBUG, "batch")) + Self(tracing::span!(tracing::Level::TRACE, "batch")) } /// Get the inner span. From ad3faa7b5e816fd37c88f28c06915ac3a02b02aa Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 15 Sep 2022 15:43:09 +0200 Subject: [PATCH 5/8] Update core/src/client/async_client/mod.rs --- core/src/client/async_client/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index d8b01049e3..d888cb03ca 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -334,7 +334,6 @@ impl ClientT for Client { } #[instrument(name = "batch", skip(self, batch), level = "trace")] - async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result, Error> where R: DeserializeOwned + Default + Clone, From 2b19efbe40db36dca4c38de51163e120081e3b45 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 15 Sep 2022 18:38:11 +0200 Subject: [PATCH 6/8] get rid of tracing-futures --- core/src/tracing.rs | 32 ------------- server/Cargo.toml | 1 - server/src/transport/http.rs | 92 +++++++++++++++++------------------- server/src/transport/ws.rs | 80 +++++++++++++------------------ 4 files changed, 75 insertions(+), 130 deletions(-) diff --git a/core/src/tracing.rs b/core/src/tracing.rs index 1da514fddf..6a138dc0cc 100644 --- a/core/src/tracing.rs +++ b/core/src/tracing.rs @@ -1,38 +1,6 @@ use serde::Serialize; use tracing::Level; -#[derive(Debug)] -/// Wrapper over [`tracing::Span`] to trace individual method calls, notifications and similar. -pub struct RpcTracing(tracing::Span); - -impl RpcTracing { - /// Create a `method_call` tracing target. - /// - /// To enable this you need to call `RpcTracing::method_call("some_method").span().enable()`. - pub fn method_call(method: &str) -> Self { - Self(tracing::span!(tracing::Level::TRACE, "method_call", %method)) - } - - /// Create a `notification` tracing target. - /// - /// To enable this you need to call `RpcTracing::notification("some_method").span().enable()`. - pub fn notification(method: &str) -> Self { - Self(tracing::span!(tracing::Level::TRACE, "notification", %method)) - } - - /// Create a `batch` tracing target. - /// - /// To enable this you need to call `RpcTracing::batch().span().enable()`. - pub fn batch() -> Self { - Self(tracing::span!(tracing::Level::TRACE, "batch")) - } - - /// Get the inner span. - pub fn into_span(self) -> tracing::Span { - self.0 - } -} - /// Helper for writing trace logs from str. pub fn tx_log_from_str(s: impl AsRef, max: u32) { if tracing::enabled!(Level::TRACE) { diff --git a/server/Cargo.toml b/server/Cargo.toml index be1ae60715..b7e4df4f7c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -20,7 +20,6 @@ serde_json = { version = "1", features = ["raw_value"] } soketto = { version = "0.7.1", features = ["http"] } tokio = { version = "1.16", features = ["net", "rt-multi-thread", "macros", "time"] } tokio-util = { version = "0.7", features = ["compat"] } -tracing-futures = "0.2.5" tokio-stream = "0.1.7" http = "0.2.7" hyper = { version = "0.14", features = ["server", "http1", "http2"] } diff --git a/server/src/transport/http.rs b/server/src/transport/http.rs index 5da15281a7..8193950013 100644 --- a/server/src/transport/http.rs +++ b/server/src/transport/http.rs @@ -10,12 +10,12 @@ use jsonrpsee_core::http_helpers::read_body; use jsonrpsee_core::server::helpers::{prepare_error, BatchResponse, BatchResponseBuilder, MethodResponse}; use jsonrpsee_core::server::rpc_module::MethodKind; use jsonrpsee_core::server::{resource_limiting::Resources, rpc_module::Methods}; -use jsonrpsee_core::tracing::{rx_log_from_json, tx_log_from_str, RpcTracing}; +use jsonrpsee_core::tracing::{rx_log_from_json, tx_log_from_str}; use jsonrpsee_core::JsonRawValue; use jsonrpsee_types::error::{ErrorCode, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG}; use jsonrpsee_types::{ErrorObject, Id, Notification, Params, Request}; use tokio::sync::OwnedSemaphorePermit; -use tracing_futures::Instrument; +use tracing::instrument; type Notif<'a> = Notification<'a, Option<&'a JsonRawValue>>; @@ -151,17 +151,10 @@ pub(crate) struct CallData<'a, L: Logger> { request_start: L::Instant, } -#[derive(Debug, Clone)] -pub(crate) struct Call<'a, L: Logger> { - params: Params<'a>, - name: &'a str, - call: CallData<'a, L>, - id: Id<'a>, -} - // Batch responses must be sent back as a single message so we read the results from each // request in the batch and read the results off of a new channel, `rx_batch`, and then send the // complete batch response back to the client over `tx`. +#[instrument(name = "batch", skip(b), level = "TRACE")] pub(crate) async fn process_batch_request(b: Batch<'_, L>) -> BatchResponse where L: Logger, @@ -174,26 +167,20 @@ where let batch_stream = futures_util::stream::iter(batch); - let trace = RpcTracing::batch(); - return async { - let batch_response = batch_stream - .try_fold( - BatchResponseBuilder::new_with_limit(max_response_size as usize), - |batch_response, (req, call)| async move { - let params = Params::new(req.params.map(|params| params.get())); - let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await; - batch_response.append(&response) - }, - ) - .await; - - match batch_response { - Ok(batch) => batch.finish(), - Err(batch_err) => batch_err, - } - } - .instrument(trace.into_span()) - .await; + let batch_response = batch_stream + .try_fold( + BatchResponseBuilder::new_with_limit(max_response_size as usize), + |batch_response, (req, call)| async move { + let response = execute_call(req, call).await; + batch_response.append(&response) + }, + ) + .await; + + return match batch_response { + Ok(batch) => batch.finish(), + Err(batch_err) => batch_err, + }; } if let Ok(batch) = serde_json::from_slice::>(&data) { @@ -213,33 +200,32 @@ where pub(crate) async fn process_single_request(data: Vec, call: CallData<'_, L>) -> MethodResponse { if let Ok(req) = serde_json::from_slice::(&data) { - let trace = RpcTracing::method_call(&req.method); - async { - rx_log_from_json(&req, call.max_log_length); - let params = Params::new(req.params.map(|params| params.get())); - let name = &req.method; - let id = req.id; - execute_call(Call { name, params, id, call }).await - } - .instrument(trace.into_span()) - .await - } else if let Ok(req) = serde_json::from_slice::(&data) { - let trace = RpcTracing::notification(&req.method); - let span = trace.into_span(); - let _enter = span.enter(); - rx_log_from_json(&req, call.max_log_length); - - MethodResponse { result: String::new(), success: true } + execute_call_with_tracing(req, call).await + } else if let Ok(notif) = serde_json::from_slice::(&data) { + execute_notification(notif, call.max_log_length) } else { let (id, code) = prepare_error(&data); MethodResponse::error(id, ErrorObject::from(code)) } } -pub(crate) async fn execute_call(c: Call<'_, L>) -> MethodResponse { - let Call { name, id, params, call } = c; +#[instrument(name = "method_call", fields(method = req.method.as_ref()), skip(call, req), level = "TRACE")] +pub(crate) async fn execute_call_with_tracing<'a, L: Logger>( + req: Request<'a>, + call: CallData<'_, L>, +) -> MethodResponse { + execute_call(req, call).await +} + +pub(crate) async fn execute_call(req: Request<'_>, call: CallData<'_, L>) -> MethodResponse { let CallData { resources, methods, logger, max_response_body_size, max_log_length, conn_id, request_start } = call; + rx_log_from_json(&req, call.max_log_length); + + let params = Params::new(req.params.map(|params| params.get())); + let name = &req.method; + let id = req.id; + let response = match methods.method_with_name(name) { None => { logger.on_call(name, params.clone(), logger::MethodKind::Unknown); @@ -289,6 +275,14 @@ pub(crate) async fn execute_call(c: Call<'_, L>) -> MethodResponse { response } +#[instrument(name = "notification", fields(method = notif.method.as_ref()), skip(notif, max_log_length), level = "TRACE")] +fn execute_notification(notif: Notif, max_log_length: u32) -> MethodResponse { + rx_log_from_json(¬if, max_log_length); + let response = MethodResponse { result: String::new(), success: true }; + tx_log_from_str(&response.result, max_log_length); + response +} + pub(crate) struct HandleRequest { pub(crate) methods: Methods, pub(crate) resources: Resources, diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 1c0bddd074..e8a4deab13 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -16,7 +16,7 @@ use jsonrpsee_core::server::helpers::{ }; use jsonrpsee_core::server::resource_limiting::Resources; use jsonrpsee_core::server::rpc_module::{ConnState, MethodKind, Methods}; -use jsonrpsee_core::tracing::{rx_log_from_json, rx_log_from_str, tx_log_from_str, RpcTracing}; +use jsonrpsee_core::tracing::{rx_log_from_json, tx_log_from_str}; use jsonrpsee_core::traits::IdProvider; use jsonrpsee_core::Error; use jsonrpsee_types::error::{ @@ -28,7 +28,7 @@ use soketto::connection::Error as SokettoError; use soketto::data::ByteSlice125; use tokio_stream::wrappers::IntervalStream; use tokio_util::compat::Compat; -use tracing_futures::Instrument; +use tracing::instrument; pub(crate) type Sender = soketto::Sender>>>; pub(crate) type Receiver = soketto::Receiver>>>; @@ -68,14 +68,6 @@ pub(crate) struct CallData<'a, L: Logger> { pub(crate) request_start: L::Instant, } -#[derive(Debug, Clone)] -pub(crate) struct Call<'a, L: Logger> { - pub(crate) params: Params<'a>, - pub(crate) name: &'a str, - pub(crate) call: CallData<'a, L>, - pub(crate) id: Id<'a>, -} - /// This is a glorified select listening for new messages, while also checking the `stop_receiver` signal. struct Monitored<'a, F> { future: F, @@ -113,6 +105,7 @@ where // Batch responses must be sent back as a single message so we read the results from each // request in the batch and read the results off of a new channel, `rx_batch`, and then send the // complete batch response back to the client over `tx`. +#[instrument(name = "batch", skip(b), level = "TRACE")] pub(crate) async fn process_batch_request(b: Batch<'_, L>) -> BatchResponse { let Batch { data, call } = b; @@ -121,29 +114,22 @@ pub(crate) async fn process_batch_request(b: Batch<'_, L>) -> BatchRe let batch = batch.into_iter().map(|req| Ok((req, call.clone()))); let batch_stream = futures_util::stream::iter(batch); - let trace = RpcTracing::batch(); - - return async { - let max_response_size = call.max_response_body_size; - - let batch_response = batch_stream - .try_fold( - BatchResponseBuilder::new_with_limit(max_response_size as usize), - |batch_response, (req, call)| async move { - let params = Params::new(req.params.map(|params| params.get())); - let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await; - batch_response.append(response.as_inner()) - }, - ) - .await; - - match batch_response { - Ok(batch) => batch.finish(), - Err(batch_err) => batch_err, - } + let max_response_size = call.max_response_body_size; + + let batch_response = batch_stream + .try_fold( + BatchResponseBuilder::new_with_limit(max_response_size as usize), + |batch_response, (req, call)| async move { + let response = execute_call(req, call).await; + batch_response.append(response.as_inner()) + }, + ) + .await; + + match batch_response { + Ok(batch) => batch.finish(), + Err(batch_err) => batch_err, } - .instrument(trace.into_span()) - .await; } else { BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest)) }; @@ -155,32 +141,24 @@ pub(crate) async fn process_batch_request(b: Batch<'_, L>) -> BatchRe pub(crate) async fn process_single_request(data: Vec, call: CallData<'_, L>) -> MethodResult { if let Ok(req) = serde_json::from_slice::(&data) { - let trace = RpcTracing::method_call(&req.method); - - async { - rx_log_from_json(&req, call.max_log_length); - - let params = Params::new(req.params.map(|params| params.get())); - let name = &req.method; - let id = req.id; - - execute_call(Call { name, params, id, call }).await - } - .instrument(trace.into_span()) - .await + execute_call_with_tracing(req, call).await } else { let (id, code) = prepare_error(&data); MethodResult::SendAndLogger(MethodResponse::error(id, ErrorObject::from(code))) } } +#[instrument(name = "method_call", fields(method = req.method.as_ref()), skip(call, req), level = "TRACE")] +pub(crate) async fn execute_call_with_tracing<'a, L: Logger>(req: Request<'a>, call: CallData<'_, L>) -> MethodResult { + execute_call(req, call).await +} + /// Execute a call which returns result of the call with a additional sink /// to fire a signal once the subscription call has been answered. /// /// Returns `(MethodResponse, None)` on every call that isn't a subscription /// Otherwise `(MethodResponse, Some(PendingSubscriptionCallTx)`. -pub(crate) async fn execute_call(c: Call<'_, L>) -> MethodResult { - let Call { name, id, params, call } = c; +pub(crate) async fn execute_call<'a, L: Logger>(req: Request<'a>, call: CallData<'_, L>) -> MethodResult { let CallData { resources, methods, @@ -194,6 +172,12 @@ pub(crate) async fn execute_call(c: Call<'_, L>) -> MethodResult { request_start, } = call; + rx_log_from_json(&req, call.max_log_length); + + let params = Params::new(req.params.map(|params| params.get())); + let name = &req.method; + let id = req.id; + let response = match methods.method_with_name(name) { None => { logger.on_call(name, params.clone(), logger::MethodKind::Unknown); @@ -267,7 +251,7 @@ pub(crate) async fn execute_call(c: Call<'_, L>) -> MethodResult { let r = response.as_inner(); - rx_log_from_str(&r.result, max_log_length); + tx_log_from_str(&r.result, max_log_length); logger.on_result(name, r.success, request_start); response } From e01cab0df8257dca9ded4b55c572fbcde96fe93d Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 16 Sep 2022 12:11:49 +0200 Subject: [PATCH 7/8] less noise for subscription spans --- client/http-client/src/client.rs | 4 ++-- core/src/client/async_client/mod.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index c333c39804..63a76cbce4 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -283,7 +283,7 @@ impl ClientT for HttpClient { #[async_trait] impl SubscriptionClientT for HttpClient { /// Send a subscription request to the server. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`]. - #[instrument(name = "subscription", skip(self, _params), level = "trace")] + #[instrument(name = "subscription", fields(method = _subscribe_method), skip(self, _params, _subscribe_method, _unsubscribe_method), level = "trace")] async fn subscribe<'a, N, Params>( &self, _subscribe_method: &'a str, @@ -298,7 +298,7 @@ impl SubscriptionClientT for HttpClient { } /// Subscribe to a specific method. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`]. - #[instrument(name = "subscribe_method", skip(self), level = "trace")] + #[instrument(name = "subscribe_method", fields(method = _method), skip(self, _method), level = "trace")] async fn subscribe_to_method<'a, N>(&self, _method: &'a str) -> Result, Error> where N: DeserializeOwned, diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index d888cb03ca..086eaafad7 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -381,7 +381,7 @@ impl SubscriptionClientT for Client { /// /// The `subscribe_method` and `params` are used to ask for the subscription towards the /// server. The `unsubscribe_method` is used to close the subscription. - #[instrument(name = "subscription", skip(self, params), level = "trace")] + #[instrument(name = "subscription", fields(method = subscribe_method), skip(self, params, subscribe_method, unsubscribe_method), level = "trace")] async fn subscribe<'a, Notif, Params>( &self, subscribe_method: &'a str, From e969f291c7ae52e630988937a53ff6f611dc4246 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 16 Sep 2022 16:19:52 +0200 Subject: [PATCH 8/8] nits: replace spaces with tabs --- core/src/macros.rs | 30 ++++++++++++------------ core/src/traits.rs | 10 ++++---- jsonrpsee/src/macros.rs | 52 ++++++++++++++++++++--------------------- 3 files changed, 46 insertions(+), 46 deletions(-) diff --git a/core/src/macros.rs b/core/src/macros.rs index 5be3d99ab0..e983079f32 100644 --- a/core/src/macros.rs +++ b/core/src/macros.rs @@ -1,11 +1,11 @@ macro_rules! cfg_feature { - ($feature:literal, $($item:item)*) => { - $( - #[cfg(feature = $feature)] - #[cfg_attr(docsrs, doc(cfg(feature = $feature)))] - $item - )* - } + ($feature:literal, $($item:item)*) => { + $( + #[cfg(feature = $feature)] + #[cfg_attr(docsrs, doc(cfg(feature = $feature)))] + $item + )* +} } macro_rules! cfg_client { @@ -27,12 +27,12 @@ macro_rules! cfg_http_helpers { } macro_rules! cfg_async_client { - ($($item:item)*) => { - $( - #[cfg(any(feature = "async-wasm-client", feature = "async-client"))] - #[cfg_attr(docsrs, doc(cfg(feature = "async-client")))] - #[cfg_attr(docsrs, doc(cfg(feature = "async-wasm-client")))] - $item - )* - } + ($($item:item)*) => { + $( + #[cfg(any(feature = "async-wasm-client", feature = "async-client"))] + #[cfg_attr(docsrs, doc(cfg(feature = "async-client")))] + #[cfg_attr(docsrs, doc(cfg(feature = "async-wasm-client")))] + $item + )* + } } diff --git a/core/src/traits.rs b/core/src/traits.rs index 9b3532236f..bf95913f31 100644 --- a/core/src/traits.rs +++ b/core/src/traits.rs @@ -109,13 +109,13 @@ where } macro_rules! tuple_impls { - ($($len:expr => ($($n:tt $name:ident)+))+) => { - $( - impl<$($name: Serialize),+> ToRpcParams for ($($name,)+) { + ($($len:expr => ($($n:tt $name:ident)+))+) => { + $( + impl<$($name: Serialize),+> ToRpcParams for ($($name,)+) { to_rpc_params_impl!(); } - )+ - } + )+ + } } tuple_impls! { diff --git a/jsonrpsee/src/macros.rs b/jsonrpsee/src/macros.rs index d086abef97..745c5db84b 100644 --- a/jsonrpsee/src/macros.rs +++ b/jsonrpsee/src/macros.rs @@ -1,20 +1,20 @@ macro_rules! cfg_feature { - ($feature:literal, $($item:item)*) => { - $( - #[cfg(feature = $feature)] - #[cfg_attr(docsrs, doc(cfg(feature = $feature)))] - $item - )* - } + ($feature:literal, $($item:item)*) => { + $( + #[cfg(feature = $feature)] + #[cfg_attr(docsrs, doc(cfg(feature = $feature)))] + $item + )* + } } macro_rules! cfg_client { - ($($item:item)*) => { - $( - #[cfg(any(feature = "jsonrpsee-http-client", feature = "jsonrpsee-wasm-client", feature = "jsonrpsee-ws-client", feature = "client", feature = "async-client", feature = "client-core"))] - $item - )* - } + ($($item:item)*) => { + $( + #[cfg(any(feature = "jsonrpsee-http-client", feature = "jsonrpsee-wasm-client", feature = "jsonrpsee-ws-client", feature = "client", feature = "async-client", feature = "client-core"))] + $item + )* + } } macro_rules! cfg_http_client { @@ -36,40 +36,40 @@ macro_rules! cfg_wasm_client { } macro_rules! cfg_async_client { - ($($item:item)*) => { + ($($item:item)*) => { cfg_feature!("async-client", $($item)*); }; } macro_rules! cfg_client_transport { - ($($item:item)*) => { + ($($item:item)*) => { cfg_feature!("jsonrpsee-client-transport", $($item)*); }; } macro_rules! cfg_server { - ($($item:item)*) => { + ($($item:item)*) => { cfg_feature!("server", $($item)*); - } + } } macro_rules! cfg_proc_macros { - ($($item:item)*) => { + ($($item:item)*) => { cfg_feature!("jsonrpsee-proc-macros", $($item)*); }; } macro_rules! cfg_types { - ($($item:item)*) => { + ($($item:item)*) => { cfg_feature!("jsonrpsee-types", $($item)*); - }; + }; } macro_rules! cfg_client_or_server { - ($($item:item)*) => { - $( - #[cfg(any(feature = "jsonrpsee-http-client", feature = "jsonrpsee-wasm-client", feature = "jsonrpsee-ws-client", feature = "client", feature = "async-client", feature = "client-core", feature = "server", feature = "server-core"))] - $item - )* - } + ($($item:item)*) => { + $( + #[cfg(any(feature = "jsonrpsee-http-client", feature = "jsonrpsee-wasm-client", feature = "jsonrpsee-ws-client", feature = "client", feature = "async-client", feature = "client-core", feature = "server", feature = "server-core"))] + $item + )* + } }