Skip to content

Commit

Permalink
Spawn async problems
Browse files Browse the repository at this point in the history
  • Loading branch information
vertexclique committed Oct 30, 2019
1 parent b825af1 commit 1ea3cb9
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 39 deletions.
1 change: 1 addition & 0 deletions bastion-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ lazy_static = "1.4"
libc = "0.2"
num_cpus = "1.10"
rustc-hash = "1.0.1"
pin-utils = "0.1.0-alpha.4"

# Allocator
context-allocator = "0.2"
Expand Down
14 changes: 14 additions & 0 deletions bastion-executor/examples/blocking_run.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use bastion_executor::prelude::*;
use lightproc::proc_stack::ProcStack;
use std::{thread, time};

fn main() {
run(
async {
println!("DATA");
panic!("kaka");
},
ProcStack::default()
.with_after_panic(|| {println!("after panic")}),
);
}
11 changes: 0 additions & 11 deletions bastion-executor/examples/new_proc.rs

This file was deleted.

26 changes: 26 additions & 0 deletions bastion-executor/examples/spawn_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use bastion_executor::prelude::*;
use lightproc::proc_stack::ProcStack;
use std::{thread, time};

const SIXTY_MILLIS: time::Duration = time::Duration::from_millis(4000);

fn main() {
let stack = ProcStack::default()
.with_pid(1)
.with_after_panic(|| {println!("after panic")});

let handle = spawn(async {
panic!("test");
}, stack);

let stack = ProcStack::default()
.with_pid(2)
.with_after_panic(|| {println!("after panic")});

run(
async {
handle.await
},
stack.clone()
);
}
5 changes: 3 additions & 2 deletions bastion-executor/src/distributor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::run_queue::{Stealer, Worker};
use lightproc::prelude::*;

use std::thread;
use crate::worker;

pub(crate) struct Distributor {
pub round: usize,
Expand All @@ -30,7 +31,7 @@ impl Distributor {

let wrk = Worker::new_fifo();
stealers.push(wrk.stealer());
workers.push(wrk);
// workers.push(wrk);

thread::Builder::new()
.name("bastion-async-thread".to_string())
Expand All @@ -39,7 +40,7 @@ impl Distributor {
placement::set_for_current(core);

// actual execution
// worker::main_loop(wrk);x
worker::main_loop(wrk);
})
.expect("cannot start the thread for running proc");
}
Expand Down
2 changes: 2 additions & 0 deletions bastion-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ pub mod run_queue;
pub mod sleepers;
pub mod thread_recovery;
pub mod worker;
pub mod run;

pub mod prelude {
pub use crate::run::*;
pub use crate::pool::*;
}
38 changes: 13 additions & 25 deletions bastion-executor/src/load_balancer.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,26 @@
use super::pool;
use super::run_queue::Worker;
use super::placement;
use lazy_static::*;
use lightproc::lightproc::LightProc;

use std::{thread, time};
use std::sync::atomic::AtomicUsize;
use std::collections::HashMap;
use std::sync::RwLock;
use rustc_hash::FxHashMap;

const SIXTY_MILLIS: time::Duration = time::Duration::from_millis(60);

pub struct LoadBalancer();

impl LoadBalancer {
pub fn start(self, workers: Vec<Worker<LightProc>>) -> LoadBalancer {
pub fn sample(self, workers: Vec<Worker<LightProc>>) -> LoadBalancer {
thread::Builder::new()
.name("load-balancer-thread".to_string())
.spawn(move || {
loop {
workers.iter().for_each(|w| {
pool::get().injector.steal_batch_and_pop(w);
});

let stealer = pool::get()
.stealers
.iter()
.min_by_key(|e| e.run_queue_size())
.unwrap();

let worker = workers
.iter()
.min_by_key(|e| e.worker_run_queue_size())
.unwrap();

let big = worker.worker_run_queue_size();
let small = stealer.run_queue_size();
let m = (big & small) + ((big ^ small) >> 1);

stealer.steal_batch_and_pop_with_amount(&worker, big.wrapping_sub(m));

// General suspending is equal to cache line size in ERTS
// https://github.com/erlang/otp/blob/master/erts/emulator/beam/erl_process.c#L10887
Expand All @@ -64,13 +47,18 @@ unsafe impl Send for Stats {}
unsafe impl Sync for Stats {}

#[inline]
pub fn stats() -> &'static Stats {
pub fn stats() -> &'static RwLock<Stats> {
lazy_static! {
static ref LB_STATS: Stats = {
Stats {
static ref LB_STATS: RwLock<Stats> = {
let stats = Stats {
global_run_queue: 0,
smp_queues: FxHashMap::default()
}
smp_queues: FxHashMap::with_capacity_and_hasher(
placement::get_core_ids().unwrap().len(),
Default::default()
)
};

RwLock::new(stats)
};
}
&*LB_STATS
Expand Down
2 changes: 1 addition & 1 deletion bastion-executor/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub fn get() -> &'static Pool {
let distributor = Distributor::new();
let (stealers, workers) = distributor.assign();

LoadBalancer().start(workers);
// LoadBalancer().start(workers);

Pool {
injector: Injector::new(),
Expand Down
123 changes: 123 additions & 0 deletions bastion-executor/src/run.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use std::future::Future;
use std::cell::UnsafeCell;
use std::cell::Cell;
use std::pin::Pin;
use std::{mem, panic};
use lightproc::proc_stack::ProcStack;
use super::worker;
use std::sync::Arc;
use crossbeam_utils::sync::Parker;
use std::mem::ManuallyDrop;
use std::task::{Waker, RawWaker, Context, Poll, RawWakerVTable};
use pin_utils::*;

pub fn run<F, T>(future: F, stack: ProcStack) -> T
where
F: Future<Output = T>,
{
unsafe {
// A place on the stack where the result will be stored.
let out = &mut UnsafeCell::new(None);

// Wrap the future into one that stores the result into `out`.
let future = {
let out = out.get();

async move {
*out = Some(future.await);
}
};

// Log this `block_on` operation.
let child_id = stack.get_pid();
let parent_id = worker::get_proc_stack(|t| t.get_pid()).unwrap_or(0);

// Wrap the future into one that drops task-local variables on exit.
// let future = task_local::add_finalizer(future);

let future = async move {
future.await;
};

// Pin the future onto the stack.
pin_utils::pin_mut!(future);

// Transmute the future into one that is futurestatic.
let future = mem::transmute::<
Pin<&'_ mut dyn Future<Output = ()>>,
Pin<&'static mut dyn Future<Output = ()>>,
>(future);

// Block on the future and and wait for it to complete.
worker::set_stack(&stack, || block(future));

// Take out the result.
match (*out.get()).take() {
Some(v) => v,
_ => unimplemented!(),
}
}
}



fn block<F, T>(f: F) -> T
where
F: Future<Output = T>,
{
thread_local! {
// May hold a pre-allocated parker that can be reused for efficiency.
//
// Note that each invocation of `block` needs its own parker. In particular, if `block`
// recursively calls itself, we must make sure that each recursive call uses a distinct
// parker instance.
static CACHE: Cell<Option<Arc<Parker>>> = Cell::new(None);
}

pin_utils::pin_mut!(f);

CACHE.with(|cache| {
// Reuse a cached parker or create a new one for this invocation of `block`.
let arc_parker: Arc<Parker> = cache.take().unwrap_or_else(|| Arc::new(Parker::new()));

let ptr = (&*arc_parker as *const Parker) as *const ();
let vt = vtable();

let waker = unsafe { ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, vt))) };
let cx = &mut Context::from_waker(&waker);

loop {
if let Poll::Ready(t) = f.as_mut().poll(cx) {
// Save the parker for the next invocation of `block`.
cache.set(Some(arc_parker));
return t;
}
arc_parker.park();
}
})
}

fn vtable() -> &'static RawWakerVTable {
unsafe fn clone_raw(ptr: *const ()) -> RawWaker {
#![allow(clippy::redundant_clone)]
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
mem::forget(arc.clone());
RawWaker::new(ptr, vtable())
}

unsafe fn wake_raw(ptr: *const ()) {
let arc = Arc::from_raw(ptr as *const Parker);
arc.unparker().unpark();
}

unsafe fn wake_by_ref_raw(ptr: *const ()) {
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
arc.unparker().unpark();
}

unsafe fn drop_raw(ptr: *const ()) {
drop(Arc::from_raw(ptr as *const Parker))
}

&RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw)
}

0 comments on commit 1ea3cb9

Please sign in to comment.