diff --git a/benches/Cargo.toml b/benches/Cargo.toml
index 394f91b67e..f5e6e12285 100644
--- a/benches/Cargo.toml
+++ b/benches/Cargo.toml
@@ -9,7 +9,8 @@ publish = false
 
 [dev-dependencies]
 criterion = "0.3"
-futures-channel = "0.3.14"
+futures-channel = "0.3.15"
+futures-util = "0.3.15"
 jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
 num_cpus = "1"
 serde_json = "1"
diff --git a/benches/bench.rs b/benches/bench.rs
index 1f00fdcd98..84a077f934 100644
--- a/benches/bench.rs
+++ b/benches/bench.rs
@@ -1,4 +1,5 @@
 use criterion::*;
+use helpers::{SUB_METHOD_NAME, UNSUB_METHOD_NAME};
 use jsonrpsee::{
 	http_client::{
 		traits::Client,
@@ -6,6 +7,7 @@ use jsonrpsee::{
 		v2::request::JsonRpcCallSer,
 		HttpClientBuilder,
 	},
+	types::traits::SubscriptionClient,
 	ws_client::WsClientBuilder,
 };
 use std::sync::Arc;
@@ -13,8 +15,44 @@ use tokio::runtime::Runtime as TokioRuntime;
 
 mod helpers;
 
-criterion_group!(benches, http_requests, batched_http_requests, websocket_requests, jsonrpsee_types_v2);
-criterion_main!(benches);
+criterion_group!(types_benches, jsonrpsee_types_v2);
+criterion_group!(
+	sync_benches,
+	SyncBencher::http_requests,
+	SyncBencher::batched_http_requests,
+	SyncBencher::websocket_requests
+);
+criterion_group!(
+	async_benches,
+	AsyncBencher::http_requests,
+	AsyncBencher::batched_http_requests,
+	AsyncBencher::websocket_requests
+);
+criterion_group!(subscriptions, AsyncBencher::subscriptions);
+criterion_main!(types_benches, sync_benches, async_benches, subscriptions);
+
+#[derive(Debug, Clone, Copy)]
+enum RequestType {
+	Sync,
+	Async,
+}
+
+impl RequestType {
+	fn method_name(self) -> &'static str {
+		match self {
+			RequestType::Sync => crate::helpers::SYNC_METHOD_NAME,
+			RequestType::Async => crate::helpers::ASYNC_METHOD_NAME,
+		}
+	}
+
+	fn group_name(self, name: &str) -> String {
+		let request_type_name = match self {
+			RequestType::Sync => "sync",
+			RequestType::Async => "async",
+		};
+		format!("{}/{}", request_type_name, name)
+	}
+}
 
 fn v2_serialize(req: JsonRpcCallSer<'_>) -> String {
 	serde_json::to_string(&req).unwrap()
@@ -39,53 +77,135 @@ pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
 	});
 }
 
-pub fn http_requests(crit: &mut Criterion) {
-	let rt = TokioRuntime::new().unwrap();
-	let url = rt.block_on(helpers::http_server());
-	let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap());
-	run_round_trip(&rt, crit, client.clone(), "http_round_trip");
-	run_concurrent_round_trip(&rt, crit, client, "http_concurrent_round_trip");
-}
+trait RequestBencher {
+	const REQUEST_TYPE: RequestType;
 
-pub fn batched_http_requests(crit: &mut Criterion) {
-	let rt = TokioRuntime::new().unwrap();
-	let url = rt.block_on(helpers::http_server());
-	let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap());
-	run_round_trip_with_batch(&rt, crit, client, "http batch requests");
+	fn http_requests(crit: &mut Criterion) {
+		let rt = TokioRuntime::new().unwrap();
+		let url = rt.block_on(helpers::http_server());
+		let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap());
+		run_round_trip(&rt, crit, client.clone(), "http_round_trip", Self::REQUEST_TYPE);
+		run_concurrent_round_trip(&rt, crit, client, "http_concurrent_round_trip", Self::REQUEST_TYPE);
+	}
+
+	fn batched_http_requests(crit: &mut Criterion) {
+		let rt = TokioRuntime::new().unwrap();
+		let url = rt.block_on(helpers::http_server());
+		let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap());
+		run_round_trip_with_batch(&rt, crit, client, "http batch requests", Self::REQUEST_TYPE);
+	}
+
+	fn websocket_requests(crit: &mut Criterion) {
+		let rt = TokioRuntime::new().unwrap();
+		let url = rt.block_on(helpers::ws_server());
+		let client =
+			Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
+		run_round_trip(&rt, crit, client.clone(), "ws_round_trip", Self::REQUEST_TYPE);
+		run_concurrent_round_trip(&rt, crit, client, "ws_concurrent_round_trip", Self::REQUEST_TYPE);
+	}
+
+	fn batched_ws_requests(crit: &mut Criterion) {
+		let rt = TokioRuntime::new().unwrap();
+		let url = rt.block_on(helpers::ws_server());
+		let client =
+			Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
+		run_round_trip_with_batch(&rt, crit, client, "ws batch requests", Self::REQUEST_TYPE);
+	}
+
+	fn subscriptions(crit: &mut Criterion) {
+		let rt = TokioRuntime::new().unwrap();
+		let url = rt.block_on(helpers::ws_server());
+		let client =
+			Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
+		run_sub_round_trip(&rt, crit, client, "subscriptions");
+	}
 }
 
-pub fn websocket_requests(crit: &mut Criterion) {
-	let rt = TokioRuntime::new().unwrap();
-	let url = rt.block_on(helpers::ws_server());
-	let client =
-		Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
-	run_round_trip(&rt, crit, client.clone(), "ws_round_trip");
-	run_concurrent_round_trip(&rt, crit, client, "ws_concurrent_round_trip");
+pub struct SyncBencher;
+
+impl RequestBencher for SyncBencher {
+	const REQUEST_TYPE: RequestType = RequestType::Sync;
 }
+pub struct AsyncBencher;
 
-pub fn batched_ws_requests(crit: &mut Criterion) {
-	let rt = TokioRuntime::new().unwrap();
-	let url = rt.block_on(helpers::ws_server());
-	let client =
-		Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
-	run_round_trip_with_batch(&rt, crit, client, "ws batch requests");
+impl RequestBencher for AsyncBencher {
+	const REQUEST_TYPE: RequestType = RequestType::Async;
 }
 
-fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Client>, name: &str) {
-	crit.bench_function(name, |b| {
+fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Client>, name: &str, request: RequestType) {
+	crit.bench_function(&request.group_name(name), |b| {
 		b.iter(|| {
 			rt.block_on(async {
-				black_box(client.request::<String>("say_hello", JsonRpcParams::NoParams).await.unwrap());
+				black_box(client.request::<String>(request.method_name(), JsonRpcParams::NoParams).await.unwrap());
 			})
 		})
 	});
 }
 
-/// Benchmark http batch requests over batch sizes of 2, 5, 10, 50 and 100 RPCs in each batch.
-fn run_round_trip_with_batch(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Client>, name: &str) {
+fn run_sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl SubscriptionClient>, name: &str) {
 	let mut group = crit.benchmark_group(name);
+	group.bench_function("subscribe", |b| {
+		b.iter_with_large_drop(|| {
+			rt.block_on(async {
+				black_box(
+					client
+						.subscribe::<String>(SUB_METHOD_NAME, JsonRpcParams::NoParams, UNSUB_METHOD_NAME)
+						.await
+						.unwrap(),
+				);
+			})
+		})
+	});
+	group.bench_function("subscribe_response", |b| {
+		b.iter_with_setup(
+			|| {
+				rt.block_on(async {
+					client
+						.subscribe::<String>(SUB_METHOD_NAME, JsonRpcParams::NoParams, UNSUB_METHOD_NAME)
+						.await
+						.unwrap()
+				})
+			},
+			|mut sub| {
+				rt.block_on(async { black_box(sub.next().await.unwrap()) });
+				// Note that this benchmark will include costs for measuring `drop` for subscription,
+				// since it's not possible to combine both `iter_with_setup` and `iter_with_large_drop`.
+				// To estimate pure cost of method, one should subtract the result of `unsub` bench
+				// from this one.
+			},
+		)
+	});
+	group.bench_function("unsub", |b| {
+		b.iter_with_setup(
+			|| {
+				rt.block_on(async {
+					client
+						.subscribe::<String>(SUB_METHOD_NAME, JsonRpcParams::NoParams, UNSUB_METHOD_NAME)
+						.await
+						.unwrap()
+				})
+			},
+			|sub| {
+				// Subscription will be closed inside of the drop impl.
+				// Actually, it just sends a notification about object being closed,
+				// but it's still important to know that drop impl is not too expensive.
+				drop(black_box(sub));
+			},
		)
+	});
+}
+
+/// Benchmark http batch requests over batch sizes of 2, 5, 10, 50 and 100 RPCs in each batch.
+fn run_round_trip_with_batch(
+	rt: &TokioRuntime,
+	crit: &mut Criterion,
+	client: Arc<impl Client>,
+	name: &str,
+	request: RequestType,
+) {
+	let mut group = crit.benchmark_group(request.group_name(name));
 	for batch_size in [2, 5, 10, 50, 100usize].iter() {
-		let batch = vec![("say_hello", JsonRpcParams::NoParams); *batch_size];
+		let batch = vec![(request.method_name(), JsonRpcParams::NoParams); *batch_size];
 		group.throughput(Throughput::Elements(*batch_size as u64));
 		group.bench_with_input(BenchmarkId::from_parameter(batch_size), batch_size, |b, _| {
 			b.iter(|| rt.block_on(async { client.batch_request::<String>(batch.clone()).await.unwrap() }))
@@ -99,8 +219,9 @@ fn run_concurrent_round_trip(
 	crit: &mut Criterion,
 	client: Arc<C>,
 	name: &str,
+	request: RequestType,
 ) {
-	let mut group = crit.benchmark_group(name);
+	let mut group = crit.benchmark_group(request.group_name(name));
 	for num_concurrent_tasks in helpers::concurrent_tasks() {
 		group.bench_function(format!("{}", num_concurrent_tasks), |b| {
 			b.iter(|| {
@@ -108,8 +229,9 @@ fn run_concurrent_round_trip(
 				for _ in 0..num_concurrent_tasks {
 					let client_rc = client.clone();
 					let task = rt.spawn(async move {
-						let _ =
-							black_box(client_rc.request::<String>("say_hello", JsonRpcParams::NoParams).await.unwrap());
+						let _ = black_box(
+							client_rc.request::<String>(request.method_name(), JsonRpcParams::NoParams).await.unwrap(),
+						);
 					});
 					tasks.push(task);
 				}
diff --git a/benches/helpers.rs b/benches/helpers.rs
index 3dc4e4c5a2..64e7da54d9 100644
--- a/benches/helpers.rs
+++ b/benches/helpers.rs
@@ -1,9 +1,15 @@
 use futures_channel::oneshot;
+use futures_util::future::FutureExt;
 use jsonrpsee::{
 	http_server::HttpServerBuilder,
 	ws_server::{RpcModule, WsServerBuilder},
 };
 
+pub(crate) const SYNC_METHOD_NAME: &str = "say_hello";
+pub(crate) const ASYNC_METHOD_NAME: &str = "say_hello_async";
+pub(crate) const SUB_METHOD_NAME: &str = "sub";
+pub(crate) const UNSUB_METHOD_NAME: &str = "unsub";
+
 /// Run jsonrpsee HTTP server for benchmarks.
 pub async fn http_server() -> String {
 	let (server_started_tx, server_started_rx) = oneshot::channel();
@@ -11,7 +17,8 @@ pub async fn http_server() -> String {
 		let mut server =
 			HttpServerBuilder::default().max_request_body_size(u32::MAX).build("127.0.0.1:0".parse().unwrap()).unwrap();
 		let mut module = RpcModule::new(());
-		module.register_method("say_hello", |_, _| Ok("lo")).unwrap();
+		module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
+		module.register_async_method(ASYNC_METHOD_NAME, |_, _| (async { Ok("lo") }).boxed()).unwrap();
 		server.register_module(module).unwrap();
 		server_started_tx.send(server.local_addr().unwrap()).unwrap();
 		server.start().await
@@ -25,7 +32,16 @@ pub async fn ws_server() -> String {
 	tokio::spawn(async move {
 		let mut server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap();
 		let mut module = RpcModule::new(());
-		module.register_method("say_hello", |_, _| Ok("lo")).unwrap();
+		module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
+		module.register_async_method(ASYNC_METHOD_NAME, |_, _| (async { Ok("lo") }).boxed()).unwrap();
+		module
+			.register_subscription(SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
+				let x = "Hello";
+				tokio::spawn(async move { sink.send(&x) });
+				Ok(())
+			})
+			.unwrap();
+
 		server.register_module(module).unwrap();
 		server_started_tx.send(server.local_addr().unwrap()).unwrap();
 		server.start().await