Skip to content

Commit

Permalink
rustfmt
Browse files Browse the repository at this point in the history
  • Loading branch information
dking committed Jun 10, 2017
1 parent 1c9beb2 commit 09b7922
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 18 deletions.
3 changes: 2 additions & 1 deletion rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
fn_brace_style = "SameLineWhere"
item_brace_style = "SameLineWhere"
use_try_shorthand = true
use_try_shorthand = true
max_width = 80
56 changes: 39 additions & 17 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,22 @@ impl<Output> Pipeline<Output>
// given another pipeline entry, send the results of the previous entry into
// the next one
#[must_use]
pub fn then<EntryOut, Entry>(self, next: Entry, buffsize: usize) -> Pipeline<EntryOut>
pub fn then<EntryOut, Entry>(self,
next: Entry,
buffsize: usize)
-> Pipeline<EntryOut>
where Entry: PipelineEntry<Output, EntryOut> + Send + 'static,
EntryOut: Send
{
self.pipe(move |tx, rx| next.process(tx, rx), buffsize)
}

pub fn pipe<EntryOut, Func>(self, func: Func, buffsize: usize) -> Pipeline<EntryOut>
where Func: FnOnce(mpsc::Receiver<Output>, mpsc::SyncSender<EntryOut>) -> (),
pub fn pipe<EntryOut, Func>(self,
func: Func,
buffsize: usize)
-> Pipeline<EntryOut>
where Func: FnOnce(mpsc::Receiver<Output>, mpsc::SyncSender<EntryOut>)
-> (),
Func: Send + 'static,
EntryOut: Send
{
Expand All @@ -45,7 +52,10 @@ impl<Output> Pipeline<Output>
Pipeline { rx }
}

pub fn map<EntryOut, Func>(self, func: Func, buffsize: usize) -> Pipeline<EntryOut>
pub fn map<EntryOut, Func>(self,
func: Func,
buffsize: usize)
-> Pipeline<EntryOut>
where Func: Fn(Output) -> EntryOut + Send + 'static,
EntryOut: Send
{
Expand Down Expand Up @@ -76,7 +86,10 @@ impl<Output> IntoIterator for Pipeline<Output>
}

pub trait PipelineEntry<In, Out> {
fn process<I: IntoIterator<Item = In>>(self, rx: I, tx: mpsc::SyncSender<Out>) -> ();
fn process<I: IntoIterator<Item = In>>(self,
rx: I,
tx: mpsc::SyncSender<Out>)
-> ();
}


Expand Down Expand Up @@ -112,7 +125,9 @@ pub mod map {
impl<In, Out, Func> PipelineEntry<In, Out> for Mapper<In, Out, Func>
where Func: Fn(In) -> Out
{
fn process<I: IntoIterator<Item = In>>(self, rx: I, tx: mpsc::SyncSender<Out>) {
fn process<I: IntoIterator<Item = In>>(self,
rx: I,
tx: mpsc::SyncSender<Out>) {
for item in rx {
let mapped = (self.func)(item);
tx.send(mapped).expect("failed to send");
Expand Down Expand Up @@ -151,7 +166,9 @@ pub mod filter {
impl<In, Func> PipelineEntry<In, In> for Filter<In, Func>
where Func: Fn(&In) -> bool
{
fn process<I: IntoIterator<Item = In>>(self, rx: I, tx: mpsc::SyncSender<In>) {
fn process<I: IntoIterator<Item = In>>(self,
rx: I,
tx: mpsc::SyncSender<In>) {
for item in rx {
if (self.func)(&item) {
tx.send(item).expect("failed to send")
Expand Down Expand Up @@ -200,7 +217,9 @@ pub mod multiplex {
In: Send + 'static,
Out: Send + 'static
{
fn process<I: IntoIterator<Item = In>>(self, rx: I, tx: mpsc::SyncSender<Out>) {
fn process<I: IntoIterator<Item = In>>(self,
rx: I,
tx: mpsc::SyncSender<Out>) {
// workers will read their work out of this channel but send their
// results directly into the regular tx channel
let (master_tx, chan_rx) = mpsc::sync_channel(self.buffsize);
Expand Down Expand Up @@ -283,7 +302,8 @@ mod tests {
let source: Vec<i32> = (1..1000).collect();
let expect: Vec<i32> = source.iter().map(|x| x * 2).collect();

let pbb: Pipeline<i32> = Pipeline::new(source, buffsize).map(|i| i * 2, buffsize);
let pbb: Pipeline<i32> =
Pipeline::new(source, buffsize).map(|i| i * 2, buffsize);
let produced: Vec<i32> = pbb.into_iter().collect();

assert_eq!(produced, expect);
Expand All @@ -293,7 +313,8 @@ mod tests {
fn multiple_map() {
let buffsize: usize = 10;
let source: Vec<i32> = vec![1, 2, 3];
let expect: Vec<i32> = source.iter().map(|x| (x * 2) * (x * 2)).collect();
let expect: Vec<i32> =
source.iter().map(|x| (x * 2) * (x * 2)).collect();

let pbb: Pipeline<i32> = Pipeline::new(source, buffsize)
.map(|i| i * 2, buffsize)
Expand Down Expand Up @@ -354,13 +375,14 @@ mod tests {
.filter(|x| x % 2 == 0)
.collect();

let pbb: Pipeline<i32> = Pipeline::new(source, buffsize).pipe(|in_, out| for item in in_ {
let item = item + 1;
if item % 2 == 0 {
out.send(item).expect("failed to send")
}
},
10);
let pbb: Pipeline<i32> = Pipeline::new(source, buffsize)
.pipe(|in_, out| for item in in_ {
let item = item + 1;
if item % 2 == 0 {
out.send(item).expect("failed to send")
}
},
10);
let produced: Vec<i32> = pbb.into_iter().collect();

assert_eq!(produced, expect);
Expand Down

0 comments on commit 09b7922

Please sign in to comment.