Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable tokio io driver #171

Merged
merged 14 commits into from
Apr 3, 2024
13 changes: 8 additions & 5 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use crate::*;
use std::{ops::RangeInclusive, time::SystemTime};

use rand::{RngCore, SeedableRng};
use std::{
ops::RangeInclusive,
time::{Duration, SystemTime},
};

use crate::*;

/// A builder that can be used to configure the simulation.
///
Expand Down Expand Up @@ -166,6 +164,11 @@ impl Builder {
self
}

pub fn enable_tokio_io(&mut self) -> &mut Self {
mcches marked this conversation as resolved.
Show resolved Hide resolved
self.config.enable_tokio_io = true;
self
}

/// Build a simulation with the settings from the builder.
///
/// This will use default rng with entropy from the device running.
Expand Down
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub(crate) struct Config {

/// Max size of the udp receive buffer
pub(crate) udp_capacity: usize,

/// Enables tokio IO driver
pub(crate) enable_tokio_io: bool,
}

/// Configures link behavior.
Expand Down Expand Up @@ -87,6 +90,7 @@ impl Default for Config {
ephemeral_ports: 49152..=65535,
tcp_capacity: 64,
udp_capacity: 64,
enable_tokio_io: false,
}
}
}
Expand Down
41 changes: 28 additions & 13 deletions src/rt.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::mem;
use std::pin::Pin;
use std::sync::Arc;

use super::Result;
use futures::Future;
use std::pin::Pin;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
use tokio::task::LocalSet;
use tokio::time::{sleep, Duration, Instant};
use tokio::time::{Duration, Instant, sleep};

use super::Result;

// To support re-creation, we need to store a factory of the future that
// represents the software. This is somewhat annoying in that it requires
Expand Down Expand Up @@ -48,14 +49,17 @@ pub(crate) struct Rt<'a> {
/// Optional handle to a host's software. When software finishes, the handle is
/// consumed to check for error, which is propagated up to fail the simulation.
handle: Option<JoinHandle<Result>>,

/// Configuration of simulation
mcches marked this conversation as resolved.
Show resolved Hide resolved
enable_io: bool,
}

impl<'a> Rt<'a> {
pub(crate) fn client<F>(nodename: Arc<str>, client: F) -> Self
pub(crate) fn client<F>(nodename: Arc<str>, client: F, enable_io: bool) -> Self
where
F: Future<Output = Result> + 'static,
{
let (tokio, local) = init();
let (tokio, local) = init(enable_io);

let handle = with(&tokio, &local, || tokio::task::spawn_local(client));

Expand All @@ -65,15 +69,16 @@ impl<'a> Rt<'a> {
local,
nodename,
handle: Some(handle),
enable_io,
}
}

pub(crate) fn host<F, Fut>(nodename: Arc<str>, software: F) -> Self
pub(crate) fn host<F, Fut>(nodename: Arc<str>, software: F, enable_io: bool) -> Self
where
F: Fn() -> Fut + 'a,
Fut: Future<Output = Result> + 'static,
{
let (tokio, local) = init();
let (tokio, local) = init(enable_io);

let software: Software = Box::new(move || Box::pin(software()));
let handle = with(&tokio, &local, || tokio::task::spawn_local(software()));
Expand All @@ -84,18 +89,20 @@ impl<'a> Rt<'a> {
local,
nodename,
handle: Some(handle),
enable_io,
}
}

pub(crate) fn no_software() -> Self {
let (tokio, local) = init();
let (tokio, local) = init(false);

Self {
kind: Kind::NoSoftware,
tokio,
local,
nodename: String::new().into(),
handle: None,
enable_io: false,
}
}

Expand Down Expand Up @@ -200,20 +207,28 @@ impl<'a> Rt<'a> {
///
/// Both the [`Runtime`] and [`LocalSet`] are replaced with new instances.
fn cancel_tasks(&mut self) {
let (tokio, local) = init();
let (tokio, local) = init(self.enable_io);

_ = mem::replace(&mut self.tokio, tokio);
drop(mem::replace(&mut self.local, local));
}
}

fn init() -> (Runtime, LocalSet) {
let mut builder = tokio::runtime::Builder::new_current_thread();
fn init(enable_io: bool) -> (Runtime, LocalSet) {
let mut tokio_builder = tokio::runtime::Builder::new_current_thread();

#[cfg(tokio_unstable)]
builder.unhandled_panic(tokio::runtime::UnhandledPanic::ShutdownRuntime);
tokio_builder.unhandled_panic(tokio::runtime::UnhandledPanic::ShutdownRuntime);

if enable_io {
tokio_builder.enable_io();
}

let tokio = builder.enable_time().start_paused(true).build().unwrap();
let tokio = tokio_builder
.enable_time()
.start_paused(true)
.build()
.unwrap();

tokio.block_on(async {
// Sleep to "round" `Instant::now()` to the closest `ms`
Expand Down
33 changes: 19 additions & 14 deletions src/sim.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use crate::host::HostTimer;
use crate::{for_pairs, Config, LinksIter, Result, Rt, ToIpAddr, ToIpAddrs, World, TRACING_TARGET};

use indexmap::IndexMap;
use std::cell::RefCell;
use std::future::Future;
use std::net::IpAddr;
use std::ops::DerefMut;
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use indexmap::IndexMap;
use tokio::time::Duration;
use tracing::Level;

use crate::{Config, for_pairs, LinksIter, Result, Rt, ToIpAddr, ToIpAddrs, TRACING_TARGET, World};
use crate::host::HostTimer;

/// A handle for interacting with the simulation.
pub struct Sim<'a> {
/// Simulation configuration
Expand Down Expand Up @@ -87,7 +88,9 @@ impl<'a> Sim<'a> {
world.register(addr, &nodename, HostTimer::new(self.elapsed), &self.config);
}

let rt = World::enter(&self.world, || Rt::client(nodename, client));
let rt = World::enter(&self.world, || {
Rt::client(nodename, client, self.config.enable_tokio_io)
});

self.rts.insert(addr, rt);
}
Expand Down Expand Up @@ -120,7 +123,9 @@ impl<'a> Sim<'a> {
world.register(addr, &nodename, HostTimer::new(self.elapsed), &self.config);
}

let rt = World::enter(&self.world, || Rt::host(nodename, host));
let rt = World::enter(&self.world, || {
Rt::host(nodename, host, self.config.enable_tokio_io)
});

self.rts.insert(addr, rt);
}
Expand Down Expand Up @@ -400,23 +405,23 @@ mod test {
net::{IpAddr, Ipv4Addr},
rc::Rc,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};

use std::future;

use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::Semaphore,
time::Instant,
};

use crate::{
elapsed, hold, World,
net::{TcpListener, TcpStream},
sim_elapsed, Builder, Result,
Builder, elapsed,
hold,
net::{TcpListener, TcpStream}, Result, sim_elapsed, World,
};

#[test]
Expand Down Expand Up @@ -561,16 +566,16 @@ mod test {
}

/// This is a regression test to ensure it is safe to call sim_elapsed
/// if current world of host is not set.
/// if current world of host is not set.
#[test]
fn sim_elapsed_time() -> Result {
// Safe to call outside of simution while there
// Safe to call outside of simution while there
// is no current world set
assert!(sim_elapsed().is_none());

let sim = Builder::new().build();
// Safe to call while there is no current host set
World::enter(&sim.world, || assert!(sim_elapsed().is_none()));
World::enter(&sim.world, || assert!(sim_elapsed().is_none()));

Ok(())
}
Expand Down
42 changes: 42 additions & 0 deletions tests/tokio_io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use tokio::fs::remove_file;
use tokio::net::UnixListener;
use tokio_test::assert_err;

use turmoil::Builder;

/// test assumes IO operation (binding unix domain socket) will succeed
#[test]
fn test_tokio_with_io_enabled() -> turmoil::Result {
let mut sim = Builder::new().enable_tokio_io().build();
// client, which would panic if tokio io wouldn't be enabled
sim.client("client", async move {
let path = "/tmp/test_socket1";
// bind unix domain socket -> needs tokio io
let _ = UnixListener::bind(path).unwrap();
// remove socket file
remove_file(path).await?;
Ok(())
});

sim.run()
}

/// test assumes IO operation (binding unix domain socket) will fail
#[test]
fn test_tokio_with_io_disabled() -> turmoil::Result {
let mut sim = Builder::new().build();
// client panics (panic is caught) since tokio IO is not enabled
sim.client("client", async move {
let path = "/tmp/test_socket2";
let result = std::panic::catch_unwind(|| {
let _ = UnixListener::bind(path);
});
assert_err!(result);

// remove socket file
tokio::fs::remove_file(path).await?;
Ok(())
});

sim.run()
}