From 6e986a0bc99094076a1c702893e6169aeb8011d1 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Mon, 23 Sep 2024 16:23:55 +0200 Subject: [PATCH] optimize aggregation number based on number of children --- .../turbo-tasks-backend/src/backend/mod.rs | 22 ++++++--- .../backend/operation/aggregation_update.rs | 49 ++++++++++++++----- .../src/backend/operation/connect_child.rs | 44 +++++++++++++++-- .../src/backend/operation/mod.rs | 4 +- .../crates/turbo-tasks-backend/src/data.rs | 9 +++- 5 files changed, 104 insertions(+), 24 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index c831594cc46e82..70b92fb9629a2e 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -21,8 +21,8 @@ use auto_hash_map::{AutoMap, AutoSet}; use dashmap::DashMap; pub use operation::AnyOperation; use operation::{ - is_root_node, AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue, - CleanupOldEdgesOperation, ConnectChildOperation, OutdatedEdge, + get_aggregation_number, is_root_node, AggregatedDataUpdate, AggregationUpdateJob, + AggregationUpdateQueue, CleanupOldEdgesOperation, ConnectChildOperation, OutdatedEdge, }; use parking_lot::{Condvar, Mutex}; use rustc_hash::FxHasher; @@ -42,8 +42,9 @@ use turbo_tasks::{ use self::{operation::ExecuteContext, storage::Storage}; use crate::{ data::{ - ActiveType, CachedDataItem, CachedDataItemIndex, CachedDataItemKey, CachedDataItemValue, - CachedDataUpdate, CellRef, InProgressCellState, InProgressState, OutputValue, RootState, + ActiveType, AggregationNumber, CachedDataItem, CachedDataItemIndex, CachedDataItemKey, + CachedDataItemValue, CachedDataUpdate, CellRef, InProgressCellState, InProgressState, + OutputValue, RootState, }, get, get_many, remove, utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc}, @@ -256,7 +257,7 @@ impl TurboTasksBackend { if 
matches!(consistency, ReadConsistency::Strong) { // Ensure it's an root node loop { - let aggregation_number = get!(task, AggregationNumber).copied().unwrap_or_default(); + let aggregation_number = get_aggregation_number(&task); if is_root_node(aggregation_number) { break; } @@ -264,7 +265,8 @@ impl TurboTasksBackend { AggregationUpdateQueue::run( AggregationUpdateJob::UpdateAggregationNumber { task_id, - aggregation_number: u32::MAX, + base_aggregation_number: u32::MAX, + distance: None, }, &ctx, ); @@ -1058,7 +1060,13 @@ impl Backend for TurboTasksBackend { ); { let mut task = self.storage.access_mut(task_id); - let _ = task.add(CachedDataItem::AggregationNumber { value: u32::MAX }); + let _ = task.add(CachedDataItem::AggregationNumber { + value: AggregationNumber { + base: u32::MAX, + distance: 0, + effective: u32::MAX, + }, + }); let _ = task.add(CachedDataItem::AggregateRoot { value: RootState::new(root_type), }); diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index 490995d6d9c024..d2b798279f6115 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -1,11 +1,11 @@ -use std::collections::VecDeque; +use std::{cmp::max, collections::VecDeque, num::NonZeroU32}; use serde::{Deserialize, Serialize}; use turbo_tasks::TaskId; use super::{ExecuteContext, Operation, TaskGuard}; use crate::{ - data::{ActiveType, CachedDataItem, CachedDataItemKey, RootState}, + data::{ActiveType, AggregationNumber, CachedDataItem, CachedDataItemKey, RootState}, get, get_many, iter_many, remove, update, update_count, }; @@ -43,14 +43,17 @@ fn iter_uppers<'a>(task: &'a TaskGuard<'a>) -> impl Iterator + 'a } pub fn get_aggregation_number(task: &TaskGuard<'_>) -> u32 { - get!(task, AggregationNumber).copied().unwrap_or_default() + 
get!(task, AggregationNumber) + .map(|a| a.effective) + .unwrap_or_default() } #[derive(Serialize, Deserialize, Clone, Debug)] pub enum AggregationUpdateJob { UpdateAggregationNumber { task_id: TaskId, - aggregation_number: u32, + base_aggregation_number: u32, + distance: Option<NonZeroU32>, }, InnerHasNewFollower { upper_ids: Vec, @@ -252,13 +255,35 @@ impl AggregationUpdateQueue { match job { AggregationUpdateJob::UpdateAggregationNumber { task_id, - aggregation_number, + base_aggregation_number, + distance: base_effective_distance, } => { let mut task = ctx.task(task_id); - let old = get_aggregation_number(&task); - if old < aggregation_number { + let current = get!(task, AggregationNumber).copied().unwrap_or_default(); + // The wanted new distance is either the provided one or the old distance + let distance = base_effective_distance.map_or(current.distance, |d| d.get()); + // The base aggregation number can only increase + let base_aggregation_number = max(current.base, base_aggregation_number); + let old = current.effective; + // The new target effective aggregation number is base + distance + let aggregation_number = base_aggregation_number.saturating_add(distance); + if old >= aggregation_number { + if base_aggregation_number != current.base && distance != current.distance { + task.insert(CachedDataItem::AggregationNumber { + value: AggregationNumber { + base: base_aggregation_number, + distance, + effective: old, + }, + }); + } + } else { task.insert(CachedDataItem::AggregationNumber { - value: aggregation_number, + value: AggregationNumber { + base: base_aggregation_number, + distance, + effective: aggregation_number, + }, }); if !is_aggregating_node(old) && is_aggregating_node(aggregation_number) { @@ -293,7 +318,8 @@ impl AggregationUpdateQueue { for child_id in children { self.push(AggregationUpdateJob::UpdateAggregationNumber { task_id: child_id, - aggregation_number: aggregation_number + 1, + base_aggregation_number: aggregation_number + 1, + distance: None, }); 
} } @@ -636,10 +662,11 @@ impl AggregationUpdateQueue { } else { // both nodes have the same aggregation number // We need to change the aggregation number of the task - let new_aggregation_number = upper_aggregation_number + 1; + let current = get!(task, AggregationNumber).copied().unwrap_or_default(); self.push(AggregationUpdateJob::UpdateAggregationNumber { task_id, - aggregation_number: new_aggregation_number, + base_aggregation_number: current.base + 1, + distance: None, }); } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs index 8afbaf8e9f3457..a915fa776347b1 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs @@ -1,3 +1,5 @@ +use std::{cmp::max, num::NonZeroU32}; + use serde::{Deserialize, Serialize}; use turbo_tasks::TaskId; @@ -8,7 +10,8 @@ use super::{ ExecuteContext, Operation, }; use crate::{ - data::{CachedDataItem, CachedDataItemKey}, + backend::operation::is_root_node, + data::{CachedDataItem, CachedDataItemIndex, CachedDataItemKey}, get, }; @@ -41,19 +44,52 @@ impl ConnectChildOperation { } // Update the task aggregation let mut queue = AggregationUpdateQueue::new(); - let parent_aggregation = get!(parent_task, AggregationNumber) + + // Compute new parent aggregation number based on the number of children + let current_parent_aggregation = get!(parent_task, AggregationNumber) .copied() .unwrap_or_default(); + let parent_aggregation = if is_root_node(current_parent_aggregation.base) { + u32::MAX + } else { + let children_count = parent_task + .iter(CachedDataItemIndex::Children) + .filter(|(k, _)| { + matches!( + *k, + CachedDataItemKey::Child { .. } + | CachedDataItemKey::OutdatedChild { .. 
} + ) + }) + .count(); + let target_distance = children_count.ilog2() as u32 * 2; + let parent_aggregation = current_parent_aggregation + .base + .saturating_add(target_distance); + if target_distance != current_parent_aggregation.distance { + queue.push(AggregationUpdateJob::UpdateAggregationNumber { + task_id: parent_task_id, + base_aggregation_number: 0, + distance: NonZeroU32::new(target_distance), + }) + } + max(current_parent_aggregation.effective, parent_aggregation) + }; + + // Update child aggregation number based on parent aggregation number let is_aggregating_node = is_aggregating_node(parent_aggregation); if parent_task_id.is_transient() && !child_task_id.is_transient() { queue.push(AggregationUpdateJob::UpdateAggregationNumber { task_id: child_task_id, - aggregation_number: u32::MAX, + base_aggregation_number: u32::MAX, + distance: None, }); } else if !is_aggregating_node { queue.push(AggregationUpdateJob::UpdateAggregationNumber { task_id: child_task_id, - aggregation_number: parent_aggregation + AGGREGATION_NUMBER_BUFFER_SPACE + 1, + base_aggregation_number: parent_aggregation + .saturating_add(AGGREGATION_NUMBER_BUFFER_SPACE), + distance: None, }); } if is_aggregating_node { diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index d3f42e6c5b4446..6d3f13d1bc7ee4 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -352,7 +352,9 @@ impl_operation!(Invalidate invalidate::InvalidateOperation); impl_operation!(CleanupOldEdges cleanup_old_edges::CleanupOldEdgesOperation); impl_operation!(AggregationUpdate aggregation_update::AggregationUpdateQueue); -pub use aggregation_update::{is_root_node, AggregatedDataUpdate, AggregationUpdateJob}; +pub use aggregation_update::{ + get_aggregation_number, is_root_node, AggregatedDataUpdate, AggregationUpdateJob, +}; pub use 
cleanup_old_edges::OutdatedEdge; pub use update_cell::UpdateCellOperation; pub use update_output::UpdateOutputOperation; diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index f448795174fb4b..390ac1c093d16c 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -108,6 +108,13 @@ impl InProgressCellState { } } +#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] +pub struct AggregationNumber { + pub base: u32, + pub distance: u32, + pub effective: u32, +} + #[derive(Debug, Clone, KeyValuePair)] pub enum CachedDataItem { // Output @@ -175,7 +182,7 @@ pub enum CachedDataItem { // Aggregation Graph AggregationNumber { - value: u32, + value: AggregationNumber, }, Follower { task: TaskId,