Skip to content

Commit

Permalink
new messaging API which allows identifying message senders and send m…
Browse files Browse the repository at this point in the history
…essages back
  • Loading branch information
onsails committed Dec 16, 2019
1 parent 0416043 commit 6f1e16c
Show file tree
Hide file tree
Showing 17 changed files with 1,653 additions and 237 deletions.
2 changes: 1 addition & 1 deletion bastion/examples/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ fn sp_sp_ch(children: Children, children_ref: ChildrenRef) -> Children {
// This will send a message to "sp.ch", making it fault and making
// "sp" restart "sp.ch" and "sp.sp" (because its supervision strategy
// is "one-for-one")...
children_ref.elems()[0].tell(()).ok();
children_ref.elems()[0].tell_anonymously(()).ok();
});

children
Expand Down
6 changes: 3 additions & 3 deletions bastion/examples/getting_started.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,16 @@ fn main() {

// ...to then "tell" it messages...
child
.tell("A message containing data.")
.tell_anonymously("A message containing data.")
.expect("Couldn't send the message.");

// ...or "ask" it messages...
let answer: Answer = child
.ask("A message containing data.")
.ask_anonymously("A message containing data.")
.expect("Couldn't send the message.");
let _ = async {
// ...until the child eventually answers back...
let _answer: Result<Msg, ()> = answer.await;
let _answer: Result<SignedMessage, ()> = answer.await;
};

// ...and then even stop or kill it...
Expand Down
8 changes: 4 additions & 4 deletions bastion/examples/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn main() {
let response = escape(&compressed);
// println!("Response: {}", response);
stream.write(response.as_bytes()).unwrap();
answer!(stream);
answer!(ctx, stream);
};
_: _ => ();
}
Expand All @@ -59,7 +59,7 @@ fn main() {
//
// Server entrypoint
Bastion::children(|children: Children| {
children.with_exec(move |_ctx: BastionContext| {
children.with_exec(move |ctx: BastionContext| {
let workers = workers.clone();
async move {
println!("Server is starting!");
Expand All @@ -73,8 +73,8 @@ fn main() {
round_robin %= workers.elems().len();

// Distribute tcp streams
let _ = workers.elems()[round_robin]
.ask(stream.unwrap())
let _ = ctx
.ask(&workers.elems()[round_robin].addr(), stream.unwrap())
.unwrap()
.await;
}
Expand Down
6 changes: 3 additions & 3 deletions bastion/examples/parallel_computation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn main() {
msg: u64 =!> {
let data: u64 = msg.wrapping_mul(2);
println!("Child doubled the value of {} and gave {}", msg, data); // true
let _ = answer!(data);
let _ = answer!(ctx, data);
};
_: _ => ();
}
Expand All @@ -46,7 +46,7 @@ fn main() {
//
// Mapper that generates work.
Bastion::children(|children: Children| {
children.with_exec(move |_ctx: BastionContext| {
children.with_exec(move |ctx: BastionContext| {
let workers = workers.clone();
async move {
println!("Mapper started!");
Expand All @@ -58,7 +58,7 @@ fn main() {
for id_worker_pair in workers.elems().iter().enumerate() {
let data = cycle(id_worker_pair.0 as u64, 5);

let computed: Answer = id_worker_pair.1.ask(data).unwrap();
let computed: Answer = ctx.ask(&id_worker_pair.1.addr(), data).unwrap();
msg! { computed.await?,
msg: u64 => {
// Handle the answer...
Expand Down
5 changes: 2 additions & 3 deletions bastion/examples/send_recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@ fn main() {
println!("try_recv.is_some() == {}", try_recv.is_some()); // false

let answer = ctx
.current()
.ask("Hello World!")
.ask(&ctx.current().addr(), "Hello World!")
.expect("Couldn't send the message.");

msg! { ctx.recv().await?,
msg: &'static str =!> {
println!(r#"msg == "Hello World!" => {}"#, msg == "Hello World!"); // true
let _ = answer!("Goodbye!");
let _ = answer!(ctx, "Goodbye!");
};
// This won't happen because this example
// only "asks" a `&'static str`...
Expand Down
39 changes: 23 additions & 16 deletions bastion/src/bastion.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::broadcast::{Broadcast, Parent};
use crate::children::{Children, ChildrenRef};
use crate::config::Config;
use crate::context::BastionContext;
use crate::context::{BastionContext, BastionId};
use crate::envelope::Envelope;
use crate::message::{BastionMessage, Message};
use crate::path::BastionPathElement;
use crate::supervisor::{Supervisor, SupervisorRef};
use crate::system::SYSTEM;
use core::future::Future;
Expand Down Expand Up @@ -115,13 +117,13 @@ use std::thread;
/// let child = &elems[0];
///
/// // ...to then "tell" it messages...
/// child.tell("A message containing data.").expect("Couldn't send the message.");
/// child.tell_anonymously("A message containing data.").expect("Couldn't send the message.");
///
/// // ...or "ask" it messages...
/// let answer: Answer = child.ask("A message containing data.").expect("Couldn't send the message.");
/// let answer: Answer = child.ask_anonymously("A message containing data.").expect("Couldn't send the message.");
/// # async {
/// // ...until the child eventually answers back...
/// let answer: Result<Msg, ()> = answer.await;
/// let answer: Result<SignedMessage, ()> = answer.await;
/// # };
///
/// // ...and then even stop or kill it...
Expand Down Expand Up @@ -258,7 +260,7 @@ impl Bastion {
{
debug!("Bastion: Creating supervisor.");
let parent = Parent::system();
let bcast = Broadcast::new(parent);
let bcast = Broadcast::new(parent, BastionPathElement::Supervisor(BastionId::new()));

debug!("Bastion: Initializing Supervisor({}).", bcast.id());
let supervisor = Supervisor::new(bcast);
Expand All @@ -268,8 +270,9 @@ impl Bastion {

debug!("Bastion: Deploying Supervisor({}).", supervisor.id());
let msg = BastionMessage::deploy_supervisor(supervisor);
trace!("Bastion: Sending message: {:?}", msg);
SYSTEM.sender().unbounded_send(msg).map_err(|_| ())?;
let envelope = Envelope::new(msg, SYSTEM.path().clone(), SYSTEM.sender().clone());
trace!("Bastion: Sending envelope: {:?}", envelope);
SYSTEM.sender().unbounded_send(envelope).map_err(|_| ())?;

Ok(supervisor_ref)
}
Expand Down Expand Up @@ -303,7 +306,7 @@ impl Bastion {
/// children.with_exec(|ctx: BastionContext| {
/// async move {
/// // Send and receive messages...
/// let opt_msg: Option<Msg> = ctx.try_recv().await;
/// let opt_msg: Option<SignedMessage> = ctx.try_recv().await;
/// // ...and return `Ok(())` or `Err(())` when you are done...
/// Ok(())
///
Expand Down Expand Up @@ -423,11 +426,12 @@ impl Bastion {
pub fn broadcast<M: Message>(msg: M) -> Result<(), M> {
debug!("Bastion: Broadcasting message: {:?}", msg);
let msg = BastionMessage::broadcast(msg);
trace!("Bastion: Sending message: {:?}", msg);
let envelope = Envelope::from_dead_letters(msg);
trace!("Bastion: Sending envelope: {:?}", envelope);
// FIXME: panics?
SYSTEM
.sender()
.unbounded_send(msg)
.unbounded_send(envelope)
.map_err(|err| err.into_inner().into_msg().unwrap())
}

Expand Down Expand Up @@ -456,9 +460,10 @@ impl Bastion {
pub fn start() {
debug!("Bastion: Starting.");
let msg = BastionMessage::start();
trace!("Bastion: Sending message: {:?}", msg);
let envelope = Envelope::from_dead_letters(msg);
trace!("Bastion: Sending envelope: {:?}", envelope);
// FIXME: Err(Error)
SYSTEM.sender().unbounded_send(msg).ok();
SYSTEM.sender().unbounded_send(envelope).ok();
}

/// Sends a message to the system to tell it to stop
Expand Down Expand Up @@ -486,9 +491,10 @@ impl Bastion {
pub fn stop() {
debug!("Bastion: Stopping.");
let msg = BastionMessage::stop();
trace!("Bastion: Sending message: {:?}", msg);
let envelope = Envelope::from_dead_letters(msg);
trace!("Bastion: Sending envelope: {:?}", envelope);
// FIXME: Err(Error)
SYSTEM.sender().unbounded_send(msg).ok();
SYSTEM.sender().unbounded_send(envelope).ok();
}

/// Sends a message to the system to tell it to kill every
Expand All @@ -515,9 +521,10 @@ impl Bastion {
pub fn kill() {
debug!("Bastion: Killing.");
let msg = BastionMessage::kill();
trace!("Bastion: Sending message: {:?}", msg);
let envelope = Envelope::from_dead_letters(msg);
trace!("Bastion: Sending envelope: {:?}", envelope);
// FIXME: Err(Error)
SYSTEM.sender().unbounded_send(msg).ok();
SYSTEM.sender().unbounded_send(envelope).ok();

// FIXME: panics
let mut system = SYSTEM.handle().lock().wait().unwrap();
Expand Down
Loading

0 comments on commit 6f1e16c

Please sign in to comment.