diff --git a/src/bastion.rs b/src/bastion.rs new file mode 100644 index 00000000..7dd28590 --- /dev/null +++ b/src/bastion.rs @@ -0,0 +1,18 @@ +use crate::supervisor::Supervisor; +use crate::system::System; +use futures::channel::mpsc::UnboundedSender; +use lazy_static::lazy_static; + +lazy_static! { + pub(super) static ref SYSTEM_SENDER: UnboundedSender = System::start(); +} + +pub struct Bastion { + // TODO: ... +} + +impl Bastion { + pub fn supervisor() -> Supervisor { + Supervisor::new() + } +} \ No newline at end of file diff --git a/src/broadcast.rs b/src/broadcast.rs index 918f678f..285c431c 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -1,5 +1,5 @@ use crate::children::Message; -use futures::channel::mpsc::{self, TrySendError, UnboundedReceiver, UnboundedSender}; +use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; use futures::prelude::*; use fxhash::FxHashMap; use std::pin::Pin; diff --git a/src/children.rs b/src/children.rs index 3759191c..3890b163 100644 --- a/src/children.rs +++ b/src/children.rs @@ -1,6 +1,5 @@ use crate::broadcast::{BastionMessage, Broadcast}; use crate::context::BastionContext; -use futures::channel::mpsc::{Receiver, Sender}; use futures::future::CatchUnwind; use futures::pending; use futures::poll; @@ -11,7 +10,7 @@ use std::fmt::Debug; use std::future::Future; use std::panic::UnwindSafe; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::Poll; use uuid::Uuid; pub trait Shell: objekt::Clone + Send + Sync + Any + 'static {} @@ -87,7 +86,7 @@ impl Children { return self; } // FIXME - BastionMessage::Message(msg) => unimplemented!(), + BastionMessage::Message(_) => unimplemented!(), } } // FIXME: "return self" or "send_parent(Faulted)"? @@ -98,7 +97,7 @@ impl Children { } pub(super) fn launch(mut self) -> JoinHandle { - for child in 0..self.redundancy { + for _ in 0..self.redundancy { let id = Uuid::new_v4(); let bcast = self.bcast.new_child(id); @@ -145,7 +144,7 @@ impl Child { unimplemented!() } // FIXME - BastionMessage::Message(msg) => unimplemented!(), + BastionMessage::Message(_) => unimplemented!(), } } // FIXME: "return" or "send_parent(Faulted)"? diff --git a/src/context.rs b/src/context.rs index 702fbf15..957eb987 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,3 +1 @@ -use crate::broadcast::Broadcast; - pub struct BastionContext {} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 1b186008..6e3b0cc8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,6 @@ +mod bastion; mod broadcast; mod children; mod context; mod supervisor; +mod system; diff --git a/src/supervisor.rs b/src/supervisor.rs index 720cce8d..a6c40eeb 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -1,10 +1,9 @@ +use crate::bastion::SYSTEM_SENDER; use crate::broadcast::{BastionMessage, Broadcast}; use crate::children::{Children, Closure, Message}; -use futures::pending; -use futures::poll; +use futures::{pending, poll}; use futures::prelude::*; -use fxhash::FxHashMap; -use fxhash::FxHashSet; +use fxhash::{FxHashMap, FxHashSet}; use runtime::task::JoinHandle; use std::collections::BTreeMap; use std::ops::RangeFrom; @@ -76,7 +75,7 @@ impl Supervisor { self } - fn launch_children(&mut self) { + pub(super) fn launch_children(&mut self) { let start = if let Some(order) = self.order.keys().next_back() { *order + 1 } else { @@ -103,7 +102,7 @@ impl Supervisor { } } - let mut removed = vec![]; + let mut removed = Vec::new(); let mut children = Vec::new(); for (order, id) in self.order.range(range) { // FIXME: Err if None? @@ -141,7 +140,7 @@ impl Supervisor { self.children.push(children); - self.kill_children(order..); + self.kill_children(order..).await; self.launch_children(); } } @@ -149,7 +148,7 @@ impl Supervisor { Ok(()) } - async fn run(mut self) -> Self { + pub(super) async fn run(mut self) -> Self { loop { match poll!(&mut self.bcast.next()) { Poll::Ready(Some(msg)) => { @@ -168,6 +167,7 @@ impl Supervisor { self.launched.remove(&id); self.bcast.remove_child(&id); + // FIXME: remove from order? self.dead.insert(id); } BastionMessage::Faulted { id } => { @@ -192,10 +192,9 @@ impl Supervisor { } } - pub fn launch(mut self) -> JoinHandle { - //self.launch_children(); - - runtime::spawn(self.run()) + pub fn launch(self) { + // FIXME: handle errors + SYSTEM_SENDER.unbounded_send(self); } } diff --git a/src/system.rs b/src/system.rs new file mode 100644 index 00000000..f84c544f --- /dev/null +++ b/src/system.rs @@ -0,0 +1,85 @@ +use crate::broadcast::{BastionMessage, Broadcast}; +use crate::supervisor::Supervisor; +use futures::{pending, poll}; +use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use futures::prelude::*; +use fxhash::{FxHashMap, FxHashSet}; +use runtime::task::JoinHandle; +use std::task::Poll; +use uuid::Uuid; + +pub(super) struct System { + bcast: Broadcast, + recver: UnboundedReceiver, + supervisors: FxHashMap>, + dead: FxHashSet, +} + +impl System { + pub(super) fn start() -> UnboundedSender { + let id = Uuid::new_v4(); + let bcast = Broadcast::new(id); + let (sender, recver) = mpsc::unbounded(); + + let supervisors = FxHashMap::default(); + let dead = FxHashSet::default(); + + let system = System { + bcast, + recver, + supervisors, + dead, + }; + + runtime::spawn(system.run()); + + sender + } + + fn launch_supervisor(&mut self, mut supervisor: Supervisor) { + supervisor.launch_children(); + + runtime::spawn(supervisor.run()); + } + + async fn run(mut self) { + loop { + let mut ready = false; + + match poll!(&mut self.bcast.next()) { + Poll::Ready(Some(msg)) => { + ready = true; + + match msg { + // FIXME + BastionMessage::PoisonPill => unimplemented!(), + // FIXME + BastionMessage::Dead { .. } => unimplemented!(), + // FIXME + BastionMessage::Faulted { .. } => unimplemented!(), + // FIXME + BastionMessage::Message(_) => unimplemented!(), + } + } + // FIXME + Poll::Ready(None) => unimplemented!(), + Poll::Pending => (), + } + + match poll!(&mut self.recver.next()) { + Poll::Ready(Some(supervisor)) => { + ready = true; + + self.launch_supervisor(supervisor); + } + // FIXME + Poll::Ready(None) => unimplemented!(), + Poll::Pending => (), + } + + if !ready { + pending!(); + } + } + } +} \ No newline at end of file