Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: include error code in RpcLogger #1135

Merged
merged 10 commits into from
Jun 2, 2023
71 changes: 63 additions & 8 deletions core/src/server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,56 @@ pub fn prepare_error(data: &[u8]) -> (Id<'_>, ErrorCode) {
}
}

/// Represent the response to method call.
/// Represent the response to a method call.
#[derive(Debug, Clone)]
pub struct MethodResponse {
/// Serialized JSON-RPC response,
pub result: String,
/// Indicates whether the call was successful or not.
pub success: bool,
pub success_or_error: MethodResponseResult,
}

impl MethodResponse {
/// Returns whether the call was successful.
pub fn is_success(&self) -> bool {
self.success_or_error.is_success()
}

/// Returns whether the call failed.
pub fn is_error(&self) -> bool {
self.success_or_error.is_success()
}
}

/// Represent the outcome of a method call success or failed.
#[derive(Debug, Copy, Clone)]
pub enum MethodResponseResult {
/// The method call was successful.
Success,
/// The method call with error code.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
Failed(i32),
}

impl MethodResponseResult {
/// Returns whether the call was successful.
pub fn is_success(&self) -> bool {
matches!(self, MethodResponseResult::Success)
}

/// Returns whether the call failed.
pub fn is_error(&self) -> bool {
matches!(self, MethodResponseResult::Failed(_))
}

/// Get the error code
///
/// Returns `Some(error code)` if the call failed.
pub fn as_error_code(&self) -> Option<i32> {
match self {
Self::Failed(e) => Some(*e),
_ => None,
}
}
}

impl MethodResponse {
Expand All @@ -191,40 +234,52 @@ impl MethodResponse {
{
let mut writer = BoundedWriter::new(max_response_size);

let success_or_error = if let ResponsePayload::Error(ref e) = result {
MethodResponseResult::Failed(e.code())
} else {
MethodResponseResult::Success
};

match serde_json::to_writer(&mut writer, &Response::new(result, id.clone())) {
Ok(_) => {
// Safety - serde_json does not emit invalid UTF-8.
let result = unsafe { String::from_utf8_unchecked(writer.into_bytes()) };
Self { result, success: true }

Self { result, success_or_error }
}
Err(err) => {
tracing::error!("Error serializing response: {:?}", err);

if err.is_io() {
let data = to_raw_value(&format!("Exceeded max limit of {max_response_size}")).ok();
let err_code = OVERSIZED_RESPONSE_CODE;

let err = ResponsePayload::error_borrowed(ErrorObject::borrowed(
OVERSIZED_RESPONSE_CODE,
err_code,
&OVERSIZED_RESPONSE_MSG,
data.as_deref(),
));
let result =
serde_json::to_string(&Response::new(err, id)).expect("JSON serialization infallible; qed");

Self { result, success: false }
Self { result, success_or_error: MethodResponseResult::Failed(err_code) }
} else {
let result = serde_json::to_string(&Response::new(ErrorCode::InternalError.into(), id))
let err_code = ErrorCode::InternalError;
let result = serde_json::to_string(&Response::new(err_code.into(), id))
.expect("JSON serialization infallible; qed");
Self { result, success: false }
Self { result, success_or_error: MethodResponseResult::Failed(err_code.code()) }
}
}
}
}

/// Create a `MethodResponse` from an error.
pub fn error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
let err: ErrorObject = err.into();
let err_code = err.code();
let err = ResponsePayload::error_borrowed(err);
let result = serde_json::to_string(&Response::new(err, id)).expect("JSON serialization infallible; qed");
Self { result, success: false }
Self { result, success_or_error: MethodResponseResult::Failed(err_code) }
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
Ok(msg) => {
// If the subscription was accepted then send a message
// to subscription task otherwise rely on the drop impl.
if msg.success {
if msg.is_success() {
let _ = accepted_tx.send(());
}
Ok(msg)
Expand Down
2 changes: 1 addition & 1 deletion core/src/server/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ impl PendingSubscriptionSink {
ResponsePayload::result_borrowed(&self.uniq_sub.sub_id),
self.inner.max_response_size() as usize,
);
let success = response.success;
let success = response.is_success();

// TODO: #1052
//
Expand Down
17 changes: 14 additions & 3 deletions examples/examples/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::net::SocketAddr;
use std::time::Instant;

use jsonrpsee::core::client::ClientT;
use jsonrpsee::server::logger::{self, HttpRequest, MethodKind, Params, TransportProtocol};
use jsonrpsee::server::logger::{self, HttpRequest, MethodKind, Params, SuccessOrError, TransportProtocol};
use jsonrpsee::server::Server;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::{rpc_params, RpcModule};
Expand All @@ -52,8 +52,19 @@ impl logger::Logger for Timings {
println!("[Logger::on_call] method: '{}', params: {:?}, kind: {}", name, params, kind);
}

fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant, _t: TransportProtocol) {
println!("[Logger::on_result] '{}', worked? {}, time elapsed {:?}", name, succeess, started_at.elapsed());
fn on_result(
&self,
name: &str,
success_or_error: SuccessOrError,
started_at: Self::Instant,
_t: TransportProtocol,
) {
println!(
"[Logger::on_result] '{}', worked? {}, time elapsed {:?}",
name,
success_or_error.is_success(),
started_at.elapsed()
);
}

fn on_response(&self, result: &str, started_at: Self::Instant, _t: TransportProtocol) {
Expand Down
25 changes: 21 additions & 4 deletions examples/examples/multi_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::time::Instant;

use jsonrpsee::core::client::ClientT;
use jsonrpsee::rpc_params;
use jsonrpsee::server::logger::{HttpRequest, MethodKind, TransportProtocol};
use jsonrpsee::server::logger::{HttpRequest, MethodKind, SuccessOrError, TransportProtocol};
use jsonrpsee::server::{logger, RpcModule, Server};
use jsonrpsee::types::Params;
use jsonrpsee::ws_client::WsClientBuilder;
Expand All @@ -56,8 +56,19 @@ impl logger::Logger for Timings {
println!("[Timings:on_call] method: '{}', params: {:?}, kind: {}", name, params, kind);
}

fn on_result(&self, name: &str, success: bool, started_at: Self::Instant, _t: TransportProtocol) {
println!("[Timings] call={}, worked? {}, duration {:?}", name, success, started_at.elapsed());
fn on_result(
&self,
name: &str,
success_or_error: SuccessOrError,
started_at: Self::Instant,
_t: TransportProtocol,
) {
println!(
"[Timings] call={}, worked? {}, duration {:?}",
name,
success_or_error.is_success(),
started_at.elapsed()
);
}

fn on_response(&self, _result: &str, started_at: Self::Instant, _t: TransportProtocol) {
Expand Down Expand Up @@ -106,7 +117,13 @@ impl logger::Logger for ThreadWatcher {
threads as isize
}

fn on_result(&self, _name: &str, _succees: bool, started_at: Self::Instant, _t: TransportProtocol) {
fn on_result(
&self,
_name: &str,
_success_or_error: SuccessOrError,
started_at: Self::Instant,
_t: TransportProtocol,
) {
let current_nr_threads = Self::count_threads() as isize;
println!("[ThreadWatcher::on_result] {} threads", current_nr_threads - started_at);
}
Expand Down
23 changes: 18 additions & 5 deletions server/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::net::SocketAddr;
/// HTTP request.
pub type HttpRequest = hyper::Request<Body>;
pub use hyper::{Body, HeaderMap as Headers};
pub use jsonrpsee_core::server::helpers::MethodResponseResult as SuccessOrError;
pub use jsonrpsee_types::Params;

/// The type JSON-RPC v2 call, it can be a subscription, method call or unknown.
Expand Down Expand Up @@ -99,7 +100,13 @@ pub trait Logger: Send + Sync + Clone + 'static {
fn on_call(&self, method_name: &str, params: Params, kind: MethodKind, transport: TransportProtocol);

/// Called on each JSON-RPC method completion, batch requests will trigger `on_result` multiple times.
fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant, transport: TransportProtocol);
fn on_result(
&self,
method_name: &str,
success_or_error: SuccessOrError,
started_at: Self::Instant,
transport: TransportProtocol,
);

/// Called once the JSON-RPC request is finished and response is sent to the output buffer.
fn on_response(&self, result: &str, started_at: Self::Instant, transport: TransportProtocol);
Expand All @@ -117,7 +124,7 @@ impl Logger for () {

fn on_call(&self, _: &str, _: Params, _: MethodKind, _p: TransportProtocol) {}

fn on_result(&self, _: &str, _: bool, _: Self::Instant, _p: TransportProtocol) {}
fn on_result(&self, _: &str, _: SuccessOrError, _: Self::Instant, _p: TransportProtocol) {}

fn on_response(&self, _: &str, _: Self::Instant, _p: TransportProtocol) {}

Expand Down Expand Up @@ -145,9 +152,15 @@ where
self.1.on_call(method_name, params, kind, transport);
}

fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant, transport: TransportProtocol) {
self.0.on_result(method_name, success, started_at.0, transport);
self.1.on_result(method_name, success, started_at.1, transport);
fn on_result(
&self,
method_name: &str,
success_or_error: SuccessOrError,
started_at: Self::Instant,
transport: TransportProtocol,
) {
self.0.on_result(method_name, success_or_error, started_at.0, transport);
self.1.on_result(method_name, success_or_error, started_at.1, transport);
}

fn on_response(&self, result: &str, started_at: Self::Instant, transport: TransportProtocol) {
Expand Down
6 changes: 3 additions & 3 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ impl<B, L> Builder<B, L> {
/// ```
/// use std::{time::Instant, net::SocketAddr};
///
/// use jsonrpsee_server::logger::{Logger, HttpRequest, MethodKind, Params, TransportProtocol};
/// use jsonrpsee_server::logger::{Logger, HttpRequest, MethodKind, Params, TransportProtocol, SuccessOrError};
/// use jsonrpsee_server::ServerBuilder;
///
/// #[derive(Clone)]
Expand All @@ -331,8 +331,8 @@ impl<B, L> Builder<B, L> {
/// println!("[MyLogger::on_call] method: '{}' params: {:?}, kind: {:?}, transport: {}", method_name, params, kind, transport);
/// }
///
/// fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant, transport: TransportProtocol) {
/// println!("[MyLogger::on_result] '{}', worked? {}, time elapsed {:?}, transport: {}", method_name, success, started_at.elapsed(), transport);
/// fn on_result(&self, method_name: &str, success_or_error: SuccessOrError, started_at: Self::Instant, transport: TransportProtocol) {
/// println!("[MyLogger::on_result] '{}', worked? {}, time elapsed {:?}, transport: {}", method_name, success_or_error.is_success(), started_at.elapsed(), transport);
/// }
///
/// fn on_response(&self, result: &str, started_at: Self::Instant, transport: TransportProtocol) {
Expand Down
8 changes: 5 additions & 3 deletions server/src/transport/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use futures_util::stream::{FuturesOrdered, StreamExt};
use hyper::Method;
use jsonrpsee_core::error::GenericTransportError;
use jsonrpsee_core::http_helpers::read_body;
use jsonrpsee_core::server::helpers::{batch_response_error, prepare_error, BatchResponseBuilder, MethodResponse};
use jsonrpsee_core::server::helpers::{
batch_response_error, prepare_error, BatchResponseBuilder, MethodResponse, MethodResponseResult,
};
use jsonrpsee_core::server::{MethodCallback, Methods};
use jsonrpsee_core::tracing::{rx_log_from_json, tx_log_from_str};
use jsonrpsee_core::JsonRawValue;
Expand Down Expand Up @@ -257,14 +259,14 @@ pub(crate) async fn execute_call<L: Logger>(req: Request<'_>, call: CallData<'_,
};

tx_log_from_str(&response.result, max_log_length);
logger.on_result(name, response.success, request_start, TransportProtocol::Http);
logger.on_result(name, response.success_or_error, request_start, TransportProtocol::Http);
response
}

#[instrument(name = "notification", fields(method = notif.method.as_ref()), skip(notif, max_log_length), level = "TRACE")]
fn execute_notification(notif: Notif, max_log_length: u32) -> MethodResponse {
rx_log_from_json(&notif, max_log_length);
let response = MethodResponse { result: String::new(), success: true };
let response = MethodResponse { result: String::new(), success_or_error: MethodResponseResult::Success };
tx_log_from_str(&response.result, max_log_length);
response
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ pub(crate) async fn execute_call<'a, L: Logger>(req: Request<'a>, call: CallData
let r = response.as_response();

tx_log_from_str(&r.result, max_log_length);
logger.on_result(name, r.success, request_start, TransportProtocol::WebSocket);
logger.on_result(name, r.success_or_error, request_start, TransportProtocol::WebSocket);
response
}

Expand Down
28 changes: 20 additions & 8 deletions tests/tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ use jsonrpsee::core::{client::ClientT, Error};
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::rpc_params;
use jsonrpsee::server::logger::{HttpRequest, Logger, MethodKind, TransportProtocol};
use jsonrpsee::server::logger::{HttpRequest, Logger, MethodKind, SuccessOrError, TransportProtocol};
use jsonrpsee::server::{ServerBuilder, ServerHandle};
use jsonrpsee::types::Params;
use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, Params};
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::RpcModule;
use tokio::time::sleep;
Expand Down Expand Up @@ -82,8 +82,8 @@ impl Logger for Counter {
entry.0 += 1;
}

fn on_result(&self, name: &str, success: bool, n: u32, _t: TransportProtocol) {
if success {
fn on_result(&self, name: &str, success_or_error: SuccessOrError, n: u32, _t: TransportProtocol) {
if success_or_error.is_success() {
self.inner.lock().unwrap().calls.get_mut(name).unwrap().1.push(n);
}
}
Expand All @@ -105,6 +105,11 @@ fn test_module() -> RpcModule<()> {
sleep(Duration::from_millis(50)).await;
"hello".to_string()
}

#[method(name = "err")]
async fn err(&self) -> Result<String, ErrorObjectOwned> {
Err(ErrorObject::owned(1, "err", None::<()>))
}
}

impl RpcServer for () {}
Expand Down Expand Up @@ -154,12 +159,16 @@ async fn ws_server_logger() {
let res: Result<String, Error> = client.request("unknown_method", rpc_params![]).await;
assert!(res.is_err());

let res: Result<String, Error> = client.request("err", rpc_params![]).await;
Copy link
Member Author

@niklasad1 niklasad1 May 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expanded the test for failed RPC calls too

assert!(res.is_err());

{
let inner = counter.inner.lock().unwrap();

assert_eq!(inner.connections, (1, 0));
assert_eq!(inner.requests, (5, 5));
assert_eq!(inner.requests, (6, 6));
assert_eq!(inner.calls["say_hello"], (3, vec![0, 2, 3]));
assert_eq!(inner.calls["err"], (1, vec![]));
assert_eq!(inner.calls["unknown_method"], (2, vec![]));
}

Expand Down Expand Up @@ -193,17 +202,20 @@ async fn http_server_logger() {
let res: Result<String, Error> = client.request("unknown_method", rpc_params![]).await;
assert!(res.is_err());

let res: Result<String, Error> = client.request("err", rpc_params![]).await;
assert!(res.is_err());

{
let inner = counter.inner.lock().unwrap();
assert_eq!(inner.requests, (5, 5));
assert_eq!(inner.requests, (6, 6));
assert_eq!(inner.calls["say_hello"], (3, vec![0, 2, 3]));
assert_eq!(inner.calls["unknown_method"], (2, vec![]));
assert_eq!(inner.calls["err"], (1, vec![]));
}

server_handle.stop().unwrap();
server_handle.stopped().await;

// HTTP server doesn't track connections
let inner = counter.inner.lock().unwrap();
assert_eq!(inner.connections, (5, 5));
assert_eq!(inner.connections, (6, 6));
}