Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable tokio io driver #171

Merged
merged 14 commits into from
Apr 3, 2024
29 changes: 13 additions & 16 deletions src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ use futures::Future;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
use tokio::task::LocalSet;
use tokio::time::{sleep, Duration, Instant};

use crate::config::Config;
use tokio::time::{Duration, Instant, sleep};

use super::Result;

Expand Down Expand Up @@ -53,15 +51,15 @@ pub(crate) struct Rt<'a> {
handle: Option<JoinHandle<Result>>,

/// Configuration of simulation
mcches marked this conversation as resolved.
Show resolved Hide resolved
sim_cfg: Config,
enable_io: bool,
}

impl<'a> Rt<'a> {
pub(crate) fn client<F>(nodename: Arc<str>, client: F, sim_cfg: Config) -> Self
pub(crate) fn client<F>(nodename: Arc<str>, client: F, enable_io: bool) -> Self
where
F: Future<Output = Result> + 'static,
{
let (tokio, local) = init(&sim_cfg);
let (tokio, local) = init(enable_io);

let handle = with(&tokio, &local, || tokio::task::spawn_local(client));

Expand All @@ -71,16 +69,16 @@ impl<'a> Rt<'a> {
local,
nodename,
handle: Some(handle),
sim_cfg,
enable_io,
}
}

pub(crate) fn host<F, Fut>(nodename: Arc<str>, software: F, sim_cfg: Config) -> Self
pub(crate) fn host<F, Fut>(nodename: Arc<str>, software: F, enable_io: bool) -> Self
where
F: Fn() -> Fut + 'a,
Fut: Future<Output = Result> + 'static,
{
let (tokio, local) = init(&sim_cfg);
let (tokio, local) = init(enable_io);

let software: Software = Box::new(move || Box::pin(software()));
let handle = with(&tokio, &local, || tokio::task::spawn_local(software()));
Expand All @@ -91,21 +89,20 @@ impl<'a> Rt<'a> {
local,
nodename,
handle: Some(handle),
sim_cfg,
enable_io,
}
}

pub(crate) fn no_software() -> Self {
let sim_cfg = Config::default();
let (tokio, local) = init(&sim_cfg);
let (tokio, local) = init(false);

Self {
kind: Kind::NoSoftware,
tokio,
local,
nodename: String::new().into(),
handle: None,
sim_cfg,
enable_io: false,
}
}

Expand Down Expand Up @@ -210,20 +207,20 @@ impl<'a> Rt<'a> {
///
/// Both the [`Runtime`] and [`LocalSet`] are replaced with new instances.
fn cancel_tasks(&mut self) {
let (tokio, local) = init(&self.sim_cfg);
let (tokio, local) = init(self.enable_io);

_ = mem::replace(&mut self.tokio, tokio);
drop(mem::replace(&mut self.local, local));
}
}

fn init(sim_cfg: &Config) -> (Runtime, LocalSet) {
fn init(enable_io: bool) -> (Runtime, LocalSet) {
let mut tokio_builder = tokio::runtime::Builder::new_current_thread();

#[cfg(tokio_unstable)]
tokio_builder.unhandled_panic(tokio::runtime::UnhandledPanic::ShutdownRuntime);

if sim_cfg.enable_tokio_io {
if enable_io {
tokio_builder.enable_io();
}

Expand Down
23 changes: 12 additions & 11 deletions src/sim.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use crate::host::HostTimer;
use crate::{for_pairs, Config, LinksIter, Result, Rt, ToIpAddr, ToIpAddrs, World, TRACING_TARGET};

use indexmap::IndexMap;
use std::cell::RefCell;
use std::future::Future;
use std::net::IpAddr;
use std::ops::DerefMut;
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use indexmap::IndexMap;
use tokio::time::Duration;
use tracing::Level;

use crate::{Config, for_pairs, LinksIter, Result, Rt, ToIpAddr, ToIpAddrs, TRACING_TARGET, World};
use crate::host::HostTimer;

/// A handle for interacting with the simulation.
pub struct Sim<'a> {
/// Simulation configuration
Expand Down Expand Up @@ -88,7 +89,7 @@ impl<'a> Sim<'a> {
}

let rt = World::enter(&self.world, || {
Rt::client(nodename, client, self.config.clone())
Rt::client(nodename, client, self.config.enable_tokio_io)
});

self.rts.insert(addr, rt);
Expand Down Expand Up @@ -123,7 +124,7 @@ impl<'a> Sim<'a> {
}

let rt = World::enter(&self.world, || {
Rt::host(nodename, host, self.config.clone())
Rt::host(nodename, host, self.config.enable_tokio_io)
});

self.rts.insert(addr, rt);
Expand Down Expand Up @@ -404,23 +405,23 @@ mod test {
net::{IpAddr, Ipv4Addr},
rc::Rc,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};

use std::future;

use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::Semaphore,
time::Instant,
};

use crate::{
elapsed, hold,
net::{TcpListener, TcpStream},
sim_elapsed, Builder, Result, World,
Builder, elapsed,
hold,
net::{TcpListener, TcpStream}, Result, sim_elapsed, World,
};

#[test]
Expand Down
6 changes: 3 additions & 3 deletions tests/tokio_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn test_tokio_with_io_enabled() -> turmoil::Result {
// bind unix domain socket -> needs tokio io
let _ = UnixListener::bind(path).unwrap();
// remove socket file
let _ = remove_file(path).await?;
remove_file(path).await?;
Ok(())
});

Expand All @@ -23,7 +23,7 @@ fn test_tokio_with_io_enabled() -> turmoil::Result {

/// test assumes IO operation (binding unix domain socket) will fail
#[test]
fn test_tokio_with_io_disabled() -> () {
fn test_tokio_with_io_disabled() -> turmoil::Result {
let mut sim = Builder::new().build();
// client panics (panic is caught) since tokio IO is not enabled
sim.client("client", async move {
Expand All @@ -38,5 +38,5 @@ fn test_tokio_with_io_disabled() -> () {
Ok(())
});

let _ = sim.run();
sim.run()
}