[#27] Temporary solution for processor idle
zonyitoo committed Mar 19, 2016
1 parent fb3d941 commit d3d5397
Showing 2 changed files with 70 additions and 67 deletions.
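What the commit does (a summary read off the diff below, not text from the author): the idle path in src/runtime/processor.rs no longer spins on thread::park_timeout(1ms) guarded by a should_wake_up AtomicBool; that flag, try_wake_up(), and the stored thread handle are all removed. Instead, an idle processor hands its ProcMessageSender to the scheduler via the new Scheduler::park_processor() and then blocks on chan_receiver.recv(); when a coroutine becomes ready, the scheduler pops one parked processor from the new parked_processors queue and wakes it by sending ProcMessage::Ready(coro). Below is a minimal, self-contained sketch of that wake-by-message pattern using only std; the names Registry, park and wake_one are hypothetical and not part of coio's API.

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Sender};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for Scheduler::parked_processors.
struct Registry {
    parked: Mutex<VecDeque<Sender<&'static str>>>,
}

impl Registry {
    fn new() -> Registry {
        Registry { parked: Mutex::new(VecDeque::new()) }
    }

    // Mirrors Scheduler::park_processor(): remember a sender to a parked worker.
    fn park(&self, tx: Sender<&'static str>) {
        self.parked.lock().unwrap().push_back(tx);
    }

    // Mirrors the new scheduler-side wake-up branch: hand the job to a parked worker.
    fn wake_one(&self, job: &'static str) -> bool {
        match self.parked.lock().unwrap().pop_front() {
            Some(tx) => tx.send(job).is_ok(),
            None => false,
        }
    }
}

fn main() {
    let registry = Arc::new(Registry::new());

    let worker = {
        let registry = registry.clone();
        thread::spawn(move || {
            let (tx, rx) = channel();
            // Register as parked, then block on the channel instead of
            // polling with park_timeout + an AtomicBool flag.
            registry.park(tx);
            let job = rx.recv().unwrap();
            println!("worker resumed with: {}", job);
        })
    };

    // Give the worker a moment to park itself (demo only).
    thread::sleep(Duration::from_millis(10));
    registry.wake_one("ProcMessage::Ready(coro)");
    worker.join().unwrap();
}
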
111 changes: 47 additions & 64 deletions src/runtime/processor.rs
@@ -26,10 +26,8 @@ use std::cell::UnsafeCell;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender, SendError};
use std::thread::{self, Builder, Thread};
use std::time::Duration;
use std::thread::{self, Builder};
use std::fmt;

use deque::{self, Worker, Stealer, Stolen};
@@ -52,7 +50,6 @@ pub struct ProcMessageSender {
impl ProcMessageSender {
pub fn send(&self, proc_msg: ProcMessage) -> Result<(), SendError<ProcMessage>> {
try!(self.inner.send(proc_msg));
self.processor.try_wake_up();
Ok(())
}
}
@@ -177,17 +174,6 @@ pub struct ProcessorInner {

chan_sender: Sender<ProcMessage>,
chan_receiver: Receiver<ProcMessage>,

thread_handle: Option<Thread>,
should_wake_up: AtomicBool,
}

impl ProcessorInner {
fn try_wake_up(&self) {
// This flag should always be set to true when we have a job to do
self.should_wake_up.store(true, Ordering::SeqCst);
self.thread_handle.as_ref().map(|x| x.unpark());
}
}

impl Processor {
@@ -211,9 +197,6 @@ impl Processor {

chan_sender: tx,
chan_receiver: rx,

thread_handle: None,
should_wake_up: AtomicBool::new(false),
}),
};

@@ -297,62 +280,65 @@ impl Processor {
}

// 2. Check the mainbox
loop {
{
let mut resume_all_tasks = false;

while let Ok(msg) = self.chan_receiver.try_recv() {
match msg {
ProcMessage::NewNeighbor(nei) => self.neighbor_stealers.push(nei),
ProcMessage::Shutdown => {
trace!("{:?}: got shutdown signal", self);
break 'outerloop;
}
ProcMessage::Ready(mut coro) => {
coro.set_preferred_processor(Some(self.weak_self.clone()));
self.ready(coro);
resume_all_tasks = true;
}
{
let mut resume_all_tasks = false;

while let Ok(msg) = self.chan_receiver.try_recv() {
match msg {
ProcMessage::NewNeighbor(nei) => self.neighbor_stealers.push(nei),
ProcMessage::Shutdown => {
trace!("{:?}: got shutdown signal", self);
break 'outerloop;
}
ProcMessage::Ready(mut coro) => {
coro.set_preferred_processor(Some(self.weak_self.clone()));
self.ready(coro);
resume_all_tasks = true;
}
}

// Prefer running own tasks before stealing --> "continue" from anew.
if resume_all_tasks {
continue 'outerloop;
}
}

// 3. Randomly steal from neighbors as a last measure.
// TODO: To improve cache locality foreign lists
// should be split in half or so instead.
let rand_idx = self.rng.gen::<usize>();
let total_stealers = self.neighbor_stealers.len();
// Prefer running own tasks before stealing --> "continue" from anew.
if resume_all_tasks {
continue 'outerloop;
}
}

for idx in 0..total_stealers {
let idx = (rand_idx + idx) % total_stealers;
// 3. Randomly steal from neighbors as a last measure.
// TODO: To improve cache locality foreign lists
// should be split in half or so instead.
let rand_idx = self.rng.gen::<usize>();
let total_stealers = self.neighbor_stealers.len();

if let Stolen::Data(hdl) = self.neighbor_stealers[idx].steal() {
trace!("{:?}: stole Coroutine `{}`", self, hdl.debug_name());
self.resume(hdl);
continue 'outerloop;
}
}
for idx in 0..total_stealers {
let idx = (rand_idx + idx) % total_stealers;

// Check once before park
if self.should_wake_up.swap(false, Ordering::SeqCst) {
break;
if let Stolen::Data(hdl) = self.neighbor_stealers[idx].steal() {
trace!("{:?}: stole Coroutine `{}`", self, hdl.debug_name());
self.resume(hdl);
continue 'outerloop;
}
}

thread::park_timeout(Duration::from_millis(1));
// Park the processor
{
let sched = self.scheduler();
sched.park_processor(self.handle());
}

// If we are woken up, then break this loop
// otherwise, continue to steal jobs from the others
if self.should_wake_up.swap(false, Ordering::SeqCst) {
break;
match self.chan_receiver.recv().unwrap() {
ProcMessage::NewNeighbor(nei) => self.neighbor_stealers.push(nei),
ProcMessage::Shutdown => {
trace!("{:?}: got shutdown signal", self);
break 'outerloop;
}
ProcMessage::Ready(mut coro) => {
coro.set_preferred_processor(Some(self.weak_self.clone()));
self.ready(coro);
}
}
}

trace!("{:?}: dropping coroutines in channel", self);
while let Ok(msg) = self.chan_receiver.try_recv() {
match msg {
@@ -434,9 +420,6 @@ impl Processor {
/// Enqueue a coroutine to be resumed as soon as possible (making it the head of the queue)
pub fn ready(&mut self, coro: Handle) {
self.queue_worker.push(coro);

// Wake up the worker thread if it is parked
self.try_wake_up();
}

/// Suspends the current running coroutine, equivalent to `Scheduler::sched`
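For context, the idle path removed from src/runtime/processor.rs above followed the standard park/unpark idiom: try_wake_up() set a flag and then unparked the thread, while the parked thread re-checked the flag before and after thread::park_timeout(1ms), so a wake-up that raced with parking was at worst delayed by one timeout rather than lost. A standalone sketch of that removed pattern (std only; the worker setup around it is hypothetical):

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

fn main() {
    let should_wake_up = Arc::new(AtomicBool::new(false));
    let flag = should_wake_up.clone();

    let worker = thread::spawn(move || {
        loop {
            // Check once before parking, as the removed code did,
            // so a wake-up that already happened is not missed.
            if flag.swap(false, Ordering::SeqCst) {
                break;
            }
            // Bounded sleep: even a missed unpark costs at most ~1ms.
            thread::park_timeout(Duration::from_millis(1));
            if flag.swap(false, Ordering::SeqCst) {
                break;
            }
        }
        println!("worker woken up");
    });

    // try_wake_up(): set the flag first, then unpark.
    should_wake_up.store(true, Ordering::SeqCst);
    worker.thread().unpark();
    worker.join().unwrap();
}
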
26 changes: 23 additions & 3 deletions src/scheduler.rs
@@ -26,18 +26,19 @@ use std::fmt::Debug;
use std::io::{self, Write};
use std::mem;
use std::panic;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
use std::collections::VecDeque;

use mio::{Evented, EventLoop, EventSet, Handler, NotifyError, PollOpt, Sender, TimerError, Token};
use slab::Slab;

use coroutine::{Handle, Coroutine};
use join_handle::{self, JoinHandleReceiver};
use options::Options;
use runtime::processor::{Processor, ProcMessage};
use runtime::processor::{Processor, ProcMessage, ProcMessageSender};
use sync::mono_barrier::CoroMonoBarrier;


@@ -157,7 +158,6 @@ impl ReadyStates {
}
}


/// Coroutine scheduler
pub struct Scheduler {
default_spawn_options: Options,
@@ -167,6 +167,8 @@ pub struct Scheduler {
event_loop_sender: Option<Sender<Message>>,
slab: Slab<ReadyStates, usize>,
work_count: Arc<AtomicUsize>,

parked_processors: Mutex<VecDeque<ProcMessageSender>>,
}

unsafe impl Send for Scheduler {}
@@ -181,6 +183,8 @@ impl Scheduler {
event_loop_sender: None,
slab: Slab::new(1024),
work_count: Arc::new(AtomicUsize::new(0)),

parked_processors: Mutex::new(VecDeque::new()),
}
}

@@ -373,6 +377,16 @@ impl Scheduler {
return;
}

// Try to wake up a parked processor
if let Some(ref processor) = current {
let mut queue = processor.scheduler().parked_processors.lock().unwrap();
if let Some(prochdl) = queue.pop_front() {
trace!("Coroutine `{}`: pushing into a parked processor", coro.debug_name());
let _ = prochdl.send(ProcMessage::Ready(coro));
return;
}
}

if let Some(mut current) = current {
trace!("Coroutine `{}`: pushing into current queue",
coro.debug_name());
@@ -490,6 +504,12 @@ impl Scheduler {
pub fn sleep(&self, delay: Duration) -> Result<(), TimerError> {
self.sleep_ms(delay.as_secs() * 1_000 + delay.subsec_nanos() as u64 / 1_000_000)
}

#[doc(hidden)]
pub fn park_processor(&self, prochdl: ProcMessageSender) {
let mut queue = self.parked_processors.lock().unwrap();
queue.push_back(prochdl);
}
}

impl Handler for Scheduler {
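One property of the replacement worth spelling out (an observation about std mpsc semantics, not text from the diff): because the wake-up now arrives as a ProcMessage::Ready over the processor's own channel, a message sent between park_processor() and the blocking recv() is simply buffered, and recv() returns it immediately, so no should_wake_up flag is needed to avoid a lost wake-up. A trivial std-only demonstration:

use std::sync::mpsc::channel;

fn main() {
    let (tx, rx) = channel();

    // The "wake-up" is sent before anyone is blocked on recv()...
    tx.send("ProcMessage::Ready(coro)").unwrap();

    // ...and is still delivered: mpsc channels buffer messages,
    // so the late recv() returns at once instead of blocking.
    assert_eq!(rx.recv().unwrap(), "ProcMessage::Ready(coro)");
    println!("no lost wake-up");
}
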
