Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replaced hand-rolled with mio-based event loop. #40

Merged
merged 1 commit into from
Dec 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ walker = "^1.0.0"

[target.x86_64-unknown-linux-gnu.dependencies]
inotify = "^0.1"
mio = "^0.5"

[target.x86_64-unknown-linux-musl.dependencies]
inotify = "^0.1"
mio = "^0.5"

[target.x86_64-apple-darwin.dependencies]
fsevent = "^0.2.11"
Expand Down
241 changes: 152 additions & 89 deletions src/inotify/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,121 @@ extern crate inotify as inotify_sys;
extern crate libc;
extern crate walker;

use mio::{self, EventLoop};
use self::inotify_sys::wrapper::{self, INotify, Watch};
use self::walker::Walker;
use std::collections::HashMap;
use std::fs::metadata;
use std::path::{Path, PathBuf};
use std::sync::mpsc::Sender;
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, RwLock};
use std::thread;
use std::thread::Builder as ThreadBuilder;
use super::{Error, Event, op, Op, Watcher};

mod flags;

pub struct INotifyWatcher {
const INOTIFY: mio::Token = mio::Token(0);

pub struct INotifyWatcher(mio::Sender<EventLoopMsg>);

struct INotifyHandler {
inotify: INotify,
tx: Sender<Event>,
watches: HashMap<PathBuf, (Watch, flags::Mask)>,
paths: Arc<RwLock<HashMap<Watch, PathBuf>>>
}

impl INotifyWatcher {
fn run(&mut self) {
let mut ino = self.inotify.clone();
let tx = self.tx.clone();
let paths = self.paths.clone();
thread::spawn(move || {
loop {
match ino.wait_for_events() {
Ok(es) => {
for e in es.iter() {
handle_event(e.clone(), &tx, &paths)
enum EventLoopMsg {
AddWatch(PathBuf, Sender<Result<(), Error>>),
RemoveWatch(PathBuf, Sender<Result<(), Error>>),
Shutdown,
}

impl mio::Handler for INotifyHandler {
type Timeout = ();
type Message = EventLoopMsg;

fn ready(&mut self, _event_loop: &mut EventLoop<INotifyHandler>, token: mio::Token, events: mio::EventSet) {
match token {
INOTIFY => {
assert!(events.is_readable());

match self.inotify.available_events() {
Ok(events) => {
assert!(!events.is_empty());

for e in events {
handle_event(e.clone(), &self.tx, &self.paths)
}
},
Err(e) => {
match e.kind() {
_ => {
let _ = tx.send(Event {
path: None,
op: Err(Error::Io(e))
});
}
}
let _ = self.tx.send(Event {
path: None,
op: Err(Error::Io(e))
});
}
}
}
});
_ => unreachable!(),
}
}

fn notify(&mut self, event_loop: &mut EventLoop<INotifyHandler>, msg: EventLoopMsg) {
match msg {
EventLoopMsg::AddWatch(path, tx) => {
let _ = tx.send(self.add_watch_recursively(path));
},
EventLoopMsg::RemoveWatch(path, tx) => {
let _ = tx.send(self.remove_watch(path));
},
EventLoopMsg::Shutdown => {
for path in self.watches.clone().keys() {
let _ = self.remove_watch(path.to_owned());
}
let _ = self.inotify.close();

event_loop.shutdown();
},
}
}
}

impl INotifyHandler {
fn add_watch_recursively(&mut self, path: PathBuf) -> Result<(), Error> {
let is_dir = match metadata(path.as_ref() as &Path) {
Ok(m) => m.is_dir(),
Err(e) => return Err(Error::Io(e)),
};
if is_dir {
match Walker::new(path.as_ref()) {
Ok(dir) => {
for entry in dir {
match entry {
Ok(entry) => {
let path = entry.path();
try!(self.add_watch(path));
},
Err(e) => return Err(Error::Io(e)),
}
}
self.add_watch(path)
},
Err(e) => Err(Error::Io(e))
}
} else {
self.add_watch(path)
}
}

fn add_watch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), Error> {
let mut watching = flags::IN_ATTRIB
| flags::IN_CREATE
| flags::IN_DELETE
| flags::IN_DELETE_SELF
| flags::IN_MODIFY
| flags::IN_MOVED_FROM
| flags::IN_MOVED_TO
| flags::IN_MOVE_SELF;
let path = path.as_ref().to_path_buf();
fn add_watch(&mut self, path: PathBuf) -> Result<(), Error> {
let mut watching = flags::IN_ATTRIB
| flags::IN_CREATE
| flags::IN_DELETE
| flags::IN_DELETE_SELF
| flags::IN_MODIFY
| flags::IN_MOVED_FROM
| flags::IN_MOVED_TO
| flags::IN_MOVE_SELF;
match self.watches.get(&path) {
None => {},
Some(p) => {
Expand All @@ -77,6 +135,24 @@ impl INotifyWatcher {
}
}
}

fn remove_watch(&mut self, path: PathBuf) -> Result<(), Error> {
match self.watches.remove(&path) {
None => Err(Error::WatchNotFound),
Some(p) => {
let w = &p.0;
match self.inotify.rm_watch(w.clone()) {
Err(e) => Err(Error::Io(e)),
Ok(_) => {
// Nothing depends on the value being gone
// from here now that inotify isn't watching.
(*self.paths).write().unwrap().remove(w);
Ok(())
}
}
}
}
}
}

#[inline]
Expand Down Expand Up @@ -116,73 +192,60 @@ fn handle_event(event: wrapper::Event, tx: &Sender<Event>, paths: &Arc<RwLock<Ha

impl Watcher for INotifyWatcher {
fn new(tx: Sender<Event>) -> Result<INotifyWatcher, Error> {
let mut it = match INotify::init() {
Ok(i) => INotifyWatcher {
inotify: i,
tx: tx,
watches: HashMap::new(), // TODO: use bimap?
paths: Arc::new(RwLock::new(HashMap::new()))
},
Err(e) => return Err(Error::Io(e))
};
INotify::init()
.and_then(|inotify| EventLoop::new().map(|l| (inotify, l)))
.and_then(|(inotify, mut event_loop)| {
let inotify_fd = inotify.fd;
let evented_inotify = mio::unix::EventedFd(&inotify_fd);

let handler = INotifyHandler {
inotify: inotify,
tx: tx,
watches: HashMap::new(),
paths: Arc::new(RwLock::new(HashMap::new()))
};

event_loop.register(&evented_inotify,
INOTIFY,
mio::EventSet::readable(),
mio::PollOpt::level())
.map(|_| (event_loop, handler))
})
.map(|(mut event_loop, mut handler)| {
let channel = event_loop.channel();

ThreadBuilder::new()
.name("INotify Watcher".to_owned())
.spawn(move|| event_loop.run(&mut handler))
.unwrap();

it.run();
return Ok(it);
INotifyWatcher(channel)
})
.map_err(Error::Io)
}

fn watch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), Error> {
let is_dir = match metadata(&path.as_ref()) {
Ok(m) => m.is_dir(),
Err(e) => return Err(Error::Io(e)),
};
if is_dir {
match Walker::new(path.as_ref()) {
Ok(dir) => {
for entry in dir {
match entry {
Ok(entry) => {
let path = entry.path();
try!(self.add_watch(&path));
},
Err(e) => return Err(Error::Io(e)),
}
}
self.add_watch(path.as_ref())
},
Err(e) => Err(Error::Io(e))
}
} else {
self.add_watch(&path.as_ref())
}
let (tx, rx) = mpsc::channel();
let msg = EventLoopMsg::AddWatch(path.as_ref().to_owned(), tx);

// we expect the event loop to live and reply => unwraps must not panic
self.0.send(msg).unwrap();
rx.recv().unwrap()
}

fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), Error> {
// FIXME:
// once Rust 1.1 is released, just use a &Path
// Relevant bug is https://github.com/rust-lang/rust/pull/25060
match self.watches.remove(&path.as_ref().to_path_buf()) {
None => Err(Error::WatchNotFound),
Some(p) => {
let w = &p.0;
match self.inotify.rm_watch(w.clone()) {
Err(e) => Err(Error::Io(e)),
Ok(_) => {
// Nothing depends on the value being gone
// from here now that inotify isn't watching.
(*self.paths).write().unwrap().remove(w);
Ok(())
}
}
}
}
let (tx, rx) = mpsc::channel();
let msg = EventLoopMsg::RemoveWatch(path.as_ref().to_owned(), tx);

// we expect the event loop to live and reply => unwraps must not panic
self.0.send(msg).unwrap();
rx.recv().unwrap()
}
}

impl Drop for INotifyWatcher {
fn drop(&mut self) {
for path in self.watches.clone().keys() {
let _ = self.unwatch(path);
}
let _ = self.inotify.close();
// we expect the event loop to live => unwrap must not panic
self.0.send(EventLoopMsg::Shutdown).unwrap();
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#[macro_use] extern crate log;
#[macro_use] extern crate bitflags;
#[cfg(target_os="linux")] extern crate mio;
#[cfg(target_os="macos")] extern crate fsevent_sys;
#[cfg(target_os="windows")] extern crate winapi;
extern crate libc;
Expand Down