Skip to content

Commit

Permalink
Scheduler complete
Browse files Browse the repository at this point in the history
  • Loading branch information
vertexclique committed Oct 30, 2019
1 parent 1ea3cb9 commit 089e3f2
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 28 deletions.
14 changes: 6 additions & 8 deletions bastion-executor/examples/spawn_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,18 @@ use bastion_executor::prelude::*;
use lightproc::proc_stack::ProcStack;
use std::{thread, time};

const SIXTY_MILLIS: time::Duration = time::Duration::from_millis(4000);

fn main() {
let pid = 1;
let stack = ProcStack::default()
.with_pid(1)
.with_after_panic(|| {println!("after panic")});
.with_pid(pid)
.with_after_panic(move || {println!("after panic {}", pid.clone())});

let handle = spawn(async {
panic!("test");
println!("test");
}, stack);

let stack = ProcStack::default()
.with_pid(2)
.with_after_panic(|| {println!("after panic")});
let pid = 2;
let stack = ProcStack::default().with_pid(pid);

run(
async {
Expand Down
8 changes: 3 additions & 5 deletions bastion-executor/src/distributor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,14 @@ impl Distributor {
}
}

pub fn assign(mut self) -> (Vec<Stealer<LightProc>>, Vec<Worker<LightProc>>) {
pub fn assign(mut self) -> Vec<Stealer<LightProc>> {
let mut stealers = Vec::<Stealer<LightProc>>::new();
let mut workers = Vec::<Worker<LightProc>>::new();

for core in self.cores {
self.round = core.id;

let wrk = Worker::new_fifo();
stealers.push(wrk.stealer());
// workers.push(wrk);

thread::Builder::new()
.name("bastion-async-thread".to_string())
Expand All @@ -40,11 +38,11 @@ impl Distributor {
placement::set_for_current(core);

// actual execution
worker::main_loop(wrk);
worker::main_loop(core.id.clone(), wrk);
})
.expect("cannot start the thread for running proc");
}

(stealers, workers)
stealers
}
}
31 changes: 22 additions & 9 deletions bastion-executor/src/load_balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,29 @@ use lightproc::lightproc::LightProc;
use std::{thread, time};
use std::sync::atomic::AtomicUsize;
use std::collections::HashMap;
use std::sync::RwLock;
use crossbeam_utils::sync::ShardedLock;
use rustc_hash::FxHashMap;
use super::load_balancer;

const SIXTY_MILLIS: time::Duration = time::Duration::from_millis(60);

pub struct LoadBalancer();

impl LoadBalancer {
pub fn sample(self, workers: Vec<Worker<LightProc>>) -> LoadBalancer {
pub fn sample() {
thread::Builder::new()
.name("load-balancer-thread".to_string())
.spawn(move || {
loop {
let mut m = 0_usize;
if let Ok(stats) = load_balancer::stats().try_read() {
m = stats.smp_queues.values().sum::<usize>()
.wrapping_div(placement::get_core_ids().unwrap().len());
}

if let Ok(mut stats) = load_balancer::stats().try_write() {
stats.mean_level = m;
}

// General suspending is equal to cache line size in ERTS
// https://github.com/erlang/otp/blob/master/erts/emulator/beam/erl_process.c#L10887
Expand All @@ -32,33 +41,37 @@ impl LoadBalancer {
}
})
.expect("load-balancer couldn't start");

self
}
}

#[derive(Clone)]
pub struct Stats {
global_run_queue: usize,
smp_queues: FxHashMap<usize, usize>,
pub(crate) global_run_queue: usize,
pub(crate) mean_level: usize,
pub(crate) smp_queues: FxHashMap<usize, usize>,
}

unsafe impl Send for Stats {}
unsafe impl Sync for Stats {}

#[inline]
pub fn stats() -> &'static RwLock<Stats> {
pub fn stats() -> &'static ShardedLock<Stats> {
lazy_static! {
static ref LB_STATS: RwLock<Stats> = {
static ref LB_STATS: ShardedLock<Stats> = {
let stats = Stats {
global_run_queue: 0,
mean_level: 0,
smp_queues: FxHashMap::with_capacity_and_hasher(
placement::get_core_ids().unwrap().len(),
Default::default()
)
};

RwLock::new(stats)
// Start sampler
LoadBalancer::sample();

// Return stats
ShardedLock::new(stats)
};
}
&*LB_STATS
Expand Down
31 changes: 27 additions & 4 deletions bastion-executor/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use super::worker;
use lazy_static::*;
use lightproc::prelude::*;
use std::future::Future;
use std::iter;
use crate::load_balancer;

pub fn spawn<F, T>(future: F, stack: ProcStack) -> RecoverableHandle<T>
where
Expand All @@ -28,7 +30,30 @@ impl Pool {
unimplemented!()
}

pub fn fetch_proc(&self, local: &Worker<LightProc>) -> Option<LightProc> {
pub fn fetch_proc(&self, affinity: usize, local: &Worker<LightProc>) -> Option<LightProc> {
if let Ok(mut stats) = load_balancer::stats().try_write() {
stats.smp_queues.insert(affinity, local.worker_run_queue_size());
}

if let Ok(stats) = load_balancer::stats().try_read() {
if local.worker_run_queue_size() == 0 {
while let Some(proc) = self.injector.steal_batch_and_pop(local).success() {
return Some(proc);
}
} else {
let affine_core =
*stats.smp_queues.iter()
.max_by_key(|&(core, stat)| stat).unwrap().1;
let stealer =
self.stealers.get(affine_core).unwrap();
if let Some(amount) = stealer.run_queue_size().checked_sub(stats.mean_level) {
if let Some(proc) = stealer.steal_batch_and_pop_with_amount(local, amount.wrapping_add(1)).success() {
return Some(proc);
}
}
}
}

// Pop only from the local queue with full trust
local.pop()
}
Expand Down Expand Up @@ -56,9 +81,7 @@ pub fn get() -> &'static Pool {
lazy_static! {
static ref POOL: Pool = {
let distributor = Distributor::new();
let (stealers, workers) = distributor.assign();

// LoadBalancer().start(workers);
let stealers = distributor.assign();

Pool {
injector: Injector::new(),
Expand Down
3 changes: 3 additions & 0 deletions bastion-executor/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub fn run<F, T>(future: F, stack: ProcStack) -> T
let child_id = stack.get_pid();
let parent_id = worker::get_proc_stack(|t| t.get_pid()).unwrap_or(0);

dbg!(parent_id);
dbg!(child_id);

// Wrap the future into one that drops task-local variables on exit.
// let future = task_local::add_finalizer(future);

Expand Down
4 changes: 2 additions & 2 deletions bastion-executor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ pub(crate) fn schedule(proc: LightProc) {
pool::get().sleepers.notify_one();
}

pub(crate) fn main_loop(worker: Worker<LightProc>) {
pub(crate) fn main_loop(affinity: usize, worker: Worker<LightProc>) {
IS_WORKER.with(|is_worker| is_worker.set(true));
QUEUE.with(|queue| queue.set(Some(worker)));

loop {
match get_queue(|q| pool::get().fetch_proc(q)) {
match get_queue(|q| pool::get().fetch_proc(affinity, q)) {
Some(proc) => set_stack(proc.stack(), || proc.run()),
None => pool::get().sleepers.wait(),
}
Expand Down

0 comments on commit 089e3f2

Please sign in to comment.