From 8b6172549e28e4364cd83422af92aee16a8fdafc Mon Sep 17 00:00:00 2001 From: Andrey Kuznetsov Date: Tue, 17 Dec 2019 02:32:33 +0300 Subject: [PATCH] new messaging API which allows identifying message senders and send messages back --- bastion/examples/callbacks.rs | 2 +- bastion/examples/getting_started.rs | 6 +- bastion/examples/middleware.rs | 8 +- bastion/examples/parallel_computation.rs | 6 +- bastion/examples/send_recv.rs | 5 +- bastion/src/bastion.rs | 39 +- bastion/src/broadcast.rs | 163 +++++-- bastion/src/children.rs | 219 ++++++--- bastion/src/context.rs | 188 +++++++- bastion/src/envelope.rs | 223 +++++++++ bastion/src/lib.rs | 7 +- bastion/src/message.rs | 60 ++- bastion/src/path.rs | 584 +++++++++++++++++++++++ bastion/src/supervisor.rs | 169 +++++-- bastion/src/system.rs | 129 ++++- bastion/tests/message_signatures.rs | 79 +++ bastion/tests/prop_children_message.rs | 5 +- 17 files changed, 1655 insertions(+), 237 deletions(-) create mode 100644 bastion/src/envelope.rs create mode 100644 bastion/src/path.rs create mode 100644 bastion/tests/message_signatures.rs diff --git a/bastion/examples/callbacks.rs b/bastion/examples/callbacks.rs index 4012864c..08eb4078 100644 --- a/bastion/examples/callbacks.rs +++ b/bastion/examples/callbacks.rs @@ -147,7 +147,7 @@ fn sp_sp_ch(children: Children, children_ref: ChildrenRef) -> Children { // This will send a message to "sp.ch", making it fault and making // "sp" restart "sp.ch" and "sp.sp" (because its supervision strategy // is "one-for-one")... - children_ref.elems()[0].tell(()).ok(); + children_ref.elems()[0].tell_anonymously(()).ok(); }); children diff --git a/bastion/examples/getting_started.rs b/bastion/examples/getting_started.rs index 4205d437..7a5b1fa5 100644 --- a/bastion/examples/getting_started.rs +++ b/bastion/examples/getting_started.rs @@ -109,16 +109,16 @@ fn main() { // ...to then "tell" it messages... child - .tell("A message containing data.") + .tell_anonymously("A message containing data.") .expect("Couldn't send the message."); // ...or "ask" it messages... let answer: Answer = child - .ask("A message containing data.") + .ask_anonymously("A message containing data.") .expect("Couldn't send the message."); let _ = async { // ...until the child eventually answers back... - let _answer: Result = answer.await; + let _answer: Result = answer.await; }; // ...and then even stop or kill it... diff --git a/bastion/examples/middleware.rs b/bastion/examples/middleware.rs index 5886a44d..c2dc2132 100644 --- a/bastion/examples/middleware.rs +++ b/bastion/examples/middleware.rs @@ -43,7 +43,7 @@ fn main() { let response = escape(&compressed); // println!("Response: {}", response); stream.write(response.as_bytes()).unwrap(); - answer!(stream); + answer!(ctx, stream); }; _: _ => (); } @@ -59,7 +59,7 @@ fn main() { // // Server entrypoint Bastion::children(|children: Children| { - children.with_exec(move |_ctx: BastionContext| { + children.with_exec(move |ctx: BastionContext| { let workers = workers.clone(); async move { println!("Server is starting!"); @@ -73,8 +73,8 @@ fn main() { round_robin %= workers.elems().len(); // Distribute tcp streams - let _ = workers.elems()[round_robin] - .ask(stream.unwrap()) + let _ = ctx + .ask(&workers.elems()[round_robin].addr(), stream.unwrap()) .unwrap() .await; } diff --git a/bastion/examples/parallel_computation.rs b/bastion/examples/parallel_computation.rs index b7b81b0f..aae23251 100644 --- a/bastion/examples/parallel_computation.rs +++ b/bastion/examples/parallel_computation.rs @@ -30,7 +30,7 @@ fn main() { msg: u64 =!> { let data: u64 = msg.wrapping_mul(2); println!("Child doubled the value of {} and gave {}", msg, data); // true - let _ = answer!(data); + let _ = answer!(ctx, data); }; _: _ => (); } @@ -46,7 +46,7 @@ fn main() { // // Mapper that generates work. Bastion::children(|children: Children| { - children.with_exec(move |_ctx: BastionContext| { + children.with_exec(move |ctx: BastionContext| { let workers = workers.clone(); async move { println!("Mapper started!"); @@ -58,7 +58,7 @@ fn main() { for id_worker_pair in workers.elems().iter().enumerate() { let data = cycle(id_worker_pair.0 as u64, 5); - let computed: Answer = id_worker_pair.1.ask(data).unwrap(); + let computed: Answer = ctx.ask(&id_worker_pair.1.addr(), data).unwrap(); msg! { computed.await?, msg: u64 => { // Handle the answer... diff --git a/bastion/examples/send_recv.rs b/bastion/examples/send_recv.rs index 73cfbe57..4d1eb9d5 100644 --- a/bastion/examples/send_recv.rs +++ b/bastion/examples/send_recv.rs @@ -32,14 +32,13 @@ fn main() { println!("try_recv.is_some() == {}", try_recv.is_some()); // false let answer = ctx - .current() - .ask("Hello World!") + .ask(&ctx.current().addr(), "Hello World!") .expect("Couldn't send the message."); msg! { ctx.recv().await?, msg: &'static str =!> { println!(r#"msg == "Hello World!" => {}"#, msg == "Hello World!"); // true - let _ = answer!("Goodbye!"); + let _ = answer!(ctx, "Goodbye!"); }; // This won't happen because this example // only "asks" a `&'static str`... diff --git a/bastion/src/bastion.rs b/bastion/src/bastion.rs index 2b028726..c1f0397d 100644 --- a/bastion/src/bastion.rs +++ b/bastion/src/bastion.rs @@ -1,8 +1,10 @@ use crate::broadcast::{Broadcast, Parent}; use crate::children::{Children, ChildrenRef}; use crate::config::Config; -use crate::context::BastionContext; +use crate::context::{BastionContext, BastionId}; +use crate::envelope::Envelope; use crate::message::{BastionMessage, Message}; +use crate::path::BastionPathElement; use crate::supervisor::{Supervisor, SupervisorRef}; use crate::system::SYSTEM; use core::future::Future; @@ -115,13 +117,13 @@ use std::thread; /// let child = &elems[0]; /// /// // ...to then "tell" it messages... -/// child.tell("A message containing data.").expect("Couldn't send the message."); +/// child.tell_anonymously("A message containing data.").expect("Couldn't send the message."); /// /// // ...or "ask" it messages... -/// let answer: Answer = child.ask("A message containing data.").expect("Couldn't send the message."); +/// let answer: Answer = child.ask_anonymously("A message containing data.").expect("Couldn't send the message."); /// # async { /// // ...until the child eventually answers back... -/// let answer: Result = answer.await; +/// let answer: Result = answer.await; /// # }; /// /// // ...and then even stop or kill it... @@ -258,7 +260,7 @@ impl Bastion { { debug!("Bastion: Creating supervisor."); let parent = Parent::system(); - let bcast = Broadcast::new(parent); + let bcast = Broadcast::new(parent, BastionPathElement::Supervisor(BastionId::new())); debug!("Bastion: Initializing Supervisor({}).", bcast.id()); let supervisor = Supervisor::new(bcast); @@ -268,8 +270,9 @@ impl Bastion { debug!("Bastion: Deploying Supervisor({}).", supervisor.id()); let msg = BastionMessage::deploy_supervisor(supervisor); - trace!("Bastion: Sending message: {:?}", msg); - SYSTEM.sender().unbounded_send(msg).map_err(|_| ())?; + let envelope = Envelope::new(msg, SYSTEM.path().clone(), SYSTEM.sender().clone()); + trace!("Bastion: Sending envelope: {:?}", envelope); + SYSTEM.sender().unbounded_send(envelope).map_err(|_| ())?; Ok(supervisor_ref) } @@ -303,7 +306,7 @@ impl Bastion { /// children.with_exec(|ctx: BastionContext| { /// async move { /// // Send and receive messages... - /// let opt_msg: Option = ctx.try_recv().await; + /// let opt_msg: Option = ctx.try_recv().await; /// // ...and return `Ok(())` or `Err(())` when you are done... /// Ok(()) /// @@ -423,11 +426,12 @@ impl Bastion { pub fn broadcast(msg: M) -> Result<(), M> { debug!("Bastion: Broadcasting message: {:?}", msg); let msg = BastionMessage::broadcast(msg); - trace!("Bastion: Sending message: {:?}", msg); + let envelope = Envelope::from_dead_letters(msg); + trace!("Bastion: Sending envelope: {:?}", envelope); // FIXME: panics? SYSTEM .sender() - .unbounded_send(msg) + .unbounded_send(envelope) .map_err(|err| err.into_inner().into_msg().unwrap()) } @@ -456,9 +460,10 @@ impl Bastion { pub fn start() { debug!("Bastion: Starting."); let msg = BastionMessage::start(); - trace!("Bastion: Sending message: {:?}", msg); + let envelope = Envelope::from_dead_letters(msg); + trace!("Bastion: Sending envelope: {:?}", envelope); // FIXME: Err(Error) - SYSTEM.sender().unbounded_send(msg).ok(); + SYSTEM.sender().unbounded_send(envelope).ok(); } /// Sends a message to the system to tell it to stop @@ -486,9 +491,10 @@ impl Bastion { pub fn stop() { debug!("Bastion: Stopping."); let msg = BastionMessage::stop(); - trace!("Bastion: Sending message: {:?}", msg); + let envelope = Envelope::from_dead_letters(msg); + trace!("Bastion: Sending envelope: {:?}", envelope); // FIXME: Err(Error) - SYSTEM.sender().unbounded_send(msg).ok(); + SYSTEM.sender().unbounded_send(envelope).ok(); } /// Sends a message to the system to tell it to kill every @@ -515,9 +521,10 @@ impl Bastion { pub fn kill() { debug!("Bastion: Killing."); let msg = BastionMessage::kill(); - trace!("Bastion: Sending message: {:?}", msg); + let envelope = Envelope::from_dead_letters(msg); + trace!("Bastion: Sending envelope: {:?}", envelope); // FIXME: Err(Error) - SYSTEM.sender().unbounded_send(msg).ok(); + SYSTEM.sender().unbounded_send(envelope).ok(); // FIXME: panics let mut system = SYSTEM.handle().lock().wait().unwrap(); diff --git a/bastion/src/broadcast.rs b/bastion/src/broadcast.rs index 132d21dc..6aef9d4f 100644 --- a/bastion/src/broadcast.rs +++ b/bastion/src/broadcast.rs @@ -1,22 +1,25 @@ use crate::children::ChildrenRef; use crate::context::BastionId; +use crate::envelope::Envelope; use crate::message::BastionMessage; +use crate::path::{BastionPath, BastionPathElement}; use crate::supervisor::SupervisorRef; use crate::system::SYSTEM; use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; use futures::prelude::*; use fxhash::FxHashMap; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; -pub(crate) type Sender = UnboundedSender; -pub(crate) type Receiver = UnboundedReceiver; +pub(crate) type Sender = UnboundedSender; +pub(crate) type Receiver = UnboundedReceiver; #[derive(Debug)] pub(crate) struct Broadcast { - id: BastionId, sender: Sender, recver: Receiver, + path: Arc, // Arc is needed because we put path to Envelope parent: Parent, children: FxHashMap, } @@ -29,42 +32,85 @@ pub(crate) enum Parent { Children(ChildrenRef), } +impl Parent { + pub(super) fn is_none(&self) -> bool { + match self { + Parent::None => true, + _ => false, + } + } + + pub(super) fn is_system(&self) -> bool { + match self { + Parent::System => true, + _ => false, + } + } +} + impl Broadcast { - pub(crate) fn new(parent: Parent) -> Self { - let id = BastionId::new(); + pub(crate) fn new(parent: Parent, element: BastionPathElement) -> Self { let (sender, recver) = mpsc::unbounded(); let children = FxHashMap::default(); + let parent_path: BastionPath = match &parent { + Parent::None | Parent::System => BastionPath::root(), + Parent::Supervisor(sv_ref) => BastionPath::clone(sv_ref.path()), + Parent::Children(ch_ref) => BastionPath::clone(ch_ref.path()), + }; + + // FIXME: unwrap + let path = parent_path + .append(element) + .expect("Can't append path in Broadcast::new"); + let path = Arc::new(path); + Broadcast { - id, parent, sender, recver, + path, children, } } - pub(crate) fn with_id(parent: Parent, id: BastionId) -> Self { - let mut bcast = Broadcast::new(parent); - bcast.id = id; + pub(crate) fn new_root(parent: Parent) -> Self { + // FIXME + assert!(parent.is_none() || parent.is_system()); + + let (sender, recver) = mpsc::unbounded(); + let children = FxHashMap::default(); + let path = BastionPath::root(); + let path = Arc::new(path); - bcast + Broadcast { + parent, + sender, + recver, + path, + children, + } } pub(crate) fn id(&self) -> &BastionId { - &self.id + self.path.id() } pub(crate) fn sender(&self) -> &Sender { &self.sender } + pub(crate) fn path(&self) -> &Arc { + &self.path + } + pub(crate) fn parent(&self) -> &Parent { &self.parent } pub(crate) fn register(&mut self, child: &Self) { - self.children.insert(child.id.clone(), child.sender.clone()); + self.children + .insert(child.id().clone(), child.sender.clone()); } pub(crate) fn unregister(&mut self, id: &BastionId) { @@ -77,28 +123,32 @@ impl Broadcast { pub(crate) fn stop_child(&mut self, id: &BastionId) { let msg = BastionMessage::stop(); - self.send_child(id, msg); + let env = Envelope::new(msg, self.path.clone(), self.sender.clone()); + self.send_child(id, env); self.unregister(id); } pub(crate) fn stop_children(&mut self) { let msg = BastionMessage::stop(); - self.send_children(msg); + let env = Envelope::new(msg, self.path.clone(), self.sender.clone()); + self.send_children(env); self.clear_children(); } pub(crate) fn kill_child(&mut self, id: &BastionId) { let msg = BastionMessage::kill(); - self.send_child(id, msg); + let env = Envelope::new(msg, self.path.clone(), self.sender.clone()); + self.send_child(id, env); self.unregister(id); } pub(crate) fn kill_children(&mut self) { let msg = BastionMessage::kill(); - self.send_children(msg); + let env = Envelope::new(msg, self.path.clone(), self.sender.clone()); + self.send_children(env); self.clear_children(); } @@ -106,44 +156,46 @@ impl Broadcast { pub(crate) fn stopped(&mut self) { self.stop_children(); - let msg = BastionMessage::stopped(self.id.clone()); + let msg = BastionMessage::stopped(self.id().clone()); + let env = Envelope::new(msg, self.path.clone(), self.sender.clone()); // FIXME: Err(msg) - self.send_parent(msg).ok(); + self.send_parent(env).ok(); } pub(crate) fn faulted(&mut self) { self.kill_children(); - let msg = BastionMessage::faulted(self.id.clone()); + let msg = BastionMessage::faulted(self.id().clone()); + let env = Envelope::new(msg, self.path.clone(), self.sender.clone()); // FIXME: Err(msg) - self.send_parent(msg).ok(); + self.send_parent(env).ok(); } - pub(crate) fn send_parent(&self, msg: BastionMessage) -> Result<(), BastionMessage> { - self.parent.send(msg) + pub(crate) fn send_parent(&self, envelope: Envelope) -> Result<(), Envelope> { + self.parent.send(envelope) } - pub(crate) fn send_child(&self, id: &BastionId, msg: BastionMessage) { + pub(crate) fn send_child(&self, id: &BastionId, envelope: Envelope) { // FIXME: Err if None? if let Some(child) = self.children.get(id) { // FIXME: handle errors - child.unbounded_send(msg).ok(); + child.unbounded_send(envelope).ok(); } } - pub(crate) fn send_children(&self, msg: BastionMessage) { + pub(crate) fn send_children(&self, env: Envelope) { for child in self.children.values() { // FIXME: Err(Error) if None - if let Some(msg) = msg.try_clone() { + if let Some(env) = env.try_clone() { // FIXME: handle errors - child.unbounded_send(msg).ok(); + child.unbounded_send(env).ok(); } } } - pub(crate) fn send_self(&self, msg: BastionMessage) { + pub(crate) fn send_self(&self, env: Envelope) { // FIXME: handle errors - self.sender.unbounded_send(msg).ok(); + self.sender.unbounded_send(env).ok(); } } @@ -180,22 +232,22 @@ impl Parent { } } - fn send(&self, msg: BastionMessage) -> Result<(), BastionMessage> { + fn send(&self, env: Envelope) -> Result<(), Envelope> { match self { // FIXME Parent::None => unimplemented!(), Parent::System => SYSTEM .sender() - .unbounded_send(msg) + .unbounded_send(env) .map_err(|err| err.into_inner()), - Parent::Supervisor(supervisor) => supervisor.send(msg), - Parent::Children(children) => children.send(msg), + Parent::Supervisor(supervisor) => supervisor.send(env), + Parent::Children(children) => children.send(env), } } } impl Stream for Broadcast { - type Item = BastionMessage; + type Item = Envelope; fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { Pin::new(&mut self.get_mut().recver).poll_next(ctx) @@ -205,50 +257,77 @@ impl Stream for Broadcast { #[cfg(test)] mod tests { use super::{BastionMessage, Broadcast, Parent}; + use crate::context::{BastionId, NIL_ID}; + use crate::envelope::Envelope; + use crate::path::{BastionPath, BastionPathElement}; + use futures::channel::mpsc; use futures::executor; use futures::poll; use futures::prelude::*; + use std::sync::Arc; use std::task::Poll; #[test] fn send_children() { - let mut parent = Broadcast::new(Parent::none()); + let mut parent = Broadcast::new_root(Parent::System); let mut children = vec![]; for _ in 0..4 { - let child = Broadcast::new(Parent::none()); + let child = Broadcast::new( + Parent::System, + BastionPathElement::Supervisor(BastionId::new()), + ); parent.register(&child); - children.push(child); } let msg = BastionMessage::start(); - parent.send_children(msg.try_clone().unwrap()); + // need manual construction because SYSTEM is not running in this test + let (sender, _) = mpsc::unbounded(); + let env = Envelope::new( + msg, + Arc::new( + BastionPath::root() + .append(BastionPathElement::Supervisor(NIL_ID)) + .unwrap() + .append(BastionPathElement::Children(NIL_ID)) + .unwrap(), + ), + sender, + ); + + parent.send_children(env.try_clone().unwrap()); executor::block_on(async { for child in &mut children { match poll!(child.next()) { - Poll::Ready(Some(BastionMessage::Start)) => (), + Poll::Ready(Some(Envelope { + msg: BastionMessage::Start, + .. + })) => (), _ => panic!(), } } }); parent.unregister(children[0].id()); - parent.send_children(msg.try_clone().unwrap()); + parent.send_children(env.try_clone().unwrap()); executor::block_on(async { assert!(poll!(children[0].next()).is_pending()); for child in &mut children[1..] { match poll!(child.next()) { - Poll::Ready(Some(BastionMessage::Start)) => (), + Poll::Ready(Some(Envelope { + msg: BastionMessage::Start, + .. + })) => (), _ => panic!(), } } }); parent.clear_children(); - parent.send_children(msg); + parent.send_children(env); executor::block_on(async { for child in &mut children[1..] { assert!(poll!(child.next()).is_pending()); diff --git a/bastion/src/children.rs b/bastion/src/children.rs index a96bb0fa..224c361c 100644 --- a/bastion/src/children.rs +++ b/bastion/src/children.rs @@ -4,7 +4,9 @@ use crate::broadcast::{Broadcast, Parent, Sender}; use crate::callbacks::Callbacks; use crate::context::{BastionContext, BastionId, ContextState}; +use crate::envelope::{Envelope, RefAddr}; use crate::message::{Answer, BastionMessage, Message}; +use crate::path::{BastionPath, BastionPathElement}; use bastion_executor::pool; use futures::pending; use futures::poll; @@ -18,6 +20,7 @@ use std::fmt::{self, Debug, Formatter}; use std::future::Future; use std::iter::FromIterator; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; struct Init(Box Exec + Send + Sync>); @@ -49,7 +52,7 @@ struct Exec(Pin> + Send>>); /// children.with_exec(|ctx: BastionContext| { /// async move { /// // Send and receive messages... -/// let opt_msg: Option = ctx.try_recv().await; +/// let opt_msg: Option = ctx.try_recv().await; /// // ...and return `Ok(())` or `Err(())` when you are done... /// Ok(()) /// @@ -83,7 +86,7 @@ pub struct Children { // Messages that were received before the group was // started. Those will be "replayed" once a start message // is received. - pre_start_msgs: Vec, + pre_start_msgs: Vec, started: bool, } @@ -93,6 +96,7 @@ pub struct Children { pub struct ChildrenRef { id: BastionId, sender: Sender, + path: Arc, children: Vec, } @@ -109,7 +113,7 @@ pub(crate) struct Child { // Messages that were received before the child was // started. Those will be "replayed" once a start message // is received. - pre_start_msgs: Vec, + pre_start_msgs: Vec, started: bool, } @@ -119,6 +123,7 @@ pub(crate) struct Child { pub struct ChildRef { id: BastionId, sender: Sender, + path: Arc, } impl Init { @@ -233,16 +238,17 @@ impl Children { // TODO: clone or ref? let id = self.bcast.id().clone(); let sender = self.bcast.sender().clone(); + let path = self.bcast.path().clone(); let mut children = Vec::with_capacity(self.launched.len()); for (id, (sender, _)) in &self.launched { trace!("Children({}): Creating new ChildRef({}).", self.id(), id); // TODO: clone or ref? - let child = ChildRef::new(id.clone(), sender.clone()); + let child = ChildRef::new(id.clone(), sender.clone(), path.clone()); children.push(child); } - ChildrenRef::new(id, sender, children) + ChildrenRef::new(id, sender, path, children) } /// Sets the closure taking a [`BastionContext`] and returning a @@ -273,7 +279,7 @@ impl Children { /// children.with_exec(|ctx| { /// async move { /// // Send and receive messages... - /// let opt_msg: Option = ctx.try_recv().await; + /// let opt_msg: Option = ctx.try_recv().await; /// // ...and return `Ok(())` or `Err(())` when you are done... /// Ok(()) /// @@ -441,36 +447,60 @@ impl Children { self.bcast.faulted(); } - async fn handle(&mut self, msg: BastionMessage) -> Result<(), ()> { - match msg { - BastionMessage::Start => unreachable!(), - BastionMessage::Stop => { + async fn handle(&mut self, env: Envelope) -> Result<(), ()> { + match env { + Envelope { + msg: BastionMessage::Start, + .. + } => unreachable!(), + Envelope { + msg: BastionMessage::Stop, + .. + } => { self.stop().await; self.stopped(); return Err(()); } - BastionMessage::Kill => { + Envelope { + msg: BastionMessage::Kill, + .. + } => { self.kill().await; self.stopped(); return Err(()); } // FIXME - BastionMessage::Deploy(_) => unimplemented!(), + Envelope { + msg: BastionMessage::Deploy(_), + .. + } => unimplemented!(), // FIXME - BastionMessage::Prune { .. } => unimplemented!(), + Envelope { + msg: BastionMessage::Prune { .. }, + .. + } => unimplemented!(), // FIXME - BastionMessage::SuperviseWith(_) => unimplemented!(), - BastionMessage::Message(ref message) => { + Envelope { + msg: BastionMessage::SuperviseWith(_), + .. + } => unimplemented!(), + Envelope { + msg: BastionMessage::Message(ref message), + .. + } => { debug!( "Children({}): Broadcasting a message: {:?}", self.id(), message ); - self.bcast.send_children(msg); + self.bcast.send_children(env); } - BastionMessage::Stopped { id } => { + Envelope { + msg: BastionMessage::Stopped { id }, + .. + } => { // FIXME: Err if false? if self.launched.contains_key(&id) { debug!("Children({}): Child({}) stopped.", self.id(), id); @@ -480,7 +510,10 @@ impl Children { return Err(()); } } - BastionMessage::Faulted { id } => { + Envelope { + msg: BastionMessage::Faulted { id }, + .. + } => { // FIXME: Err if false? if self.launched.contains_key(&id) { warn!("Children({}): Child({}) faulted.", self.id(), id); @@ -504,7 +537,10 @@ impl Children { match poll!(&mut self.bcast.next()) { // TODO: Err if started == true? - Poll::Ready(Some(BastionMessage::Start)) => { + Poll::Ready(Some(Envelope { + msg: BastionMessage::Start, + .. + })) => { trace!( "Children({}): Received a new message (started=false): {:?}", self.id(), @@ -514,7 +550,9 @@ impl Children { self.started = true; let msg = BastionMessage::start(); - self.bcast.send_children(msg); + let env = + Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone()); + self.bcast.send_children(env); let msgs = self.pre_start_msgs.drain(..).collect::>(); self.pre_start_msgs.shrink_to_fit(); @@ -561,12 +599,13 @@ impl Children { debug!("Children({}): Launching elements.", self.id()); for _ in 0..self.redundancy { let parent = Parent::children(self.as_ref()); - let bcast = Broadcast::new(parent); + let bcast = Broadcast::new(parent, BastionPathElement::Child(BastionId::new())); // TODO: clone or ref? let id = bcast.id().clone(); let sender = bcast.sender().clone(); - let child_ref = ChildRef::new(id.clone(), sender.clone()); + let path = bcast.path().clone(); + let child_ref = ChildRef::new(id.clone(), sender.clone(), path); let children = self.as_ref(); let supervisor = self.bcast.parent().clone().into_supervisor(); @@ -601,10 +640,11 @@ impl Children { } impl ChildrenRef { - fn new(id: BastionId, sender: Sender, children: Vec) -> Self { + fn new(id: BastionId, sender: Sender, path: Arc, children: Vec) -> Self { ChildrenRef { id, sender, + path, children, } } @@ -723,8 +763,9 @@ impl ChildrenRef { msg ); let msg = BastionMessage::broadcast(msg); + let env = Envelope::from_dead_letters(msg); // FIXME: panics? - self.send(msg).map_err(|err| err.into_msg().unwrap()) + self.send(env).map_err(|err| err.into_msg().unwrap()) } /// Sends a message to the children group this `ChildrenRef` @@ -753,7 +794,8 @@ impl ChildrenRef { pub fn stop(&self) -> Result<(), ()> { debug!("ChildrenRef({}): Stopping.", self.id()); let msg = BastionMessage::stop(); - self.send(msg).map_err(|_| ()) + let env = Envelope::from_dead_letters(msg); + self.send(env).map_err(|_| ()) } /// Sends a message to the children group this `ChildrenRef` @@ -782,15 +824,24 @@ impl ChildrenRef { pub fn kill(&self) -> Result<(), ()> { debug!("ChildrenRef({}): Killing.", self.id()); let msg = BastionMessage::kill(); - self.send(msg).map_err(|_| ()) + let env = Envelope::from_dead_letters(msg); + self.send(env).map_err(|_| ()) } - pub(crate) fn send(&self, msg: BastionMessage) -> Result<(), BastionMessage> { - trace!("ChildrenRef({}): Sending message: {:?}", self.id(), msg); + pub(crate) fn send(&self, env: Envelope) -> Result<(), Envelope> { + trace!("ChildrenRef({}): Sending message: {:?}", self.id(), env); self.sender - .unbounded_send(msg) + .unbounded_send(env) .map_err(|err| err.into_inner()) } + + pub(crate) fn path(&self) -> &Arc { + &self.path + } + + pub(crate) fn sender(&self) -> &Sender { + &self.sender + } } impl Child { @@ -813,6 +864,8 @@ impl Child { let id = self.bcast.id().clone(); // FIXME: panics? let parent = self.bcast.parent().clone().into_children().unwrap(); + let path = self.bcast.path().clone(); + let sender = self.bcast.sender().clone(); // FIXME: with_pid ProcStack::default().with_after_panic(move || { @@ -821,8 +874,9 @@ impl Child { warn!("Child({}): Panicked.", id); let msg = BastionMessage::faulted(id); + let env = Envelope::new(msg, path.clone(), sender.clone()); // TODO: handle errors - parent.send(msg).ok(); + parent.send(env).ok(); }) } @@ -840,34 +894,61 @@ impl Child { self.bcast.faulted(); } - async fn handle(&mut self, msg: BastionMessage) -> Result<(), ()> { - match msg { - BastionMessage::Start => unreachable!(), - BastionMessage::Stop => { + async fn handle(&mut self, env: Envelope) -> Result<(), ()> { + match env { + Envelope { + msg: BastionMessage::Start, + .. + } => unreachable!(), + Envelope { + msg: BastionMessage::Stop, + .. + } => { self.stopped(); return Err(()); } - BastionMessage::Kill => { + Envelope { + msg: BastionMessage::Kill, + .. + } => { self.stopped(); return Err(()); } // FIXME - BastionMessage::Deploy(_) => unimplemented!(), + Envelope { + msg: BastionMessage::Deploy(_), + .. + } => unimplemented!(), // FIXME - BastionMessage::Prune { .. } => unimplemented!(), + Envelope { + msg: BastionMessage::Prune { .. }, + .. + } => unimplemented!(), // FIXME - BastionMessage::SuperviseWith(_) => unimplemented!(), - BastionMessage::Message(msg) => { + Envelope { + msg: BastionMessage::SuperviseWith(_), + .. + } => unimplemented!(), + Envelope { + msg: BastionMessage::Message(msg), + sign, + } => { debug!("Child({}): Received a message: {:?}", self.id(), msg); let mut state = self.state.clone().lock_async().await.map_err(|_| ())?; - state.push_msg(msg); + state.push_msg(msg, sign); } // FIXME - BastionMessage::Stopped { .. } => unimplemented!(), + Envelope { + msg: BastionMessage::Stopped { .. }, + .. + } => unimplemented!(), // FIXME - BastionMessage::Faulted { .. } => unimplemented!(), + Envelope { + msg: BastionMessage::Faulted { .. }, + .. + } => unimplemented!(), } Ok(()) @@ -878,7 +959,10 @@ impl Child { loop { match poll!(&mut self.bcast.next()) { // TODO: Err if started == true? - Poll::Ready(Some(BastionMessage::Start)) => { + Poll::Ready(Some(Envelope { + msg: BastionMessage::Start, + .. + })) => { trace!( "Child({}): Received a new message (started=false): {:?}", self.id(), @@ -964,8 +1048,8 @@ impl Child { } impl ChildRef { - fn new(id: BastionId, sender: Sender) -> ChildRef { - ChildRef { id, sender } + fn new(id: BastionId, sender: Sender, path: Arc) -> ChildRef { + ChildRef { id, sender, path } } /// Returns the identifier of the children group element this @@ -1002,6 +1086,8 @@ impl ChildRef { } /// Sends a message to the child this `ChildRef` is referencing. + /// This message is intended to be used outside of Bastion context when + /// there is no way for receiver to identify message sender /// /// This method returns `()` if it succeeded, or `Err(msg)` /// otherwise. @@ -1043,22 +1129,25 @@ impl ChildRef { /// /// # let child_ref = &children_ref.elems()[0]; /// // Later, the message is "told" to the child... - /// child_ref.tell(TELL_MSG).expect("Couldn't send the message."); + /// child_ref.tell_anonymously(TELL_MSG).expect("Couldn't send the message."); /// # /// # Bastion::start(); /// # Bastion::stop(); /// # Bastion::block_until_stopped(); /// # } /// ``` - pub fn tell(&self, msg: M) -> Result<(), M> { + pub fn tell_anonymously(&self, msg: M) -> Result<(), M> { debug!("ChildRef({}): Telling message: {:?}", self.id(), msg); let msg = BastionMessage::tell(msg); + let env = Envelope::from_dead_letters(msg); // FIXME: panics? - self.send(msg).map_err(|msg| msg.into_msg().unwrap()) + self.send(env).map_err(|env| env.into_msg().unwrap()) } /// Sends a message to the child this `ChildRef` is referencing, /// allowing it to answer. + /// This message is intended to be used outside of Bastion context when + /// there is no way for receiver to identify message sender /// /// This method returns [`Answer`](../message/struct.Answer.html) if it succeeded, or `Err(msg)` /// otherwise. @@ -1091,7 +1180,7 @@ impl ChildRef { /// // Handle the message... /// /// // ...and eventually answer to it... - /// answer!(ANSWER_MSG); + /// answer!(ctx, ANSWER_MSG); /// }; /// // This won't happen because this example /// // only "asks" a `&'static str`... @@ -1108,7 +1197,7 @@ impl ChildRef { /// # let child_ref = children_ref.elems()[0].clone(); /// # async move { /// // Later, the message is "asked" to the child... - /// let answer: Answer = child_ref.ask(ASK_MSG).expect("Couldn't send the message."); + /// let answer: Answer = child_ref.ask_anonymously(ASK_MSG).expect("Couldn't send the message."); /// /// // ...and the child's answer is received... /// msg! { answer.await.expect("Couldn't receive the answer."), @@ -1133,11 +1222,12 @@ impl ChildRef { /// ``` /// /// [`Answer`]: message/struct.Answer.html - pub fn ask(&self, msg: M) -> Result { + pub fn ask_anonymously(&self, msg: M) -> Result { debug!("ChildRef({}): Asking message: {:?}", self.id(), msg); let (msg, answer) = BastionMessage::ask(msg); + let env = Envelope::from_dead_letters(msg); // FIXME: panics? - self.send(msg).map_err(|msg| msg.into_msg().unwrap())?; + self.send(env).map_err(|env| env.into_msg().unwrap())?; Ok(answer) } @@ -1168,7 +1258,8 @@ impl ChildRef { pub fn stop(&self) -> Result<(), ()> { debug!("ChildRef({}): Stopping.", self.id); let msg = BastionMessage::stop(); - self.send(msg).map_err(|_| ()) + let env = Envelope::from_dead_letters(msg); + self.send(env).map_err(|_| ()) } /// Sends a message to the child this `ChildRef` is referencing @@ -1197,15 +1288,29 @@ impl ChildRef { pub fn kill(&self) -> Result<(), ()> { debug!("ChildRef({}): Killing.", self.id()); let msg = BastionMessage::kill(); - self.send(msg).map_err(|_| ()) + let env = Envelope::from_dead_letters(msg); + self.send(env).map_err(|_| ()) } - pub(crate) fn send(&self, msg: BastionMessage) -> Result<(), BastionMessage> { - trace!("ChildRef({}): Sending message: {:?}", self.id(), msg); + /// Returns [`RefAddr`] for the child + pub fn addr(&self) -> RefAddr { + RefAddr::new(self.path.clone(), self.sender.clone()) + } + + pub(crate) fn send(&self, env: Envelope) -> Result<(), Envelope> { + trace!("ChildRef({}): Sending message: {:?}", self.id(), env); self.sender - .unbounded_send(msg) + .unbounded_send(env) .map_err(|err| err.into_inner()) } + + pub(crate) fn sender(&self) -> &Sender { + &self.sender + } + + pub(crate) fn path(&self) -> &Arc { + &self.path + } } impl Future for Exec { diff --git a/bastion/src/context.rs b/bastion/src/context.rs index 7d87fc46..52d059e8 100644 --- a/bastion/src/context.rs +++ b/bastion/src/context.rs @@ -3,7 +3,8 @@ //! messages, parent and supervisor. use crate::children::{ChildRef, ChildrenRef}; -use crate::message::Msg; +use crate::envelope::{Envelope, RefAddr, SignedMessage}; +use crate::message::{Answer, BastionMessage, Message, Msg}; use crate::supervisor::SupervisorRef; use futures::pending; use qutex::{Guard, Qutex}; @@ -11,7 +12,8 @@ use std::collections::VecDeque; use std::fmt::{self, Display, Formatter}; use uuid::Uuid; -pub(crate) const NIL_ID: BastionId = BastionId(Uuid::nil()); +/// Identifier for a root supervisor and dead-letters children. +pub const NIL_ID: BastionId = BastionId(Uuid::nil()); #[derive(Hash, Eq, PartialEq, Debug, Clone)] /// An identifier used by supervisors, children groups and @@ -80,9 +82,9 @@ pub struct BastionId(Uuid); /// // (which users can't get a reference to). /// /// // Try to receive a message... -/// let opt_msg: Option = ctx.try_recv().await; +/// let opt_msg: Option = ctx.try_recv().await; /// // Wait for a message to be received... -/// let msg: Msg = ctx.recv().await?; +/// let msg: SignedMessage = ctx.recv().await?; /// /// Ok(()) /// } @@ -104,7 +106,7 @@ pub struct BastionContext { #[derive(Debug)] pub(crate) struct ContextState { - msgs: VecDeque, + msgs: VecDeque, } impl BastionId { @@ -264,8 +266,8 @@ impl BastionContext { /// If you need to wait (always asynchronously) until at /// least one message can be retrieved, use [`recv`] instead. /// - /// This method returns [`Msg`] if a message was available, or - /// `None otherwise. + /// This method returns [`SignedMessage`] if a message was available, or + /// `None` otherwise. /// /// # Example /// @@ -278,7 +280,7 @@ impl BastionContext { /// Bastion::children(|children| { /// children.with_exec(|ctx: BastionContext| { /// async move { - /// let opt_msg: Option = ctx.try_recv().await; + /// let opt_msg: Option = ctx.try_recv().await; /// // If a message was received by the element, `opt_msg` will /// // be `Some(Msg)`, otherwise it will be `None`. /// @@ -294,8 +296,8 @@ impl BastionContext { /// ``` /// /// [`recv`]: #method.recv - /// [`Msg`]: children/struct.Msg.html - pub async fn try_recv(&self) -> Option { + /// [`SignedMessage`]: ../prelude/struct.SignedMessage.html + pub async fn try_recv(&self) -> Option { debug!("BastionContext({}): Trying to receive message.", self.id); // TODO: Err(Error) let mut state = self.state.clone().lock_async().await.ok()?; @@ -316,7 +318,7 @@ impl BastionContext { /// If you don't need to wait until at least one message /// can be retrieved, use [`try_recv`] instead. /// - /// This method returns [`Msg`] if it succeeded, or `Err(())` + /// This method returns [`SignedMessage`] if it succeeded, or `Err(())` /// otherwise. /// /// # Example @@ -331,7 +333,7 @@ impl BastionContext { /// children.with_exec(|ctx: BastionContext| { /// async move { /// // This will block until a message has been received... - /// let msg: Msg = ctx.recv().await?; + /// let msg: SignedMessage = ctx.recv().await?; /// /// Ok(()) /// } @@ -345,8 +347,8 @@ impl BastionContext { /// ``` /// /// [`try_recv`]: #method.try_recv - /// [`Msg`]: children/struct.Msg.html - pub async fn recv(&self) -> Result { + /// [`SignedMessage`]: ../prelude/struct.SignedMessage.html + pub async fn recv(&self) -> Result { debug!("BastionContext({}): Waiting to receive message.", self.id); loop { // TODO: Err(Error) @@ -362,6 +364,160 @@ impl BastionContext { pending!(); } } + + /// Returns [`RefAddr`] of the current `BastionContext` + /// + /// [`RefAddr`]: /prelude/struct.Answer.html + pub fn signature(&self) -> RefAddr { + RefAddr::new( + self.current().path().clone(), + self.current().sender().clone(), + ) + } + + /// Sends a message to the specified [`RefAddr`] + /// + /// # Arguments + /// + /// * `to` – the [`RefAddr`] to send the message to + /// * `msg` – The actual message to send + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # fn main() { + /// # Bastion::init(); + /// # + /// Bastion::children(|children| { + /// children.with_exec(|ctx: BastionContext| { + /// async move { + /// // Wait for a message to be received... + /// let smsg: SignedMessage = ctx.recv().await?; + /// // Obtain address of this message sender... + /// let sender_addr = smsg.signature(); + /// // And send something back + /// ctx.tell(&sender_addr, "Ack").expect("Unable to acknowledge"); + /// Ok(()) + /// } + /// }) + /// }).expect("Couldn't create the children group."); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + /// + /// [`RefAddr`]: ../prelude/struct.RefAddr.html + pub fn tell(&self, to: &RefAddr, msg: M) -> Result<(), M> { + debug!( + "{:?}: Telling message: {:?} to: {:?}", + self.current().path(), + msg, + to.path() + ); + let msg = BastionMessage::tell(msg); + let env = Envelope::new_with_sign(msg, self.signature()); + // FIXME: panics? + to.sender() + .unbounded_send(env) + .map_err(|err| err.into_inner().into_msg().unwrap()) + } + + /// Sends a message from behalf of current context to the addr, + /// allowing to addr owner answer. + /// + /// This method returns [`Answer`] if it succeeded, or `Err(msg)` + /// otherwise. + /// + /// # Argument + /// + /// * `msg` - The message to send. + /// + /// # Example + /// + /// ``` + /// # use bastion::prelude::*; + /// # + /// # fn main() { + /// # Bastion::init(); + /// // The message that will be "asked"... + /// const ASK_MSG: &'static str = "A message containing data (ask)."; + /// // The message the will be "answered"... + /// const ANSWER_MSG: &'static str = "A message containing data (answer)."; + /// + /// # let children_ref = + /// // Create a new child... + /// Bastion::children(|children| { + /// children.with_exec(|ctx: BastionContext| { + /// async move { + /// // ...which will receive the message asked... + /// msg! { ctx.recv().await?, + /// msg: &'static str =!> { + /// assert_eq!(msg, ASK_MSG); + /// // Handle the message... + /// + /// // ...and eventually answer to it... + /// answer!(ctx, ANSWER_MSG); + /// }; + /// // This won't happen because this example + /// // only "asks" a `&'static str`... + /// _: _ => (); + /// } + /// + /// Ok(()) + /// } + /// }) + /// }).expect("Couldn't create the children group."); + /// + /// # Bastion::children(|children| { + /// # children.with_exec(move |ctx: BastionContext| { + /// # let child_ref = children_ref.elems()[0].clone(); + /// # async move { + /// // Later, the message is "asked" to the child... + /// let answer: Answer = ctx.ask(&child_ref.addr(), ASK_MSG).expect("Couldn't send the message."); + /// + /// // ...and the child's answer is received... + /// msg! { answer.await.expect("Couldn't receive the answer."), + /// msg: &'static str => { + /// assert_eq!(msg, ANSWER_MSG); + /// // Handle the answer... + /// }; + /// // This won't happen because this example + /// // only answers a `&'static str`... + /// _: _ => (); + /// } + /// # + /// # Ok(()) + /// # } + /// # }) + /// # }).unwrap(); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + /// + /// [`Answer`]: /message/struct.Answer.html + pub fn ask(&self, to: &RefAddr, msg: M) -> Result { + debug!( + "{:?}: Asking message: {:?} to: {:?}", + self.current().path(), + msg, + to + ); + let (msg, answer) = BastionMessage::ask(msg); + let env = Envelope::new_with_sign(msg, self.signature()); + // FIXME: panics? + to.sender() + .unbounded_send(env) + .map_err(|err| err.into_inner().into_msg().unwrap())?; + + Ok(answer) + } } impl ContextState { @@ -371,8 +527,8 @@ impl ContextState { ContextState { msgs } } - pub(crate) fn push_msg(&mut self, msg: Msg) { - self.msgs.push_back(msg) + pub(crate) fn push_msg(&mut self, msg: Msg, sign: RefAddr) { + self.msgs.push_back(SignedMessage::new(msg, sign)) } } diff --git a/bastion/src/envelope.rs b/bastion/src/envelope.rs new file mode 100644 index 00000000..bed7ba32 --- /dev/null +++ b/bastion/src/envelope.rs @@ -0,0 +1,223 @@ +use crate::broadcast::Sender; +use crate::message::{BastionMessage, Message, Msg}; +use crate::path::BastionPath; +use crate::system::SYSTEM; +use std::sync::Arc; + +#[derive(Debug)] +pub(crate) struct Envelope { + pub(crate) msg: BastionMessage, + pub(crate) sign: RefAddr, +} + +#[derive(Debug)] +/// A struct containing a message and its sender signature +pub struct SignedMessage { + pub(crate) msg: Msg, + pub(crate) sign: RefAddr, +} + +impl SignedMessage { + pub(crate) fn new(msg: Msg, sign: RefAddr) -> Self { + SignedMessage { msg, sign } + } + + #[doc(hidden)] + pub fn extract(self) -> (Msg, RefAddr) { + (self.msg, self.sign) + } + + /// Returns a message signature to identify the message sender + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # fn main() { + /// # Bastion::init(); + /// # + /// Bastion::children(|children| { + /// children.with_exec(|ctx: BastionContext| { + /// async move { + /// let msg: SignedMessage = ctx.recv().await?; + /// println!("received message from {:?}", msg.signature().path()); + /// Ok(()) + /// } + /// }) + /// }).expect("Couldn't create the children group."); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn signature(&self) -> &RefAddr { + &self.sign + } +} + +#[derive(Debug, Clone)] +/// Message signature used to identify message sender and send messages to it. +/// +/// # Example +/// +/// ```rust +/// # use bastion::prelude::*; +/// # +/// # fn main() { +/// # Bastion::init(); +/// # +/// Bastion::children(|children| { +/// children.with_exec(|ctx: BastionContext| { +/// async move { +/// // Wait for a message to be received... +/// let msg: SignedMessage = ctx.recv().await?; +/// +/// ctx.tell(msg.signature(), "reply").expect("Unable to reply"); +/// Ok(()) +/// } +/// }) +/// }).expect("Couldn't create the children group."); +/// # +/// # Bastion::start(); +/// # Bastion::stop(); +/// # Bastion::block_until_stopped(); +/// # } +/// ``` +pub struct RefAddr { + path: Arc, + sender: Sender, +} + +impl RefAddr { + pub(crate) fn new(path: Arc, sender: Sender) -> Self { + RefAddr { path, sender } + } + + pub(crate) fn dead_letters() -> Self { + Self::new( + SYSTEM.dead_letters().path().clone(), + SYSTEM.dead_letters().sender().clone(), + ) + } + + /// Checks whether the sender is identified. + /// Usually anonymous sender means messages sent by + /// [broadcast][crate::Bastion::broadcast()] and it's other methods implied to + /// be called outside of the Bastion context. + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # fn main() { + /// # Bastion::init(); + /// # + /// # let children_ref = Bastion::children(|children| children).unwrap(); + /// let msg = "A message containing data."; + /// children_ref.broadcast(msg).expect("Couldn't send the message."); + /// + /// # Bastion::children(|children| { + /// # children.with_exec(|ctx: BastionContext| { + /// # async move { + /// msg! { ctx.recv().await?, + /// ref msg: &'static str => { + /// assert!(signature!().is_sender_identified()); + /// }; + /// // We are only sending a `&'static str` in this + /// // example, so we know that this won't happen... + /// _: _ => (); + /// } + /// # + /// # Ok(()) + /// # } + /// # }) + /// # }).unwrap(); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn is_sender_identified(&self) -> bool { + self.path.is_dead_letters() + } + + /// Returns `BastionPath` of a sender + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # fn main() { + /// # Bastion::init(); + /// # + /// # let children_ref = Bastion::children(|children| children).unwrap(); + /// let msg = "A message containing data."; + /// children_ref.broadcast(msg).expect("Couldn't send the message."); + /// + /// # Bastion::children(|children| { + /// # children.with_exec(|ctx: BastionContext| { + /// # async move { + /// msg! { ctx.recv().await?, + /// ref msg: &'static str => { + /// let path = signature!().path(); + /// assert!(path.is_dead_letters()); + /// }; + /// // We are only sending a `&'static str` in this + /// // example, so we know that this won't happen... + /// _: _ => (); + /// } + /// # + /// # Ok(()) + /// # } + /// # }) + /// # }).unwrap(); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn path(&self) -> &Arc { + &self.path + } + + pub(crate) fn sender(&self) -> &Sender { + &self.sender + } +} + +impl Envelope { + pub(crate) fn new(msg: BastionMessage, path: Arc, sender: Sender) -> Self { + Envelope { + msg, + sign: RefAddr::new(path, sender), + } + } + + pub(crate) fn new_with_sign(msg: BastionMessage, sign: RefAddr) -> Self { + Envelope { msg, sign } + } + + pub(crate) fn from_dead_letters(msg: BastionMessage) -> Self { + Envelope { + msg, + sign: RefAddr::dead_letters(), + } + } + + pub(crate) fn try_clone(&self) -> Option { + self.msg.try_clone().map(|msg| Envelope { + msg, + sign: self.sign.clone(), + }) + } + + pub(crate) fn into_msg(self) -> Option { + self.msg.into_msg() + } +} diff --git a/bastion/src/lib.rs b/bastion/src/lib.rs index afd8a2d8..0f91bf6c 100644 --- a/bastion/src/lib.rs +++ b/bastion/src/lib.rs @@ -69,6 +69,8 @@ mod bastion; mod broadcast; mod callbacks; mod config; +mod envelope; +mod path; mod system; pub mod children; @@ -83,8 +85,9 @@ pub mod prelude { pub use crate::callbacks::Callbacks; pub use crate::children::{ChildRef, Children, ChildrenRef}; pub use crate::config::Config; - pub use crate::context::{BastionContext, BastionId}; - pub use crate::message::{Answer, Message, Msg, Sender}; + pub use crate::context::{BastionContext, BastionId, NIL_ID}; + pub use crate::envelope::{RefAddr, SignedMessage}; + pub use crate::message::{Answer, AnswerSender, Message, Msg}; pub use crate::msg; pub use crate::supervisor::{SupervisionStrategy, Supervisor, SupervisorRef}; } diff --git a/bastion/src/message.rs b/bastion/src/message.rs index d28a8b55..0aaf6293 100644 --- a/bastion/src/message.rs +++ b/bastion/src/message.rs @@ -8,6 +8,7 @@ //! use crate::children::Children; use crate::context::BastionId; +use crate::envelope::{RefAddr, SignedMessage}; use crate::supervisor::{SupervisionStrategy, Supervisor}; use futures::channel::oneshot::{self, Receiver}; use std::any::{type_name, Any}; @@ -31,7 +32,7 @@ impl Message for T where T: Any + Send + Sync + Debug {} #[derive(Debug)] #[doc(hidden)] -pub struct Sender(oneshot::Sender); +pub struct AnswerSender(oneshot::Sender); #[derive(Debug)] /// A [`Future`] returned when successfully "asking" a @@ -64,7 +65,7 @@ pub struct Sender(oneshot::Sender); /// // Handle the message... /// /// // ...and eventually answer to it... -/// answer!(ANSWER_MSG); +/// answer!(ctx, ANSWER_MSG); /// }; /// // This won't happen because this example /// // only "asks" a `&'static str`... @@ -81,7 +82,7 @@ pub struct Sender(oneshot::Sender); /// # let child_ref = children_ref.elems()[0].clone(); /// # async move { /// // Later, the message is "asked" to the child... -/// let answer: Answer = child_ref.ask(ASK_MSG).expect("Couldn't send the message."); +/// let answer: Answer = ctx.ask(&child_ref.addr(), ASK_MSG).expect("Couldn't send the message."); /// /// // ...and the child's answer is received... /// msg! { answer.await.expect("Couldn't receive the answer."), @@ -109,7 +110,7 @@ pub struct Sender(oneshot::Sender); /// [`ChildRef::ask`]: ../children/struct.ChildRef.html#method.ask /// [`Msg`]: message/struct.Msg.html /// [`msg!`]: macro.msg.html -pub struct Answer(Receiver); +pub struct Answer(Receiver); #[derive(Debug)] /// A message returned by [`BastionContext::recv`] or @@ -127,7 +128,7 @@ pub struct Answer(Receiver); /// children.with_exec(|ctx: BastionContext| { /// async move { /// loop { -/// let msg: Msg = ctx.recv().await?; +/// let msg: SignedMessage = ctx.recv().await?; /// msg! { msg, /// // We match a broadcasted `&'static str`s... /// ref msg: &'static str => { @@ -140,6 +141,10 @@ pub struct Answer(Receiver); /// msg: &'static str => { /// assert_eq!(msg, "A message containing data."); /// // Handle the message... +/// +/// // get message signature +/// let sign = signature!(); +/// ctx.tell(&sign, "A message containing reply").unwrap(); /// }; /// // We match a `&'static str`s "asked" to this child... /// msg: &'static str =!> { @@ -147,7 +152,7 @@ pub struct Answer(Receiver); /// // Handle the message... /// /// // ...and eventually answer to it... -/// answer!("An answer message containing data."); +/// answer!(ctx, "An answer message containing data."); /// }; /// // We match a message that wasn't previously matched... /// _: _ => (); @@ -174,7 +179,7 @@ enum MsgInner { Tell(Box), Ask { msg: Box, - sender: Option, + sender: Option, }, } @@ -197,13 +202,17 @@ pub(crate) enum Deployment { Children(Children), } -impl Sender { +impl AnswerSender { + // FIXME: we can't let manipulating Signature in a public API + // but now it's being called only by a macro so we are trusting it #[doc(hidden)] - pub fn send(self, msg: M) -> Result<(), M> { + pub fn send(self, msg: M, sign: RefAddr) -> Result<(), M> { debug!("{:?}: Sending answer: {:?}", self, msg); let msg = Msg::tell(msg); trace!("{:?}: Sending message: {:?}", self, msg); - self.0.send(msg).map_err(|msg| msg.try_unwrap().unwrap()) + self.0 + .send(SignedMessage::new(msg, sign)) + .map_err(|smsg| smsg.msg.try_unwrap().unwrap()) } } @@ -221,7 +230,7 @@ impl Msg { pub(crate) fn ask(msg: M) -> (Self, Answer) { let msg = Box::new(msg); let (sender, recver) = oneshot::channel(); - let sender = Sender(sender); + let sender = AnswerSender(sender); let answer = Answer(recver); let sender = Some(sender); @@ -258,7 +267,7 @@ impl Msg { } #[doc(hidden)] - pub fn take_sender(&mut self) -> Option { + pub fn take_sender(&mut self) -> Option { debug!("{:?}: Taking sender.", self); if let MsgInner::Ask { sender, .. } = &mut self.0 { sender.take() @@ -432,7 +441,7 @@ impl BastionMessage { } impl Future for Answer { - type Output = Result; + type Output = Result; fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll { debug!("{:?}: Polling.", self); @@ -483,8 +492,8 @@ impl Future for Answer { /// Bastion::children(|children| { /// children.with_exec(|ctx: BastionContext| { /// async move { -/// # ctx.current().tell(TELL_MSG).unwrap(); -/// # ctx.current().ask(ASK_MSG).unwrap(); +/// # ctx.tell(&ctx.current().addr(), TELL_MSG).unwrap(); +/// # ctx.ask(&ctx.current().addr(), ASK_MSG).unwrap(); /// # /// loop { /// msg! { ctx.recv().await?, @@ -505,7 +514,7 @@ impl Future for Answer { /// // Handle the message... /// /// // ...and eventually answer to it... -/// answer!("An answer to the message."); +/// answer!(ctx, "An answer to the message."); /// }; /// // We are only broadcasting, "telling" and "asking" a /// // `&'static str` in this example, so we know that this won't @@ -602,7 +611,16 @@ macro_rules! msg { ($($avar:ident, $aty:ty, $ahandle:expr,)*), $var:ident: _ => $handle:expr; ) => { { - let mut $var = $msg; + let mut signed = $msg; + + let (mut $var, sign) = signed.extract(); + + macro_rules! signature { + () => { + sign + }; + } + let sender = $var.take_sender(); if $var.is_broadcast() { if false { @@ -619,9 +637,13 @@ macro_rules! msg { } } else if sender.is_some() { let sender = sender.unwrap(); + macro_rules! answer { - ($answer:expr) => { - sender.send($answer) + ($ctx:expr, $answer:expr) => { + { + let sign = $ctx.signature(); + sender.send($answer, sign) + } }; } diff --git a/bastion/src/path.rs b/bastion/src/path.rs new file mode 100644 index 00000000..3da0c513 --- /dev/null +++ b/bastion/src/path.rs @@ -0,0 +1,584 @@ +use crate::context::{BastionId, NIL_ID}; +use std::fmt; +use std::result::Result; + +#[derive(Clone)] +/// Represents a Path for a System, Supervisor, Children or Child. +/// +/// BastionPath can be used to identify message senders. +/// Later it will be used to route messages to a path. +pub struct BastionPath { + // TODO: possibly more effective collection depending on how we'll use it in routing + parent_chain: Vec, + this: Option, +} + +impl BastionPath { + // SYSTEM or a sender out of Bastion scope + pub(crate) fn root() -> BastionPath { + BastionPath { + parent_chain: vec![], + this: None, + } + } + + /// iterates over path elements + pub(crate) fn iter(&self) -> impl Iterator { + let parent_iter = self.parent_chain.iter(); + parent_iter.chain(self.this.iter().map(|e| e.id())) + } + + /// Returns the last element's id. + /// If it's root or a dead_letters then &NIL_ID is returned. + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # fn main() { + /// # Bastion::init(); + /// # + /// # let children_ref = Bastion::children(|children| children).unwrap(); + /// let msg = "A message containing data."; + /// children_ref.broadcast(msg).expect("Couldn't send the message."); + /// + /// # Bastion::children(|children| { + /// # children.with_exec(|ctx: BastionContext| { + /// # async move { + /// msg! { ctx.recv().await?, + /// ref msg: &'static str => { + /// let path = signature!().path(); + /// assert_eq!(path.id(), &NIL_ID); + /// }; + /// // We are only sending a `&'static str` in this + /// // example, so we know that this won't happen... + /// _: _ => (); + /// } + /// # + /// # Ok(()) + /// # } + /// # }) + /// # }).unwrap(); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn id(&self) -> &BastionId { + self.this.as_ref().map(|e| e.id()).unwrap_or(&NIL_ID) + } + + /// Returns a path element. If the path is root then None is returned. + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # fn main() { + /// # Bastion::init(); + /// # + /// # let children_ref = Bastion::children(|children| children).unwrap(); + /// let msg = "A message containing data."; + /// children_ref.broadcast(msg).expect("Couldn't send the message."); + /// + /// # Bastion::children(|children| { + /// # children.with_exec(|ctx: BastionContext| { + /// # async move { + /// msg! { ctx.recv().await?, + /// ref msg: &'static str => { + /// let path = signature!().path(); + /// assert!(path.elem().as_ref().unwrap().is_children()); + /// }; + /// // We are only sending a `&'static str` in this + /// // example, so we know that this won't happen... + /// _: _ => (); + /// } + /// # + /// # Ok(()) + /// # } + /// # }) + /// # }).unwrap(); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn elem(&self) -> &Option { + &self.this + } + + /// Checks whether `BastionPath` is a dead-letters path. + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # fn main() { + /// # Bastion::init(); + /// # + /// # let children_ref = Bastion::children(|children| children).unwrap(); + /// let msg = "A message containing data."; + /// children_ref.broadcast(msg).expect("Couldn't send the message."); + /// + /// # Bastion::children(|children| { + /// # children.with_exec(|ctx: BastionContext| { + /// # async move { + /// msg! { ctx.recv().await?, + /// ref msg: &'static str => { + /// let path = signature!().path(); + /// assert!(path.is_dead_letters()); + /// }; + /// // We are only sending a `&'static str` in this + /// // example, so we know that this won't happen... + /// _: _ => (); + /// } + /// # + /// # Ok(()) + /// # } + /// # }) + /// # }).unwrap(); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn is_dead_letters(&self) -> bool { + self.parent_chain.len() == 2 && self.this.as_ref().map(|e| e.is_child()).unwrap_or(false) + } +} + +impl fmt::Display for BastionPath { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "/{}", + self.iter() + .map(|id| format!("{}", id)) + .collect::>() + .join("/") + ) + } +} + +impl fmt::Debug for BastionPath { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.this { + Some(this @ BastionPathElement::Supervisor(_)) => write!( + f, + "/{}", + self.parent_chain + .iter() + .map(|id| BastionPathElement::Supervisor(id.clone())) + .chain(vec![this.clone()]) + .map(|el| format!("{:?}", el)) + .collect::>() + .join("/") + ), + // TODO: combine with the pattern above when or-patterns become stable + Some(this @ BastionPathElement::Children(_)) => write!( + f, + "/{}", + self.parent_chain + .iter() + .map(|id| BastionPathElement::Supervisor(id.clone())) + .chain(vec![this.clone()]) + .map(|el| format!("{:?}", el)) + .collect::>() + .join("/") + ), + Some(this @ BastionPathElement::Child(_)) => { + let parent_len = self.parent_chain.len(); + + write!( + f, + "/{}", + self.parent_chain + .iter() + .enumerate() + .map(|(i, id)| { + if i == parent_len - 1 { + BastionPathElement::Children(id.clone()) + } else { + BastionPathElement::Supervisor(id.clone()) + } + }) + .chain(vec![this.clone()]) + .map(|el| format!("{:?}", el)) + .collect::>() + .join("/") + ) + } + None => write!(f, "/"), + } + } +} + +#[derive(Clone, PartialEq)] +/// Represents BastionPath element +pub enum BastionPathElement { + Supervisor(BastionId), + Children(BastionId), + Child(BastionId), +} + +impl fmt::Debug for BastionPathElement { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + BastionPathElement::Supervisor(id) => write!(f, "supervisor#{}", id), + BastionPathElement::Children(id) => write!(f, "children#{}", id), + BastionPathElement::Child(id) => write!(f, "child#{}", id), + } + } +} + +impl BastionPathElement { + pub(crate) fn id(&self) -> &BastionId { + match self { + BastionPathElement::Supervisor(id) => id, + BastionPathElement::Children(id) => id, + BastionPathElement::Child(id) => id, + } + } + + pub(crate) fn with_id(self, id: BastionId) -> Self { + match self { + BastionPathElement::Supervisor(_) => BastionPathElement::Supervisor(id), + BastionPathElement::Children(_) => BastionPathElement::Children(id), + BastionPathElement::Child(_) => BastionPathElement::Child(id), + } + } + + /// Checks whether the BastionPath identifies a supervisor. + pub fn is_supervisor(&self) -> bool { + match self { + BastionPathElement::Supervisor(_) => true, + _ => false, + } + } + + /// Checks whether the BastionPath identifies children. + pub fn is_children(&self) -> bool { + match self { + BastionPathElement::Children(_) => true, + _ => false, + } + } + + /// Checks whether the BastionPath identifies a child. + pub fn is_child(&self) -> bool { + match self { + BastionPathElement::Child(_) => true, + _ => false, + } + } +} + +#[derive(Clone)] +pub(crate) struct AppendError { + path: BastionPath, + element: BastionPathElement, +} + +impl fmt::Display for AppendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.element { + BastionPathElement::Supervisor(..) => match self.path.this { + None => unreachable!(), + Some(BastionPathElement::Supervisor(..)) => unreachable!(), + Some(BastionPathElement::Children(..)) => { + write!(f, "Supervisor is not appendable to children") + } + Some(BastionPathElement::Child(..)) => { + write!(f, "Supervisor is not appendable to a child") + } + }, + BastionPathElement::Children(..) => match self.path.this { + None => write!(f, "Children is not appendable to root"), + Some(BastionPathElement::Supervisor(..)) => unreachable!(), + Some(BastionPathElement::Children(..)) => { + write!(f, "Children is not appendable to children") + } + Some(BastionPathElement::Child(..)) => { + write!(f, "Children is not appendable to a child") + } + }, + BastionPathElement::Child(..) => match self.path.this { + None => write!(f, "Child is not appendable to root"), + Some(BastionPathElement::Supervisor(..)) => { + write!(f, "Child is not appendable to a supervisor") + } + Some(BastionPathElement::Children(..)) => unreachable!(), + Some(BastionPathElement::Child(..)) => { + write!(f, "Child is not appendable to a child") + } + }, + } + } +} + +impl fmt::Debug for AppendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Can't append {:?} to {:?}", self.element, self.path) + } +} + +impl BastionPath { + pub(crate) fn append(self, el: BastionPathElement) -> Result { + match el { + sv @ BastionPathElement::Supervisor(_) => match self.this { + None => Ok(BastionPath { + parent_chain: self.parent_chain, + this: Some(sv), + }), + Some(BastionPathElement::Supervisor(id)) => { + let mut path = BastionPath { + parent_chain: self.parent_chain, + this: Some(sv), + }; + path.parent_chain.push(id); + Ok(path) + } + this => Err(AppendError { + path: BastionPath { + parent_chain: self.parent_chain, + this, + }, + element: sv, + }), + }, + children @ BastionPathElement::Children(_) => match self.this { + Some(BastionPathElement::Supervisor(id)) => { + let mut path = BastionPath { + parent_chain: self.parent_chain, + this: Some(children), + }; + path.parent_chain.push(id); + Ok(path) + } + this => Err(AppendError { + path: BastionPath { + parent_chain: self.parent_chain, + this, + }, + element: children, + }), + }, + child @ BastionPathElement::Child(_) => match self.this { + Some(BastionPathElement::Children(id)) => { + let mut path = BastionPath { + parent_chain: self.parent_chain, + this: Some(child), + }; + path.parent_chain.push(id); + Ok(path) + } + this => Err(AppendError { + path: BastionPath { + parent_chain: self.parent_chain, + this, + }, + element: child, + }), + }, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // SYSTEM + smth + + #[test] + fn append_sv_to_system() { + let sv_id = BastionId::new(); + let path = BastionPath::root() + .append(BastionPathElement::Supervisor(sv_id.clone())) + .unwrap(); + assert_eq!(path.iter().collect::>(), vec![&sv_id]); + } + + #[test] + fn append_children_to_system() { + let sv_id = BastionId::new(); + let res = BastionPath::root().append(BastionPathElement::Children(sv_id)); + assert_eq!( + res.unwrap_err().to_string(), + "Children is not appendable to root" + ); + } + + #[test] + fn append_child_to_system() { + let sv_id = BastionId::new(); + let res = BastionPath::root().append(BastionPathElement::Child(sv_id)); + assert_eq!( + res.unwrap_err().to_string(), + "Child is not appendable to root" + ); + } + + // Supervisor + smth + + #[test] + fn append_sv_to_sv() { + let sv1_id = BastionId::new(); + let sv2_id = BastionId::new(); + let path = BastionPath::root() + .append(BastionPathElement::Supervisor(sv1_id.clone())) + .unwrap() + .append(BastionPathElement::Supervisor(sv2_id.clone())) + .unwrap(); + assert_eq!( + path.iter().collect::>(), + vec![&sv1_id, &sv2_id] + ); + } + + #[test] + fn append_children_to_sv() { + let sv_id = BastionId::new(); + let children_id = BastionId::new(); + let path = BastionPath::root() + .append(BastionPathElement::Supervisor(sv_id.clone())) + .unwrap() + .append(BastionPathElement::Children(children_id.clone())) + .unwrap(); + assert_eq!( + path.iter().collect::>(), + vec![&sv_id, &children_id] + ); + } + + #[test] + fn append_child_to_sv() { + let sv_id = BastionId::new(); + let children_id = BastionId::new(); + let res = BastionPath::root() + .append(BastionPathElement::Supervisor(sv_id)) + .unwrap() + .append(BastionPathElement::Child(children_id)); + assert_eq!( + res.unwrap_err().to_string(), + "Child is not appendable to a supervisor" + ); + } + + // children + smth + + #[test] + fn append_sv_to_children() { + let sv_id = BastionId::new(); + let children_id = BastionId::new(); + let res = BastionPath::root() + .append(BastionPathElement::Supervisor(sv_id)) + .unwrap() + .append(BastionPathElement::Children(children_id)) + .unwrap() + .append(BastionPathElement::Supervisor(BastionId::new())); + assert_eq!( + res.unwrap_err().to_string(), + "Supervisor is not appendable to children" + ); + } + + #[test] + fn append_children_to_children() { + let sv_id = BastionId::new(); + let children_id = BastionId::new(); + let res = BastionPath::root() + .append(BastionPathElement::Supervisor(sv_id)) + .unwrap() + .append(BastionPathElement::Children(children_id)) + .unwrap() + .append(BastionPathElement::Children(BastionId::new())); + assert_eq!( + res.unwrap_err().to_string(), + "Children is not appendable to children" + ); + } + + #[test] + fn append_child_to_children() { + let sv_id = BastionId::new(); + let children_id = BastionId::new(); + let child_id = BastionId::new(); + let path = BastionPath::root() + .append(BastionPathElement::Supervisor(sv_id.clone())) + .unwrap() + .append(BastionPathElement::Children(children_id.clone())) + .unwrap() + .append(BastionPathElement::Child(child_id.clone())) + .unwrap(); + assert_eq!( + path.iter().collect::>(), + vec![&sv_id, &children_id, &child_id] + ); + } + + // child + smth + + #[test] + fn append_sv_to_child() { + let sv_id = BastionId::new(); + let children_id = BastionId::new(); + let child_id = BastionId::new(); + let res = BastionPath::root() + .append(BastionPathElement::Supervisor(sv_id)) + .unwrap() + .append(BastionPathElement::Children(children_id)) + .unwrap() + .append(BastionPathElement::Child(child_id)) + .unwrap() + .append(BastionPathElement::Supervisor(BastionId::new())); + assert_eq!( + res.unwrap_err().to_string(), + "Supervisor is not appendable to a child" + ); + } + + #[test] + fn append_children_to_child() { + let sv_id = BastionId::new(); + let children_id = BastionId::new(); + let child_id = BastionId::new(); + let res = BastionPath::root() + .append(BastionPathElement::Supervisor(sv_id)) + .unwrap() + .append(BastionPathElement::Children(children_id)) + .unwrap() + .append(BastionPathElement::Child(child_id)) + .unwrap() + .append(BastionPathElement::Children(BastionId::new())); + assert_eq!( + res.unwrap_err().to_string(), + "Children is not appendable to a child" + ); + } + + #[test] + fn append_child_to_child() { + let sv_id = BastionId::new(); + let children_id = BastionId::new(); + let child_id = BastionId::new(); + let res = BastionPath::root() + .append(BastionPathElement::Supervisor(sv_id)) + .unwrap() + .append(BastionPathElement::Children(children_id)) + .unwrap() + .append(BastionPathElement::Child(child_id)) + .unwrap() + .append(BastionPathElement::Child(BastionId::new())); + assert_eq!( + res.unwrap_err().to_string(), + "Child is not appendable to a child" + ); + } +} diff --git a/bastion/src/supervisor.rs b/bastion/src/supervisor.rs index efce501a..4411caf4 100644 --- a/bastion/src/supervisor.rs +++ b/bastion/src/supervisor.rs @@ -5,7 +5,9 @@ use crate::broadcast::{Broadcast, Parent, Sender}; use crate::callbacks::Callbacks; use crate::children::{Children, ChildrenRef}; use crate::context::BastionId; +use crate::envelope::Envelope; use crate::message::{BastionMessage, Deployment, Message}; +use crate::path::{BastionPath, BastionPathElement}; use bastion_executor::pool; use futures::prelude::*; use futures::stream::FuturesOrdered; @@ -15,6 +17,7 @@ use lightproc::prelude::*; use log::Level; use std::cmp::{Eq, PartialEq}; use std::ops::RangeFrom; +use std::sync::Arc; use std::task::Poll; #[derive(Debug)] @@ -80,7 +83,7 @@ pub struct Supervisor { // Messages that were received before the supervisor was // started. Those will be "replayed" once a start message // is received. - pre_start_msgs: Vec, + pre_start_msgs: Vec, started: bool, } @@ -92,6 +95,7 @@ pub struct Supervisor { pub struct SupervisorRef { id: BastionId, sender: Sender, + path: Arc, } #[derive(Debug, Clone)] @@ -268,8 +272,9 @@ impl Supervisor { // TODO: clone or ref? let id = self.bcast.id().clone(); let sender = self.bcast.sender().clone(); + let path = self.bcast.path().clone(); - SupervisorRef::new(id, sender) + SupervisorRef::new(id, sender, path) } /// Creates a new supervisor, passes it through the specified @@ -314,7 +319,7 @@ impl Supervisor { { debug!("Supervisor({}): Creating supervisor.", self.id()); let parent = Parent::supervisor(self.as_ref()); - let bcast = Broadcast::new(parent); + let bcast = Broadcast::new(parent, BastionPathElement::Supervisor(BastionId::new())); debug!( "Supervisor({}): Initializing Supervisor({}).", @@ -331,7 +336,8 @@ impl Supervisor { supervisor.id() ); let msg = BastionMessage::deploy_supervisor(supervisor); - self.bcast.send_self(msg); + let env = Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone()); + self.bcast.send_self(env); self } @@ -379,7 +385,7 @@ impl Supervisor { { debug!("Supervisor({}): Creating supervisor.", self.id()); let parent = Parent::supervisor(self.as_ref()); - let bcast = Broadcast::new(parent); + let bcast = Broadcast::new(parent, BastionPathElement::Supervisor(BastionId::new())); debug!( "Supervisor({}): Initializing Supervisor({}).", @@ -397,7 +403,8 @@ impl Supervisor { supervisor.id() ); let msg = BastionMessage::deploy_supervisor(supervisor); - self.bcast.send_self(msg); + let env = Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone()); + self.bcast.send_self(env); supervisor_ref } @@ -427,7 +434,7 @@ impl Supervisor { /// children.with_exec(|ctx: BastionContext| { /// async move { /// // Send and receive messages... - /// let opt_msg: Option = ctx.try_recv().await; + /// let opt_msg: Option = ctx.try_recv().await; /// /// // ...and return `Ok(())` or `Err(())` when you are done... /// Ok(()) @@ -453,7 +460,7 @@ impl Supervisor { { debug!("Supervisor({}): Creating children group.", self.id()); let parent = Parent::supervisor(self.as_ref()); - let bcast = Broadcast::new(parent); + let bcast = Broadcast::new(parent, BastionPathElement::Children(BastionId::new())); debug!( "Supervisor({}): Initializing Children({}).", @@ -472,7 +479,8 @@ impl Supervisor { children.id() ); let msg = BastionMessage::deploy_children(children); - self.bcast.send_self(msg); + let env = Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone()); + self.bcast.send_self(env); self } @@ -502,7 +510,7 @@ impl Supervisor { /// children.with_exec(|ctx: BastionContext| { /// async move { /// // Send and receive messages... - /// let opt_msg: Option = ctx.try_recv().await; + /// let opt_msg: Option = ctx.try_recv().await; /// /// // ...and return `Ok(())` or `Err(())` when you are done... /// Ok(()) @@ -529,7 +537,7 @@ impl Supervisor { { debug!("Supervisor({}): Creating children group.", self.id()); let parent = Parent::supervisor(self.as_ref()); - let bcast = Broadcast::new(parent); + let bcast = Broadcast::new(parent, BastionPathElement::Children(BastionId::new())); debug!( "Supervisor({}): Initializing Children({}).", @@ -549,7 +557,8 @@ impl Supervisor { children.id() ); let msg = BastionMessage::deploy_children(children); - self.bcast.send_self(msg); + let env = Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone()); + self.bcast.send_self(env); children_ref } @@ -677,7 +686,11 @@ impl Supervisor { supervised.callbacks().before_restart(); } - let bcast = Broadcast::new(parent.clone()); + let bcast = Broadcast::new( + parent.clone(), + supervised.elem().clone().with_id(BastionId::new()), + ); + reset.push(async move { debug!( "Supervisor({}): Resetting Supervised({}) (killed={}) to Supervised({}).", @@ -708,7 +721,9 @@ impl Supervisor { self.bcast.register(supervised.bcast()); if self.started { let msg = BastionMessage::start(); - self.bcast.send_child(supervised.id(), msg); + let env = + Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone()); + self.bcast.send_child(supervised.id(), env); } debug!( @@ -831,7 +846,8 @@ impl Supervisor { self.bcast.unregister(supervised.id()); let parent = Parent::supervisor(self.as_ref()); - let bcast = Broadcast::new(parent); + let bcast = + Broadcast::new(parent, supervised.elem().clone().with_id(BastionId::new())); let id = bcast.id().clone(); debug!( "Supervisor({}): Resetting Supervised({}) to Supervised({}).", @@ -846,7 +862,9 @@ impl Supervisor { self.bcast.register(supervised.bcast()); if self.started { let msg = BastionMessage::start(); - self.bcast.send_child(&id, msg); + let env = + Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone()); + self.bcast.send_child(&id, env); } debug!( @@ -876,22 +894,34 @@ impl Supervisor { Ok(()) } - async fn handle(&mut self, msg: BastionMessage) -> Result<(), ()> { - match msg { - BastionMessage::Start => unreachable!(), - BastionMessage::Stop => { + async fn handle(&mut self, env: Envelope) -> Result<(), ()> { + match env { + Envelope { + msg: BastionMessage::Start, + .. + } => unreachable!(), + Envelope { + msg: BastionMessage::Stop, + .. + } => { self.stop(0..).await; self.stopped(); return Err(()); } - BastionMessage::Kill => { + Envelope { + msg: BastionMessage::Kill, + .. + } => { self.kill(0..).await; self.stopped(); return Err(()); } - BastionMessage::Deploy(deployment) => { + Envelope { + msg: BastionMessage::Deploy(deployment), + .. + } => { let supervised = match deployment { Deployment::Supervisor(supervisor) => { debug!( @@ -916,7 +946,9 @@ impl Supervisor { self.bcast.register(supervised.bcast()); if self.started { let msg = BastionMessage::start(); - self.bcast.send_child(supervised.id(), msg); + let env = + Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone()); + self.bcast.send_child(supervised.id(), env); } debug!( @@ -931,8 +963,14 @@ impl Supervisor { self.order.push(id); } // FIXME - BastionMessage::Prune { .. } => unimplemented!(), - BastionMessage::SuperviseWith(strategy) => { + Envelope { + msg: BastionMessage::Prune { .. }, + .. + } => unimplemented!(), + Envelope { + msg: BastionMessage::SuperviseWith(strategy), + .. + } => { debug!( "Supervisor({}): Setting strategy: {:?}", self.id(), @@ -940,15 +978,21 @@ impl Supervisor { ); self.strategy = strategy; } - BastionMessage::Message(ref message) => { + Envelope { + msg: BastionMessage::Message(ref message), + .. + } => { debug!( "Supervisor({}): Broadcasting a message: {:?}", self.id(), message ); - self.bcast.send_children(msg); + self.bcast.send_children(env); } - BastionMessage::Stopped { id } => { + Envelope { + msg: BastionMessage::Stopped { id }, + .. + } => { // FIXME: Err if None? if let Some((_, launched)) = self.launched.remove(&id) { debug!("Supervisor({}): Supervised({}) stopped.", self.id(), id); @@ -961,7 +1005,10 @@ impl Supervisor { self.stopped.insert(id, supervised); } } - BastionMessage::Faulted { id } => { + Envelope { + msg: BastionMessage::Faulted { id }, + .. + } => { if self.launched.contains_key(&id) { warn!("Supervisor({}): Supervised({}) faulted.", self.id(), id); } @@ -984,7 +1031,10 @@ impl Supervisor { loop { match poll!(&mut self.bcast.next()) { // TODO: Err if started == true? - Poll::Ready(Some(BastionMessage::Start)) => { + Poll::Ready(Some(Envelope { + msg: BastionMessage::Start, + .. + })) => { trace!( "Supervisor({}): Received a new message (started=false): {:?}", self.id(), @@ -994,7 +1044,9 @@ impl Supervisor { self.started = true; let msg = BastionMessage::start(); - self.bcast.send_children(msg); + let env = + Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone()); + self.bcast.send_children(env); let msgs = self.pre_start_msgs.drain(..).collect::>(); self.pre_start_msgs.shrink_to_fit(); @@ -1045,8 +1097,8 @@ impl Supervisor { } impl SupervisorRef { - pub(crate) fn new(id: BastionId, sender: Sender) -> Self { - SupervisorRef { id, sender } + pub(crate) fn new(id: BastionId, sender: Sender, path: Arc) -> Self { + SupervisorRef { id, sender, path } } /// Returns the identifier of the supervisor this `SupervisorRef` @@ -1120,7 +1172,7 @@ impl SupervisorRef { { debug!("SupervisorRef({}): Creating supervisor.", self.id()); let parent = Parent::supervisor(self.clone()); - let bcast = Broadcast::new(parent); + let bcast = Broadcast::new(parent, BastionPathElement::Supervisor(BastionId::new())); debug!( "SupervisorRef({}): Initializing Supervisor({}).", @@ -1138,7 +1190,8 @@ impl SupervisorRef { supervisor.id() ); let msg = BastionMessage::deploy_supervisor(supervisor); - self.send(msg).map_err(|_| ())?; + let env = Envelope::new(msg, self.path.clone(), self.sender.clone()); + self.send(env).map_err(|_| ())?; Ok(supervisor_ref) } @@ -1169,7 +1222,7 @@ impl SupervisorRef { /// children.with_exec(|ctx: BastionContext| { /// async move { /// // Send and receive messages... - /// let opt_msg: Option = ctx.try_recv().await; + /// let opt_msg: Option = ctx.try_recv().await; /// /// // ...and return `Ok(())` or `Err(())` when you are done... /// Ok(()) @@ -1188,12 +1241,19 @@ impl SupervisorRef { /// [`Children`]: children/struct.Children.html /// [`ChildrenRef`]: children/struct.ChildrenRef.html pub fn children(&self, init: C) -> Result + where + C: FnOnce(Children) -> Children, + { + self.children_with_id(BastionId::new(), init) + } + + pub(crate) fn children_with_id(&self, id: BastionId, init: C) -> Result where C: FnOnce(Children) -> Children, { debug!("SupervisorRef({}): Creating children group.", self.id()); let parent = Parent::supervisor(self.clone()); - let bcast = Broadcast::new(parent); + let bcast = Broadcast::new(parent, BastionPathElement::Children(id)); debug!( "SupervisorRef({}): Initializing Children({}).", @@ -1213,7 +1273,8 @@ impl SupervisorRef { children.id() ); let msg = BastionMessage::deploy_children(children); - self.send(msg).map_err(|_| ())?; + let env = Envelope::new(msg, self.path.clone(), self.sender.clone()); + self.send(env).map_err(|_| ())?; Ok(children_ref) } @@ -1276,7 +1337,8 @@ impl SupervisorRef { strategy ); let msg = BastionMessage::supervise_with(strategy); - self.send(msg).map_err(|_| ()) + let env = Envelope::from_dead_letters(msg); + self.send(env).map_err(|_| ()) } /// Sends a message to the supervisor this `SupervisorRef` @@ -1334,8 +1396,9 @@ impl SupervisorRef { msg ); let msg = BastionMessage::broadcast(msg); + let env = Envelope::from_dead_letters(msg); // FIXME: panics? - self.send(msg).map_err(|msg| msg.into_msg().unwrap()) + self.send(env).map_err(|env| env.into_msg().unwrap()) } /// Sends a message to the supervisor this `SupervisorRef` @@ -1364,7 +1427,8 @@ impl SupervisorRef { pub fn stop(&self) -> Result<(), ()> { debug!("SupervisorRef({}): Stopping.", self.id()); let msg = BastionMessage::stop(); - self.send(msg).map_err(|_| ()) + let env = Envelope::from_dead_letters(msg); + self.send(env).map_err(|_| ()) } /// Sends a message to the supervisor this `SupervisorRef` @@ -1393,15 +1457,20 @@ impl SupervisorRef { pub fn kill(&self) -> Result<(), ()> { debug!("SupervisorRef({}): Killing.", self.id()); let msg = BastionMessage::kill(); - self.send(msg).map_err(|_| ()) + let env = Envelope::from_dead_letters(msg); + self.send(env).map_err(|_| ()) } - pub(crate) fn send(&self, msg: BastionMessage) -> Result<(), BastionMessage> { - trace!("SupervisorRef({}): Sending message: {:?}", self.id(), msg); + pub(crate) fn send(&self, env: Envelope) -> Result<(), Envelope> { + trace!("SupervisorRef({}): Sending message: {:?}", self.id(), env); self.sender - .unbounded_send(msg) + .unbounded_send(env) .map_err(|err| err.into_inner()) } + + pub(crate) fn path(&self) -> &Arc { + &self.path + } } impl Supervised { @@ -1458,6 +1527,16 @@ impl Supervised { } } + pub(crate) fn elem(&self) -> &BastionPathElement { + match self { + // FIXME + Supervised::Supervisor(supervisor) => { + supervisor.bcast().path().elem().as_ref().unwrap() + } + Supervised::Children(children) => children.bcast().path().elem().as_ref().unwrap(), + } + } + fn callbacks(&self) -> &Callbacks { match self { Supervised::Supervisor(supervisor) => supervisor.callbacks(), diff --git a/bastion/src/system.rs b/bastion/src/system.rs index 15a7bbb9..3bb8424f 100644 --- a/bastion/src/system.rs +++ b/bastion/src/system.rs @@ -1,6 +1,9 @@ use crate::broadcast::{Broadcast, Parent, Sender}; -use crate::context::{BastionId, NIL_ID}; +use crate::children::ChildrenRef; +use crate::context::{BastionContext, BastionId, NIL_ID}; +use crate::envelope::Envelope; use crate::message::{BastionMessage, Deployment}; +use crate::path::{BastionPath, BastionPathElement}; use crate::supervisor::{Supervisor, SupervisorRef}; use bastion_executor::pool; use futures::prelude::*; @@ -10,11 +13,14 @@ use fxhash::{FxHashMap, FxHashSet}; use lazy_static::lazy_static; use lightproc::prelude::*; use qutex::Qutex; +use std::sync::Arc; use std::task::Poll; pub(crate) struct GlobalSystem { sender: Sender, supervisor: SupervisorRef, + dead_letters: ChildrenRef, + path: Arc, handle: Qutex>>, } @@ -29,18 +35,26 @@ struct System { // TODO: set limit restart: FxHashSet, waiting: FuturesUnordered>, - pre_start_msgs: Vec, + pre_start_msgs: Vec, started: bool, } impl GlobalSystem { - fn new(sender: Sender, supervisor: SupervisorRef, handle: RecoverableHandle<()>) -> Self { + fn new( + sender: Sender, + supervisor: SupervisorRef, + dead_letters: ChildrenRef, + handle: RecoverableHandle<()>, + ) -> Self { let handle = Some(handle); let handle = Qutex::new(handle); + let path = Arc::new(BastionPath::root()); GlobalSystem { sender, supervisor, + dead_letters, + path, handle, } } @@ -53,16 +67,24 @@ impl GlobalSystem { &self.supervisor } + pub(crate) fn dead_letters(&self) -> &ChildrenRef { + &self.dead_letters + } + pub(crate) fn handle(&self) -> Qutex>> { self.handle.clone() } + + pub(crate) fn path(&self) -> &Arc { + &self.path + } } impl System { fn init() -> GlobalSystem { info!("System: Initializing."); let parent = Parent::none(); - let bcast = Broadcast::with_id(parent, NIL_ID); + let bcast = Broadcast::new_root(parent); let launched = FxHashMap::default(); let restart = FxHashSet::default(); let waiting = FuturesUnordered::new(); @@ -82,19 +104,27 @@ impl System { debug!("System: Creating the system supervisor."); let parent = Parent::system(); - let bcast = Broadcast::with_id(parent, NIL_ID); + let bcast = Broadcast::new(parent, BastionPathElement::Supervisor(NIL_ID)); let supervisor = Supervisor::system(bcast); let supervisor_ref = supervisor.as_ref(); let msg = BastionMessage::deploy_supervisor(supervisor); - system.bcast.send_self(msg); + let env = Envelope::new( + msg, + system.bcast.path().clone(), + system.bcast.sender().clone(), + ); + system.bcast.send_self(env); debug!("System: Launching."); let stack = system.stack(); let handle = pool::spawn(system.run(), stack); - GlobalSystem::new(sender, supervisor_ref, handle) + let dead_letters_ref = + Self::spawn_dead_letters(&supervisor_ref).expect("Can't spawn dead letters"); + + GlobalSystem::new(sender, supervisor_ref, dead_letters_ref, handle) } fn stack(&self) -> ProcStack { @@ -102,6 +132,19 @@ impl System { ProcStack::default() } + fn spawn_dead_letters(root_sv: &SupervisorRef) -> Result { + root_sv.children_with_id(NIL_ID, |children| { + children.with_exec(|ctx: BastionContext| { + async move { + loop { + let smsg = ctx.recv().await?; + debug!("Received dead letter: {:?}", smsg); + } + } + }) + }) + } + // TODO: set a limit? async fn recover(&mut self, mut supervisor: Supervisor) { warn!("System: Recovering Supervisor({}).", supervisor.id()); @@ -111,7 +154,10 @@ impl System { let bcast = if supervisor.id() == &NIL_ID { None } else { - Some(Broadcast::new(parent)) + Some(Broadcast::new( + parent, + BastionPathElement::Supervisor(BastionId::new()), + )) }; supervisor.reset(bcast).await; @@ -175,10 +221,16 @@ impl System { } } - async fn handle(&mut self, msg: BastionMessage) -> Result<(), ()> { - match msg { - BastionMessage::Start => unreachable!(), - BastionMessage::Stop => { + async fn handle(&mut self, env: Envelope) -> Result<(), ()> { + match env { + Envelope { + msg: BastionMessage::Start, + .. + } => unreachable!(), + Envelope { + msg: BastionMessage::Stop, + .. + } => { info!("System: Stopping."); for supervisor in self.stop().await { supervisor.callbacks().after_stop(); @@ -186,13 +238,19 @@ impl System { return Err(()); } - BastionMessage::Kill => { + Envelope { + msg: BastionMessage::Kill, + .. + } => { info!("System: Killing."); self.kill().await; return Err(()); } - BastionMessage::Deploy(deployment) => match deployment { + Envelope { + msg: BastionMessage::Deploy(deployment), + .. + } => match deployment { Deployment::Supervisor(supervisor) => { debug!("System: Deploying Supervisor({}).", supervisor.id()); supervisor.callbacks().before_start(); @@ -200,7 +258,12 @@ impl System { self.bcast.register(supervisor.bcast()); if self.started { let msg = BastionMessage::start(); - self.bcast.send_child(supervisor.id(), msg); + let envelope = Envelope::new( + msg, + self.bcast.path().clone(), + self.bcast.sender().clone(), + ); + self.bcast.send_child(supervisor.id(), envelope); } info!("System: Launching Supervisor({}).", supervisor.id()); @@ -211,7 +274,10 @@ impl System { // FIXME Deployment::Children(_) => unimplemented!(), }, - BastionMessage::Prune { id } => { + Envelope { + msg: BastionMessage::Prune { id }, + .. + } => { // TODO: Err if None? if let Some(launched) = self.launched.remove(&id) { // TODO: stop or kill? @@ -221,12 +287,21 @@ impl System { } } // FIXME - BastionMessage::SuperviseWith(_) => unimplemented!(), - BastionMessage::Message(ref message) => { + Envelope { + msg: BastionMessage::SuperviseWith(_), + .. + } => unimplemented!(), + Envelope { + msg: BastionMessage::Message(ref message), + .. + } => { debug!("System: Broadcasting a message: {:?}", message); - self.bcast.send_children(msg); + self.bcast.send_children(env); } - BastionMessage::Stopped { id } => { + Envelope { + msg: BastionMessage::Stopped { id }, + .. + } => { // TODO: Err if None? if let Some(launched) = self.launched.remove(&id) { info!("System: Supervisor({}) stopped.", id); @@ -234,7 +309,10 @@ impl System { self.restart.remove(&id); } } - BastionMessage::Faulted { id } => { + Envelope { + msg: BastionMessage::Faulted { id }, + .. + } => { // TODO: Err if None? if let Some(launched) = self.launched.remove(&id) { warn!("System: Supervisor({}) faulted.", id); @@ -270,7 +348,10 @@ impl System { match poll!(&mut self.bcast.next()) { // TODO: Err if started == true? - Poll::Ready(Some(BastionMessage::Start)) => { + Poll::Ready(Some(Envelope { + msg: BastionMessage::Start, + .. + })) => { trace!( "System: Received a new message (started=false): {:?}", BastionMessage::Start @@ -279,7 +360,9 @@ impl System { self.started = true; let msg = BastionMessage::start(); - self.bcast.send_children(msg); + let env = + Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone()); + self.bcast.send_children(env); let msgs = self.pre_start_msgs.drain(..).collect::>(); self.pre_start_msgs.shrink_to_fit(); diff --git a/bastion/tests/message_signatures.rs b/bastion/tests/message_signatures.rs new file mode 100644 index 00000000..14127f98 --- /dev/null +++ b/bastion/tests/message_signatures.rs @@ -0,0 +1,79 @@ +extern crate bastion; + +use bastion::prelude::*; +use std::panic; + +fn init_start() { + Bastion::init(); + Bastion::start(); +} + +fn spawn_responders() -> ChildrenRef { + Bastion::children(|children: Children| { + children.with_exec(move |ctx: BastionContext| { + async move { + msg! { ctx.recv().await?, + msg: &'static str =!> { + match msg { + "Hello" => { + assert!(signature!().is_sender_identified(), false); + answer!(ctx, "Goodbye").unwrap(); + }, + _ => (), + } + }; + _: _ => (); + } + + msg! { ctx.recv().await?, + msg: &'static str => { + match msg { + "Hi again" => { + let sign = signature!(); + ctx.tell(&sign, "Farewell").unwrap(); + }, + _ => (), + } + }; + _: _ => (); + } + + Ok(()) + } + }) + }) + .expect("Couldn't create the children group.") +} + +#[test] +fn answer_and_tell_signatures() { + init_start(); + Bastion::spawn(|ctx: BastionContext| { + async move { + let responders = spawn_responders(); + let responder = &responders.elems()[0]; + let answer = ctx.ask(&responder.addr(), "Hello").unwrap(); + let (msg, sign) = answer.await?.extract(); + let msg: &str = msg.downcast().unwrap(); + assert_eq!(msg, "Goodbye"); + + let path = sign.path(); + let elem = path.elem().as_ref().expect("elem is not present"); + assert!(elem.is_child()); + ctx.tell(&sign, "Hi again").unwrap(); + + let (msg, _) = ctx.recv().await?.extract(); + let msg: &str = msg.downcast().unwrap(); + assert_eq!(msg, "Farewell"); + + Bastion::stop(); + + Ok(()) + } + }) + .unwrap(); + + Bastion::block_until_stopped(); +} + +// TODO: anonymous signatures Bastion::* methods diff --git a/bastion/tests/prop_children_message.rs b/bastion/tests/prop_children_message.rs index 09e087b1..b3f2cdcd 100644 --- a/bastion/tests/prop_children_message.rs +++ b/bastion/tests/prop_children_message.rs @@ -23,13 +23,12 @@ proptest! { async move { let message: &'static str = Box::leak(message.into_boxed_str()); let answer = ctx - .current() - .ask(message) + .ask(&ctx.current().addr(), message) .expect("Couldn't send the message."); msg! { ctx.recv().await?, msg: &'static str =!> { - let _ = answer!(msg); + let _ = answer!(ctx, msg); }; _: _ => (); }