Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add jetstream benchmarks #1080

Merged
merged 2 commits into from
Aug 5, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion async-nats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ slow_tests = []


[[bench]]
name = "core_nats"
name = "main"
harness = false
lto = true
5 changes: 2 additions & 3 deletions async-nats/benches/core_nats.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use bytes::Bytes;
use criterion::{criterion_group, criterion_main, Criterion};
use criterion::{criterion_group, Criterion};
use futures::stream::StreamExt;

static MSG: &[u8] = &[22; 32768];
Expand Down Expand Up @@ -200,5 +200,4 @@ async fn subscribe_messages(nc: async_nats::Client, amount: u64) {
}
}

criterion_group!(benches, publish, subscribe, request);
criterion_main!(benches);
criterion_group!(core_nats, publish, subscribe, request);
215 changes: 215 additions & 0 deletions async-nats/benches/jetstream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
use std::future::IntoFuture;

use async_nats::jetstream::stream;
use bytes::Bytes;
use criterion::{criterion_group, Criterion};

static MSG: &[u8] = &[22; 32768];

/// Benchmarks synchronous JetStream publishing — every publish waits for its
/// server acknowledgement before the next message is sent. Reports byte
/// throughput and message-rate for several payload sizes.
pub fn jetstream_publish_sync(c: &mut Criterion) {
    let messages_per_iter: u64 = 50_000;
    let server = nats_server::run_server("tests/configs/jetstream.conf");

    let mut throughput_group = c.benchmark_group("jetstream sync publish throughput");
    throughput_group.sample_size(10);
    throughput_group.warm_up_time(std::time::Duration::from_secs(1));

    for &size in &[32, 1024, 8192] {
        // Throughput in bytes: payload size times messages per iteration.
        throughput_group.throughput(criterion::Throughput::Bytes(
            size as u64 * messages_per_iter,
        ));
        throughput_group.bench_with_input(
            criterion::BenchmarkId::from_parameter(size),
            &size,
            |b, _| {
                let runtime = tokio::runtime::Runtime::new().unwrap();
                // Connect once per benchmark input and start from a purged stream
                // so earlier runs don't skew the measurement.
                let jetstream = runtime.block_on(async {
                    let jetstream = async_nats::jetstream::new(
                        async_nats::connect(server.client_url()).await.unwrap(),
                    );
                    let stream = jetstream
                        .create_stream(stream::Config {
                            name: "bench".to_owned(),
                            subjects: vec!["bench".to_string()],
                            ..Default::default()
                        })
                        .await
                        .unwrap();
                    stream.purge().await.unwrap();
                    jetstream
                });

                b.to_async(runtime).iter_with_large_drop(move || {
                    let ctx = jetstream.clone();
                    async move {
                        publish_sync_batch(ctx, Bytes::from_static(&MSG[..size]), messages_per_iter)
                            .await
                    }
                });
            },
        );
    }
    throughput_group.finish();

    let mut messages_group = c.benchmark_group("jetstream sync publish messages amount");
    messages_group.sample_size(10);
    messages_group.warm_up_time(std::time::Duration::from_secs(1));

    for &size in &[32, 1024, 8192] {
        // Same workload, reported as messages/second instead of bytes/second.
        messages_group.throughput(criterion::Throughput::Elements(messages_per_iter));
        messages_group.bench_with_input(
            criterion::BenchmarkId::from_parameter(size),
            &size,
            |b, _| {
                let runtime = tokio::runtime::Runtime::new().unwrap();
                let jetstream = runtime.block_on(async {
                    let jetstream = async_nats::jetstream::new(
                        async_nats::connect(server.client_url()).await.unwrap(),
                    );
                    let stream = jetstream
                        .create_stream(stream::Config {
                            name: "bench".to_owned(),
                            subjects: vec!["bench".to_string()],
                            ..Default::default()
                        })
                        .await
                        .unwrap();
                    stream.purge().await.unwrap();
                    jetstream
                });

                b.to_async(runtime).iter_with_large_drop(move || {
                    let ctx = jetstream.clone();
                    async move {
                        publish_sync_batch(ctx, Bytes::from_static(&MSG[..size]), messages_per_iter)
                            .await
                    }
                });
            },
        );
    }
    messages_group.finish();
}

/// Benchmarks pipelined ("async") JetStream publishing — publishes are sent
/// without waiting for each ack in-line (see `publish_async_batch`). Reports
/// byte throughput and message-rate for several payload sizes.
pub fn jetstream_publish_async(c: &mut Criterion) {
    let messages_per_iter: u64 = 50_000;
    let server = nats_server::run_server("tests/configs/jetstream.conf");

    let mut throughput_group = c.benchmark_group("jetstream async publish throughput");
    throughput_group.sample_size(10);
    throughput_group.warm_up_time(std::time::Duration::from_secs(1));

    for &size in &[32, 1024, 8192] {
        // Throughput in bytes: payload size times messages per iteration.
        throughput_group.throughput(criterion::Throughput::Bytes(
            size as u64 * messages_per_iter,
        ));
        throughput_group.bench_with_input(
            criterion::BenchmarkId::from_parameter(size),
            &size,
            |b, _| {
                let runtime = tokio::runtime::Runtime::new().unwrap();
                // Connect once per benchmark input and start from a purged stream
                // so earlier runs don't skew the measurement.
                let jetstream = runtime.block_on(async {
                    let jetstream = async_nats::jetstream::new(
                        async_nats::connect(server.client_url()).await.unwrap(),
                    );
                    let stream = jetstream
                        .create_stream(stream::Config {
                            name: "bench".to_owned(),
                            subjects: vec!["bench".to_string()],
                            ..Default::default()
                        })
                        .await
                        .unwrap();
                    stream.purge().await.unwrap();
                    jetstream
                });

                b.to_async(runtime).iter_with_large_drop(move || {
                    let ctx = jetstream.clone();
                    async move {
                        publish_async_batch(ctx, Bytes::from_static(&MSG[..size]), messages_per_iter)
                            .await
                    }
                });
            },
        );
    }
    throughput_group.finish();

    let mut messages_group = c.benchmark_group("jetstream async publish messages amount");
    messages_group.sample_size(10);
    messages_group.warm_up_time(std::time::Duration::from_secs(1));

    for &size in &[32, 1024, 8192] {
        // Same workload, reported as messages/second instead of bytes/second.
        messages_group.throughput(criterion::Throughput::Elements(messages_per_iter));
        messages_group.bench_with_input(
            criterion::BenchmarkId::from_parameter(size),
            &size,
            |b, _| {
                let runtime = tokio::runtime::Runtime::new().unwrap();
                let jetstream = runtime.block_on(async {
                    let jetstream = async_nats::jetstream::new(
                        async_nats::connect(server.client_url()).await.unwrap(),
                    );
                    let stream = jetstream
                        .create_stream(stream::Config {
                            name: "bench".to_owned(),
                            subjects: vec!["bench".to_string()],
                            ..Default::default()
                        })
                        .await
                        .unwrap();
                    stream.purge().await.unwrap();
                    jetstream
                });

                b.to_async(runtime).iter_with_large_drop(move || {
                    let ctx = jetstream.clone();
                    async move {
                        publish_async_batch(ctx, Bytes::from_static(&MSG[..size]), messages_per_iter)
                            .await
                    }
                });
            },
        );
    }
    messages_group.finish();
}
/// Publishes `amount` copies of `msg` to the "bench" subject, one at a time:
/// each publish's JetStream ack is awaited before the next message goes out.
async fn publish_sync_batch(context: async_nats::jetstream::Context, msg: Bytes, amount: u64) {
    let mut remaining = amount;
    while remaining > 0 {
        // First await sends the message; the second waits for the server ack.
        let ack = context.publish("bench".into(), msg.clone()).await.unwrap();
        ack.await.unwrap();
        remaining -= 1;
    }
}

async fn publish_async_batch(context: async_nats::jetstream::Context, msg: Bytes, amount: u64) {
// This acts as a semaphore that does not allow for more than 1000 publish acks awaiting.
Jarema marked this conversation as resolved.
Show resolved Hide resolved
let (tx, mut rx) = tokio::sync::mpsc::channel(10);

let handle = tokio::task::spawn(async move {
for _ in 0..amount {
rx.recv().await.unwrap();
}
});
for _ in 0..amount {
let ack = context.publish("bench".into(), msg.clone()).await.unwrap();
tx.send(ack.into_future()).await.unwrap();
}
handle.await.unwrap();
}

criterion_group!(jetstream, jetstream_publish_sync, jetstream_publish_async);
7 changes: 7 additions & 0 deletions async-nats/benches/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use criterion::criterion_main;

// Pull in the benchmark modules; each defines one criterion group
// (`core_nats::core_nats` and `jetstream::jetstream`) registered below.
mod core_nats;
mod jetstream;

criterion_main!(core_nats::core_nats, jetstream::jetstream);