Skip to content

Commit

Permalink
spmc bench added
Browse files Browse the repository at this point in the history
  • Loading branch information
tower120 authored Nov 10, 2024
1 parent eaec341 commit 25e2d2c
Show file tree
Hide file tree
Showing 11 changed files with 428 additions and 142 deletions.
7 changes: 5 additions & 2 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ If write calls will be synchronized - all messages will be ordered by that "sync

## Example

Write from multiple threads, read from multiple threads.
Write from multiple threads, read from multiple threads:

```rust
const WRITERS : usize = 4;
Expand Down Expand Up @@ -78,14 +78,17 @@ std::thread::scope(|s| {
});
```

See [examples](examples).

## Benchmarks

Intel i4771 (3.5Ghz 4C/8T), DDR3 1600Mhz, Windows 10. See [benchmarks](benchmarks) sub-project.

![seq benchmark](doc/img/benchmarks/seq.svg)
![spsc benchmark](doc/img/benchmarks/spsc.svg)
![mpsc benchmark](doc/img/benchmarks/mpsc.svg)
![mpmc benchmark](doc/img/benchmarks/mpmc.svg)
![broadcast mpmc benchmark](doc/img/benchmarks/mpmc.svg)
![broadcast spmc benchmark](doc/img/benchmarks/spmc.svg)

Benchmarks compare with a channels since chute can be used +/- as a channel, by
spinning on the reader side.
Expand Down
12 changes: 9 additions & 3 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,23 @@ charming = { git = "https://github.com/yuankunzhang/charming.git", rev = "aa18c2
#charming = { version = "0.4", features = ["ssr"] }

[[bench]]
name = "mpmc"
name = "seq"
harness = false

[[bench]]
name = "spsc"
harness = false

[[bench]]
name = "mpsc"
harness = false

# MULTICAST

[[bench]]
name = "spsc"
name = "mpmc"
harness = false

[[bench]]
name = "seq"
name = "spmc"
harness = false
2 changes: 1 addition & 1 deletion benchmarks/benches/mpmc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Multiple-producers, multiple-consumers
//! Multicast multiple-producers, multiple-consumers

use chute::LendingReader;
use arrayvec::ArrayVec;
Expand Down
122 changes: 122 additions & 0 deletions benchmarks/benches/spmc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
//! Multicast single-producer, multiple-consumers

use chute::LendingReader;
use arrayvec::ArrayVec;
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use std::sync::Arc;

mod common;
use common::*;

fn chute_spmc(reader_threads: usize){
let mut queue: chute::spmc::Queue<_> = Default::default();

let mut joins: ArrayVec<_, 64> = Default::default();

// READ
for _ in 0..reader_threads {
let mut reader = queue.reader();
joins.push(std::thread::spawn(move || {
for _ in 0..COUNT {
let msg = loop {
if let Some(msg) = reader.next() {
break msg;
}
yield_fn();
};
}
}));
}

// WRITE
for i in 0..COUNT {
queue.push(message::new(i));
}

for join in joins{
join.join().unwrap();
}
}

fn chute_mpmc(reader_threads: usize){
let queue = chute::mpmc::Queue::new();

let mut joins: ArrayVec<_, 64> = Default::default();

// READ
for _ in 0..reader_threads {
let mut reader = queue.reader();
joins.push(std::thread::spawn(move || {
for _ in 0..COUNT {
let msg = loop {
if let Some(msg) = reader.next() {
break msg;
}
yield_fn();
};
}
}));
}

// WRITE
let mut writer = queue.writer();
for i in 0..COUNT {
writer.push(message::new(i));
}

for join in joins{
join.join().unwrap();
}
}

fn tokio_broadcast(reader_threads: usize){
use tokio::sync::broadcast;
let (tx, _) = broadcast::channel(COUNT);

let mut joins: ArrayVec<_, 64> = Default::default();

// READ
for _ in 0..reader_threads {
let mut reader = tx.subscribe();
joins.push(std::thread::spawn(move || {
for _ in 0..COUNT {
reader.blocking_recv().unwrap();
}
}));
}

// WRITE
let mut writer = tx;
for i in 0..COUNT {
writer.send(message::new(i));
}

for join in joins{
join.join().unwrap();
}
}

fn criterion_benchmark(c: &mut Criterion) {
use criterion::BenchmarkId;

let mut group = c.benchmark_group("spmc");
for reader_threads in [1, 2, 4, 8] {
let parameter_string = format!("w:1 r:{}", reader_threads);

group.bench_with_input(BenchmarkId::new("chute::spmc", parameter_string.clone()), &reader_threads
, |b, rt| b.iter(|| chute_spmc(*rt))
);

group.bench_with_input(BenchmarkId::new("chute::mpmc", parameter_string.clone()), &reader_threads
, |b, rt| b.iter(|| chute_mpmc(*rt))
);

group.bench_with_input(BenchmarkId::new("tokio::broadcast", parameter_string.clone()), &reader_threads
, |b, rt| b.iter(|| tokio_broadcast(*rt))
);
}
group.finish();
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
11 changes: 9 additions & 2 deletions benchmarks/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod spmc;
mod mpmc;
mod mpsc;
mod spsc;
mod seq;
mod multi_chart;

use std::collections::{BTreeMap};
use std::fs;
Expand Down Expand Up @@ -31,6 +33,7 @@ fn read_estimate(fname: &Path) -> f64 {
point_estimate.as_f64().unwrap()
}

/// [writer_count][reader_count] -> estimate
type EstimatesMPMC = BTreeMap<usize, BTreeMap<usize, f64>>;

fn read_group(dir_name: &Path, writers: &[usize], readers: &[usize]) -> EstimatesMPMC {
Expand All @@ -54,11 +57,12 @@ const CHART_WIDTH: u32 = 570;

fn main(){
#[derive(Eq, PartialEq)]
enum Command{All, Mpmc, Mpsc, Spsc, Seq}
enum Command{All, Spmc, Mpmc, Mpsc, Spsc, Seq}

let args: Vec<String> = env::args().collect();
let command = match args.get(1).map(|s| s.as_str()) {
None => Command::All,
Some("spmc") => Command::Spmc,
Some("mpmc") => Command::Mpmc,
Some("mpsc") => Command::Mpsc,
Some("spsc") => Command::Spsc,
Expand All @@ -70,7 +74,10 @@ fn main(){
let criterion_dir = current_dir.join("target/criterion");

let _ = fs::create_dir(current_dir.join("out"));


if command == Command::All || command == Command::Spmc {
spmc::spmc(criterion_dir.join("spmc"));
}
if command == Command::All || command == Command::Mpmc {
mpmc::mpmc(criterion_dir.join("mpmc"));
}
Expand Down
124 changes: 16 additions & 108 deletions benchmarks/src/mpmc.rs
Original file line number Diff line number Diff line change
@@ -1,123 +1,31 @@
use std::collections::BTreeMap;
use std::path::Path;
use charming::{component::{Grid, Axis}, Chart, ImageRenderer};
use charming::element::{AxisLabel, AxisType, Color, Formatter};
use charming::element::LabelPosition;
use charming::element::Label;
use charming::series::{Bar, Series};
use charming::component::{Legend};
use charming::component::Title;
use str_macro::str;
use std::string::String;
use charming::element::font_settings::{FontFamily, FontStyle, FontWeight};
use crate::{read_group, EstimatesMPMC};
use crate::CHART_WIDTH;
use crate::CHART_THEME;
use crate::CHART_BACKGROUND;
use crate::{read_group};
use crate::multi_chart::{multi_chart, MultiChartData, Visual};

pub fn mpmc(dir_name: impl AsRef<Path>) {
let rt = 4;
let wts = [1,2,4,8];
let rts = [1,2,4,8];
let read = |dir: &str| -> EstimatesMPMC {
read_group(
let read = |dir: &str| -> BTreeMap<usize, f64> {
let data = read_group(
&std::path::Path::new(dir_name.as_ref()).join(dir)
,&wts, &rts
)
,&wts, &[rt]
);
data.iter().map(|(&wt, readers)| (wt, readers[&rt])).collect()
};

let all: Vec<(String, EstimatesMPMC)> = vec![
let all: MultiChartData = vec![
(str!("chute::spmc\nw/ mutex"), read("chute__spmc_mutex")),
(str!("chute::mpmc"), read("chute__mpmc")),
(str!("tokio::\nbroadcast"), read("tokio__broadcast")),
];

chart(&all, 4, str!("mpmc (4 readers)"), "out/mpmc");
}

/// `rt` - read thread count
pub fn chart(
all_estimates: &Vec<(String, EstimatesMPMC)>,
rt: usize,
title: String,
fname: impl AsRef<Path>
) {
let wts: Vec<usize> = all_estimates.first().unwrap().1
.iter().map(|(wt, _)| *wt)
.collect();

let unit = String::from("ms");
let ns_to_unit = 1.0 / 1_000_000.0;

let mut chart =
Chart::new()
.background_color(CHART_BACKGROUND)
.title(
Title::new()
.text(title)
.left("center")
)
.legend(
Legend::new().top("bottom")
)
.grid(
Grid::new()
.left(100)
.right(40)
.top(40)
.bottom(60)
)
.y_axis(
Axis::new()
.type_(AxisType::Category)
.data(
all_estimates.iter()
.map(|(name,_)| name.clone())
.collect()
)
.axis_label(
AxisLabel::new().show(true)
.font_size(13)
.font_weight(FontWeight::Bolder)
.color("#666666")
)
)
.x_axis(
Axis::new()
.type_(AxisType::Value)
.axis_label(AxisLabel::new().formatter(
Formatter::String(
"{value}".to_string() + &unit
)
))
);

for wt in wts {
let mut bar =
Bar::new()
.name(format!("{wt} writers"))
.label(
Label::new()
.show(true)
.font_size(11)
.font_weight(FontWeight::Bold)
.font_family(FontFamily::MonoSpace)
.position(LabelPosition::InsideRight)
.formatter(Formatter::Function(
(
"function (param) { return param.data.toFixed(2); }"
).into()
))
);
let mut datas = Vec::new();
for (_, estimates) in all_estimates {
let data_ns = estimates[&wt][&rt];
datas.push(data_ns * ns_to_unit);
}
bar = bar.data(datas);
chart = chart.series(Series::Bar(bar));
}

let height = all_estimates.len() as u32 * 80 + 100;
let mut renderer = ImageRenderer::new(CHART_WIDTH, height).theme(CHART_THEME);
renderer.save(&chart, fname.as_ref().with_extension("svg")).unwrap();
renderer.save_format(charming::ImageFormat::Png, &chart, fname.as_ref().with_extension("png")).unwrap();
let visual = Visual{
title: format!("broadcast mpmc ({rt} readers)"),
sub_chart_name: str!("writers"),
label_pos: LabelPosition::Right,
};
multi_chart(&all, "out/mpmc", visual);
}
Loading

0 comments on commit 25e2d2c

Please sign in to comment.