Skip to content

Commit

Permalink
Emit concurrent traces in ConcurrencyHandler
Browse files Browse the repository at this point in the history
Summary:
Emitting all traces in the soft error message for easier debugging.

Also, I _think_ there's a deadlock if we error out for the `NestedInvocation::Error` case, and then try to run another command. The quickstack trace I saw all had the following:

```
Thread 215 (LWP 4048887):
#1  0x00007f3dabb259d9 in syscall () from /usr/local/fbcode/platform010/lib/libc.so.6
```
I'm not sure if this is related to something else, or it's because of the deadlock. Adding the lines to drop the data, create the drop guard, and then releasing the baton fixes it. I wasn't able to trigger this in my previous manual testing, but this seems like a correct thing to do regardless.

Reviewed By: krallin

Differential Revision: D39985753

fbshipit-source-id: 2e3bf9fa6ba5f340c0189aa9984c236fb80a6feb
  • Loading branch information
wendy728 authored and facebook-github-bot committed Oct 4, 2022
1 parent 75e7355 commit 34f5737
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 66 deletions.
1 change: 1 addition & 0 deletions buck2_server_ctx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ description = "Common parts of Buck commands"
anyhow = { workspace = true }
async-condvar-fair = { workspace = true }
async-trait = { workspace = true }
itertools = { workspace = true }
parking_lot = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions buck2_server_ctx/TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ rust_library(
"fbsource//third-party/rust:anyhow",
"fbsource//third-party/rust:async-condvar-fair",
"fbsource//third-party/rust:async-trait",
"fbsource//third-party/rust:itertools",
"fbsource//third-party/rust:parking_lot",
"fbsource//third-party/rust:thiserror",
"fbsource//third-party/rust:tracing",
Expand Down
191 changes: 125 additions & 66 deletions buck2_server_ctx/src/concurrency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,23 @@ use dice::DiceComputations;
use dice::DiceTransaction;
use dice::UserComputationData;
use gazebo::prelude::*;
use itertools::Itertools;
use parking_lot::FairMutex;
use starlark_map::small_map::SmallMap;
use thiserror::Error;

#[derive(Error, Debug)]
enum ConcurrencyHandlerError {
#[error(
"Recursive invocation of Buck, which is discouraged, but will probably work (using the same state). Trace Id: {0}"
"Recursive invocation of Buck, which is discouraged, but will probably work (using the same state). Trace Ids: {0}"
)]
NestedInvocationWithSameStates(String),
#[error(
"Recursive invocation of Buck, with a different state - computation will continue but may produce incorrect results. Trace Id: {0}"
"Recursive invocation of Buck, with a different state - computation will continue but may produce incorrect results. Trace Ids: {0}"
)]
NestedInvocationWithDifferentStates(String),
#[error(
"Parallel invocation of Buck, with a different state - computation will continue but may produce incorrect results. Trace Id: {0}"
"Parallel invocation of Buck, with a different state - computation will continue but may produce incorrect results. Trace Ids: {0}"
)]
ParallelInvocationWithDifferentStates(String),
}
Expand Down Expand Up @@ -81,6 +82,21 @@ impl FromStr for NestedInvocation {
}
}

#[derive(Clone, Dupe, Copy, Debug)]
pub enum RunState {
NestedSameState,
NestedDifferentState,
ParallelSameState,
ParallelDifferentState,
}

#[derive(Clone, Dupe, Copy, Debug)]
pub enum BypassSemaphore {
Run(RunState),
Block,
Error,
}

/// Manages concurrent commands, blocking when appropriate.
///
/// Currently, we allow concurrency if two `DiceTransactions` are deemed equivalent, such that
Expand Down Expand Up @@ -189,70 +205,31 @@ impl ConcurrencyHandler {
if let Some(active_dice) = &data.active_dice {
let is_same_state = active_dice.equivalent(&transaction);

let bypass_semaphore = if is_same_state {
// if the dice context is equivalent, then we can run concurrently with the
// current command.
if is_nested_invocation {
soft_error!(
"nested_invocation_same_dice_state",
anyhow::anyhow!(
ConcurrencyHandlerError::NestedInvocationWithSameStates(
trace.to_string(),
)
)
)?;
}
true
} else if is_nested_invocation {
match self.nested_invocation_config {
NestedInvocation::Error => {
return Err(anyhow::Error::new(
ConcurrencyHandlerError::NestedInvocationWithDifferentStates(
trace.to_string(),
),
));
}
NestedInvocation::Run => {
soft_error!(
"nested_invocation_different_dice_state",
anyhow::anyhow!(
ConcurrencyHandlerError::NestedInvocationWithDifferentStates(
trace.to_string(),
),
)
)?;
true
}
}
} else {
match self.parallel_invocation_config {
ParallelInvocation::Run => {
soft_error!(
"parallel_invocation_different_dice_state",
anyhow::anyhow!(
ConcurrencyHandlerError::ParallelInvocationWithDifferentStates(
trace.to_string(),
),
)
)?;
true
}
ParallelInvocation::Block => {
tracing::info!(
"Running parallel invocation with different states with blocking. Trace Id: {}",
trace.to_string(),
);
false
}
let bypass_semaphore =
self.determine_bypass_semaphore(is_same_state, is_nested_invocation);

match bypass_semaphore {
BypassSemaphore::Error => {
return Err(anyhow::Error::new(
ConcurrencyHandlerError::NestedInvocationWithDifferentStates(
format_traces(&data.active_traces, Some(trace.dupe())),
),
));
}
};
BypassSemaphore::Run(state) => {
*data.active_traces.entry(trace.dupe()).or_default() += 1;
self.emit_logs(state, &data.active_traces)?;

if bypass_semaphore {
*data.active_traces.entry(trace.dupe()).or_default() += 1;
break;
}
BypassSemaphore::Block => {
tracing::info!(
"Running parallel invocation with different states with blocking. Currently active trace IDs: {}",
format_traces(&data.active_traces, Some(trace.dupe())),
);

break;
} else {
(data, baton) = self.cond.wait_baton(data).await;
(data, baton) = self.cond.wait_baton(data).await;
}
}
} else {
data.active_dice = Some(transaction.dupe());
Expand All @@ -270,16 +247,98 @@ impl ConcurrencyHandler {
// this lets us dispose of the `Baton`, which is no longer necessary for
// ensuring that on exit/cancellation, the `notify` is passed onto another
// thread. (the drop of `OnExit` will take care of it).
let on_exit = OnExecExit(self.dupe(), trace);
let drop_guard = OnExecExit(self.dupe(), trace.dupe());
baton.dispose();

Ok((on_exit, transaction))
Ok((drop_guard, transaction))
}

/// Access dice without locking for dumps.
pub fn unsafe_dice(&self) -> &Arc<Dice> {
&self.dice
}

fn determine_bypass_semaphore(
&self,
is_same_state: bool,
is_nested_invocation: bool,
) -> BypassSemaphore {
if is_same_state {
if is_nested_invocation {
BypassSemaphore::Run(RunState::NestedSameState)
} else {
BypassSemaphore::Run(RunState::ParallelSameState)
}
} else if is_nested_invocation {
match self.nested_invocation_config {
NestedInvocation::Error => BypassSemaphore::Error,
NestedInvocation::Run => BypassSemaphore::Run(RunState::NestedDifferentState),
}
} else {
match self.parallel_invocation_config {
ParallelInvocation::Run => BypassSemaphore::Run(RunState::ParallelDifferentState),
ParallelInvocation::Block => BypassSemaphore::Block,
}
}
}

fn emit_logs(
&self,
state: RunState,
active_traces: &SmallMap<TraceId, usize>,
) -> anyhow::Result<()> {
let active_traces = format_traces(active_traces, None);

match state {
RunState::NestedSameState => {
soft_error!(
"nested_invocation_same_dice_state",
anyhow::anyhow!(ConcurrencyHandlerError::NestedInvocationWithSameStates(
active_traces,
))
)?;
}
RunState::NestedDifferentState => {
soft_error!(
"nested_invocation_different_dice_state",
anyhow::anyhow!(
ConcurrencyHandlerError::NestedInvocationWithDifferentStates(active_traces,),
)
)?;
}
RunState::ParallelDifferentState => {
soft_error!(
"parallel_invocation_different_dice_state",
anyhow::anyhow!(
ConcurrencyHandlerError::ParallelInvocationWithDifferentStates(
active_traces,
),
)
)?;
}
_ => {}
}

Ok(())
}
}

fn format_traces(
active_traces: &SmallMap<TraceId, usize>,
current_trace: Option<TraceId>,
) -> String {
let mut traces = active_traces
.keys()
.map(|trace| trace.to_string())
.join(", ");

if let Some(trace) = current_trace {
if !active_traces.contains_key(&trace) {
traces.push_str(&format!(". Current trace (not active yet): {}", &trace));
}
}

traces
}

/// Held to execute a command so that when the command is canceled, we properly remove its state
Expand Down

0 comments on commit 34f5737

Please sign in to comment.