Skip to content

Commit

Permalink
[Turbopack] Preparations for persisting (#70468)
Browse files Browse the repository at this point in the history
### What?

parallel filesystem invalidation

use turbo_tasks spawn blocking
  • Loading branch information
sokra authored and kdy1 committed Oct 10, 2024
1 parent c164679 commit 3765a73
Show file tree
Hide file tree
Showing 16 changed files with 196 additions and 129 deletions.
11 changes: 5 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ proc-macro2 = "1.0.79"
qstring = "0.7.2"
quote = "1.0.23"
rand = "0.8.5"
rayon = "1.10.0"
regex = "1.10.6"
rstest = "0.16.0"
rustc-hash = "1.1.0"
Expand Down
43 changes: 19 additions & 24 deletions crates/next-api/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,21 +216,19 @@ impl ProjectContainer {
impl ProjectContainer {
#[tracing::instrument(level = "info", name = "initialize project", skip_all)]
pub async fn initialize(self: Vc<Self>, options: ProjectOptions) -> Result<()> {
let poll_interval = options.watch.poll_interval;
let watch = options.watch;

self.await?.options_state.set(Some(options));

let project = self.project();
project
.project_fs()
.strongly_consistent()
.await?
.start_watching_with_invalidation_reason(poll_interval)?;
project
.output_fs()
.strongly_consistent()
.await?
.invalidate_with_reason();
let project_fs = project.project_fs().strongly_consistent().await?;
if watch.enable {
project_fs.start_watching_with_invalidation_reason(watch.poll_interval)?;
} else {
project_fs.invalidate_with_reason();
}
let output_fs = project.output_fs().strongly_consistent().await?;
output_fs.invalidate_with_reason();
Ok(())
}

Expand Down Expand Up @@ -293,20 +291,23 @@ impl ProjectContainer {
}

// TODO: Handle mode switch, should prevent mode being switched.
let watch = new_options.watch;

let project = self.project();
let prev_project_fs = project.project_fs().strongly_consistent().await?;
let prev_output_fs = project.output_fs().strongly_consistent().await?;

let poll_interval = new_options.watch.poll_interval;

this.options_state.set(Some(new_options));
let project_fs = project.project_fs().strongly_consistent().await?;
let output_fs = project.output_fs().strongly_consistent().await?;

if !ReadRef::ptr_eq(&prev_project_fs, &project_fs) {
// TODO stop watching: prev_project_fs.stop_watching()?;
project_fs.start_watching_with_invalidation_reason(poll_interval)?;
if watch.enable {
// TODO stop watching: prev_project_fs.stop_watching()?;
project_fs.start_watching_with_invalidation_reason(watch.poll_interval)?;
} else {
project_fs.invalidate_with_reason();
}
}
if !ReadRef::ptr_eq(&prev_output_fs, &output_fs) {
prev_output_fs.invalidate_with_reason();
Expand Down Expand Up @@ -533,18 +534,12 @@ impl Project {
}

#[turbo_tasks::function]
async fn project_fs(&self) -> Result<Vc<DiskFileSystem>> {
let disk_fs = DiskFileSystem::new(
fn project_fs(&self) -> Vc<DiskFileSystem> {
DiskFileSystem::new(
PROJECT_FILESYSTEM_NAME.into(),
self.root_path.clone(),
vec![],
);
if self.watch.enable {
disk_fs
.await?
.start_watching_with_invalidation_reason(self.watch.poll_interval)?;
}
Ok(disk_fs)
)
}

#[turbo_tasks::function]
Expand Down
10 changes: 5 additions & 5 deletions crates/next-core/src/app_structure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub struct AppDirModules {
pub default: Option<Vc<FileSystemPath>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub route: Option<Vc<FileSystemPath>>,
#[serde(skip_serializing_if = "Metadata::is_empty")]
#[serde(skip_serializing_if = "Metadata::is_empty", default)]
pub metadata: Metadata,
}

Expand Down Expand Up @@ -137,13 +137,13 @@ impl From<MetadataWithAltItem> for MetadataItem {
/// Metadata file that can be placed in any segment of the app directory.
#[derive(Default, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, TraceRawVcs)]
pub struct Metadata {
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub icon: Vec<MetadataWithAltItem>,
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub apple: Vec<MetadataWithAltItem>,
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub twitter: Vec<MetadataWithAltItem>,
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub open_graph: Vec<MetadataWithAltItem>,
#[serde(skip_serializing_if = "Option::is_none")]
pub sitemap: Option<MetadataItem>,
Expand Down
38 changes: 37 additions & 1 deletion crates/next-core/src/next_manifests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Default for MiddlewaresManifest {
Serialize,
Deserialize,
)]
#[serde(rename_all = "camelCase")]
#[serde(rename_all = "camelCase", default)]
pub struct MiddlewareMatcher {
// When skipped next.js with fill that during merging.
#[serde(skip_serializing_if = "Option::is_none")]
Expand Down Expand Up @@ -308,3 +308,39 @@ pub struct ClientBuildManifest<'a> {
#[serde(flatten)]
pub pages: HashMap<RcStr, Vec<&'a str>>,
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_middleware_matcher_serialization() {
let matchers = vec![
MiddlewareMatcher {
regexp: None,
locale: false,
has: None,
missing: None,
original_source: "".into(),
},
MiddlewareMatcher {
regexp: Some(".*".into()),
locale: true,
has: Some(vec![RouteHas::Query {
key: "foo".into(),
value: None,
}]),
missing: Some(vec![RouteHas::Query {
key: "bar".into(),
value: Some("value".into()),
}]),
original_source: "source".into(),
},
];

let serialized = serde_json::to_string(&matchers).unwrap();
let deserialized: Vec<MiddlewareMatcher> = serde_json::from_str(&serialized).unwrap();

assert_eq!(matchers, deserialized);
}
}
14 changes: 13 additions & 1 deletion packages/next/src/build/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1492,7 +1492,15 @@ export default async function build(
'Building'
)
const promises: Promise<any>[] = []
const sema = new Sema(10)

// Concurrency will start at INITIAL_CONCURRENCY and
// slowly ramp up to CONCURRENCY by increasing the
// concurrency by 1 every time a task is completed.
const INITIAL_CONCURRENCY = 5
const CONCURRENCY = 10

const sema = new Sema(INITIAL_CONCURRENCY)
let remainingRampup = CONCURRENCY - INITIAL_CONCURRENCY
const enqueue = (fn: () => Promise<void>) => {
promises.push(
(async () => {
Expand All @@ -1501,6 +1509,10 @@ export default async function build(
await fn()
} finally {
sema.release()
if (remainingRampup > 0) {
remainingRampup--
sema.release()
}
progress()
}
})()
Expand Down
1 change: 1 addition & 0 deletions turbopack/crates/turbo-tasks-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ jsonc-parser = { version = "0.21.0", features = ["serde"] }
mime = { workspace = true }
notify = { workspace = true }
parking_lot = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true, features = ["rc"] }
serde_bytes = { workspace = true }
serde_json = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion turbopack/crates/turbo-tasks-fs/src/invalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl InvalidationReasonKind for WatchChangeKind {

/// Invalidation was caused by a directory starting to watch from which was read
/// before.
#[derive(PartialEq, Eq, Hash)]
#[derive(PartialEq, Eq, Hash, Clone)]
pub struct WatchStart {
pub name: RcStr,
pub path: RcStr,
Expand Down
100 changes: 57 additions & 43 deletions turbopack/crates/turbo-tasks-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use invalidation::InvalidateFilesystem;
use invalidator_map::InvalidatorMap;
use jsonc_parser::{parse_to_serde_value, ParseOptions};
use mime::Mime;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use read_glob::read_glob;
pub use read_glob::ReadGlobResult;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -273,30 +274,43 @@ impl DiskFileSystem {

pub fn invalidate(&self) {
let _span = tracing::info_span!("invalidate filesystem", path = &*self.root).entered();
for (_, invalidators) in take(&mut *self.invalidator_map.lock().unwrap()).into_iter() {
invalidators.into_iter().for_each(|i| i.invalidate());
}
for (_, invalidators) in take(&mut *self.dir_invalidator_map.lock().unwrap()).into_iter() {
invalidators.into_iter().for_each(|i| i.invalidate());
}
let span = tracing::Span::current();
let handle = tokio::runtime::Handle::current();
let invalidator_map = take(&mut *self.invalidator_map.lock().unwrap());
let dir_invalidator_map = take(&mut *self.dir_invalidator_map.lock().unwrap());
let iter = invalidator_map
.into_par_iter()
.chain(dir_invalidator_map.into_par_iter())
.flat_map(|(_, invalidators)| invalidators.into_par_iter());
iter.for_each(|i| {
let _span = span.clone().entered();
let _guard = handle.enter();
i.invalidate()
});
self.serialization_invalidator.invalidate();
}

pub fn invalidate_with_reason(&self) {
let _span = tracing::info_span!("invalidate filesystem", path = &*self.root).entered();
for (path, invalidators) in take(&mut *self.invalidator_map.lock().unwrap()).into_iter() {
let reason = InvalidateFilesystem { path: path.into() };
invalidators
.into_iter()
.for_each(|i| i.invalidate_with_reason(reason.clone()));
}
for (path, invalidators) in take(&mut *self.dir_invalidator_map.lock().unwrap()).into_iter()
{
let reason = InvalidateFilesystem { path: path.into() };
invalidators
.into_iter()
.for_each(|i| i.invalidate_with_reason(reason.clone()));
}
let span = tracing::Span::current();
let handle = tokio::runtime::Handle::current();
let invalidator_map = take(&mut *self.invalidator_map.lock().unwrap());
let dir_invalidator_map = take(&mut *self.dir_invalidator_map.lock().unwrap());
let iter = invalidator_map
.into_par_iter()
.chain(dir_invalidator_map.into_par_iter())
.flat_map(|(path, invalidators)| {
let _span = span.clone().entered();
let reason = InvalidateFilesystem { path: path.into() };
invalidators
.into_par_iter()
.map(move |i| (reason.clone(), i))
});
iter.for_each(|(reason, invalidator)| {
let _span = span.clone().entered();
let _guard = handle.enter();
invalidator.invalidate_with_reason(reason)
});
self.serialization_invalidator.invalidate();
}

Expand Down Expand Up @@ -333,6 +347,7 @@ impl DiskFileSystem {
invalidator_map,
dir_invalidator_map,
poll_interval,
self.serialization_invalidator.clone(),
)?;
self.serialization_invalidator.invalidate();

Expand Down Expand Up @@ -441,11 +456,11 @@ impl DiskFileSystem {

// we use the sync std function here as it's a lot faster (600%) in
// node-file-trace
let read_dir = match retry_blocking(
&full_path,
tracing::info_span!("read directory", path = display(full_path.display())),
|path| std::fs::read_dir(path),
)
let read_dir = match retry_blocking(&full_path, |path| {
let _span =
tracing::info_span!("read directory", path = display(path.display())).entered();
std::fs::read_dir(path)
})
.await
{
Ok(dir) => dir,
Expand Down Expand Up @@ -810,26 +825,25 @@ impl FileSystem for DiskFileSystem {
} else {
PathBuf::from(unix_to_sys(target).as_ref())
};
retry_blocking(
&target_path,
tracing::info_span!("write symlink", path = display(full_path.display())),
move |target_path| {
// we use the sync std method here because `symlink` is fast
// if we put it into a task, it will be slower
#[cfg(not(target_family = "windows"))]
{
std::os::unix::fs::symlink(target_path, &full_path)
}
#[cfg(target_family = "windows")]
{
if link_type.contains(LinkType::DIRECTORY) {
std::os::windows::fs::symlink_dir(target_path, &full_path)
} else {
std::os::windows::fs::symlink_file(target_path, &full_path)
}
retry_blocking(&target_path, move |target_path| {
let _span =
tracing::info_span!("write symlink", path = display(target_path.display()))
.entered();
// we use the sync std method here because `symlink` is fast
// if we put it into a task, it will be slower
#[cfg(not(target_family = "windows"))]
{
std::os::unix::fs::symlink(target_path, &full_path)
}
#[cfg(target_family = "windows")]
{
if link_type.contains(LinkType::DIRECTORY) {
std::os::windows::fs::symlink_dir(target_path, &full_path)
} else {
std::os::windows::fs::symlink_file(target_path, &full_path)
}
},
)
}
})
.await
.with_context(|| format!("create symlink to {}", target))?;
}
Expand Down
Loading

0 comments on commit 3765a73

Please sign in to comment.