diff --git a/src/runtime.rs b/src/runtime.rs index fb2e0720a..fe5605c93 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,4 +1,5 @@ use std::{ + mem, panic::panic_any, sync::{ atomic::{AtomicBool, Ordering}, @@ -198,18 +199,18 @@ impl Runtime { }, }); - let stack = local_state.take_query_stack(); - - let (stack, result) = DependencyGraph::block_on( - dg, - thread_id, - database_key, - other_id, - stack, - query_mutex_guard, - ); - - local_state.restore_query_stack(stack); + let result = local_state.with_query_stack(|stack| { + let (new_stack, result) = DependencyGraph::block_on( + dg, + thread_id, + database_key, + other_id, + mem::take(stack), + query_mutex_guard, + ); + *stack = new_stack; + result + }); match result { WaitResult::Completed => (), @@ -244,84 +245,85 @@ impl Runtime { database_key_index ); - let mut from_stack = local_state.take_query_stack(); - let from_id = std::thread::current().id(); - - // Make a "dummy stack frame". As we iterate through the cycle, we will collect the - // inputs from each participant. Then, if we are participating in cycle recovery, we - // will propagate those results to all participants. - let mut cycle_query = ActiveQuery::new(database_key_index); - - // Identify the cycle participants: - let cycle = { - let mut v = vec![]; - dg.for_each_cycle_participant( - from_id, - &mut from_stack, - database_key_index, - to_id, - |aqs| { - aqs.iter_mut().for_each(|aq| { - cycle_query.add_from(aq); - v.push(aq.database_key_index); + let (me_recovered, others_recovered, cycle) = local_state.with_query_stack(|from_stack| { + let from_id = std::thread::current().id(); + + // Make a "dummy stack frame". As we iterate through the cycle, we will collect the + // inputs from each participant. Then, if we are participating in cycle recovery, we + // will propagate those results to all participants. + let mut cycle_query = ActiveQuery::new(database_key_index); + + // Identify the cycle participants: + let cycle = { + let mut v = vec![]; + dg.for_each_cycle_participant( + from_id, + from_stack, + database_key_index, + to_id, + |aqs| { + aqs.iter_mut().for_each(|aq| { + cycle_query.add_from(aq); + v.push(aq.database_key_index); + }); + }, + ); + + // We want to give the participants in a deterministic order + // (at least for this execution, not necessarily across executions), + // no matter where it started on the stack. Find the minimum + // key and rotate it to the front. + + if let Some((_, index, _)) = v + .iter() + .enumerate() + .map(|(idx, key)| (key.ingredient_index.debug_name(db), idx, key)) + .min() + { + v.rotate_left(index); + } + + Cycle::new(Arc::new(v.into_boxed_slice())) + }; + tracing::debug!("cycle {cycle:?}, cycle_query {cycle_query:#?}"); + + // We can remove the cycle participants from the list of dependencies; + // they are a strongly connected component (SCC) and we only care about + // dependencies to things outside the SCC that control whether it will + // form again. + cycle_query.remove_cycle_participants(&cycle); + + // Mark each cycle participant that has recovery set, along with + // any frames that come after them on the same thread. Those frames + // are going to be unwound so that fallback can occur. + dg.for_each_cycle_participant(from_id, from_stack, database_key_index, to_id, |aqs| { + aqs.iter_mut() + .skip_while(|aq| { + match db + .zalsa() + .lookup_ingredient(aq.database_key_index.ingredient_index) + .cycle_recovery_strategy() + { + CycleRecoveryStrategy::Panic => true, + CycleRecoveryStrategy::Fallback => false, + } + }) + .for_each(|aq| { + tracing::debug!("marking {:?} for fallback", aq.database_key_index); + aq.take_inputs_from(&cycle_query); + assert!(aq.cycle.is_none()); + aq.cycle = Some(cycle.clone()); }); - }, - ); - - // We want to give the participants in a deterministic order - // (at least for this execution, not necessarily across executions), - // no matter where it started on the stack. Find the minimum - // key and rotate it to the front. - if let Some((_, index, _)) = v - .iter() - .enumerate() - .map(|(idx, key)| (key.ingredient_index.debug_name(db), idx, key)) - .min() - { - v.rotate_left(index); - } - - Cycle::new(Arc::new(v.into_boxed_slice())) - }; - tracing::debug!("cycle {cycle:?}, cycle_query {cycle_query:#?}"); - - // We can remove the cycle participants from the list of dependencies; - // they are a strongly connected component (SCC) and we only care about - // dependencies to things outside the SCC that control whether it will - // form again. - cycle_query.remove_cycle_participants(&cycle); - - // Mark each cycle participant that has recovery set, along with - // any frames that come after them on the same thread. Those frames - // are going to be unwound so that fallback can occur. - dg.for_each_cycle_participant(from_id, &mut from_stack, database_key_index, to_id, |aqs| { - aqs.iter_mut() - .skip_while(|aq| { - match db - .zalsa() - .lookup_ingredient(aq.database_key_index.ingredient_index) - .cycle_recovery_strategy() - { - CycleRecoveryStrategy::Panic => true, - CycleRecoveryStrategy::Fallback => false, - } - }) - .for_each(|aq| { - tracing::debug!("marking {:?} for fallback", aq.database_key_index); - aq.take_inputs_from(&cycle_query); - assert!(aq.cycle.is_none()); - aq.cycle = Some(cycle.clone()); - }); + }); + + // Unblock every thread that has cycle recovery with a `WaitResult::Cycle`. + // They will throw the cycle, which will be caught by the frame that has + // cycle recovery so that it can execute that recovery. + let (me_recovered, others_recovered) = + dg.maybe_unblock_runtimes_in_cycle(from_id, from_stack, database_key_index, to_id); + (me_recovered, others_recovered, cycle) }); - // Unblock every thread that has cycle recovery with a `WaitResult::Cycle`. - // They will throw the cycle, which will be caught by the frame that has - // cycle recovery so that it can execute that recovery. - let (me_recovered, others_recovered) = - dg.maybe_unblock_runtimes_in_cycle(from_id, &from_stack, database_key_index, to_id); - - local_state.restore_query_stack(from_stack); - if me_recovered { // If the current thread has recovery, we want to throw // so that it can begin. diff --git a/src/runtime/dependency_graph.rs b/src/runtime/dependency_graph.rs index 9db1752a4..84c5327fc 100644 --- a/src/runtime/dependency_graph.rs +++ b/src/runtime/dependency_graph.rs @@ -104,22 +104,13 @@ impl DependencyGraph { // load up the next thread (i.e., we start at B/QB2, // and then load up the dependency on C/QC2). let edge = self.edges.get_mut(&id).unwrap(); - let prefix = edge - .stack - .iter_mut() - .take_while(|p| p.database_key_index != key) - .count(); - closure(&mut edge.stack[prefix..]); + closure(strip_prefix_query_stack_mut(&mut edge.stack, key)); id = edge.blocked_on_id; key = edge.blocked_on_key; } // Finally, we copy in the results from `from_stack`. - let prefix = from_stack - .iter_mut() - .take_while(|p| p.database_key_index != key) - .count(); - closure(&mut from_stack[prefix..]); + closure(strip_prefix_query_stack_mut(from_stack, key)); } /// Unblock each blocked runtime (excluding the current one) if some @@ -142,15 +133,10 @@ impl DependencyGraph { let mut others_unblocked = false; while id != from_id { let edge = self.edges.get(&id).unwrap(); - let prefix = edge - .stack - .iter() - .take_while(|p| p.database_key_index != key) - .count(); let next_id = edge.blocked_on_id; let next_key = edge.blocked_on_key; - if let Some(cycle) = edge.stack[prefix..] + if let Some(cycle) = strip_prefix_query_stack(&edge.stack, key) .iter() .rev() .find_map(|aq| aq.cycle.clone()) @@ -171,11 +157,9 @@ impl DependencyGraph { key = next_key; } - let prefix = from_stack + let this_unblocked = strip_prefix_query_stack(from_stack, key) .iter() - .take_while(|p| p.database_key_index != key) - .count(); - let this_unblocked = from_stack[prefix..].iter().any(|aq| aq.cycle.is_some()); + .any(|aq| aq.cycle.is_some()); (this_unblocked, others_unblocked) } @@ -276,3 +260,22 @@ impl DependencyGraph { edge.condvar.notify_one(); } } + +fn strip_prefix_query_stack(stack_mut: &[ActiveQuery], key: DatabaseKeyIndex) -> &[ActiveQuery] { + let prefix = stack_mut + .iter() + .take_while(|p| p.database_key_index != key) + .count(); + &stack_mut[prefix..] +} + +fn strip_prefix_query_stack_mut( + stack_mut: &mut [ActiveQuery], + key: DatabaseKeyIndex, +) -> &mut [ActiveQuery] { + let prefix = stack_mut + .iter() + .take_while(|p| p.database_key_index != key) + .count(); + &mut stack_mut[prefix..] +} diff --git a/src/zalsa_local.rs b/src/zalsa_local.rs index 956713612..6988d6537 100644 --- a/src/zalsa_local.rs +++ b/src/zalsa_local.rs @@ -37,7 +37,7 @@ pub struct ZalsaLocal { /// /// Unwinding note: pushes onto this vector must be popped -- even /// during unwinding. - query_stack: RefCell>>, + query_stack: RefCell>, /// Stores the most recent page for a given ingredient. /// This is thread-local to avoid contention. @@ -47,7 +47,7 @@ pub struct ZalsaLocal { impl ZalsaLocal { pub(crate) fn new() -> Self { ZalsaLocal { - query_stack: RefCell::new(Some(vec![])), + query_stack: RefCell::new(vec![]), most_recent_pages: RefCell::new(FxHashMap::default()), } } @@ -88,7 +88,6 @@ impl ZalsaLocal { #[inline] pub(crate) fn push_query(&self, database_key_index: DatabaseKeyIndex) -> ActiveQueryGuard<'_> { let mut query_stack = self.query_stack.borrow_mut(); - let query_stack = query_stack.as_mut().expect("local stack taken"); query_stack.push(ActiveQuery::new(database_key_index)); ActiveQueryGuard { local_state: self, @@ -97,12 +96,9 @@ impl ZalsaLocal { } } - fn with_query_stack(&self, c: impl FnOnce(&mut Vec) -> R) -> R { - c(self - .query_stack - .borrow_mut() - .as_mut() - .expect("query stack taken")) + /// Executes a closure within the context of the current active query stacks. + pub(crate) fn with_query_stack(&self, c: impl FnOnce(&mut Vec) -> R) -> R { + c(self.query_stack.borrow_mut().as_mut()) } fn query_in_progress(&self) -> bool { @@ -233,24 +229,6 @@ impl ZalsaLocal { }) } - /// Takes the query stack and returns it. This is used when - /// the current thread is blocking. The stack must be restored - /// with [`Self::restore_query_stack`] when the thread unblocks. - pub(crate) fn take_query_stack(&self) -> Vec { - assert!( - self.query_stack.borrow().is_some(), - "query stack already taken" - ); - self.query_stack.take().unwrap() - } - - /// Restores a query stack taken with [`Self::take_query_stack`] once - /// the thread unblocks. - pub(crate) fn restore_query_stack(&self, stack: Vec) { - assert!(self.query_stack.borrow().is_none(), "query stack not taken"); - self.query_stack.replace(Some(stack)); - } - /// Called when the active queries creates an index from the /// entity table with the index `entity_index`. Has the following effects: ///