Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds debug arg to the CLI #71

Merged
merged 3 commits into from
Aug 24, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 14 additions & 22 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ impl Simulation {
self.activity.len(),
self.nodes.len()
);
let mut tasks = JoinSet::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

noice 🥇


// Before we start the simulation up, start tasks that will be responsible for gathering simulation data.
// The action channels are shared across our functionality:
Expand All @@ -241,30 +242,24 @@ impl Simulation {
// - Action Receiver: used by data reporting to receive events that have been simulated that need to be
// tracked and recorded.
let (action_sender, action_receiver) = channel(1);
let mut record_data_set = self.run_data_collection(action_receiver);
self.run_data_collection(action_receiver, &mut tasks);

// Next, we'll spin up our actual activity generator that will be responsible for triggering the activity that
// has been configured, passing in the channel that is used to notify data collection that actions have been
// generated.
let mut generate_activity_set = self.generate_activity(action_sender).await?;
self.generate_activity(action_sender, &mut tasks).await?;

// We always want to wait ofr all threads to exit, so we wait for all of them to exit and track any errors
// that surface. It's okay if there are multiple and one is overwritten, we just want to know whether we
// exited with an error or not.
// TODO: more succinct handling of tasks here.
let mut success = true;
while let Some(res) = record_data_set.join_next().await {
if let Err(e) = res {
log::error!("Task exited with error: {e}");
success = false;
}
}
while let Some(res) = generate_activity_set.join_next().await {
while let Some(res) = tasks.join_next().await {
if let Err(e) = res {
log::error!("Task exited with error: {e}");
success = false;
}
}

success.then_some(()).ok_or(SimulationError::TaskError)
}

Expand All @@ -277,33 +272,30 @@ impl Simulation {
fn run_data_collection(
&self,
action_receiver: Receiver<ActionOutcome>,
) -> tokio::task::JoinSet<()> {
tasks: &mut JoinSet<()>,
) {
let listener = self.shutdown_listener.clone();

log::debug!("Simulator data recording starting.");
let mut set = JoinSet::new();

// Create a sender/receiver pair that will be used to report final results of action outcomes.
let (results_sender, results_receiver) = channel(1);

set.spawn(produce_simulation_results(
tasks.spawn(produce_simulation_results(
self.nodes.clone(),
action_receiver,
results_sender,
listener.clone(),
));

set.spawn(consume_simulation_results(results_receiver, listener));

tasks.spawn(consume_simulation_results(results_receiver, listener));
log::debug!("Simulator data recording exiting.");
set
}

async fn generate_activity(
&self,
executed_actions: Sender<ActionOutcome>,
) -> Result<JoinSet<()>, SimulationError> {
let mut set = JoinSet::new();
tasks: &mut JoinSet<()>,
) -> Result<(), SimulationError> {
let shutdown = self.shutdown_trigger.clone();
let listener = self.shutdown_listener.clone();

Expand All @@ -325,7 +317,7 @@ impl Simulation {

// Generate a consumer for the receiving end of the channel. It takes the event receiver that it'll pull
// events from and the results sender to report the events it has triggered for further monitoring.
set.spawn(consume_events(
tasks.spawn(consume_events(
node.clone(),
receiver,
executed_actions.clone(),
Expand All @@ -338,15 +330,15 @@ impl Simulation {

for description in self.activity.iter() {
let sender_chan = producer_channels.get(&description.source).unwrap();
set.spawn(produce_events(
tasks.spawn(produce_events(
*description,
sender_chan.clone(),
shutdown.clone(),
listener.clone(),
));
}

Ok(set)
Ok(())
}
}

Expand Down