diff --git a/src/host.rs b/src/host.rs index 2418367..2235df6 100644 --- a/src/host.rs +++ b/src/host.rs @@ -99,7 +99,7 @@ impl Host { pub(crate) fn receive_from_network(&mut self, envelope: Envelope) -> Result<(), Protocol> { let Envelope { src, dst, message } = envelope; - tracing::trace!(target: TRACING_TARGET, ?dst, ?src, protocol = %message, "Delivered"); + tracing::trace!(target: TRACING_TARGET, ?src, ?dst, protocol = %message, "Delivered"); match message { Protocol::Tcp(segment) => self.tcp.receive_from_network(src, dst, segment), @@ -171,7 +171,7 @@ impl Udp { fn receive_from_network(&mut self, src: SocketAddr, dst: SocketAddr, datagram: Datagram) { if let Some(bind) = self.binds.get_mut(&dst.port()) { if !matches(bind.bind_addr, dst) { - tracing::trace!(target: TRACING_TARGET, ?dst, ?src, protocol = %Protocol::Udp(datagram), "Dropped (Addr not bound)"); + tracing::trace!(target: TRACING_TARGET, ?src, ?dst, protocol = %Protocol::Udp(datagram), "Dropped (Addr not bound)"); return; } if let Err(err) = bind.queue.try_send((datagram, src)) { @@ -180,10 +180,10 @@ impl Udp { // require a different channel implementation. match err { mpsc::error::TrySendError::Full((datagram, _)) => { - tracing::trace!(target: TRACING_TARGET, ?dst, ?src, protocol = %Protocol::Udp(datagram), "Dropped (Full buffer)"); + tracing::trace!(target: TRACING_TARGET, ?src, ?dst, protocol = %Protocol::Udp(datagram), "Dropped (Full buffer)"); } mpsc::error::TrySendError::Closed((datagram, _)) => { - tracing::trace!(target: TRACING_TARGET, ?dst, ?src, protocol = %Protocol::Udp(datagram), "Dropped (Receiver closed)"); + tracing::trace!(target: TRACING_TARGET, ?src, ?dst, protocol = %Protocol::Udp(datagram), "Dropped (Receiver closed)"); } } } diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index f42bb4a..e43ad5f 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -62,7 +62,7 @@ impl TcpListener { let host = world.current_host_mut(); let (syn, origin) = host.tcp.accept(self.local_addr)?; - tracing::trace!(target: TRACING_TARGET, dst = ?origin, src = ?self.local_addr, protocol = %"TCP SYN", "Recv"); + tracing::trace!(target: TRACING_TARGET, src = ?origin, dst = ?self.local_addr, protocol = %"TCP SYN", "Recv"); // Send SYN-ACK -> origin. If Ok we proceed (acts as the ACK), // else we return early to avoid host mutations. diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index e5f2b68..61180a3 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -87,7 +87,7 @@ impl TcpStream { io::Error::new(io::ErrorKind::ConnectionRefused, pair.remote.to_string()) })?; - tracing::trace!(target: TRACING_TARGET, dst = ?pair.local, src = ?pair.remote, protocol = %"TCP SYN-ACK", "Recv"); + tracing::trace!(target: TRACING_TARGET, src = ?pair.remote, dst = ?pair.local, protocol = %"TCP SYN-ACK", "Recv"); Ok(TcpStream::new(pair, rx)) } @@ -158,7 +158,7 @@ impl ReadHalf { match ready!(self.rx.recv.poll_recv(cx)) { Some(seg) => { - tracing::trace!(target: TRACING_TARGET, dst = ?self.pair.local, src = ?self.pair.remote, protocol = %seg, "Recv"); + tracing::trace!(target: TRACING_TARGET, src = ?self.pair.remote, dst = ?self.pair.local, protocol = %seg, "Recv"); match seg { SequencedSegment::Data(bytes) => { @@ -282,9 +282,11 @@ fn send_loopback(src: SocketAddr, dst: SocketAddr, message: Protocol) { tokio::spawn(async move { sleep(Duration::from_micros(1)).await; World::current(|world| { - if let Err(rst) = world - .current_host_mut() - .receive_from_network(Envelope { src, dst, message }) { + if let Err(rst) = + world + .current_host_mut() + .receive_from_network(Envelope { src, dst, message }) + { _ = world.current_host_mut().receive_from_network(Envelope { src: dst, dst: src, diff --git a/src/net/udp.rs b/src/net/udp.rs index 2f9a456..4502f34 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -205,7 +205,7 @@ impl UdpSocket { .try_recv_from(buf) .expect("queue should be ready after readable yields"); - tracing::trace!(target: TRACING_TARGET, dst = ?self.local_addr, src = ?origin, protocol = %datagram, "Recv"); + tracing::trace!(target: TRACING_TARGET, src = ?origin, dst = ?self.local_addr, protocol = %datagram, "Recv"); Ok((limit, origin)) } @@ -231,7 +231,7 @@ impl UdpSocket { io::Error::new(io::ErrorKind::WouldBlock, "socket receive queue is empty") })?; - tracing::trace!(target: TRACING_TARGET, dst = ?self.local_addr, src = ?origin, protocol = %datagram, "Recv"); + tracing::trace!(target: TRACING_TARGET, src = ?origin, dst = ?self.local_addr, protocol = %datagram, "Recv"); Ok((limit, origin)) } diff --git a/src/rt.rs b/src/rt.rs index 75621af..f87b4c3 100644 --- a/src/rt.rs +++ b/src/rt.rs @@ -1,4 +1,5 @@ use std::mem; +use std::sync::Arc; use super::Result; use futures::Future; @@ -41,13 +42,16 @@ pub(crate) struct Rt<'a> { /// Local task set used for running !Send tasks. local: LocalSet, + /// A user readable name to identify the node. + pub(crate) nodename: Arc, + /// Optional handle to a host's software. When software finishes, the handle is /// consumed to check for error, which is propagated up to fail the simulation. handle: Option>, } impl<'a> Rt<'a> { - pub(crate) fn client(client: F) -> Self + pub(crate) fn client(nodename: Arc, client: F) -> Self where F: Future + 'static, { @@ -59,11 +63,12 @@ impl<'a> Rt<'a> { kind: Kind::Client, tokio, local, + nodename, handle: Some(handle), } } - pub(crate) fn host(software: F) -> Self + pub(crate) fn host(nodename: Arc, software: F) -> Self where F: Fn() -> Fut + 'a, Fut: Future + 'static, @@ -77,6 +82,7 @@ impl<'a> Rt<'a> { kind: Kind::Host { software }, tokio, local, + nodename, handle: Some(handle), } } @@ -88,6 +94,7 @@ impl<'a> Rt<'a> { kind: Kind::NoSoftware, tokio, local, + nodename: String::new().into(), handle: None, } } diff --git a/src/sim.rs b/src/sim.rs index f8cae6b..8349a45 100644 --- a/src/sim.rs +++ b/src/sim.rs @@ -5,8 +5,10 @@ use std::cell::RefCell; use std::future::Future; use std::net::IpAddr; use std::ops::DerefMut; +use std::sync::Arc; use std::time::UNIX_EPOCH; use tokio::time::Duration; +use tracing::Level; /// A handle for interacting with the simulation. pub struct Sim<'a> { @@ -65,15 +67,23 @@ impl<'a> Sim<'a> { F: Future + 'static, { let addr = self.lookup(addr); + let nodename: Arc = self + .world + .borrow_mut() + .dns + .reverse(addr) + .map(str::to_string) + .unwrap_or_else(|| addr.to_string()) + .into(); { let world = RefCell::get_mut(&mut self.world); // Register host state with the world - world.register(addr, &self.config); + world.register(addr, &nodename, &self.config); } - let rt = World::enter(&self.world, || Rt::client(client)); + let rt = World::enter(&self.world, || Rt::client(nodename, client)); self.rts.insert(addr, rt); } @@ -90,15 +100,23 @@ impl<'a> Sim<'a> { Fut: Future + 'static, { let addr = self.lookup(addr); + let nodename: Arc = self + .world + .borrow_mut() + .dns + .reverse(addr) + .map(str::to_string) + .unwrap_or_else(|| addr.to_string()) + .into(); { let world = RefCell::get_mut(&mut self.world); // Register host state with the world - world.register(addr, &self.config); + world.register(addr, &nodename, &self.config); } - let rt = World::enter(&self.world, || Rt::host(host)); + let rt = World::enter(&self.world, || Rt::host(nodename, host)); self.rts.insert(addr, rt); } @@ -288,6 +306,8 @@ impl<'a> Sim<'a> { .iter_mut() .filter(|(_, rt)| rt.is_software_running()) { + let _span_guard = tracing::span!(Level::INFO, "node", name = &*rt.nodename).entered(); + { let mut world = self.world.borrow_mut(); // We need to move deliverable messages off the network and diff --git a/src/world.rs b/src/world.rs index 1fddd7d..efb628b 100644 --- a/src/world.rs +++ b/src/world.rs @@ -99,13 +99,13 @@ impl World { } /// Register a new host with the simulation. - pub(crate) fn register(&mut self, addr: IpAddr, config: &Config) { + pub(crate) fn register(&mut self, addr: IpAddr, nodename: &str, config: &Config) { assert!( !self.hosts.contains_key(&addr), "already registered host for the given ip address" ); - tracing::info!(target: TRACING_TARGET, hostname = ?self.dns.reverse(addr), ?addr, "New"); + tracing::info!(target: TRACING_TARGET, nodename, ?addr, "New"); // Register links between the new host and all existing hosts for existing in self.hosts.keys() {