Async/subscription benches (#372)
* Add benches for async methods

* Benches for subscriptions
popzxc authored and niklasad1 committed Oct 15, 2021
1 parent 3f804de · commit df98cc4
Showing 3 changed files with 178 additions and 39 deletions.
benches/Cargo.toml: 3 changes (2 additions, 1 deletion)
@@ -9,7 +9,8 @@ publish = false

[dev-dependencies]
criterion = "0.3"
futures-channel = "0.3.14"
futures-channel = "0.3.15"
futures-util = "0.3.15"
jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
num_cpus = "1"
serde_json = "1"
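
The new futures-util dependency is pulled in for `FutureExt::boxed`, which the async method registration in benches/helpers.rs below uses to hand the server a type-erased future. A minimal standalone sketch of what `.boxed()` does (the function name and the `Result<&'static str, ()>` output type are illustrative here, not jsonrpsee's actual handler signature):

use futures_util::future::{BoxFuture, FutureExt};

// `.boxed()` pins and boxes any `Send + 'static` future, erasing its concrete type.
fn boxed_hello() -> BoxFuture<'static, Result<&'static str, ()>> {
    (async { Ok("lo") }).boxed()
}
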
benches/bench.rs: 194 changes (158 additions, 36 deletions)
@@ -1,20 +1,58 @@
use criterion::*;
use helpers::{SUB_METHOD_NAME, UNSUB_METHOD_NAME};
use jsonrpsee::{
http_client::{
traits::Client,
v2::params::{Id, JsonRpcParams},
v2::request::JsonRpcCallSer,
HttpClientBuilder,
},
types::traits::SubscriptionClient,
ws_client::WsClientBuilder,
};
use std::sync::Arc;
use tokio::runtime::Runtime as TokioRuntime;

mod helpers;

criterion_group!(benches, http_requests, batched_http_requests, websocket_requests, jsonrpsee_types_v2);
criterion_main!(benches);
criterion_group!(types_benches, jsonrpsee_types_v2);
criterion_group!(
sync_benches,
SyncBencher::http_requests,
SyncBencher::batched_http_requests,
SyncBencher::websocket_requests
);
criterion_group!(
async_benches,
AsyncBencher::http_requests,
AsyncBencher::batched_http_requests,
AsyncBencher::websocket_requests
);
criterion_group!(subscriptions, AsyncBencher::subscriptions);
criterion_main!(types_benches, sync_benches, async_benches, subscriptions);

#[derive(Debug, Clone, Copy)]
enum RequestType {
Sync,
Async,
}

impl RequestType {
fn method_name(self) -> &'static str {
match self {
RequestType::Sync => crate::helpers::SYNC_METHOD_NAME,
RequestType::Async => crate::helpers::ASYNC_METHOD_NAME,
}
}

fn group_name(self, name: &str) -> String {
let request_type_name = match self {
RequestType::Sync => "sync",
RequestType::Async => "async",
};
format!("{}/{}", request_type_name, name)
}
}

fn v2_serialize(req: JsonRpcCallSer<'_>) -> String {
serde_json::to_string(&req).unwrap()
@@ -39,53 +77,135 @@ pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
});
}

pub fn http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::http_server());
let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap());
run_round_trip(&rt, crit, client.clone(), "http_round_trip");
run_concurrent_round_trip(&rt, crit, client, "http_concurrent_round_trip");
}
trait RequestBencher {
const REQUEST_TYPE: RequestType;

pub fn batched_http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::http_server());
let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap());
run_round_trip_with_batch(&rt, crit, client, "http batch requests");
fn http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::http_server());
let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap());
run_round_trip(&rt, crit, client.clone(), "http_round_trip", Self::REQUEST_TYPE);
run_concurrent_round_trip(&rt, crit, client, "http_concurrent_round_trip", Self::REQUEST_TYPE);
}

fn batched_http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::http_server());
let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap());
run_round_trip_with_batch(&rt, crit, client, "http batch requests", Self::REQUEST_TYPE);
}

fn websocket_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_round_trip(&rt, crit, client.clone(), "ws_round_trip", Self::REQUEST_TYPE);
run_concurrent_round_trip(&rt, crit, client, "ws_concurrent_round_trip", Self::REQUEST_TYPE);
}

fn batched_ws_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_round_trip_with_batch(&rt, crit, client, "ws batch requests", Self::REQUEST_TYPE);
}

fn subscriptions(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_sub_round_trip(&rt, crit, client, "subscriptions");
}
}

pub fn websocket_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_round_trip(&rt, crit, client.clone(), "ws_round_trip");
run_concurrent_round_trip(&rt, crit, client, "ws_concurrent_round_trip");
pub struct SyncBencher;

impl RequestBencher for SyncBencher {
const REQUEST_TYPE: RequestType = RequestType::Sync;
}
pub struct AsyncBencher;

pub fn batched_ws_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_round_trip_with_batch(&rt, crit, client, "ws batch requests");
impl RequestBencher for AsyncBencher {
const REQUEST_TYPE: RequestType = RequestType::Async;
}

fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Client>, name: &str) {
crit.bench_function(name, |b| {
fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Client>, name: &str, request: RequestType) {
crit.bench_function(&request.group_name(name), |b| {
b.iter(|| {
rt.block_on(async {
black_box(client.request::<String>("say_hello", JsonRpcParams::NoParams).await.unwrap());
black_box(client.request::<String>(request.method_name(), JsonRpcParams::NoParams).await.unwrap());
})
})
});
}

/// Benchmark http batch requests over batch sizes of 2, 5, 10, 50 and 100 RPCs in each batch.
fn run_round_trip_with_batch(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Client>, name: &str) {
fn run_sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl SubscriptionClient>, name: &str) {
let mut group = crit.benchmark_group(name);
group.bench_function("subscribe", |b| {
b.iter_with_large_drop(|| {
rt.block_on(async {
black_box(
client
.subscribe::<String>(SUB_METHOD_NAME, JsonRpcParams::NoParams, UNSUB_METHOD_NAME)
.await
.unwrap(),
);
})
})
});
group.bench_function("subscribe_response", |b| {
b.iter_with_setup(
|| {
rt.block_on(async {
client
.subscribe::<String>(SUB_METHOD_NAME, JsonRpcParams::NoParams, UNSUB_METHOD_NAME)
.await
.unwrap()
})
},
|mut sub| {
rt.block_on(async { black_box(sub.next().await.unwrap()) });
// Note that this benchmark also includes the cost of dropping the subscription,
// since it's not possible to combine `iter_with_setup` and `iter_with_large_drop`.
// To estimate the pure cost of the method, subtract the result of the `unsub`
// bench from this one.
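// As a purely hypothetical example: if `subscribe_response` reports 120 µs per
// iteration and `unsub` reports 20 µs, receiving the message itself costs ~100 µs.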
},
)
});
group.bench_function("unsub", |b| {
b.iter_with_setup(
|| {
rt.block_on(async {
client
.subscribe::<String>(SUB_METHOD_NAME, JsonRpcParams::NoParams, UNSUB_METHOD_NAME)
.await
.unwrap()
})
},
|sub| {
// The subscription is closed inside its `Drop` impl. In practice that only sends
// a notification that the subscription was closed, but it's still useful to know
// that the `Drop` impl is not too expensive.
drop(black_box(sub));
},
)
});
}

/// Benchmark http batch requests over batch sizes of 2, 5, 10, 50 and 100 RPCs in each batch.
fn run_round_trip_with_batch(
rt: &TokioRuntime,
crit: &mut Criterion,
client: Arc<impl Client>,
name: &str,
request: RequestType,
) {
let mut group = crit.benchmark_group(request.group_name(name));
for batch_size in [2, 5, 10, 50, 100usize].iter() {
let batch = vec![("say_hello", JsonRpcParams::NoParams); *batch_size];
let batch = vec![(request.method_name(), JsonRpcParams::NoParams); *batch_size];
group.throughput(Throughput::Elements(*batch_size as u64));
group.bench_with_input(BenchmarkId::from_parameter(batch_size), batch_size, |b, _| {
b.iter(|| rt.block_on(async { client.batch_request::<String>(batch.clone()).await.unwrap() }))
@@ -99,17 +219,19 @@ fn run_concurrent_round_trip(
crit: &mut Criterion,
client: Arc<C>,
name: &str,
request: RequestType,
) {
let mut group = crit.benchmark_group(name);
let mut group = crit.benchmark_group(request.group_name(name));
for num_concurrent_tasks in helpers::concurrent_tasks() {
group.bench_function(format!("{}", num_concurrent_tasks), |b| {
b.iter(|| {
let mut tasks = Vec::new();
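// Each iteration spawns `num_concurrent_tasks` tasks, each issuing a single request.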
for _ in 0..num_concurrent_tasks {
let client_rc = client.clone();
let task = rt.spawn(async move {
let _ =
black_box(client_rc.request::<String>("say_hello", JsonRpcParams::NoParams).await.unwrap());
let _ = black_box(
client_rc.request::<String>(request.method_name(), JsonRpcParams::NoParams).await.unwrap(),
);
});
tasks.push(task);
}
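
For orientation: the `RequestBencher` trait above defines one set of bench entry points whose behavior is switched only by the associated `REQUEST_TYPE` constant, and `SyncBencher`/`AsyncBencher` are zero-sized marker types that select the variant, so the round-trip benchmark IDs end up prefixed with "sync/" or "async/" via `group_name`. A standalone sketch of the same pattern (all names here are illustrative, not part of the diff):

trait Bencher {
    const KIND: &'static str;

    // The default method body is shared by every implementor; only `KIND` differs.
    fn bench_id(name: &str) -> String {
        format!("{}/{}", Self::KIND, name)
    }
}

struct SyncMarker;
impl Bencher for SyncMarker {
    const KIND: &'static str = "sync";
}

struct AsyncMarker;
impl Bencher for AsyncMarker {
    const KIND: &'static str = "async";
}

fn main() {
    assert_eq!(SyncMarker::bench_id("http_round_trip"), "sync/http_round_trip");
    assert_eq!(AsyncMarker::bench_id("http_round_trip"), "async/http_round_trip");
}

Because the IDs carry these prefixes, a single family can be run in isolation with Criterion's name filter, e.g. `cargo bench -- async` or `cargo bench -- subscriptions` (Criterion treats the trailing argument as a filter on benchmark IDs).
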
benches/helpers.rs: 20 changes (18 additions, 2 deletions)
@@ -1,17 +1,24 @@
use futures_channel::oneshot;
use futures_util::future::FutureExt;
use jsonrpsee::{
http_server::HttpServerBuilder,
ws_server::{RpcModule, WsServerBuilder},
};

pub(crate) const SYNC_METHOD_NAME: &str = "say_hello";
pub(crate) const ASYNC_METHOD_NAME: &str = "say_hello_async";
pub(crate) const SUB_METHOD_NAME: &str = "sub";
pub(crate) const UNSUB_METHOD_NAME: &str = "unsub";

/// Run jsonrpsee HTTP server for benchmarks.
pub async fn http_server() -> String {
let (server_started_tx, server_started_rx) = oneshot::channel();
tokio::spawn(async move {
let mut server =
HttpServerBuilder::default().max_request_body_size(u32::MAX).build("127.0.0.1:0".parse().unwrap()).unwrap();
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo")).unwrap();
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| (async { Ok("lo") }).boxed()).unwrap();
server.register_module(module).unwrap();
server_started_tx.send(server.local_addr().unwrap()).unwrap();
server.start().await
@@ -25,7 +32,16 @@ pub async fn ws_server() -> String {
tokio::spawn(async move {
let mut server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo")).unwrap();
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| (async { Ok("lo") }).boxed()).unwrap();
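// The subscription sends a single "Hello" notification as soon as it is set up;
// this is the message the `subscribe_response` bench in bench.rs waits for.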
module
.register_subscription(SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
Ok(())
})
.unwrap();

server.register_module(module).unwrap();
server_started_tx.send(server.local_addr().unwrap()).unwrap();
server.start().await