diff --git a/bastion-executor/Cargo.toml b/bastion-executor/Cargo.toml index 1ac52783..0b54e37b 100644 --- a/bastion-executor/Cargo.toml +++ b/bastion-executor/Cargo.toml @@ -15,6 +15,7 @@ lazy_static = "1.4" libc = "0.2" num_cpus = "1.10" rustc-hash = "1.0.1" +pin-utils = "0.1.0-alpha.4" # Allocator context-allocator = "0.2" diff --git a/bastion-executor/examples/blocking_run.rs b/bastion-executor/examples/blocking_run.rs new file mode 100644 index 00000000..203c788f --- /dev/null +++ b/bastion-executor/examples/blocking_run.rs @@ -0,0 +1,14 @@ +use bastion_executor::prelude::*; +use lightproc::proc_stack::ProcStack; +use std::{thread, time}; + +fn main() { + run( + async { + println!("DATA"); + panic!("kaka"); + }, + ProcStack::default() + .with_after_panic(|| {println!("after panic")}), + ); +} diff --git a/bastion-executor/examples/new_proc.rs b/bastion-executor/examples/new_proc.rs deleted file mode 100644 index 583a2465..00000000 --- a/bastion-executor/examples/new_proc.rs +++ /dev/null @@ -1,11 +0,0 @@ -use bastion_executor::prelude::*; -use lightproc::proc_stack::ProcStack; - -fn main() { - spawn( - async { - println!("DATA"); - }, - ProcStack::default(), - ); -} diff --git a/bastion-executor/examples/spawn_async.rs b/bastion-executor/examples/spawn_async.rs new file mode 100644 index 00000000..368ffe8b --- /dev/null +++ b/bastion-executor/examples/spawn_async.rs @@ -0,0 +1,26 @@ +use bastion_executor::prelude::*; +use lightproc::proc_stack::ProcStack; +use std::{thread, time}; + +const SIXTY_MILLIS: time::Duration = time::Duration::from_millis(4000); + +fn main() { + let stack = ProcStack::default() + .with_pid(1) + .with_after_panic(|| {println!("after panic")}); + + let handle = spawn(async { + panic!("test"); + }, stack); + + let stack = ProcStack::default() + .with_pid(2) + .with_after_panic(|| {println!("after panic")}); + + run( + async { + handle.await + }, + stack.clone() + ); +} diff --git a/bastion-executor/src/distributor.rs b/bastion-executor/src/distributor.rs index 9e872a57..6cd17ce9 100644 --- a/bastion-executor/src/distributor.rs +++ b/bastion-executor/src/distributor.rs @@ -5,6 +5,7 @@ use super::run_queue::{Stealer, Worker}; use lightproc::prelude::*; use std::thread; +use crate::worker; pub(crate) struct Distributor { pub round: usize, @@ -30,7 +31,7 @@ impl Distributor { let wrk = Worker::new_fifo(); stealers.push(wrk.stealer()); - workers.push(wrk); +// workers.push(wrk); thread::Builder::new() .name("bastion-async-thread".to_string()) @@ -39,7 +40,7 @@ impl Distributor { placement::set_for_current(core); // actual execution - // worker::main_loop(wrk);x + worker::main_loop(wrk); }) .expect("cannot start the thread for running proc"); } diff --git a/bastion-executor/src/lib.rs b/bastion-executor/src/lib.rs index fdc1f773..8b8d5643 100644 --- a/bastion-executor/src/lib.rs +++ b/bastion-executor/src/lib.rs @@ -15,7 +15,9 @@ pub mod run_queue; pub mod sleepers; pub mod thread_recovery; pub mod worker; +pub mod run; pub mod prelude { + pub use crate::run::*; pub use crate::pool::*; } diff --git a/bastion-executor/src/load_balancer.rs b/bastion-executor/src/load_balancer.rs index 957fa2dd..18165cd0 100644 --- a/bastion-executor/src/load_balancer.rs +++ b/bastion-executor/src/load_balancer.rs @@ -1,11 +1,13 @@ use super::pool; use super::run_queue::Worker; +use super::placement; use lazy_static::*; use lightproc::lightproc::LightProc; use std::{thread, time}; use std::sync::atomic::AtomicUsize; use std::collections::HashMap; +use std::sync::RwLock; use rustc_hash::FxHashMap; const SIXTY_MILLIS: time::Duration = time::Duration::from_millis(60); @@ -13,31 +15,12 @@ const SIXTY_MILLIS: time::Duration = time::Duration::from_millis(60); pub struct LoadBalancer(); impl LoadBalancer { - pub fn start(self, workers: Vec>) -> LoadBalancer { + pub fn sample(self, workers: Vec>) -> LoadBalancer { thread::Builder::new() .name("load-balancer-thread".to_string()) .spawn(move || { loop { - workers.iter().for_each(|w| { - pool::get().injector.steal_batch_and_pop(w); - }); - let stealer = pool::get() - .stealers - .iter() - .min_by_key(|e| e.run_queue_size()) - .unwrap(); - - let worker = workers - .iter() - .min_by_key(|e| e.worker_run_queue_size()) - .unwrap(); - - let big = worker.worker_run_queue_size(); - let small = stealer.run_queue_size(); - let m = (big & small) + ((big ^ small) >> 1); - - stealer.steal_batch_and_pop_with_amount(&worker, big.wrapping_sub(m)); // General suspending is equal to cache line size in ERTS // https://github.com/erlang/otp/blob/master/erts/emulator/beam/erl_process.c#L10887 @@ -64,13 +47,18 @@ unsafe impl Send for Stats {} unsafe impl Sync for Stats {} #[inline] -pub fn stats() -> &'static Stats { +pub fn stats() -> &'static RwLock { lazy_static! { - static ref LB_STATS: Stats = { - Stats { + static ref LB_STATS: RwLock = { + let stats = Stats { global_run_queue: 0, - smp_queues: FxHashMap::default() - } + smp_queues: FxHashMap::with_capacity_and_hasher( + placement::get_core_ids().unwrap().len(), + Default::default() + ) + }; + + RwLock::new(stats) }; } &*LB_STATS diff --git a/bastion-executor/src/pool.rs b/bastion-executor/src/pool.rs index d88d365c..0d4f3d86 100644 --- a/bastion-executor/src/pool.rs +++ b/bastion-executor/src/pool.rs @@ -58,7 +58,7 @@ pub fn get() -> &'static Pool { let distributor = Distributor::new(); let (stealers, workers) = distributor.assign(); - LoadBalancer().start(workers); +// LoadBalancer().start(workers); Pool { injector: Injector::new(), diff --git a/bastion-executor/src/run.rs b/bastion-executor/src/run.rs new file mode 100644 index 00000000..7f4b45d2 --- /dev/null +++ b/bastion-executor/src/run.rs @@ -0,0 +1,123 @@ +use std::future::Future; +use std::cell::UnsafeCell; +use std::cell::Cell; +use std::pin::Pin; +use std::{mem, panic}; +use lightproc::proc_stack::ProcStack; +use super::worker; +use std::sync::Arc; +use crossbeam_utils::sync::Parker; +use std::mem::ManuallyDrop; +use std::task::{Waker, RawWaker, Context, Poll, RawWakerVTable}; +use pin_utils::*; + +pub fn run(future: F, stack: ProcStack) -> T + where + F: Future, +{ + unsafe { + // A place on the stack where the result will be stored. + let out = &mut UnsafeCell::new(None); + + // Wrap the future into one that stores the result into `out`. + let future = { + let out = out.get(); + + async move { + *out = Some(future.await); + } + }; + + // Log this `block_on` operation. + let child_id = stack.get_pid(); + let parent_id = worker::get_proc_stack(|t| t.get_pid()).unwrap_or(0); + + // Wrap the future into one that drops task-local variables on exit. + // let future = task_local::add_finalizer(future); + + let future = async move { + future.await; + }; + + // Pin the future onto the stack. + pin_utils::pin_mut!(future); + + // Transmute the future into one that is futurestatic. + let future = mem::transmute::< + Pin<&'_ mut dyn Future>, + Pin<&'static mut dyn Future>, + >(future); + + // Block on the future and and wait for it to complete. + worker::set_stack(&stack, || block(future)); + + // Take out the result. + match (*out.get()).take() { + Some(v) => v, + _ => unimplemented!(), + } + } +} + + + +fn block(f: F) -> T + where + F: Future, +{ + thread_local! { + // May hold a pre-allocated parker that can be reused for efficiency. + // + // Note that each invocation of `block` needs its own parker. In particular, if `block` + // recursively calls itself, we must make sure that each recursive call uses a distinct + // parker instance. + static CACHE: Cell>> = Cell::new(None); + } + + pin_utils::pin_mut!(f); + + CACHE.with(|cache| { + // Reuse a cached parker or create a new one for this invocation of `block`. + let arc_parker: Arc = cache.take().unwrap_or_else(|| Arc::new(Parker::new())); + + let ptr = (&*arc_parker as *const Parker) as *const (); + let vt = vtable(); + + let waker = unsafe { ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, vt))) }; + let cx = &mut Context::from_waker(&waker); + + loop { + if let Poll::Ready(t) = f.as_mut().poll(cx) { + // Save the parker for the next invocation of `block`. + cache.set(Some(arc_parker)); + return t; + } + arc_parker.park(); + } + }) +} + +fn vtable() -> &'static RawWakerVTable { + unsafe fn clone_raw(ptr: *const ()) -> RawWaker { + #![allow(clippy::redundant_clone)] + let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker)); + mem::forget(arc.clone()); + RawWaker::new(ptr, vtable()) + } + + unsafe fn wake_raw(ptr: *const ()) { + let arc = Arc::from_raw(ptr as *const Parker); + arc.unparker().unpark(); + } + + unsafe fn wake_by_ref_raw(ptr: *const ()) { + let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker)); + arc.unparker().unpark(); + } + + unsafe fn drop_raw(ptr: *const ()) { + drop(Arc::from_raw(ptr as *const Parker)) + } + + &RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw) +}