From 9a29bcac1d30fee1e2189a398edd9e70d86ac617 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Mon, 27 Nov 2023 14:07:36 +0000 Subject: [PATCH 01/10] Tracing fix --- src/lib.rs | 12 ++++++ src/runtime/execution.rs | 91 +++++++++++++++++++++++++++------------- src/runtime/runner.rs | 32 ++++++++++++++ src/runtime/task/mod.rs | 44 +++++++++++++------ tests/basic/mod.rs | 1 + tests/basic/tracing.rs | 82 ++++++++++++++++++++++++++++++++++++ 6 files changed, 221 insertions(+), 41 deletions(-) create mode 100644 tests/basic/tracing.rs diff --git a/src/lib.rs b/src/lib.rs index 1330466b..a7eecfa0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -220,6 +220,17 @@ pub struct Config { /// may miss bugs /// 2. [`lazy_static` values are dropped](mod@crate::lazy_static) at the end of an execution pub silence_warnings: bool, + + /// Whether to call the `Span::record()` method to update the step count (`i`) of the `Span` + /// containing the `TaskId` and the current step count for the given `TaskId`. + /// The reason this is a config option is that the most popular tracing `Subscriber`s, ie + /// `tracing_subscriber::fmt`, appends to the span on calls to `record()` (instead of + /// overwriting), which results in traces which are hard to read if the task is scheduled more + /// than a few times. + /// Thus: set `record_steps_in_span` to `true` if you want this behaviour, or if you are using + /// a `Subscriber` which overwrites on calls to `record()` and want to display the current step + /// count. 
+ pub record_steps_in_span: bool, } impl Config { @@ -231,6 +242,7 @@ impl Config { max_steps: MaxSteps::FailAfter(1_000_000), max_time: None, silence_warnings: false, + record_steps_in_span: false, } } } diff --git a/src/runtime/execution.rs b/src/runtime/execution.rs index a69b1e2d..e2ff7978 100644 --- a/src/runtime/execution.rs +++ b/src/runtime/execution.rs @@ -16,7 +16,7 @@ use std::future::Future; use std::panic; use std::rc::Rc; use std::sync::Arc; -use tracing::trace; +use tracing::{trace, Span}; use super::task::Tag; @@ -144,7 +144,42 @@ impl Execution { // Run a single step of the chosen task. let ret = match next_step { NextStep::Task(continuation) => { - panic::catch_unwind(panic::AssertUnwindSafe(|| continuation.borrow_mut().resume())) + // Enter the Task's span + ExecutionState::with(|state| { + tracing::dispatcher::get_default(|subscriber| { + let current_span_id = tracing::Span::current().id(); + if let Some(span_id) = current_span_id.as_ref() { + subscriber.exit(span_id); + } + + if let Some(span_id) = state.current().span_id.as_ref() { + subscriber.enter(span_id) + } + + if state.config.record_steps_in_span { + state.current().step_span.record("i", state.current_schedule.len()); + } + }); + }); + + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| continuation.borrow_mut().resume())); + + // Leave the Task's span and store which span it exited in order to restore it the next time the Task is run + ExecutionState::with(|state| { + tracing::dispatcher::get_default(|subscriber| { + let current_span_id = tracing::Span::current().id(); + if let Some(span_id) = current_span_id.as_ref() { + subscriber.exit(span_id); + } + state.current_mut().span_id = current_span_id; + + if let Some(span_id) = state.span.id().as_ref() { + subscriber.enter(span_id) + } + }); + }); + + result } NextStep::Failure(msg, schedule) => { // Because we're creating the panic here, we don't need `persist_failure` to print @@ -171,6 +206,7 @@ impl Execution { 
Ok(panic_msg) => Box::new(format!("{}\noriginal panic: {}", message, panic_msg)), Err(panic) => panic, }; + panic::resume_unwind(payload); } } @@ -203,6 +239,9 @@ pub(crate) struct ExecutionState { #[cfg(debug_assertions)] has_cleaned_up: bool, + + // The `Span` which the `ExecutionState` was created under. Will be the parent of all `Task` `Span`s + pub(crate) span: Span, } #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -240,6 +279,7 @@ impl ExecutionState { current_schedule: initial_schedule, #[cfg(debug_assertions)] has_cleaned_up: false, + span: tracing::Span::current(), } } @@ -287,11 +327,8 @@ impl ExecutionState { { Self::with(|state| { let schedule_len = state.current_schedule.len(); + let parent_id = state.span.id(); - let parent_span = state - .try_current() - .map(|t| t.span.clone()) - .unwrap_or_else(tracing::Span::current); let task_id = TaskId(state.tasks.len()); let tag = state.get_tag_or_default_for_current_task(); let clock = state.increment_clock_mut(); // Increment the parent's clock @@ -303,9 +340,10 @@ impl ExecutionState { task_id, name, clock.clone(), - parent_span, + parent_id, schedule_len, tag, + state.try_current().map(|t| t.id()), ); state.tasks.push(task); @@ -323,6 +361,7 @@ impl ExecutionState { F: FnOnce() + Send + 'static, { Self::with(|state| { + let parent_id = state.span.id(); let task_id = TaskId(state.tasks.len()); let tag = state.get_tag_or_default_for_current_task(); let clock = if let Some(ref mut clock) = initial_clock { @@ -336,11 +375,17 @@ impl ExecutionState { let schedule_len = state.current_schedule.len(); - let parent_span = state - .try_current() - .map(|t| t.span.clone()) - .unwrap_or_else(tracing::Span::current); - let task = Task::from_closure(f, stack_size, task_id, name, clock, parent_span, schedule_len, tag); + let task = Task::from_closure( + f, + stack_size, + task_id, + name, + clock, + parent_id, + schedule_len, + tag, + state.try_current().map(|t| t.id()), + ); state.tasks.push(task); task_id }) @@ 
-593,31 +638,19 @@ impl ExecutionState { } } - trace!(?runnable, next_task=?self.next_task); + self.span.in_scope(|| trace!(?runnable, next_task=?self.next_task)); Ok(()) } - /// Set the next task as the current task, and update our tracing span + /// Set the next task as the current task fn advance_to_next_task(&mut self) { debug_assert_ne!(self.next_task, ScheduledTask::None); - let previous_task = self.current_task; self.current_task = self.next_task.take(); - tracing::dispatcher::get_default(|subscriber| { - if let ScheduledTask::Some(previous) = previous_task { - if let Some(span_id) = self.get(previous).span.id() { - subscriber.exit(&span_id) - } - } - - if let ScheduledTask::Some(tid) = self.current_task { - if let Some(span_id) = self.get(tid).span.id() { - subscriber.enter(&span_id); - } - self.current_schedule.push_task(tid); - } - }); + if let ScheduledTask::Some(tid) = self.current_task { + self.current_schedule.push_task(tid); + } } // Sets the `tag` field of the current task. diff --git a/src/runtime/runner.rs b/src/runtime/runner.rs index ea2188b0..96de5fe7 100644 --- a/src/runtime/runner.rs +++ b/src/runtime/runner.rs @@ -15,6 +15,37 @@ use std::thread; use std::time::Instant; use tracing::{span, Level}; +// A helper struct which on `drop` exits all current spans, then enters the span which was entered when it was constructed. +// The reason this exist is to solve the "span-stacking" issue which occurs when there is a panic inside `run` which is +// then caught by `panic::catch_unwind()` (such as when Shuttle is run inside proptest). +// In other words: it enables correct spans when doing proptest minimization. +struct ResetSpanOnDrop { + span_id: Option, +} + +impl ResetSpanOnDrop { + #[must_use] + fn new() -> Self { + Self { + span_id: tracing::Span::current().id(), + } + } +} + +impl Drop for ResetSpanOnDrop { + // Exits all current spans, then enters the span which was entered when `self` was constructed. 
+ fn drop(&mut self) { + tracing::dispatcher::get_default(|subscriber| { + while let Some(span_id) = tracing::Span::current().id().as_ref() { + subscriber.exit(span_id); + } + if let Some(span_id) = self.span_id.as_ref() { + subscriber.enter(span_id); + } + }); + } +} + /// A `Runner` is the entry-point for testing concurrent code. /// /// It takes as input a function to test and a `Scheduler` to run it under. It then executes that @@ -43,6 +74,7 @@ impl Runner { where F: Fn() + Send + Sync + 'static, { + let _span_drop_guard = ResetSpanOnDrop::new(); // Share continuations across executions to avoid reallocating them // TODO it would be a lot nicer if this were a more generic "context" thing that we passed // TODO around explicitly rather than being a thread local diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs index ac61586a..8297afb2 100644 --- a/src/runtime/task/mod.rs +++ b/src/runtime/task/mod.rs @@ -12,6 +12,7 @@ use std::future::Future; use std::rc::Rc; use std::sync::Arc; use std::task::{Context, Waker}; +use tracing::{event, field, info_span, Level, Span}; pub(crate) mod clock; pub(crate) mod waker; @@ -89,7 +90,11 @@ pub(crate) struct Task { name: Option, local_storage: StorageMap, - pub(super) span: tracing::Span, + + // The `Span` containing the `Task`s `id` and the current step count (if step count recording is enabled) + pub(super) step_span: Span, + // The current `tracing::span::Id` of the `Task` + pub(super) span_id: Option, // Arbitrarily settable tag which is inherited from the parent. 
tag: Option>, @@ -104,9 +109,10 @@ impl Task { id: TaskId, name: Option, clock: VectorClock, - parent_span: tracing::Span, + parent_id: Option, schedule_len: usize, tag: Option>, + current_task: Option, ) -> Self where F: FnOnce() + Send + 'static, @@ -117,11 +123,8 @@ impl Task { let waker = make_waker(id); let continuation = Rc::new(RefCell::new(continuation)); - let span = if name == Some("main-thread".to_string()) { - parent_span - } else { - tracing::info_span!(parent: parent_span.id(), "step", i = schedule_len, task = id.0) - }; + let step_span = info_span!(parent: parent_id.clone(), "step", task = id.0, i = field::Empty); + let span_id = step_span.id(); let mut task = Self { id, @@ -134,7 +137,8 @@ impl Task { detached: false, park_state: ParkState::default(), name, - span, + step_span, + span_id, local_storage: StorageMap::new(), tag: None, }; @@ -143,6 +147,9 @@ impl Task { task.set_tag(tag); } + info_span!(parent: parent_id, "new_task", parent = ?current_task, i = schedule_len) + .in_scope(|| event!(Level::INFO, "created task: {:?}", task.id)); + task } @@ -153,14 +160,25 @@ impl Task { id: TaskId, name: Option, clock: VectorClock, - parent_span: tracing::Span, + parent_id: Option, schedule_len: usize, tag: Option>, + current_task: Option, ) -> Self where F: FnOnce() + Send + 'static, { - Self::new(f, stack_size, id, name, clock, parent_span, schedule_len, tag) + Self::new( + f, + stack_size, + id, + name, + clock, + parent_id, + schedule_len, + tag, + current_task, + ) } #[allow(clippy::too_many_arguments)] @@ -170,9 +188,10 @@ impl Task { id: TaskId, name: Option, clock: VectorClock, - parent_span: tracing::Span, + parent_id: Option, schedule_len: usize, tag: Option>, + current_task: Option, ) -> Self where F: Future + Send + 'static, @@ -192,9 +211,10 @@ impl Task { id, name, clock, - parent_span, + parent_id, schedule_len, tag, + current_task, ) } diff --git a/tests/basic/mod.rs b/tests/basic/mod.rs index 9db64355..14b2c44e 100644 --- 
a/tests/basic/mod.rs +++ b/tests/basic/mod.rs @@ -19,4 +19,5 @@ mod shrink; mod tag; mod thread; mod timeout; +mod tracing; mod uncontrolled_nondeterminism; diff --git a/tests/basic/tracing.rs b/tests/basic/tracing.rs new file mode 100644 index 00000000..2a5b1d3f --- /dev/null +++ b/tests/basic/tracing.rs @@ -0,0 +1,82 @@ +use proptest::proptest; +use proptest::test_runner::Config; +use shuttle::sync::{Arc, Mutex}; +use shuttle::{check_random, thread}; +use test_log::test; +use tracing::{warn, warn_span}; + +// TODO: Custom Subscriber +// TODO: Test with futures +// TODO: Test with panics +// TODO: Test with record_steps_in_span enabled + +fn tracing_nested_spans() { + let lock = Arc::new(Mutex::new(0)); + let threads: Vec<_> = (0..3) + .map(|i| { + let lock = lock.clone(); + thread::spawn(move || { + let outer = warn_span!("outer", tid = i); + let _outer = outer.enter(); + { + let mut locked = lock.lock().unwrap(); + let inner = warn_span!("inner", tid = i); + let _inner = inner.enter(); + warn!("incrementing from {}", *locked); + *locked += 1; + } + }) + }) + .collect(); + + for thread in threads { + thread.join().unwrap(); + } +} + +fn tracing_nested_spans_panic_mod_5(number: usize) { + let lock = Arc::new(Mutex::new(0)); + let threads: Vec<_> = (0..3) + .map(|i| { + let lock = lock.clone(); + thread::spawn(move || { + let outer = warn_span!("outer", tid = i); + let _outer = outer.enter(); + { + let mut locked = lock.lock().unwrap(); + let inner = warn_span!("inner", tid = i); + let _inner = inner.enter(); + warn!("incrementing from {}", *locked); + *locked += 1; + } + if number % 5 == 0 { + panic!(); + } + }) + }) + .collect(); + + for thread in threads { + thread.join().unwrap(); + } +} + +#[test] +fn test_tracing_nested_spans() { + check_random(tracing_nested_spans, 10); +} + +// Test to check that spans don't stack on panic and that minimization works as it should +proptest! { + #![proptest_config( + Config { cases: 1000, failure_persistence: None, .. 
Config::default() } + )] + #[should_panic] + #[test] + fn test_stacks_cleaned_on_panic(i: usize) { + check_random(move || { + tracing_nested_spans_panic_mod_5(i); + }, + 10); + } +} From a7f56882586fe8f40816c07ee1eed33ec4c67a92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Mon, 4 Dec 2023 13:17:37 +0000 Subject: [PATCH 02/10] Own Span in Task to avoid dropping the Span while Task is switched out --- src/runtime/execution.rs | 8 ++++---- src/runtime/task/mod.rs | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/runtime/execution.rs b/src/runtime/execution.rs index e2ff7978..47cc0488 100644 --- a/src/runtime/execution.rs +++ b/src/runtime/execution.rs @@ -152,7 +152,7 @@ impl Execution { subscriber.exit(span_id); } - if let Some(span_id) = state.current().span_id.as_ref() { + if let Some(span_id) = state.current().span.id().as_ref() { subscriber.enter(span_id) } @@ -167,11 +167,11 @@ impl Execution { // Leave the Task's span and store which span it exited in order to restore it the next time the Task is run ExecutionState::with(|state| { tracing::dispatcher::get_default(|subscriber| { - let current_span_id = tracing::Span::current().id(); - if let Some(span_id) = current_span_id.as_ref() { + let current_span = tracing::Span::current(); + if let Some(span_id) = current_span.id().as_ref() { subscriber.exit(span_id); } - state.current_mut().span_id = current_span_id; + state.current_mut().span = current_span; if let Some(span_id) = state.span.id().as_ref() { subscriber.enter(span_id) diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs index 8297afb2..5b4c07c6 100644 --- a/src/runtime/task/mod.rs +++ b/src/runtime/task/mod.rs @@ -93,8 +93,8 @@ pub(crate) struct Task { // The `Span` containing the `Task`s `id` and the current step count (if step count recording is enabled) pub(super) step_span: Span, - // The current `tracing::span::Id` of the `Task` - pub(super) span_id: Option, + // The 
current `Span` of the `Task`. We have to have it by ownership in order for the `Span` to not get dropped while the task is switched out. + pub(super) span: Span, // Arbitrarily settable tag which is inherited from the parent. tag: Option>, @@ -124,7 +124,7 @@ impl Task { let continuation = Rc::new(RefCell::new(continuation)); let step_span = info_span!(parent: parent_id.clone(), "step", task = id.0, i = field::Empty); - let span_id = step_span.id(); + let span = step_span.clone(); let mut task = Self { id, @@ -138,7 +138,7 @@ impl Task { park_state: ParkState::default(), name, step_span, - span_id, + span, local_storage: StorageMap::new(), tag: None, }; From c4c651764e106de357ab2a6f8cd6350d3dd0a508 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Mon, 11 Dec 2023 02:52:39 -0800 Subject: [PATCH 03/10] Respond to comments by @jorajeev - current_task -> parent_task_id - span -> top_level_span - parent_id -> parent_span_id - #[ignore] tracing tests - move test_tracing_nested_spans up --- src/lib.rs | 2 +- src/runtime/execution.rs | 17 +++++++++-------- src/runtime/task/mod.rs | 24 ++++++++++++------------ tests/basic/tracing.rs | 12 +++++++----- 4 files changed, 29 insertions(+), 26 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a7eecfa0..d1b637e1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -227,7 +227,7 @@ pub struct Config { /// `tracing_subscriber::fmt`, appends to the span on calls to `record()` (instead of /// overwriting), which results in traces which are hard to read if the task is scheduled more /// than a few times. - /// Thus: set `record_steps_in_span` to `true` if you want this behaviour, or if you are using + /// Thus: set `record_steps_in_span` to `true` if you want "append behavior", or if you are using /// a `Subscriber` which overwrites on calls to `record()` and want to display the current step /// count. 
pub record_steps_in_span: bool, diff --git a/src/runtime/execution.rs b/src/runtime/execution.rs index 47cc0488..9bc7981a 100644 --- a/src/runtime/execution.rs +++ b/src/runtime/execution.rs @@ -173,7 +173,7 @@ impl Execution { } state.current_mut().span = current_span; - if let Some(span_id) = state.span.id().as_ref() { + if let Some(span_id) = state.top_level_span.id().as_ref() { subscriber.enter(span_id) } }); @@ -241,7 +241,7 @@ pub(crate) struct ExecutionState { has_cleaned_up: bool, // The `Span` which the `ExecutionState` was created under. Will be the parent of all `Task` `Span`s - pub(crate) span: Span, + pub(crate) top_level_span: Span, } #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -279,7 +279,7 @@ impl ExecutionState { current_schedule: initial_schedule, #[cfg(debug_assertions)] has_cleaned_up: false, - span: tracing::Span::current(), + top_level_span: tracing::Span::current(), } } @@ -327,7 +327,7 @@ impl ExecutionState { { Self::with(|state| { let schedule_len = state.current_schedule.len(); - let parent_id = state.span.id(); + let parent_span_id = state.top_level_span.id(); let task_id = TaskId(state.tasks.len()); let tag = state.get_tag_or_default_for_current_task(); @@ -340,7 +340,7 @@ impl ExecutionState { task_id, name, clock.clone(), - parent_id, + parent_span_id, schedule_len, tag, state.try_current().map(|t| t.id()), @@ -361,7 +361,7 @@ impl ExecutionState { F: FnOnce() + Send + 'static, { Self::with(|state| { - let parent_id = state.span.id(); + let parent_span_id = state.top_level_span.id(); let task_id = TaskId(state.tasks.len()); let tag = state.get_tag_or_default_for_current_task(); let clock = if let Some(ref mut clock) = initial_clock { @@ -381,7 +381,7 @@ impl ExecutionState { task_id, name, clock, - parent_id, + parent_span_id, schedule_len, tag, state.try_current().map(|t| t.id()), @@ -638,7 +638,8 @@ impl ExecutionState { } } - self.span.in_scope(|| trace!(?runnable, next_task=?self.next_task)); + self.top_level_span + 
.in_scope(|| trace!(?runnable, next_task=?self.next_task)); Ok(()) } diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs index 5b4c07c6..91f825dc 100644 --- a/src/runtime/task/mod.rs +++ b/src/runtime/task/mod.rs @@ -109,10 +109,10 @@ impl Task { id: TaskId, name: Option, clock: VectorClock, - parent_id: Option, + parent_span_id: Option, schedule_len: usize, tag: Option>, - current_task: Option, + parent_task_id: Option, ) -> Self where F: FnOnce() + Send + 'static, @@ -123,7 +123,7 @@ impl Task { let waker = make_waker(id); let continuation = Rc::new(RefCell::new(continuation)); - let step_span = info_span!(parent: parent_id.clone(), "step", task = id.0, i = field::Empty); + let step_span = info_span!(parent: parent_span_id.clone(), "step", task = id.0, i = field::Empty); let span = step_span.clone(); let mut task = Self { @@ -147,7 +147,7 @@ impl Task { task.set_tag(tag); } - info_span!(parent: parent_id, "new_task", parent = ?current_task, i = schedule_len) + info_span!(parent: parent_span_id, "new_task", parent = ?parent_task_id, i = schedule_len) .in_scope(|| event!(Level::INFO, "created task: {:?}", task.id)); task @@ -160,10 +160,10 @@ impl Task { id: TaskId, name: Option, clock: VectorClock, - parent_id: Option, + parent_span_id: Option, schedule_len: usize, tag: Option>, - current_task: Option, + parent_task_id: Option, ) -> Self where F: FnOnce() + Send + 'static, @@ -174,10 +174,10 @@ impl Task { id, name, clock, - parent_id, + parent_span_id, schedule_len, tag, - current_task, + parent_task_id, ) } @@ -188,10 +188,10 @@ impl Task { id: TaskId, name: Option, clock: VectorClock, - parent_id: Option, + parent_span_id: Option, schedule_len: usize, tag: Option>, - current_task: Option, + parent_task_id: Option, ) -> Self where F: Future + Send + 'static, @@ -211,10 +211,10 @@ impl Task { id, name, clock, - parent_id, + parent_span_id, schedule_len, tag, - current_task, + parent_task_id, ) } diff --git a/tests/basic/tracing.rs 
b/tests/basic/tracing.rs index 2a5b1d3f..e7e94f1e 100644 --- a/tests/basic/tracing.rs +++ b/tests/basic/tracing.rs @@ -34,6 +34,12 @@ fn tracing_nested_spans() { } } +#[ignore] +#[test] +fn test_tracing_nested_spans() { + check_random(tracing_nested_spans, 10); +} + fn tracing_nested_spans_panic_mod_5(number: usize) { let lock = Arc::new(Mutex::new(0)); let threads: Vec<_> = (0..3) @@ -61,17 +67,13 @@ fn tracing_nested_spans_panic_mod_5(number: usize) { } } -#[test] -fn test_tracing_nested_spans() { - check_random(tracing_nested_spans, 10); -} - // Test to check that spans don't stack on panic and that minimization works as it should proptest! { #![proptest_config( Config { cases: 1000, failure_persistence: None, .. Config::default() } )] #[should_panic] + #[ignore] #[test] fn test_stacks_cleaned_on_panic(i: usize) { check_random(move || { From b32e021549b45d0eca3bc1483a83e381aad5e625 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Mon, 11 Dec 2023 03:13:36 -0800 Subject: [PATCH 04/10] Make the execution_span and the step_span ERROR --- src/runtime/runner.rs | 4 ++-- src/runtime/task/mod.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/runtime/runner.rs b/src/runtime/runner.rs index 96de5fe7..184f0015 100644 --- a/src/runtime/runner.rs +++ b/src/runtime/runner.rs @@ -98,7 +98,7 @@ impl Runner { let execution = Execution::new(self.scheduler.clone(), schedule); let f = Arc::clone(&f); - span!(Level::INFO, "execution", i).in_scope(|| execution.run(&self.config, move || f())); + span!(Level::ERROR, "execution", i).in_scope(|| execution.run(&self.config, move || f())); i += 1; } @@ -165,7 +165,7 @@ impl PortfolioRunner { let runner = Runner::new(scheduler, config); - span!(Level::INFO, "job", i).in_scope(|| { + span!(Level::ERROR, "job", i).in_scope(|| { let ret = panic::catch_unwind(panic::AssertUnwindSafe(|| runner.run(move || f()))); match ret { diff --git a/src/runtime/task/mod.rs 
b/src/runtime/task/mod.rs index 91f825dc..9bdf7684 100644 --- a/src/runtime/task/mod.rs +++ b/src/runtime/task/mod.rs @@ -12,7 +12,7 @@ use std::future::Future; use std::rc::Rc; use std::sync::Arc; use std::task::{Context, Waker}; -use tracing::{event, field, info_span, Level, Span}; +use tracing::{error_span, event, field, Level, Span}; pub(crate) mod clock; pub(crate) mod waker; @@ -123,7 +123,7 @@ impl Task { let waker = make_waker(id); let continuation = Rc::new(RefCell::new(continuation)); - let step_span = info_span!(parent: parent_span_id.clone(), "step", task = id.0, i = field::Empty); + let step_span = error_span!(parent: parent_span_id.clone(), "step", task = id.0, i = field::Empty); let span = step_span.clone(); let mut task = Self { @@ -147,7 +147,7 @@ impl Task { task.set_tag(tag); } - info_span!(parent: parent_span_id, "new_task", parent = ?parent_task_id, i = schedule_len) + error_span!(parent: parent_span_id, "new_task", parent = ?parent_task_id, i = schedule_len) .in_scope(|| event!(Level::INFO, "created task: {:?}", task.id)); task From 6dd5a7623adefab506d5a2768a735f483249c23f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Mon, 8 Jan 2024 17:34:22 +0100 Subject: [PATCH 05/10] Add instrumented futures test --- tests/basic/tracing.rs | 50 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 47 insertions(+), 3 deletions(-) diff --git a/tests/basic/tracing.rs b/tests/basic/tracing.rs index e7e94f1e..acf88463 100644 --- a/tests/basic/tracing.rs +++ b/tests/basic/tracing.rs @@ -1,13 +1,19 @@ use proptest::proptest; use proptest::test_runner::Config; +use shuttle::future::spawn; use shuttle::sync::{Arc, Mutex}; use shuttle::{check_random, thread}; +use std::time::Duration; use test_log::test; -use tracing::{warn, warn_span}; +use tracing::{instrument::Instrument, warn, warn_span}; + +// NOTE: +// All of the tests testing tracing will either have to have weak assertions, +// or be #[ignore]d. 
The reason for this is that they are not thread safe +// (since everything tracing is managed globally), and there is no way to +// ensure that the tests are run single-threaded. // TODO: Custom Subscriber -// TODO: Test with futures -// TODO: Test with panics // TODO: Test with record_steps_in_span enabled fn tracing_nested_spans() { @@ -82,3 +88,41 @@ proptest! { 10); } } + +async fn spawn_instrumented_futures() { + let jhs = (0..2) + .map(|_| { + spawn(async { + async { + let span_id = tracing::Span::current().id(); + async { + // NOTE: Just a way to get a `thread::switch` call. + // Consider `pub`ing `thread::switch` ? + thread::sleep(Duration::from_millis(0)); + } + .instrument(warn_span!("Span")) + .await; + assert_eq!(span_id, tracing::Span::current().id()) + } + .await + }) + }) + .collect::>(); + for jh in jhs { + jh.await.unwrap(); + } +} + +#[ignore] +#[test] +fn instrumented_futures() { + let _res = tracing_subscriber::fmt::try_init(); + shuttle::check_random( + || { + shuttle::future::block_on(async move { + spawn_instrumented_futures().await; + }) + }, + 100, + ); +} From c2ece8d69a8a393b47cc094242415d716e682d07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Mon, 8 Jan 2024 18:22:00 +0100 Subject: [PATCH 06/10] Fix issue with tracing and instrumented futures --- src/runtime/execution.rs | 20 ++++++++++++-------- src/runtime/runner.rs | 5 +++++ src/runtime/task/mod.rs | 16 ++++++++++++---- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/runtime/execution.rs b/src/runtime/execution.rs index 9bc7981a..89614cfe 100644 --- a/src/runtime/execution.rs +++ b/src/runtime/execution.rs @@ -145,15 +145,19 @@ impl Execution { let ret = match next_step { NextStep::Task(continuation) => { // Enter the Task's span + // Note that if any issues arises with spans and tracing, then calling `exit` until `None`, + // storing the entirety of the `span_stack` when creating the `Task` and storing + // `top_level_span` 
as a stack should be tried. ExecutionState::with(|state| { tracing::dispatcher::get_default(|subscriber| { - let current_span_id = tracing::Span::current().id(); - if let Some(span_id) = current_span_id.as_ref() { + if let Some(span_id) = tracing::Span::current().id().as_ref() { subscriber.exit(span_id); } - if let Some(span_id) = state.current().span.id().as_ref() { - subscriber.enter(span_id) + while let Some(span) = state.current_mut().span_stack.pop() { + if let Some(span_id) = span.id().as_ref() { + subscriber.enter(span_id) + } } if state.config.record_steps_in_span { @@ -164,14 +168,14 @@ impl Execution { let result = panic::catch_unwind(panic::AssertUnwindSafe(|| continuation.borrow_mut().resume())); - // Leave the Task's span and store which span it exited in order to restore it the next time the Task is run + // Leave the Task's span and store the exited `Span` stack in order to restore it the next time the Task is run ExecutionState::with(|state| { tracing::dispatcher::get_default(|subscriber| { - let current_span = tracing::Span::current(); - if let Some(span_id) = current_span.id().as_ref() { + debug_assert!(state.current().span_stack.is_empty()); + while let Some(span_id) = tracing::Span::current().id().as_ref() { + state.current_mut().span_stack.push(tracing::Span::current().clone()); subscriber.exit(span_id); } - state.current_mut().span = current_span; if let Some(span_id) = state.top_level_span.id().as_ref() { subscriber.enter(span_id) diff --git a/src/runtime/runner.rs b/src/runtime/runner.rs index 184f0015..e364927c 100644 --- a/src/runtime/runner.rs +++ b/src/runtime/runner.rs @@ -98,6 +98,11 @@ impl Runner { let execution = Execution::new(self.scheduler.clone(), schedule); let f = Arc::clone(&f); + // This is a slightly lazy way to ensure that everything outside of the "execution" span gets + // established correctly between executions. 
Fully `exit`ing and fully `enter`ing (explicitly + // `enter`/`exit` all `Span`s) would most likely obviate the need for this. + let _rsod = ResetSpanOnDrop::new(); + span!(Level::ERROR, "execution", i).in_scope(|| execution.run(&self.config, move || f())); i += 1; diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs index 9bdf7684..efc12a9e 100644 --- a/src/runtime/task/mod.rs +++ b/src/runtime/task/mod.rs @@ -93,8 +93,13 @@ pub(crate) struct Task { // The `Span` containing the `Task`s `id` and the current step count (if step count recording is enabled) pub(super) step_span: Span, - // The current `Span` of the `Task`. We have to have it by ownership in order for the `Span` to not get dropped while the task is switched out. - pub(super) span: Span, + // The current `Span` "stack" of the `Task`. There are two things to note: + // 1: We have to own the `Span`s (versus storing `Id`s) for the `Span` to not get dropped while the task is switched out. + // 2: We have to store the stack of `Span`s in order to return to the correct `Span` once the `Entered<'_>` from an + // `instrument`ed future is dropped. + // `span_stack` is (as the name suggests) a stack. We `pop` it empty when resuming a `Task`, and `push` + `exit` + // `tracing::Span::current()` until there is no entered `Span` when we switch out of the `Task`. + pub(super) span_stack: Vec, // Arbitrarily settable tag which is inherited from the parent. tag: Option>, @@ -124,7 +129,10 @@ impl Task { let continuation = Rc::new(RefCell::new(continuation)); let step_span = error_span!(parent: parent_span_id.clone(), "step", task = id.0, i = field::Empty); - let span = step_span.clone(); + // Note that this is slightly lazy — we are starting storing at the step_span, but could have gotten the + // full `Span` stack and stored that. It should be fine, but if any issues arise, then full storing should + // be tried. 
+ let span_stack = vec![step_span.clone()]; let mut task = Self { id, @@ -138,7 +146,7 @@ impl Task { park_state: ParkState::default(), name, step_span, - span, + span_stack, local_storage: StorageMap::new(), tag: None, }; From a1607eb53234affb92cb6719607202c4d730db88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Mon, 8 Jan 2024 18:22:53 +0100 Subject: [PATCH 07/10] Update instrumented futures tracing test to highlight top level span failures --- tests/basic/tracing.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/basic/tracing.rs b/tests/basic/tracing.rs index acf88463..9042e78b 100644 --- a/tests/basic/tracing.rs +++ b/tests/basic/tracing.rs @@ -116,6 +116,8 @@ async fn spawn_instrumented_futures() { #[ignore] #[test] fn instrumented_futures() { + let outer_span = warn_span!("OUTER"); + let _e = outer_span.enter(); let _res = tracing_subscriber::fmt::try_init(); shuttle::check_random( || { From 954b30f9026d39458a67e8abfaf506bfc64feae9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Mon, 8 Jan 2024 18:27:57 +0100 Subject: [PATCH 08/10] Clippy unneeded async --- tests/basic/tracing.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/tests/basic/tracing.rs b/tests/basic/tracing.rs index 9042e78b..1fdeb644 100644 --- a/tests/basic/tracing.rs +++ b/tests/basic/tracing.rs @@ -93,18 +93,15 @@ async fn spawn_instrumented_futures() { let jhs = (0..2) .map(|_| { spawn(async { + let span_id = tracing::Span::current().id(); async { - let span_id = tracing::Span::current().id(); - async { - // NOTE: Just a way to get a `thread::switch` call. - // Consider `pub`ing `thread::switch` ? - thread::sleep(Duration::from_millis(0)); - } - .instrument(warn_span!("Span")) - .await; - assert_eq!(span_id, tracing::Span::current().id()) + // NOTE: Just a way to get a `thread::switch` call. + // Consider `pub`ing `thread::switch` ? 
+ thread::sleep(Duration::from_millis(0)); } - .await + .instrument(warn_span!("Span")) + .await; + assert_eq!(span_id, tracing::Span::current().id()) }) }) .collect::>(); @@ -125,6 +122,6 @@ fn instrumented_futures() { spawn_instrumented_futures().await; }) }, - 100, + 1000, ); } From 3fc910ddc1542b25662af3a8b09ffb66e7d0676c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Mon, 8 Jan 2024 19:10:49 +0100 Subject: [PATCH 09/10] Update ResetSpanOnDrop to store an owned copy of the Span --- src/runtime/runner.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/runtime/runner.rs b/src/runtime/runner.rs index e364927c..d61b1ca8 100644 --- a/src/runtime/runner.rs +++ b/src/runtime/runner.rs @@ -20,14 +20,14 @@ use tracing::{span, Level}; // then caught by `panic::catch_unwind()` (such as when Shuttle is run inside proptest). // In other words: it enables correct spans when doing proptest minimization. struct ResetSpanOnDrop { - span_id: Option, + span: tracing::Span, } impl ResetSpanOnDrop { #[must_use] fn new() -> Self { Self { - span_id: tracing::Span::current().id(), + span: tracing::Span::current().clone(), } } } @@ -39,7 +39,7 @@ impl Drop for ResetSpanOnDrop { while let Some(span_id) = tracing::Span::current().id().as_ref() { subscriber.exit(span_id); } - if let Some(span_id) = self.span_id.as_ref() { + if let Some(span_id) = self.span.id().as_ref() { subscriber.enter(span_id); } }); From f190a4df6f5e54544d8f471aab7179c9c9d0c412 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Fri, 26 Jan 2024 20:34:41 -0800 Subject: [PATCH 10/10] Address comments by @jorajeev --- src/lib.rs | 8 ++++++++ src/runtime/execution.rs | 15 ++++++++++++--- src/runtime/runner.rs | 6 +++--- src/runtime/task/mod.rs | 14 ++++++++++---- tests/basic/tracing.rs | 5 +---- 5 files changed, 34 insertions(+), 14 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d1b637e1..c7c5ea85 100644 --- 
a/src/lib.rs +++ b/src/lib.rs @@ -223,6 +223,14 @@ pub struct Config { /// Whether to call the `Span::record()` method to update the step count (`i`) of the `Span` /// containing the `TaskId` and the current step count for the given `TaskId`. + /// If `false`, this `Span` will look like this: `step{task=1}`, and if `true`, this `Span` + /// will look something like this: `step{task=1 i=3 i=9 i=12}`, or, if a `Subscriber` which + /// overwrites on calls to `span.record()` is used, something like this: + /// ```text + /// step{task=1 i=3} + /// step{task=1 i=9} + /// step{task=1 i=12} + /// ``` /// The reason this is a config option is that the most popular tracing `Subscriber`s, ie /// `tracing_subscriber::fmt`, appends to the span on calls to `record()` (instead of /// overwriting), which results in traces which are hard to read if the task is scheduled more diff --git a/src/runtime/execution.rs b/src/runtime/execution.rs index 89614cfe..ed6af502 100644 --- a/src/runtime/execution.rs +++ b/src/runtime/execution.rs @@ -145,15 +145,19 @@ impl Execution { let ret = match next_step { NextStep::Task(continuation) => { // Enter the Task's span - // Note that if any issues arises with spans and tracing, then calling `exit` until `None`, - // storing the entirety of the `span_stack` when creating the `Task` and storing - // `top_level_span` as a stack should be tried. + // (Note that if any issues arise with spans and tracing, then + // 1) calling `exit` until `None` before entering the `Task`s `Span`, + // 2) storing the entirety of the `span_stack` when creating the `Task`, and + // 3) storing `top_level_span` as a stack + // should be tried.) ExecutionState::with(|state| { tracing::dispatcher::get_default(|subscriber| { if let Some(span_id) = tracing::Span::current().id().as_ref() { subscriber.exit(span_id); } + // The `span_stack` stores `Span`s such that the top of the stack is the outermost `Span`, + // meaning that parents (left-most when printed) are entered first. 
while let Some(span) = state.current_mut().span_stack.pop() { if let Some(span_id) = span.id().as_ref() { subscriber.enter(span_id) @@ -642,6 +646,11 @@ impl ExecutionState { } } + // Tracing this `in_scope` is purely a matter of taste. We do it because + // 1) It is an action taken by the scheduler, and should thus be traced under the scheduler's span + // 2) It creates a visual separation of scheduling decisions and `Task`-induced tracing. + // Note that there is a case to be made for not `in_scope`-ing it, as that makes seeing the context + // of the context switch clearer. self.top_level_span .in_scope(|| trace!(?runnable, next_task=?self.next_task)); diff --git a/src/runtime/runner.rs b/src/runtime/runner.rs index d61b1ca8..fdcf5c1d 100644 --- a/src/runtime/runner.rs +++ b/src/runtime/runner.rs @@ -16,15 +16,15 @@ use std::time::Instant; use tracing::{span, Level}; // A helper struct which on `drop` exits all current spans, then enters the span which was entered when it was constructed. -// The reason this exist is to solve the "span-stacking" issue which occurs when there is a panic inside `run` which is +// The reason this exists is to solve the "span-stacking" issue which occurs when there is a panic inside `run` which is // then caught by `panic::catch_unwind()` (such as when Shuttle is run inside proptest). // In other words: it enables correct spans when doing proptest minimization. +#[must_use] struct ResetSpanOnDrop { span: tracing::Span, } impl ResetSpanOnDrop { - #[must_use] fn new() -> Self { Self { span: tracing::Span::current().clone(), @@ -101,7 +101,7 @@ impl Runner { // This is a slightly lazy way to ensure that everything outside of the "execution" span gets // established correctly between executions. Fully `exit`ing and fully `enter`ing (explicitly // `enter`/`exit` all `Span`s) would most likely obviate the need for this. 
- let _rsod = ResetSpanOnDrop::new(); + let _span_drop_guard2 = ResetSpanOnDrop::new(); span!(Level::ERROR, "execution", i).in_scope(|| execution.run(&self.config, move || f())); diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs index efc12a9e..7a385a66 100644 --- a/src/runtime/task/mod.rs +++ b/src/runtime/task/mod.rs @@ -91,14 +91,20 @@ pub(crate) struct Task { local_storage: StorageMap, - // The `Span` containing the `Task`s `id` and the current step count (if step count recording is enabled) + // The `Span` which looks like this: step{task=task_id}, or, if step count recording is enabled, like this: + // step{task=task_id i=step_count}. Becomes the parent of the spans created by the `Task`. pub(super) step_span: Span, - // The current `Span` "stack" of the `Task`. There are two things to note: + + // The current `Span` "stack" of the `Task`. + // `Span`s are stored such that the `Task`s current `Span` is at `span_stack[0]`, that `Span`s parent (if it exists) + // is at `span_stack[1]`, and so on, until `span_stack[span_stack.len()-1]`, which is the "outermost" (left-most when printed) + // `Span`. This means that `span_stack[span_stack.len()-1]` will usually be the `Span` saying `execution{i=X}`. + // We `pop` it empty when resuming a `Task`, and `push` + `exit` `tracing::Span::current()` + // until there is no entered `Span` when we switch out of the `Task`. + // There are two things to note: // 1: We have to own the `Span`s (versus storing `Id`s) for the `Span` to not get dropped while the task is switched out. // 2: We have to store the stack of `Span`s in order to return to the correct `Span` once the `Entered<'_>` from an // `instrument`ed future is dropped. - // `span_stack` is (as the name suggests) a stack. We `pop` it empty when resuming a `Task`, and `push` + `exit` - // `tracing::Span::current()` until there is no entered `Span` when we switch out of the `Task`. 
pub(super) span_stack: Vec<Span>, // Arbitrarily settable tag which is inherited from the parent. diff --git a/tests/basic/tracing.rs b/tests/basic/tracing.rs index 1fdeb644..8f246035 100644 --- a/tests/basic/tracing.rs +++ b/tests/basic/tracing.rs @@ -3,7 +3,6 @@ use proptest::test_runner::Config; use shuttle::future::spawn; use shuttle::sync::{Arc, Mutex}; use shuttle::{check_random, thread}; -use std::time::Duration; use test_log::test; use tracing::{instrument::Instrument, warn, warn_span}; @@ -95,9 +94,7 @@ async fn spawn_instrumented_futures() { spawn(async { let span_id = tracing::Span::current().id(); async { - // NOTE: Just a way to get a `thread::switch` call. - // Consider `pub`ing `thread::switch` ? - thread::sleep(Duration::from_millis(0)); + thread::yield_now(); } .instrument(warn_span!("Span")) .await;