Skip to content

Commit

Permalink
Check for errors during FSEventStreamStart (#57)
Browse files Browse the repository at this point in the history
* Return an error if FSEventStreamStart fails

* Validate the 4096 path limit and error detection

* Add notes for #46 and #48 limitations of macOS

Co-authored-by: Martin Tournoij <martin@arp242.net>
  • Loading branch information
pbnjay and arp242 authored Oct 14, 2022
1 parent 0bd000f commit 3899270
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 9 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@

**Warning:** This API should be considered unstable.

## Caveats

Known caveats of the macOS FSEvents API which this package uses under the hood:

- FSEvents returns events for the named path only, so unless you want to follow updates to a symlink itself (unlikely), you should use `filepath.EvalSymlinks` to get the target path to watch.
- There is an internal macOS limitation of 4096 watched paths. Watching more paths will result in an error calling `Start()`. Note that FSEvents is intended to be a recursive watcher by design, it is actually more efficient to watch the containing path than each file in a large directory.

## Contributing

Request features and report bugs using the [GitHub Issue Tracker](https://github.com/fsnotify/fsevents/issues).
Expand Down
14 changes: 10 additions & 4 deletions fsevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (r *eventStreamRegistry) Delete(i uintptr) {

// Start listening to an event stream. This creates es.Events if it's not already
// a valid channel.
func (es *EventStream) Start() {
func (es *EventStream) Start() error {
if es.Events == nil {
es.Events = make(chan []Event)
}
Expand All @@ -146,7 +146,13 @@ func (es *EventStream) Start() {
cbInfo := registry.Add(es)
es.registryID = cbInfo
es.uuid = GetDeviceUUID(es.Device)
es.start(es.Paths, cbInfo)
err := es.start(es.Paths, cbInfo)
if err != nil {
// Remove eventstream from the registry
registry.Delete(es.registryID)
es.registryID = 0
}
return err
}

// Flush flushes events that have occurred but haven't been delivered.
Expand All @@ -170,8 +176,8 @@ func (es *EventStream) Stop() {

// Restart restarts the event listener. This
// can be used to change the current watch flags.
func (es *EventStream) Restart() {
func (es *EventStream) Restart() error {
es.Stop()
es.Resume = true
es.Start()
return es.Start()
}
93 changes: 92 additions & 1 deletion fsevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package fsevents

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -34,7 +35,10 @@ func TestBasicExample(t *testing.T) {
Flags: FileEvents,
}

es.Start()
err = es.Start()
if err != nil {
t.Fatal(err)
}

wait := make(chan Event)
go func() {
Expand All @@ -58,3 +62,90 @@ func TestBasicExample(t *testing.T) {
t.Fatal("timed out waiting for event")
}
}

func TestIssue48(t *testing.T) {
// FSEvents fails to start when watching >4096 paths
// This test validates that limit and checks that the error is propagated

path, err := ioutil.TempDir("", "fsmanyfiles")
if err != nil {
t.Fatal(err)
}
path, err = filepath.EvalSymlinks(path)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(path)

// TODO: using this value fails to start
// dev, err := DeviceForPath(path)
// if err != nil {
// t.Fatal(err)
// }

var filenames []string
for i := 0; i < 4096; i++ {
newFilename := filepath.Join(path, fmt.Sprint("test", i))
err = ioutil.WriteFile(newFilename, []byte("test"), 0700)
if err != nil {
t.Fatal(err)
}
filenames = append(filenames, newFilename)
}

es := &EventStream{
Paths: filenames,
Latency: 500 * time.Millisecond,
Device: 0, //dev,
Flags: FileEvents,
}

err = es.Start()
if err != nil {
t.Fatal(err)
}

wait := make(chan Event)
go func() {
for msg := range es.Events {
for _, event := range msg {
t.Logf("Event: %#v", event)
wait <- event
es.Stop()
return
}
}
}()

// write some new contents to test42 in the watchlist
err = ioutil.WriteFile(filenames[42], []byte("special"), 0700)
if err != nil {
t.Fatal(err)
}

// should be reported as expected
<-wait

/////
// create one more file that puts it over the edge
newFilename := filepath.Join(path, fmt.Sprint("test", 4096))
err = ioutil.WriteFile(newFilename, []byte("test"), 0700)
if err != nil {
t.Fatal(err)
}
filenames = append(filenames, newFilename)

// create an all-new instances to avoid problems
es = &EventStream{
Paths: filenames,
Latency: 500 * time.Millisecond,
Device: 0, //dev,
Flags: FileEvents,
}

err = es.Start()
if err == nil {
es.Stop()
t.Fatal("eventstream error was not detected on >4096 files in watchlist")
}
}
17 changes: 13 additions & 4 deletions wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func setupStream(paths []string, flags CreateFlags, callbackInfo uintptr, eventI
return fsEventStreamRef(ref)
}

func (es *EventStream) start(paths []string, callbackInfo uintptr) {
func (es *EventStream) start(paths []string, callbackInfo uintptr) error {

since := eventIDSinceNow
if es.Resume {
Expand All @@ -413,14 +413,23 @@ func (es *EventStream) start(paths []string, callbackInfo uintptr) {

es.stream = setupStream(paths, es.Flags, callbackInfo, since, es.Latency, es.Device)

started := make(chan struct{})
started := make(chan error)

go func() {
runtime.LockOSThread()
es.rlref = cfRunLoopRef(C.CFRunLoopGetCurrent())
C.CFRetain(C.CFTypeRef(es.rlref))
C.FSEventStreamScheduleWithRunLoop(es.stream, C.CFRunLoopRef(es.rlref), C.kCFRunLoopDefaultMode)
C.FSEventStreamStart(es.stream)
if C.FSEventStreamStart(es.stream) == 0 {
// cleanup stream and runloop
C.FSEventStreamInvalidate(es.stream)
C.FSEventStreamRelease(es.stream)
C.CFRelease(C.CFTypeRef(es.rlref))
es.stream = nil
started <- fmt.Errorf("failed to start eventstream")
close(started)
return
}
close(started)
C.CFRunLoopRun()
}()
Expand All @@ -432,7 +441,7 @@ func (es *EventStream) start(paths []string, callbackInfo uintptr) {
es.hasFinalizer = true
}

<-started
return <-started
}

func finalizer(es *EventStream) {
Expand Down

0 comments on commit 3899270

Please sign in to comment.