diff --git a/src/bin/bench.rs b/src/bin/bench.rs index 9625246..8c0f849 100644 --- a/src/bin/bench.rs +++ b/src/bin/bench.rs @@ -1,11 +1,14 @@ extern crate time; extern crate rs_ducts; + use rs_ducts::multiplex; use rs_ducts::map; use rs_ducts::Pipeline; +// tuneables for different workloads +const THREAD_COUNT: usize = 1000; const WORK_COUNT: u64 = 1000; const WORK_FACTOR: u64 = 34; const BUFFSIZE: usize = 5; @@ -13,23 +16,28 @@ const BUFFSIZE: usize = 5; fn bench_single() { let source: Vec = (1..WORK_COUNT).collect(); - Pipeline::new(source, 5) - .map(|x| fib(WORK_FACTOR) + x, BUFFSIZE) - .drain(); + Pipeline::new(source, 5).map(fib_work, BUFFSIZE).drain(); } + fn bench_multi() { let source: Vec = (1..WORK_COUNT).collect(); - let mappers = (0..4) - .map(|_| map::Mapper::new(|x| fib(WORK_FACTOR) + x)) - .collect(); Pipeline::new(source, BUFFSIZE) - .then(multiplex::Multiplex::new(mappers, BUFFSIZE), BUFFSIZE) + .then(multiplex::Multiplex::from(map::Mapper::new(fib_work), + THREAD_COUNT, + BUFFSIZE), + BUFFSIZE) .drain(); } + // just something expensive +fn fib_work(n: u64) -> u64 { + fib(WORK_FACTOR) + n +} + + fn fib(n: u64) -> u64 { if n == 0 || n == 1 { 1 @@ -38,6 +46,7 @@ fn fib(n: u64) -> u64 { } } + pub fn timeit(name: &str, func: F) where F: FnOnce() -> () + Copy { diff --git a/src/lib.rs b/src/lib.rs index b282953..ddc0093 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ use std::sync::mpsc; use std::thread; + #[derive(Debug)] pub struct Pipeline where Output: Send + 'static @@ -8,6 +9,7 @@ pub struct Pipeline rx: mpsc::Receiver, } + impl Pipeline where Output: Send { @@ -73,6 +75,7 @@ impl Pipeline } } + impl IntoIterator for Pipeline where Output: Send { @@ -85,6 +88,7 @@ impl IntoIterator for Pipeline } } + pub trait PipelineEntry { fn process>(self, rx: I, @@ -134,8 +138,22 @@ pub mod map { } } } + + impl Clone for Mapper + where Func: Fn(In) -> Out + Copy + { + fn clone(&self) -> Self { + Mapper::new(self.func) + } + } + + impl Copy for Mapper + where Func: Fn(In) -> Out + Copy + { + } } + pub mod filter { use std::marker::PhantomData; use std::sync::mpsc; @@ -180,6 +198,10 @@ pub mod filter { pub mod multiplex { + // work around https://github.com/rust-lang/rust/issues/28229 + // (functions implement Copy but not Clone) + #![cfg_attr(feature="cargo-clippy", allow(expl_impl_clone_on_copy))] + use std::marker::PhantomData; use std::sync::mpsc; use std::sync::{Arc, Mutex}; @@ -189,7 +211,7 @@ pub mod multiplex { #[derive(Debug)] pub struct Multiplex - where Entry: PipelineEntry + where Entry: PipelineEntry + Send { entries: Vec, buffsize: usize, @@ -199,6 +221,14 @@ pub mod multiplex { out_: PhantomData, } + impl Multiplex + where Entry: PipelineEntry + Send + Copy + { + pub fn from(entry: Entry, workers: usize, buffsize: usize) -> Self { + Self::new((0..workers).map(|_| entry).collect(), buffsize) + } + } + impl Multiplex where Entry: PipelineEntry + Send { @@ -324,21 +354,59 @@ mod tests { assert_eq!(produced, expect); } + // just something expensive + fn fib_work(n: u64) -> u64 { + const WORK_FACTOR: u64 = 10; + fib(WORK_FACTOR) + n + } + + fn fib(n: u64) -> u64 { + if n == 0 || n == 1 { + 1 + } else { + fib(n - 1) + fib(n - 2) + } + } + + #[test] + fn multiplex_map_function() { + // we have two signatures for Multiplex, one that takes a function + // pointer and one that can take a closure. THis is the function pointer + // side + + let buffsize: usize = 10; + let workers: usize = 10; + + let source: Vec = (1..1000).collect(); + let expect: Vec = + source.clone().into_iter().map(fib_work).collect(); + + let pbb: Pipeline = Pipeline::new(source, buffsize) + .then(multiplex::Multiplex::from(map::Mapper::new(fib_work), + workers, + buffsize), + buffsize); + let mut produced: Vec = pbb.into_iter().collect(); + + produced.sort(); // these may arrive out of order + assert_eq!(produced, expect); + } + #[test] - fn multiplex_map() { + fn multiplex_map_closure() { let buffsize: usize = 10; + let workers: usize = 10; let source: Vec = (1..1000).collect(); let expect: Vec = source.iter().map(|x| x * 2).collect(); let pbb: Pipeline = Pipeline::new(source, buffsize) - // TOOD multiplex takes a list of PipelineEntry but it would be - // nicer if it just took one and was able to clone it - .then( - multiplex::Multiplex::new( - (0..10).map(|_| map::Mapper::new(|i| i*2)).collect(), - buffsize), - buffsize); + .then(multiplex::Multiplex::new((0..workers) + .map(|_| { + map::Mapper::new(|i| i * 2) + }).collect(), + buffsize), + buffsize); let mut produced: Vec = pbb.into_iter().collect(); produced.sort(); // these may arrive out of order