Skip to content

Commit

Permalink
Merge pull request #40 from benschulz/inotify-race
Browse files Browse the repository at this point in the history
Replaced hand-rolled with mio-based event loop.
  • Loading branch information
passcod committed Dec 6, 2015
2 parents b601e9c + af8e181 commit b0a68e2
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 89 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ walker = "^1.0.0"

[target.x86_64-unknown-linux-gnu.dependencies]
inotify = "^0.1"
mio = "^0.5"

[target.x86_64-unknown-linux-musl.dependencies]
inotify = "^0.1"
mio = "^0.5"

[target.x86_64-apple-darwin.dependencies]
fsevent = "^0.2.11"
Expand Down
241 changes: 152 additions & 89 deletions src/inotify/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,121 @@ extern crate inotify as inotify_sys;
extern crate libc;
extern crate walker;

use mio::{self, EventLoop};
use self::inotify_sys::wrapper::{self, INotify, Watch};
use self::walker::Walker;
use std::collections::HashMap;
use std::fs::metadata;
use std::path::{Path, PathBuf};
use std::sync::mpsc::Sender;
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, RwLock};
use std::thread;
use std::thread::Builder as ThreadBuilder;
use super::{Error, Event, op, Op, Watcher};

mod flags;

pub struct INotifyWatcher {
const INOTIFY: mio::Token = mio::Token(0);

pub struct INotifyWatcher(mio::Sender<EventLoopMsg>);

struct INotifyHandler {
inotify: INotify,
tx: Sender<Event>,
watches: HashMap<PathBuf, (Watch, flags::Mask)>,
paths: Arc<RwLock<HashMap<Watch, PathBuf>>>
}

impl INotifyWatcher {
fn run(&mut self) {
let mut ino = self.inotify.clone();
let tx = self.tx.clone();
let paths = self.paths.clone();
thread::spawn(move || {
loop {
match ino.wait_for_events() {
Ok(es) => {
for e in es.iter() {
handle_event(e.clone(), &tx, &paths)
enum EventLoopMsg {
AddWatch(PathBuf, Sender<Result<(), Error>>),
RemoveWatch(PathBuf, Sender<Result<(), Error>>),
Shutdown,
}

impl mio::Handler for INotifyHandler {
type Timeout = ();
type Message = EventLoopMsg;

fn ready(&mut self, _event_loop: &mut EventLoop<INotifyHandler>, token: mio::Token, events: mio::EventSet) {
match token {
INOTIFY => {
assert!(events.is_readable());

match self.inotify.available_events() {
Ok(events) => {
assert!(!events.is_empty());

for e in events {
handle_event(e.clone(), &self.tx, &self.paths)
}
},
Err(e) => {
match e.kind() {
_ => {
let _ = tx.send(Event {
path: None,
op: Err(Error::Io(e))
});
}
}
let _ = self.tx.send(Event {
path: None,
op: Err(Error::Io(e))
});
}
}
}
});
_ => unreachable!(),
}
}

fn notify(&mut self, event_loop: &mut EventLoop<INotifyHandler>, msg: EventLoopMsg) {
match msg {
EventLoopMsg::AddWatch(path, tx) => {
let _ = tx.send(self.add_watch_recursively(path));
},
EventLoopMsg::RemoveWatch(path, tx) => {
let _ = tx.send(self.remove_watch(path));
},
EventLoopMsg::Shutdown => {
for path in self.watches.clone().keys() {
let _ = self.remove_watch(path.to_owned());
}
let _ = self.inotify.close();

event_loop.shutdown();
},
}
}
}

impl INotifyHandler {
fn add_watch_recursively(&mut self, path: PathBuf) -> Result<(), Error> {
let is_dir = match metadata(path.as_ref() as &Path) {
Ok(m) => m.is_dir(),
Err(e) => return Err(Error::Io(e)),
};
if is_dir {
match Walker::new(path.as_ref()) {
Ok(dir) => {
for entry in dir {
match entry {
Ok(entry) => {
let path = entry.path();
try!(self.add_watch(path));
},
Err(e) => return Err(Error::Io(e)),
}
}
self.add_watch(path)
},
Err(e) => Err(Error::Io(e))
}
} else {
self.add_watch(path)
}
}

fn add_watch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), Error> {
let mut watching = flags::IN_ATTRIB
| flags::IN_CREATE
| flags::IN_DELETE
| flags::IN_DELETE_SELF
| flags::IN_MODIFY
| flags::IN_MOVED_FROM
| flags::IN_MOVED_TO
| flags::IN_MOVE_SELF;
let path = path.as_ref().to_path_buf();
fn add_watch(&mut self, path: PathBuf) -> Result<(), Error> {
let mut watching = flags::IN_ATTRIB
| flags::IN_CREATE
| flags::IN_DELETE
| flags::IN_DELETE_SELF
| flags::IN_MODIFY
| flags::IN_MOVED_FROM
| flags::IN_MOVED_TO
| flags::IN_MOVE_SELF;
match self.watches.get(&path) {
None => {},
Some(p) => {
Expand All @@ -77,6 +135,24 @@ impl INotifyWatcher {
}
}
}

fn remove_watch(&mut self, path: PathBuf) -> Result<(), Error> {
match self.watches.remove(&path) {
None => Err(Error::WatchNotFound),
Some(p) => {
let w = &p.0;
match self.inotify.rm_watch(w.clone()) {
Err(e) => Err(Error::Io(e)),
Ok(_) => {
// Nothing depends on the value being gone
// from here now that inotify isn't watching.
(*self.paths).write().unwrap().remove(w);
Ok(())
}
}
}
}
}
}

#[inline]
Expand Down Expand Up @@ -116,73 +192,60 @@ fn handle_event(event: wrapper::Event, tx: &Sender<Event>, paths: &Arc<RwLock<Ha

impl Watcher for INotifyWatcher {
fn new(tx: Sender<Event>) -> Result<INotifyWatcher, Error> {
let mut it = match INotify::init() {
Ok(i) => INotifyWatcher {
inotify: i,
tx: tx,
watches: HashMap::new(), // TODO: use bimap?
paths: Arc::new(RwLock::new(HashMap::new()))
},
Err(e) => return Err(Error::Io(e))
};
INotify::init()
.and_then(|inotify| EventLoop::new().map(|l| (inotify, l)))
.and_then(|(inotify, mut event_loop)| {
let inotify_fd = inotify.fd;
let evented_inotify = mio::unix::EventedFd(&inotify_fd);

let handler = INotifyHandler {
inotify: inotify,
tx: tx,
watches: HashMap::new(),
paths: Arc::new(RwLock::new(HashMap::new()))
};

event_loop.register(&evented_inotify,
INOTIFY,
mio::EventSet::readable(),
mio::PollOpt::level())
.map(|_| (event_loop, handler))
})
.map(|(mut event_loop, mut handler)| {
let channel = event_loop.channel();

ThreadBuilder::new()
.name("INotify Watcher".to_owned())
.spawn(move|| event_loop.run(&mut handler))
.unwrap();

it.run();
return Ok(it);
INotifyWatcher(channel)
})
.map_err(Error::Io)
}

fn watch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), Error> {
let is_dir = match metadata(&path.as_ref()) {
Ok(m) => m.is_dir(),
Err(e) => return Err(Error::Io(e)),
};
if is_dir {
match Walker::new(path.as_ref()) {
Ok(dir) => {
for entry in dir {
match entry {
Ok(entry) => {
let path = entry.path();
try!(self.add_watch(&path));
},
Err(e) => return Err(Error::Io(e)),
}
}
self.add_watch(path.as_ref())
},
Err(e) => Err(Error::Io(e))
}
} else {
self.add_watch(&path.as_ref())
}
let (tx, rx) = mpsc::channel();
let msg = EventLoopMsg::AddWatch(path.as_ref().to_owned(), tx);

// we expect the event loop to live and reply => unwraps must not panic
self.0.send(msg).unwrap();
rx.recv().unwrap()
}

fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), Error> {
// FIXME:
// once Rust 1.1 is released, just use a &Path
// Relevant bug is https://github.com/rust-lang/rust/pull/25060
match self.watches.remove(&path.as_ref().to_path_buf()) {
None => Err(Error::WatchNotFound),
Some(p) => {
let w = &p.0;
match self.inotify.rm_watch(w.clone()) {
Err(e) => Err(Error::Io(e)),
Ok(_) => {
// Nothing depends on the value being gone
// from here now that inotify isn't watching.
(*self.paths).write().unwrap().remove(w);
Ok(())
}
}
}
}
let (tx, rx) = mpsc::channel();
let msg = EventLoopMsg::RemoveWatch(path.as_ref().to_owned(), tx);

// we expect the event loop to live and reply => unwraps must not panic
self.0.send(msg).unwrap();
rx.recv().unwrap()
}
}

impl Drop for INotifyWatcher {
fn drop(&mut self) {
for path in self.watches.clone().keys() {
let _ = self.unwatch(path);
}
let _ = self.inotify.close();
// we expect the event loop to live => unwrap must not panic
self.0.send(EventLoopMsg::Shutdown).unwrap();
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#[macro_use] extern crate log;
#[macro_use] extern crate bitflags;
#[cfg(target_os="linux")] extern crate mio;
#[cfg(target_os="macos")] extern crate fsevent_sys;
#[cfg(target_os="windows")] extern crate winapi;
extern crate libc;
Expand Down

0 comments on commit b0a68e2

Please sign in to comment.