Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shutdown implementation #14

Merged
merged 2 commits into from
Aug 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,12 @@ ego-tree = "0.6.0"
lazy_static = "1.3.0"
objekt = "0.1.2"
signal-hook = "0.1.10"
parking_lot = "0.9"


#futures-preview = "=0.3.0-alpha.16"
uuid = { version = "0.7", features = ["serde", "v4"] }

[dev-dependencies]
reqwest = "0.9.19"

[profile.bench]
panic = "unwind"
opt-level = 3
Expand Down
59 changes: 59 additions & 0 deletions benches/bench_one_for_one.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#![feature(test)]

extern crate test;

#[cfg(test)]
mod tests {
use super::*;
use bastion::bastion::Bastion;
use bastion::bastion::PLATFORM;
use bastion::config::BastionConfig;
use bastion::context::BastionContext;
use bastion::supervisor::SupervisionStrategy;
use log::LevelFilter;
use std::borrow::{Borrow, BorrowMut};
use std::sync::Once;
use std::{fs, thread, time};
use tokio::prelude::*;
use tokio::runtime::{Builder, Runtime};
use test::Bencher;


static INIT: Once = Once::new();

fn init() {
INIT.call_once(|| {
let config = BastionConfig {
log_level: LevelFilter::Debug,
in_test: true,
};
let bastion = Bastion::platform_from_config(config);
});
}

fn awaiting(time: u64) {
let ten_millis = time::Duration::from_millis(time);
thread::sleep(ten_millis);
}

#[bench]
fn spawn_with_supervisor_one_for_one(b: &mut Bencher) {
fn closure() {
init();

let message = "Supervision Message".to_string();

Bastion::spawn(
|p, msg| {
panic!("root supervisor - spawn_at_root - 1");
},
message,
);

awaiting(100);
}

b.iter(|| closure());
}

}
51 changes: 39 additions & 12 deletions src/bastion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ use lazy_static::lazy_static;
use log::LevelFilter;
use signal_hook::{iterator::Signals, SIGINT};
use std::panic::AssertUnwindSafe;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::prelude::future::FutureResult;
use tokio::prelude::*;
use tokio::runtime::Runtime;
use uuid::Uuid;
use std::mem;

lazy_static! {
// Platform which contains runtime system.
Expand Down Expand Up @@ -90,9 +92,9 @@ impl Bastion {
}

fn traverse(ns: Supervisor) -> Supervisor {
let runtime = PLATFORM.lock().unwrap();
let runtime = PLATFORM.lock();
let arcreg = runtime.registry.clone();
let registry = arcreg.lock().unwrap();
let registry = arcreg.lock();
Bastion::traverse_registry(registry.clone(), registry.root(), &ns);

ns
Expand All @@ -104,14 +106,14 @@ impl Bastion {

// Push supervisor for next trampoline
let fark = FAULTED.clone();
let mut faulted_ones = fark.lock().unwrap();
let mut faulted_ones = fark.lock();
faulted_ones.push(given.clone());

debug!("Fault induced supervisors: {:?}", faulted_ones);

let restart_needed = match trampoline_spv.strategy {
SupervisionStrategy::OneForOne => {
let killed = trampoline_spv.killed;
let killed = trampoline_spv.killed.clone();
debug!(
"One for One – Children restart triggered for :: {:?}",
killed
Expand Down Expand Up @@ -166,6 +168,8 @@ impl Bastion {

Tramp::Traverse(restart_needed).execute(|desc| {
let message_clone = objekt::clone_box(&*message_box);
let spv = trampoline_spv.clone();

match &desc {
n if n.is_empty() => Tramp::Complete(n.to_vec()),
n => {
Expand All @@ -180,6 +184,7 @@ impl Bastion {
let f = future::lazy(move || {
bt(
BastionContext {
spv: Some(spv.clone()),
bcast_rx: Some(rx.clone()),
bcast_tx: Some(tx.clone()),
},
Expand All @@ -193,7 +198,7 @@ impl Bastion {
if let Err(err) = result {
error!("Panic happened in restarted - {:?}", err);
let fark = FAULTED.clone();
let mut faulted_ones = fark.lock().unwrap();
let mut faulted_ones = fark.lock();
let faulted = faulted_ones.pop().unwrap();

// Make trampoline to re-enter
Expand All @@ -206,7 +211,7 @@ impl Bastion {
);

let ark = PLATFORM.clone();
let mut runtime = ark.lock().unwrap();
let mut runtime = ark.lock();
let shared_runtime: &mut Runtime = &mut runtime.runtime;
shared_runtime.spawn(k);
}
Expand All @@ -218,9 +223,15 @@ impl Bastion {
}

pub fn start() {
println!("ARKARKKARK");
println!("ARC {:?}", Arc::strong_count(&PLATFORM));
Bastion::runtime_shutdown_callback()
}

pub fn force_shutdown() {
Bastion::unstable_shutdown()
}

pub fn spawn<F, M>(thunk: F, msg: M) -> BastionChildren
where
F: BastionClosure,
Expand All @@ -242,14 +253,17 @@ impl Bastion {
let if_killed = child.clone();
let ret_val = child.clone();

let mut root_spv;
{
let ark = PLATFORM.clone();
let runtime = ark.lock().unwrap();
let mut registry = runtime.registry.lock().unwrap();
let runtime = ark.lock();
let mut registry = runtime.registry.lock();
let mut rootn = registry.root_mut();
let root: &mut Supervisor = rootn.value();

root.descendants.push(child);

root_spv = root.clone();
}

let tx = ret_val.tx.as_ref().unwrap().clone();
Expand All @@ -258,6 +272,7 @@ impl Bastion {
let f = future::lazy(move || {
bt(
BastionContext {
spv: Some(root_spv),
bcast_rx: Some(rx.clone()),
bcast_tx: Some(tx.clone()),
},
Expand All @@ -270,8 +285,8 @@ impl Bastion {
.catch_unwind()
.then(|result| -> FutureResult<(), ()> {
let ark = PLATFORM.clone();
let runtime = ark.lock().unwrap();
let mut registry = runtime.registry.lock().unwrap();
let runtime = ark.lock();
let mut registry = runtime.registry.lock();
let mut rootn = registry.root_mut();
let mut root = rootn.value().clone();

Expand All @@ -288,7 +303,7 @@ impl Bastion {
});

let ark = PLATFORM.clone();
let mut runtime = ark.lock().unwrap();
let mut runtime = ark.lock();
let shared_runtime: &mut Runtime = &mut runtime.runtime;
shared_runtime.spawn(k);

Expand All @@ -300,6 +315,18 @@ type Never = ();
const CLOSE_OVER: Result<Async<()>, Never> = Ok(Async::Ready(()));

impl RuntimeManager for Bastion {
fn unstable_shutdown() {
let ark = PLATFORM.clone();
unsafe {
if let Some(lock_ptr) = ark.clone().try_lock() {
let l: RuntimeSystem = mem::transmute_copy(&*lock_ptr);
l.runtime.shutdown_now().wait().unwrap();
ark.force_unlock_fair();
mem::forget(lock_ptr);
}
}
}

fn runtime_shutdown_callback() {
let mut entered = tokio_executor::enter().expect("main thread_local runtime lock");
let signals = Signals::new(&[SIGINT]).unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ impl<T> Message for T
where
T: Shell + Debug,
{
#[inline(always)]
fn as_any(&self) -> &dyn Any {
self
}
Expand Down
31 changes: 29 additions & 2 deletions src/context.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use crate::child::Message;
use crate::child::{Message, BastionChildren, BastionClosure};
use crate::messages::PoisonPill;
use crossbeam_channel::{Receiver, Sender};
use crossbeam_channel::{Receiver, Sender, unbounded};
use ratelimit::Limiter;
use std::any::Any;
use std::time::Duration;
use crate::spawn::RuntimeSpawn;
use crate::supervisor::Supervisor;
use uuid::Uuid;

#[derive(Clone)]
pub struct BastionContext {
pub spv: Option<Supervisor>,
pub bcast_tx: Option<Sender<Box<dyn Message>>>,
pub bcast_rx: Option<Receiver<Box<dyn Message>>>,
}
Expand Down Expand Up @@ -47,4 +51,27 @@ impl BastionContext {
}
}
}

pub fn spawn<F, M>(mut self, thunk: F, msg: M, scale: i32) -> Self
where
F: BastionClosure,
M: Message,
{
let bt = Box::new(thunk);
let msg_box = Box::new(msg);
let (p, c) = unbounded();

let children = BastionChildren {
id: Uuid::new_v4().to_string(),
tx: Some(p),
rx: Some(c),
redundancy: scale,
msg: objekt::clone_box(&*msg_box),
thunk: objekt::clone_box(&*bt),
};

// self.descendants.push(children);

self
}
}
2 changes: 0 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// Because if we can't trust, we can't make.
#![forbid(unsafe_code)]
#![feature(const_fn)]
#![feature(unboxed_closures)]
#![feature(fn_traits)]
Expand Down
1 change: 1 addition & 0 deletions src/runtime_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::any::Any;

pub(crate) trait RuntimeManager {
fn unstable_shutdown();
fn runtime_shutdown_callback();
}

Expand Down
5 changes: 3 additions & 2 deletions src/runtime_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use crate::runtime_manager::FaultRecovery;
use crate::supervisor::Supervisor;
use ego_tree::Tree;
use std::any::Any;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::runtime::{Builder, Runtime};

pub struct RuntimeSystem {
Expand All @@ -15,7 +16,7 @@ impl RuntimeSystem {
let runtime: Runtime = Builder::new()
.panic_handler(|err| RuntimeSystem::panic_dispatcher(err))
.before_stop(|| {
debug!("Executing thread stopping");
debug!("System is stopping...");
})
.build()
.unwrap(); // Builder panic isn't a problem since we haven't started.
Expand Down
4 changes: 3 additions & 1 deletion src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ impl Supervisor {
if_killed.id = format!("{}::{}", if_killed.id, child_id);

let mut this_spv = self.clone();
let context_spv = self.clone();

let f = future::lazy(move || {
nt(
BastionContext {
spv: Some(context_spv),
bcast_rx: Some(rx.clone()),
bcast_tx: Some(tx.clone()),
},
Expand All @@ -133,7 +135,7 @@ impl Supervisor {
});

let ark = crate::bastion::PLATFORM.clone();
let mut runtime = ark.lock().unwrap();
let mut runtime = ark.lock();
let shared_runtime = &mut runtime.runtime;
shared_runtime.spawn(k);
}
Expand Down
26 changes: 26 additions & 0 deletions tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,30 @@ mod tests {

awaiting(500);
}

// #[test]
// fn spawn_over_context() {
// init();
//
// let panicked_message = "Panicked Children Message".to_string();
// let stable_message = "Stable Children Message".to_string();
//
// Bastion::supervisor("background-worker", "new-system")
// .strategy(SupervisionStrategy::OneForAll)
// .children(
// |p: BastionContext, msg| {
// println!("new supervisor - panic_process - 1");
//
// let children_scale = 1;
// p.spawn(|bc, msg| {
// println!("Spawned from context");
// }, msg, children_scale);
// },
// panicked_message,
// 1_i32,
// )
// .launch();
//
// awaiting(500);
// }
}
Loading