Skip to content

Commit

Permalink
optimize aggregation number based on number of children
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Sep 25, 2024
1 parent 84665cd commit 6e986a0
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 24 deletions.
22 changes: 15 additions & 7 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use auto_hash_map::{AutoMap, AutoSet};
use dashmap::DashMap;
pub use operation::AnyOperation;
use operation::{
is_root_node, AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
CleanupOldEdgesOperation, ConnectChildOperation, OutdatedEdge,
get_aggregation_number, is_root_node, AggregatedDataUpdate, AggregationUpdateJob,
AggregationUpdateQueue, CleanupOldEdgesOperation, ConnectChildOperation, OutdatedEdge,
};
use parking_lot::{Condvar, Mutex};
use rustc_hash::FxHasher;
Expand All @@ -42,8 +42,9 @@ use turbo_tasks::{
use self::{operation::ExecuteContext, storage::Storage};
use crate::{
data::{
ActiveType, CachedDataItem, CachedDataItemIndex, CachedDataItemKey, CachedDataItemValue,
CachedDataUpdate, CellRef, InProgressCellState, InProgressState, OutputValue, RootState,
ActiveType, AggregationNumber, CachedDataItem, CachedDataItemIndex, CachedDataItemKey,
CachedDataItemValue, CachedDataUpdate, CellRef, InProgressCellState, InProgressState,
OutputValue, RootState,
},
get, get_many, remove,
utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc},
Expand Down Expand Up @@ -256,15 +257,16 @@ impl TurboTasksBackend {
if matches!(consistency, ReadConsistency::Strong) {
// Ensure it's an root node
loop {
let aggregation_number = get!(task, AggregationNumber).copied().unwrap_or_default();
let aggregation_number = get_aggregation_number(&task);
if is_root_node(aggregation_number) {
break;
}
drop(task);
AggregationUpdateQueue::run(
AggregationUpdateJob::UpdateAggregationNumber {
task_id,
aggregation_number: u32::MAX,
base_aggregation_number: u32::MAX,
distance: None,
},
&ctx,
);
Expand Down Expand Up @@ -1058,7 +1060,13 @@ impl Backend for TurboTasksBackend {
);
{
let mut task = self.storage.access_mut(task_id);
let _ = task.add(CachedDataItem::AggregationNumber { value: u32::MAX });
let _ = task.add(CachedDataItem::AggregationNumber {
value: AggregationNumber {
base: u32::MAX,
distance: 0,
effective: u32::MAX,
},
});
let _ = task.add(CachedDataItem::AggregateRoot {
value: RootState::new(root_type),
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::VecDeque;
use std::{cmp::max, collections::VecDeque, num::NonZeroU32};

use serde::{Deserialize, Serialize};
use turbo_tasks::TaskId;

use super::{ExecuteContext, Operation, TaskGuard};
use crate::{
data::{ActiveType, CachedDataItem, CachedDataItemKey, RootState},
data::{ActiveType, AggregationNumber, CachedDataItem, CachedDataItemKey, RootState},
get, get_many, iter_many, remove, update, update_count,
};

Expand Down Expand Up @@ -43,14 +43,17 @@ fn iter_uppers<'a>(task: &'a TaskGuard<'a>) -> impl Iterator<Item = TaskId> + 'a
}

pub fn get_aggregation_number(task: &TaskGuard<'_>) -> u32 {
get!(task, AggregationNumber).copied().unwrap_or_default()
get!(task, AggregationNumber)
.map(|a| a.effective)
.unwrap_or_default()
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum AggregationUpdateJob {
UpdateAggregationNumber {
task_id: TaskId,
aggregation_number: u32,
base_aggregation_number: u32,
distance: Option<NonZeroU32>,
},
InnerHasNewFollower {
upper_ids: Vec<TaskId>,
Expand Down Expand Up @@ -252,13 +255,35 @@ impl AggregationUpdateQueue {
match job {
AggregationUpdateJob::UpdateAggregationNumber {
task_id,
aggregation_number,
base_aggregation_number,
distance: base_effective_distance,
} => {
let mut task = ctx.task(task_id);
let old = get_aggregation_number(&task);
if old < aggregation_number {
let current = get!(task, AggregationNumber).copied().unwrap_or_default();
// The wanted new distance is either the provided one or the old distance
let distance = base_effective_distance.map_or(current.distance, |d| d.get());
// The base aggregation number can only increase
let base_aggregation_number = max(current.base, base_aggregation_number);
let old = current.effective;
// The new target effecive aggregation number is base + distance
let aggregation_number = base_aggregation_number.saturating_add(distance);
if old >= aggregation_number {
if base_aggregation_number != current.base && distance != current.distance {
task.insert(CachedDataItem::AggregationNumber {
value: AggregationNumber {
base: base_aggregation_number,
distance,
effective: old,
},
});
}
} else {
task.insert(CachedDataItem::AggregationNumber {
value: aggregation_number,
value: AggregationNumber {
base: base_aggregation_number,
distance,
effective: aggregation_number,
},
});

if !is_aggregating_node(old) && is_aggregating_node(aggregation_number) {
Expand Down Expand Up @@ -293,7 +318,8 @@ impl AggregationUpdateQueue {
for child_id in children {
self.push(AggregationUpdateJob::UpdateAggregationNumber {
task_id: child_id,
aggregation_number: aggregation_number + 1,
base_aggregation_number: aggregation_number + 1,
distance: None,
});
}
}
Expand Down Expand Up @@ -636,10 +662,11 @@ impl AggregationUpdateQueue {
} else {
// both nodes have the same aggregation number
// We need to change the aggregation number of the task
let new_aggregation_number = upper_aggregation_number + 1;
let current = get!(task, AggregationNumber).copied().unwrap_or_default();
self.push(AggregationUpdateJob::UpdateAggregationNumber {
task_id,
aggregation_number: new_aggregation_number,
base_aggregation_number: current.base + 1,
distance: None,
});
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{cmp::max, num::NonZeroU32};

use serde::{Deserialize, Serialize};
use turbo_tasks::TaskId;

Expand All @@ -8,7 +10,8 @@ use super::{
ExecuteContext, Operation,
};
use crate::{
data::{CachedDataItem, CachedDataItemKey},
backend::operation::is_root_node,
data::{CachedDataItem, CachedDataItemIndex, CachedDataItemKey},
get,
};

Expand Down Expand Up @@ -41,19 +44,52 @@ impl ConnectChildOperation {
}
// Update the task aggregation
let mut queue = AggregationUpdateQueue::new();
let parent_aggregation = get!(parent_task, AggregationNumber)

// Compute new parent aggregation number based on the number of children
let current_parent_aggregation = get!(parent_task, AggregationNumber)
.copied()
.unwrap_or_default();
let parent_aggregation = if is_root_node(current_parent_aggregation.base) {
u32::MAX
} else {
let children_count = parent_task
.iter(CachedDataItemIndex::Children)
.filter(|(k, _)| {
matches!(
*k,
CachedDataItemKey::Child { .. }
| CachedDataItemKey::OutdatedChild { .. }
)
})
.count();
let target_distance = children_count.ilog2() as u32 * 2;
let parent_aggregation = current_parent_aggregation
.base
.saturating_add(target_distance);
if target_distance != current_parent_aggregation.distance {
queue.push(AggregationUpdateJob::UpdateAggregationNumber {
task_id: parent_task_id,
base_aggregation_number: 0,
distance: NonZeroU32::new(target_distance),
})
}
max(current_parent_aggregation.effective, parent_aggregation)
};

// Update child aggregation number based on parent aggregation number
let is_aggregating_node = is_aggregating_node(parent_aggregation);
if parent_task_id.is_transient() && !child_task_id.is_transient() {
queue.push(AggregationUpdateJob::UpdateAggregationNumber {
task_id: child_task_id,
aggregation_number: u32::MAX,
base_aggregation_number: u32::MAX,
distance: None,
});
} else if !is_aggregating_node {
queue.push(AggregationUpdateJob::UpdateAggregationNumber {
task_id: child_task_id,
aggregation_number: parent_aggregation + AGGREGATION_NUMBER_BUFFER_SPACE + 1,
base_aggregation_number: parent_aggregation
.saturating_add(AGGREGATION_NUMBER_BUFFER_SPACE),
distance: None,
});
}
if is_aggregating_node {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,9 @@ impl_operation!(Invalidate invalidate::InvalidateOperation);
impl_operation!(CleanupOldEdges cleanup_old_edges::CleanupOldEdgesOperation);
impl_operation!(AggregationUpdate aggregation_update::AggregationUpdateQueue);

pub use aggregation_update::{is_root_node, AggregatedDataUpdate, AggregationUpdateJob};
pub use aggregation_update::{
get_aggregation_number, is_root_node, AggregatedDataUpdate, AggregationUpdateJob,
};
pub use cleanup_old_edges::OutdatedEdge;
pub use update_cell::UpdateCellOperation;
pub use update_output::UpdateOutputOperation;
9 changes: 8 additions & 1 deletion turbopack/crates/turbo-tasks-backend/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ impl InProgressCellState {
}
}

#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
pub struct AggregationNumber {
pub base: u32,
pub distance: u32,
pub effective: u32,
}

#[derive(Debug, Clone, KeyValuePair)]
pub enum CachedDataItem {
// Output
Expand Down Expand Up @@ -175,7 +182,7 @@ pub enum CachedDataItem {

// Aggregation Graph
AggregationNumber {
value: u32,
value: AggregationNumber,
},
Follower {
task: TaskId,
Expand Down

0 comments on commit 6e986a0

Please sign in to comment.