From 324a80b1623b18b9a874ec78759af28cbb8eb61f Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Mon, 19 Aug 2019 20:14:47 +0200 Subject: [PATCH 1/2] Refactor supervisors to carry out context to share data between children --- benches/bench_one_for_one.rs | 4 +- examples/root_spv.rs | 2 +- examples/spawn_from_context.rs | 63 +++++++++++++++++++ src/bastion.rs | 37 ++++++----- src/context.rs | 110 +++++++++++++++++++++++++++------ src/lib.rs | 6 +- src/runtime_system.rs | 2 +- src/supervisor.rs | 24 ++++--- tests/lib.rs | 50 +++++++-------- 9 files changed, 220 insertions(+), 78 deletions(-) create mode 100644 examples/spawn_from_context.rs diff --git a/benches/bench_one_for_one.rs b/benches/bench_one_for_one.rs index 8f415fa9..e9c67df5 100644 --- a/benches/bench_one_for_one.rs +++ b/benches/bench_one_for_one.rs @@ -14,10 +14,9 @@ mod tests { use std::borrow::{Borrow, BorrowMut}; use std::sync::Once; use std::{fs, thread, time}; + use test::Bencher; use tokio::prelude::*; use tokio::runtime::{Builder, Runtime}; - use test::Bencher; - static INIT: Once = Once::new(); @@ -55,5 +54,4 @@ mod tests { b.iter(|| closure()); } - } diff --git a/examples/root_spv.rs b/examples/root_spv.rs index b82cec11..99844114 100644 --- a/examples/root_spv.rs +++ b/examples/root_spv.rs @@ -10,7 +10,7 @@ fn main() { // Message can be used here. match Receive::::from(msg) { Receive(Some(o)) => println!("Received {}", o), - _ => println!("other message type...") + _ => println!("other message type..."), } println!("root supervisor - spawn_at_root - 1"); diff --git a/examples/spawn_from_context.rs b/examples/spawn_from_context.rs new file mode 100644 index 00000000..217cf062 --- /dev/null +++ b/examples/spawn_from_context.rs @@ -0,0 +1,63 @@ +use bastion::prelude::*; +use log::LevelFilter; +use std::{fs, thread, time}; + +fn main() { + let config = BastionConfig { + log_level: LevelFilter::Debug, + in_test: false, + }; + + Bastion::platform_from_config(config); + + let message = "Main Message".to_string(); + + // Name of the supervisor, and system of the new supervisor + // By default if you don't specify Supervisors use "One for One". + // We are going to take a look at "One For All" strategy. + Bastion::supervisor("background-worker", "new-system") + .strategy(SupervisionStrategy::OneForAll) + .children( + |p: BastionContext, _msg| { + // Children spawned from the body of supervisor. + println!("Started Child"); + + // Spawn child from the child context. + // All rules apply at the supervisor level also to here. + // Supervisor -> Child -> Child + // \ + // ---> Child + for ctx_message in 1..=2 { + p.clone().spawn( + |sub_p: BastionContext, sub_msg: Box| { + receive! { sub_msg, + i32 => |msg| { + if msg == 1 { + println!("First one"); + panic!("Panic over the first one"); + } else { + println!("Second one"); + } + }, + _ => println!("Message not known") + } + + // Use blocking hook to commence restart of children + // that has finished their jobs. + sub_p.blocking_hook(); + }, + ctx_message, + 1, + ); + } + + // Hook to rebind to the system. + p.hook(); + }, + message, + 1_i32, + ) + .launch(); + + Bastion::start() +} diff --git a/src/bastion.rs b/src/bastion.rs index 79210b29..25444b88 100644 --- a/src/bastion.rs +++ b/src/bastion.rs @@ -12,15 +12,16 @@ use env_logger::Builder; use futures::future::poll_fn; use lazy_static::lazy_static; use log::LevelFilter; +use objekt::Clone; +use parking_lot::Mutex; +use std::mem; use std::panic::AssertUnwindSafe; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use parking_lot::Mutex; use tokio::prelude::future::FutureResult; use tokio::prelude::*; use tokio::runtime::Runtime; use uuid::Uuid; -use std::mem; -use std::sync::atomic::{AtomicBool, Ordering}; lazy_static! { // Platform which contains runtime system. @@ -103,6 +104,7 @@ impl Bastion { pub(crate) fn fault_recovery(given: Supervisor, message_box: Box) { // Clone supervisor for trampoline bouncing let trampoline_spv = given.clone(); + println!("SPV RESTART: {:?}", trampoline_spv.clone().ctx.descendants); // Push supervisor for next trampoline let fark = FAULTED.clone(); @@ -113,7 +115,7 @@ impl Bastion { let restart_needed = match trampoline_spv.strategy { SupervisionStrategy::OneForOne => { - let killed = trampoline_spv.killed.clone(); + let killed = trampoline_spv.ctx.killed.clone(); debug!( "One for One – Children restart triggered for :: {:?}", killed @@ -121,7 +123,7 @@ impl Bastion { killed } SupervisionStrategy::OneForAll => { - trampoline_spv.descendants.iter().for_each(|children| { + trampoline_spv.ctx.descendants.iter().for_each(|children| { let tx = children.tx.as_ref().unwrap().clone(); debug!( "One for All – Restart triggered for all :: {:?}", @@ -131,7 +133,7 @@ impl Bastion { }); // Don't make avalanche effect, send messages and wait for all to come back. - let killed_processes = trampoline_spv.killed.clone(); + let killed_processes = trampoline_spv.ctx.killed.clone(); debug!( "One for All – Restart triggered for killed :: {:?}", killed_processes @@ -140,8 +142,8 @@ impl Bastion { } SupervisionStrategy::RestForOne => { // Find the rest in the group of killed one. - trampoline_spv.killed.iter().for_each(|killed| { - let mut rest_to_kill = trampoline_spv.descendants.clone(); + trampoline_spv.ctx.killed.iter().for_each(|killed| { + let mut rest_to_kill = trampoline_spv.ctx.descendants.clone(); rest_to_kill.retain(|i| !killed.id.contains(&i.id)); rest_to_kill.iter().for_each(|children| { @@ -154,7 +156,7 @@ impl Bastion { }); }); - let killed_processes = trampoline_spv.killed.clone(); + let killed_processes = trampoline_spv.ctx.killed.clone(); debug!( "Rest for One – Restart triggered for killed :: {:?}", killed_processes @@ -184,7 +186,9 @@ impl Bastion { let f = future::lazy(move || { bt( BastionContext { - spv: Some(spv.clone()), + parent: Some(Box::new(spv.clone())), + descendants: spv.ctx.descendants, + killed: spv.ctx.killed, bcast_rx: Some(rx.clone()), bcast_tx: Some(tx.clone()), }, @@ -223,8 +227,6 @@ impl Bastion { } pub fn start() { - println!("ARKARKKARK"); - println!("ARC {:?}", Arc::strong_count(&PLATFORM)); Bastion::runtime_shutdown_callback() } @@ -261,7 +263,7 @@ impl Bastion { let mut rootn = registry.root_mut(); let root: &mut Supervisor = rootn.value(); - root.descendants.push(child); + root.ctx.descendants.push(child); root_spv = root.clone(); } @@ -272,7 +274,9 @@ impl Bastion { let f = future::lazy(move || { bt( BastionContext { - spv: Some(root_spv), + parent: Some(Box::new(root_spv.clone())), + descendants: root_spv.ctx.descendants, + killed: root_spv.ctx.killed, bcast_rx: Some(rx.clone()), bcast_tx: Some(tx.clone()), }, @@ -290,7 +294,7 @@ impl Bastion { let mut rootn = registry.root_mut(); let mut root = rootn.value().clone(); - root.killed.push(if_killed); + root.ctx.killed.push(if_killed); // Enable re-entrant code drop(registry); @@ -333,7 +337,8 @@ impl RuntimeManager for Bastion { let r = running.clone(); let _ = ctrlc::set_handler(move || { r.store(false, Ordering::SeqCst); - }).unwrap(); + }) + .unwrap(); entered .block_on(poll_fn(|| { while running.load(Ordering::SeqCst) {} diff --git a/src/context.rs b/src/context.rs index fbd921ee..79d510b1 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,16 +1,24 @@ -use crate::child::{Message, BastionChildren, BastionClosure}; +use crate::child::{BastionChildren, BastionClosure, Message}; use crate::messages::PoisonPill; -use crossbeam_channel::{Receiver, Sender, unbounded}; +use crate::spawn::RuntimeSpawn; +use crate::supervisor::Supervisor; +use crossbeam_channel::{unbounded, Receiver, Sender}; use ratelimit::Limiter; use std::any::Any; +use std::fmt; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::panic::AssertUnwindSafe; use std::time::Duration; -use crate::spawn::RuntimeSpawn; -use crate::supervisor::Supervisor; +use tokio::prelude::future::FutureResult; +use tokio::prelude::*; use uuid::Uuid; -#[derive(Clone)] +#[derive(Clone, Default)] pub struct BastionContext { - pub spv: Option, + pub parent: Option>, + pub descendants: Vec, + pub killed: Vec, pub bcast_tx: Option>>, pub bcast_rx: Option>>, } @@ -53,25 +61,87 @@ impl BastionContext { } pub fn spawn(mut self, thunk: F, msg: M, scale: i32) -> Self - where - F: BastionClosure, - M: Message, + where + F: BastionClosure, + M: Message, { - let bt = Box::new(thunk); + let nt = Box::new(thunk); let msg_box = Box::new(msg); - let (p, c) = unbounded(); + let current_spv = self.parent.clone().unwrap(); + let (p, c) = (current_spv.ctx.bcast_tx, current_spv.ctx.bcast_rx); + + for child_id in 0..scale { + let children = BastionChildren { + id: Uuid::new_v4().to_string(), + tx: p.clone(), + rx: c.clone(), + redundancy: scale, + msg: objekt::clone_box(&*msg_box), + thunk: objekt::clone_box(&*nt), + }; + + let mut this_spv = *self.parent.clone().unwrap(); + this_spv.ctx.descendants.push(children.clone()); + + let tx = children.tx.as_ref().unwrap().clone(); + let rx = children.rx.clone().unwrap(); + + let nt = objekt::clone_box(&*children.thunk); + let msgr = objekt::clone_box(&*children.msg); + let msgr_panic_handler = objekt::clone_box(&*children.msg); + let mut if_killed = children.clone(); - let children = BastionChildren { - id: Uuid::new_v4().to_string(), - tx: Some(p), - rx: Some(c), - redundancy: scale, - msg: objekt::clone_box(&*msg_box), - thunk: objekt::clone_box(&*bt), - }; + let context_spv = this_spv.clone(); + if_killed.id = format!( + "{}::{}::{}", + context_spv.clone().urn.name, + if_killed.id, + child_id + ); -// self.descendants.push(children); + let f = future::lazy(move || { + nt( + BastionContext { + parent: Some(Box::new(context_spv.clone())), + descendants: context_spv.ctx.descendants, + killed: context_spv.ctx.killed, + bcast_rx: Some(rx.clone()), + bcast_tx: Some(tx.clone()), + }, + msgr, + ); + future::ok::<(), ()>(()) + }); + + let k = AssertUnwindSafe(f) + .catch_unwind() + .then(|result| -> FutureResult<(), ()> { + this_spv.ctx.killed.push(if_killed); + + // Already re-entrant code + if let Err(err) = result { + error!("Panic happened in supervised child - {:?}", err); + crate::bastion::Bastion::fault_recovery(this_spv, msgr_panic_handler); + } + future::ok(()) + }); + + let ark = crate::bastion::PLATFORM.clone(); + let mut runtime = ark.lock(); + let shared_runtime = &mut runtime.runtime; + shared_runtime.spawn(k); + } self } } + +impl Debug for BastionContext { + fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> { + write!( + f, + "\nContext\n\tParent :: {:?}, Descendants :: {:?}, Killed :: {:?}, TX :: {:?}, RX :: {:?}\n\t", + self.parent, self.descendants, self.killed, self.bcast_tx, self.bcast_rx + ) + } +} diff --git a/src/lib.rs b/src/lib.rs index 60dabc9e..e5b64267 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,9 +4,9 @@ extern crate env_logger; // The Nether mod runtime_manager; +mod runtime_system; mod spawn; mod tramp; -mod runtime_system; // The Overworld pub mod bastion; @@ -14,9 +14,9 @@ pub mod config; pub mod child; pub mod context; +pub mod messages; pub mod receive; pub mod supervisor; -pub mod messages; pub mod macros; @@ -28,9 +28,9 @@ pub mod prelude { // Primitives pub use crate::child::*; pub use crate::context::*; + pub use crate::messages::*; pub use crate::receive::*; pub use crate::supervisor::*; - pub use crate::messages::*; pub use crate::macros::*; diff --git a/src/runtime_system.rs b/src/runtime_system.rs index 465af83a..f371b772 100644 --- a/src/runtime_system.rs +++ b/src/runtime_system.rs @@ -1,9 +1,9 @@ use crate::runtime_manager::FaultRecovery; use crate::supervisor::Supervisor; use ego_tree::Tree; +use parking_lot::Mutex; use std::any::Any; use std::sync::Arc; -use parking_lot::Mutex; use tokio::runtime::{Builder, Runtime}; pub struct RuntimeSystem { diff --git a/src/supervisor.rs b/src/supervisor.rs index 828483c3..b57f4c1c 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -50,8 +50,7 @@ impl Default for SupervisionStrategy { #[derive(Default, Clone, Debug)] pub struct Supervisor { pub urn: SupervisorURN, - pub(crate) descendants: Vec, - pub(crate) killed: Vec, + pub(crate) ctx: BastionContext, pub(crate) strategy: SupervisionStrategy, } @@ -80,20 +79,23 @@ impl Supervisor { let children = BastionChildren { id: Uuid::new_v4().to_string(), - tx: Some(p), - rx: Some(c), + tx: Some(p.clone()), + rx: Some(c.clone()), redundancy: scale, msg: objekt::clone_box(&*msg_box), thunk: objekt::clone_box(&*bt), }; - self.descendants.push(children); + self.ctx.descendants.push(children); + self.ctx.bcast_rx = Some(c.clone()); + self.ctx.bcast_tx = Some(p.clone()); + self.ctx.parent = Some(Box::new(self.clone())); self } - pub fn launch(self) { - for descendant in &self.descendants { + pub fn launch(mut self) { + for descendant in &self.ctx.descendants { let descendant = descendant.clone(); for child_id in 0..descendant.redundancy { @@ -112,7 +114,9 @@ impl Supervisor { let f = future::lazy(move || { nt( BastionContext { - spv: Some(context_spv), + parent: Some(Box::new(context_spv.clone())), + descendants: context_spv.ctx.descendants, + killed: context_spv.ctx.killed, bcast_rx: Some(rx.clone()), bcast_tx: Some(tx.clone()), }, @@ -124,7 +128,7 @@ impl Supervisor { let k = AssertUnwindSafe(f) .catch_unwind() .then(|result| -> FutureResult<(), ()> { - this_spv.killed.push(if_killed); + this_spv.ctx.killed.push(if_killed); // Already re-entrant code if let Err(err) = result { @@ -140,5 +144,7 @@ impl Supervisor { shared_runtime.spawn(k); } } + + self.ctx.parent = Some(Box::new(self)); } } diff --git a/tests/lib.rs b/tests/lib.rs index 572cefe9..f7b5fd7d 100644 --- a/tests/lib.rs +++ b/tests/lib.rs @@ -182,29 +182,29 @@ mod tests { awaiting(500); } -// #[test] -// fn spawn_over_context() { -// init(); -// -// let panicked_message = "Panicked Children Message".to_string(); -// let stable_message = "Stable Children Message".to_string(); -// -// Bastion::supervisor("background-worker", "new-system") -// .strategy(SupervisionStrategy::OneForAll) -// .children( -// |p: BastionContext, msg| { -// println!("new supervisor - panic_process - 1"); -// -// let children_scale = 1; -// p.spawn(|bc, msg| { -// println!("Spawned from context"); -// }, msg, children_scale); -// }, -// panicked_message, -// 1_i32, -// ) -// .launch(); -// -// awaiting(500); -// } + // #[test] + // fn spawn_over_context() { + // init(); + // + // let panicked_message = "Panicked Children Message".to_string(); + // let stable_message = "Stable Children Message".to_string(); + // + // Bastion::supervisor("background-worker", "new-system") + // .strategy(SupervisionStrategy::OneForAll) + // .children( + // |p: BastionContext, msg| { + // println!("new supervisor - panic_process - 1"); + // + // let children_scale = 1; + // p.spawn(|bc, msg| { + // println!("Spawned from context"); + // }, msg, children_scale); + // }, + // panicked_message, + // 1_i32, + // ) + // .launch(); + // + // awaiting(500); + // } } From 6d89a5025d9f1c74acb2b29c82ccaa15555df62c Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Mon, 19 Aug 2019 20:39:34 +0200 Subject: [PATCH 2/2] Remove leftover debug point --- src/bastion.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/bastion.rs b/src/bastion.rs index 25444b88..0628ba58 100644 --- a/src/bastion.rs +++ b/src/bastion.rs @@ -104,7 +104,6 @@ impl Bastion { pub(crate) fn fault_recovery(given: Supervisor, message_box: Box) { // Clone supervisor for trampoline bouncing let trampoline_spv = given.clone(); - println!("SPV RESTART: {:?}", trampoline_spv.clone().ctx.descendants); // Push supervisor for next trampoline let fark = FAULTED.clone();