Skip to content

Commit

Permalink
fix race condition when scheduling dirty tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Oct 9, 2024
1 parent 18e0bde commit 72d3048
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 17 deletions.
10 changes: 2 additions & 8 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,14 +353,8 @@ impl TurboTasksBackendInner {
task = ctx.task(task_id, TaskDataCategory::All);
}

let is_dirty = get!(task, Dirty)
.map(|dirty_state| {
dirty_state
.clean_in_session
.map(|clean_in_session| clean_in_session != self.session_id)
.unwrap_or(true)
})
.unwrap_or(false);
let is_dirty =
get!(task, Dirty).map_or(false, |dirty_state| dirty_state.get(self.session_id));

// Check the dirty count of the root node
let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,15 @@ impl AggregatedDataUpdate {
} = self;
let mut result = Self::default();
if let Some((dirty_container_id, count)) = dirty_container_update {
// When a dirty container count is increased and the task is considered active
// (it has an `AggregateRoot`), we need to schedule the dirty tasks in the new dirty container
let current_session_update = count.get(session_id);
if current_session_update > 0 && task.has_key(&CachedDataItemKey::AggregateRoot {}) {
queue.push(AggregationUpdateJob::FindAndScheduleDirty {
task_ids: vec![*dirty_container_id],
})
}

let mut aggregated_update = Default::default();
update!(
task,
Expand All @@ -195,12 +204,7 @@ impl AggregatedDataUpdate {
(!new.is_default()).then_some(new)
}
);
let current_session_update = aggregated_update.get(session_id);
if current_session_update > 0 && task.has_key(&CachedDataItemKey::AggregateRoot {}) {
queue.push(AggregationUpdateJob::FindAndScheduleDirty {
task_ids: vec![*dirty_container_id],
})
}

let dirty_state = get!(task, Dirty).copied();
let task_id = task.id();
update!(task, AggregatedDirtyContainerCount, |old: Option<
Expand Down Expand Up @@ -563,20 +567,22 @@ impl AggregationUpdateQueue {
}
if let Some(task_id) = popped {
let mut task = ctx.task(task_id, TaskDataCategory::Meta);
#[allow(clippy::collapsible_if, reason = "readability")]
if task.has_key(&CachedDataItemKey::Dirty {}) {
let session_id = ctx.session_id();
let dirty = get!(task, Dirty).map_or(false, |d| d.get(session_id));
if dirty {
let description = ctx.backend.get_task_desc_fn(task_id);
if task.add(CachedDataItem::new_scheduled(description)) {
ctx.turbo_tasks.schedule(task_id);
}
}
if is_aggregating_node(get_aggregation_number(&task)) {
// TODO if it has an `AggregateRoot` we can skip visiting the nested nodes since
// this would already be scheduled by the `AggregateRoot`
if !task.has_key(&CachedDataItemKey::AggregateRoot {}) {
task.insert(CachedDataItem::AggregateRoot {
value: RootState::new(ActiveType::CachedActiveUntilClean, task_id),
});
}
let session_id = ctx.session_id();
let dirty_containers: Vec<_> = get_many!(task, AggregatedDirtyContainer { task } count if count.get(session_id) > 0 => task);
if !dirty_containers.is_empty() {
self.push(AggregationUpdateJob::FindAndScheduleDirty {
Expand Down
6 changes: 6 additions & 0 deletions turbopack/crates/turbo-tasks-backend/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ pub struct DirtyState {
pub clean_in_session: Option<SessionId>,
}

impl DirtyState {
    /// Returns `true` when the task should be treated as dirty for `session`,
    /// i.e. it has not been marked clean within that specific session.
    pub fn get(&self, session: SessionId) -> bool {
        // Dirty unless this exact session already cleaned it.
        self.clean_in_session
            .as_ref()
            .map_or(true, |clean| *clean != session)
    }
}

fn add_with_diff(v: &mut i32, u: i32) -> i32 {
let old = *v;
*v += u;
Expand Down

0 comments on commit 72d3048

Please sign in to comment.