Skip to content

Commit

Permalink
tracing: instrument more resources (#4302)
Browse files Browse the repository at this point in the history
This PR adds instrumentation to more resources from the sync package. The new
instrumentation requires the `tokio_unstable` feature flag to enable.
  • Loading branch information
zaharidichev authored Dec 14, 2021
1 parent 4b6bb1d commit 4e3268d
Show file tree
Hide file tree
Showing 17 changed files with 1,185 additions and 98 deletions.
2 changes: 1 addition & 1 deletion tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ macro_rules! cfg_trace {
#[cfg_attr(docsrs, doc(cfg(feature = "tracing")))]
$item
)*
}
};
}

macro_rules! cfg_not_trace {
Expand Down
9 changes: 4 additions & 5 deletions tokio/src/macros/trace.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
cfg_trace! {
macro_rules! trace_op {
($name:literal, $readiness:literal, $parent:expr) => {
($name:expr, $readiness:literal) => {
tracing::trace!(
target: "runtime::resource::poll_op",
parent: $parent,
op_name = $name,
is_ready = $readiness
);
}
}

macro_rules! trace_poll_op {
($name:literal, $poll:expr, $parent:expr $(,)*) => {
($name:expr, $poll:expr $(,)*) => {
match $poll {
std::task::Poll::Ready(t) => {
trace_op!($name, true, $parent);
trace_op!($name, true);
std::task::Poll::Ready(t)
}
std::task::Poll::Pending => {
trace_op!($name, false, $parent);
trace_op!($name, false);
return std::task::Poll::Pending;
}
}
Expand Down
63 changes: 63 additions & 0 deletions tokio/src/sync/barrier.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::loom::sync::Mutex;
use crate::sync::watch;
#[cfg(all(tokio_unstable, feature = "tracing"))]
use crate::util::trace;

/// A barrier enables multiple tasks to synchronize the beginning of some computation.
///
Expand Down Expand Up @@ -41,6 +43,8 @@ pub struct Barrier {
state: Mutex<BarrierState>,
wait: watch::Receiver<usize>,
n: usize,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
}

#[derive(Debug)]
Expand All @@ -55,6 +59,7 @@ impl Barrier {
///
/// A barrier will block `n`-1 tasks which call [`Barrier::wait`] and then wake up all
/// tasks at once when the `n`th task calls `wait`.
#[track_caller]
pub fn new(mut n: usize) -> Barrier {
let (waker, wait) = crate::sync::watch::channel(0);

Expand All @@ -65,6 +70,32 @@ impl Barrier {
n = 1;
}

#[cfg(all(tokio_unstable, feature = "tracing"))]
let resource_span = {
let location = std::panic::Location::caller();
let resource_span = tracing::trace_span!(
"runtime.resource",
concrete_type = "Barrier",
kind = "Sync",
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
);

resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
size = n,
);

tracing::trace!(
target: "runtime::resource::state_update",
arrived = 0,
)
});
resource_span
};

Barrier {
state: Mutex::new(BarrierState {
waker,
Expand All @@ -73,6 +104,8 @@ impl Barrier {
}),
n,
wait,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: resource_span,
}
}

Expand All @@ -85,6 +118,20 @@ impl Barrier {
/// [`BarrierWaitResult::is_leader`] when returning from this function, and all other tasks
/// will receive a result that will return `false` from `is_leader`.
pub async fn wait(&self) -> BarrierWaitResult {
#[cfg(all(tokio_unstable, feature = "tracing"))]
return trace::async_op(
|| self.wait_internal(),
self.resource_span.clone(),
"Barrier::wait",
"poll",
false,
)
.await;

#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
return self.wait_internal().await;
}
async fn wait_internal(&self) -> BarrierWaitResult {
// NOTE: we are taking a _synchronous_ lock here.
// It is okay to do so because the critical section is fast and never yields, so it cannot
// deadlock even if another future is concurrently holding the lock.
Expand All @@ -96,7 +143,23 @@ impl Barrier {
let mut state = self.state.lock();
let generation = state.generation;
state.arrived += 1;
#[cfg(all(tokio_unstable, feature = "tracing"))]
tracing::trace!(
target: "runtime::resource::state_update",
arrived = 1,
arrived.op = "add",
);
#[cfg(all(tokio_unstable, feature = "tracing"))]
tracing::trace!(
target: "runtime::resource::async_op::state_update",
arrived = true,
);
if state.arrived == self.n {
#[cfg(all(tokio_unstable, feature = "tracing"))]
tracing::trace!(
target: "runtime::resource::async_op::state_update",
is_leader = true,
);
// we are the leader for this generation
// wake everyone, increment the generation, and return
state
Expand Down
Loading

0 comments on commit 4e3268d

Please sign in to comment.