Skip to content

Commit

Permalink
API for indentifying message senders and send messages back
Browse files Browse the repository at this point in the history
  • Loading branch information
onsails committed Dec 15, 2019
1 parent 59b3147 commit d6c3d28
Show file tree
Hide file tree
Showing 16 changed files with 1,230 additions and 209 deletions.
2 changes: 1 addition & 1 deletion bastion/examples/getting_started.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ fn main() {
.expect("Couldn't send the message.");
let _ = async {
// ...until the child eventually answers back...
let _answer: Result<Msg, ()> = answer.await;
let _answer: Result<SignedMessage, ()> = answer.await;
};

// ...and then even stop or kill it...
Expand Down
2 changes: 1 addition & 1 deletion bastion/examples/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn main() {
let response = escape(&compressed);
// println!("Response: {}", response);
stream.write(response.as_bytes()).unwrap();
answer!(stream);
answer!(ctx, stream);
};
_: _ => ();
}
Expand Down
2 changes: 1 addition & 1 deletion bastion/examples/parallel_computation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn main() {
msg: u64 =!> {
let data: u64 = msg.wrapping_mul(2);
println!("Child doubled the value of {} and gave {}", msg, data); // true
let _ = answer!(data);
let _ = answer!(ctx, data);
};
_: _ => ();
}
Expand Down
2 changes: 1 addition & 1 deletion bastion/examples/send_recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn main() {
msg! { ctx.recv().await?,
msg: &'static str =!> {
println!(r#"msg == "Hello World!" => {}"#, msg == "Hello World!"); // true
let _ = answer!("Goodbye!");
let _ = answer!(ctx, "Goodbye!");
};
// This won't happen because this example
// only "asks" a `&'static str`...
Expand Down
35 changes: 21 additions & 14 deletions bastion/src/bastion.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::broadcast::{Broadcast, Parent};
use crate::children::{Children, ChildrenRef};
use crate::config::Config;
use crate::context::BastionContext;
use crate::context::{BastionContext, BastionId};
use crate::envelope::Envelope;
use crate::message::{BastionMessage, Message};
use crate::path::BastionPathElement;
use crate::supervisor::{Supervisor, SupervisorRef};
use crate::system::SYSTEM;
use core::future::Future;
Expand Down Expand Up @@ -121,7 +123,7 @@ use std::thread;
/// let answer: Answer = child.ask("A message containing data.").expect("Couldn't send the message.");
/// # async {
/// // ...until the child eventually answers back...
/// let answer: Result<Msg, ()> = answer.await;
/// let answer: Result<SignedMessage, ()> = answer.await;
/// # };
///
/// // ...and then even stop or kill it...
Expand Down Expand Up @@ -258,7 +260,7 @@ impl Bastion {
{
debug!("Bastion: Creating supervisor.");
let parent = Parent::system();
let bcast = Broadcast::new(parent);
let bcast = Broadcast::new(parent, BastionPathElement::Supervisor(BastionId::new()));

debug!("Bastion: Initializing Supervisor({}).", bcast.id());
let supervisor = Supervisor::new(bcast);
Expand All @@ -268,8 +270,9 @@ impl Bastion {

debug!("Bastion: Deploying Supervisor({}).", supervisor.id());
let msg = BastionMessage::deploy_supervisor(supervisor);
trace!("Bastion: Sending message: {:?}", msg);
SYSTEM.sender().unbounded_send(msg).map_err(|_| ())?;
let envelope = Envelope::identified(msg, SYSTEM.path().clone(), SYSTEM.sender().clone());
trace!("Bastion: Sending envelope: {:?}", envelope);
SYSTEM.sender().unbounded_send(envelope).map_err(|_| ())?;

Ok(supervisor_ref)
}
Expand Down Expand Up @@ -303,7 +306,7 @@ impl Bastion {
/// children.with_exec(|ctx: BastionContext| {
/// async move {
/// // Send and receive messages...
/// let opt_msg: Option<Msg> = ctx.try_recv().await;
/// let opt_msg: Option<SignedMessage> = ctx.try_recv().await;
/// // ...and return `Ok(())` or `Err(())` when you are done...
/// Ok(())
///
Expand Down Expand Up @@ -423,11 +426,12 @@ impl Bastion {
pub fn broadcast<M: Message>(msg: M) -> Result<(), M> {
debug!("Bastion: Broadcasting message: {:?}", msg);
let msg = BastionMessage::broadcast(msg);
trace!("Bastion: Sending message: {:?}", msg);
let envelope = Envelope::anonymous(msg);
trace!("Bastion: Sending envelope: {:?}", envelope);
// FIXME: panics?
SYSTEM
.sender()
.unbounded_send(msg)
.unbounded_send(envelope)
.map_err(|err| err.into_inner().into_msg().unwrap())
}

Expand Down Expand Up @@ -456,9 +460,10 @@ impl Bastion {
pub fn start() {
debug!("Bastion: Starting.");
let msg = BastionMessage::start();
trace!("Bastion: Sending message: {:?}", msg);
let envelope = Envelope::anonymous(msg);
trace!("Bastion: Sending envelope: {:?}", envelope);
// FIXME: Err(Error)
SYSTEM.sender().unbounded_send(msg).ok();
SYSTEM.sender().unbounded_send(envelope).ok();
}

/// Sends a message to the system to tell it to stop
Expand Down Expand Up @@ -486,9 +491,10 @@ impl Bastion {
pub fn stop() {
debug!("Bastion: Stopping.");
let msg = BastionMessage::stop();
trace!("Bastion: Sending message: {:?}", msg);
let envelope = Envelope::anonymous(msg);
trace!("Bastion: Sending envelope: {:?}", envelope);
// FIXME: Err(Error)
SYSTEM.sender().unbounded_send(msg).ok();
SYSTEM.sender().unbounded_send(envelope).ok();
}

/// Sends a message to the system to tell it to kill every
Expand All @@ -515,9 +521,10 @@ impl Bastion {
pub fn kill() {
debug!("Bastion: Killing.");
let msg = BastionMessage::kill();
trace!("Bastion: Sending message: {:?}", msg);
let envelope = Envelope::anonymous(msg);
trace!("Bastion: Sending envelope: {:?}", envelope);
// FIXME: Err(Error)
SYSTEM.sender().unbounded_send(msg).ok();
SYSTEM.sender().unbounded_send(envelope).ok();

// FIXME: panics
let mut system = SYSTEM.handle().lock().wait().unwrap();
Expand Down
Loading

0 comments on commit d6c3d28

Please sign in to comment.