Skip to content

Commit

Permalink
Use criterion's async bencher (#385)
Browse files Browse the repository at this point in the history
* Use criterion's async bencher

* Rewrite concurrent roundtrip in functional style
  • Loading branch information
popzxc authored Jun 16, 2021
1 parent ca11d12 commit 82b1614
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 37 deletions.
2 changes: 1 addition & 1 deletion benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "MIT"
publish = false

[dev-dependencies]
criterion = "0.3"
criterion = { version = "0.3", features = ["async_tokio", "html_reports"] }
futures-channel = "0.3.15"
futures-util = "0.3.15"
jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
Expand Down
68 changes: 32 additions & 36 deletions benches/bench.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use criterion::*;
use futures_util::future::join_all;
use helpers::{SUB_METHOD_NAME, UNSUB_METHOD_NAME};
use jsonrpsee::{
http_client::{
Expand Down Expand Up @@ -134,40 +135,37 @@ impl RequestBencher for AsyncBencher {

fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Client>, name: &str, request: RequestType) {
crit.bench_function(&request.group_name(name), |b| {
b.iter(|| {
rt.block_on(async {
black_box(client.request::<String>(request.method_name(), JsonRpcParams::NoParams).await.unwrap());
})
b.to_async(rt).iter(|| async {
black_box(client.request::<String>(request.method_name(), JsonRpcParams::NoParams).await.unwrap());
})
});
}

fn run_sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl SubscriptionClient>, name: &str) {
let mut group = crit.benchmark_group(name);
group.bench_function("subscribe", |b| {
b.iter_with_large_drop(|| {
rt.block_on(async {
black_box(
client
.subscribe::<String>(SUB_METHOD_NAME, JsonRpcParams::NoParams, UNSUB_METHOD_NAME)
.await
.unwrap(),
);
})
b.to_async(rt).iter_with_large_drop(|| async {
black_box(
client.subscribe::<String>(SUB_METHOD_NAME, JsonRpcParams::NoParams, UNSUB_METHOD_NAME).await.unwrap(),
);
})
});
group.bench_function("subscribe_response", |b| {
b.iter_with_setup(
b.to_async(rt).iter_with_setup(
|| {
rt.block_on(async {
client
.subscribe::<String>(SUB_METHOD_NAME, JsonRpcParams::NoParams, UNSUB_METHOD_NAME)
.await
.unwrap()
// We have to use `block_in_place` here since `b.to_async(rt)` automatically enters the
// runtime context and simply calling `block_on` here will cause the code to panic.
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
client
.subscribe::<String>(SUB_METHOD_NAME, JsonRpcParams::NoParams, UNSUB_METHOD_NAME)
.await
.unwrap()
})
})
},
|mut sub| {
rt.block_on(async { black_box(sub.next().await.unwrap()) });
|mut sub| async move {
black_box(sub.next().await.unwrap());
// Note that this benchmark will include costs for measuring `drop` for subscription,
// since it's not possible to combine both `iter_with_setup` and `iter_with_large_drop`.
// To estimate pure cost of method, one should subtract the result of `unsub` bench
Expand Down Expand Up @@ -208,7 +206,7 @@ fn run_round_trip_with_batch(
let batch = vec![(request.method_name(), JsonRpcParams::NoParams); *batch_size];
group.throughput(Throughput::Elements(*batch_size as u64));
group.bench_with_input(BenchmarkId::from_parameter(batch_size), batch_size, |b, _| {
b.iter(|| rt.block_on(async { client.batch_request::<String>(batch.clone()).await.unwrap() }))
b.to_async(rt).iter(|| async { client.batch_request::<String>(batch.clone()).await.unwrap() })
});
}
group.finish();
Expand All @@ -224,21 +222,19 @@ fn run_concurrent_round_trip<C: 'static + Client + Send + Sync>(
let mut group = crit.benchmark_group(request.group_name(name));
for num_concurrent_tasks in helpers::concurrent_tasks() {
group.bench_function(format!("{}", num_concurrent_tasks), |b| {
b.iter(|| {
let mut tasks = Vec::new();
for _ in 0..num_concurrent_tasks {
let client_rc = client.clone();
let task = rt.spawn(async move {
let _ = black_box(
client_rc.request::<String>(request.method_name(), JsonRpcParams::NoParams).await.unwrap(),
);
b.to_async(rt).iter_with_setup(
|| (0..num_concurrent_tasks).map(|_| client.clone()),
|clients| async {
let tasks = clients.map(|client| {
rt.spawn(async move {
let _ = black_box(
client.request::<String>(request.method_name(), JsonRpcParams::NoParams).await.unwrap(),
);
})
});
tasks.push(task);
}
for task in tasks {
rt.block_on(task).unwrap();
}
})
join_all(tasks).await;
},
)
});
}
group.finish();
Expand Down

0 comments on commit 82b1614

Please sign in to comment.