Skip to content

Commit

Permalink
[#27] Block the processor when it does not have work to do
Browse files Browse the repository at this point in the history
  • Loading branch information
zonyitoo committed Feb 7, 2016
1 parent 0117dba commit 9cf45b8
Show file tree
Hide file tree
Showing 5 changed files with 398 additions and 81 deletions.
84 changes: 71 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ extern crate libc;
use std::thread;
use std::panic;
use std::time::Duration;
use std::any::Any;

pub use scheduler::{Scheduler, JoinHandle};
pub use scheduler::JoinHandle;
pub use options::Options;
pub use promise::Promise;

Expand Down Expand Up @@ -73,26 +74,16 @@ pub fn sched() {
Scheduler::sched()
}

/// Run the scheduler with threads
// #[inline(always)]
// pub fn run(threads: usize) {
// Scheduler::run(threads)
// }

/// Put the current coroutine to sleep for the specific amount of time
#[inline]
pub fn sleep_ms(ms: u64) {
if let Some(s) = Scheduler::instance() {
s.sleep_ms(ms).unwrap();
}
Scheduler::sleep(Duration::from_millis(ms))
}

/// Put the current coroutine to sleep for the specific amount of time
#[inline]
pub fn sleep(duration: Duration) {
if let Some(s) = Scheduler::instance() {
s.sleep(duration).unwrap();
}
Scheduler::sleep(duration)
}

/// Coroutine configuration. Provides detailed control over the properties and behavior of new coroutines.
Expand Down Expand Up @@ -132,6 +123,73 @@ impl Builder {
}
}

/// Coroutine Scheduler
pub struct Scheduler(scheduler::Scheduler);

impl Scheduler {
/// Create a new Scheduler with default configuration
pub fn new() -> Scheduler {
Scheduler(scheduler::Scheduler::new())
}

/// Set the number of workers
#[inline(always)]
pub fn with_workers(self, workers: usize) -> Scheduler {
Scheduler(self.0.with_workers(workers))
}

/// Set the default stack size
#[inline(always)]
pub fn default_stack_size(self, default_stack_size: usize) -> Scheduler {
Scheduler(self.0.default_stack_size(default_stack_size))
}

/// Get the total work count
#[inline(always)]
pub fn work_count(&self) -> usize {
self.0.work_count()
}

/// Spawn a new coroutine with default options
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where F: FnOnce() -> T + Send + 'static,
T: Send + 'static
{
scheduler::Scheduler::spawn(f)
}

/// Spawn a new coroutine with options
pub fn spawn_opts<F, T>(f: F, opts: Options) -> JoinHandle<T>
where F: FnOnce() -> T + Send + 'static,
T: Send + 'static
{
scheduler::Scheduler::spawn_opts(f, opts)
}

/// Run the scheduler
pub fn run<F, T>(&mut self, f: F) -> Result<T, Box<Any + Send>>
where F: FnOnce() -> T + Send,
T: Send
{
self.0.run(f)
}

/// Yield the current coroutine
#[inline]
pub fn sched() {
scheduler::Scheduler::sched()
}

/// Put the current coroutine to sleep for the specific amount of time
#[inline]
pub fn sleep(duration: Duration) {
match scheduler::Scheduler::instance() {
None => thread::sleep(duration),
Some(s) => s.sleep(duration).unwrap(),
}
}
}

unsafe fn try<R, F: FnOnce() -> R>(f: F) -> thread::Result<R> {
let mut f = Some(f);
let f = &mut f as *mut Option<F> as usize;
Expand Down
224 changes: 224 additions & 0 deletions src/runtime/bounded_mpmc_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* The views and conclusions contained in the software and documentation are
* those of the authors and should not be interpreted as representing official
* policies, either expressed or implied, of Dmitry Vyukov.
*/

#![allow(missing_docs, dead_code)]

// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue

// This queue is copy pasted from old rust stdlib.

use std::sync::Arc;
use std::cell::UnsafeCell;

use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Relaxed, Release, Acquire};

struct Node<T> {
sequence: AtomicUsize,
value: Option<T>,
}

unsafe impl<T: Send> Send for Node<T> {}
unsafe impl<T: Sync> Sync for Node<T> {}

struct State<T> {
pad0: [u8; 64],
buffer: Vec<UnsafeCell<Node<T>>>,
mask: usize,
pad1: [u8; 64],
enqueue_pos: AtomicUsize,
pad2: [u8; 64],
dequeue_pos: AtomicUsize,
pad3: [u8; 64],
}

unsafe impl<T: Send> Send for State<T> {}
unsafe impl<T: Send + Sync> Sync for State<T> {}

pub struct Queue<T> {
state: Arc<State<T>>,
}

impl<T> State<T> {
fn with_capacity(capacity: usize) -> State<T> {
let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 {
if capacity < 2 {
2
} else {
// use next power of 2 as capacity
capacity.next_power_of_two()
}
} else {
capacity
};
let buffer = (0..capacity).map(|i| {
UnsafeCell::new(Node { sequence:AtomicUsize::new(i), value: None })
}).collect::<Vec<_>>();
State{
pad0: [0; 64],
buffer: buffer,
mask: capacity-1,
pad1: [0; 64],
enqueue_pos: AtomicUsize::new(0),
pad2: [0; 64],
dequeue_pos: AtomicUsize::new(0),
pad3: [0; 64],
}
}

fn push(&self, value: T) -> Result<(), T> {
let mask = self.mask;
let mut pos = self.enqueue_pos.load(Relaxed);
loop {
let node = &self.buffer[pos & mask];
let seq = unsafe { (*node.get()).sequence.load(Acquire) };
let diff: isize = seq as isize - pos as isize;

if diff == 0 {
let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
if enqueue_pos == pos {
unsafe {
(*node.get()).value = Some(value);
(*node.get()).sequence.store(pos+1, Release);
}
break
} else {
pos = enqueue_pos;
}
} else if diff < 0 {
return Err(value);
} else {
pos = self.enqueue_pos.load(Relaxed);
}
}
Ok(())
}

fn pop(&self) -> Option<T> {
let mask = self.mask;
let mut pos = self.dequeue_pos.load(Relaxed);
loop {
let node = &self.buffer[pos & mask];
let seq = unsafe { (*node.get()).sequence.load(Acquire) };
let diff: isize = seq as isize - (pos + 1) as isize;
if diff == 0 {
let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
if dequeue_pos == pos {
unsafe {
let value = (*node.get()).value.take();
(*node.get()).sequence.store(pos + mask + 1, Release);
return value
}
} else {
pos = dequeue_pos;
}
} else if diff < 0 {
return None
} else {
pos = self.dequeue_pos.load(Relaxed);
}
}
}
}

impl<T> Queue<T> {
pub fn with_capacity(capacity: usize) -> Queue<T> {
Queue{
state: Arc::new(State::with_capacity(capacity))
}
}

pub fn push(&self, value: T) -> Result<(), T> {
self.state.push(value)
}

pub fn pop(&self) -> Option<T> {
self.state.pop()
}
}

impl<T> Clone for Queue<T> {
fn clone(&self) -> Queue<T> {
Queue { state: self.state.clone() }
}
}

#[cfg(test)]
mod tests {
use std::thread;
use std::sync::mpsc::channel;
use super::Queue;

#[test]
fn test() {
let nthreads = 8;
let nmsgs = 1000;
let q = Queue::with_capacity(nthreads*nmsgs);
assert_eq!(None, q.pop());
let (tx, rx) = channel();

for _ in 0..nthreads {
let q = q.clone();
let tx = tx.clone();
thread::spawn(move || {
let q = q;
for i in 0..nmsgs {
assert!(q.push(i).is_ok());
}
tx.send(()).unwrap();
});
}

let mut completion_rxs = vec![];
for _ in 0..nthreads {
let (tx, rx) = channel();
completion_rxs.push(rx);
let q = q.clone();
thread::spawn(move || {
let q = q;
let mut i = 0;
loop {
match q.pop() {
None => {},
Some(_) => {
i += 1;
if i == nmsgs { break }
}
}
}
tx.send(i).unwrap();
});
}

for rx in completion_rxs.iter_mut() {
assert_eq!(nmsgs, rx.recv().unwrap());
}
for _ in 0..nthreads {
rx.recv().unwrap();
}
}
}
1 change: 1 addition & 0 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@
pub use self::processor::Processor;

pub mod processor;
pub mod bounded_mpmc_queue;
Loading

0 comments on commit 9cf45b8

Please sign in to comment.