Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message Signature to identify message sender and sending messages back #130

Merged
merged 1 commit into from
Dec 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bastion/examples/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ fn sp_sp_ch(children: Children, children_ref: ChildrenRef) -> Children {
// This will send a message to "sp.ch", making it fault and making
// "sp" restart "sp.ch" and "sp.sp" (because its supervision strategy
// is "one-for-one")...
children_ref.elems()[0].tell(()).ok();
children_ref.elems()[0].tell_anonymously(()).ok();
});

children
Expand Down
6 changes: 3 additions & 3 deletions bastion/examples/getting_started.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,16 @@ fn main() {

// ...to then "tell" it messages...
child
.tell("A message containing data.")
.tell_anonymously("A message containing data.")
.expect("Couldn't send the message.");

// ...or "ask" it messages...
let answer: Answer = child
.ask("A message containing data.")
.ask_anonymously("A message containing data.")
.expect("Couldn't send the message.");
let _ = async {
// ...until the child eventually answers back...
let _answer: Result<Msg, ()> = answer.await;
let _answer: Result<SignedMessage, ()> = answer.await;
};

// ...and then even stop or kill it...
Expand Down
8 changes: 4 additions & 4 deletions bastion/examples/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn main() {
let response = escape(&compressed);
// println!("Response: {}", response);
stream.write(response.as_bytes()).unwrap();
answer!(stream);
answer!(ctx, stream);
};
_: _ => ();
}
Expand All @@ -59,7 +59,7 @@ fn main() {
//
// Server entrypoint
Bastion::children(|children: Children| {
children.with_exec(move |_ctx: BastionContext| {
children.with_exec(move |ctx: BastionContext| {
let workers = workers.clone();
async move {
println!("Server is starting!");
Expand All @@ -73,8 +73,8 @@ fn main() {
round_robin %= workers.elems().len();

// Distribute tcp streams
let _ = workers.elems()[round_robin]
.ask(stream.unwrap())
let _ = ctx
.ask(&workers.elems()[round_robin].addr(), stream.unwrap())
.unwrap()
.await;
}
Expand Down
6 changes: 3 additions & 3 deletions bastion/examples/parallel_computation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn main() {
msg: u64 =!> {
let data: u64 = msg.wrapping_mul(2);
println!("Child doubled the value of {} and gave {}", msg, data); // true
let _ = answer!(data);
let _ = answer!(ctx, data);
};
_: _ => ();
}
Expand All @@ -46,7 +46,7 @@ fn main() {
//
// Mapper that generates work.
Bastion::children(|children: Children| {
children.with_exec(move |_ctx: BastionContext| {
children.with_exec(move |ctx: BastionContext| {
let workers = workers.clone();
async move {
println!("Mapper started!");
Expand All @@ -58,7 +58,7 @@ fn main() {
for id_worker_pair in workers.elems().iter().enumerate() {
let data = cycle(id_worker_pair.0 as u64, 5);

let computed: Answer = id_worker_pair.1.ask(data).unwrap();
let computed: Answer = ctx.ask(&id_worker_pair.1.addr(), data).unwrap();
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
msg! { computed.await?,
msg: u64 => {
// Handle the answer...
Expand Down
5 changes: 2 additions & 3 deletions bastion/examples/send_recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@ fn main() {
println!("try_recv.is_some() == {}", try_recv.is_some()); // false

let answer = ctx
.current()
.ask("Hello World!")
.ask(&ctx.current().addr(), "Hello World!")
.expect("Couldn't send the message.");

msg! { ctx.recv().await?,
msg: &'static str =!> {
println!(r#"msg == "Hello World!" => {}"#, msg == "Hello World!"); // true
let _ = answer!("Goodbye!");
let _ = answer!(ctx, "Goodbye!");
};
// This won't happen because this example
// only "asks" a `&'static str`...
Expand Down
39 changes: 23 additions & 16 deletions bastion/src/bastion.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::broadcast::{Broadcast, Parent};
use crate::children::{Children, ChildrenRef};
use crate::config::Config;
use crate::context::BastionContext;
use crate::context::{BastionContext, BastionId};
use crate::envelope::Envelope;
use crate::message::{BastionMessage, Message};
use crate::path::BastionPathElement;
use crate::supervisor::{Supervisor, SupervisorRef};
use crate::system::SYSTEM;
use core::future::Future;
Expand Down Expand Up @@ -115,13 +117,13 @@ use std::thread;
/// let child = &elems[0];
///
/// // ...to then "tell" it messages...
/// child.tell("A message containing data.").expect("Couldn't send the message.");
/// child.tell_anonymously("A message containing data.").expect("Couldn't send the message.");
///
/// // ...or "ask" it messages...
/// let answer: Answer = child.ask("A message containing data.").expect("Couldn't send the message.");
/// let answer: Answer = child.ask_anonymously("A message containing data.").expect("Couldn't send the message.");
/// # async {
/// // ...until the child eventually answers back...
/// let answer: Result<Msg, ()> = answer.await;
/// let answer: Result<SignedMessage, ()> = answer.await;
/// # };
///
/// // ...and then even stop or kill it...
Expand Down Expand Up @@ -258,7 +260,7 @@ impl Bastion {
{
debug!("Bastion: Creating supervisor.");
let parent = Parent::system();
let bcast = Broadcast::new(parent);
let bcast = Broadcast::new(parent, BastionPathElement::Supervisor(BastionId::new()));

debug!("Bastion: Initializing Supervisor({}).", bcast.id());
let supervisor = Supervisor::new(bcast);
Expand All @@ -268,8 +270,9 @@ impl Bastion {

debug!("Bastion: Deploying Supervisor({}).", supervisor.id());
let msg = BastionMessage::deploy_supervisor(supervisor);
trace!("Bastion: Sending message: {:?}", msg);
SYSTEM.sender().unbounded_send(msg).map_err(|_| ())?;
let envelope = Envelope::new(msg, SYSTEM.path().clone(), SYSTEM.sender().clone());
trace!("Bastion: Sending envelope: {:?}", envelope);
SYSTEM.sender().unbounded_send(envelope).map_err(|_| ())?;

Ok(supervisor_ref)
}
Expand Down Expand Up @@ -303,7 +306,7 @@ impl Bastion {
/// children.with_exec(|ctx: BastionContext| {
/// async move {
/// // Send and receive messages...
/// let opt_msg: Option<Msg> = ctx.try_recv().await;
/// let opt_msg: Option<SignedMessage> = ctx.try_recv().await;
/// // ...and return `Ok(())` or `Err(())` when you are done...
/// Ok(())
///
Expand Down Expand Up @@ -423,11 +426,12 @@ impl Bastion {
pub fn broadcast<M: Message>(msg: M) -> Result<(), M> {
debug!("Bastion: Broadcasting message: {:?}", msg);
let msg = BastionMessage::broadcast(msg);
trace!("Bastion: Sending message: {:?}", msg);
let envelope = Envelope::from_dead_letters(msg);
trace!("Bastion: Sending envelope: {:?}", envelope);
// FIXME: panics?
SYSTEM
.sender()
.unbounded_send(msg)
.unbounded_send(envelope)
.map_err(|err| err.into_inner().into_msg().unwrap())
}

Expand Down Expand Up @@ -456,9 +460,10 @@ impl Bastion {
pub fn start() {
debug!("Bastion: Starting.");
let msg = BastionMessage::start();
trace!("Bastion: Sending message: {:?}", msg);
let envelope = Envelope::from_dead_letters(msg);
trace!("Bastion: Sending envelope: {:?}", envelope);
// FIXME: Err(Error)
SYSTEM.sender().unbounded_send(msg).ok();
SYSTEM.sender().unbounded_send(envelope).ok();
}

/// Sends a message to the system to tell it to stop
Expand Down Expand Up @@ -486,9 +491,10 @@ impl Bastion {
pub fn stop() {
debug!("Bastion: Stopping.");
let msg = BastionMessage::stop();
trace!("Bastion: Sending message: {:?}", msg);
let envelope = Envelope::from_dead_letters(msg);
trace!("Bastion: Sending envelope: {:?}", envelope);
// FIXME: Err(Error)
SYSTEM.sender().unbounded_send(msg).ok();
SYSTEM.sender().unbounded_send(envelope).ok();
}

/// Sends a message to the system to tell it to kill every
Expand All @@ -515,9 +521,10 @@ impl Bastion {
pub fn kill() {
debug!("Bastion: Killing.");
let msg = BastionMessage::kill();
trace!("Bastion: Sending message: {:?}", msg);
let envelope = Envelope::from_dead_letters(msg);
trace!("Bastion: Sending envelope: {:?}", envelope);
// FIXME: Err(Error)
SYSTEM.sender().unbounded_send(msg).ok();
SYSTEM.sender().unbounded_send(envelope).ok();

// FIXME: panics
let mut system = SYSTEM.handle().lock().wait().unwrap();
Expand Down
Loading