Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

detect unconsumed/unsent messages, introduce generated dummy messages #15

Merged
merged 5 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion orchestra/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ name = "duo"
crate-type = ["bin"]

[[example]]
name = "solo"
name = "dig"
crate-type = ["bin"]

[features]
Expand Down
47 changes: 34 additions & 13 deletions orchestra/examples/solo.rs → orchestra/examples/dig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,57 @@ mod misc;
pub use self::misc::*;

#[orchestra(signal=SigSigSig, event=EvX, error=Yikes, gen=AllMessages)]
struct Solo<T> {
#[subsystem(consumes: Plinko, sends: [MsgStrukt])]
struct Dig {
#[subsystem(consumes: Plinko)]
goblin_tower: GoblinTower,

#[subsystem(sends: [Plinko])]
goldmine: Goldmine,
}

use self::messages::*;

#[derive(Default)]
pub struct Fortified;

#[orchestra::subsystem(GoblinTower, error=Yikes)]
impl<Context> Fortified {
fn start(self, mut ctx: Context) -> SpawnedSubsystem<Yikes> {
let mut sender = ctx.sender().clone();
ctx.spawn(
"GoblinTower",
Box::pin(async move {
sender.send_message(MsgStrukt(8u8)).await;
SpawnedSubsystem {
name: "GoblinTower",
future: Box::pin(async move {
while let Ok(FromOrchestra::Communication { msg: _ }) = ctx.recv().await {
println!("Look a plinko!")
}
Ok(())
}),
)
.unwrap();
unimplemented!("welcum")
}
}
}

#[derive(Default)]
pub struct DragonsLair;

#[orchestra::subsystem(Goldmine, error=Yikes)]
impl<Context> DragonsLair {
fn start(self, mut ctx: Context) -> SpawnedSubsystem<Yikes> {
let mut sender = ctx.sender().clone();
let future = Box::pin(async move {
sender.send_message(Plinko).await;
Ok(())
});

SpawnedSubsystem { name: "RedThorntail", future }
}
}

async fn setup() {
let builder = Solo::builder();
let builder = Dig::builder();

let builder = builder.goblin_tower(Fortified::default());

let builder = builder.goldmine(DragonsLair::default());
let builder = builder.spawner(DummySpawner);
let (orchestra, _handle): (Solo<_>, _) = builder.build().unwrap();
let (orchestra, _handle) = builder.build().unwrap();

let orchestra_fut = orchestra
.running_subsystems
Expand Down
4 changes: 2 additions & 2 deletions orchestra/proc-macro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ targets = ["x86_64-unknown-linux-gnu"]
proc-macro = true

[dependencies]
syn = { version = "1.0.95", features = ["full", "extra-traits"] }
syn = { version = "1.0.105", features = ["full", "extra-traits"] }
quote = "1.0.20"
proc-macro2 = "1.0.43"
proc-macro2 = { version = "1.0.47", features = ["span-locations"] }
proc-macro-crate = "1.1.3"
expander = { version = "0.0.6", default-features = false }
petgraph = "0.6.0"
Expand Down
10 changes: 6 additions & 4 deletions orchestra/proc-macro/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@ impl<'a> ConnectionGraph<'a> {
for outgoing in ssf.messages_to_send.iter() {
outgoing_lut.entry(outgoing).or_default().push((&ssf.generic, node_index));
}
if let Some(_first_consument) =
consuming_lut.insert(&ssf.message_to_consume, (&ssf.generic, node_index))
{
// bail, two subsystems consuming the same message
if let Some(ref consumes) = ssf.message_to_consume {
if let Some(_first_consument) =
consuming_lut.insert(consumes, (&ssf.generic, node_index))
{
// bail, two subsystems consuming the same message
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions orchestra/proc-macro/src/impl_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream {
let subsystem_generics = &info.subsystem_generic_types();

let consumes = &info.consumes_without_wip();
let channel_name = &info.channel_names_without_wip("");
let channel_name = &info.channel_names_without_wip(None);
let channel_name_unbounded = &info.channel_names_without_wip("_unbounded");

let channel_name_tx = &info.channel_names_without_wip("_tx");
Expand Down Expand Up @@ -107,7 +107,7 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream {
info.subsystems().iter().filter(|ssf| !ssf.wip).enumerate().map(|(idx, ssf)| {
let field_name = &ssf.name;
let field_type = &ssf.generic;
let subsystem_consumes = &ssf.message_to_consume;
let subsystem_consumes = &ssf.message_to_consume();
// Remove state generic for the item to be replaced. It sufficient to know `field_type` for
// that since we always move from `Init<#field_type>` to `Init<NEW>`.
let impl_subsystem_state_generics = recollect_without_idx(&subsystem_passthrough_state_generics[..], idx);
Expand Down Expand Up @@ -314,7 +314,7 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream {
.iter()
.map(|ssf| {
let field_type = &ssf.generic;
let consumes = &ssf.message_to_consume;
let consumes = &ssf.message_to_consume();
let subsystem_sender_trait = format_ident!("{}SenderTrait", ssf.generic);
let subsystem_ctx_trait = format_ident!("{}ContextTrait", ssf.generic);
quote! {
Expand Down
2 changes: 1 addition & 1 deletion orchestra/proc-macro/src/impl_channels_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use super::*;
pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result<proc_macro2::TokenStream> {
let message_wrapper = info.message_wrapper.clone();

let channel_name = &info.channel_names_without_wip("");
let channel_name = &info.channel_names_without_wip(None);
let channel_name_unbounded = &info.channel_names_without_wip("_unbounded");

let consumes = &info.consumes_without_wip();
Expand Down
52 changes: 51 additions & 1 deletion orchestra/proc-macro/src/impl_message_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use itertools::Itertools;
use quote::quote;
use syn::{spanned::Spanned, Result};

Expand Down Expand Up @@ -50,7 +53,7 @@ pub(crate) fn impl_message_wrapper_enum(info: &OrchestraInfo) -> Result<proc_mac
(TokenStream::new(), TokenStream::new())
};

let ts = quote! {
let mut ts = quote! {
/// Generated message type wrapper over all possible messages
/// used by any subsystem.
#[allow(missing_docs)]
Expand Down Expand Up @@ -81,5 +84,52 @@ pub(crate) fn impl_message_wrapper_enum(info: &OrchestraInfo) -> Result<proc_mac
#outgoing_from_impl
};

// TODO it's not perfect, if the same type is used with different paths
// the detection will fail
let outgoing = HashSet::<&Path>::from_iter(
info.subsystems().iter().map(|ssf| ssf.messages_to_send.iter()).flatten(),
);
let incoming = HashSet::<&Path>::from_iter(
info.subsystems().iter().filter_map(|ssf| ssf.message_to_consume.as_ref()),
);

// Try to maintain the ordering according to the span start in the declaration.
fn cmp<'p, 'q>(a: &'p &&Path, b: &'q &&Path) -> std::cmp::Ordering {
a.span()
.start()
.partial_cmp(&b.span().start())
.unwrap_or(std::cmp::Ordering::Equal)
}

// sent but not received
for sbnr in outgoing.difference(&incoming).sorted_by(cmp) {
ts.extend(
syn::Error::new(
sbnr.span(),
format!(
"Message `{}` is sent but never received",
sbnr.get_ident()
.expect("Message is a path that must end in an identifier. qed")
),
)
.to_compile_error(),
);
}

// received but not sent
for rbns in incoming.difference(&outgoing).sorted_by(cmp) {
ts.extend(
syn::Error::new(
rbns.span(),
format!(
"Message `{}` is received but never sent",
rbns.get_ident()
.expect("Message is a path that must end in an identifier. qed")
),
)
.to_compile_error(),
);
}

Ok(ts)
}
21 changes: 19 additions & 2 deletions orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ pub(crate) fn impl_subsystem_types_all(info: &OrchestraInfo) -> Result<TokenStre
for ssf in info.subsystems() {
let subsystem_name = ssf.generic.to_string();
let outgoing_wrapper = &Ident::new(&(subsystem_name.clone() + "OutgoingMessages"), span);
let message_to_consume = ssf.message_to_consume();

let subsystem_ctx_trait = &Ident::new(&(subsystem_name.clone() + "ContextTrait"), span);
let subsystem_sender_trait = &Ident::new(&(subsystem_name.clone() + "SenderTrait"), span);
Expand All @@ -124,16 +125,32 @@ pub(crate) fn impl_subsystem_types_all(info: &OrchestraInfo) -> Result<TokenStre
subsystem_ctx_trait,
subsystem_sender_name,
subsystem_sender_trait,
&ssf.message_to_consume,
&message_to_consume,
&ssf.messages_to_send,
outgoing_wrapper,
));

ts.extend(impl_associate_outgoing_messages(&ssf.message_to_consume, &outgoing_wrapper));
ts.extend(impl_associate_outgoing_messages(&message_to_consume, &outgoing_wrapper));

ts.extend(impl_wrapper_enum(&outgoing_wrapper, ssf.messages_to_send.as_slice())?);
}

// generate the empty dummy messages, where needed
ts.extend({
let mut messages = TokenStream::new();
for ssf in info.subsystems() {
messages.extend(ssf.gen_dummy_message_ty());
}
quote! {
#[doc = r###"Generated dummy messages, only!

Meant to be used in conjunection with your own defined messages"###]
drahnr marked this conversation as resolved.
Show resolved Hide resolved
pub mod messages {
#messages
}
}
});

// impl the emtpy tuple handling for tests
let empty_tuple: Type = parse_quote! { () };
ts.extend(impl_subsystem_context_trait_for(
Expand Down
60 changes: 46 additions & 14 deletions orchestra/proc-macro/src/parse/parse_orchestra_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub(crate) struct SubSysField {
/// part.
pub(crate) generic: Ident,
/// Type of message to be consumed by the subsystem.
pub(crate) message_to_consume: Path,
pub(crate) message_to_consume: Option<Path>,
/// Types of messages to be sent by the subsystem.
pub(crate) messages_to_send: Vec<Path>,
/// If the subsystem implementation is blocking execution and hence
Expand All @@ -107,6 +107,41 @@ pub(crate) struct SubSysField {
pub(crate) wip: bool,
}

impl SubSysField {
pub(crate) fn dummy_msg_name(&self) -> Ident {
Ident::new(format!("{}Message", self.generic).as_str(), self.name.span())
}

/// Returns either the specified to be consumed messsage
/// or the generated dummy message.
pub(crate) fn message_to_consume(&self) -> Path {
if let Some(ref consumes) = self.message_to_consume {
consumes.clone()
} else {
Path::from(self.dummy_msg_name())
}
}

/// Generate the dummy message type if the subsystem does not consume one
///
/// Note: Only required to the internal structure anchoring everything to
/// the consuming message type. See #11 for a full solution.
pub(crate) fn gen_dummy_message_ty(&self) -> TokenStream {
if self.message_to_consume.is_none() {
let dummy_msg_ident = self.dummy_msg_name();
quote! {
#[doc =
r###"A dummy implementation to satisfy the current internal structure
and cannot be constructed delibarately, since it's not meant to be sent or used at all"###]
#[derive(Debug, Clone, Copy)]
pub enum #dummy_msg_ident {}
}
} else {
TokenStream::new()
}
}
}

// Converts a type enum to a path if this type is a TypePath
fn try_type_to_path(ty: &Type, span: Span) -> Result<Path> {
match ty {
Expand Down Expand Up @@ -425,13 +460,14 @@ impl OrchestraInfo {
}

pub(crate) fn any_message(&self) -> Vec<Path> {
self.subsystems
.iter()
.map(|ssf| ssf.message_to_consume.clone())
.collect::<Vec<_>>()
self.subsystems.iter().map(|ssf| ssf.message_to_consume()).collect::<Vec<_>>()
}

pub(crate) fn channel_names_without_wip(&self, suffix: &'static str) -> Vec<Ident> {
pub(crate) fn channel_names_without_wip(
&self,
suffix: impl Into<Option<&'static str>>,
) -> Vec<Ident> {
let suffix = suffix.into().unwrap_or("");
self.subsystems
.iter()
.filter(|ssf| !ssf.wip)
Expand All @@ -443,7 +479,7 @@ impl OrchestraInfo {
self.subsystems
.iter()
.filter(|ssf| !ssf.wip)
.map(|ssf| ssf.message_to_consume.clone())
.map(|ssf| ssf.message_to_consume())
.collect::<Vec<_>>()
}
}
Expand Down Expand Up @@ -487,8 +523,8 @@ impl OrchestraGuts {
)
})?;

// a `#[subsystem(..)]` annotation exists
if let Some((attr_tokens, span)) = subsystem_attr.next() {
// a `#[subsystem(..)]` annotation exists
if let Some((_attr_tokens2, span2)) = subsystem_attr.next() {
return Err({
let mut err = Error::new(span, "The first subsystem annotation is at");
Expand Down Expand Up @@ -528,12 +564,7 @@ impl OrchestraGuts {
} else {
vec![]
};
// messages deemed for consumption
let consumes = if let Some(consumes) = consumes {
consumes.consumes
} else {
return Err(Error::new(span, "Must provide exactly one consuming message type"))
};
let consumes = consumes.map(|consumes| consumes.consumes);

subsystems.push(SubSysField {
name: ident,
Expand All @@ -544,6 +575,7 @@ impl OrchestraGuts {
blocking,
});
} else {
// collect the "baggage"
let flattened = flatten_type(&ty, ident.span())?;
let generic_types = flattened
.iter()
Expand Down