Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Turbopack] new backend cleanup #71132

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ impl TurboTasksBackendInner {

// Check the dirty count of the root node
let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
.copied()
.cloned()
.unwrap_or_default()
.get(self.session_id);
if dirty_tasks > 0 || is_dirty {
Expand All @@ -375,7 +375,14 @@ impl TurboTasksBackendInner {
value: RootState::new(ActiveType::CachedActiveUntilClean, task_id),
});
// A newly added AggregateRoot need to make sure to schedule the tasks
task_ids_to_schedule = get_many!(task, AggregatedDirtyContainer { task } count if count.get(self.session_id) > 0 => task);
task_ids_to_schedule = get_many!(
task,
AggregatedDirtyContainer {
task
} count if count.get(self.session_id) > 0 => {
*task
}
);
if is_dirty {
task_ids_to_schedule.push(task_id);
}
Expand Down Expand Up @@ -1076,7 +1083,7 @@ impl TurboTasksBackendInner {
// handle cell counters: update max index and remove cells that are no longer used
let mut removed_cells = HashMap::new();
let mut old_counters: HashMap<_, _> =
get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, max_index));
get_many!(task, CellTypeMaxIndex { cell_type } max_index => (*cell_type, *max_index));
for (&cell_type, &max_index) in cell_counters.iter() {
if let Some(old_max_index) = old_counters.remove(&cell_type) {
if old_max_index != max_index {
Expand Down Expand Up @@ -1234,7 +1241,7 @@ impl TurboTasksBackendInner {

let data_update = if old_dirty_state.is_some() || new_dirty_state.is_some() {
let mut dirty_containers = get!(task, AggregatedDirtyContainerCount)
.copied()
.cloned()
.unwrap_or_default();
if let Some(old_dirty_state) = old_dirty_state {
dirty_containers.update_with_dirty_state(&old_dirty_state);
Expand All @@ -1245,7 +1252,7 @@ impl TurboTasksBackendInner {
(None, Some(new)) => dirty_containers.update_with_dirty_state(&new),
(Some(old), Some(new)) => dirty_containers.replace_dirty_state(&old, &new),
};
if !aggregated_update.is_default() {
if !aggregated_update.is_zero() {
if aggregated_update.get(self.session_id) < 0 {
if let Some(root_state) = get!(task, AggregateRoot) {
root_state.all_clean_event.notify(usize::MAX);
Expand Down Expand Up @@ -1403,7 +1410,7 @@ impl TurboTasksBackendInner {
task,
AggregatedCollectible {
collectible
} count if collectible.collectible_type == collectible_type && count > 0 => {
} count if collectible.collectible_type == collectible_type && *count > 0 => {
collectible.cell
}
) {
Expand All @@ -1416,7 +1423,7 @@ impl TurboTasksBackendInner {
Collectible {
collectible
} count if collectible.collectible_type == collectible_type => {
(collectible.cell, count)
(collectible.cell, *count)
}
) {
*collectibles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ fn get_followers_with_aggregation_number(
aggregation_number: u32,
) -> Vec<TaskId> {
if is_aggregating_node(aggregation_number) {
get_many!(task, Follower { task } count if count > 0 => task)
get_many!(task, Follower { task } count if *count > 0 => *task)
} else {
get_many!(task, Child { task } => task)
get_many!(task, Child { task } => *task)
}
}

Expand All @@ -42,11 +42,11 @@ fn get_followers(task: &TaskGuard<'_>) -> Vec<TaskId> {
}

pub fn get_uppers(task: &TaskGuard<'_>) -> Vec<TaskId> {
get_many!(task, Upper { task } count if count > 0 => task)
get_many!(task, Upper { task } count if *count > 0 => *task)
}

fn iter_uppers<'a>(task: &'a TaskGuard<'a>) -> impl Iterator<Item = TaskId> + 'a {
iter_many!(task, Upper { task } count if count > 0 => task)
iter_many!(task, Upper { task } count if *count > 0 => *task)
}

pub fn get_aggregation_number(task: &TaskGuard<'_>) -> u32 {
Expand Down Expand Up @@ -126,17 +126,17 @@ impl AggregatedDataUpdate {
let aggregation = get_aggregation_number(task);
let mut dirty_container_count = Default::default();
let mut collectibles_update: Vec<_> =
get_many!(task, Collectible { collectible } => (collectible, 1));
get_many!(task, Collectible { collectible } => (*collectible, 1));
if is_aggregating_node(aggregation) {
dirty_container_count = get!(task, AggregatedDirtyContainerCount)
.copied()
.cloned()
.unwrap_or_default();
let collectibles = iter_many!(
task,
AggregatedCollectible {
collectible
} count if count > 0 => {
collectible
} count if *count > 0 => {
*collectible
}
);
for collectible in collectibles {
Expand All @@ -148,7 +148,7 @@ impl AggregatedDataUpdate {
}

let mut result = Self::new().collectibles_update(collectibles_update);
if !dirty_container_count.is_default() {
if !dirty_container_count.is_zero() {
let DirtyContainerCount {
count,
count_in_session,
Expand All @@ -170,7 +170,7 @@ impl AggregatedDataUpdate {
collectibles_update,
} = &mut self;
if let Some((_, value)) = dirty_container_update.as_mut() {
*value = value.invert()
*value = value.negate()
}
for (_, value) in collectibles_update.iter_mut() {
*value = -*value;
Expand Down Expand Up @@ -199,7 +199,7 @@ impl AggregatedDataUpdate {
})
}

let mut aggregated_update = Default::default();
let mut aggregated_update = DirtyContainerCount::default();
update!(
task,
AggregatedDirtyContainer {
Expand All @@ -208,7 +208,7 @@ impl AggregatedDataUpdate {
|old: Option<DirtyContainerCount>| {
let mut new = old.unwrap_or_default();
aggregated_update = new.update_count(count);
(!new.is_default()).then_some(new)
(!new.is_zero()).then_some(new)
}
);

Expand All @@ -225,10 +225,10 @@ impl AggregatedDataUpdate {
if let Some(dirty_state) = dirty_state {
new.undo_update_with_dirty_state(&dirty_state);
}
if !aggregated_update.is_default() {
if !aggregated_update.is_zero() {
result.dirty_container_update = Some((task_id, aggregated_update));
}
(!new.is_default()).then_some(new)
(!new.is_zero()).then_some(new)
});
if let Some((_, count)) = result.dirty_container_update.as_ref() {
if count.get(session_id) < 0 {
Expand Down Expand Up @@ -269,8 +269,8 @@ impl AggregatedDataUpdate {
CollectiblesDependent {
collectible_type,
task,
} if collectible_type == ty => {
task
} if *collectible_type == ty => {
*task
}
);
if !dependent.is_empty() {
Expand Down Expand Up @@ -608,7 +608,7 @@ impl AggregationUpdateQueue {
value: RootState::new(ActiveType::CachedActiveUntilClean, task_id),
});
}
let dirty_containers: Vec<_> = get_many!(task, AggregatedDirtyContainer { task } count if count.get(session_id) > 0 => task);
let dirty_containers: Vec<_> = get_many!(task, AggregatedDirtyContainer { task } count if count.get(session_id) > 0 => *task);
if !dirty_containers.is_empty() {
self.push(AggregationUpdateJob::FindAndScheduleDirty {
task_ids: dirty_containers,
Expand Down Expand Up @@ -954,7 +954,7 @@ impl AggregationUpdateQueue {
if !is_aggregating_node(old) && is_aggregating_node(aggregation_number) {
// When converted from leaf to aggregating node, all children become
// followers
let children: Vec<_> = get_many!(task, Child { task } => task);
let children: Vec<_> = get_many!(task, Child { task } => *task);
for child_id in children {
task.add_new(CachedDataItem::Follower {
task: child_id,
Expand All @@ -966,7 +966,7 @@ impl AggregationUpdateQueue {
if is_aggregating_node(aggregation_number) {
// followers might become inner nodes when the aggregation number is
// increased
let followers = iter_many!(task, Follower { task } count if count > 0 => task);
let followers = iter_many!(task, Follower { task } count if *count > 0 => *task);
for follower_id in followers {
self.push(AggregationUpdateJob::BalanceEdge {
upper_id: task_id,
Expand All @@ -978,7 +978,7 @@ impl AggregationUpdateQueue {
self.push(AggregationUpdateJob::BalanceEdge { upper_id, task_id });
}
} else {
let children = iter_many!(task, Child { task } => task);
let children = iter_many!(task, Child { task } => *task);
for child_id in children {
self.push(AggregationUpdateJob::UpdateAggregationNumber {
task_id: child_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,23 @@ pub fn make_task_dirty_internal(
}) => {
// Got dirty in that one session only
let mut dirty_container = get!(task, AggregatedDirtyContainerCount)
.copied()
.cloned()
.unwrap_or_default();
dirty_container.update_session_dependent(session_id, 1);
dirty_container
}
None => {
// Get dirty for all sessions
get!(task, AggregatedDirtyContainerCount)
.copied()
.cloned()
.unwrap_or_default()
}
_ => unreachable!(),
};
let aggregated_update = dirty_container.update_with_dirty_state(&DirtyState {
clean_in_session: None,
});
if !aggregated_update.is_default() {
if !aggregated_update.is_zero() {
queue.extend(AggregationUpdateJob::data_update(
task,
AggregatedDataUpdate::new().dirty_container_update(task_id, aggregated_update),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ impl UpdateCellOperation {

let dependent = get_many!(
task,
CellDependent { cell: dependent_cell, task } _value
if dependent_cell == cell
=> task
CellDependent { cell: dependent_cell, task }
if *dependent_cell == cell
=> *task
);

drop(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ impl UpdateOutputOperation {
value: output_value,
});

let dependent_tasks = get_many!(task, OutputDependent { task } => task);
let children = get_many!(task, Child { task } => task);
let dependent_tasks = get_many!(task, OutputDependent { task } => *task);
let children = get_many!(task, Child { task } => *task);

let mut queue = AggregationUpdateQueue::new();

Expand Down
6 changes: 3 additions & 3 deletions turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ macro_rules! iter_many {
$task
.iter($crate::data::indicies::$key)
.filter_map(|(key, _)| match key {
&$crate::data::CachedDataItemKey::$key $key_pattern $(if $cond)? => Some(
$crate::data::CachedDataItemKey::$key $key_pattern $(if $cond)? => Some(
$iter_item
),
_ => None,
Expand All @@ -446,8 +446,8 @@ macro_rules! iter_many {
.iter($crate::data::indicies::$key)
.filter_map(|(key, value)| match (key, value) {
(
&$crate::data::CachedDataItemKey::$key $input,
&$crate::data::CachedDataItemValue::$key { value: $value_pattern }
$crate::data::CachedDataItemKey::$key $input,
$crate::data::CachedDataItemValue::$key { value: $value_pattern }
) $(if $cond)? => Some($iter_item),
_ => None,
})
Comment on lines 446 to 453
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Re: lines +433 to +455]

I needed to change that as DirtyContainerCount is no longer Copy

See this comment inline on Graphite.

Expand Down
25 changes: 22 additions & 3 deletions turbopack/crates/turbo-tasks-backend/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,18 @@ fn add_with_diff(v: &mut i32, u: i32) -> i32 {
}
}

#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
/// Represents a count of dirty containers. Since dirtyness can be session dependent, there might be
/// a different count for a specific session. It only need to store the highest session count, since
/// old sessions can't be visited again, so we can ignore their counts.
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DirtyContainerCount {
pub count: i32,
pub count_in_session: Option<(SessionId, i32)>,
}

impl DirtyContainerCount {
/// Get the count for a specific session. It's only expected to be asked for the current
/// session, since old session counts might be dropped.
pub fn get(&self, session: SessionId) -> i32 {
if let Some((s, count)) = self.count_in_session {
if s == session {
Expand All @@ -124,13 +129,16 @@ impl DirtyContainerCount {
self.count
}

/// Increase/decrease the count by the given value.
pub fn update(&mut self, count: i32) -> DirtyContainerCount {
self.update_count(&DirtyContainerCount {
count,
count_in_session: None,
})
}

/// Increase/decrease the count by the given value, but does not update the count for a specific
/// session. This matches the "dirty, but clean in one session" behavior.
pub fn update_session_dependent(
&mut self,
ignore_session: SessionId,
Expand All @@ -142,6 +150,10 @@ impl DirtyContainerCount {
})
}

/// Adds the `count` to the current count. This correctly handles session dependent counts.
/// Returns a new count object that represents the aggregated count. The aggregated count will
/// be +1 when the self count changes from <= 0 to > 0 and -1 when the self count changes from >
/// 0 to <= 0. The same for the session dependent count.
pub fn update_count(&mut self, count: &DirtyContainerCount) -> DirtyContainerCount {
let mut diff = DirtyContainerCount::default();
match (
Expand Down Expand Up @@ -181,6 +193,7 @@ impl DirtyContainerCount {
diff
}

/// Applies a dirty state to the count. Returns an aggregated count that represents the change.
pub fn update_with_dirty_state(&mut self, dirty: &DirtyState) -> DirtyContainerCount {
if let Some(clean_in_session) = dirty.clean_in_session {
self.update_session_dependent(clean_in_session, 1)
Expand All @@ -189,6 +202,8 @@ impl DirtyContainerCount {
}
}

/// Undoes the effect of a dirty state on the count. Returns an aggregated count that represents
/// the change.
pub fn undo_update_with_dirty_state(&mut self, dirty: &DirtyState) -> DirtyContainerCount {
if let Some(clean_in_session) = dirty.clean_in_session {
self.update_session_dependent(clean_in_session, -1)
Expand All @@ -197,6 +212,8 @@ impl DirtyContainerCount {
}
}

/// Replaces the old dirty state with the new one. Returns an aggregated count that represents
/// the change.
pub fn replace_dirty_state(
&mut self,
old: &DirtyState,
Expand All @@ -207,11 +224,13 @@ impl DirtyContainerCount {
diff
}

pub fn is_default(&self) -> bool {
/// Returns true if the count is zero and appling it would have no effect
pub fn is_zero(&self) -> bool {
self.count == 0 && self.count_in_session.map(|(_, c)| c == 0).unwrap_or(true)
}

pub fn invert(&self) -> Self {
/// Negates the counts.
pub fn negate(&self) -> Self {
Self {
count: -self.count,
count_in_session: self.count_in_session.map(|(s, c)| (s, -c)),
Expand Down
Loading