Skip to content

Commit

Permalink
more informative trace
Browse files Browse the repository at this point in the history
  • Loading branch information
stepantubanov committed May 31, 2024
1 parent e07e781 commit 383bbaf
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 39 deletions.
13 changes: 12 additions & 1 deletion src/enabled/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,18 @@ pub(crate) enum TaskState {

impl TaskState {
pub(crate) fn can_execute(&self) -> bool {
matches!(self, Self::WaitingForPermit { blocked_locks, .. } if blocked_locks.is_empty())
self.executable_op().is_some()
}

pub(crate) fn executable_op(&self) -> Option<&'static OperationMetadata> {
match self {
Self::WaitingForPermit {
metadata,
blocked_locks,
..
} if blocked_locks.is_empty() => Some(metadata),
_ => None,
}
}
}

Expand Down
117 changes: 83 additions & 34 deletions src/enabled/runner.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use core::fmt;
use std::{
env,
error::Error,
fmt,
future::Future,
num::ParseIntError,
panic::{self, AssertUnwindSafe},
str::FromStr,
time::Duration,
Expand Down Expand Up @@ -144,7 +144,7 @@ impl Runner {
let mut controller = Controller::register(&initial_tasks);

let control = async {
let mut task_ids_from_trace = trace.task_ids.into_iter();
let mut steps_from_trace = trace.steps.into_iter();
let mut rng = Rng::new();

if let Some(before_iter) = &mut self.before_iter {
Expand All @@ -157,21 +157,27 @@ impl Runner {
before_step().await;
}

let task_id = task_ids_from_trace.next().or_else(|| {
let candidates = controller
.tasks()
.iter()
.filter_map(|(task, state)| {
state.can_execute().then_some(task.id())
})
.collect::<Vec<TaskId>>();

if candidates.is_empty() {
return None;
}

Some(candidates[rng.usize(..candidates.len())])
});
let task_id = steps_from_trace
.next()
.map(|(task_id, _, _)| {
// TODO: check task name, op name
task_id
})
.or_else(|| {
let candidates = controller
.tasks()
.iter()
.filter_map(|(task, state)| {
state.can_execute().then_some(task.id())
})
.collect::<Vec<TaskId>>();

if candidates.is_empty() {
return None;
}

Some(candidates[rng.usize(..candidates.len())])
});

let Some(task_id) = task_id else {
break;
Expand All @@ -191,6 +197,7 @@ impl Runner {
}
};

// TODO: handle panics
(state, _) = join!(f(state), control);
return state;
}
Expand Down Expand Up @@ -220,7 +227,22 @@ impl Runner {
break;
};

trace.task_ids.push(task_id);
let (task_name, op_name) = controller
.tasks()
.iter()
.find_map(|(task, state)| {
if task.id() == task_id {
let op_metadata = state
.executable_op()
.expect("task with chosen task_id isn't executable");
Some((task.name().clone(), OperationName(op_metadata.name.into())))
} else {
None
}
})
.expect("can't find task name & op name for chosen task");

trace.steps.push((task_id, task_name, op_name));
if let Some(before_step) = &mut self.before_step {
before_step().await;
}
Expand Down Expand Up @@ -266,40 +288,67 @@ impl Runner {
}

pub struct Trace {
task_ids: Vec<TaskId>,
steps: Vec<(TaskId, TaskName, OperationName)>,
}

struct OperationName(String);

impl Trace {
fn new() -> Self {
Self {
task_ids: Vec::new(),
}
Self { steps: Vec::new() }
}
}

impl fmt::Display for Trace {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.task_ids.is_empty() {
if self.steps.is_empty() {
return Ok(());
}

write!(f, "{}", self.task_ids[0].0)?;
for task_id in &self.task_ids[1..] {
write!(f, ",{}", task_id.0)?;
let (task_id, task_name, op_name) = &self.steps[0];
write!(f, "{}:{}.{}", task_id.0, task_name.0, op_name.0)?;
for (task_id, task_name, op_name) in &self.steps[1..] {
write!(f, ",{}:{}.{}", task_id.0, task_name.0, op_name.0)?;
}
Ok(())
}
}

impl FromStr for Trace {
type Err = ParseIntError;
type Err = ParseTraceError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self {
task_ids: s
.split(',')
.map(|part| part.parse().map(TaskId))
.collect::<Result<Vec<_>, _>>()?,
})
let steps = s
.split(',')
.map(|step| {
let (task_id, names) = step.split_once(':').ok_or(ParseTraceError)?;
let (task_name, op_name) = names.split_once('.').ok_or(ParseTraceError)?;

let task_id = TaskId(task_id.parse().map_err(|_| ParseTraceError)?);
Ok((
task_id,
TaskName(task_name.into()),
OperationName(op_name.into()),
))
})
.collect::<Result<Vec<_>, ParseTraceError>>()?;

Ok(Self { steps })
}
}

pub struct ParseTraceError;

impl fmt::Debug for ParseTraceError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("invalid trace")
}
}

impl fmt::Display for ParseTraceError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("invalid trace")
}
}

impl Error for ParseTraceError {}
18 changes: 14 additions & 4 deletions tests/examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashMap;
use std::sync::Mutex;

use parcheck::Trace;

struct Observer {
trace: Mutex<String>,
}
Expand Down Expand Up @@ -132,7 +134,7 @@ async fn does_not_double_panic() {

#[tokio::test]
#[should_panic(
expected = "operation 'outer' already in progress for task 'reentrant' (operation at tests/examples/basic.rs:142)"
expected = "operation 'outer' already in progress for task 'reentrant' (operation at tests/examples/basic.rs:144)"
)]
async fn detects_reentrant_task() {
parcheck::runner()
Expand All @@ -154,9 +156,13 @@ async fn detects_reentrant_task() {

#[tokio::test]
async fn replays_prefix_trace() {
// only a prefix of an actual execution
let trace: Trace = "0:replays_prefix_trace.op,0:replays_prefix_trace.op"
.parse()
.unwrap();

parcheck::runner()
// only a prefix of an actual execution
.replay("0,0".parse().unwrap())
.replay(trace)
.run(["replays_prefix_trace"], || async {
parcheck::task!("replays_prefix_trace", {
async {
Expand All @@ -172,8 +178,12 @@ async fn replays_prefix_trace() {

#[tokio::test]
async fn replays_full_trace() {
let trace: Trace = "0:replays_full_trace.op,0:replays_full_trace.op,0:replays_full_trace.op"
.parse()
.unwrap();

parcheck::runner()
.replay("0,0,0".parse().unwrap())
.replay(trace)
.run(["replays_full_trace"], || async {
parcheck::task!("replays_full_trace", {
async {
Expand Down

0 comments on commit 383bbaf

Please sign in to comment.