Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Turbopack] use double locking to balance edges #70244

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
320 changes: 151 additions & 169 deletions turbopack/crates/turbo-tasks-memory/src/aggregation/balance_edge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,29 @@ use std::cmp::Ordering;

use super::{
balance_queue::BalanceQueue,
followers::{
add_follower_count, remove_follower_count, remove_positive_follower_count,
RemovePositveFollowerCountResult,
},
in_progress::is_in_progress,
in_progress::{is_in_progress, start_in_progress_all, start_in_progress_count},
increase::IncreaseReason,
increase_aggregation_number_internal,
uppers::{
add_upper_count, remove_positive_upper_count, remove_upper_count,
RemovePositiveUpperCountResult,
},
AggregationContext, AggregationNode,
notify_lost_follower::notify_lost_follower,
notify_new_follower::notify_new_follower,
util::{get_aggregated_add_change, get_aggregated_remove_change, get_followers_or_children},
AggregationContext, AggregationNode, PreparedInternalOperation, PreparedOperation, StackVec,
};

// Migrated followers to uppers or uppers to followers depending on the
// Migrate followers to uppers or uppers to followers depending on the
// aggregation numbers of the nodes involved in the edge. Might increase targets
// aggregation number if they are equal.
pub(super) fn balance_edge<C: AggregationContext>(
ctx: &C,
balance_queue: &mut BalanceQueue<C::NodeRef>,
upper_id: &C::NodeRef,
mut upper_aggregation_number: u32,
target_id: &C::NodeRef,
mut target_aggregation_number: u32,
) -> (u32, u32) {
// too many uppers on target
let mut extra_uppers = 0;
// too many followers on upper
let mut extra_followers = 0;
// The last info about uppers
let mut uppers_count: Option<isize> = None;
// The last info about followers
let mut followers_count = None;

) {
loop {
let (mut upper, mut target) = ctx.node_pair(upper_id, target_id);
let upper_aggregation_number = upper.aggregation_number();
let target_aggregation_number = target.aggregation_number();

let root = upper_aggregation_number == u32::MAX || target_aggregation_number == u32::MAX;
let order = if root {
Ordering::Greater
Expand All @@ -45,164 +33,158 @@ pub(super) fn balance_edge<C: AggregationContext>(
};
match order {
Ordering::Equal => {
// we probably want to increase the aggregation number of target
let upper = ctx.node(upper_id);
upper_aggregation_number = upper.aggregation_number();
drop(upper);
if upper_aggregation_number != u32::MAX
&& upper_aggregation_number == target_aggregation_number
{
let target = ctx.node(target_id);
target_aggregation_number = target.aggregation_number();
if upper_aggregation_number == target_aggregation_number {
// increase target aggregation number
increase_aggregation_number_internal(
ctx,
balance_queue,
target,
target_id,
target_aggregation_number + 1,
target_aggregation_number + 1,
IncreaseReason::EqualAggregationNumberOnBalance,
);
}
}
// increase target aggregation number
increase_aggregation_number_internal(
ctx,
balance_queue,
target,
target_id,
target_aggregation_number + 1,
target_aggregation_number + 1,
IncreaseReason::EqualAggregationNumberOnBalance,
);
}
Ordering::Less => {
// target should probably be a follower of upper
if uppers_count.map_or(false, |count| count <= 0) {
// We already removed all uppers, maybe too many
if is_in_progress(ctx, upper_id) {
drop(target);
let AggregationNode::Aggegating(aggregating) = &mut *upper else {
unreachable!();
};
aggregating
.enqueued_balancing
.push((upper_id.clone(), target_id.clone()));
drop(upper);
// Somebody else will balance this edge
break;
} else if extra_followers == 0 {
let upper = ctx.node(upper_id);
upper_aggregation_number = upper.aggregation_number();
if upper_aggregation_number < target_aggregation_number {
// target should be a follower of upper
// add some extra followers
let count = uppers_count.unwrap_or(1) as usize;
extra_followers += count;
followers_count = Some(add_follower_count(
ctx,
balance_queue,
upper,
upper_id,
target_id,
count,
true,
));
}
}

// target should be a follower of upper
let count = target
.uppers_mut()
.remove_all_positive_clonable_count(upper_id);
if count == 0 {
break;
}
let added = upper
.followers_mut()
.unwrap()
.add_clonable_count(target_id, count);

// target removed as upper
let remove_change = get_aggregated_remove_change(ctx, &target);
let followers = get_followers_or_children(ctx, &target);

let upper_uppers = if added {
// target added as follower
let uppers = upper.uppers().iter().cloned().collect::<StackVec<_>>();
start_in_progress_all(ctx, &uppers);
uppers
} else {
// we already have extra followers, remove some uppers to balance
let count = extra_followers + extra_uppers;
let target = ctx.node(target_id);
if is_in_progress(ctx, upper_id) {
drop(target);
let mut upper = ctx.node(upper_id);
if is_in_progress(ctx, upper_id) {
let AggregationNode::Aggegating(aggregating) = &mut *upper else {
unreachable!();
};
aggregating.enqueued_balancing.push((
upper_id.clone(),
upper_aggregation_number,
target_id.clone(),
target_aggregation_number,
));
drop(upper);
// Somebody else will balance this edge
return (upper_aggregation_number, target_aggregation_number);
}
} else {
let RemovePositiveUpperCountResult {
removed_count,
remaining_count,
} = remove_positive_upper_count(
ctx,
balance_queue,
target,
upper_id,
count,
);
decrease_numbers(removed_count, &mut extra_uppers, &mut extra_followers);
uppers_count = Some(remaining_count);
}
Default::default()
};

drop(target);

// target removed as upper
let remove_prepared =
remove_change.and_then(|remove_change| upper.apply_change(ctx, remove_change));
start_in_progress_count(ctx, upper_id, followers.len() as u32);
let prepared = followers
.into_iter()
.map(|child_id| {
upper.notify_lost_follower(ctx, balance_queue, upper_id, &child_id)
})
.collect::<StackVec<_>>();
drop(upper);

// target added as follower
for upper_id in upper_uppers {
notify_new_follower(
ctx,
balance_queue,
ctx.node(&upper_id),
&upper_id,
target_id,
false,
);
}

// target removed as upper
remove_prepared.apply(ctx);
prepared.apply(ctx, balance_queue);

break;
}
Ordering::Greater => {
// target should probably be an inner node of upper
if followers_count.map_or(false, |count| count <= 0) {
// We already removed all followers, maybe too many
if is_in_progress(ctx, upper_id) {
let AggregationNode::Aggegating(aggregating) = &mut *upper else {
unreachable!();
};
aggregating
.enqueued_balancing
.push((upper_id.clone(), target_id.clone()));
drop(upper);
// Somebody else will balance this edge
break;
}

// target should be a inner node of upper
let count = upper
.followers_mut()
.unwrap()
.remove_all_positive_clonable_count(target_id);
if count == 0 {
break;
} else if extra_uppers == 0 {
let target = ctx.node(target_id);
target_aggregation_number = target.aggregation_number();
if root || target_aggregation_number < upper_aggregation_number {
// target should be a inner node of upper
if is_in_progress(ctx, upper_id) {
drop(target);
let mut upper = ctx.node(upper_id);
if is_in_progress(ctx, upper_id) {
let AggregationNode::Aggegating(aggregating) = &mut *upper else {
unreachable!();
};
aggregating.enqueued_balancing.push((
upper_id.clone(),
upper_aggregation_number,
target_id.clone(),
target_aggregation_number,
));
drop(upper);
// Somebody else will balance this edge
return (upper_aggregation_number, target_aggregation_number);
}
} else {
// add some extra uppers
let count = followers_count.unwrap_or(1) as usize;
extra_uppers += count;
uppers_count = Some(
add_upper_count(
ctx,
balance_queue,
target,
target_id,
upper_id,
count,
true,
)
.new_count,
);
}
}
}
let added = target.uppers_mut().add_clonable_count(upper_id, count);

// target removed as follower
let uppers = upper.uppers().iter().cloned().collect::<StackVec<_>>();
start_in_progress_all(ctx, &uppers);

let (add_change, followers) = if added {
// target added as upper
let add_change = get_aggregated_add_change(ctx, &target);
let followers = get_followers_or_children(ctx, &target);
start_in_progress_count(ctx, upper_id, followers.len() as u32);
(add_change, followers)
} else {
// we already have extra uppers, try to remove some followers to balance
let count = extra_followers + extra_uppers;
let upper = ctx.node(upper_id);
let RemovePositveFollowerCountResult {
removed_count,
remaining_count,
} = remove_positive_follower_count(ctx, balance_queue, upper, target_id, count);
decrease_numbers(removed_count, &mut extra_followers, &mut extra_uppers);
followers_count = Some(remaining_count);
(None, Default::default())
};

drop(target);

// target added as upper
let add_prepared =
add_change.and_then(|add_change| upper.apply_change(ctx, add_change));
let prepared = followers
.into_iter()
.filter_map(|child_id| {
upper.notify_new_follower(ctx, balance_queue, upper_id, &child_id, false)
})
.collect::<StackVec<_>>();

drop(upper);

add_prepared.apply(ctx);
for prepared in prepared {
prepared.apply(ctx, balance_queue);
}

// target removed as follower
for upper_id in uppers {
notify_lost_follower(
ctx,
balance_queue,
ctx.node(&upper_id),
&upper_id,
target_id,
);
}

break;
}
}
}
if extra_followers > 0 {
let upper = ctx.node(upper_id);
remove_follower_count(ctx, balance_queue, upper, target_id, extra_followers);
}
if extra_uppers > 0 {
let target = ctx.node(target_id);
remove_upper_count(ctx, balance_queue, target, upper_id, extra_uppers);
}
(upper_aggregation_number, target_aggregation_number)
}

fn decrease_numbers(amount: usize, a: &mut usize, b: &mut usize) {
if *a >= amount {
*a -= amount;
} else {
*b -= amount - *a;
*a = 0;
}
}
Loading
Loading