Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spawn from the context #19

Merged
merged 2 commits into from
Aug 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions benches/bench_one_for_one.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ mod tests {
use std::borrow::{Borrow, BorrowMut};
use std::sync::Once;
use std::{fs, thread, time};
use test::Bencher;
use tokio::prelude::*;
use tokio::runtime::{Builder, Runtime};
use test::Bencher;


static INIT: Once = Once::new();

Expand Down Expand Up @@ -55,5 +54,4 @@ mod tests {

b.iter(|| closure());
}

}
2 changes: 1 addition & 1 deletion examples/root_spv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ fn main() {
// Message can be used here.
match Receive::<String>::from(msg) {
Receive(Some(o)) => println!("Received {}", o),
_ => println!("other message type...")
_ => println!("other message type..."),
}

println!("root supervisor - spawn_at_root - 1");
Expand Down
63 changes: 63 additions & 0 deletions examples/spawn_from_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use bastion::prelude::*;
use log::LevelFilter;
use std::{fs, thread, time};

fn main() {
let config = BastionConfig {
log_level: LevelFilter::Debug,
in_test: false,
};

Bastion::platform_from_config(config);

let message = "Main Message".to_string();

// Name of the supervisor, and system of the new supervisor
// By default if you don't specify Supervisors use "One for One".
// We are going to take a look at "One For All" strategy.
Bastion::supervisor("background-worker", "new-system")
.strategy(SupervisionStrategy::OneForAll)
.children(
|p: BastionContext, _msg| {
// Children spawned from the body of supervisor.
println!("Started Child");

// Spawn child from the child context.
// All rules apply at the supervisor level also to here.
// Supervisor -> Child -> Child
// \
// ---> Child
for ctx_message in 1..=2 {
p.clone().spawn(
|sub_p: BastionContext, sub_msg: Box<dyn Message>| {
receive! { sub_msg,
i32 => |msg| {
if msg == 1 {
println!("First one");
panic!("Panic over the first one");
} else {
println!("Second one");
}
},
_ => println!("Message not known")
}

// Use blocking hook to commence restart of children
// that has finished their jobs.
sub_p.blocking_hook();
},
ctx_message,
1,
);
}

// Hook to rebind to the system.
p.hook();
},
message,
1_i32,
)
.launch();

Bastion::start()
}
36 changes: 20 additions & 16 deletions src/bastion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ use env_logger::Builder;
use futures::future::poll_fn;
use lazy_static::lazy_static;
use log::LevelFilter;
use objekt::Clone;
use parking_lot::Mutex;
use std::mem;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::prelude::future::FutureResult;
use tokio::prelude::*;
use tokio::runtime::Runtime;
use uuid::Uuid;
use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};

lazy_static! {
// Platform which contains runtime system.
Expand Down Expand Up @@ -113,15 +114,15 @@ impl Bastion {

let restart_needed = match trampoline_spv.strategy {
SupervisionStrategy::OneForOne => {
let killed = trampoline_spv.killed.clone();
let killed = trampoline_spv.ctx.killed.clone();
debug!(
"One for One – Children restart triggered for :: {:?}",
killed
);
killed
}
SupervisionStrategy::OneForAll => {
trampoline_spv.descendants.iter().for_each(|children| {
trampoline_spv.ctx.descendants.iter().for_each(|children| {
let tx = children.tx.as_ref().unwrap().clone();
debug!(
"One for All – Restart triggered for all :: {:?}",
Expand All @@ -131,7 +132,7 @@ impl Bastion {
});

// Don't make avalanche effect, send messages and wait for all to come back.
let killed_processes = trampoline_spv.killed.clone();
let killed_processes = trampoline_spv.ctx.killed.clone();
debug!(
"One for All – Restart triggered for killed :: {:?}",
killed_processes
Expand All @@ -140,8 +141,8 @@ impl Bastion {
}
SupervisionStrategy::RestForOne => {
// Find the rest in the group of killed one.
trampoline_spv.killed.iter().for_each(|killed| {
let mut rest_to_kill = trampoline_spv.descendants.clone();
trampoline_spv.ctx.killed.iter().for_each(|killed| {
let mut rest_to_kill = trampoline_spv.ctx.descendants.clone();
rest_to_kill.retain(|i| !killed.id.contains(&i.id));

rest_to_kill.iter().for_each(|children| {
Expand All @@ -154,7 +155,7 @@ impl Bastion {
});
});

let killed_processes = trampoline_spv.killed.clone();
let killed_processes = trampoline_spv.ctx.killed.clone();
debug!(
"Rest for One – Restart triggered for killed :: {:?}",
killed_processes
Expand Down Expand Up @@ -184,7 +185,9 @@ impl Bastion {
let f = future::lazy(move || {
bt(
BastionContext {
spv: Some(spv.clone()),
parent: Some(Box::new(spv.clone())),
descendants: spv.ctx.descendants,
killed: spv.ctx.killed,
bcast_rx: Some(rx.clone()),
bcast_tx: Some(tx.clone()),
},
Expand Down Expand Up @@ -223,8 +226,6 @@ impl Bastion {
}

pub fn start() {
println!("ARKARKKARK");
println!("ARC {:?}", Arc::strong_count(&PLATFORM));
Bastion::runtime_shutdown_callback()
}

Expand Down Expand Up @@ -261,7 +262,7 @@ impl Bastion {
let mut rootn = registry.root_mut();
let root: &mut Supervisor = rootn.value();

root.descendants.push(child);
root.ctx.descendants.push(child);

root_spv = root.clone();
}
Expand All @@ -272,7 +273,9 @@ impl Bastion {
let f = future::lazy(move || {
bt(
BastionContext {
spv: Some(root_spv),
parent: Some(Box::new(root_spv.clone())),
descendants: root_spv.ctx.descendants,
killed: root_spv.ctx.killed,
bcast_rx: Some(rx.clone()),
bcast_tx: Some(tx.clone()),
},
Expand All @@ -290,7 +293,7 @@ impl Bastion {
let mut rootn = registry.root_mut();
let mut root = rootn.value().clone();

root.killed.push(if_killed);
root.ctx.killed.push(if_killed);

// Enable re-entrant code
drop(registry);
Expand Down Expand Up @@ -333,7 +336,8 @@ impl RuntimeManager for Bastion {
let r = running.clone();
let _ = ctrlc::set_handler(move || {
r.store(false, Ordering::SeqCst);
}).unwrap();
})
.unwrap();
entered
.block_on(poll_fn(|| {
while running.load(Ordering::SeqCst) {}
Expand Down
110 changes: 90 additions & 20 deletions src/context.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
use crate::child::{Message, BastionChildren, BastionClosure};
use crate::child::{BastionChildren, BastionClosure, Message};
use crate::messages::PoisonPill;
use crossbeam_channel::{Receiver, Sender, unbounded};
use crate::spawn::RuntimeSpawn;
use crate::supervisor::Supervisor;
use crossbeam_channel::{unbounded, Receiver, Sender};
use ratelimit::Limiter;
use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::panic::AssertUnwindSafe;
use std::time::Duration;
use crate::spawn::RuntimeSpawn;
use crate::supervisor::Supervisor;
use tokio::prelude::future::FutureResult;
use tokio::prelude::*;
use uuid::Uuid;

#[derive(Clone)]
#[derive(Clone, Default)]
pub struct BastionContext {
pub spv: Option<Supervisor>,
pub parent: Option<Box<Supervisor>>,
pub descendants: Vec<BastionChildren>,
pub killed: Vec<BastionChildren>,
pub bcast_tx: Option<Sender<Box<dyn Message>>>,
pub bcast_rx: Option<Receiver<Box<dyn Message>>>,
}
Expand Down Expand Up @@ -53,25 +61,87 @@ impl BastionContext {
}

pub fn spawn<F, M>(mut self, thunk: F, msg: M, scale: i32) -> Self
where
F: BastionClosure,
M: Message,
where
F: BastionClosure,
M: Message,
{
let bt = Box::new(thunk);
let nt = Box::new(thunk);
let msg_box = Box::new(msg);
let (p, c) = unbounded();
let current_spv = self.parent.clone().unwrap();
let (p, c) = (current_spv.ctx.bcast_tx, current_spv.ctx.bcast_rx);

for child_id in 0..scale {
let children = BastionChildren {
id: Uuid::new_v4().to_string(),
tx: p.clone(),
rx: c.clone(),
redundancy: scale,
msg: objekt::clone_box(&*msg_box),
thunk: objekt::clone_box(&*nt),
};

let mut this_spv = *self.parent.clone().unwrap();
this_spv.ctx.descendants.push(children.clone());

let tx = children.tx.as_ref().unwrap().clone();
let rx = children.rx.clone().unwrap();

let nt = objekt::clone_box(&*children.thunk);
let msgr = objekt::clone_box(&*children.msg);
let msgr_panic_handler = objekt::clone_box(&*children.msg);
let mut if_killed = children.clone();

let children = BastionChildren {
id: Uuid::new_v4().to_string(),
tx: Some(p),
rx: Some(c),
redundancy: scale,
msg: objekt::clone_box(&*msg_box),
thunk: objekt::clone_box(&*bt),
};
let context_spv = this_spv.clone();
if_killed.id = format!(
"{}::{}::{}",
context_spv.clone().urn.name,
if_killed.id,
child_id
);

// self.descendants.push(children);
let f = future::lazy(move || {
nt(
BastionContext {
parent: Some(Box::new(context_spv.clone())),
descendants: context_spv.ctx.descendants,
killed: context_spv.ctx.killed,
bcast_rx: Some(rx.clone()),
bcast_tx: Some(tx.clone()),
},
msgr,
);
future::ok::<(), ()>(())
});

let k = AssertUnwindSafe(f)
.catch_unwind()
.then(|result| -> FutureResult<(), ()> {
this_spv.ctx.killed.push(if_killed);

// Already re-entrant code
if let Err(err) = result {
error!("Panic happened in supervised child - {:?}", err);
crate::bastion::Bastion::fault_recovery(this_spv, msgr_panic_handler);
}
future::ok(())
});

let ark = crate::bastion::PLATFORM.clone();
let mut runtime = ark.lock();
let shared_runtime = &mut runtime.runtime;
shared_runtime.spawn(k);
}

self
}
}

impl Debug for BastionContext {
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
write!(
f,
"\nContext\n\tParent :: {:?}, Descendants :: {:?}, Killed :: {:?}, TX :: {:?}, RX :: {:?}\n\t",
self.parent, self.descendants, self.killed, self.bcast_tx, self.bcast_rx
)
}
}
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ extern crate env_logger;

// The Nether
mod runtime_manager;
mod runtime_system;
mod spawn;
mod tramp;
mod runtime_system;

// The Overworld
pub mod bastion;
pub mod config;

pub mod child;
pub mod context;
pub mod messages;
pub mod receive;
pub mod supervisor;
pub mod messages;

pub mod macros;

Expand All @@ -28,9 +28,9 @@ pub mod prelude {
// Primitives
pub use crate::child::*;
pub use crate::context::*;
pub use crate::messages::*;
pub use crate::receive::*;
pub use crate::supervisor::*;
pub use crate::messages::*;

pub use crate::macros::*;

Expand Down
2 changes: 1 addition & 1 deletion src/runtime_system.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::runtime_manager::FaultRecovery;
use crate::supervisor::Supervisor;
use ego_tree::Tree;
use parking_lot::Mutex;
use std::any::Any;
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::runtime::{Builder, Runtime};

pub struct RuntimeSystem {
Expand Down
Loading