Skip to content

Commit

Permalink
Merge pull request #19 from vertexclique/context-spawn
Browse files Browse the repository at this point in the history
Spawn from the context
  • Loading branch information
vertexclique authored Aug 20, 2019
2 parents 319ec92 + 6d89a50 commit c1884c6
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 78 deletions.
4 changes: 1 addition & 3 deletions benches/bench_one_for_one.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ mod tests {
use std::borrow::{Borrow, BorrowMut};
use std::sync::Once;
use std::{fs, thread, time};
use test::Bencher;
use tokio::prelude::*;
use tokio::runtime::{Builder, Runtime};
use test::Bencher;


static INIT: Once = Once::new();

Expand Down Expand Up @@ -55,5 +54,4 @@ mod tests {

b.iter(|| closure());
}

}
2 changes: 1 addition & 1 deletion examples/root_spv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ fn main() {
// Message can be used here.
match Receive::<String>::from(msg) {
Receive(Some(o)) => println!("Received {}", o),
_ => println!("other message type...")
_ => println!("other message type..."),
}

println!("root supervisor - spawn_at_root - 1");
Expand Down
63 changes: 63 additions & 0 deletions examples/spawn_from_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use bastion::prelude::*;
use log::LevelFilter;
use std::{fs, thread, time};

fn main() {
let config = BastionConfig {
log_level: LevelFilter::Debug,
in_test: false,
};

Bastion::platform_from_config(config);

let message = "Main Message".to_string();

// Name of the supervisor, and system of the new supervisor
// By default if you don't specify Supervisors use "One for One".
// We are going to take a look at "One For All" strategy.
Bastion::supervisor("background-worker", "new-system")
.strategy(SupervisionStrategy::OneForAll)
.children(
|p: BastionContext, _msg| {
// Children spawned from the body of supervisor.
println!("Started Child");

// Spawn child from the child context.
// All rules apply at the supervisor level also to here.
// Supervisor -> Child -> Child
// \
// ---> Child
for ctx_message in 1..=2 {
p.clone().spawn(
|sub_p: BastionContext, sub_msg: Box<dyn Message>| {
receive! { sub_msg,
i32 => |msg| {
if msg == 1 {
println!("First one");
panic!("Panic over the first one");
} else {
println!("Second one");
}
},
_ => println!("Message not known")
}

// Use blocking hook to commence restart of children
// that has finished their jobs.
sub_p.blocking_hook();
},
ctx_message,
1,
);
}

// Hook to rebind to the system.
p.hook();
},
message,
1_i32,
)
.launch();

Bastion::start()
}
36 changes: 20 additions & 16 deletions src/bastion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ use env_logger::Builder;
use futures::future::poll_fn;
use lazy_static::lazy_static;
use log::LevelFilter;
use objekt::Clone;
use parking_lot::Mutex;
use std::mem;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::prelude::future::FutureResult;
use tokio::prelude::*;
use tokio::runtime::Runtime;
use uuid::Uuid;
use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};

lazy_static! {
// Platform which contains runtime system.
Expand Down Expand Up @@ -113,15 +114,15 @@ impl Bastion {

let restart_needed = match trampoline_spv.strategy {
SupervisionStrategy::OneForOne => {
let killed = trampoline_spv.killed.clone();
let killed = trampoline_spv.ctx.killed.clone();
debug!(
"One for One – Children restart triggered for :: {:?}",
killed
);
killed
}
SupervisionStrategy::OneForAll => {
trampoline_spv.descendants.iter().for_each(|children| {
trampoline_spv.ctx.descendants.iter().for_each(|children| {
let tx = children.tx.as_ref().unwrap().clone();
debug!(
"One for All – Restart triggered for all :: {:?}",
Expand All @@ -131,7 +132,7 @@ impl Bastion {
});

// Don't make avalanche effect, send messages and wait for all to come back.
let killed_processes = trampoline_spv.killed.clone();
let killed_processes = trampoline_spv.ctx.killed.clone();
debug!(
"One for All – Restart triggered for killed :: {:?}",
killed_processes
Expand All @@ -140,8 +141,8 @@ impl Bastion {
}
SupervisionStrategy::RestForOne => {
// Find the rest in the group of killed one.
trampoline_spv.killed.iter().for_each(|killed| {
let mut rest_to_kill = trampoline_spv.descendants.clone();
trampoline_spv.ctx.killed.iter().for_each(|killed| {
let mut rest_to_kill = trampoline_spv.ctx.descendants.clone();
rest_to_kill.retain(|i| !killed.id.contains(&i.id));

rest_to_kill.iter().for_each(|children| {
Expand All @@ -154,7 +155,7 @@ impl Bastion {
});
});

let killed_processes = trampoline_spv.killed.clone();
let killed_processes = trampoline_spv.ctx.killed.clone();
debug!(
"Rest for One – Restart triggered for killed :: {:?}",
killed_processes
Expand Down Expand Up @@ -184,7 +185,9 @@ impl Bastion {
let f = future::lazy(move || {
bt(
BastionContext {
spv: Some(spv.clone()),
parent: Some(Box::new(spv.clone())),
descendants: spv.ctx.descendants,
killed: spv.ctx.killed,
bcast_rx: Some(rx.clone()),
bcast_tx: Some(tx.clone()),
},
Expand Down Expand Up @@ -223,8 +226,6 @@ impl Bastion {
}

pub fn start() {
println!("ARKARKKARK");
println!("ARC {:?}", Arc::strong_count(&PLATFORM));
Bastion::runtime_shutdown_callback()
}

Expand Down Expand Up @@ -261,7 +262,7 @@ impl Bastion {
let mut rootn = registry.root_mut();
let root: &mut Supervisor = rootn.value();

root.descendants.push(child);
root.ctx.descendants.push(child);

root_spv = root.clone();
}
Expand All @@ -272,7 +273,9 @@ impl Bastion {
let f = future::lazy(move || {
bt(
BastionContext {
spv: Some(root_spv),
parent: Some(Box::new(root_spv.clone())),
descendants: root_spv.ctx.descendants,
killed: root_spv.ctx.killed,
bcast_rx: Some(rx.clone()),
bcast_tx: Some(tx.clone()),
},
Expand All @@ -290,7 +293,7 @@ impl Bastion {
let mut rootn = registry.root_mut();
let mut root = rootn.value().clone();

root.killed.push(if_killed);
root.ctx.killed.push(if_killed);

// Enable re-entrant code
drop(registry);
Expand Down Expand Up @@ -333,7 +336,8 @@ impl RuntimeManager for Bastion {
let r = running.clone();
let _ = ctrlc::set_handler(move || {
r.store(false, Ordering::SeqCst);
}).unwrap();
})
.unwrap();
entered
.block_on(poll_fn(|| {
while running.load(Ordering::SeqCst) {}
Expand Down
110 changes: 90 additions & 20 deletions src/context.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
use crate::child::{Message, BastionChildren, BastionClosure};
use crate::child::{BastionChildren, BastionClosure, Message};
use crate::messages::PoisonPill;
use crossbeam_channel::{Receiver, Sender, unbounded};
use crate::spawn::RuntimeSpawn;
use crate::supervisor::Supervisor;
use crossbeam_channel::{unbounded, Receiver, Sender};
use ratelimit::Limiter;
use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::panic::AssertUnwindSafe;
use std::time::Duration;
use crate::spawn::RuntimeSpawn;
use crate::supervisor::Supervisor;
use tokio::prelude::future::FutureResult;
use tokio::prelude::*;
use uuid::Uuid;

#[derive(Clone)]
#[derive(Clone, Default)]
pub struct BastionContext {
pub spv: Option<Supervisor>,
pub parent: Option<Box<Supervisor>>,
pub descendants: Vec<BastionChildren>,
pub killed: Vec<BastionChildren>,
pub bcast_tx: Option<Sender<Box<dyn Message>>>,
pub bcast_rx: Option<Receiver<Box<dyn Message>>>,
}
Expand Down Expand Up @@ -53,25 +61,87 @@ impl BastionContext {
}

pub fn spawn<F, M>(mut self, thunk: F, msg: M, scale: i32) -> Self
where
F: BastionClosure,
M: Message,
where
F: BastionClosure,
M: Message,
{
let bt = Box::new(thunk);
let nt = Box::new(thunk);
let msg_box = Box::new(msg);
let (p, c) = unbounded();
let current_spv = self.parent.clone().unwrap();
let (p, c) = (current_spv.ctx.bcast_tx, current_spv.ctx.bcast_rx);

for child_id in 0..scale {
let children = BastionChildren {
id: Uuid::new_v4().to_string(),
tx: p.clone(),
rx: c.clone(),
redundancy: scale,
msg: objekt::clone_box(&*msg_box),
thunk: objekt::clone_box(&*nt),
};

let mut this_spv = *self.parent.clone().unwrap();
this_spv.ctx.descendants.push(children.clone());

let tx = children.tx.as_ref().unwrap().clone();
let rx = children.rx.clone().unwrap();

let nt = objekt::clone_box(&*children.thunk);
let msgr = objekt::clone_box(&*children.msg);
let msgr_panic_handler = objekt::clone_box(&*children.msg);
let mut if_killed = children.clone();

let children = BastionChildren {
id: Uuid::new_v4().to_string(),
tx: Some(p),
rx: Some(c),
redundancy: scale,
msg: objekt::clone_box(&*msg_box),
thunk: objekt::clone_box(&*bt),
};
let context_spv = this_spv.clone();
if_killed.id = format!(
"{}::{}::{}",
context_spv.clone().urn.name,
if_killed.id,
child_id
);

// self.descendants.push(children);
let f = future::lazy(move || {
nt(
BastionContext {
parent: Some(Box::new(context_spv.clone())),
descendants: context_spv.ctx.descendants,
killed: context_spv.ctx.killed,
bcast_rx: Some(rx.clone()),
bcast_tx: Some(tx.clone()),
},
msgr,
);
future::ok::<(), ()>(())
});

let k = AssertUnwindSafe(f)
.catch_unwind()
.then(|result| -> FutureResult<(), ()> {
this_spv.ctx.killed.push(if_killed);

// Already re-entrant code
if let Err(err) = result {
error!("Panic happened in supervised child - {:?}", err);
crate::bastion::Bastion::fault_recovery(this_spv, msgr_panic_handler);
}
future::ok(())
});

let ark = crate::bastion::PLATFORM.clone();
let mut runtime = ark.lock();
let shared_runtime = &mut runtime.runtime;
shared_runtime.spawn(k);
}

self
}
}

impl Debug for BastionContext {
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
write!(
f,
"\nContext\n\tParent :: {:?}, Descendants :: {:?}, Killed :: {:?}, TX :: {:?}, RX :: {:?}\n\t",
self.parent, self.descendants, self.killed, self.bcast_tx, self.bcast_rx
)
}
}
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ extern crate env_logger;

// The Nether
mod runtime_manager;
mod runtime_system;
mod spawn;
mod tramp;
mod runtime_system;

// The Overworld
pub mod bastion;
pub mod config;

pub mod child;
pub mod context;
pub mod messages;
pub mod receive;
pub mod supervisor;
pub mod messages;

pub mod macros;

Expand All @@ -28,9 +28,9 @@ pub mod prelude {
// Primitives
pub use crate::child::*;
pub use crate::context::*;
pub use crate::messages::*;
pub use crate::receive::*;
pub use crate::supervisor::*;
pub use crate::messages::*;

pub use crate::macros::*;

Expand Down
2 changes: 1 addition & 1 deletion src/runtime_system.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::runtime_manager::FaultRecovery;
use crate::supervisor::Supervisor;
use ego_tree::Tree;
use parking_lot::Mutex;
use std::any::Any;
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::runtime::{Builder, Runtime};

pub struct RuntimeSystem {
Expand Down
Loading

0 comments on commit c1884c6

Please sign in to comment.