From af8e18128e5a28015bc4e675db31dad7e2b29b84 Mon Sep 17 00:00:00 2001 From: benshu Date: Sun, 29 Nov 2015 20:14:08 +0100 Subject: [PATCH] Replaced hand-rolled with mio-based event loop. --- Cargo.toml | 2 + src/inotify/mod.rs | 241 ++++++++++++++++++++++++++++----------------- src/lib.rs | 1 + 3 files changed, 155 insertions(+), 89 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 47780f98..0894e7db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,9 +39,11 @@ walker = "^1.0.0" [target.x86_64-unknown-linux-gnu.dependencies] inotify = "^0.1" +mio = "^0.5" [target.x86_64-unknown-linux-musl.dependencies] inotify = "^0.1" +mio = "^0.5" [target.x86_64-apple-darwin.dependencies] fsevent = "^0.2.11" diff --git a/src/inotify/mod.rs b/src/inotify/mod.rs index 1831bca6..be6e5f2a 100644 --- a/src/inotify/mod.rs +++ b/src/inotify/mod.rs @@ -2,63 +2,121 @@ extern crate inotify as inotify_sys; extern crate libc; extern crate walker; +use mio::{self, EventLoop}; use self::inotify_sys::wrapper::{self, INotify, Watch}; use self::walker::Walker; use std::collections::HashMap; use std::fs::metadata; use std::path::{Path, PathBuf}; -use std::sync::mpsc::Sender; +use std::sync::mpsc::{self, Sender}; use std::sync::{Arc, RwLock}; -use std::thread; +use std::thread::Builder as ThreadBuilder; use super::{Error, Event, op, Op, Watcher}; mod flags; -pub struct INotifyWatcher { +const INOTIFY: mio::Token = mio::Token(0); + +pub struct INotifyWatcher(mio::Sender); + +struct INotifyHandler { inotify: INotify, tx: Sender, watches: HashMap, paths: Arc>> } -impl INotifyWatcher { - fn run(&mut self) { - let mut ino = self.inotify.clone(); - let tx = self.tx.clone(); - let paths = self.paths.clone(); - thread::spawn(move || { - loop { - match ino.wait_for_events() { - Ok(es) => { - for e in es.iter() { - handle_event(e.clone(), &tx, &paths) +enum EventLoopMsg { + AddWatch(PathBuf, Sender>), + RemoveWatch(PathBuf, Sender>), + Shutdown, +} + +impl mio::Handler for INotifyHandler { + type Timeout = (); + type Message = EventLoopMsg; + + fn ready(&mut self, _event_loop: &mut EventLoop, token: mio::Token, events: mio::EventSet) { + match token { + INOTIFY => { + assert!(events.is_readable()); + + match self.inotify.available_events() { + Ok(events) => { + assert!(!events.is_empty()); + + for e in events { + handle_event(e.clone(), &self.tx, &self.paths) } }, Err(e) => { - match e.kind() { - _ => { - let _ = tx.send(Event { - path: None, - op: Err(Error::Io(e)) - }); - } - } + let _ = self.tx.send(Event { + path: None, + op: Err(Error::Io(e)) + }); } } } - }); + _ => unreachable!(), + } + } + + fn notify(&mut self, event_loop: &mut EventLoop, msg: EventLoopMsg) { + match msg { + EventLoopMsg::AddWatch(path, tx) => { + let _ = tx.send(self.add_watch_recursively(path)); + }, + EventLoopMsg::RemoveWatch(path, tx) => { + let _ = tx.send(self.remove_watch(path)); + }, + EventLoopMsg::Shutdown => { + for path in self.watches.clone().keys() { + let _ = self.remove_watch(path.to_owned()); + } + let _ = self.inotify.close(); + + event_loop.shutdown(); + }, + } + } +} + +impl INotifyHandler { + fn add_watch_recursively(&mut self, path: PathBuf) -> Result<(), Error> { + let is_dir = match metadata(path.as_ref() as &Path) { + Ok(m) => m.is_dir(), + Err(e) => return Err(Error::Io(e)), + }; + if is_dir { + match Walker::new(path.as_ref()) { + Ok(dir) => { + for entry in dir { + match entry { + Ok(entry) => { + let path = entry.path(); + try!(self.add_watch(path)); + }, + Err(e) => return Err(Error::Io(e)), + } + } + self.add_watch(path) + }, + Err(e) => Err(Error::Io(e)) + } + } else { + self.add_watch(path) + } } - fn add_watch>(&mut self, path: P) -> Result<(), Error> { - let mut watching = flags::IN_ATTRIB - | flags::IN_CREATE - | flags::IN_DELETE - | flags::IN_DELETE_SELF - | flags::IN_MODIFY - | flags::IN_MOVED_FROM - | flags::IN_MOVED_TO - | flags::IN_MOVE_SELF; - let path = path.as_ref().to_path_buf(); + fn add_watch(&mut self, path: PathBuf) -> Result<(), Error> { + let mut watching = flags::IN_ATTRIB + | flags::IN_CREATE + | flags::IN_DELETE + | flags::IN_DELETE_SELF + | flags::IN_MODIFY + | flags::IN_MOVED_FROM + | flags::IN_MOVED_TO + | flags::IN_MOVE_SELF; match self.watches.get(&path) { None => {}, Some(p) => { @@ -77,6 +135,24 @@ impl INotifyWatcher { } } } + + fn remove_watch(&mut self, path: PathBuf) -> Result<(), Error> { + match self.watches.remove(&path) { + None => Err(Error::WatchNotFound), + Some(p) => { + let w = &p.0; + match self.inotify.rm_watch(w.clone()) { + Err(e) => Err(Error::Io(e)), + Ok(_) => { + // Nothing depends on the value being gone + // from here now that inotify isn't watching. + (*self.paths).write().unwrap().remove(w); + Ok(()) + } + } + } + } + } } #[inline] @@ -116,73 +192,60 @@ fn handle_event(event: wrapper::Event, tx: &Sender, paths: &Arc) -> Result { - let mut it = match INotify::init() { - Ok(i) => INotifyWatcher { - inotify: i, - tx: tx, - watches: HashMap::new(), // TODO: use bimap? - paths: Arc::new(RwLock::new(HashMap::new())) - }, - Err(e) => return Err(Error::Io(e)) - }; + INotify::init() + .and_then(|inotify| EventLoop::new().map(|l| (inotify, l))) + .and_then(|(inotify, mut event_loop)| { + let inotify_fd = inotify.fd; + let evented_inotify = mio::unix::EventedFd(&inotify_fd); + + let handler = INotifyHandler { + inotify: inotify, + tx: tx, + watches: HashMap::new(), + paths: Arc::new(RwLock::new(HashMap::new())) + }; + + event_loop.register(&evented_inotify, + INOTIFY, + mio::EventSet::readable(), + mio::PollOpt::level()) + .map(|_| (event_loop, handler)) + }) + .map(|(mut event_loop, mut handler)| { + let channel = event_loop.channel(); + + ThreadBuilder::new() + .name("INotify Watcher".to_owned()) + .spawn(move|| event_loop.run(&mut handler)) + .unwrap(); - it.run(); - return Ok(it); + INotifyWatcher(channel) + }) + .map_err(Error::Io) } fn watch>(&mut self, path: P) -> Result<(), Error> { - let is_dir = match metadata(&path.as_ref()) { - Ok(m) => m.is_dir(), - Err(e) => return Err(Error::Io(e)), - }; - if is_dir { - match Walker::new(path.as_ref()) { - Ok(dir) => { - for entry in dir { - match entry { - Ok(entry) => { - let path = entry.path(); - try!(self.add_watch(&path)); - }, - Err(e) => return Err(Error::Io(e)), - } - } - self.add_watch(path.as_ref()) - }, - Err(e) => Err(Error::Io(e)) - } - } else { - self.add_watch(&path.as_ref()) - } + let (tx, rx) = mpsc::channel(); + let msg = EventLoopMsg::AddWatch(path.as_ref().to_owned(), tx); + + // we expect the event loop to live and reply => unwraps must not panic + self.0.send(msg).unwrap(); + rx.recv().unwrap() } fn unwatch>(&mut self, path: P) -> Result<(), Error> { - // FIXME: - // once Rust 1.1 is released, just use a &Path - // Relevant bug is https://github.com/rust-lang/rust/pull/25060 - match self.watches.remove(&path.as_ref().to_path_buf()) { - None => Err(Error::WatchNotFound), - Some(p) => { - let w = &p.0; - match self.inotify.rm_watch(w.clone()) { - Err(e) => Err(Error::Io(e)), - Ok(_) => { - // Nothing depends on the value being gone - // from here now that inotify isn't watching. - (*self.paths).write().unwrap().remove(w); - Ok(()) - } - } - } - } + let (tx, rx) = mpsc::channel(); + let msg = EventLoopMsg::RemoveWatch(path.as_ref().to_owned(), tx); + + // we expect the event loop to live and reply => unwraps must not panic + self.0.send(msg).unwrap(); + rx.recv().unwrap() } } impl Drop for INotifyWatcher { fn drop(&mut self) { - for path in self.watches.clone().keys() { - let _ = self.unwatch(path); - } - let _ = self.inotify.close(); + // we expect the event loop to live => unwrap must not panic + self.0.send(EventLoopMsg::Shutdown).unwrap(); } } diff --git a/src/lib.rs b/src/lib.rs index 7fd30605..75d229b3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ #[macro_use] extern crate log; #[macro_use] extern crate bitflags; +#[cfg(target_os="linux")] extern crate mio; #[cfg(target_os="macos")] extern crate fsevent_sys; #[cfg(target_os="windows")] extern crate winapi; extern crate libc;