Skip to content

Commit

Permalink
unify logging format
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 committed May 27, 2022
1 parent 6d5d04d commit 27391f2
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 55 deletions.
5 changes: 2 additions & 3 deletions client/http-client/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl HttpTransportClient {
}

async fn inner_send(&self, body: String) -> Result<hyper::Response<hyper::Body>, Error> {
tracing::trace!(tx_len = body.len(), tx = body.as_str());
tracing::trace!(send = body.as_str());

if body.len() > self.max_request_body_size as usize {
return Err(Error::RequestTooLarge);
Expand All @@ -115,8 +115,7 @@ impl HttpTransportClient {
let (body, _) = http_helpers::read_body(&parts.headers, body, self.max_request_body_size).await?;

tracing::trace!(
rx_len = body.len(),
rx = serde_json::from_slice::<serde_json::Value>(&body).unwrap_or_default().to_string().as_str()
recv = serde_json::from_slice::<serde_json::Value>(&body).unwrap_or_default().to_string().as_str()
);

Ok(body)
Expand Down
64 changes: 25 additions & 39 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl ClientT for Client {
let _enter = trace.span().enter();

let raw = serde_json::to_string(&notif).map_err(Error::ParseError)?;
tracing::trace!(tx_len = raw.len(), tx = raw.as_str());
tracing::trace!(send = raw.as_str());

let mut sender = self.to_back.clone();
let fut = sender.send(FrontToBack::Notification(raw)).in_current_span();
Expand All @@ -238,12 +238,12 @@ impl ClientT for Client {
let _enter = trace.span().enter();

let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?;
tracing::trace!(tx_len = raw.len(), tx = raw.as_str());
tracing::trace!(send = raw.as_str());

if self
.to_back
.clone()
.send(FrontToBack::Request(RequestMessage { raw, id, send_back: Some(send_back_tx) }))
.send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) }))
.await
.is_err()
{
Expand All @@ -257,8 +257,7 @@ impl ClientT for Client {
Err(_) => return Err(self.read_error_from_backend().await),
};

// there is no way to get the length of `serde_json::Value` without deserializing.
tracing::trace!(rx = serde_json::to_string(&json_value).expect("valid JSON; qed").as_str());
tracing::trace!(receive = serde_json::to_string(&Response::new(&json_value, id)).unwrap_or_default().as_str());

serde_json::from_value(json_value).map_err(Error::ParseError)
}
Expand All @@ -280,7 +279,7 @@ impl ClientT for Client {
let (send_back_tx, send_back_rx) = oneshot::channel();

let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?;
tracing::trace!(tx_len = raw.len(), tx = raw.as_str());
tracing::trace!(send = raw.as_str());

if self
.to_back
Expand All @@ -299,7 +298,7 @@ impl ClientT for Client {
Err(_) => return Err(self.read_error_from_backend().await),
};

tracing::trace!(rx_len = json_values.len(), tx = ?json_values);
tracing::trace!(receive = serde_json::to_string(&json_values).unwrap_or_default().as_str());

let values: Result<_, _> =
json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect();
Expand Down Expand Up @@ -331,9 +330,10 @@ impl SubscriptionClientT for Client {
let trace = RpcTracing::method_call(subscribe_method);
let _enter = trace.span().enter();

let raw =
serde_json::to_string(&RequestSer::new(&ids[0], subscribe_method, params)).map_err(Error::ParseError)?;
tracing::trace!(tx_len = raw.len(), tx = raw.as_str());
let id = ids[0].clone();

let raw = serde_json::to_string(&RequestSer::new(&id, subscribe_method, params)).map_err(Error::ParseError)?;
tracing::trace!(send = raw.as_str());

let (send_back_tx, send_back_rx) = oneshot::channel();
if self
Expand All @@ -354,24 +354,22 @@ impl SubscriptionClientT for Client {

let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await;

let (notifs_rx, id) = match res {
let (notifs_rx, sub_id) = match res {
Ok(Ok(val)) => val,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};

tracing::trace!(rx = serde_json::to_string(&id).expect("valid JSON; qed").as_str());
tracing::trace!(recv = serde_json::to_string(&Response::new(&sub_id, id)).unwrap_or_default().as_str());

Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(id)))
Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id)))
}

/// Subscribe to a specific method.
async fn subscribe_to_method<'a, N>(&self, method: &'a str) -> Result<Subscription<N>, Error>
where
N: DeserializeOwned,
{
tracing::trace!("[frontend]: register_notification: {:?}", method);

let (send_back_tx, send_back_rx) = oneshot::channel();
if self
.to_back
Expand Down Expand Up @@ -427,7 +425,7 @@ async fn background_task<S, R>(
// User dropped the sender side of the channel.
// There is nothing to do just terminate.
Either::Left((None, _)) => {
tracing::trace!("[backend]: frontend dropped; terminate client");
tracing::warn!("[backend]: background task terminated");
break;
}

Expand All @@ -447,25 +445,21 @@ async fn background_task<S, R>(
}
// User called `notification` on the front-end
Either::Left((Some(FrontToBack::Notification(notif)), _)) => {
tracing::trace!("[backend]: client prepares to send notification: {:?}", notif);
if let Err(e) = sender.send(notif).await {
tracing::warn!("[backend]: client notif failed: {:?}", e);
}
}

// User called `request` on the front-end
Either::Left((Some(FrontToBack::Request(request)), _)) => {
tracing::trace!("[backend]: client prepares to send request={:?}", request);
match sender.send(request.raw).await {
Ok(_) => manager
.insert_pending_call(request.id, request.send_back)
.expect("ID unused checked above; qed"),
Err(e) => {
tracing::warn!("[backend]: client request failed: {:?}", e);
let _ = request.send_back.map(|s| s.send(Err(Error::Transport(e.into()))));
}
Either::Left((Some(FrontToBack::Request(request)), _)) => match sender.send(request.raw).await {
Ok(_) => {
manager.insert_pending_call(request.id, request.send_back).expect("ID unused checked above; qed")
}
}
Err(e) => {
tracing::warn!("[backend]: client request failed: {:?}", e);
let _ = request.send_back.map(|s| s.send(Err(Error::Transport(e.into()))));
}
},

// User called `subscribe` on the front-end.
Either::Left((Some(FrontToBack::Subscribe(sub)), _)) => match sender.send(sub.raw).await {
Expand Down Expand Up @@ -497,7 +491,6 @@ async fn background_task<S, R>(

// User called `register_notification` on the front-end.
Either::Left((Some(FrontToBack::RegisterNotification(reg)), _)) => {
tracing::trace!("[backend] registering notification handler: {:?}", reg.method);
let (subscribe_tx, subscribe_rx) = mpsc::channel(max_notifs_per_subscription);

if manager.insert_notification_handler(&reg.method, subscribe_tx).is_ok() {
Expand All @@ -509,13 +502,11 @@ async fn background_task<S, R>(

// User dropped the notificationHandler for this method
Either::Left((Some(FrontToBack::UnregisterNotification(method)), _)) => {
tracing::trace!("[backend] unregistering notification handler: {:?}", method);
let _ = manager.remove_notification_handler(method);
}
Either::Right((Some(Ok(raw)), _)) => {
// Single response to a request.
if let Ok(single) = serde_json::from_str::<Response<_>>(&raw) {
tracing::trace!("[backend]: recv method_call {:?}", single);
match process_single_response(&mut manager, single, max_notifs_per_subscription) {
Ok(Some(unsub)) => {
stop_subscription(&mut sender, &mut manager, unsub).await;
Expand All @@ -529,32 +520,27 @@ async fn background_task<S, R>(
}
// Subscription response.
else if let Ok(response) = serde_json::from_str::<SubscriptionResponse<_>>(&raw) {
tracing::trace!("[backend]: recv subscription {:?}", response);
if let Err(Some(unsub)) = process_subscription_response(&mut manager, response) {
let _ = stop_subscription(&mut sender, &mut manager, unsub).await;
}
}
// Subscription error response.
else if let Ok(response) = serde_json::from_str::<SubscriptionError<_>>(&raw) {
tracing::debug!("[backend]: recv subscription closed {:?}", response);
let _ = process_subscription_close_response(&mut manager, response);
}
// Incoming Notification
else if let Ok(notif) = serde_json::from_str::<Notification<_>>(&raw) {
tracing::trace!("[backend]: recv notification {:?}", notif);
let _ = process_notification(&mut manager, notif);
}
// Batch response.
else if let Ok(batch) = serde_json::from_str::<Vec<Response<_>>>(&raw) {
tracing::trace!("[backend]: recv batch {:?}", batch);
if let Err(e) = process_batch_response(&mut manager, batch) {
let _ = front_error.send(e);
break;
}
}
// Error response
else if let Ok(err) = serde_json::from_str::<ErrorResponse>(&raw) {
tracing::trace!("[backend]: recv error response {:?}", err);
if let Err(e) = process_error_response(&mut manager, err) {
let _ = front_error.send(e);
break;
Expand All @@ -571,13 +557,13 @@ async fn background_task<S, R>(
}
}
Either::Right((Some(Err(e)), _)) => {
tracing::error!("Error: {:?} terminating client", e);
tracing::error!("error: {:?} terminating client", e);
let _ = front_error.send(Error::Transport(e.into()));
break;
}
Either::Right((None, _)) => {
tracing::error!("[backend]: WebSocket receiver dropped; terminate client");
let _ = front_error.send(Error::Custom("WebSocket receiver dropped".into()));
tracing::warn!("[backend]: transport receiver dropped");
let _ = front_error.send(Error::Custom("Receiver dropped".into()));
break;
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl MethodSink {
}
};

tracing::trace!(tx_len = json.len(), tx = json.as_str());
tracing::trace!(send = json.as_str());

if let Err(err) = self.tx.unbounded_send(json) {
tracing::warn!("Error sending response {:?}", err);
Expand All @@ -149,7 +149,7 @@ impl MethodSink {
}
};

tracing::trace!(tx_len = json.len(), tx = json.as_str());
tracing::trace!(send = json.as_str());

if let Err(err) = self.tx.unbounded_send(json) {
tracing::warn!("Error sending response {:?}", err);
Expand Down
7 changes: 3 additions & 4 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ async fn process_validated_request(
let _enter = trace.span().enter();

// TODO: add limit to this.
tracing::trace!(rx_len = body.len(), rx = serde_json::to_string(&req).expect("valid JSON; qed").as_str());
tracing::trace!(recv = serde_json::to_string(&req).unwrap_or_default().as_str());
middleware.on_call(method);

let id = req.id.clone();
Expand Down Expand Up @@ -634,8 +634,7 @@ async fn process_validated_request(
let trace = RpcTracing::notification(&req.method);
let _enter = trace.span().enter();

// Todo introduce BoundedWriterAtMostBytes to serialize this.
tracing::trace!(rx_len = body.len(), rx = serde_json::to_string(&req).expect("valid JSON; qed").as_str());
tracing::trace!(recv = serde_json::to_string(&req).unwrap_or_default().as_str());

return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
Expand All @@ -647,7 +646,7 @@ async fn process_validated_request(
let trace = RpcTracing::batch();
let _enter = trace.span().enter();

tracing::trace!(rx_len = batch.len(), rx = ?&batch[0..std::cmp::max(batch.len(), max_tracing_length as usize)]);
tracing::trace!(recv = serde_json::to_string(&batch).unwrap_or_default().as_str());

if !batch_requests_supported {
// Server was configured to not support batches.
Expand Down
12 changes: 5 additions & 7 deletions ws-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl<M: Middleware> Server<M> {
},
)));

tracing::info!("Accepting new connection, {}/{}", connections.count(), self.cfg.max_connections);
tracing::info!("Accepting new connection {}/{}", connections.count(), self.cfg.max_connections);

id = id.wrapping_add(1);
}
Expand Down Expand Up @@ -243,7 +243,6 @@ where
Ok(())
}
HandshakeResponse::Accept { conn_id, methods, resources, cfg, stop_monitor, middleware, id_provider } => {
tracing::debug!("Accepting new connection: {}", conn_id);
let key = {
let req = server.receive_request().await?;
let host_check = cfg.allowed_hosts.verify("Host", Some(req.headers().host));
Expand All @@ -258,6 +257,7 @@ where
server.send_response(&accept).await?;
}
Err(error) => {
tracing::warn!("Denied connection: {:?}", error);
let reject = Response::Reject { status_code: 403 };
server.send_response(&reject).await?;

Expand Down Expand Up @@ -385,7 +385,7 @@ async fn background_task(
let trace = RpcTracing::method_call(&req.method);
let _enter = trace.span().enter();

tracing::trace!(rx_len = data.len(), rx = ?req);
tracing::trace!(recv = serde_json::to_string(&req).unwrap_or_default().as_str());

let id = req.id.clone();
let params = Params::new(req.params.map(|params| params.get()));
Expand Down Expand Up @@ -494,7 +494,6 @@ async fn background_task(
let (tx_batch, mut rx_batch) = mpsc::unbounded();
let sink_batch = MethodSink::new_with_limit(tx_batch, max_response_body_size);
if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&d) {

if !batch_requests_supported {
sink.send_error(
Id::Null,
Expand All @@ -504,10 +503,9 @@ async fn background_task(
} else if !batch.is_empty() {
let trace = RpcTracing::batch();
let _enter = trace.span().enter();
tracing::debug!("recv batch len={}", batch.len());
tracing::trace!("recv: batch={:?}", batch);

tracing::trace!(rx_len = batch.len(), tx = ?batch);
tracing::trace!(recv = serde_json::to_string(&batch).unwrap_or_default().as_str());

join_all(batch.into_iter().filter_map(move |req| {
let id = req.id.clone();
let params = Params::new(req.params.map(|params| params.get()));
Expand Down

0 comments on commit 27391f2

Please sign in to comment.