Skip to content

Commit

Permalink
Remove unnecessary Option from ZalsaLocal::query_stack
Browse files Browse the repository at this point in the history
  • Loading branch information
Veykril committed Dec 13, 2024
1 parent c2a6d62 commit fc14245
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 118 deletions.
181 changes: 91 additions & 90 deletions src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
mem,
panic::panic_any,
sync::{atomic::AtomicUsize, Arc},
thread::ThreadId,
Expand Down Expand Up @@ -203,18 +204,18 @@ impl Runtime {
},
});

let stack = local_state.take_query_stack();

let (stack, result) = DependencyGraph::block_on(
dg,
thread_id,
database_key,
other_id,
stack,
query_mutex_guard,
);

local_state.restore_query_stack(stack);
let result = local_state.with_query_stack(|stack| {
let (new_stack, result) = DependencyGraph::block_on(
dg,
thread_id,
database_key,
other_id,
mem::take(stack),
query_mutex_guard,
);
*stack = new_stack;
result
});

match result {
WaitResult::Completed => (),
Expand Down Expand Up @@ -249,87 +250,87 @@ impl Runtime {
database_key_index
);

let mut from_stack = local_state.take_query_stack();
let from_id = std::thread::current().id();

// Make a "dummy stack frame". As we iterate through the cycle, we will collect the
// inputs from each participant. Then, if we are participating in cycle recovery, we
// will propagate those results to all participants.
let mut cycle_query = ActiveQuery::new(database_key_index);

// Identify the cycle participants:
let cycle = {
let mut v = vec![];
dg.for_each_cycle_participant(
from_id,
&mut from_stack,
database_key_index,
to_id,
|aqs| {
aqs.iter_mut().for_each(|aq| {
cycle_query.add_from(aq);
v.push(aq.database_key_index);
let (me_recovered, others_recovered, cycle) = local_state.with_query_stack(|from_stack| {
let from_id = std::thread::current().id();

// Make a "dummy stack frame". As we iterate through the cycle, we will collect the
// inputs from each participant. Then, if we are participating in cycle recovery, we
// will propagate those results to all participants.
let mut cycle_query = ActiveQuery::new(database_key_index);

// Identify the cycle participants:
let cycle = {
let mut v = vec![];
dg.for_each_cycle_participant(
from_id,
from_stack,
database_key_index,
to_id,
|aqs| {
aqs.iter_mut().for_each(|aq| {
cycle_query.add_from(aq);
v.push(aq.database_key_index);
});
},
);

// We want to give the participants in a deterministic order
// (at least for this execution, not necessarily across executions),
// no matter where it started on the stack. Find the minimum
// key and rotate it to the front.
let min = v
.iter()
.map(|key| (key.ingredient_index.debug_name(db), key))
.min()
.unwrap()
.1;
let index = v.iter().position(|p| p == min).unwrap();
v.rotate_left(index);

// No need to store extra memory.
v.shrink_to_fit();

Cycle::new(Arc::new(v))
};
tracing::debug!("cycle {cycle:?}, cycle_query {cycle_query:#?}");

// We can remove the cycle participants from the list of dependencies;
// they are a strongly connected component (SCC) and we only care about
// dependencies to things outside the SCC that control whether it will
// form again.
cycle_query.remove_cycle_participants(&cycle);

// Mark each cycle participant that has recovery set, along with
// any frames that come after them on the same thread. Those frames
// are going to be unwound so that fallback can occur.
dg.for_each_cycle_participant(from_id, from_stack, database_key_index, to_id, |aqs| {
aqs.iter_mut()
.skip_while(|aq| {
match db
.zalsa()
.lookup_ingredient(aq.database_key_index.ingredient_index)
.cycle_recovery_strategy()
{
CycleRecoveryStrategy::Panic => true,
CycleRecoveryStrategy::Fallback => false,
}
})
.for_each(|aq| {
tracing::debug!("marking {:?} for fallback", aq.database_key_index);
aq.take_inputs_from(&cycle_query);
assert!(aq.cycle.is_none());
aq.cycle = Some(cycle.clone());
});
},
);

// We want to give the participants in a deterministic order
// (at least for this execution, not necessarily across executions),
// no matter where it started on the stack. Find the minimum
// key and rotate it to the front.
let min = v
.iter()
.map(|key| (key.ingredient_index.debug_name(db), key))
.min()
.unwrap()
.1;
let index = v.iter().position(|p| p == min).unwrap();
v.rotate_left(index);

// No need to store extra memory.
v.shrink_to_fit();

Cycle::new(Arc::new(v))
};
tracing::debug!("cycle {cycle:?}, cycle_query {cycle_query:#?}");

// We can remove the cycle participants from the list of dependencies;
// they are a strongly connected component (SCC) and we only care about
// dependencies to things outside the SCC that control whether it will
// form again.
cycle_query.remove_cycle_participants(&cycle);

// Mark each cycle participant that has recovery set, along with
// any frames that come after them on the same thread. Those frames
// are going to be unwound so that fallback can occur.
dg.for_each_cycle_participant(from_id, &mut from_stack, database_key_index, to_id, |aqs| {
aqs.iter_mut()
.skip_while(|aq| {
match db
.zalsa()
.lookup_ingredient(aq.database_key_index.ingredient_index)
.cycle_recovery_strategy()
{
CycleRecoveryStrategy::Panic => true,
CycleRecoveryStrategy::Fallback => false,
}
})
.for_each(|aq| {
tracing::debug!("marking {:?} for fallback", aq.database_key_index);
aq.take_inputs_from(&cycle_query);
assert!(aq.cycle.is_none());
aq.cycle = Some(cycle.clone());
});
});

// Unblock every thread that has cycle recovery with a `WaitResult::Cycle`.
// They will throw the cycle, which will be caught by the frame that has
// cycle recovery so that it can execute that recovery.
let (me_recovered, others_recovered) =
dg.maybe_unblock_runtimes_in_cycle(from_id, &from_stack, database_key_index, to_id);
(me_recovered, others_recovered, cycle)
});

// Unblock every thread that has cycle recovery with a `WaitResult::Cycle`.
// They will throw the cycle, which will be caught by the frame that has
// cycle recovery so that it can execute that recovery.
let (me_recovered, others_recovered) =
dg.maybe_unblock_runtimes_in_cycle(from_id, &from_stack, database_key_index, to_id);

local_state.restore_query_stack(from_stack);

if me_recovered {
// If the current thread has recovery, we want to throw
// so that it can begin.
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/dependency_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl DependencyGraph {
let edge = self.edges.get_mut(&id).unwrap();
let prefix = edge
.stack
.iter_mut()
.iter()
.take_while(|p| p.database_key_index != key)
.count();
closure(&mut edge.stack[prefix..]);
Expand Down
32 changes: 5 additions & 27 deletions src/zalsa_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct ZalsaLocal {
///
/// Unwinding note: pushes onto this vector must be popped -- even
/// during unwinding.
query_stack: RefCell<Option<Vec<ActiveQuery>>>,
query_stack: RefCell<Vec<ActiveQuery>>,

/// Stores the most recent page for a given ingredient.
/// This is thread-local to avoid contention.
Expand All @@ -47,7 +47,7 @@ pub struct ZalsaLocal {
impl ZalsaLocal {
pub(crate) fn new() -> Self {
ZalsaLocal {
query_stack: RefCell::new(Some(vec![])),
query_stack: RefCell::new(vec![]),
most_recent_pages: RefCell::new(FxHashMap::default()),
}
}
Expand Down Expand Up @@ -88,7 +88,6 @@ impl ZalsaLocal {
#[inline]
pub(crate) fn push_query(&self, database_key_index: DatabaseKeyIndex) -> ActiveQueryGuard<'_> {
let mut query_stack = self.query_stack.borrow_mut();
let query_stack = query_stack.as_mut().expect("local stack taken");
query_stack.push(ActiveQuery::new(database_key_index));
ActiveQueryGuard {
local_state: self,
Expand All @@ -97,12 +96,9 @@ impl ZalsaLocal {
}
}

fn with_query_stack<R>(&self, c: impl FnOnce(&mut Vec<ActiveQuery>) -> R) -> R {
c(self
.query_stack
.borrow_mut()
.as_mut()
.expect("query stack taken"))
/// Executes a closure within the context of the current active query stacks.
pub(crate) fn with_query_stack<R>(&self, c: impl FnOnce(&mut Vec<ActiveQuery>) -> R) -> R {
c(self.query_stack.borrow_mut().as_mut())
}

fn query_in_progress(&self) -> bool {
Expand Down Expand Up @@ -233,24 +229,6 @@ impl ZalsaLocal {
})
}

/// Takes the query stack and returns it. This is used when
/// the current thread is blocking. The stack must be restored
/// with [`Self::restore_query_stack`] when the thread unblocks.
pub(crate) fn take_query_stack(&self) -> Vec<ActiveQuery> {
assert!(
self.query_stack.borrow().is_some(),
"query stack already taken"
);
self.query_stack.take().unwrap()
}

/// Restores a query stack taken with [`Self::take_query_stack`] once
/// the thread unblocks.
pub(crate) fn restore_query_stack(&self, stack: Vec<ActiveQuery>) {
assert!(self.query_stack.borrow().is_none(), "query stack not taken");
self.query_stack.replace(Some(stack));
}

/// Called when the active queries creates an index from the
/// entity table with the index `entity_index`. Has the following effects:
///
Expand Down

0 comments on commit fc14245

Please sign in to comment.