Skip to content

Commit

Permalink
Windows: fix shutdown leak
Browse files Browse the repository at this point in the history
- Each watcher now has a a semaphore that is used to indicate when the final completion routine is called (or a read failed)
- Also added some general error handling to make the code more robust in the face of deleted watch directories, etc
  • Loading branch information
jmquigs committed Nov 19, 2015
1 parent e6434a4 commit 02bca44
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 39 deletions.
77 changes: 53 additions & 24 deletions src/windows.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
extern crate kernel32;

use winapi::{OVERLAPPED, LPOVERLAPPED, HANDLE, INVALID_HANDLE_VALUE,
use winapi::{OVERLAPPED, LPOVERLAPPED, HANDLE, INVALID_HANDLE_VALUE, INFINITE, TRUE,
ERROR_OPERATION_ABORTED, FILE_NOTIFY_INFORMATION, fileapi, winbase, winnt};

use std::collections::HashMap;
Expand All @@ -22,7 +22,7 @@ const BUF_SIZE: u32 = 16384;
struct ReadData {
dir: PathBuf, // directory that is being watched
file: Option<PathBuf>, // if a file is being watched, this is its full path
meta_tx: Sender<MetaEvent>,
complete_sem: HANDLE,
}

struct ReadDirectoryRequest {
Expand All @@ -39,14 +39,19 @@ enum Action {
}

pub enum MetaEvent {
WatcherComplete
SingleWatchComplete
}

struct WatchState {
dir_handle: HANDLE,
complete_sem: HANDLE,
}

struct ReadDirectoryChangesServer {
rx: Receiver<Action>,
tx: Sender<Event>,
meta_tx: Sender<MetaEvent>,
watches: HashMap<PathBuf, HANDLE>
watches: HashMap<PathBuf, WatchState>
}

impl ReadDirectoryChangesServer {
Expand Down Expand Up @@ -75,15 +80,8 @@ impl ReadDirectoryChangesServer {
Action::Unwatch(path) => self.remove_watch(path),
Action::Stop => {
stopped = true;
for (_, handle) in &self.watches {
unsafe {
close_handle(*handle);
}
}
// wait for final read callback. required to avoid leaking callback
// memory
unsafe {
kernel32::SleepEx(500, 1);
for (_, ws) in &self.watches {
stop_watch(ws, &self.meta_tx);
}
break;
}
Expand Down Expand Up @@ -151,28 +149,50 @@ impl ReadDirectoryChangesServer {
} else {
None
};
// every watcher gets its own semaphore to signal completion
let semaphore = unsafe {
kernel32::CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut())
};
if semaphore == ptr::null_mut() || semaphore == INVALID_HANDLE_VALUE {
unsafe { kernel32::CloseHandle(handle); }
let _ = self.tx.send(Event {
path: Some(path),
op: Err(Error::Generic("Failed to create semaphore for watch.".to_owned()))
});
return;
}
let rd = ReadData {
dir: dir_target,
file: wf,
meta_tx: self.meta_tx.clone(),
complete_sem: semaphore
};
let ws = WatchState {
dir_handle: handle,
complete_sem: semaphore
};
self.watches.insert(path.clone(), handle);
self.watches.insert(path.clone(), ws);
start_read(&rd, &self.tx, handle);
}

fn remove_watch(&mut self, path: PathBuf) {
if let Some(handle) = self.watches.remove(&path) {
unsafe {
close_handle(handle);
if let Some(ws) = self.watches.remove(&path) {
stop_watch(&ws, &self.meta_tx);
}
}
}

fn stop_watch(ws:&WatchState,meta_tx: &Sender<MetaEvent>) {
unsafe {
let cio = kernel32::CancelIo(ws.dir_handle);
let ch = kernel32::CloseHandle(ws.dir_handle);
// have to wait for it, otherwise we leak the memory allocated for there read request
if cio != 0 && ch != 0 {
kernel32::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE);
}
kernel32::CloseHandle(ws.complete_sem);

unsafe fn close_handle(handle: HANDLE) {
// TODO: Handle errors
kernel32::CancelIo(handle);
kernel32::CloseHandle(handle);
}
let _ = meta_tx.send(MetaEvent::SingleWatchComplete);
}

fn start_read(rd: &ReadData, tx: &Sender<Event>, handle: HANDLE) {
Expand Down Expand Up @@ -208,7 +228,7 @@ fn start_read(rd: &ReadData, tx: &Sender<Event>, handle: HANDLE) {

// This is using an asynchronous call with a completion routine for receiving notifications
// An I/O completion port would probably be more performant
kernel32::ReadDirectoryChangesW(
let ret = kernel32::ReadDirectoryChangesW(
handle,
req_buf,
BUF_SIZE,
Expand All @@ -218,19 +238,28 @@ fn start_read(rd: &ReadData, tx: &Sender<Event>, handle: HANDLE) {
&mut *overlapped as *mut OVERLAPPED,
Some(handle_event));

if ret == 0 {
// error reading. retransmute request memory to allow drop.
// allow overlapped to drop by omitting forget()
let request: Box<ReadDirectoryRequest> = mem::transmute(request_p);

kernel32::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
} else {
// read ok. forget overlapped to let the completion routine handle memory
mem::forget(overlapped);
}
}
}

unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, overlapped: LPOVERLAPPED) {
// TODO: Use Box::from_raw when it is no longer unstable
let overlapped: Box<OVERLAPPED> = mem::transmute(overlapped);
let request: Box<ReadDirectoryRequest> = mem::transmute(overlapped.hEvent);

if error_code == ERROR_OPERATION_ABORTED {
let _ = request.data.meta_tx.send(MetaEvent::WatcherComplete);
// received when dir is unwatched or watcher is shutdown; return and let overlapped/request
// get drop-cleaned
kernel32::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
return;
}

Expand Down
65 changes: 50 additions & 15 deletions tests/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,50 +3,85 @@ extern crate tempdir;
extern crate tempfile;
extern crate time;


use notify::*;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::thread;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::sync::mpsc::{channel, Receiver};
use tempdir::TempDir;
use tempfile::NamedTempFile;

fn check_for_error(rx:&Receiver<notify::Event>) {
while let Ok(res) = rx.try_recv() {
match res.op {
Err(e) => panic!("unexpected err: {:?}: {:?}", e, res.path),
_ => ()
}
};
}
#[cfg(target_os="windows")]
#[test]
fn shutdown() {
// create a watcher for n directories. start the watcher, then shut it down. inspect
// the watcher to make sure that it received final callbacks for all n watchers.

let mut dirs:Vec<tempdir::TempDir> = Vec::new();
let dir_count = 100;

// to get meta events, we have to pass in the meta channel
let (meta_tx,meta_rx) = channel();

let (tx, rx) = channel();
{
let (tx, _) = channel();
let mut dirs:Vec<tempdir::TempDir> = Vec::new();
let mut w = ReadDirectoryChangesWatcher::create(tx,meta_tx).unwrap();

for _ in 0..dir_count {
let d = TempDir::new("d").unwrap();
let d = TempDir::new("rsnotifytest").unwrap();
dirs.push(d);
}

for d in &dirs { // need the ref, otherwise its a move and the dir will be dropped!
//println!("{:?}", d.path());
w.watch(d.path()).unwrap();
dirs.push(d);
}

thread::sleep_ms(2000); // give watcher time to watch paths before we drop it

// unwatch half of the directories, let the others get stopped when we go out of scope
for d in &dirs[0..dir_count/2] {
w.unwatch(d.path()).unwrap();
}

thread::sleep_ms(2000); // sleep to unhook the watches
}

const TIMEOUT_S: f64 = 4.0;
check_for_error(&rx);

const TIMEOUT_S: f64 = 60.0; // give it PLENTY of time before we declare failure
let deadline = time::precise_time_s() + TIMEOUT_S;
let mut watchers_shutdown = 0;
while time::precise_time_s() < deadline {
while watchers_shutdown != dir_count && time::precise_time_s() < deadline {
if let Ok(actual) = meta_rx.try_recv() {
match actual {
WatcherComplete => watchers_shutdown += 1
notify::windows::MetaEvent::SingleWatchComplete => watchers_shutdown += 1
}
}
thread::sleep_ms(50);
thread::sleep_ms(50); // don't burn cpu, can take some time for completion events to fire
}

assert_eq!(watchers_shutdown,dir_count);
}

#[cfg(target_os="windows")]
#[test]
#[ignore]
// repeatedly watch and unwatch a directory; make sure process memory does not increase.
// you use task manager to watch the memory; it will fluctuate a bit, but should not leak overall
fn memtest_manual() {
loop {
let (tx, rx) = channel();
let d = TempDir::new("rsnotifytest").unwrap();
{
let (meta_tx,_) = channel();
let mut w = ReadDirectoryChangesWatcher::create(tx,meta_tx).unwrap();
w.watch(d.path()).unwrap();
thread::sleep_ms(1); // this should make us run pretty hot but not insane
}
check_for_error(&rx);
}
}

0 comments on commit 02bca44

Please sign in to comment.