diff --git a/src/windows.rs b/src/windows.rs index 366539cb..b57e2e6b 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -1,6 +1,6 @@ extern crate kernel32; -use winapi::{OVERLAPPED, LPOVERLAPPED, HANDLE, INVALID_HANDLE_VALUE, +use winapi::{OVERLAPPED, LPOVERLAPPED, HANDLE, INVALID_HANDLE_VALUE, INFINITE, TRUE, ERROR_OPERATION_ABORTED, FILE_NOTIFY_INFORMATION, fileapi, winbase, winnt}; use std::collections::HashMap; @@ -22,7 +22,7 @@ const BUF_SIZE: u32 = 16384; struct ReadData { dir: PathBuf, // directory that is being watched file: Option, // if a file is being watched, this is its full path - meta_tx: Sender, + complete_sem: HANDLE, } struct ReadDirectoryRequest { @@ -39,14 +39,19 @@ enum Action { } pub enum MetaEvent { - WatcherComplete + SingleWatchComplete +} + +struct WatchState { + dir_handle: HANDLE, + complete_sem: HANDLE, } struct ReadDirectoryChangesServer { rx: Receiver, tx: Sender, meta_tx: Sender, - watches: HashMap + watches: HashMap } impl ReadDirectoryChangesServer { @@ -75,15 +80,8 @@ impl ReadDirectoryChangesServer { Action::Unwatch(path) => self.remove_watch(path), Action::Stop => { stopped = true; - for (_, handle) in &self.watches { - unsafe { - close_handle(*handle); - } - } - // wait for final read callback. required to avoid leaking callback - // memory - unsafe { - kernel32::SleepEx(500, 1); + for (_, ws) in &self.watches { + stop_watch(ws, &self.meta_tx); } break; } @@ -151,28 +149,50 @@ impl ReadDirectoryChangesServer { } else { None }; + // every watcher gets its own semaphore to signal completion + let semaphore = unsafe { + kernel32::CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) + }; + if semaphore == ptr::null_mut() || semaphore == INVALID_HANDLE_VALUE { + unsafe { kernel32::CloseHandle(handle); } + let _ = self.tx.send(Event { + path: Some(path), + op: Err(Error::Generic("Failed to create semaphore for watch.".to_owned())) + }); + return; + } let rd = ReadData { dir: dir_target, file: wf, - meta_tx: self.meta_tx.clone(), + complete_sem: semaphore + }; + let ws = WatchState { + dir_handle: handle, + complete_sem: semaphore }; - self.watches.insert(path.clone(), handle); + self.watches.insert(path.clone(), ws); start_read(&rd, &self.tx, handle); } fn remove_watch(&mut self, path: PathBuf) { - if let Some(handle) = self.watches.remove(&path) { - unsafe { - close_handle(handle); + if let Some(ws) = self.watches.remove(&path) { + stop_watch(&ws, &self.meta_tx); } } } + +fn stop_watch(ws:&WatchState,meta_tx: &Sender) { + unsafe { + let cio = kernel32::CancelIo(ws.dir_handle); + let ch = kernel32::CloseHandle(ws.dir_handle); + // have to wait for it, otherwise we leak the memory allocated for there read request + if cio != 0 && ch != 0 { + kernel32::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE); } + kernel32::CloseHandle(ws.complete_sem); -unsafe fn close_handle(handle: HANDLE) { - // TODO: Handle errors - kernel32::CancelIo(handle); - kernel32::CloseHandle(handle); + } + let _ = meta_tx.send(MetaEvent::SingleWatchComplete); } fn start_read(rd: &ReadData, tx: &Sender, handle: HANDLE) { @@ -208,7 +228,7 @@ fn start_read(rd: &ReadData, tx: &Sender, handle: HANDLE) { // This is using an asynchronous call with a completion routine for receiving notifications // An I/O completion port would probably be more performant - kernel32::ReadDirectoryChangesW( + let ret = kernel32::ReadDirectoryChangesW( handle, req_buf, BUF_SIZE, @@ -218,9 +238,18 @@ fn start_read(rd: &ReadData, tx: &Sender, handle: HANDLE) { &mut *overlapped as *mut OVERLAPPED, Some(handle_event)); + if ret == 0 { + // error reading. retransmute request memory to allow drop. + // allow overlapped to drop by omitting forget() + let request: Box = mem::transmute(request_p); + + kernel32::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut()); + } else { + // read ok. forget overlapped to let the completion routine handle memory mem::forget(overlapped); } } +} unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, overlapped: LPOVERLAPPED) { // TODO: Use Box::from_raw when it is no longer unstable @@ -228,9 +257,9 @@ unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, ove let request: Box = mem::transmute(overlapped.hEvent); if error_code == ERROR_OPERATION_ABORTED { - let _ = request.data.meta_tx.send(MetaEvent::WatcherComplete); // received when dir is unwatched or watcher is shutdown; return and let overlapped/request // get drop-cleaned + kernel32::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut()); return; } diff --git a/tests/windows.rs b/tests/windows.rs index af586f89..666c8af2 100644 --- a/tests/windows.rs +++ b/tests/windows.rs @@ -3,50 +3,85 @@ extern crate tempdir; extern crate tempfile; extern crate time; - use notify::*; -use std::io::Write; -use std::path::{Path, PathBuf}; use std::thread; -use std::sync::mpsc::{channel, Sender, Receiver}; +use std::sync::mpsc::{channel, Receiver}; use tempdir::TempDir; -use tempfile::NamedTempFile; +fn check_for_error(rx:&Receiver) { + while let Ok(res) = rx.try_recv() { + match res.op { + Err(e) => panic!("unexpected err: {:?}: {:?}", e, res.path), + _ => () + } + }; +} #[cfg(target_os="windows")] #[test] fn shutdown() { // create a watcher for n directories. start the watcher, then shut it down. inspect // the watcher to make sure that it received final callbacks for all n watchers. - - let mut dirs:Vec = Vec::new(); let dir_count = 100; // to get meta events, we have to pass in the meta channel let (meta_tx,meta_rx) = channel(); - + let (tx, rx) = channel(); { - let (tx, _) = channel(); + let mut dirs:Vec = Vec::new(); let mut w = ReadDirectoryChangesWatcher::create(tx,meta_tx).unwrap(); for _ in 0..dir_count { - let d = TempDir::new("d").unwrap(); + let d = TempDir::new("rsnotifytest").unwrap(); + dirs.push(d); + } + + for d in &dirs { // need the ref, otherwise its a move and the dir will be dropped! //println!("{:?}", d.path()); w.watch(d.path()).unwrap(); - dirs.push(d); } + + thread::sleep_ms(2000); // give watcher time to watch paths before we drop it + + // unwatch half of the directories, let the others get stopped when we go out of scope + for d in &dirs[0..dir_count/2] { + w.unwatch(d.path()).unwrap(); + } + + thread::sleep_ms(2000); // sleep to unhook the watches } - const TIMEOUT_S: f64 = 4.0; + check_for_error(&rx); + + const TIMEOUT_S: f64 = 60.0; // give it PLENTY of time before we declare failure let deadline = time::precise_time_s() + TIMEOUT_S; let mut watchers_shutdown = 0; - while time::precise_time_s() < deadline { + while watchers_shutdown != dir_count && time::precise_time_s() < deadline { if let Ok(actual) = meta_rx.try_recv() { match actual { - WatcherComplete => watchers_shutdown += 1 + notify::windows::MetaEvent::SingleWatchComplete => watchers_shutdown += 1 } } - thread::sleep_ms(50); + thread::sleep_ms(50); // don't burn cpu, can take some time for completion events to fire } assert_eq!(watchers_shutdown,dir_count); } + +#[cfg(target_os="windows")] +#[test] +#[ignore] +// repeatedly watch and unwatch a directory; make sure process memory does not increase. +// you use task manager to watch the memory; it will fluctuate a bit, but should not leak overall +fn memtest_manual() { + loop { + let (tx, rx) = channel(); + let d = TempDir::new("rsnotifytest").unwrap(); + { + let (meta_tx,_) = channel(); + let mut w = ReadDirectoryChangesWatcher::create(tx,meta_tx).unwrap(); + w.watch(d.path()).unwrap(); + thread::sleep_ms(1); // this should make us run pretty hot but not insane + } + check_for_error(&rx); + } +}