Graph aggregation refactoring (vercel/turborepo#8082)
### Description

* Deletes the aggregation tree
* Adds a new graph aggregation algorithm which is more efficient

The graph aggregation works as follows:

Every task is a node in the graph, and every parent-child relationship
is an edge in the graph.

* Every node has an "aggregation number" N.
* There are 2 kinds of nodes: Leaf nodes and aggregating nodes.
* If a node has N < LEAF_NUMBER, it's a leaf node, otherwise an
aggregating node.
* A higher N for a node usually means that a larger subgraph is
aggregated into that node.
* In addition to normal edges, there are two extra kinds of edges for
the graph aggregation: upper edges and follower edges.
* A node is considered "inner" to another node when it has an "upper"
edge pointing to that other node.
* The inner node has a lower N than the upper node. (This invariant
might be temporarily violated while tree balancing is scheduled but not
executed yet)
* Aggregating nodes store an aggregated version of the state of all
inner nodes and transitively inner nodes.
* Changes in nodes are propagated to all upper nodes.
* Every node, except the root node of the graph (which has no upper
edges), has at least one upper node that is more aggregated than it.
* An aggregating node also has follower edges. They point to the nodes
that are one normal edge after all inner and transitively inner nodes.
* A leaf node doesn't have follower edges. For all purposes, the normal
edges of leaf nodes are treated as follower edges.
* Follower nodes have a higher N than the origin node. (This invariant
might be temporarily violated while tree balancing is scheduled but not
executed yet)
* This means larger and larger subgraphs are aggregated.
* Graph operations will ensure that these invariants (Higher N on upper
and follower edges) are not violated.
* The N of a node can only increase, so graph operations need to "fix"
the invariants by increasing N or by changing upper/follower edges. The
latter is preferred; N is usually only increased when two nodes have
equal N.
* When new edges between leaf nodes are added, the target node's N is
increased to the origin node's N + 4 if it is smaller. This adds a small
tolerance range so that increasing N doesn't cause long chains of N += 1
updates between leaf nodes (see the sketch after this list).
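
To make the invariants above concrete, here is a minimal, self-contained sketch. It is not the PR's actual data structure: the struct shape, the concrete `LEAF_NUMBER` value, and the helper names are assumptions for illustration only.

```rust
const LEAF_NUMBER: u32 = 16; // assumed threshold; the real value is an implementation detail

struct Node {
    aggregation_number: u32, // "N"; it can only ever increase
}

impl Node {
    fn is_leaf(&self) -> bool {
        self.aggregation_number < LEAF_NUMBER
    }
}

/// Tolerance rule for a new edge between leaf nodes: raise the target's N to
/// the origin's N + 4 if it is smaller, avoiding long chains of N += 1 updates.
fn on_new_leaf_edge(origin: &Node, target: &mut Node) {
    let wanted = origin.aggregation_number + 4;
    if target.aggregation_number < wanted {
        target.aggregation_number = wanted;
    }
}

fn main() {
    let origin = Node { aggregation_number: 3 };
    let mut target = Node { aggregation_number: 1 };
    on_new_leaf_edge(&origin, &mut target);
    assert_eq!(target.aggregation_number, 7);
    assert!(origin.is_leaf() && target.is_leaf());
}
```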

### Testing Instructions



Closes PACK-3036

---------

Co-authored-by: Alexander Lyon <arlyon@me.com>
sokra and arlyon authored May 8, 2024
1 parent b45da48 commit 25dc933
Showing 30 changed files with 4,325 additions and 2,882 deletions.
5 changes: 4 additions & 1 deletion crates/turbo-tasks-memory/Cargo.toml
@@ -17,6 +17,7 @@ anyhow = { workspace = true }
auto-hash-map = { workspace = true }
concurrent-queue = { workspace = true }
dashmap = { workspace = true }
indexmap = { workspace = true }
nohash-hasher = { workspace = true }
num_cpus = "1.13.1"
once_cell = { workspace = true }
@@ -33,8 +34,10 @@ turbo-tasks-malloc = { workspace = true, default-features = false }

[dev-dependencies]
criterion = { workspace = true, features = ["async_tokio"] }
indexmap = { workspace = true }
lazy_static = { workspace = true }
loom = "0.7.2"
rand = { workspace = true, features = ["small_rng"] }
rstest = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true, features = ["full"] }
turbo-tasks-testing = { workspace = true }
81 changes: 81 additions & 0 deletions crates/turbo-tasks-memory/src/aggregation/aggregation_data.rs
@@ -0,0 +1,81 @@
use std::ops::{Deref, DerefMut};

use super::{
    increase_aggregation_number_internal, AggregationContext, AggregationNode,
    AggregationNodeGuard,
};
use crate::aggregation::balance_queue::BalanceQueue;

/// Gives a reference to the aggregated data for a given item. This will
/// convert the item to a fully aggregated node.
pub fn aggregation_data<'l, C>(
    ctx: &'l C,
    node_id: &C::NodeRef,
) -> AggregationDataGuard<C::Guard<'l>>
where
    C: AggregationContext + 'l,
{
    let guard = ctx.node(node_id);
    if guard.aggregation_number() == u32::MAX {
        AggregationDataGuard { guard }
    } else {
        let mut balance_queue = BalanceQueue::new();
        increase_aggregation_number_internal(
            ctx,
            &mut balance_queue,
            guard,
            node_id,
            u32::MAX,
            u32::MAX,
        );
        balance_queue.process(ctx);
        let guard = ctx.node(node_id);
        debug_assert!(guard.aggregation_number() == u32::MAX);
        AggregationDataGuard { guard }
    }
}

/// Converts the given node to a fully aggregated node, to make the next call
/// to `aggregation_data` instant.
pub fn prepare_aggregation_data<C: AggregationContext>(ctx: &C, node_id: &C::NodeRef) {
    let mut balance_queue = BalanceQueue::new();
    increase_aggregation_number_internal(
        ctx,
        &mut balance_queue,
        ctx.node(node_id),
        node_id,
        u32::MAX,
        u32::MAX,
    );
    balance_queue.process(ctx);
}

/// A reference to the aggregated data of a node. This holds a lock to the node.
pub struct AggregationDataGuard<G> {
    guard: G,
}

impl<G> AggregationDataGuard<G> {
    pub fn into_inner(self) -> G {
        self.guard
    }
}

impl<G: AggregationNodeGuard> Deref for AggregationDataGuard<G> {
    type Target = G::Data;

    fn deref(&self) -> &Self::Target {
        match &*self.guard {
            AggregationNode::Leaf { .. } => unreachable!(),
            AggregationNode::Aggegating(aggregating) => &aggregating.data,
        }
    }
}

impl<G: AggregationNodeGuard> DerefMut for AggregationDataGuard<G> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        match &mut *self.guard {
            AggregationNode::Leaf { .. } => unreachable!(),
            AggregationNode::Aggegating(aggregating) => &mut aggregating.data,
        }
    }
}
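
For context, a rough usage sketch of these helpers. The `AggregationContext` implementation and the way `node_id` is obtained are assumptions here, so this fragment is illustrative rather than compilable on its own.

```rust
// Rough usage sketch, not part of the diff. Assumes `ctx` implements
// `AggregationContext` and `node_id` refers to an existing node.
fn read_aggregated<C: AggregationContext>(ctx: &C, node_id: &C::NodeRef) {
    // Optionally pay the aggregation cost up front so the read below is instant.
    prepare_aggregation_data(ctx, node_id);

    // Locks the node; the guard derefs to the aggregated `Data` of the node.
    let data = aggregation_data(ctx, node_id);
    let _aggregated = &*data;
    // The node lock is released when `data` is dropped.
}
```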
206 changes: 206 additions & 0 deletions crates/turbo-tasks-memory/src/aggregation/balance_edge.rs
@@ -0,0 +1,206 @@
use std::cmp::Ordering;

use super::{
    balance_queue::BalanceQueue,
    followers::{
        add_follower_count, remove_follower_count, remove_positive_follower_count,
        RemovePositveFollowerCountResult,
    },
    in_progress::is_in_progress,
    increase_aggregation_number_internal,
    uppers::{
        add_upper_count, remove_positive_upper_count, remove_upper_count,
        RemovePositiveUpperCountResult,
    },
    AggregationContext, AggregationNode,
};

// Migrates followers to uppers or uppers to followers depending on the
// aggregation numbers of the nodes involved in the edge. Might increase the
// target's aggregation number if they are equal.
pub(super) fn balance_edge<C: AggregationContext>(
    ctx: &C,
    balance_queue: &mut BalanceQueue<C::NodeRef>,
    upper_id: &C::NodeRef,
    mut upper_aggregation_number: u32,
    target_id: &C::NodeRef,
    mut target_aggregation_number: u32,
) -> (u32, u32) {
    // too many uppers on target
    let mut extra_uppers = 0;
    // too many followers on upper
    let mut extra_followers = 0;
    // The last info about uppers
    let mut uppers_count: Option<isize> = None;
    // The last info about followers
    let mut followers_count = None;

    loop {
        let root = upper_aggregation_number == u32::MAX || target_aggregation_number == u32::MAX;
        let order = if root {
            Ordering::Greater
        } else {
            upper_aggregation_number.cmp(&target_aggregation_number)
        };
        match order {
            Ordering::Equal => {
                // we probably want to increase the aggregation number of target
                let upper = ctx.node(upper_id);
                upper_aggregation_number = upper.aggregation_number();
                drop(upper);
                if upper_aggregation_number != u32::MAX
                    && upper_aggregation_number == target_aggregation_number
                {
                    let target = ctx.node(target_id);
                    target_aggregation_number = target.aggregation_number();
                    if upper_aggregation_number == target_aggregation_number {
                        // increase target aggregation number
                        increase_aggregation_number_internal(
                            ctx,
                            balance_queue,
                            target,
                            target_id,
                            target_aggregation_number + 1,
                            target_aggregation_number + 1,
                        );
                    }
                }
            }
            Ordering::Less => {
                // target should probably be a follower of upper
                if uppers_count.map_or(false, |count| count <= 0) {
                    // We already removed all uppers, maybe too many
                    break;
                } else if extra_followers == 0 {
                    let upper = ctx.node(upper_id);
                    upper_aggregation_number = upper.aggregation_number();
                    if upper_aggregation_number < target_aggregation_number {
                        // target should be a follower of upper
                        // add some extra followers
                        let count = uppers_count.unwrap_or(1) as usize;
                        extra_followers += count;
                        followers_count = Some(add_follower_count(
                            ctx,
                            balance_queue,
                            upper,
                            upper_id,
                            target_id,
                            count,
                            true,
                        ));
                    }
                } else {
                    // we already have extra followers, remove some uppers to balance
                    let count = extra_followers + extra_uppers;
                    let target = ctx.node(target_id);
                    if is_in_progress(ctx, upper_id) {
                        drop(target);
                        let mut upper = ctx.node(upper_id);
                        if is_in_progress(ctx, upper_id) {
                            let AggregationNode::Aggegating(aggregating) = &mut *upper else {
                                unreachable!();
                            };
                            aggregating.enqueued_balancing.push((
                                upper_id.clone(),
                                upper_aggregation_number,
                                target_id.clone(),
                                target_aggregation_number,
                            ));
                            drop(upper);
                            // Somebody else will balance this edge
                            return (upper_aggregation_number, target_aggregation_number);
                        }
                    } else {
                        let RemovePositiveUpperCountResult {
                            removed_count,
                            remaining_count,
                        } = remove_positive_upper_count(
                            ctx,
                            balance_queue,
                            target,
                            upper_id,
                            count,
                        );
                        decrease_numbers(removed_count, &mut extra_uppers, &mut extra_followers);
                        uppers_count = Some(remaining_count);
                    }
                }
            }
            Ordering::Greater => {
                // target should probably be an inner node of upper
                if followers_count.map_or(false, |count| count <= 0) {
                    // We already removed all followers, maybe too many
                    break;
                } else if extra_uppers == 0 {
                    let target = ctx.node(target_id);
                    target_aggregation_number = target.aggregation_number();
                    if root || target_aggregation_number < upper_aggregation_number {
                        // target should be an inner node of upper
                        if is_in_progress(ctx, upper_id) {
                            drop(target);
                            let mut upper = ctx.node(upper_id);
                            if is_in_progress(ctx, upper_id) {
                                let AggregationNode::Aggegating(aggregating) = &mut *upper else {
                                    unreachable!();
                                };
                                aggregating.enqueued_balancing.push((
                                    upper_id.clone(),
                                    upper_aggregation_number,
                                    target_id.clone(),
                                    target_aggregation_number,
                                ));
                                drop(upper);
                                // Somebody else will balance this edge
                                return (upper_aggregation_number, target_aggregation_number);
                            }
                        } else {
                            // add some extra uppers
                            let count = followers_count.unwrap_or(1) as usize;
                            extra_uppers += count;
                            uppers_count = Some(
                                add_upper_count(
                                    ctx,
                                    balance_queue,
                                    target,
                                    target_id,
                                    upper_id,
                                    count,
                                    true,
                                )
                                .new_count,
                            );
                        }
                    }
                } else {
                    // we already have extra uppers, try to remove some followers to balance
                    let count = extra_followers + extra_uppers;
                    let upper = ctx.node(upper_id);
                    let RemovePositveFollowerCountResult {
                        removed_count,
                        remaining_count,
                    } = remove_positive_follower_count(ctx, balance_queue, upper, target_id, count);
                    decrease_numbers(removed_count, &mut extra_followers, &mut extra_uppers);
                    followers_count = Some(remaining_count);
                }
            }
        }
    }
    if extra_followers > 0 {
        let upper = ctx.node(upper_id);
        remove_follower_count(ctx, balance_queue, upper, target_id, extra_followers);
    }
    if extra_uppers > 0 {
        let target = ctx.node(target_id);
        remove_upper_count(ctx, balance_queue, target, upper_id, extra_uppers);
    }
    (upper_aggregation_number, target_aggregation_number)
}

fn decrease_numbers(amount: usize, a: &mut usize, b: &mut usize) {
    if *a >= amount {
        *a -= amount;
    } else {
        *b -= amount - *a;
        *a = 0;
    }
}
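
As a standalone illustration of the carry behaviour of `decrease_numbers` above (the function body is copied verbatim; the caller is a toy example, not code from this PR):

```rust
fn decrease_numbers(amount: usize, a: &mut usize, b: &mut usize) {
    if *a >= amount {
        *a -= amount;
    } else {
        *b -= amount - *a;
        *a = 0;
    }
}

fn main() {
    let (mut extra_uppers, mut extra_followers) = (2usize, 5usize);
    // 4 removed edges: 2 are taken from `extra_uppers`, the remaining 2 from `extra_followers`.
    decrease_numbers(4, &mut extra_uppers, &mut extra_followers);
    assert_eq!((extra_uppers, extra_followers), (0, 3));
}
```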
90 changes: 90 additions & 0 deletions crates/turbo-tasks-memory/src/aggregation/balance_queue.rs
@@ -0,0 +1,90 @@
use std::{cmp::max, collections::HashMap, hash::Hash, mem::take};

use indexmap::IndexSet;

use super::{balance_edge, AggregationContext};

/// Enqueued edges that need to be balanced. Deduplicates edges and keeps track
/// of aggregation numbers read during balancing.
pub struct BalanceQueue<I> {
    queue: IndexSet<(I, I)>,
    aggregation_numbers: HashMap<I, u32>,
}

impl<I: Hash + Eq + Clone> BalanceQueue<I> {
    pub fn new() -> Self {
        Self {
            queue: IndexSet::default(),
            aggregation_numbers: HashMap::default(),
        }
    }

    fn add_number(&mut self, id: I, number: u32) {
        self.aggregation_numbers
            .entry(id)
            .and_modify(|n| *n = max(*n, number))
            .or_insert(number);
    }

    /// Add an edge to the queue. The edge will be balanced during the next
    /// call.
    pub fn balance(
        &mut self,
        upper_id: I,
        upper_aggregation_number: u32,
        target_id: I,
        target_aggregation_number: u32,
    ) {
        debug_assert!(upper_id != target_id);
        self.add_number(upper_id.clone(), upper_aggregation_number);
        self.add_number(target_id.clone(), target_aggregation_number);
        self.queue.insert((upper_id.clone(), target_id.clone()));
    }

    /// Add multiple edges to the queue. The edges will be balanced during the
    /// next call.
    pub fn balance_all(&mut self, edges: Vec<(I, u32, I, u32)>) {
        for (upper_id, upper_aggregation_number, target_id, target_aggregation_number) in edges {
            self.balance(
                upper_id,
                upper_aggregation_number,
                target_id,
                target_aggregation_number,
            );
        }
    }

    /// Process the queue and balance all enqueued edges.
    pub fn process<C: AggregationContext<NodeRef = I>>(mut self, ctx: &C) {
        while !self.queue.is_empty() {
            let queue = take(&mut self.queue);
            for (upper_id, target_id) in queue {
                let upper_aggregation_number = self
                    .aggregation_numbers
                    .get(&upper_id)
                    .copied()
                    .unwrap_or_default();
                let target_aggregation_number = self
                    .aggregation_numbers
                    .get(&target_id)
                    .copied()
                    .unwrap_or_default();

                let (u, t) = balance_edge(
                    ctx,
                    &mut self,
                    &upper_id,
                    upper_aggregation_number,
                    &target_id,
                    target_aggregation_number,
                );
                if u != upper_aggregation_number {
                    self.add_number(upper_id, u);
                }
                if t != target_aggregation_number {
                    self.add_number(target_id, t);
                }
            }
        }
    }
}
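
A sketch of how the queue is driven, mirroring `prepare_aggregation_data` in aggregation_data.rs. The `AggregationContext` implementation and the exact meaning of the two number arguments are assumptions here; this is illustrative, not part of the diff.

```rust
// Graph operations enqueue edges whose invariants may now be violated;
// `process` then balances them, potentially enqueueing further edges,
// until the queue is empty.
fn bump_aggregation_number<C: AggregationContext>(ctx: &C, node_id: &C::NodeRef, new_number: u32) {
    let mut balance_queue = BalanceQueue::new();
    increase_aggregation_number_internal(
        ctx,
        &mut balance_queue,
        ctx.node(node_id),
        node_id,
        new_number,
        new_number,
    );
    balance_queue.process(ctx);
}
```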
