Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

only use recursive watchers on macOS and windows #4100

Merged
merged 2 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/turbo-tasks-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ auto-hash-map = { workspace = true }
bitflags = "1.3.2"
bytes = "1.1.0"
concurrent-queue = { workspace = true }
dashmap = { workspace = true }
dunce = { workspace = true }
futures = { workspace = true }
futures-retry = { workspace = true }
Expand Down
138 changes: 120 additions & 18 deletions crates/turbo-tasks-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::{
path::{Path, PathBuf, MAIN_SEPARATOR},
sync::{
mpsc::{channel, RecvError, TryRecvError},
Arc, Mutex,
Arc, Mutex, MutexGuard,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MutexGuard is unused on Mac/Windows, giving a clippy warning.

},
time::Duration,
};
Expand Down Expand Up @@ -80,6 +80,64 @@ pub trait FileSystem: ValueToString {
fn metadata(&self, fs_path: FileSystemPathVc) -> FileMetaVc;
}

/// Bundles the file-system watcher handle with the bookkeeping needed on
/// platforms where directories are watched one by one instead of recursively
/// from the root.
#[derive(Default)]
struct DiskWatcher {
/// The active watcher. It is `None` until watching has been started and is
/// taken out again when watching is stopped.
watcher: Mutex<Option<RecommendedWatcher>>,
/// Keeps track of which directories are currently watched. This is only
/// used on an OS that doesn't support recursive watching (i.e. anything
/// other than macOS and Windows, per the `cfg` below).
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
watching: dashmap::DashSet<PathBuf>,
}

impl DiskWatcher {
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
fn restore_if_watching(&self, dir_path: &Path, root_path: &Path) -> Result<()> {
if self.watching.contains(dir_path) {
let mut watcher = self.watcher.lock().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lock is acquired anew on every iteration of the caller's loop. If we passed the HashSet to this method instead, we could lock once and iterate inside it.

self.start_watching(&mut watcher, dir_path, &root_path)?;
}
Ok(())
}

/// Makes sure `dir_path` is registered with the watcher, registering it at
/// most once.
///
/// The directory is recorded in `watching` even when no watcher is active
/// yet, so that it can be picked up once watching starts.
///
/// # Errors
///
/// Returns the error from `start_watching` when neither `dir_path` nor any of
/// its ancestors up to `root_path` could be watched. In that case the
/// directory is removed from `watching` again so that a later call retries
/// instead of silently treating the directory as watched.
///
/// # Panics
///
/// Panics if the watcher mutex is poisoned.
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
fn ensure_watching(&self, dir_path: &Path, root_path: &Path) -> Result<()> {
    // Fast path: already registered, no need to take the watcher lock.
    if self.watching.contains(dir_path) {
        return Ok(());
    }
    let mut watcher = self.watcher.lock().unwrap();
    // `insert` returns `true` only for the caller that actually added the
    // entry, so concurrent callers don't register the same directory twice.
    if self.watching.insert(dir_path.to_path_buf()) {
        if let Err(err) = self.start_watching(&mut watcher, dir_path, root_path) {
            // Roll back: the directory is not being watched, so it must not
            // stay in the set, otherwise the fast path above would skip it
            // forever.
            self.watching.remove(dir_path);
            return Err(err);
        }
    }
    Ok(())
}

/// Registers a non-recursive watch for `dir_path` on the given watcher.
///
/// When the watcher refuses `dir_path`, its ancestors are tried one after the
/// other, stopping at the first one that can be watched. Does nothing when no
/// watcher is currently active.
///
/// # Errors
///
/// Fails with the last watch error when `root_path` itself could not be
/// watched, or when the path being tried has no parent left.
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
fn start_watching(
    &self,
    watcher: &mut MutexGuard<Option<RecommendedWatcher>>,
    dir_path: &Path,
    root_path: &Path,
) -> Result<()> {
    // No active watcher: nothing to register right now.
    let Some(active) = watcher.as_mut() else {
        return Ok(());
    };

    let mut candidate = dir_path;
    loop {
        let err = match active.watch(candidate, RecursiveMode::NonRecursive) {
            Ok(()) => return Ok(()),
            Err(err) => err,
        };

        // Never climb above the root; otherwise move on to the parent, if any.
        let fallback = if candidate == root_path {
            None
        } else {
            candidate.parent()
        };

        match fallback {
            Some(parent) => candidate = parent,
            None => {
                return Err(err).context(format!(
                    "Unable to watch {} (tried up to {})",
                    dir_path.display(),
                    candidate.display()
                ));
            }
        }
    }
}
}

#[turbo_tasks::value(cell = "new", eq = "manual")]
pub struct DiskFileSystem {
pub name: String,
Expand All @@ -93,20 +151,36 @@ pub struct DiskFileSystem {
dir_invalidator_map: Arc<InvalidatorMap>,
#[turbo_tasks(debug_ignore, trace_ignore)]
#[serde(skip)]
watcher: Mutex<Option<RecommendedWatcher>>,
watcher: Arc<DiskWatcher>,
}

impl DiskFileSystem {
/// Borrows the file system root as a [`Path`], passed through `simplified`
/// (presumably stripping a Windows UNC prefix, as the comment in
/// `to_sys_path` suggests).
fn root_path(&self) -> &Path {
    let raw_root = Path::new(&self.root);
    simplified(raw_root)
}

/// registers the path as an invalidator for the current task,
/// has to be called within a turbo-tasks function
fn register_invalidator(&self, path: impl AsRef<Path>, file: bool) {
fn register_invalidator(&self, path: &Path) -> Result<()> {
let invalidator = turbo_tasks::get_invalidator();
if file {
self.invalidator_map.insert(path_to_key(path), invalidator);
} else {
self.dir_invalidator_map
.insert(path_to_key(path), invalidator);
self.invalidator_map.insert(path_to_key(path), invalidator);
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
if let Some(dir) = path.parent() {
self.watcher.ensure_watching(dir, self.root_path())?;
}
Ok(())
}

/// Registers the current task's invalidator for the directory at `path`.
/// Has to be called within a turbo-tasks function.
///
/// On platforms other than macOS and Windows this also makes sure the
/// directory itself is being watched, and propagates any error from doing so.
fn register_dir_invalidator(&self, path: &Path) -> Result<()> {
    let task_invalidator = turbo_tasks::get_invalidator();
    let key = path_to_key(path);
    self.dir_invalidator_map.insert(key, task_invalidator);

    #[cfg(not(any(target_os = "macos", target_os = "windows")))]
    {
        let root = self.root_path();
        self.watcher.ensure_watching(path, root)?;
    }

    Ok(())
}

pub fn invalidate(&self) {
Expand All @@ -119,7 +193,7 @@ impl DiskFileSystem {
}

pub fn start_watching(&self) -> Result<()> {
let mut watcher_guard = self.watcher.lock().unwrap();
let mut watcher_guard = self.watcher.watcher.lock().unwrap();
if watcher_guard.is_some() {
return Ok(());
}
Expand All @@ -133,7 +207,12 @@ impl DiskFileSystem {
let mut watcher = watcher(tx, Duration::from_millis(1))?;
// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
#[cfg(any(target_os = "macos", target_os = "windows"))]
watcher.watch(&root, RecursiveMode::Recursive)?;
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
for dir_path in self.watcher.watching.iter() {
watcher.watch(&*dir_path, RecursiveMode::NonRecursive)?;
}

// We need to invalidate all reads that happened before watching
// Best is to start_watching before starting to read
Expand All @@ -145,12 +224,18 @@ impl DiskFileSystem {
}

watcher_guard.replace(watcher);
drop(watcher_guard);

let disk_watcher = self.watcher.clone();
let root_path = self.root_path().to_path_buf();
Comment on lines +229 to +230
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto: these are unused on Mac/Windows. Could we make a consistent interface between Mac and Linux so we can just make calls to empty methods?


spawn_thread(move || {
let mut batched_invalidate_path = HashSet::new();
let mut batched_invalidate_path_dir = HashSet::new();
let mut batched_invalidate_path_and_children = HashSet::new();
let mut batched_invalidate_path_and_children_dir = HashSet::new();
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
let mut batched_new_paths = HashSet::new();

'outer: loop {
let mut event = rx.recv().map_err(|e| match e {
Expand All @@ -161,7 +246,16 @@ impl DiskFileSystem {
Ok(DebouncedEvent::Write(path)) => {
batched_invalidate_path.insert(path);
}
Ok(DebouncedEvent::Create(path)) | Ok(DebouncedEvent::Remove(path)) => {
Ok(DebouncedEvent::Create(path)) => {
batched_invalidate_path_and_children.insert(path.clone());
batched_invalidate_path_and_children_dir.insert(path.clone());
if let Some(parent) = path.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
batched_new_paths.insert(path.clone());
}
Ok(DebouncedEvent::Remove(path)) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to remove the path from the DiskWatcher in this case?

batched_invalidate_path_and_children.insert(path.clone());
batched_invalidate_path_and_children_dir.insert(path.clone());
if let Some(parent) = path.parent() {
Expand All @@ -177,6 +271,8 @@ impl DiskFileSystem {
if let Some(parent) = destination.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
batched_new_paths.insert(destination.clone());
}
Ok(DebouncedEvent::Rescan) => {
batched_invalidate_path_and_children.insert(PathBuf::from(&root));
Expand Down Expand Up @@ -257,21 +353,27 @@ impl DiskFileSystem {
&mut batched_invalidate_path_and_children_dir,
);
}
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
{
for path in batched_new_paths.drain() {
let _ = disk_watcher.restore_if_watching(&path, &root_path);
}
}
}
});
Ok(())
}

pub fn stop_watching(&self) {
if let Some(watcher) = self.watcher.lock().unwrap().take() {
if let Some(watcher) = self.watcher.watcher.lock().unwrap().take() {
drop(watcher);
// thread will detect the stop because the channel is disconnected
}
}

pub async fn to_sys_path(&self, fs_path: FileSystemPathVc) -> Result<PathBuf> {
// just in case there's a windows unc path prefix we remove it with `dunce`
let path = simplified(Path::new(&self.root));
let path = self.root_path();
let fs_path = fs_path.await?;
Ok(if fs_path.path.is_empty() {
path.to_path_buf()
Expand Down Expand Up @@ -299,7 +401,7 @@ impl DiskFileSystemVc {
mutex_map: Default::default(),
invalidator_map: Arc::new(InvalidatorMap::new()),
dir_invalidator_map: Arc::new(InvalidatorMap::new()),
watcher: Mutex::new(None),
watcher: Default::default(),
};

Ok(Self::cell(instance))
Expand Down Expand Up @@ -329,7 +431,7 @@ impl FileSystem for DiskFileSystem {
#[turbo_tasks::function]
async fn read(&self, fs_path: FileSystemPathVc) -> Result<FileContentVc> {
let full_path = self.to_sys_path(fs_path).await?;
self.register_invalidator(&full_path, true);
self.register_invalidator(&full_path)?;

let content = read_file(full_path, &self.mutex_map).await?;
Ok(content.cell())
Expand All @@ -338,7 +440,7 @@ impl FileSystem for DiskFileSystem {
#[turbo_tasks::function]
async fn read_dir(&self, fs_path: FileSystemPathVc) -> Result<DirectoryContentVc> {
let full_path = self.to_sys_path(fs_path).await?;
self.register_invalidator(&full_path, false);
self.register_dir_invalidator(&full_path)?;
let fs_path = fs_path.await?;

// we use the sync std function here as it's a lot faster (600%) in
Expand Down Expand Up @@ -392,7 +494,7 @@ impl FileSystem for DiskFileSystem {
#[turbo_tasks::function]
async fn read_link(&self, fs_path: FileSystemPathVc) -> Result<LinkContentVc> {
let full_path = self.to_sys_path(fs_path).await?;
self.register_invalidator(&full_path, true);
self.register_invalidator(&full_path)?;

let _lock = self.mutex_map.lock(full_path.clone()).await;
let link_path = match retry_future(|| fs::read_link(&full_path)).await {
Expand Down Expand Up @@ -474,7 +576,7 @@ impl FileSystem for DiskFileSystem {
#[turbo_tasks::function]
async fn track(&self, fs_path: FileSystemPathVc) -> Result<CompletionVc> {
let full_path = self.to_sys_path(fs_path).await?;
self.register_invalidator(full_path, true);
self.register_invalidator(&full_path)?;
Ok(CompletionVc::new())
}

Expand Down Expand Up @@ -629,7 +731,7 @@ impl FileSystem for DiskFileSystem {
#[turbo_tasks::function]
async fn metadata(&self, fs_path: FileSystemPathVc) -> Result<FileMetaVc> {
let full_path = self.to_sys_path(fs_path).await?;
self.register_invalidator(&full_path, true);
self.register_invalidator(&full_path)?;

let _lock = self.mutex_map.lock(full_path.clone()).await;
let meta = retry_future(|| fs::metadata(full_path.clone()))
Expand Down