Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose a mechanism to manually drive the Sim #76

Merged
merged 2 commits into from
Jan 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 80 additions & 71 deletions src/sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::ops::DerefMut;
use std::time::UNIX_EPOCH;
use tokio::time::Duration;

/// Network simulation
/// A handle for interacting with the simulation.
pub struct Sim<'a> {
/// Simulation configuration
config: Config,
Expand Down Expand Up @@ -192,93 +192,102 @@ impl<'a> Sim<'a> {
world.topology.set_link_fail_rate(a, b, value);
}

/// Run the simulation until all clients finish.
/// Run the simulation to completion.
///
/// For each runtime, we [`Rt::tick`] it forward, which allows time to
/// advance just a little bit. In this way, only one runtime is ever active.
/// The turmoil APIs operate on the active host, and so we remember which
/// host is active before yielding to user code.
///
/// If any client errors, the simulation returns early with that Error.
/// Executes a simple event loop that calls [step](#method.step) each iteration,
/// returning early if any host software errors.
pub fn run(&mut self) -> Result {
loop {
let is_finished = self.step()?;

if is_finished {
return Ok(());
}
}
}

/// Step the simulation.
///
/// Runs each host in the simulation a fixed duration configured by
/// `tick_duration` in the builder.
///
/// The simulated network also steps, processing in flight messages, and
/// delivering them to their destination if appropriate.
pub fn step(&mut self) -> Result<bool> {
let tick = self.config.tick;

loop {
let mut is_finished = true;
let mut finished = vec![];

// Tick the networking, processing messages. This is done before
// ticking any other runtime, as they might be waiting on network
// IO. (It also might be waiting on something else, such as time.)
self.world.borrow_mut().topology.tick_by(tick);

for (&addr, rt) in self.rts.iter() {
{
let mut world = self.world.borrow_mut();
// We need to move deliverable messages off the network and
// into the dst host. This requires two mutable borrows.
let World {
rng,
topology,
hosts,
..
} = world.deref_mut();
topology.deliver_messages(rng, hosts.get_mut(&addr).expect("missing host"));

// Set the current host (see method docs)
world.current = Some(addr);

world.current_host_mut().now(rt.now());
}
let mut is_finished = true;
let mut finished = vec![];

World::enter(&self.world, || rt.tick(tick));
// Tick the networking, processing messages. This is done before
// ticking any other runtime, as they might be waiting on network
// IO. (It also might be waiting on something else, such as time.)
self.world.borrow_mut().topology.tick_by(tick);

// Unset the current host
for (&addr, rt) in self.rts.iter() {
{
let mut world = self.world.borrow_mut();
world.current = None;
// We need to move deliverable messages off the network and
// into the dst host. This requires two mutable borrows.
let World {
rng,
topology,
hosts,
..
} = world.deref_mut();
topology.deliver_messages(rng, hosts.get_mut(&addr).expect("missing host"));

// Set the current host (see method docs)
world.current = Some(addr);

world.current_host_mut().now(rt.now());
}

world.tick(addr, tick);
World::enter(&self.world, || rt.tick(tick));

match rt {
Role::Client { handle, .. } => {
if handle.is_finished() {
finished.push(addr);
}
is_finished = is_finished && handle.is_finished();
}
Role::Simulated { handle, .. } => {
if handle.is_finished() {
finished.push(addr);
}
// Unset the current host
let mut world = self.world.borrow_mut();
world.current = None;

world.tick(addr, tick);

match rt {
Role::Client { handle, .. } => {
if handle.is_finished() {
finished.push(addr);
}
is_finished = is_finished && handle.is_finished();
}
}

self.elapsed += tick;

// Check finished clients and hosts for err results. Runtimes are removed at
// this stage.
for addr in finished.into_iter() {
if let Some(role) = self.rts.remove(&addr) {
let (rt, handle) = match role {
Role::Client { rt, handle } => (rt, handle),
Role::Simulated { rt, handle, .. } => (rt, handle),
};
rt.block_on(handle)??;
Role::Simulated { handle, .. } => {
if handle.is_finished() {
finished.push(addr);
}
}
}
}

if is_finished {
return Ok(());
self.elapsed += tick;

// Check finished clients and hosts for err results. Runtimes are removed at
// this stage.
for addr in finished.into_iter() {
if let Some(role) = self.rts.remove(&addr) {
let (rt, handle) = match role {
Role::Client { rt, handle } => (rt, handle),
Role::Simulated { rt, handle, .. } => (rt, handle),
};
rt.block_on(handle)??;
}
}

if self.elapsed > self.config.duration {
Err(format!(
"Ran for {:?} without completing",
self.config.duration
))?;
}
if self.elapsed > self.config.duration && !is_finished {
return Err(format!(
"Ran for {:?} without completing",
self.config.duration
))?;
}

Ok(is_finished)
}
}

Expand Down