Remove unnecessary Option from ZalsaLocal::query_stack #624

Merged · 2 commits · Dec 13, 2024

176 changes: 89 additions & 87 deletions src/runtime.rs
@@ -1,4 +1,5 @@
use std::{
mem,
panic::panic_any,
sync::{
atomic::{AtomicBool, Ordering},
@@ -198,18 +199,18 @@ impl Runtime {
},
});

let stack = local_state.take_query_stack();

let (stack, result) = DependencyGraph::block_on(
dg,
thread_id,
database_key,
other_id,
stack,
query_mutex_guard,
);

local_state.restore_query_stack(stack);
let result = local_state.with_query_stack(|stack| {
let (new_stack, result) = DependencyGraph::block_on(
dg,
thread_id,
database_key,
other_id,
mem::take(stack),
Review comment (Member Author): The mem::take here is effectively what the Option::take was doing before.

query_mutex_guard,
);
*stack = new_stack;
result
});

match result {
WaitResult::Completed => (),
@@ -244,84 +245,85 @@ impl Runtime {
database_key_index
);

let mut from_stack = local_state.take_query_stack();
let from_id = std::thread::current().id();

// Make a "dummy stack frame". As we iterate through the cycle, we will collect the
// inputs from each participant. Then, if we are participating in cycle recovery, we
// will propagate those results to all participants.
let mut cycle_query = ActiveQuery::new(database_key_index);

// Identify the cycle participants:
let cycle = {
let mut v = vec![];
dg.for_each_cycle_participant(
from_id,
&mut from_stack,
database_key_index,
to_id,
|aqs| {
aqs.iter_mut().for_each(|aq| {
cycle_query.add_from(aq);
v.push(aq.database_key_index);
let (me_recovered, others_recovered, cycle) = local_state.with_query_stack(|from_stack| {
Review comment (Member Author): The diff here is just the closure wrapping, which adds an extra level of indentation.

let from_id = std::thread::current().id();

// Make a "dummy stack frame". As we iterate through the cycle, we will collect the
// inputs from each participant. Then, if we are participating in cycle recovery, we
// will propagate those results to all participants.
let mut cycle_query = ActiveQuery::new(database_key_index);

// Identify the cycle participants:
let cycle = {
let mut v = vec![];
dg.for_each_cycle_participant(
from_id,
from_stack,
database_key_index,
to_id,
|aqs| {
aqs.iter_mut().for_each(|aq| {
cycle_query.add_from(aq);
v.push(aq.database_key_index);
});
},
);

// We want to give the participants in a deterministic order
// (at least for this execution, not necessarily across executions),
// no matter where it started on the stack. Find the minimum
// key and rotate it to the front.

if let Some((_, index, _)) = v
.iter()
.enumerate()
.map(|(idx, key)| (key.ingredient_index.debug_name(db), idx, key))
.min()
{
v.rotate_left(index);
}

Cycle::new(Arc::new(v.into_boxed_slice()))
};
tracing::debug!("cycle {cycle:?}, cycle_query {cycle_query:#?}");

// We can remove the cycle participants from the list of dependencies;
// they are a strongly connected component (SCC) and we only care about
// dependencies to things outside the SCC that control whether it will
// form again.
cycle_query.remove_cycle_participants(&cycle);

// Mark each cycle participant that has recovery set, along with
// any frames that come after them on the same thread. Those frames
// are going to be unwound so that fallback can occur.
dg.for_each_cycle_participant(from_id, from_stack, database_key_index, to_id, |aqs| {
aqs.iter_mut()
.skip_while(|aq| {
match db
.zalsa()
.lookup_ingredient(aq.database_key_index.ingredient_index)
.cycle_recovery_strategy()
{
CycleRecoveryStrategy::Panic => true,
CycleRecoveryStrategy::Fallback => false,
}
})
.for_each(|aq| {
tracing::debug!("marking {:?} for fallback", aq.database_key_index);
aq.take_inputs_from(&cycle_query);
assert!(aq.cycle.is_none());
aq.cycle = Some(cycle.clone());
});
},
);

// We want to give the participants in a deterministic order
// (at least for this execution, not necessarily across executions),
// no matter where it started on the stack. Find the minimum
// key and rotate it to the front.
if let Some((_, index, _)) = v
.iter()
.enumerate()
.map(|(idx, key)| (key.ingredient_index.debug_name(db), idx, key))
.min()
{
v.rotate_left(index);
}

Cycle::new(Arc::new(v.into_boxed_slice()))
};
tracing::debug!("cycle {cycle:?}, cycle_query {cycle_query:#?}");

// We can remove the cycle participants from the list of dependencies;
// they are a strongly connected component (SCC) and we only care about
// dependencies to things outside the SCC that control whether it will
// form again.
cycle_query.remove_cycle_participants(&cycle);

// Mark each cycle participant that has recovery set, along with
// any frames that come after them on the same thread. Those frames
// are going to be unwound so that fallback can occur.
dg.for_each_cycle_participant(from_id, &mut from_stack, database_key_index, to_id, |aqs| {
aqs.iter_mut()
.skip_while(|aq| {
match db
.zalsa()
.lookup_ingredient(aq.database_key_index.ingredient_index)
.cycle_recovery_strategy()
{
CycleRecoveryStrategy::Panic => true,
CycleRecoveryStrategy::Fallback => false,
}
})
.for_each(|aq| {
tracing::debug!("marking {:?} for fallback", aq.database_key_index);
aq.take_inputs_from(&cycle_query);
assert!(aq.cycle.is_none());
aq.cycle = Some(cycle.clone());
});
});

// Unblock every thread that has cycle recovery with a `WaitResult::Cycle`.
// They will throw the cycle, which will be caught by the frame that has
// cycle recovery so that it can execute that recovery.
let (me_recovered, others_recovered) =
dg.maybe_unblock_runtimes_in_cycle(from_id, from_stack, database_key_index, to_id);
(me_recovered, others_recovered, cycle)
});

// Unblock every thread that has cycle recovery with a `WaitResult::Cycle`.
// They will throw the cycle, which will be caught by the frame that has
// cycle recovery so that it can execute that recovery.
let (me_recovered, others_recovered) =
dg.maybe_unblock_runtimes_in_cycle(from_id, &from_stack, database_key_index, to_id);

local_state.restore_query_stack(from_stack);

if me_recovered {
// If the current thread has recovery, we want to throw
// so that it can begin.
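For readers skimming the diff, here is a minimal, self-contained sketch of the pattern this hunk adopts: the stack lives in a plain RefCell<Vec<_>>, the blocking call still takes ownership of the stack, and mem::take plus a write-back inside a single with_query_stack closure replaces the old take/restore pair. Everything below (Local, fake_block_on, block_then_resume) is an illustrative stand-in, not salsa's actual API.

```rust
use std::cell::RefCell;
use std::mem;

// Illustrative stand-ins only; salsa's real ActiveQuery and ZalsaLocal carry much more.
struct ActiveQuery;

struct Local {
    // Previously RefCell<Option<Vec<ActiveQuery>>>; the Option existed only so the
    // stack could be moved out while the thread blocked and put back afterwards.
    query_stack: RefCell<Vec<ActiveQuery>>,
}

impl Local {
    fn with_query_stack<R>(&self, c: impl FnOnce(&mut Vec<ActiveQuery>) -> R) -> R {
        c(&mut self.query_stack.borrow_mut())
    }

    // Old pattern (two calls, with an Option so the stack could be absent):
    //   let stack = local.take_query_stack();            // Option::take
    //   let (stack, result) = DependencyGraph::block_on(.., stack, ..);
    //   local.restore_query_stack(stack);
    //
    // New pattern: move the Vec out with mem::take (leaving an empty, allocation-free
    // Vec behind), then write the returned stack back, all inside one closure.
    fn block_then_resume(&self) -> usize {
        self.with_query_stack(|stack| {
            let owned = mem::take(stack); // like Option::take, but on the Vec itself
            let (new_stack, result) = fake_block_on(owned);
            *stack = new_stack; // "restore"
            result
        })
    }
}

// Stand-in for DependencyGraph::block_on: takes ownership of the stack and hands it back.
fn fake_block_on(stack: Vec<ActiveQuery>) -> (Vec<ActiveQuery>, usize) {
    let len = stack.len();
    (stack, len)
}

fn main() {
    let local = Local {
        query_stack: RefCell::new(vec![ActiveQuery, ActiveQuery]),
    };
    assert_eq!(local.block_then_resume(), 2);
    assert_eq!(local.query_stack.borrow().len(), 2); // stack was restored
}
```
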
45 changes: 24 additions & 21 deletions src/runtime/dependency_graph.rs
@@ -104,22 +104,13 @@ impl DependencyGraph {
// load up the next thread (i.e., we start at B/QB2,
// and then load up the dependency on C/QC2).
let edge = self.edges.get_mut(&id).unwrap();
let prefix = edge
.stack
.iter_mut()
.take_while(|p| p.database_key_index != key)
.count();
closure(&mut edge.stack[prefix..]);
closure(strip_prefix_query_stack_mut(&mut edge.stack, key));
id = edge.blocked_on_id;
key = edge.blocked_on_key;
}

// Finally, we copy in the results from `from_stack`.
let prefix = from_stack
.iter_mut()
.take_while(|p| p.database_key_index != key)
.count();
closure(&mut from_stack[prefix..]);
closure(strip_prefix_query_stack_mut(from_stack, key));
}

/// Unblock each blocked runtime (excluding the current one) if some
@@ -142,15 +133,10 @@ impl DependencyGraph {
let mut others_unblocked = false;
while id != from_id {
let edge = self.edges.get(&id).unwrap();
let prefix = edge
.stack
.iter()
.take_while(|p| p.database_key_index != key)
.count();
let next_id = edge.blocked_on_id;
let next_key = edge.blocked_on_key;

if let Some(cycle) = edge.stack[prefix..]
if let Some(cycle) = strip_prefix_query_stack(&edge.stack, key)
.iter()
.rev()
.find_map(|aq| aq.cycle.clone())
@@ -171,11 +157,9 @@ impl DependencyGraph {
key = next_key;
}

let prefix = from_stack
let this_unblocked = strip_prefix_query_stack(from_stack, key)
.iter()
.take_while(|p| p.database_key_index != key)
.count();
let this_unblocked = from_stack[prefix..].iter().any(|aq| aq.cycle.is_some());
.any(|aq| aq.cycle.is_some());

(this_unblocked, others_unblocked)
}
@@ -276,3 +260,22 @@ impl DependencyGraph {
edge.condvar.notify_one();
}
}

fn strip_prefix_query_stack(stack_mut: &[ActiveQuery], key: DatabaseKeyIndex) -> &[ActiveQuery] {
let prefix = stack_mut
.iter()
.take_while(|p| p.database_key_index != key)
.count();
&stack_mut[prefix..]
}

fn strip_prefix_query_stack_mut(
stack_mut: &mut [ActiveQuery],
key: DatabaseKeyIndex,
) -> &mut [ActiveQuery] {
let prefix = stack_mut
.iter()
.take_while(|p| p.database_key_index != key)
.count();
&mut stack_mut[prefix..]
}
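A rough sketch of what these two new helpers compute, using simplified stand-ins (DatabaseKeyIndex as a bare u32, a one-field ActiveQuery) rather than salsa's real types: the frames before the first occurrence of `key` form the prefix, and the helper returns the remaining suffix of the slice.

```rust
// Simplified stand-ins for illustration; salsa's real types carry much more data.
type DatabaseKeyIndex = u32;

struct ActiveQuery {
    database_key_index: DatabaseKeyIndex,
}

/// Return the suffix of `stack` starting at the first frame whose key equals `key`
/// (or an empty slice if no frame matches), mirroring the take_while/count pattern
/// the PR factors out of DependencyGraph.
fn strip_prefix_query_stack(stack: &[ActiveQuery], key: DatabaseKeyIndex) -> &[ActiveQuery] {
    let prefix = stack
        .iter()
        .take_while(|p| p.database_key_index != key)
        .count();
    &stack[prefix..]
}

fn main() {
    let stack: Vec<ActiveQuery> = [10, 20, 30, 40]
        .into_iter()
        .map(|k| ActiveQuery { database_key_index: k })
        .collect();

    // Frames before the first occurrence of key 30 are dropped.
    let suffix = strip_prefix_query_stack(&stack, 30);
    assert_eq!(suffix.len(), 2);
    assert_eq!(suffix[0].database_key_index, 30);

    // If the key never appears, the whole stack is the "prefix" and the suffix is empty.
    assert!(strip_prefix_query_stack(&stack, 99).is_empty());
}
```
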
32 changes: 5 additions & 27 deletions src/zalsa_local.rs
@@ -37,7 +37,7 @@ pub struct ZalsaLocal {
///
/// Unwinding note: pushes onto this vector must be popped -- even
/// during unwinding.
query_stack: RefCell<Option<Vec<ActiveQuery>>>,
query_stack: RefCell<Vec<ActiveQuery>>,

/// Stores the most recent page for a given ingredient.
/// This is thread-local to avoid contention.
@@ -47,7 +47,7 @@ pub struct ZalsaLocal {
impl ZalsaLocal {
pub(crate) fn new() -> Self {
ZalsaLocal {
query_stack: RefCell::new(Some(vec![])),
query_stack: RefCell::new(vec![]),
most_recent_pages: RefCell::new(FxHashMap::default()),
}
}
@@ -88,7 +88,6 @@ impl ZalsaLocal {
#[inline]
pub(crate) fn push_query(&self, database_key_index: DatabaseKeyIndex) -> ActiveQueryGuard<'_> {
let mut query_stack = self.query_stack.borrow_mut();
let query_stack = query_stack.as_mut().expect("local stack taken");
query_stack.push(ActiveQuery::new(database_key_index));
ActiveQueryGuard {
local_state: self,
@@ -97,12 +96,9 @@ impl ZalsaLocal {
}
}

fn with_query_stack<R>(&self, c: impl FnOnce(&mut Vec<ActiveQuery>) -> R) -> R {
c(self
.query_stack
.borrow_mut()
.as_mut()
.expect("query stack taken"))
/// Executes a closure within the context of the current active query stacks.
pub(crate) fn with_query_stack<R>(&self, c: impl FnOnce(&mut Vec<ActiveQuery>) -> R) -> R {
c(self.query_stack.borrow_mut().as_mut())
}

fn query_in_progress(&self) -> bool {
@@ -233,24 +229,6 @@ impl ZalsaLocal {
})
}

/// Takes the query stack and returns it. This is used when
/// the current thread is blocking. The stack must be restored
/// with [`Self::restore_query_stack`] when the thread unblocks.
pub(crate) fn take_query_stack(&self) -> Vec<ActiveQuery> {
assert!(
self.query_stack.borrow().is_some(),
"query stack already taken"
);
self.query_stack.take().unwrap()
}

/// Restores a query stack taken with [`Self::take_query_stack`] once
/// the thread unblocks.
pub(crate) fn restore_query_stack(&self, stack: Vec<ActiveQuery>) {
assert!(self.query_stack.borrow().is_none(), "query stack not taken");
self.query_stack.replace(Some(stack));
}

/// Called when the active queries creates an index from the
/// entity table with the index `entity_index`. Has the following effects:
///
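To make the PR title concrete: the Option existed only so take_query_stack/restore_query_stack could leave a "taken" marker behind while the thread blocked. A Vec already has a cheap empty value (Vec::new() does not allocate), so mem::take gives the same move-out/write-back behaviour without the extra layer. A tiny sketch of that equivalence, independent of salsa's types:

```rust
use std::mem;

fn main() {
    // Old shape: the Option exists only to mark the stack as "taken" while blocked.
    let mut with_option: Option<Vec<u32>> = Some(vec![1, 2, 3]);
    let taken = with_option.take().expect("query stack already taken");
    // ... thread blocks, stack is threaded through the dependency graph ...
    with_option = Some(taken); // restore
    assert_eq!(with_option.as_ref().map(Vec::len), Some(3));

    // New shape: mem::take swaps in an empty Vec, which costs nothing to create.
    let mut without_option: Vec<u32> = vec![1, 2, 3];
    let taken = mem::take(&mut without_option);
    assert!(without_option.is_empty()); // the "taken" state is just an empty stack
    without_option = taken; // restore by writing back
    assert_eq!(without_option.len(), 3);
}
```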