[Turbopack] make filesystem tasks session dependent instead of invalidating on startup (#70945)

### What?

Previously, we invalidated all filesystem tasks whenever an incremental build started. But that has a cost linear in the number of filesystem tasks, which we'd like to avoid.

This PR refactors that and introduces a new dirty state that means "the task is dirty in general, but considered clean for a certain session". All filesystem tasks are marked this way. When an incremental build starts, we only increment the session id, which makes all filesystem tasks count as dirty again. There is no longer any loop over all filesystem tasks to mark them dirty.
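
For intuition, here is a minimal sketch of that dirty state. The `clean_in_session` field mirrors the `DirtyState` introduced in this PR; the concrete `SessionId` wrapper and the `is_dirty_in` helper are illustrative only, not the actual backend API:

```rust
// Sketch only: `clean_in_session` mirrors the DirtyState added in this PR,
// but `SessionId(u32)` and `is_dirty_in` are illustrative, not the real API.
#[derive(Clone, Copy, PartialEq, Eq)]
struct SessionId(u32);

struct DirtyState {
    /// `None`: dirty for every session.
    /// `Some(id)`: dirty in general, but treated as clean while the current
    /// session is still `id`.
    clean_in_session: Option<SessionId>,
}

impl DirtyState {
    fn is_dirty_in(&self, current_session: SessionId) -> bool {
        match self.clean_in_session {
            None => true,
            Some(clean_session) => clean_session != current_session,
        }
    }
}

fn main() {
    let session_1 = SessionId(1);
    let session_2 = SessionId(2);
    // A filesystem task that just finished computing in session 1:
    let state = DirtyState { clean_in_session: Some(session_1) };
    assert!(!state.is_dirty_in(session_1)); // clean for the current build
    assert!(state.is_dirty_in(session_2)); // dirty again once the session id is bumped
}
```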

### Why?

Most of the cost was previously spent updating the task aggregation when tasks were marked dirty, which requires restoring a lot of aggregated tasks. That cost has now moved to the point where a filesystem task is recomputed, so it is paid per page. This benefits next dev, which doesn't build all pages.
sokra authored Oct 11, 2024
1 parent ee26991 commit 1caa151
Showing 27 changed files with 680 additions and 326 deletions.
8 changes: 6 additions & 2 deletions crates/next-api/src/project.rs
@@ -223,7 +223,9 @@ impl ProjectContainer {
         let project = self.project();
         let project_fs = project.project_fs().strongly_consistent().await?;
         if watch.enable {
-            project_fs.start_watching_with_invalidation_reason(watch.poll_interval)?;
+            project_fs
+                .start_watching_with_invalidation_reason(watch.poll_interval)
+                .await?;
         } else {
             project_fs.invalidate_with_reason();
         }
@@ -304,7 +306,9 @@ impl ProjectContainer {
         if !ReadRef::ptr_eq(&prev_project_fs, &project_fs) {
             if watch.enable {
                 // TODO stop watching: prev_project_fs.stop_watching()?;
-                project_fs.start_watching_with_invalidation_reason(watch.poll_interval)?;
+                project_fs
+                    .start_watching_with_invalidation_reason(watch.poll_interval)
+                    .await?;
             } else {
                 project_fs.invalidate_with_reason();
             }
2 changes: 1 addition & 1 deletion turbopack/crates/node-file-trace/src/lib.rs
@@ -190,7 +190,7 @@ impl Args {
 async fn create_fs(name: &str, root: &str, watch: bool) -> Result<Vc<Box<dyn FileSystem>>> {
     let fs = DiskFileSystem::new(name.into(), root.into(), vec![]);
     if watch {
-        fs.await?.start_watching(None)?;
+        fs.await?.start_watching(None).await?;
     } else {
         fs.await?.invalidate_with_reason();
     }
381 changes: 245 additions & 136 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Large diffs are not rendered by default.

@@ -45,20 +45,14 @@ pub enum OutdatedEdge {
 }
 
 impl CleanupOldEdgesOperation {
-    pub fn run(
-        task_id: TaskId,
-        outdated: Vec<OutdatedEdge>,
-        data_update: Option<AggregationUpdateJob>,
-        mut ctx: ExecuteContext<'_>,
-    ) {
-        let mut queue = AggregationUpdateQueue::new();
-        queue.extend(data_update);
+    pub fn run(task_id: TaskId, outdated: Vec<OutdatedEdge>, ctx: &mut ExecuteContext<'_>) {
+        let queue = AggregationUpdateQueue::new();
         CleanupOldEdgesOperation::RemoveEdges {
             task_id,
             outdated,
             queue,
         }
-        .execute(&mut ctx);
+        .execute(ctx);
     }
 }
 
@@ -31,19 +31,19 @@ pub enum ConnectChildOperation {
 impl ConnectChildOperation {
     pub fn run(parent_task_id: TaskId, child_task_id: TaskId, mut ctx: ExecuteContext<'_>) {
         let mut parent_task = ctx.task(parent_task_id, TaskDataCategory::All);
-        parent_task.remove(&CachedDataItemKey::OutdatedChild {
-            task: child_task_id,
-        });
+        // Quick skip if the child was already connected before
+        if parent_task
+            .remove(&CachedDataItemKey::OutdatedChild {
+                task: child_task_id,
+            })
+            .is_some()
+        {
+            return;
+        }
         if parent_task.add(CachedDataItem::Child {
             task: child_task_id,
             value: (),
         }) {
-            // When task is added to a AggregateRoot is need to be scheduled,
-            // indirect connections are handled by the aggregation update.
-            let mut should_schedule = false;
-            if parent_task.has_key(&CachedDataItemKey::AggregateRoot {}) {
-                should_schedule = true;
-            }
             // Update the task aggregation
             let mut queue = AggregationUpdateQueue::new();
 
@@ -110,15 +110,15 @@ impl ConnectChildOperation {
 
             {
                 let mut task = ctx.task(child_task_id, TaskDataCategory::Data);
-                should_schedule = should_schedule || !task.has_key(&CachedDataItemKey::Output {});
-                if should_schedule {
+                if !task.has_key(&CachedDataItemKey::Output {}) {
                     let description = ctx.backend.get_task_desc_fn(child_task_id);
-                    should_schedule = task.add(CachedDataItem::new_scheduled(description));
+                    let should_schedule = task.add(CachedDataItem::new_scheduled(description));
+                    drop(task);
+                    if should_schedule {
+                        ctx.schedule(child_task_id);
+                    }
                 }
             }
-            if should_schedule {
-                ctx.schedule(child_task_id);
-            }
 
             ConnectChildOperation::UpdateAggregation {
                 aggregation_update: queue,
@@ -13,7 +13,7 @@ use crate::{
         storage::{get, get_mut},
         TaskDataCategory,
     },
-    data::{CachedDataItem, CachedDataItemKey, InProgressState},
+    data::{CachedDataItem, CachedDataItemKey, CachedDataItemValue, DirtyState, InProgressState},
 };
 
 #[derive(Serialize, Deserialize, Clone, Default)]
@@ -91,22 +91,54 @@ pub fn make_task_dirty_internal(
             *stale = true;
         }
     }
-    if task.add(CachedDataItem::Dirty { value: () }) {
-        let dirty_container = get!(task, AggregatedDirtyContainerCount)
-            .copied()
-            .unwrap_or_default();
-        if dirty_container == 0 {
-            queue.extend(AggregationUpdateJob::data_update(
-                task,
-                AggregatedDataUpdate::new().dirty_container(task_id),
-            ));
+    let old = task.insert(CachedDataItem::Dirty {
+        value: DirtyState {
+            clean_in_session: None,
+        },
+    });
+    let mut dirty_container = match old {
+        Some(CachedDataItemValue::Dirty {
+            value: DirtyState {
+                clean_in_session: None,
+            },
+        }) => {
+            // already dirty
+            return;
         }
-        let root = task.has_key(&CachedDataItemKey::AggregateRoot {});
-        if root {
-            let description = ctx.backend.get_task_desc_fn(task_id);
-            if task.add(CachedDataItem::new_scheduled(description)) {
-                ctx.schedule(task_id);
-            }
+        Some(CachedDataItemValue::Dirty {
+            value: DirtyState {
+                clean_in_session: Some(session_id),
+            },
+        }) => {
+            // Got dirty in that one session only
+            let mut dirty_container = get!(task, AggregatedDirtyContainerCount)
+                .copied()
+                .unwrap_or_default();
+            dirty_container.update_session_dependent(session_id, 1);
+            dirty_container
+        }
+        None => {
+            // Get dirty for all sessions
+            get!(task, AggregatedDirtyContainerCount)
+                .copied()
+                .unwrap_or_default()
+        }
+        _ => unreachable!(),
+    };
+    let aggregated_update = dirty_container.update_with_dirty_state(&DirtyState {
+        clean_in_session: None,
+    });
+    if !aggregated_update.is_default() {
+        queue.extend(AggregationUpdateJob::data_update(
+            task,
+            AggregatedDataUpdate::new().dirty_container_update(task_id, aggregated_update),
+        ));
+    }
+    let root = task.has_key(&CachedDataItemKey::AggregateRoot {});
+    if root {
+        let description = ctx.backend.get_task_desc_fn(task_id);
+        if task.add(CachedDataItem::new_scheduled(description)) {
+            ctx.schedule(task_id);
         }
     }
 }
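
To follow the hunk above: the aggregated dirty count now has to distinguish "dirty in every session" from "dirty except in one session". The sketch below is a hypothetical counter with that shape; the real `AggregatedDirtyContainerCount` type is not part of this diff, so the field names and method semantics here are assumptions for illustration only.

```rust
// Hypothetical sketch of a session-dependent dirty count, to illustrate the
// `update_session_dependent` / per-session view used above. Not the actual
// AggregatedDirtyContainerCount implementation.
#[derive(Clone, Copy, Default)]
struct DirtyCount {
    /// Containers that are dirty for every session.
    count: i32,
    /// Correction that applies only when viewed from `session`.
    count_in_session: i32,
    session: u32,
}

impl DirtyCount {
    /// The dirty count as observed by `current_session`.
    fn get(&self, current_session: u32) -> i32 {
        if current_session == self.session {
            self.count + self.count_in_session
        } else {
            self.count
        }
    }

    /// Apply a session-scoped adjustment, e.g. +1 when a task that was
    /// "clean in this session" becomes dirty for it as well.
    fn update_session_dependent(&mut self, session: u32, diff: i32) {
        if self.session != session {
            // A newer session supersedes the old correction.
            self.session = session;
            self.count_in_session = 0;
        }
        self.count_in_session += diff;
    }
}

fn main() {
    // One container is dirty for every session except session 1, where it was cleaned:
    let mut count = DirtyCount { count: 1, count_in_session: -1, session: 1 };
    assert_eq!(count.get(1), 0); // session 1 sees it as clean
    assert_eq!(count.get(2), 1); // every other session sees it as dirty
    // Now it becomes dirty for session 1 as well:
    count.update_session_dependent(1, 1);
    assert_eq!(count.get(1), 1);
    assert_eq!(count.get(2), 1);
}
```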
@@ -12,7 +12,7 @@ use std::{
 };
 
 use serde::{Deserialize, Serialize};
-use turbo_tasks::{KeyValuePair, TaskId, TurboTasksBackendApi};
+use turbo_tasks::{KeyValuePair, SessionId, TaskId, TurboTasksBackendApi};
 
 use crate::{
     backend::{
@@ -97,6 +97,10 @@ impl<'a> ExecuteContext<'a> {
         }
     }
 
+    pub fn session_id(&self) -> SessionId {
+        self.backend.session_id()
+    }
+
     pub fn task(&mut self, task_id: TaskId, category: TaskDataCategory) -> TaskGuard<'a> {
         let mut task = self.backend.storage.access_mut(task_id);
         if !task.persistance_state().is_restored(category) {
4 changes: 3 additions & 1 deletion turbopack/crates/turbo-tasks-backend/src/backing_storage.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
 use anyhow::Result;
-use turbo_tasks::{backend::CachedTaskType, TaskId};
+use turbo_tasks::{backend::CachedTaskType, SessionId, TaskId};
 
 use crate::{
     backend::{AnyOperation, TaskDataCategory},
@@ -14,9 +14,11 @@ pub struct ReadTransaction(pub *const ());
 
 pub trait BackingStorage {
     fn next_free_task_id(&self) -> TaskId;
+    fn next_session_id(&self) -> SessionId;
     fn uncompleted_operations(&self) -> Vec<AnyOperation>;
     fn save_snapshot(
         &self,
+        session_id: SessionId,
         operations: Vec<Arc<AnyOperation>>,
         task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
         meta_updates: Vec<ChunkedVec<CachedDataUpdate>>,
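
This trait change is what makes a new build a new session: the backing storage hands out the next session id at startup and persists the current one with each snapshot. A rough sketch of the startup side follows; only the idea of `next_session_id()` comes from the trait above, the surrounding backend wiring is assumed for illustration.

```rust
// Rough sketch; the `Backend` struct and its fields are hypothetical.
struct SessionId(u32);

struct Backend {
    // The session the current build runs in. Tasks whose DirtyState says
    // "clean in an older session" automatically count as dirty again.
    session_id: SessionId,
}

impl Backend {
    fn startup(next_session_id_from_storage: u32) -> Backend {
        // One counter read per build replaces the old startup loop that
        // invalidated every filesystem task.
        Backend {
            session_id: SessionId(next_session_id_from_storage),
        }
    }
}

fn main() {
    // e.g. the persisted counter says the next session is 7
    let backend = Backend::startup(7);
    assert_eq!(backend.session_id.0, 7);
}
```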
