diff --git a/Cargo.lock b/Cargo.lock index 02bdd049155f0..f28d155539c69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7451,6 +7451,7 @@ dependencies = [ "bytes", "concurrent-queue", "criterion", + "dashmap", "dunce", "futures", "futures-retry", diff --git a/crates/turbo-tasks-fs/Cargo.toml b/crates/turbo-tasks-fs/Cargo.toml index 8ec5735e09472..33f00d10481e3 100644 --- a/crates/turbo-tasks-fs/Cargo.toml +++ b/crates/turbo-tasks-fs/Cargo.toml @@ -18,6 +18,7 @@ auto-hash-map = { workspace = true } bitflags = "1.3.2" bytes = "1.1.0" concurrent-queue = { workspace = true } +dashmap = { workspace = true } dunce = { workspace = true } futures = { workspace = true } futures-retry = { workspace = true } diff --git a/crates/turbo-tasks-fs/src/lib.rs b/crates/turbo-tasks-fs/src/lib.rs index a5e5b5517efae..1b97d68e03bfa 100644 --- a/crates/turbo-tasks-fs/src/lib.rs +++ b/crates/turbo-tasks-fs/src/lib.rs @@ -29,7 +29,7 @@ use std::{ path::{Path, PathBuf, MAIN_SEPARATOR}, sync::{ mpsc::{channel, RecvError, TryRecvError}, - Arc, Mutex, + Arc, Mutex, MutexGuard, }, time::Duration, }; @@ -80,6 +80,64 @@ pub trait FileSystem: ValueToString { fn metadata(&self, fs_path: FileSystemPathVc) -> FileMetaVc; } +#[derive(Default)] +struct DiskWatcher { + watcher: Mutex>, + /// Keeps track of which directories are currently watched. This is only + /// used on a OS that doesn't support recursive watching. + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + watching: dashmap::DashSet, +} + +impl DiskWatcher { + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + fn restore_if_watching(&self, dir_path: &Path, root_path: &Path) -> Result<()> { + if self.watching.contains(dir_path) { + let mut watcher = self.watcher.lock().unwrap(); + self.start_watching(&mut watcher, dir_path, &root_path)?; + } + Ok(()) + } + + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + fn ensure_watching(&self, dir_path: &Path, root_path: &Path) -> Result<()> { + if self.watching.contains(dir_path) { + return Ok(()); + } + let mut watcher = self.watcher.lock().unwrap(); + if self.watching.insert(dir_path.to_path_buf()) { + self.start_watching(&mut watcher, dir_path, root_path)?; + } + Ok(()) + } + + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + fn start_watching( + &self, + watcher: &mut MutexGuard>, + dir_path: &Path, + root_path: &Path, + ) -> Result<()> { + if let Some(watcher) = watcher.as_mut() { + let mut path = dir_path; + while let Err(err) = watcher.watch(path, RecursiveMode::NonRecursive) { + if path == root_path { + return Err(err).context(format!( + "Unable to watch {} (tried up to {})", + dir_path.display(), + path.display() + )); + } + let Some(parent_path) = path.parent() else { + return Err(err).context(format!("Unable to watch {} (tried up to {})", dir_path.display(), path.display())); + }; + path = parent_path; + } + } + Ok(()) + } +} + #[turbo_tasks::value(cell = "new", eq = "manual")] pub struct DiskFileSystem { pub name: String, @@ -93,20 +151,36 @@ pub struct DiskFileSystem { dir_invalidator_map: Arc, #[turbo_tasks(debug_ignore, trace_ignore)] #[serde(skip)] - watcher: Mutex>, + watcher: Arc, } impl DiskFileSystem { + /// Returns the root as Path + fn root_path(&self) -> &Path { + simplified(Path::new(&self.root)) + } + /// registers the path as an invalidator for the current task, /// has to be called within a turbo-tasks function - fn register_invalidator(&self, path: impl AsRef, file: bool) { + fn register_invalidator(&self, path: &Path) -> Result<()> { let invalidator = turbo_tasks::get_invalidator(); - if file { - self.invalidator_map.insert(path_to_key(path), invalidator); - } else { - self.dir_invalidator_map - .insert(path_to_key(path), invalidator); + self.invalidator_map.insert(path_to_key(path), invalidator); + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + if let Some(dir) = path.parent() { + self.watcher.ensure_watching(dir, self.root_path())?; } + Ok(()) + } + + /// registers the path as an invalidator for the current task, + /// has to be called within a turbo-tasks function + fn register_dir_invalidator(&self, path: &Path) -> Result<()> { + let invalidator = turbo_tasks::get_invalidator(); + self.dir_invalidator_map + .insert(path_to_key(path), invalidator); + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + self.watcher.ensure_watching(path, self.root_path())?; + Ok(()) } pub fn invalidate(&self) { @@ -119,7 +193,7 @@ impl DiskFileSystem { } pub fn start_watching(&self) -> Result<()> { - let mut watcher_guard = self.watcher.lock().unwrap(); + let mut watcher_guard = self.watcher.watcher.lock().unwrap(); if watcher_guard.is_some() { return Ok(()); } @@ -133,7 +207,12 @@ impl DiskFileSystem { let mut watcher = watcher(tx, Duration::from_millis(1))?; // Add a path to be watched. All files and directories at that path and // below will be monitored for changes. + #[cfg(any(target_os = "macos", target_os = "windows"))] watcher.watch(&root, RecursiveMode::Recursive)?; + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + for dir_path in self.watcher.watching.iter() { + watcher.watch(&*dir_path, RecursiveMode::NonRecursive)?; + } // We need to invalidate all reads that happened before watching // Best is to start_watching before starting to read @@ -145,12 +224,18 @@ impl DiskFileSystem { } watcher_guard.replace(watcher); + drop(watcher_guard); + + let disk_watcher = self.watcher.clone(); + let root_path = self.root_path().to_path_buf(); spawn_thread(move || { let mut batched_invalidate_path = HashSet::new(); let mut batched_invalidate_path_dir = HashSet::new(); let mut batched_invalidate_path_and_children = HashSet::new(); let mut batched_invalidate_path_and_children_dir = HashSet::new(); + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + let mut batched_new_paths = HashSet::new(); 'outer: loop { let mut event = rx.recv().map_err(|e| match e { @@ -161,7 +246,16 @@ impl DiskFileSystem { Ok(DebouncedEvent::Write(path)) => { batched_invalidate_path.insert(path); } - Ok(DebouncedEvent::Create(path)) | Ok(DebouncedEvent::Remove(path)) => { + Ok(DebouncedEvent::Create(path)) => { + batched_invalidate_path_and_children.insert(path.clone()); + batched_invalidate_path_and_children_dir.insert(path.clone()); + if let Some(parent) = path.parent() { + batched_invalidate_path_dir.insert(PathBuf::from(parent)); + } + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + batched_new_paths.insert(path.clone()); + } + Ok(DebouncedEvent::Remove(path)) => { batched_invalidate_path_and_children.insert(path.clone()); batched_invalidate_path_and_children_dir.insert(path.clone()); if let Some(parent) = path.parent() { @@ -177,6 +271,8 @@ impl DiskFileSystem { if let Some(parent) = destination.parent() { batched_invalidate_path_dir.insert(PathBuf::from(parent)); } + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + batched_new_paths.insert(destination.clone()); } Ok(DebouncedEvent::Rescan) => { batched_invalidate_path_and_children.insert(PathBuf::from(&root)); @@ -257,13 +353,19 @@ impl DiskFileSystem { &mut batched_invalidate_path_and_children_dir, ); } + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + { + for path in batched_new_paths.drain() { + let _ = disk_watcher.restore_if_watching(&path, &root_path); + } + } } }); Ok(()) } pub fn stop_watching(&self) { - if let Some(watcher) = self.watcher.lock().unwrap().take() { + if let Some(watcher) = self.watcher.watcher.lock().unwrap().take() { drop(watcher); // thread will detect the stop because the channel is disconnected } @@ -271,7 +373,7 @@ impl DiskFileSystem { pub async fn to_sys_path(&self, fs_path: FileSystemPathVc) -> Result { // just in case there's a windows unc path prefix we remove it with `dunce` - let path = simplified(Path::new(&self.root)); + let path = self.root_path(); let fs_path = fs_path.await?; Ok(if fs_path.path.is_empty() { path.to_path_buf() @@ -299,7 +401,7 @@ impl DiskFileSystemVc { mutex_map: Default::default(), invalidator_map: Arc::new(InvalidatorMap::new()), dir_invalidator_map: Arc::new(InvalidatorMap::new()), - watcher: Mutex::new(None), + watcher: Default::default(), }; Ok(Self::cell(instance)) @@ -329,7 +431,7 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function] async fn read(&self, fs_path: FileSystemPathVc) -> Result { let full_path = self.to_sys_path(fs_path).await?; - self.register_invalidator(&full_path, true); + self.register_invalidator(&full_path)?; let content = read_file(full_path, &self.mutex_map).await?; Ok(content.cell()) @@ -338,7 +440,7 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function] async fn read_dir(&self, fs_path: FileSystemPathVc) -> Result { let full_path = self.to_sys_path(fs_path).await?; - self.register_invalidator(&full_path, false); + self.register_dir_invalidator(&full_path)?; let fs_path = fs_path.await?; // we use the sync std function here as it's a lot faster (600%) in @@ -392,7 +494,7 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function] async fn read_link(&self, fs_path: FileSystemPathVc) -> Result { let full_path = self.to_sys_path(fs_path).await?; - self.register_invalidator(&full_path, true); + self.register_invalidator(&full_path)?; let _lock = self.mutex_map.lock(full_path.clone()).await; let link_path = match retry_future(|| fs::read_link(&full_path)).await { @@ -474,7 +576,7 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function] async fn track(&self, fs_path: FileSystemPathVc) -> Result { let full_path = self.to_sys_path(fs_path).await?; - self.register_invalidator(full_path, true); + self.register_invalidator(&full_path)?; Ok(CompletionVc::new()) } @@ -629,7 +731,7 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function] async fn metadata(&self, fs_path: FileSystemPathVc) -> Result { let full_path = self.to_sys_path(fs_path).await?; - self.register_invalidator(&full_path, true); + self.register_invalidator(&full_path)?; let _lock = self.mutex_map.lock(full_path.clone()).await; let meta = retry_future(|| fs::metadata(full_path.clone()))