diff --git a/turbopack/crates/turbo-tasks-memory/src/aggregation/balance_edge.rs b/turbopack/crates/turbo-tasks-memory/src/aggregation/balance_edge.rs index ee968cd3228d6..75ec433aefaf7 100644 --- a/turbopack/crates/turbo-tasks-memory/src/aggregation/balance_edge.rs +++ b/turbopack/crates/turbo-tasks-memory/src/aggregation/balance_edge.rs @@ -2,41 +2,29 @@ use std::cmp::Ordering; use super::{ balance_queue::BalanceQueue, - followers::{ - add_follower_count, remove_follower_count, remove_positive_follower_count, - RemovePositveFollowerCountResult, - }, - in_progress::is_in_progress, + in_progress::{is_in_progress, start_in_progress_all, start_in_progress_count}, increase::IncreaseReason, increase_aggregation_number_internal, - uppers::{ - add_upper_count, remove_positive_upper_count, remove_upper_count, - RemovePositiveUpperCountResult, - }, - AggregationContext, AggregationNode, + notify_lost_follower::notify_lost_follower, + notify_new_follower::notify_new_follower, + util::{get_aggregated_add_change, get_aggregated_remove_change, get_followers_or_children}, + AggregationContext, AggregationNode, PreparedInternalOperation, PreparedOperation, StackVec, }; -// Migrated followers to uppers or uppers to followers depending on the +// Migrate followers to uppers or uppers to followers depending on the // aggregation numbers of the nodes involved in the edge. Might increase targets // aggregation number if they are equal. pub(super) fn balance_edge( ctx: &C, balance_queue: &mut BalanceQueue, upper_id: &C::NodeRef, - mut upper_aggregation_number: u32, target_id: &C::NodeRef, - mut target_aggregation_number: u32, -) -> (u32, u32) { - // too many uppers on target - let mut extra_uppers = 0; - // too many followers on upper - let mut extra_followers = 0; - // The last info about uppers - let mut uppers_count: Option = None; - // The last info about followers - let mut followers_count = None; - +) { loop { + let (mut upper, mut target) = ctx.node_pair(upper_id, target_id); + let upper_aggregation_number = upper.aggregation_number(); + let target_aggregation_number = target.aggregation_number(); + let root = upper_aggregation_number == u32::MAX || target_aggregation_number == u32::MAX; let order = if root { Ordering::Greater @@ -45,164 +33,158 @@ pub(super) fn balance_edge( }; match order { Ordering::Equal => { - // we probably want to increase the aggregation number of target - let upper = ctx.node(upper_id); - upper_aggregation_number = upper.aggregation_number(); drop(upper); - if upper_aggregation_number != u32::MAX - && upper_aggregation_number == target_aggregation_number - { - let target = ctx.node(target_id); - target_aggregation_number = target.aggregation_number(); - if upper_aggregation_number == target_aggregation_number { - // increase target aggregation number - increase_aggregation_number_internal( - ctx, - balance_queue, - target, - target_id, - target_aggregation_number + 1, - target_aggregation_number + 1, - IncreaseReason::EqualAggregationNumberOnBalance, - ); - } - } + // increase target aggregation number + increase_aggregation_number_internal( + ctx, + balance_queue, + target, + target_id, + target_aggregation_number + 1, + target_aggregation_number + 1, + IncreaseReason::EqualAggregationNumberOnBalance, + ); } Ordering::Less => { - // target should probably be a follower of upper - if uppers_count.map_or(false, |count| count <= 0) { - // We already removed all uppers, maybe too many + if is_in_progress(ctx, upper_id) { + drop(target); + let AggregationNode::Aggegating(aggregating) = &mut *upper else { + unreachable!(); + }; + aggregating + .enqueued_balancing + .push((upper_id.clone(), target_id.clone())); + drop(upper); + // Somebody else will balance this edge break; - } else if extra_followers == 0 { - let upper = ctx.node(upper_id); - upper_aggregation_number = upper.aggregation_number(); - if upper_aggregation_number < target_aggregation_number { - // target should be a follower of upper - // add some extra followers - let count = uppers_count.unwrap_or(1) as usize; - extra_followers += count; - followers_count = Some(add_follower_count( - ctx, - balance_queue, - upper, - upper_id, - target_id, - count, - true, - )); - } + } + + // target should be a follower of upper + let count = target + .uppers_mut() + .remove_all_positive_clonable_count(upper_id); + if count == 0 { + break; + } + let added = upper + .followers_mut() + .unwrap() + .add_clonable_count(target_id, count); + + // target removed as upper + let remove_change = get_aggregated_remove_change(ctx, &target); + let followers = get_followers_or_children(ctx, &target); + + let upper_uppers = if added { + // target added as follower + let uppers = upper.uppers().iter().cloned().collect::>(); + start_in_progress_all(ctx, &uppers); + uppers } else { - // we already have extra followers, remove some uppers to balance - let count = extra_followers + extra_uppers; - let target = ctx.node(target_id); - if is_in_progress(ctx, upper_id) { - drop(target); - let mut upper = ctx.node(upper_id); - if is_in_progress(ctx, upper_id) { - let AggregationNode::Aggegating(aggregating) = &mut *upper else { - unreachable!(); - }; - aggregating.enqueued_balancing.push(( - upper_id.clone(), - upper_aggregation_number, - target_id.clone(), - target_aggregation_number, - )); - drop(upper); - // Somebody else will balance this edge - return (upper_aggregation_number, target_aggregation_number); - } - } else { - let RemovePositiveUpperCountResult { - removed_count, - remaining_count, - } = remove_positive_upper_count( - ctx, - balance_queue, - target, - upper_id, - count, - ); - decrease_numbers(removed_count, &mut extra_uppers, &mut extra_followers); - uppers_count = Some(remaining_count); - } + Default::default() + }; + + drop(target); + + // target removed as upper + let remove_prepared = + remove_change.and_then(|remove_change| upper.apply_change(ctx, remove_change)); + start_in_progress_count(ctx, upper_id, followers.len() as u32); + let prepared = followers + .into_iter() + .map(|child_id| { + upper.notify_lost_follower(ctx, balance_queue, upper_id, &child_id) + }) + .collect::>(); + drop(upper); + + // target added as follower + for upper_id in upper_uppers { + notify_new_follower( + ctx, + balance_queue, + ctx.node(&upper_id), + &upper_id, + target_id, + false, + ); } + + // target removed as upper + remove_prepared.apply(ctx); + prepared.apply(ctx, balance_queue); + + break; } Ordering::Greater => { - // target should probably be an inner node of upper - if followers_count.map_or(false, |count| count <= 0) { - // We already removed all followers, maybe too many + if is_in_progress(ctx, upper_id) { + let AggregationNode::Aggegating(aggregating) = &mut *upper else { + unreachable!(); + }; + aggregating + .enqueued_balancing + .push((upper_id.clone(), target_id.clone())); + drop(upper); + // Somebody else will balance this edge + break; + } + + // target should be a inner node of upper + let count = upper + .followers_mut() + .unwrap() + .remove_all_positive_clonable_count(target_id); + if count == 0 { break; - } else if extra_uppers == 0 { - let target = ctx.node(target_id); - target_aggregation_number = target.aggregation_number(); - if root || target_aggregation_number < upper_aggregation_number { - // target should be a inner node of upper - if is_in_progress(ctx, upper_id) { - drop(target); - let mut upper = ctx.node(upper_id); - if is_in_progress(ctx, upper_id) { - let AggregationNode::Aggegating(aggregating) = &mut *upper else { - unreachable!(); - }; - aggregating.enqueued_balancing.push(( - upper_id.clone(), - upper_aggregation_number, - target_id.clone(), - target_aggregation_number, - )); - drop(upper); - // Somebody else will balance this edge - return (upper_aggregation_number, target_aggregation_number); - } - } else { - // add some extra uppers - let count = followers_count.unwrap_or(1) as usize; - extra_uppers += count; - uppers_count = Some( - add_upper_count( - ctx, - balance_queue, - target, - target_id, - upper_id, - count, - true, - ) - .new_count, - ); - } - } + } + let added = target.uppers_mut().add_clonable_count(upper_id, count); + + // target removed as follower + let uppers = upper.uppers().iter().cloned().collect::>(); + start_in_progress_all(ctx, &uppers); + + let (add_change, followers) = if added { + // target added as upper + let add_change = get_aggregated_add_change(ctx, &target); + let followers = get_followers_or_children(ctx, &target); + start_in_progress_count(ctx, upper_id, followers.len() as u32); + (add_change, followers) } else { - // we already have extra uppers, try to remove some followers to balance - let count = extra_followers + extra_uppers; - let upper = ctx.node(upper_id); - let RemovePositveFollowerCountResult { - removed_count, - remaining_count, - } = remove_positive_follower_count(ctx, balance_queue, upper, target_id, count); - decrease_numbers(removed_count, &mut extra_followers, &mut extra_uppers); - followers_count = Some(remaining_count); + (None, Default::default()) + }; + + drop(target); + + // target added as upper + let add_prepared = + add_change.and_then(|add_change| upper.apply_change(ctx, add_change)); + let prepared = followers + .into_iter() + .filter_map(|child_id| { + upper.notify_new_follower(ctx, balance_queue, upper_id, &child_id, false) + }) + .collect::>(); + + drop(upper); + + add_prepared.apply(ctx); + for prepared in prepared { + prepared.apply(ctx, balance_queue); } + + // target removed as follower + for upper_id in uppers { + notify_lost_follower( + ctx, + balance_queue, + ctx.node(&upper_id), + &upper_id, + target_id, + ); + } + + break; } } } - if extra_followers > 0 { - let upper = ctx.node(upper_id); - remove_follower_count(ctx, balance_queue, upper, target_id, extra_followers); - } - if extra_uppers > 0 { - let target = ctx.node(target_id); - remove_upper_count(ctx, balance_queue, target, upper_id, extra_uppers); - } - (upper_aggregation_number, target_aggregation_number) -} - -fn decrease_numbers(amount: usize, a: &mut usize, b: &mut usize) { - if *a >= amount { - *a -= amount; - } else { - *b -= amount - *a; - *a = 0; - } } diff --git a/turbopack/crates/turbo-tasks-memory/src/aggregation/balance_queue.rs b/turbopack/crates/turbo-tasks-memory/src/aggregation/balance_queue.rs index 1f11d4dd9a98d..9504a877a2e92 100644 --- a/turbopack/crates/turbo-tasks-memory/src/aggregation/balance_queue.rs +++ b/turbopack/crates/turbo-tasks-memory/src/aggregation/balance_queue.rs @@ -1,4 +1,4 @@ -use std::{cmp::max, collections::HashMap, hash::Hash, mem::take}; +use std::{hash::Hash, mem::take}; use indexmap::IndexSet; @@ -8,49 +8,27 @@ use super::{balance_edge, AggregationContext}; /// of aggregation numbers read during balancing. pub struct BalanceQueue { queue: IndexSet<(I, I)>, - aggregation_numbers: HashMap, } impl BalanceQueue { pub fn new() -> Self { Self { queue: IndexSet::default(), - aggregation_numbers: HashMap::default(), } } - fn add_number(&mut self, id: I, number: u32) { - self.aggregation_numbers - .entry(id) - .and_modify(|n| *n = max(*n, number)) - .or_insert(number); - } - /// Add an edge to the queue. The edge will be balanced during the next /// call. - pub fn balance( - &mut self, - upper_id: I, - upper_aggregation_number: u32, - target_id: I, - target_aggregation_number: u32, - ) { + pub fn balance(&mut self, upper_id: I, target_id: I) { debug_assert!(upper_id != target_id); - self.add_number(upper_id.clone(), upper_aggregation_number); - self.add_number(target_id.clone(), target_aggregation_number); self.queue.insert((upper_id.clone(), target_id.clone())); } /// Add multiple edges to the queue. The edges will be balanced during the /// next call. - pub fn balance_all(&mut self, edges: Vec<(I, u32, I, u32)>) { - for (upper_id, upper_aggregation_number, target_id, target_aggregation_number) in edges { - self.balance( - upper_id, - upper_aggregation_number, - target_id, - target_aggregation_number, - ); + pub fn balance_all(&mut self, edges: Vec<(I, I)>) { + for (upper_id, target_id) in edges { + self.balance(upper_id, target_id); } } @@ -59,31 +37,7 @@ impl BalanceQueue { while !self.queue.is_empty() { let queue = take(&mut self.queue); for (upper_id, target_id) in queue { - let upper_aggregation_number = self - .aggregation_numbers - .get(&upper_id) - .copied() - .unwrap_or_default(); - let target_aggregation_number = self - .aggregation_numbers - .get(&target_id) - .copied() - .unwrap_or_default(); - - let (u, t) = balance_edge( - ctx, - &mut self, - &upper_id, - upper_aggregation_number, - &target_id, - target_aggregation_number, - ); - if u != upper_aggregation_number { - self.add_number(upper_id, u); - } - if t != target_aggregation_number { - self.add_number(target_id, t); - } + balance_edge(ctx, &mut self, &upper_id, &target_id); } } } diff --git a/turbopack/crates/turbo-tasks-memory/src/aggregation/followers.rs b/turbopack/crates/turbo-tasks-memory/src/aggregation/followers.rs index e69d374f3a63b..1490b2c4fa96b 100644 --- a/turbopack/crates/turbo-tasks-memory/src/aggregation/followers.rs +++ b/turbopack/crates/turbo-tasks-memory/src/aggregation/followers.rs @@ -1,11 +1,10 @@ use super::{ balance_queue::BalanceQueue, in_progress::start_in_progress_all, - notify_lost_follower, notify_new_follower, + notify_new_follower, optimize::{optimize_aggregation_number_for_followers, MAX_FOLLOWERS}, AggregationContext, AggregationNode, StackVec, }; -use crate::count_hash_set::RemovePositiveCountResult; /// Add a follower to a node. Followers will be propagated to the uppers of the /// node. @@ -87,115 +86,3 @@ pub fn on_added( } affected_nodes } - -/// Add a follower to a node with a count. Followers will be propagated to the -/// uppers of the node. -pub fn add_follower_count( - ctx: &C, - balance_queue: &mut BalanceQueue, - mut node: C::Guard<'_>, - node_id: &C::NodeRef, - follower_id: &C::NodeRef, - follower_count: usize, - already_optimizing_for_node: bool, -) -> isize { - let AggregationNode::Aggegating(aggregating) = &mut *node else { - unreachable!(); - }; - if aggregating - .followers - .add_clonable_count(follower_id, follower_count) - { - let count = aggregating.followers.get_count(follower_id); - on_added( - ctx, - balance_queue, - node, - node_id, - follower_id, - already_optimizing_for_node, - ); - count - } else { - aggregating.followers.get_count(follower_id) - } -} - -/// Remove a follower from a node. Followers will be propagated to the uppers of -/// the node. -pub fn remove_follower_count( - ctx: &C, - balance_queue: &mut BalanceQueue, - mut node: C::Guard<'_>, - follower_id: &C::NodeRef, - follower_count: usize, -) { - let AggregationNode::Aggegating(aggregating) = &mut *node else { - unreachable!(); - }; - if aggregating - .followers - .remove_clonable_count(follower_id, follower_count) - { - let uppers = aggregating.uppers.iter().cloned().collect::>(); - start_in_progress_all(ctx, &uppers); - drop(node); - for upper_id in uppers { - notify_lost_follower( - ctx, - balance_queue, - ctx.node(&upper_id), - &upper_id, - follower_id, - ); - } - } -} - -pub struct RemovePositveFollowerCountResult { - /// The amount of followers that have been removed. - pub removed_count: usize, - /// The amount of followers that are remaining. Might be negative. - pub remaining_count: isize, -} - -/// Remove a positive count of a follower from a node. Negative counts will not -/// be increased. The function returns how much of the count has been removed -/// and whats remaining. Followers will be propagated to the uppers of the node. -pub fn remove_positive_follower_count( - ctx: &C, - balance_queue: &mut BalanceQueue, - mut node: C::Guard<'_>, - follower_id: &C::NodeRef, - follower_count: usize, -) -> RemovePositveFollowerCountResult { - let AggregationNode::Aggegating(aggregating) = &mut *node else { - unreachable!(); - }; - let RemovePositiveCountResult { - removed, - removed_count, - count, - } = aggregating - .followers - .remove_positive_clonable_count(follower_id, follower_count); - - if removed { - let uppers = aggregating.uppers.iter().cloned().collect::>(); - start_in_progress_all(ctx, &uppers); - drop(node); - for upper_id in uppers { - notify_lost_follower( - ctx, - balance_queue, - ctx.node(&upper_id), - &upper_id, - follower_id, - ); - } - } - RemovePositveFollowerCountResult { - removed_count, - remaining_count: count, - } -} diff --git a/turbopack/crates/turbo-tasks-memory/src/aggregation/increase.rs b/turbopack/crates/turbo-tasks-memory/src/aggregation/increase.rs index 6fb42eeeb0731..93eaea79135da 100644 --- a/turbopack/crates/turbo-tasks-memory/src/aggregation/increase.rs +++ b/turbopack/crates/turbo-tasks-memory/src/aggregation/increase.rs @@ -121,7 +121,6 @@ pub(super) fn increase_aggregation_number_immediately( node_id, uppers: uppers_copy, followers, - target_aggregation_number, }) } } @@ -139,7 +138,6 @@ pub(super) fn increase_aggregation_number_immediately( node_id, uppers, followers, - target_aggregation_number, }) } } @@ -162,7 +160,6 @@ pub enum PreparedInternalIncreaseAggregationNumber { node_id: C::NodeRef, uppers: StackVec, followers: StackVec, - target_aggregation_number: u32, }, } @@ -262,15 +259,10 @@ impl PreparedInternalOperation } }; for follower_id in followers { - balance_queue.balance( - node_id.clone(), - target_aggregation_number, - follower_id, - 0, - ); + balance_queue.balance(node_id.clone(), follower_id); } for upper_id in uppers { - balance_queue.balance(upper_id, 0, node_id.clone(), target_aggregation_number); + balance_queue.balance(upper_id, node_id.clone()); } } PreparedInternalIncreaseAggregationNumber::Leaf { @@ -293,18 +285,12 @@ impl PreparedInternalOperation node_id, uppers, followers, - target_aggregation_number, } => { for follower_id in followers { - balance_queue.balance( - node_id.clone(), - target_aggregation_number, - follower_id, - 0, - ); + balance_queue.balance(node_id.clone(), follower_id); } for upper_id in uppers { - balance_queue.balance(upper_id, 0, node_id.clone(), target_aggregation_number); + balance_queue.balance(upper_id, node_id.clone()); } } } diff --git a/turbopack/crates/turbo-tasks-memory/src/aggregation/loom_tests.rs b/turbopack/crates/turbo-tasks-memory/src/aggregation/loom_tests.rs index d9e9f6b600c79..a99874f2c1323 100644 --- a/turbopack/crates/turbo-tasks-memory/src/aggregation/loom_tests.rs +++ b/turbopack/crates/turbo-tasks-memory/src/aggregation/loom_tests.rs @@ -158,6 +158,33 @@ impl AggregationContext for NodeAggregationContext { unsafe { NodeGuard::new(guard, r) } } + fn node_pair<'b>( + &'b self, + id1: &Self::NodeRef, + id2: &Self::NodeRef, + ) -> (Self::Guard<'b>, Self::Guard<'b>) { + let r1 = id1.0.clone(); + let r2 = id2.0.clone(); + loop { + { + let guard1 = id1.0.inner.lock().unwrap(); + if let Ok(guard2) = id2.0.inner.try_lock() { + return (unsafe { NodeGuard::new(guard1, r1) }, unsafe { + NodeGuard::new(guard2, r2) + }); + } + } + { + let guard2 = id2.0.inner.lock().unwrap(); + if let Ok(guard1) = id1.0.inner.try_lock() { + return (unsafe { NodeGuard::new(guard1, r1) }, unsafe { + NodeGuard::new(guard2, r2) + }); + } + } + } + } + fn atomic_in_progress_counter<'l>(&self, id: &'l NodeRef) -> &'l AtomicU32 where Self: 'l, diff --git a/turbopack/crates/turbo-tasks-memory/src/aggregation/mod.rs b/turbopack/crates/turbo-tasks-memory/src/aggregation/mod.rs index 64c6e8bba9c84..5eda642fa63a6 100644 --- a/turbopack/crates/turbo-tasks-memory/src/aggregation/mod.rs +++ b/turbopack/crates/turbo-tasks-memory/src/aggregation/mod.rs @@ -22,12 +22,12 @@ mod root_query; #[cfg(test)] mod tests; mod uppers; +mod util; pub use aggregation_data::{aggregation_data, AggregationDataGuard}; use balance_edge::balance_edge; use increase::increase_aggregation_number_internal; pub use new_edge::handle_new_edge; -use notify_lost_follower::notify_lost_follower; use notify_new_follower::notify_new_follower; pub use root_query::{query_root_info, RootQuery}; @@ -51,7 +51,7 @@ pub struct AggegatingNode { uppers: CountHashSet, followers: CountHashSet, data: D, - enqueued_balancing: Vec<(I, u32, I, u32)>, + enqueued_balancing: Vec<(I, I)>, } impl AggregationNode { @@ -109,6 +109,13 @@ impl AggregationNode { AggregationNode::Aggegating(aggregating) => Some(&aggregating.followers), } } + + fn followers_mut(&mut self) -> Option<&mut CountHashSet> { + match self { + AggregationNode::Leaf { .. } => None, + AggregationNode::Aggegating(aggregating) => Some(&mut aggregating.followers), + } + } } /// A prepared operation. Must be applied outside of node locks. @@ -186,7 +193,7 @@ impl, const N: usize> /// Context for aggregation operations. pub trait AggregationContext { - type NodeRef: Clone + Eq + Hash + Debug; + type NodeRef: Clone + Eq + Hash + Debug + 'static; type Guard<'l>: AggregationNodeGuard< NodeRef = Self::NodeRef, Data = Self::Data, @@ -194,12 +201,19 @@ pub trait AggregationContext { > where Self: 'l; - type Data; - type DataChange; + type Data: 'static; + type DataChange: 'static; /// Gets mutable access to an item. fn node<'l>(&'l self, id: &Self::NodeRef) -> Self::Guard<'l>; + /// Gets mutable access to two items. + fn node_pair<'l>( + &'l self, + id1: &Self::NodeRef, + id2: &Self::NodeRef, + ) -> (Self::Guard<'l>, Self::Guard<'l>); + /// Get the atomic in progress counter for a node. fn atomic_in_progress_counter<'l>(&self, id: &'l Self::NodeRef) -> &'l AtomicU32 where diff --git a/turbopack/crates/turbo-tasks-memory/src/aggregation/notify_lost_follower.rs b/turbopack/crates/turbo-tasks-memory/src/aggregation/notify_lost_follower.rs index 136dc4209671c..765750c3ae879 100644 --- a/turbopack/crates/turbo-tasks-memory/src/aggregation/notify_lost_follower.rs +++ b/turbopack/crates/turbo-tasks-memory/src/aggregation/notify_lost_follower.rs @@ -3,6 +3,7 @@ use std::{hash::Hash, thread::yield_now}; use super::{ balance_queue::BalanceQueue, in_progress::{finish_in_progress_without_node, start_in_progress, start_in_progress_all}, + util::get_aggregated_remove_change, AggegatingNode, AggregationContext, AggregationNode, AggregationNodeGuard, PreparedInternalOperation, PreparedOperation, StackVec, }; @@ -208,13 +209,3 @@ pub fn notify_lost_follower( drop(upper); p.apply(ctx, balance_queue); } - -fn get_aggregated_remove_change( - ctx: &C, - guard: &C::Guard<'_>, -) -> Option { - match &**guard { - AggregationNode::Leaf { .. } => guard.get_remove_change(), - AggregationNode::Aggegating(aggegating) => ctx.data_to_remove_change(&aggegating.data), - } -} diff --git a/turbopack/crates/turbo-tasks-memory/src/aggregation/tests.rs b/turbopack/crates/turbo-tasks-memory/src/aggregation/tests.rs index 5d1ce24b39beb..f8c4e7ae43be1 100644 --- a/turbopack/crates/turbo-tasks-memory/src/aggregation/tests.rs +++ b/turbopack/crates/turbo-tasks-memory/src/aggregation/tests.rs @@ -503,6 +503,33 @@ impl<'a> AggregationContext for NodeAggregationContext<'a> { unsafe { NodeGuard::new(guard, r) } } + fn node_pair<'b>( + &'b self, + id1: &Self::NodeRef, + id2: &Self::NodeRef, + ) -> (Self::Guard<'b>, Self::Guard<'b>) { + let r1 = id1.0.clone(); + let r2 = id2.0.clone(); + loop { + { + let guard1 = id1.0.inner.lock(); + if let Some(guard2) = id2.0.inner.try_lock() { + return (unsafe { NodeGuard::new(guard1, r1) }, unsafe { + NodeGuard::new(guard2, r2) + }); + } + } + { + let guard2 = id2.0.inner.lock(); + if let Some(guard1) = id1.0.inner.try_lock() { + return (unsafe { NodeGuard::new(guard1, r1) }, unsafe { + NodeGuard::new(guard2, r2) + }); + } + } + } + } + fn atomic_in_progress_counter<'l>(&self, id: &'l Self::NodeRef) -> &'l AtomicU32 where Self: 'l, diff --git a/turbopack/crates/turbo-tasks-memory/src/aggregation/uppers.rs b/turbopack/crates/turbo-tasks-memory/src/aggregation/uppers.rs index 581341b0f1630..89a59d184060a 100644 --- a/turbopack/crates/turbo-tasks-memory/src/aggregation/uppers.rs +++ b/turbopack/crates/turbo-tasks-memory/src/aggregation/uppers.rs @@ -5,65 +5,22 @@ use super::{ AggegatingNode, AggregationContext, AggregationNode, AggregationNodeGuard, PreparedInternalOperation, PreparedOperation, StackVec, }; -use crate::count_hash_set::RemovePositiveCountResult; /// Adds an upper node to a node. Returns the number of affected nodes by this /// operation. This will also propagate the followers to the new upper node. pub fn add_upper( - ctx: &C, - balance_queue: &mut BalanceQueue, - node: C::Guard<'_>, - node_id: &C::NodeRef, - upper_id: &C::NodeRef, - already_optimizing_for_upper: bool, -) -> usize { - add_upper_count( - ctx, - balance_queue, - node, - node_id, - upper_id, - 1, - already_optimizing_for_upper, - ) - .affected_nodes -} - -pub struct AddUpperCountResult { - pub new_count: isize, - pub affected_nodes: usize, -} - -/// Adds an upper node to a node with a given count. Returns the new count of -/// the upper node and the number of affected nodes by this operation. This will -/// also propagate the followers to the new upper node. -pub fn add_upper_count( ctx: &C, balance_queue: &mut BalanceQueue, mut node: C::Guard<'_>, node_id: &C::NodeRef, upper_id: &C::NodeRef, - count: usize, already_optimizing_for_upper: bool, -) -> AddUpperCountResult { - // TODO add_clonable_count could return the current count for better performance - let (added, count) = match &mut *node { - AggregationNode::Leaf { uppers, .. } => { - if uppers.add_clonable_count(upper_id, count) { - let count = uppers.get_count(upper_id); - (true, count) - } else { - (false, uppers.get_count(upper_id)) - } - } +) -> usize { + let added = match &mut *node { + AggregationNode::Leaf { uppers, .. } => uppers.add_clonable(upper_id), AggregationNode::Aggegating(aggegating) => { let AggegatingNode { ref mut uppers, .. } = **aggegating; - if uppers.add_clonable_count(upper_id, count) { - let count = uppers.get_count(upper_id); - (true, count) - } else { - (false, uppers.get_count(upper_id)) - } + uppers.add_clonable(upper_id) } }; let mut affected_nodes = 0; @@ -79,10 +36,7 @@ pub fn add_upper_count( } else { drop(node); } - AddUpperCountResult { - new_count: count, - affected_nodes, - } + affected_nodes } /// Called when an upper node was added to a node. This will propagate the @@ -148,110 +102,3 @@ pub fn on_added( affected_nodes } - -/// Removes an upper node from a node with a count. -pub fn remove_upper_count( - ctx: &C, - balance_queue: &mut BalanceQueue, - mut node: C::Guard<'_>, - upper_id: &C::NodeRef, - count: usize, -) { - let uppers = match &mut *node { - AggregationNode::Leaf { uppers, .. } => uppers, - AggregationNode::Aggegating(aggegating) => { - let AggegatingNode { ref mut uppers, .. } = **aggegating; - uppers - } - }; - let removed = uppers.remove_clonable_count(upper_id, count); - if removed { - uppers.shrink_amortized(); - on_removed(ctx, balance_queue, node, upper_id); - } -} - -pub struct RemovePositiveUpperCountResult { - pub removed_count: usize, - pub remaining_count: isize, -} - -/// Removes a positive count of an upper node from a node. -/// Returns the removed count and the remaining count of the upper node. -/// This will also propagate the followers to the removed upper node. -pub fn remove_positive_upper_count( - ctx: &C, - balance_queue: &mut BalanceQueue, - mut node: C::Guard<'_>, - upper_id: &C::NodeRef, - count: usize, -) -> RemovePositiveUpperCountResult { - let uppers = match &mut *node { - AggregationNode::Leaf { uppers, .. } => uppers, - AggregationNode::Aggegating(aggegating) => { - let AggegatingNode { ref mut uppers, .. } = **aggegating; - uppers - } - }; - let RemovePositiveCountResult { - removed, - removed_count, - count, - } = uppers.remove_positive_clonable_count(upper_id, count); - if removed { - uppers.shrink_amortized(); - on_removed(ctx, balance_queue, node, upper_id); - } - RemovePositiveUpperCountResult { - removed_count, - remaining_count: count, - } -} - -/// Called when an upper node was removed from a node. This will propagate the -/// followers to the removed upper node. -pub fn on_removed( - ctx: &C, - balance_queue: &mut BalanceQueue, - node: C::Guard<'_>, - upper_id: &C::NodeRef, -) { - match &*node { - AggregationNode::Leaf { .. } => { - let remove_change = node.get_remove_change(); - let children = node.children().collect::>(); - drop(node); - let mut upper = ctx.node(upper_id); - let remove_prepared = - remove_change.and_then(|remove_change| upper.apply_change(ctx, remove_change)); - start_in_progress_count(ctx, upper_id, children.len() as u32); - let prepared = children - .into_iter() - .map(|child_id| upper.notify_lost_follower(ctx, balance_queue, upper_id, &child_id)) - .collect::>(); - drop(upper); - remove_prepared.apply(ctx); - prepared.apply(ctx, balance_queue); - } - AggregationNode::Aggegating(aggegating) => { - let remove_change = ctx.data_to_remove_change(&aggegating.data); - let followers = aggegating - .followers - .iter() - .cloned() - .collect::>(); - drop(node); - let mut upper = ctx.node(upper_id); - let remove_prepared = - remove_change.and_then(|remove_change| upper.apply_change(ctx, remove_change)); - start_in_progress_count(ctx, upper_id, followers.len() as u32); - let prepared = followers - .into_iter() - .map(|child_id| upper.notify_lost_follower(ctx, balance_queue, upper_id, &child_id)) - .collect::>(); - drop(upper); - remove_prepared.apply(ctx); - prepared.apply(ctx, balance_queue); - } - } -} diff --git a/turbopack/crates/turbo-tasks-memory/src/aggregation/util.rs b/turbopack/crates/turbo-tasks-memory/src/aggregation/util.rs new file mode 100644 index 0000000000000..44a82a59dd380 --- /dev/null +++ b/turbopack/crates/turbo-tasks-memory/src/aggregation/util.rs @@ -0,0 +1,31 @@ +use super::{AggregationContext, AggregationNode, AggregationNodeGuard, StackVec}; + +pub fn get_aggregated_remove_change( + ctx: &C, + guard: &C::Guard<'_>, +) -> Option { + match &**guard { + AggregationNode::Leaf { .. } => guard.get_remove_change(), + AggregationNode::Aggegating(aggegating) => ctx.data_to_remove_change(&aggegating.data), + } +} + +pub fn get_aggregated_add_change( + ctx: &C, + guard: &C::Guard<'_>, +) -> Option { + match &**guard { + AggregationNode::Leaf { .. } => guard.get_add_change(), + AggregationNode::Aggegating(aggegating) => ctx.data_to_add_change(&aggegating.data), + } +} + +pub fn get_followers_or_children( + _ctx: &C, + guard: &C::Guard<'_>, +) -> StackVec { + match &**guard { + AggregationNode::Leaf { .. } => guard.children().collect(), + AggregationNode::Aggegating(aggegating) => aggegating.followers.iter().cloned().collect(), + } +} diff --git a/turbopack/crates/turbo-tasks-memory/src/count_hash_set.rs b/turbopack/crates/turbo-tasks-memory/src/count_hash_set.rs index d872a614c5df1..06f3ef1c94aa2 100644 --- a/turbopack/crates/turbo-tasks-memory/src/count_hash_set.rs +++ b/turbopack/crates/turbo-tasks-memory/src/count_hash_set.rs @@ -1,6 +1,5 @@ use std::{ borrow::Borrow, - cmp::Ordering, collections::hash_map::RandomState, fmt::{Debug, Formatter}, hash::{BuildHasher, Hash}, @@ -82,12 +81,6 @@ pub enum RemoveIfEntryResult { NotPresent, } -pub struct RemovePositiveCountResult { - pub removed: bool, - pub removed_count: usize, - pub count: isize, -} - impl CountHashSet { /// Returns true, when the value has become visible from outside pub fn add_count(&mut self, item: T, count: usize) -> bool { @@ -274,70 +267,17 @@ impl CountHashSet { } } - /// Returns true when the value is no longer visible from outside - pub fn remove_positive_clonable_count( - &mut self, - item: &T, - count: usize, - ) -> RemovePositiveCountResult { - if count == 0 { - return RemovePositiveCountResult { - removed: false, - removed_count: 0, - count: self.inner.get(item).copied().unwrap_or(0), - }; - } + pub fn remove_all_positive_clonable_count(&mut self, item: &T) -> usize { match self.inner.raw_entry_mut(item) { RawEntry::Occupied(mut e) => { - let value = e.get_mut(); - let old = *value; - match old.cmp(&(count as isize)) { - Ordering::Less => { - if old < 0 { - // It's already negative, can't remove anything - RemovePositiveCountResult { - removed: false, - removed_count: 0, - count: old, - } - } else { - // It's removed completely with count remaining - e.remove(); - RemovePositiveCountResult { - removed: true, - removed_count: old as usize, - count: 0, - } - } - } - Ordering::Equal => { - // It's perfectly removed - e.remove(); - RemovePositiveCountResult { - removed: true, - removed_count: count, - count: 0, - } - } - Ordering::Greater => { - // It's partially removed - *value -= count as isize; - RemovePositiveCountResult { - removed: false, - removed_count: count, - count: *value, - } - } - } - } - RawEntry::Vacant(_) => { - // It's not present - RemovePositiveCountResult { - removed: false, - removed_count: 0, - count: 0, + if *e.get_mut() > 0 { + let value = e.remove(); + value as usize + } else { + 0 } } + RawEntry::Vacant(_) => 0, } } } diff --git a/turbopack/crates/turbo-tasks-memory/src/task.rs b/turbopack/crates/turbo-tasks-memory/src/task.rs index 251aaaf1949ed..886ffcf24f2be 100644 --- a/turbopack/crates/turbo-tasks-memory/src/task.rs +++ b/turbopack/crates/turbo-tasks-memory/src/task.rs @@ -689,6 +689,10 @@ impl Task { self.state.write().into() } + fn try_state_mut(&self) -> Option> { + self.state.try_write().map(|guard| guard.into()) + } + fn full_state_mut(&self) -> FullTaskWriteGuard<'_> { TaskMetaStateWriteGuard::full_from(self.state.write()) } diff --git a/turbopack/crates/turbo-tasks-memory/src/task/aggregation.rs b/turbopack/crates/turbo-tasks-memory/src/task/aggregation.rs index aa610adcdf344..6841e50152b39 100644 --- a/turbopack/crates/turbo-tasks-memory/src/task/aggregation.rs +++ b/turbopack/crates/turbo-tasks-memory/src/task/aggregation.rs @@ -222,6 +222,29 @@ impl<'a> AggregationContext for TaskAggregationContext<'a> { TaskGuard::new(*reference, task.state_mut()) } + fn node_pair<'l>( + &'l self, + id1: &Self::NodeRef, + id2: &Self::NodeRef, + ) -> (Self::Guard<'l>, Self::Guard<'l>) { + let task1 = self.backend.task(*id1); + let task2 = self.backend.task(*id2); + loop { + { + let state1 = task1.state_mut(); + if let Some(state2) = task2.try_state_mut() { + return (TaskGuard::new(*id1, state1), TaskGuard::new(*id2, state2)); + } + } + { + let state2 = task2.state_mut(); + if let Some(state1) = task1.try_state_mut() { + return (TaskGuard::new(*id1, state1), TaskGuard::new(*id2, state2)); + } + } + } + } + fn atomic_in_progress_counter<'l>(&self, id: &'l TaskId) -> &'l AtomicU32 where Self: 'l,