Skip to content
This repository has been archived by the owner on Jun 8, 2022. It is now read-only.

Event Processing Pipeline (WIP) #65

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1b0fe62
skip integration tests in -short mode
nathany Sep 11, 2013
95f4bc1
should fetch from fork
nathany Sep 11, 2013
f88b2a8
trigger processing pipeline
nathany Sep 11, 2013
eea7078
newPipeline selectively enables pipeline steps
nathany Sep 21, 2013
ac4b907
introduce Triggers type, a newPipeline option
nathany Sep 21, 2013
03b497f
add WatchPath, Options and a Hidden file filter
nathany Sep 21, 2013
158eb58
file name pattern pipeline step
nathany Sep 21, 2013
fe9c957
a simple throttling filter
nathany Sep 21, 2013
edb9d9d
Verbose option logs all new events
nathany Sep 21, 2013
150925e
handle relative file paths
nathany Sep 22, 2013
374279f
hidden file checking for Windows
nathany Sep 22, 2013
fc1722a
godoc
nathany Sep 22, 2013
dd592a9
use 64-bit vagrant boxes for the race detector (at least for linux)
nathany Sep 22, 2013
84f1118
update AUTHORS
nathany Sep 22, 2013
cd4a1f2
start a CHANGELOG #66
nathany Sep 22, 2013
bba2306
handy project file for sublime text users
nathany Sep 22, 2013
1766670
remove FD_SET and friends
nathany Sep 27, 2013
14aaf6a
Merge remote-tracking branch 'origin/master' into pipeline
nathany Oct 27, 2013
72cb051
documentation updates
nathany Oct 27, 2013
7524685
Event as an exported interface
nathany Oct 27, 2013
2ebc386
unfinished recursive watcher
nathany Oct 27, 2013
05aeb7f
a step closer to autoWatch
nathany Nov 14, 2013
0b9aa69
Merge branch 'master' into pipeline
nathany Nov 14, 2013
bcbf494
Merge branch 'master' into pipeline
nathany Nov 30, 2013
171ba26
copyright headers
nathany Jan 4, 2014
5d97943
spelling mistakes
nathany Jan 4, 2014
142233c
gofmt
nathany Jan 4, 2014
463d45e
TestFsnotifyRecursive
nathany Jan 4, 2014
bfc2020
autowatch pipeline mock/test
nathany Jan 4, 2014
f2614ea
IsDir() for BSD and pipeline tests passing
nathany Jan 4, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## unreleased

* `WatchPath()` replaces Watch() and WatchFlags()
* `Triggers` constants replace FSN_CREATE, etc.
* `Event` interface replaces FileEvent
* `Event.Path()` replaces FileEvent.Name
* [Feature] Option to include Hidden folders/files
* [Feature] Option to filter file names with shell patterns
* [Feature] Option to throttle events to avoid duplicates
* [Feature] Option to watch folders recursively

## v0.8.12 / 2013-11-13

* [API] Remove FD_SET and friends from Linux adapter
Expand Down
16 changes: 8 additions & 8 deletions Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ INSTALL = {
}

GO_ARCHIVES = {
"precise32" => "go1.1.2.linux-386.tar.gz",
"freebsd32" => "go1.1.2.freebsd-386.tar.gz"
"precise64" => "go1.1.2.linux-amd64.tar.gz",
"freebsd64" => "go1.1.2.freebsd-amd64.tar.gz"
}

# shell script to bootstrap Go
Expand Down Expand Up @@ -58,10 +58,10 @@ end
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|

config.vm.define "linux" do |linux|
linux.vm.box = "precise32"
linux.vm.box_url = "http://files.vagrantup.com/precise32.box"
linux.vm.box = "precise64"
linux.vm.box_url = "http://files.vagrantup.com/precise64.box"
linux.vm.synced_folder src_path, "/home/vagrant/go"
linux.vm.provision :shell, :inline => bootstrap("linux", "precise32")
linux.vm.provision :shell, :inline => bootstrap("linux", "precise64")
end

# Pete Cheslock's BSD box
Expand All @@ -71,12 +71,12 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
# * and requires a private host-only network
# * and will prompt for the administrator password of the host
config.vm.define "bsd" do |bsd|
bsd.vm.box = "freebsd32"
bsd.vm.box_url = "http://dyn-vm.s3.amazonaws.com/vagrant/dyn-virtualbox-freebsd-9.1-i386.box"
bsd.vm.box = "freebsd64"
bsd.vm.box_url = "http://dyn-vm.s3.amazonaws.com/vagrant/dyn-virtualbox-freebsd-9.1.box"
bsd.vm.synced_folder ".", "/vagrant", :disabled => true
bsd.vm.synced_folder src_path, "/home/vagrant/go", :nfs => true
bsd.vm.network :private_network, :ip => '10.1.10.5'
bsd.vm.provision :shell, :inline => bootstrap("bsd", "freebsd32")
bsd.vm.provision :shell, :inline => bootstrap("bsd", "freebsd64")
end

end
2 changes: 1 addition & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func ExampleNewWatcher() {
}
}()

err = watcher.Watch("/tmp/foo")
err = watcher.WatchPath("/tmp/foo", &fsnotify.Options{Recursive: true})
if err != nil {
log.Fatal(err)
}
Expand Down
94 changes: 54 additions & 40 deletions fsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,35 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package fsnotify implements filesystem notification.
// Package fsnotify implements file system notifications.
package fsnotify

import "fmt"

// Options for watching paths
type Options struct {
Verbose bool // log events, helpful for debugging
Hidden bool // include hidden files (.DS_Store) and directories (.git, .hg)
Triggers Triggers // Create | Modify | Delete | Rename events (default: all)
Pattern string // comma separated list of shell file name patterns (see filepath.Match)
Throttle bool // events on a file are discarded for the next second
Recursive bool // watch all subdirectories of the specified path
}

// Trigger types to watch for
type Triggers uint32

const (
Create Triggers = 1 << iota
Modify
Delete
Rename

allTriggers Triggers = Modify | Delete | Rename | Create
)

const (
// DEPRECATION(-): please use Triggers
FSN_CREATE = 1
FSN_MODIFY = 2
FSN_DELETE = 4
Expand All @@ -16,68 +39,59 @@ const (
FSN_ALL = FSN_MODIFY | FSN_DELETE | FSN_RENAME | FSN_CREATE
)

// Purge events from interal chan to external chan if passes filter
func (w *Watcher) purgeEvents() {
// Forward events from internal channel to external channel if passes filter
func (w *Watcher) forwardEvents() {
for ev := range w.internalEvent {
sendEvent := false
w.fsnmut.Lock()
fsnFlags := w.fsnFlags[ev.Name]
w.fsnmut.Unlock()
w.pipelinesmut.Lock()
pipeline := w.pipelines[ev.Path()]
w.pipelinesmut.Unlock()

if (fsnFlags&FSN_CREATE == FSN_CREATE) && ev.IsCreate() {
sendEvent = true
}

if (fsnFlags&FSN_MODIFY == FSN_MODIFY) && ev.IsModify() {
sendEvent = true
}

if (fsnFlags&FSN_DELETE == FSN_DELETE) && ev.IsDelete() {
sendEvent = true
}

if (fsnFlags&FSN_RENAME == FSN_RENAME) && ev.IsRename() {
sendEvent = true
}

if sendEvent {
forward := pipeline.processEvent(ev)
if forward {
w.Event <- ev
}

// If there's no file, then no more events for user
// BSD must keep watch for internal use (watches DELETEs to keep track
// what files exist for create events)
if ev.IsDelete() {
w.fsnmut.Lock()
delete(w.fsnFlags, ev.Name)
w.fsnmut.Unlock()
w.pipelinesmut.Lock()
delete(w.pipelines, ev.Path())
w.pipelinesmut.Unlock()
}
}

close(w.Event)
}

// WatchPath watches a given file path with a particular set of options
func (w *Watcher) WatchPath(path string, options *Options) (err error) {
pipeline := newPipeline(options, w)

// TODO: check adapter capabilities
if options.Recursive {
return w.watchRecursively(path, pipeline)
}
return w.watch(path, pipeline)
}

// DEPRECATION(-): please use WatchPath()
// Watch a given file path
func (w *Watcher) Watch(path string) error {
w.fsnmut.Lock()
w.fsnFlags[path] = FSN_ALL
w.fsnmut.Unlock()
return w.watch(path)
return w.WatchPath(path, &Options{Triggers: allTriggers, Hidden: true})
}

// DEPRECATION(-): please use WatchPath()
// Watch a given file path for a particular set of notifications (FSN_MODIFY etc.)
func (w *Watcher) WatchFlags(path string, flags uint32) error {
w.fsnmut.Lock()
w.fsnFlags[path] = flags
w.fsnmut.Unlock()
return w.watch(path)
func (w *Watcher) WatchFlags(path string, flags Triggers) error {
return w.WatchPath(path, &Options{Triggers: flags, Hidden: true})
}

// Remove a watch on a file
func (w *Watcher) RemoveWatch(path string) error {
w.fsnmut.Lock()
delete(w.fsnFlags, path)
w.fsnmut.Unlock()
w.pipelinesmut.Lock()
delete(w.pipelines, path)
w.pipelinesmut.Unlock()
return w.removeWatch(path)
}

Expand Down Expand Up @@ -106,5 +120,5 @@ func (e *FileEvent) String() string {
events = events[1:]
}

return fmt.Sprintf("%q: %s", e.Name, events)
return fmt.Sprintf("%q: %s", e.Path(), events)
}
9 changes: 9 additions & 0 deletions fsnotify.sublime-project
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"folders":
[
{
"path": ".",
"folder_exclude_patterns": [".vagrant"]
}
]
}
70 changes: 42 additions & 28 deletions fsnotify_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ const (
)

type FileEvent struct {
mask uint32 // Mask of events
Name string // File name (optional)
create bool // set by fsnotify package if found new file
mask uint32 // Mask of events
Name string // DEPRECATION(-): please use Path() instead
create bool // set by fsnotify package if found new file
fileInfo os.FileInfo
}

// IsCreate reports whether the FileEvent was triggerd by a creation
Expand All @@ -53,22 +54,28 @@ func (e *FileEvent) IsModify() bool {
// IsRename reports whether the FileEvent was triggerd by a change name
func (e *FileEvent) IsRename() bool { return (e.mask & sys_NOTE_RENAME) == sys_NOTE_RENAME }

// IsDir reports whether the FileEvent was triggerd by a directory
func (e *FileEvent) IsDir() bool { return e.fileInfo != nil && e.fileInfo.IsDir() }

// Path is the relative path and file name of the event
func (e *FileEvent) Path() string { return e.Name }

type Watcher struct {
mu sync.Mutex // Mutex for the Watcher itself.
kq int // File descriptor (as returned by the kqueue() syscall)
watches map[string]int // Map of watched file diescriptors (key: path)
watches map[string]int // Map of watched file descriptors (key: path)
wmut sync.Mutex // Protects access to watches.
fsnFlags map[string]uint32 // Map of watched files to flags used for filter
fsnmut sync.Mutex // Protects access to fsnFlags.
pipelines map[string]pipeline // Map of watched files to pipelines used for filtering
pipelinesmut sync.Mutex // Protects access to pipelines.
enFlags map[string]uint32 // Map of watched files to evfilt note flags used in kqueue
enmut sync.Mutex // Protects access to enFlags.
paths map[int]string // Map of watched paths (key: watch descriptor)
finfo map[int]os.FileInfo // Map of file information (isDir, isReg; key: watch descriptor)
pmut sync.Mutex // Protects access to paths and finfo.
fileExists map[string]bool // Keep track of if we know this file exists (to stop duplicate create events)
femut sync.Mutex // Proctects access to fileExists.
femut sync.Mutex // Protects access to fileExists.
externalWatches map[string]bool // Map of watches added by user of the library.
ewmut sync.Mutex // Protects access to internalWatches.
ewmut sync.Mutex // Protects access to externalWatches.
Error chan error // Errors are sent on this channel
internalEvent chan *FileEvent // Events are queued on this channel
Event chan *FileEvent // Events are returned on this channel
Expand All @@ -87,7 +94,7 @@ func NewWatcher() (*Watcher, error) {
w := &Watcher{
kq: fd,
watches: make(map[string]int),
fsnFlags: make(map[string]uint32),
pipelines: make(map[string]pipeline),
enFlags: make(map[string]uint32),
paths: make(map[int]string),
finfo: make(map[int]os.FileInfo),
Expand All @@ -100,7 +107,7 @@ func NewWatcher() (*Watcher, error) {
}

go w.readEvents()
go w.purgeEvents()
go w.forwardEvents()
return w, nil
}

Expand Down Expand Up @@ -226,7 +233,11 @@ func (w *Watcher) addWatch(path string, flags uint32) error {
}

// Watch adds path to the watched file set, watching all events.
func (w *Watcher) watch(path string) error {
func (w *Watcher) watch(path string, pipeline pipeline) error {
w.pipelinesmut.Lock()
w.pipelines[path] = pipeline
w.pipelinesmut.Unlock()

w.ewmut.Lock()
w.externalWatches[path] = true
w.ewmut.Unlock()
Expand Down Expand Up @@ -280,7 +291,7 @@ func (w *Watcher) removeWatch(path string) error {
}
w.pmut.Unlock()
for idx := 0; idx < len(pathsToRemove); idx++ {
// Since these are internal, not much sense in propogating error
// Since these are internal, not much sense in propagating error
// to the user, as that will just confuse them with an error about
// a path they did not explicitly watch themselves.
w.removeWatch(pathsToRemove[idx])
Expand Down Expand Up @@ -340,15 +351,17 @@ func (w *Watcher) readEvents() {
}
}

// Flush the events we recieved to the events channel
// Flush the events we received to the events channel
for len(events) > 0 {
fileEvent := new(FileEvent)
watchEvent := &events[0]
fileEvent.mask = uint32(watchEvent.Fflags)
w.pmut.Lock()
fileEvent.Name = w.paths[int(watchEvent.Ident)]
fileInfo := w.finfo[int(watchEvent.Ident)]
fileEvent.fileInfo = w.finfo[int(watchEvent.Ident)]
w.pmut.Unlock()

fileInfo := fileEvent.fileInfo
if fileInfo != nil && fileInfo.IsDir() && !fileEvent.IsDelete() {
// Double check to make sure the directory exist. This can happen when
// we do a rm -fr on a recursively watched folders and we receive a
Expand Down Expand Up @@ -414,14 +427,14 @@ func (w *Watcher) watchDirectoryFiles(dirPath string) error {
for _, fileInfo := range files {
filePath := filepath.Join(dirPath, fileInfo.Name())

// Inherit fsnFlags from parent directory
w.fsnmut.Lock()
if flags, found := w.fsnFlags[dirPath]; found {
w.fsnFlags[filePath] = flags
// Inherit pipeline from parent directory
w.pipelinesmut.Lock()
if pipe, found := w.pipelines[dirPath]; found {
w.pipelines[filePath] = pipe
} else {
w.fsnFlags[filePath] = FSN_ALL
w.pipelines[filePath] = pipeline{}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not entirely sure what to do with these fallbacks in BSD and Linux, especially as more steps are added to the pipeline

}
w.fsnmut.Unlock()
w.pipelinesmut.Unlock()

if fileInfo.IsDir() == false {
// Watch file to mimic linux fsnotify
Expand All @@ -430,7 +443,7 @@ func (w *Watcher) watchDirectoryFiles(dirPath string) error {
return e
}
} else {
// If the user is currently waching directory
// If the user is currently watching directory
// we want to preserve the flags used
w.enmut.Lock()
currFlags, found := w.enFlags[filePath]
Expand All @@ -456,7 +469,7 @@ func (w *Watcher) watchDirectoryFiles(dirPath string) error {

// sendDirectoryEvents searches the directory for newly created files
// and sends them over the event channel. This functionality is to have
// the BSD version of fsnotify mach linux fsnotify which provides a
// the BSD version of fsnotify match linux fsnotify which provides a
// create event for files created in a watched directory.
func (w *Watcher) sendDirectoryChangeEvents(dirPath string) {
// Get all files
Expand All @@ -472,19 +485,20 @@ func (w *Watcher) sendDirectoryChangeEvents(dirPath string) {
_, doesExist := w.fileExists[filePath]
w.femut.Unlock()
if !doesExist {
// Inherit fsnFlags from parent directory
w.fsnmut.Lock()
if flags, found := w.fsnFlags[dirPath]; found {
w.fsnFlags[filePath] = flags
// Inherit pipeline from parent directory
w.pipelinesmut.Lock()
if pipe, found := w.pipelines[dirPath]; found {
w.pipelines[filePath] = pipe
} else {
w.fsnFlags[filePath] = FSN_ALL
w.pipelines[filePath] = pipeline{}
}
w.fsnmut.Unlock()
w.pipelinesmut.Unlock()

// Send create event
fileEvent := new(FileEvent)
fileEvent.Name = filePath
fileEvent.create = true
fileEvent.fileInfo = fileInfo
w.internalEvent <- fileEvent
}
w.femut.Lock()
Expand Down
Loading