Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracing fix #128

Merged
merged 11 commits into from
Jan 31, 2024
20 changes: 20 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,25 @@ pub struct Config {
/// may miss bugs
/// 2. [`lazy_static` values are dropped](mod@crate::lazy_static) at the end of an execution
pub silence_warnings: bool,

/// Whether to call the `Span::record()` method to update the step count (`i`) of the `Span`
sarsko marked this conversation as resolved.
Show resolved Hide resolved
/// containing the `TaskId` and the current step count for the given `TaskId`.
/// If `false`, this `Span` will look like this: `step{task=1}`, and if `true`, this `Span`
/// will look something like this: `step{task=1 i=3 i=9 i=12}`, or, if a `Subscriber` which
/// overwrites on calls to `span.record()` is used, something like this:
/// ```text
/// step{task=1 i=3}
/// step{task=1 i=9}
/// step{task=1 i=12}
/// ```
/// The reason this is a config option is that the most popular tracing `Subscriber`s, ie
/// `tracing_subscriber::fmt`, appends to the span on calls to `record()` (instead of
/// overwriting), which results in traces which are hard to read if the task is scheduled more
/// than a few times.
/// Thus: set `record_steps_in_span` to `true` if you want "append behavior", or if you are using
/// a `Subscriber` which overwrites on calls to `record()` and want to display the current step
/// count.
pub record_steps_in_span: bool,
}

impl Config {
Expand All @@ -231,6 +250,7 @@ impl Config {
max_steps: MaxSteps::FailAfter(1_000_000),
max_time: None,
silence_warnings: false,
record_steps_in_span: false,
}
}
}
Expand Down
105 changes: 76 additions & 29 deletions src/runtime/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::future::Future;
use std::panic;
use std::rc::Rc;
use std::sync::Arc;
use tracing::trace;
use tracing::{trace, Span};

use super::task::Tag;

Expand Down Expand Up @@ -144,7 +144,50 @@ impl Execution {
// Run a single step of the chosen task.
let ret = match next_step {
NextStep::Task(continuation) => {
panic::catch_unwind(panic::AssertUnwindSafe(|| continuation.borrow_mut().resume()))
// Enter the Task's span
// (Note that if any issues arise with spans and tracing, then
// 1) calling `exit` until `None` before entering the `Task`s `Span`,
// 2) storing the entirety of the `span_stack` when creating the `Task`, and
// 3) storing `top_level_span` as a stack
// should be tried.)
ExecutionState::with(|state| {
tracing::dispatcher::get_default(|subscriber| {
if let Some(span_id) = tracing::Span::current().id().as_ref() {
subscriber.exit(span_id);
}

// The `span_stack` stores `Span`s such that the top of the stack is the outermost `Span`,
// meaning that parents (left-most when printed) are entered first.
while let Some(span) = state.current_mut().span_stack.pop() {
sarsko marked this conversation as resolved.
Show resolved Hide resolved
if let Some(span_id) = span.id().as_ref() {
subscriber.enter(span_id)
}
}

if state.config.record_steps_in_span {
state.current().step_span.record("i", state.current_schedule.len());
}
});
});

let result = panic::catch_unwind(panic::AssertUnwindSafe(|| continuation.borrow_mut().resume()));

// Leave the Task's span and store the exited `Span` stack in order to restore it the next time the Task is run
ExecutionState::with(|state| {
tracing::dispatcher::get_default(|subscriber| {
debug_assert!(state.current().span_stack.is_empty());
while let Some(span_id) = tracing::Span::current().id().as_ref() {
state.current_mut().span_stack.push(tracing::Span::current().clone());
subscriber.exit(span_id);
}

if let Some(span_id) = state.top_level_span.id().as_ref() {
subscriber.enter(span_id)
}
});
});

result
}
NextStep::Failure(msg, schedule) => {
// Because we're creating the panic here, we don't need `persist_failure` to print
Expand All @@ -171,6 +214,7 @@ impl Execution {
Ok(panic_msg) => Box::new(format!("{}\noriginal panic: {}", message, panic_msg)),
Err(panic) => panic,
};

panic::resume_unwind(payload);
}
}
Expand Down Expand Up @@ -203,6 +247,9 @@ pub(crate) struct ExecutionState {

#[cfg(debug_assertions)]
has_cleaned_up: bool,

// The `Span` which the `ExecutionState` was created under. Will be the parent of all `Task` `Span`s
pub(crate) top_level_span: Span,
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
Expand Down Expand Up @@ -240,6 +287,7 @@ impl ExecutionState {
current_schedule: initial_schedule,
#[cfg(debug_assertions)]
has_cleaned_up: false,
top_level_span: tracing::Span::current(),
}
}

Expand Down Expand Up @@ -287,11 +335,8 @@ impl ExecutionState {
{
Self::with(|state| {
let schedule_len = state.current_schedule.len();
let parent_span_id = state.top_level_span.id();

let parent_span = state
.try_current()
.map(|t| t.span.clone())
.unwrap_or_else(tracing::Span::current);
let task_id = TaskId(state.tasks.len());
let tag = state.get_tag_or_default_for_current_task();
let clock = state.increment_clock_mut(); // Increment the parent's clock
Expand All @@ -303,9 +348,10 @@ impl ExecutionState {
task_id,
name,
clock.clone(),
parent_span,
parent_span_id,
schedule_len,
tag,
state.try_current().map(|t| t.id()),
);

state.tasks.push(task);
Expand All @@ -323,6 +369,7 @@ impl ExecutionState {
F: FnOnce() + Send + 'static,
{
Self::with(|state| {
let parent_span_id = state.top_level_span.id();
let task_id = TaskId(state.tasks.len());
let tag = state.get_tag_or_default_for_current_task();
let clock = if let Some(ref mut clock) = initial_clock {
Expand All @@ -336,11 +383,17 @@ impl ExecutionState {

let schedule_len = state.current_schedule.len();

let parent_span = state
.try_current()
.map(|t| t.span.clone())
.unwrap_or_else(tracing::Span::current);
let task = Task::from_closure(f, stack_size, task_id, name, clock, parent_span, schedule_len, tag);
let task = Task::from_closure(
f,
stack_size,
task_id,
name,
clock,
parent_span_id,
schedule_len,
tag,
state.try_current().map(|t| t.id()),
);
state.tasks.push(task);
task_id
})
Expand Down Expand Up @@ -593,31 +646,25 @@ impl ExecutionState {
}
}

trace!(?runnable, next_task=?self.next_task);
// Tracing this `in_scope` is purely a matter of taste. We do it because
// 1) It is an action taken by the scheduler, and should thus be traced under the scheduler's span
// 2) It creates a visual separation of scheduling decisions and `Task`-induced tracing.
// Note that there is a case to be made for not `in_scope`-ing it, as that makes seeing the context
// of the context switch clearer.
self.top_level_span
sarsko marked this conversation as resolved.
Show resolved Hide resolved
.in_scope(|| trace!(?runnable, next_task=?self.next_task));

Ok(())
}

/// Set the next task as the current task, and update our tracing span
/// Set the next task as the current task
fn advance_to_next_task(&mut self) {
debug_assert_ne!(self.next_task, ScheduledTask::None);
let previous_task = self.current_task;
self.current_task = self.next_task.take();

tracing::dispatcher::get_default(|subscriber| {
if let ScheduledTask::Some(previous) = previous_task {
if let Some(span_id) = self.get(previous).span.id() {
subscriber.exit(&span_id)
}
}

if let ScheduledTask::Some(tid) = self.current_task {
if let Some(span_id) = self.get(tid).span.id() {
subscriber.enter(&span_id);
}
self.current_schedule.push_task(tid);
}
});
if let ScheduledTask::Some(tid) = self.current_task {
self.current_schedule.push_task(tid);
}
}

// Sets the `tag` field of the current task.
Expand Down
41 changes: 39 additions & 2 deletions src/runtime/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,37 @@ use std::thread;
use std::time::Instant;
use tracing::{span, Level};

// A helper struct which on `drop` exits all current spans, then enters the span which was entered when it was constructed.
sarsko marked this conversation as resolved.
Show resolved Hide resolved
// The reason this exists is to solve the "span-stacking" issue which occurs when there is a panic inside `run` which is
// then caught by `panic::catch_unwind()` (such as when Shuttle is run inside proptest).
// In other words: it enables correct spans when doing proptest minimization.
#[must_use]
struct ResetSpanOnDrop {
span: tracing::Span,
}

impl ResetSpanOnDrop {
fn new() -> Self {
Self {
span: tracing::Span::current().clone(),
}
}
}

impl Drop for ResetSpanOnDrop {
// Exits all current spans, then enters the span which was entered when `self` was constructed.
fn drop(&mut self) {
tracing::dispatcher::get_default(|subscriber| {
while let Some(span_id) = tracing::Span::current().id().as_ref() {
subscriber.exit(span_id);
}
if let Some(span_id) = self.span.id().as_ref() {
subscriber.enter(span_id);
}
});
}
}

/// A `Runner` is the entry-point for testing concurrent code.
///
/// It takes as input a function to test and a `Scheduler` to run it under. It then executes that
Expand Down Expand Up @@ -43,6 +74,7 @@ impl<S: Scheduler + 'static> Runner<S> {
where
F: Fn() + Send + Sync + 'static,
{
let _span_drop_guard = ResetSpanOnDrop::new();
// Share continuations across executions to avoid reallocating them
// TODO it would be a lot nicer if this were a more generic "context" thing that we passed
// TODO around explicitly rather than being a thread local
Expand All @@ -66,7 +98,12 @@ impl<S: Scheduler + 'static> Runner<S> {
let execution = Execution::new(self.scheduler.clone(), schedule);
let f = Arc::clone(&f);

span!(Level::INFO, "execution", i).in_scope(|| execution.run(&self.config, move || f()));
// This is a slightly lazy way to ensure that everything outside of the "execution" span gets
// established correctly between executions. Fully `exit`ing and fully `enter`ing (explicitly
// `enter`/`exit` all `Span`s) would most likely obviate the need for this.
let _span_drop_guard2 = ResetSpanOnDrop::new();

span!(Level::ERROR, "execution", i).in_scope(|| execution.run(&self.config, move || f()));

i += 1;
}
Expand Down Expand Up @@ -133,7 +170,7 @@ impl PortfolioRunner {

let runner = Runner::new(scheduler, config);

span!(Level::INFO, "job", i).in_scope(|| {
span!(Level::ERROR, "job", i).in_scope(|| {
let ret = panic::catch_unwind(panic::AssertUnwindSafe(|| runner.run(move || f())));

match ret {
Expand Down
Loading
Loading