From 98e02bfefaf0e393d7380a72e413a768a619b624 Mon Sep 17 00:00:00 2001
From: Tomasz Pietrek
Date: Fri, 4 Aug 2023 22:48:46 +0200
Subject: [PATCH 1/2] Add JetStream publish bench

Signed-off-by: Tomasz Pietrek
---
 async-nats/Cargo.toml           |   2 +-
 async-nats/benches/core_nats.rs |   5 +-
 async-nats/benches/jetstream.rs | 215 ++++++++++++++++++++++++++++++++
 async-nats/benches/main.rs      |   7 ++
 4 files changed, 225 insertions(+), 4 deletions(-)
 create mode 100644 async-nats/benches/jetstream.rs
 create mode 100644 async-nats/benches/main.rs

diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml
index 2867e7496..7f8bb240a 100644
--- a/async-nats/Cargo.toml
+++ b/async-nats/Cargo.toml
@@ -58,6 +58,6 @@ slow_tests = []
 
 [[bench]]
-name = "core_nats"
+name = "main"
 harness = false
 lto = true

diff --git a/async-nats/benches/core_nats.rs b/async-nats/benches/core_nats.rs
index 757c8df4a..ffe368c23 100644
--- a/async-nats/benches/core_nats.rs
+++ b/async-nats/benches/core_nats.rs
@@ -1,7 +1,7 @@
 use std::time::Duration;
 
 use bytes::Bytes;
-use criterion::{criterion_group, criterion_main, Criterion};
+use criterion::{criterion_group, Criterion};
 use futures::stream::StreamExt;
 
 static MSG: &[u8] = &[22; 32768];
@@ -200,5 +200,4 @@ async fn subscribe_messages(nc: async_nats::Client, amount: u64) {
     }
 }
 
-criterion_group!(benches, publish, subscribe, request);
-criterion_main!(benches);
+criterion_group!(core_nats, publish, subscribe, request);

diff --git a/async-nats/benches/jetstream.rs b/async-nats/benches/jetstream.rs
new file mode 100644
index 000000000..12695d6a2
--- /dev/null
+++ b/async-nats/benches/jetstream.rs
@@ -0,0 +1,215 @@
+use std::future::IntoFuture;
+
+use async_nats::jetstream::stream;
+use bytes::Bytes;
+use criterion::{criterion_group, Criterion};
+
+static MSG: &[u8] = &[22; 32768];
+
+pub fn jetstream_publish_sync(c: &mut Criterion) {
+    let messages_per_iter = 50_000;
+    let server = nats_server::run_server("tests/configs/jetstream.conf");
+    let mut throughput_group = c.benchmark_group("jetstream sync publish throughput");
+    throughput_group.sample_size(10);
+    throughput_group.warm_up_time(std::time::Duration::from_secs(1));
+
+    for &size in [32, 1024, 8192].iter() {
+        throughput_group.throughput(criterion::Throughput::Bytes(
+            size as u64 * messages_per_iter,
+        ));
+        throughput_group.bench_with_input(
+            criterion::BenchmarkId::from_parameter(size),
+            &size,
+            |b, _| {
+                let rt = tokio::runtime::Runtime::new().unwrap();
+                let context = rt.block_on(async {
+                    let context = async_nats::jetstream::new(
+                        async_nats::connect(server.client_url()).await.unwrap(),
+                    );
+
+                    let stream = context
+                        .create_stream(stream::Config {
+                            name: "bench".to_owned(),
+                            subjects: vec!["bench".to_string()],
+                            ..Default::default()
+                        })
+                        .await
+                        .unwrap();
+                    stream.purge().await.unwrap();
+                    context
+                });
+
+                b.to_async(rt).iter_with_large_drop(move || {
+                    let nc = context.clone();
+                    async move {
+                        publish_sync_batch(nc, Bytes::from_static(&MSG[..size]), messages_per_iter)
+                            .await
+                    }
+                });
+            },
+        );
+    }
+    throughput_group.finish();
+
+    let mut messages_group = c.benchmark_group("jetstream sync publish messages amount");
+    messages_group.sample_size(10);
+    messages_group.warm_up_time(std::time::Duration::from_secs(1));
+
+    for &size in [32, 1024, 8192].iter() {
+        messages_group.throughput(criterion::Throughput::Elements(messages_per_iter));
+        messages_group.bench_with_input(
+            criterion::BenchmarkId::from_parameter(size),
+            &size,
+            |b, _| {
+                let rt = tokio::runtime::Runtime::new().unwrap();
+                let context = rt.block_on(async {
+                    let context = async_nats::jetstream::new(
+                        async_nats::connect(server.client_url()).await.unwrap(),
+                    );
+
+                    let stream = context
+                        .create_stream(stream::Config {
+                            name: "bench".to_owned(),
+                            subjects: vec!["bench".to_string()],
+                            ..Default::default()
+                        })
+                        .await
+                        .unwrap();
+                    stream.purge().await.unwrap();
+                    context
+                });
+
+                b.to_async(rt).iter_with_large_drop(move || {
+                    let context = context.clone();
+                    async move {
+                        publish_sync_batch(
+                            context,
+                            Bytes::from_static(&MSG[..size]),
+                            messages_per_iter,
+                        )
+                        .await
+                    }
+                });
+            },
+        );
+    }
+    messages_group.finish();
+}
+
+pub fn jetstream_publish_async(c: &mut Criterion) {
+    let messages_per_iter = 50_000;
+    let server = nats_server::run_server("tests/configs/jetstream.conf");
+    let mut throughput_group = c.benchmark_group("jetstream async publish throughput");
+    throughput_group.sample_size(10);
+    throughput_group.warm_up_time(std::time::Duration::from_secs(1));
+
+    for &size in [32, 1024, 8192].iter() {
+        throughput_group.throughput(criterion::Throughput::Bytes(
+            size as u64 * messages_per_iter,
+        ));
+        throughput_group.bench_with_input(
+            criterion::BenchmarkId::from_parameter(size),
+            &size,
+            |b, _| {
+                let rt = tokio::runtime::Runtime::new().unwrap();
+                let context = rt.block_on(async {
+                    let context = async_nats::jetstream::new(
+                        async_nats::connect(server.client_url()).await.unwrap(),
+                    );
+
+                    let stream = context
+                        .create_stream(stream::Config {
+                            name: "bench".to_owned(),
+                            subjects: vec!["bench".to_string()],
+                            ..Default::default()
+                        })
+                        .await
+                        .unwrap();
+                    stream.purge().await.unwrap();
+                    context
+                });
+
+                b.to_async(rt).iter_with_large_drop(move || {
+                    let nc = context.clone();
+                    async move {
+                        publish_async_batch(nc, Bytes::from_static(&MSG[..size]), messages_per_iter)
+                            .await
+                    }
+                });
+            },
+        );
+    }
+    throughput_group.finish();
+
+    let mut messages_group = c.benchmark_group("jetstream async publish messages amount");
+    messages_group.sample_size(10);
+    messages_group.warm_up_time(std::time::Duration::from_secs(1));
+
+    for &size in [32, 1024, 8192].iter() {
+        messages_group.throughput(criterion::Throughput::Elements(messages_per_iter));
+        messages_group.bench_with_input(
+            criterion::BenchmarkId::from_parameter(size),
+            &size,
+            |b, _| {
+                let rt = tokio::runtime::Runtime::new().unwrap();
+                let context = rt.block_on(async {
+                    let context = async_nats::jetstream::new(
+                        async_nats::connect(server.client_url()).await.unwrap(),
+                    );
+
+                    let stream = context
+                        .create_stream(stream::Config {
+                            name: "bench".to_owned(),
+                            subjects: vec!["bench".to_string()],
+                            ..Default::default()
+                        })
+                        .await
+                        .unwrap();
+                    stream.purge().await.unwrap();
+                    context
+                });
+
+                b.to_async(rt).iter_with_large_drop(move || {
+                    let context = context.clone();
+                    async move {
+                        publish_async_batch(
+                            context,
+                            Bytes::from_static(&MSG[..size]),
+                            messages_per_iter,
+                        )
+                        .await
+                    }
+                });
+            },
+        );
+    }
+    messages_group.finish();
+}
+async fn publish_sync_batch(context: async_nats::jetstream::Context, msg: Bytes, amount: u64) {
+    for _i in 0..amount {
+        context
+            .publish("bench".into(), msg.clone())
+            .await
+            .unwrap()
+            .await
+            .unwrap();
+    }
+}
+
+async fn publish_async_batch(context: async_nats::jetstream::Context, msg: Bytes, amount: u64) {
+    // This acts as a semaphore that does not allow for more than 1000 publish acks awaiting.
+    let (tx, mut rx) = tokio::sync::mpsc::channel(10);
+
+    let handle = tokio::task::spawn(async move {
+        for _ in 0..amount {
+            rx.recv().await.unwrap();
+        }
+    });
+    for _ in 0..amount {
+        let ack = context.publish("bench".into(), msg.clone()).await.unwrap();
+        tx.send(ack.into_future()).await.unwrap();
+    }
+    handle.await.unwrap();
+}
+
+criterion_group!(jetstream, jetstream_publish_sync, jetstream_publish_async);
diff --git a/async-nats/benches/main.rs b/async-nats/benches/main.rs
new file mode 100644
index 000000000..18fba05d3
--- /dev/null
+++ b/async-nats/benches/main.rs
@@ -0,0 +1,7 @@
+use criterion::criterion_main;
+
+// Import the benchmark groups from both files
+mod core_nats;
+mod jetstream;
+
+criterion_main!(core_nats::core_nats, jetstream::jetstream);

From 222908ff38b337573a23f3f158cc35fde607a667 Mon Sep 17 00:00:00 2001
From: Tomasz Pietrek
Date: Sat, 5 Aug 2023 07:36:17 +0200
Subject: [PATCH 2/2] Rename benchmarks

This was done to allow running a specific bench. Now `cargo bench nats`
or even `cargo bench nats::request` can be used. While it does fake the
module structure, it is really intuitive to use and prints an
easy-to-read benchmark report.

Signed-off-by: Tomasz Pietrek
---
 async-nats/benches/core_nats.rs | 8 ++++----
 async-nats/benches/jetstream.rs | 7 ++++---
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/async-nats/benches/core_nats.rs b/async-nats/benches/core_nats.rs
index ffe368c23..f9dc9f99a 100644
--- a/async-nats/benches/core_nats.rs
+++ b/async-nats/benches/core_nats.rs
@@ -9,7 +9,7 @@ static MSG: &[u8] = &[22; 32768];
 
 pub fn publish(c: &mut Criterion) {
     let messages_per_iter = 500_000;
     let server = nats_server::run_basic_server();
-    let mut throughput_group = c.benchmark_group("async-nats: publish throughput");
+    let mut throughput_group = c.benchmark_group("nats::publish_throughput");
     throughput_group.sample_size(10);
     throughput_group.warm_up_time(std::time::Duration::from_secs(1));
@@ -37,7 +37,7 @@ pub fn publish(c: &mut Criterion) {
     }
     throughput_group.finish();
 
-    let mut messages_group = c.benchmark_group("async-nats: publish messages amount");
+    let mut messages_group = c.benchmark_group("nats::publish_amount");
     messages_group.sample_size(10);
     messages_group.warm_up_time(std::time::Duration::from_secs(1));
 
@@ -72,7 +72,7 @@ pub fn subscribe(c: &mut Criterion) {
     let server = nats_server::run_basic_server();
     let messages_per_iter = 500_000;
 
-    let mut subscribe_amount_group = c.benchmark_group("subscribe amount");
+    let mut subscribe_amount_group = c.benchmark_group("nats::subscribe_amount");
     subscribe_amount_group.sample_size(10);
 
     for &size in [32, 1024, 8192].iter() {
@@ -126,7 +126,7 @@ pub fn request(c: &mut Criterion) {
     let server = nats_server::run_basic_server();
     let messages_per_iter = 10_000;
 
-    let mut subscribe_amount_group = c.benchmark_group("request amount");
+    let mut subscribe_amount_group = c.benchmark_group("nats::request_amount");
     subscribe_amount_group.sample_size(10);
 
     for &size in [32, 1024, 8192].iter() {
diff --git a/async-nats/benches/jetstream.rs b/async-nats/benches/jetstream.rs
index 12695d6a2..fc5fea139 100644
--- a/async-nats/benches/jetstream.rs
+++ b/async-nats/benches/jetstream.rs
@@ -9,7 +9,7 @@ static MSG: &[u8] = &[22; 32768];
 
 pub fn jetstream_publish_sync(c: &mut Criterion) {
     let messages_per_iter = 50_000;
     let server = nats_server::run_server("tests/configs/jetstream.conf");
-    let mut throughput_group = c.benchmark_group("jetstream sync publish throughput");
+    let mut throughput_group = c.benchmark_group("jetstream::sync_publish_throughput");
     throughput_group.sample_size(10);
     throughput_group.warm_up_time(std::time::Duration::from_secs(1));
 
@@ -141,7 +141,8 @@ pub fn jetstream_publish_async(c: &mut Criterion) {
     }
     throughput_group.finish();
 
-    let mut messages_group = c.benchmark_group("jetstream async publish messages amount");
+    let mut messages_group = c.benchmark_group("jetstream::async_publish_messages_amount");
+
     messages_group.sample_size(10);
     messages_group.warm_up_time(std::time::Duration::from_secs(1));
 
@@ -197,7 +198,7 @@ async fn publish_sync_batch(context: async_nats::jetstream::Context, msg: Bytes,
 }
 
 async fn publish_async_batch(context: async_nats::jetstream::Context, msg: Bytes, amount: u64) {
-    // This acts as a semaphore that does not allow for more than 1000 publish acks awaiting.
+    // This acts as a semaphore that does not allow for more than 10 publish acks awaiting.
     let (tx, mut rx) = tokio::sync::mpsc::channel(10);
 
     let handle = tokio::task::spawn(async move {
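Note on the pattern above: the bounded mpsc channel in `publish_async_batch` is the
load-limiting piece. `tx.send` suspends once 10 un-received ack futures sit in the
channel, so no more than 10 publishes can be awaiting acknowledgment at a time.
Below is a minimal standalone sketch of that pattern; the spawned sleep task is a
hypothetical stand-in for `context.publish(...)` and its server-ack future, and,
unlike the benchmark's drain loop, this sketch also awaits each received ack.

// Standalone sketch of the bounded-channel backpressure pattern.
// Requires: tokio = { version = "1", features = ["full"] }
use std::time::Duration;

#[tokio::main]
async fn main() {
    let amount: u64 = 1_000;
    // Capacity 10: once 10 ack futures sit unconsumed in the channel,
    // `send` suspends, capping the number of outstanding publishes.
    let (tx, mut rx) = tokio::sync::mpsc::channel(10);

    // Drain task: receive each queued ack future and await its completion.
    let handle = tokio::task::spawn(async move {
        for _ in 0..amount {
            let ack: tokio::task::JoinHandle<()> = rx.recv().await.unwrap();
            ack.await.unwrap();
        }
    });

    for _ in 0..amount {
        // Hypothetical stand-in for a publish whose ack resolves later.
        let ack = tokio::task::spawn(async {
            tokio::time::sleep(Duration::from_micros(100)).await;
        });
        // Suspends while 10 acks are already pending: the backpressure point.
        tx.send(ack).await.unwrap();
    }
    handle.await.unwrap();
}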