Skip to content

Commit

Permalink
Refactor supervisors to carry out context to share data between children
Browse files Browse the repository at this point in the history
  • Loading branch information
vertexclique committed Aug 19, 2019
1 parent 06d74ef commit 26ac7e6
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 34 deletions.
60 changes: 60 additions & 0 deletions examples/spawn_from_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use bastion::prelude::*;
use std::{fs, thread, time};
use log::LevelFilter;

fn main() {
let config = BastionConfig {
log_level: LevelFilter::Debug,
in_test: false
};

Bastion::platform_from_config(config);


let message = "Main Message".to_string();

// Name of the supervisor, and system of the new supervisor
// By default if you don't specify Supervisors use "One for One".
// We are going to take a look at "One For All" strategy.
Bastion::supervisor("background-worker", "new-system")
.strategy(SupervisionStrategy::OneForAll)
.children(
|p: BastionContext, _msg| {
// Children spawned from the body of supervisor.
println!("Started Child");

// Spawn child from the child context.
// All rules apply at the supervisor level also to here.
// Supervisor -> Child -> Child
// \
// ---> Child
for ctx_message in 1..=2 {
p.clone().spawn(|sub_p: BastionContext, sub_msg: Box<dyn Message>| {
receive! { sub_msg,
i32 => |msg| {
if msg == 1 {
println!("First one");
panic!("Panic over the first one");
} else {
println!("Second one");
}
},
_ => println!("Message not known")
}

// Use blocking hook to commence restart of children
// that has finished their jobs.
sub_p.blocking_hook();
}, ctx_message, 1);
}

// Hook to rebind to the system.
p.hook();
},
message,
1_i32,
)
.launch();

Bastion::start()
}
28 changes: 16 additions & 12 deletions src/bastion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tokio::prelude::*;
use tokio::runtime::Runtime;
use uuid::Uuid;
use std::mem;
use objekt::Clone;

lazy_static! {
// Platform which contains runtime system.
Expand Down Expand Up @@ -103,6 +104,7 @@ impl Bastion {
pub(crate) fn fault_recovery(given: Supervisor, message_box: Box<dyn Message>) {
// Clone supervisor for trampoline bouncing
let trampoline_spv = given.clone();
println!("SPV RESTART: {:?}", trampoline_spv.clone().ctx.descendants);

// Push supervisor for next trampoline
let fark = FAULTED.clone();
Expand All @@ -113,15 +115,15 @@ impl Bastion {

let restart_needed = match trampoline_spv.strategy {
SupervisionStrategy::OneForOne => {
let killed = trampoline_spv.killed.clone();
let killed = trampoline_spv.ctx.killed.clone();
debug!(
"One for One – Children restart triggered for :: {:?}",
killed
);
killed
}
SupervisionStrategy::OneForAll => {
trampoline_spv.descendants.iter().for_each(|children| {
trampoline_spv.ctx.descendants.iter().for_each(|children| {
let tx = children.tx.as_ref().unwrap().clone();
debug!(
"One for All – Restart triggered for all :: {:?}",
Expand All @@ -131,7 +133,7 @@ impl Bastion {
});

// Don't make avalanche effect, send messages and wait for all to come back.
let killed_processes = trampoline_spv.killed.clone();
let killed_processes = trampoline_spv.ctx.killed.clone();
debug!(
"One for All – Restart triggered for killed :: {:?}",
killed_processes
Expand All @@ -140,8 +142,8 @@ impl Bastion {
}
SupervisionStrategy::RestForOne => {
// Find the rest in the group of killed one.
trampoline_spv.killed.iter().for_each(|killed| {
let mut rest_to_kill = trampoline_spv.descendants.clone();
trampoline_spv.ctx.killed.iter().for_each(|killed| {
let mut rest_to_kill = trampoline_spv.ctx.descendants.clone();
rest_to_kill.retain(|i| !killed.id.contains(&i.id));

rest_to_kill.iter().for_each(|children| {
Expand All @@ -154,7 +156,7 @@ impl Bastion {
});
});

let killed_processes = trampoline_spv.killed.clone();
let killed_processes = trampoline_spv.ctx.killed.clone();
debug!(
"Rest for One – Restart triggered for killed :: {:?}",
killed_processes
Expand Down Expand Up @@ -184,7 +186,9 @@ impl Bastion {
let f = future::lazy(move || {
bt(
BastionContext {
spv: Some(spv.clone()),
parent: Some(Box::new(spv.clone())),
descendants: spv.ctx.descendants,
killed: spv.ctx.killed,
bcast_rx: Some(rx.clone()),
bcast_tx: Some(tx.clone()),
},
Expand Down Expand Up @@ -223,8 +227,6 @@ impl Bastion {
}

pub fn start() {
println!("ARKARKKARK");
println!("ARC {:?}", Arc::strong_count(&PLATFORM));
Bastion::runtime_shutdown_callback()
}

Expand Down Expand Up @@ -261,7 +263,7 @@ impl Bastion {
let mut rootn = registry.root_mut();
let root: &mut Supervisor = rootn.value();

root.descendants.push(child);
root.ctx.descendants.push(child);

root_spv = root.clone();
}
Expand All @@ -272,7 +274,9 @@ impl Bastion {
let f = future::lazy(move || {
bt(
BastionContext {
spv: Some(root_spv),
parent: Some(Box::new(root_spv.clone())),
descendants: root_spv.ctx.descendants,
killed: root_spv.ctx.killed,
bcast_rx: Some(rx.clone()),
bcast_tx: Some(tx.clone()),
},
Expand All @@ -290,7 +294,7 @@ impl Bastion {
let mut rootn = registry.root_mut();
let mut root = rootn.value().clone();

root.killed.push(if_killed);
root.ctx.killed.push(if_killed);

// Enable re-entrant code
drop(registry);
Expand Down
92 changes: 79 additions & 13 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,18 @@ use std::time::Duration;
use crate::spawn::RuntimeSpawn;
use crate::supervisor::Supervisor;
use uuid::Uuid;
use std::fmt;
use std::fmt::Formatter;
use std::fmt::Debug;
use tokio::prelude::future::FutureResult;
use tokio::prelude::*;
use std::panic::AssertUnwindSafe;

#[derive(Clone)]
#[derive(Clone, Default)]
pub struct BastionContext {
pub spv: Option<Supervisor>,
pub parent: Option<Box<Supervisor>>,
pub descendants: Vec<BastionChildren>,
pub killed: Vec<BastionChildren>,
pub bcast_tx: Option<Sender<Box<dyn Message>>>,
pub bcast_rx: Option<Receiver<Box<dyn Message>>>,
}
Expand Down Expand Up @@ -57,21 +65,79 @@ impl BastionContext {
F: BastionClosure,
M: Message,
{
let bt = Box::new(thunk);
let nt = Box::new(thunk);
let msg_box = Box::new(msg);
let (p, c) = unbounded();
let current_spv = self.parent.clone().unwrap();
let (p, c) =
(current_spv.ctx.bcast_tx, current_spv.ctx.bcast_rx);

let children = BastionChildren {
id: Uuid::new_v4().to_string(),
tx: Some(p),
rx: Some(c),
redundancy: scale,
msg: objekt::clone_box(&*msg_box),
thunk: objekt::clone_box(&*bt),
};
for child_id in 0..scale {
let children = BastionChildren {
id: Uuid::new_v4().to_string(),
tx: p.clone(),
rx: c.clone(),
redundancy: scale,
msg: objekt::clone_box(&*msg_box),
thunk: objekt::clone_box(&*nt),
};

// self.descendants.push(children);
let mut this_spv = *self.parent.clone().unwrap();
this_spv.ctx.descendants.push(children.clone());

let tx = children.tx.as_ref().unwrap().clone();
let rx = children.rx.clone().unwrap();

let nt = objekt::clone_box(&*children.thunk);
let msgr = objekt::clone_box(&*children.msg);
let msgr_panic_handler = objekt::clone_box(&*children.msg);
let mut if_killed = children.clone();

let context_spv = this_spv.clone();
if_killed.id = format!("{}::{}::{}", context_spv.clone().urn.name, if_killed.id, child_id);

let f = future::lazy(move || {
nt(
BastionContext {
parent: Some(Box::new(context_spv.clone())),
descendants: context_spv.ctx.descendants,
killed: context_spv.ctx.killed,
bcast_rx: Some(rx.clone()),
bcast_tx: Some(tx.clone()),
},
msgr,
);
future::ok::<(), ()>(())
});

let k = AssertUnwindSafe(f)
.catch_unwind()
.then(|result| -> FutureResult<(), ()> {
this_spv.ctx.killed.push(if_killed);

// Already re-entrant code
if let Err(err) = result {
error!("Panic happened in supervised child - {:?}", err);
crate::bastion::Bastion::fault_recovery(this_spv, msgr_panic_handler);
}
future::ok(())
});

let ark = crate::bastion::PLATFORM.clone();
let mut runtime = ark.lock();
let shared_runtime = &mut runtime.runtime;
shared_runtime.spawn(k);
}

self
}
}

impl Debug for BastionContext {
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
write!(
f,
"\nContext\n\tParent :: {:?}, Descendants :: {:?}, Killed :: {:?}, TX :: {:?}, RX :: {:?}\n\t",
self.parent, self.descendants, self.killed, self.bcast_tx, self.bcast_rx
)
}
}
24 changes: 15 additions & 9 deletions src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ impl Default for SupervisionStrategy {
#[derive(Default, Clone, Debug)]
pub struct Supervisor {
pub urn: SupervisorURN,
pub(crate) descendants: Vec<BastionChildren>,
pub(crate) killed: Vec<BastionChildren>,
pub(crate) ctx: BastionContext,
pub(crate) strategy: SupervisionStrategy,
}

Expand Down Expand Up @@ -80,20 +79,23 @@ impl Supervisor {

let children = BastionChildren {
id: Uuid::new_v4().to_string(),
tx: Some(p),
rx: Some(c),
tx: Some(p.clone()),
rx: Some(c.clone()),
redundancy: scale,
msg: objekt::clone_box(&*msg_box),
thunk: objekt::clone_box(&*bt),
};

self.descendants.push(children);
self.ctx.descendants.push(children);
self.ctx.bcast_rx = Some(c.clone());
self.ctx.bcast_tx = Some(p.clone());
self.ctx.parent = Some(Box::new(self.clone()));

self
}

pub fn launch(self) {
for descendant in &self.descendants {
pub fn launch(mut self) {
for descendant in &self.ctx.descendants {
let descendant = descendant.clone();

for child_id in 0..descendant.redundancy {
Expand All @@ -112,7 +114,9 @@ impl Supervisor {
let f = future::lazy(move || {
nt(
BastionContext {
spv: Some(context_spv),
parent: Some(Box::new(context_spv.clone())),
descendants: context_spv.ctx.descendants,
killed: context_spv.ctx.killed,
bcast_rx: Some(rx.clone()),
bcast_tx: Some(tx.clone()),
},
Expand All @@ -124,7 +128,7 @@ impl Supervisor {
let k = AssertUnwindSafe(f)
.catch_unwind()
.then(|result| -> FutureResult<(), ()> {
this_spv.killed.push(if_killed);
this_spv.ctx.killed.push(if_killed);

// Already re-entrant code
if let Err(err) = result {
Expand All @@ -140,5 +144,7 @@ impl Supervisor {
shared_runtime.spawn(k);
}
}

self.ctx.parent = Some(Box::new(self));
}
}

0 comments on commit 26ac7e6

Please sign in to comment.