Skip to content

Commit

Permalink
Added a skeleton for the Bastion and System structs, and cleaned up a…
Browse files Browse the repository at this point in the history
… bit
  • Loading branch information
r3v2d0g committed Oct 9, 2019
1 parent 825ab3f commit 291bde6
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 20 deletions.
18 changes: 18 additions & 0 deletions src/bastion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use crate::supervisor::Supervisor;
use crate::system::System;
use futures::channel::mpsc::UnboundedSender;
use lazy_static::lazy_static;

lazy_static! {
pub(super) static ref SYSTEM_SENDER: UnboundedSender<Supervisor> = System::start();
}

pub struct Bastion {
// TODO: ...
}

impl Bastion {
pub fn supervisor() -> Supervisor {
Supervisor::new()
}
}
2 changes: 1 addition & 1 deletion src/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::children::Message;
use futures::channel::mpsc::{self, TrySendError, UnboundedReceiver, UnboundedSender};
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures::prelude::*;
use fxhash::FxHashMap;
use std::pin::Pin;
Expand Down
9 changes: 4 additions & 5 deletions src/children.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::broadcast::{BastionMessage, Broadcast};
use crate::context::BastionContext;
use futures::channel::mpsc::{Receiver, Sender};
use futures::future::CatchUnwind;
use futures::pending;
use futures::poll;
Expand All @@ -11,7 +10,7 @@ use std::fmt::Debug;
use std::future::Future;
use std::panic::UnwindSafe;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::Poll;
use uuid::Uuid;

pub trait Shell: objekt::Clone + Send + Sync + Any + 'static {}
Expand Down Expand Up @@ -87,7 +86,7 @@ impl Children {
return self;
}
// FIXME
BastionMessage::Message(msg) => unimplemented!(),
BastionMessage::Message(_) => unimplemented!(),
}
}
// FIXME: "return self" or "send_parent(Faulted)"?
Expand All @@ -98,7 +97,7 @@ impl Children {
}

pub(super) fn launch(mut self) -> JoinHandle<Self> {
for child in 0..self.redundancy {
for _ in 0..self.redundancy {
let id = Uuid::new_v4();
let bcast = self.bcast.new_child(id);

Expand Down Expand Up @@ -145,7 +144,7 @@ impl Child {
unimplemented!()
}
// FIXME
BastionMessage::Message(msg) => unimplemented!(),
BastionMessage::Message(_) => unimplemented!(),
}
}
// FIXME: "return" or "send_parent(Faulted)"?
Expand Down
2 changes: 0 additions & 2 deletions src/context.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
use crate::broadcast::Broadcast;

pub struct BastionContext {}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
mod bastion;
mod broadcast;
mod children;
mod context;
mod supervisor;
mod system;
23 changes: 11 additions & 12 deletions src/supervisor.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::bastion::SYSTEM_SENDER;
use crate::broadcast::{BastionMessage, Broadcast};
use crate::children::{Children, Closure, Message};
use futures::pending;
use futures::poll;
use futures::{pending, poll};
use futures::prelude::*;
use fxhash::FxHashMap;
use fxhash::FxHashSet;
use fxhash::{FxHashMap, FxHashSet};
use runtime::task::JoinHandle;
use std::collections::BTreeMap;
use std::ops::RangeFrom;
Expand Down Expand Up @@ -76,7 +75,7 @@ impl Supervisor {
self
}

fn launch_children(&mut self) {
pub(super) fn launch_children(&mut self) {
let start = if let Some(order) = self.order.keys().next_back() {
*order + 1
} else {
Expand All @@ -103,7 +102,7 @@ impl Supervisor {
}
}

let mut removed = vec![];
let mut removed = Vec::new();
let mut children = Vec::new();
for (order, id) in self.order.range(range) {
// FIXME: Err if None?
Expand Down Expand Up @@ -141,15 +140,15 @@ impl Supervisor {

self.children.push(children);

self.kill_children(order..);
self.kill_children(order..).await;
self.launch_children();
}
}

Ok(())
}

async fn run(mut self) -> Self {
pub(super) async fn run(mut self) -> Self {
loop {
match poll!(&mut self.bcast.next()) {
Poll::Ready(Some(msg)) => {
Expand All @@ -168,6 +167,7 @@ impl Supervisor {
self.launched.remove(&id);
self.bcast.remove_child(&id);

// FIXME: remove from order?
self.dead.insert(id);
}
BastionMessage::Faulted { id } => {
Expand All @@ -192,10 +192,9 @@ impl Supervisor {
}
}

pub fn launch(mut self) -> JoinHandle<Self> {
//self.launch_children();

runtime::spawn(self.run())
pub fn launch(self) {
// FIXME: handle errors
SYSTEM_SENDER.unbounded_send(self);
}
}

Expand Down
85 changes: 85 additions & 0 deletions src/system.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use crate::broadcast::{BastionMessage, Broadcast};
use crate::supervisor::Supervisor;
use futures::{pending, poll};
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures::prelude::*;
use fxhash::{FxHashMap, FxHashSet};
use runtime::task::JoinHandle;
use std::task::Poll;
use uuid::Uuid;

pub(super) struct System {
bcast: Broadcast,
recver: UnboundedReceiver<Supervisor>,
supervisors: FxHashMap<Uuid, JoinHandle<Supervisor>>,
dead: FxHashSet<Uuid>,
}

impl System {
pub(super) fn start() -> UnboundedSender<Supervisor> {
let id = Uuid::new_v4();
let bcast = Broadcast::new(id);
let (sender, recver) = mpsc::unbounded();

let supervisors = FxHashMap::default();
let dead = FxHashSet::default();

let system = System {
bcast,
recver,
supervisors,
dead,
};

runtime::spawn(system.run());

sender
}

fn launch_supervisor(&mut self, mut supervisor: Supervisor) {
supervisor.launch_children();

runtime::spawn(supervisor.run());
}

async fn run(mut self) {
loop {
let mut ready = false;

match poll!(&mut self.bcast.next()) {
Poll::Ready(Some(msg)) => {
ready = true;

match msg {
// FIXME
BastionMessage::PoisonPill => unimplemented!(),
// FIXME
BastionMessage::Dead { .. } => unimplemented!(),
// FIXME
BastionMessage::Faulted { .. } => unimplemented!(),
// FIXME
BastionMessage::Message(_) => unimplemented!(),
}
}
// FIXME
Poll::Ready(None) => unimplemented!(),
Poll::Pending => (),
}

match poll!(&mut self.recver.next()) {
Poll::Ready(Some(supervisor)) => {
ready = true;

self.launch_supervisor(supervisor);
}
// FIXME
Poll::Ready(None) => unimplemented!(),
Poll::Pending => (),
}

if !ready {
pending!();
}
}
}
}

0 comments on commit 291bde6

Please sign in to comment.