Skip to content

Commit

Permalink
Tracing improvments (#124)
Browse files Browse the repository at this point in the history
A `tracing` span is entered with host information for each execution iteration in the run loop.
  • Loading branch information
PetrichorIT committed Jul 6, 2023
1 parent dcb0346 commit 5954ef3
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 20 deletions.
8 changes: 4 additions & 4 deletions src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl Host {
pub(crate) fn receive_from_network(&mut self, envelope: Envelope) -> Result<(), Protocol> {
let Envelope { src, dst, message } = envelope;

tracing::trace!(target: TRACING_TARGET, ?dst, ?src, protocol = %message, "Delivered");
tracing::trace!(target: TRACING_TARGET, ?src, ?dst, protocol = %message, "Delivered");

match message {
Protocol::Tcp(segment) => self.tcp.receive_from_network(src, dst, segment),
Expand Down Expand Up @@ -171,7 +171,7 @@ impl Udp {
fn receive_from_network(&mut self, src: SocketAddr, dst: SocketAddr, datagram: Datagram) {
if let Some(bind) = self.binds.get_mut(&dst.port()) {
if !matches(bind.bind_addr, dst) {
tracing::trace!(target: TRACING_TARGET, ?dst, ?src, protocol = %Protocol::Udp(datagram), "Dropped (Addr not bound)");
tracing::trace!(target: TRACING_TARGET, ?src, ?dst, protocol = %Protocol::Udp(datagram), "Dropped (Addr not bound)");
return;
}
if let Err(err) = bind.queue.try_send((datagram, src)) {
Expand All @@ -180,10 +180,10 @@ impl Udp {
// require a different channel implementation.
match err {
mpsc::error::TrySendError::Full((datagram, _)) => {
tracing::trace!(target: TRACING_TARGET, ?dst, ?src, protocol = %Protocol::Udp(datagram), "Dropped (Full buffer)");
tracing::trace!(target: TRACING_TARGET, ?src, ?dst, protocol = %Protocol::Udp(datagram), "Dropped (Full buffer)");
}
mpsc::error::TrySendError::Closed((datagram, _)) => {
tracing::trace!(target: TRACING_TARGET, ?dst, ?src, protocol = %Protocol::Udp(datagram), "Dropped (Receiver closed)");
tracing::trace!(target: TRACING_TARGET, ?src, ?dst, protocol = %Protocol::Udp(datagram), "Dropped (Receiver closed)");
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl TcpListener {
let host = world.current_host_mut();
let (syn, origin) = host.tcp.accept(self.local_addr)?;

tracing::trace!(target: TRACING_TARGET, dst = ?origin, src = ?self.local_addr, protocol = %"TCP SYN", "Recv");
tracing::trace!(target: TRACING_TARGET, src = ?origin, dst = ?self.local_addr, protocol = %"TCP SYN", "Recv");

// Send SYN-ACK -> origin. If Ok we proceed (acts as the ACK),
// else we return early to avoid host mutations.
Expand Down
12 changes: 7 additions & 5 deletions src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl TcpStream {
io::Error::new(io::ErrorKind::ConnectionRefused, pair.remote.to_string())
})?;

tracing::trace!(target: TRACING_TARGET, dst = ?pair.local, src = ?pair.remote, protocol = %"TCP SYN-ACK", "Recv");
tracing::trace!(target: TRACING_TARGET, src = ?pair.remote, dst = ?pair.local, protocol = %"TCP SYN-ACK", "Recv");

Ok(TcpStream::new(pair, rx))
}
Expand Down Expand Up @@ -158,7 +158,7 @@ impl ReadHalf {

match ready!(self.rx.recv.poll_recv(cx)) {
Some(seg) => {
tracing::trace!(target: TRACING_TARGET, dst = ?self.pair.local, src = ?self.pair.remote, protocol = %seg, "Recv");
tracing::trace!(target: TRACING_TARGET, src = ?self.pair.remote, dst = ?self.pair.local, protocol = %seg, "Recv");

match seg {
SequencedSegment::Data(bytes) => {
Expand Down Expand Up @@ -282,9 +282,11 @@ fn send_loopback(src: SocketAddr, dst: SocketAddr, message: Protocol) {
tokio::spawn(async move {
sleep(Duration::from_micros(1)).await;
World::current(|world| {
if let Err(rst) = world
.current_host_mut()
.receive_from_network(Envelope { src, dst, message }) {
if let Err(rst) =
world
.current_host_mut()
.receive_from_network(Envelope { src, dst, message })
{
_ = world.current_host_mut().receive_from_network(Envelope {
src: dst,
dst: src,
Expand Down
4 changes: 2 additions & 2 deletions src/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl UdpSocket {
.try_recv_from(buf)
.expect("queue should be ready after readable yields");

tracing::trace!(target: TRACING_TARGET, dst = ?self.local_addr, src = ?origin, protocol = %datagram, "Recv");
tracing::trace!(target: TRACING_TARGET, src = ?origin, dst = ?self.local_addr, protocol = %datagram, "Recv");

Ok((limit, origin))
}
Expand All @@ -231,7 +231,7 @@ impl UdpSocket {
io::Error::new(io::ErrorKind::WouldBlock, "socket receive queue is empty")
})?;

tracing::trace!(target: TRACING_TARGET, dst = ?self.local_addr, src = ?origin, protocol = %datagram, "Recv");
tracing::trace!(target: TRACING_TARGET, src = ?origin, dst = ?self.local_addr, protocol = %datagram, "Recv");

Ok((limit, origin))
}
Expand Down
11 changes: 9 additions & 2 deletions src/rt.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::mem;
use std::sync::Arc;

use super::Result;
use futures::Future;
Expand Down Expand Up @@ -41,13 +42,16 @@ pub(crate) struct Rt<'a> {
/// Local task set used for running !Send tasks.
local: LocalSet,

/// A user readable name to identify the node.
pub(crate) nodename: Arc<str>,

/// Optional handle to a host's software. When software finishes, the handle is
/// consumed to check for error, which is propagated up to fail the simulation.
handle: Option<JoinHandle<Result>>,
}

impl<'a> Rt<'a> {
pub(crate) fn client<F>(client: F) -> Self
pub(crate) fn client<F>(nodename: Arc<str>, client: F) -> Self
where
F: Future<Output = Result> + 'static,
{
Expand All @@ -59,11 +63,12 @@ impl<'a> Rt<'a> {
kind: Kind::Client,
tokio,
local,
nodename,
handle: Some(handle),
}
}

pub(crate) fn host<F, Fut>(software: F) -> Self
pub(crate) fn host<F, Fut>(nodename: Arc<str>, software: F) -> Self
where
F: Fn() -> Fut + 'a,
Fut: Future<Output = Result> + 'static,
Expand All @@ -77,6 +82,7 @@ impl<'a> Rt<'a> {
kind: Kind::Host { software },
tokio,
local,
nodename,
handle: Some(handle),
}
}
Expand All @@ -88,6 +94,7 @@ impl<'a> Rt<'a> {
kind: Kind::NoSoftware,
tokio,
local,
nodename: String::new().into(),
handle: None,
}
}
Expand Down
28 changes: 24 additions & 4 deletions src/sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use std::cell::RefCell;
use std::future::Future;
use std::net::IpAddr;
use std::ops::DerefMut;
use std::sync::Arc;
use std::time::UNIX_EPOCH;
use tokio::time::Duration;
use tracing::Level;

/// A handle for interacting with the simulation.
pub struct Sim<'a> {
Expand Down Expand Up @@ -65,15 +67,23 @@ impl<'a> Sim<'a> {
F: Future<Output = Result> + 'static,
{
let addr = self.lookup(addr);
let nodename: Arc<str> = self
.world
.borrow_mut()
.dns
.reverse(addr)
.map(str::to_string)
.unwrap_or_else(|| addr.to_string())
.into();

{
let world = RefCell::get_mut(&mut self.world);

// Register host state with the world
world.register(addr, &self.config);
world.register(addr, &nodename, &self.config);
}

let rt = World::enter(&self.world, || Rt::client(client));
let rt = World::enter(&self.world, || Rt::client(nodename, client));

self.rts.insert(addr, rt);
}
Expand All @@ -90,15 +100,23 @@ impl<'a> Sim<'a> {
Fut: Future<Output = Result> + 'static,
{
let addr = self.lookup(addr);
let nodename: Arc<str> = self
.world
.borrow_mut()
.dns
.reverse(addr)
.map(str::to_string)
.unwrap_or_else(|| addr.to_string())
.into();

{
let world = RefCell::get_mut(&mut self.world);

// Register host state with the world
world.register(addr, &self.config);
world.register(addr, &nodename, &self.config);
}

let rt = World::enter(&self.world, || Rt::host(host));
let rt = World::enter(&self.world, || Rt::host(nodename, host));

self.rts.insert(addr, rt);
}
Expand Down Expand Up @@ -288,6 +306,8 @@ impl<'a> Sim<'a> {
.iter_mut()
.filter(|(_, rt)| rt.is_software_running())
{
let _span_guard = tracing::span!(Level::INFO, "node", name = &*rt.nodename).entered();

{
let mut world = self.world.borrow_mut();
// We need to move deliverable messages off the network and
Expand Down
4 changes: 2 additions & 2 deletions src/world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ impl World {
}

/// Register a new host with the simulation.
pub(crate) fn register(&mut self, addr: IpAddr, config: &Config) {
pub(crate) fn register(&mut self, addr: IpAddr, nodename: &str, config: &Config) {
assert!(
!self.hosts.contains_key(&addr),
"already registered host for the given ip address"
);

tracing::info!(target: TRACING_TARGET, hostname = ?self.dns.reverse(addr), ?addr, "New");
tracing::info!(target: TRACING_TARGET, nodename, ?addr, "New");

// Register links between the new host and all existing hosts
for existing in self.hosts.keys() {
Expand Down

0 comments on commit 5954ef3

Please sign in to comment.