From 1819bfb69df331a9abf04c9671af508c3c3ee0f1 Mon Sep 17 00:00:00 2001 From: Mohamed MHAMDI Date: Wed, 12 Apr 2023 16:48:48 +0200 Subject: [PATCH 1/2] fix(config): fix file source watcher stop behavior when Stop is called --- config/source/file/watcher.go | 32 ++++---- config/source/file/watcher_linux.go | 31 ++++--- config/source/file/watcher_test.go | 120 ++++++++++++++++++++++++++++ 3 files changed, 151 insertions(+), 32 deletions(-) create mode 100644 config/source/file/watcher_test.go diff --git a/config/source/file/watcher.go b/config/source/file/watcher.go index 4ae1673600..3b9dc82163 100644 --- a/config/source/file/watcher.go +++ b/config/source/file/watcher.go @@ -13,8 +13,7 @@ import ( type watcher struct { f *file - fw *fsnotify.Watcher - exit chan bool + fw *fsnotify.Watcher } func newWatcher(f *file) (source.Watcher, error) { @@ -26,23 +25,20 @@ func newWatcher(f *file) (source.Watcher, error) { fw.Add(f.path) return &watcher{ - f: f, - fw: fw, - exit: make(chan bool), + f: f, + fw: fw, }, nil } func (w *watcher) Next() (*source.ChangeSet, error) { - // is it closed? - select { - case <-w.exit: - return nil, source.ErrWatcherStopped - default: - } - // try get the event select { - case event, _ := <-w.fw.Events: + case event, ok := <-w.fw.Events: + // check if channel was closed (i.e. Watcher.Close() was called). + if !ok { + return nil, source.ErrWatcherStopped + } + if event.Op == fsnotify.Rename { // check existence of file, and add watch again _, err := os.Stat(event.Name) @@ -55,11 +51,15 @@ func (w *watcher) Next() (*source.ChangeSet, error) { if err != nil { return nil, err } + return c, nil - case err := <-w.fw.Errors: + case err, ok := <-w.fw.Errors: + // check if channel was closed (i.e. Watcher.Close() was called). + if !ok { + return nil, source.ErrWatcherStopped + } + return nil, err - case <-w.exit: - return nil, source.ErrWatcherStopped } } diff --git a/config/source/file/watcher_linux.go b/config/source/file/watcher_linux.go index 36b0ed391c..c08c5b8e43 100644 --- a/config/source/file/watcher_linux.go +++ b/config/source/file/watcher_linux.go @@ -13,8 +13,7 @@ import ( type watcher struct { f *file - fw *fsnotify.Watcher - exit chan bool + fw *fsnotify.Watcher } func newWatcher(f *file) (source.Watcher, error) { @@ -26,23 +25,20 @@ func newWatcher(f *file) (source.Watcher, error) { fw.Add(f.path) return &watcher{ - f: f, - fw: fw, - exit: make(chan bool), + f: f, + fw: fw, }, nil } func (w *watcher) Next() (*source.ChangeSet, error) { - // is it closed? - select { - case <-w.exit: - return nil, source.ErrWatcherStopped - default: - } - // try get the event select { - case event := <-w.fw.Events: + case event, ok := <-w.fw.Events: + // check if channel was closed (i.e. Watcher.Close() was called). + if !ok { + return nil, source.ErrWatcherStopped + } + if event.Op == fsnotify.Rename { // check existence of file, and add watch again _, err := os.Stat(event.Name) @@ -60,10 +56,13 @@ func (w *watcher) Next() (*source.ChangeSet, error) { w.fw.Add(w.f.path) return c, nil - case err := <-w.fw.Errors: + case err, ok := <-w.fw.Errors: + // check if channel was closed (i.e. Watcher.Close() was called). + if !ok { + return nil, source.ErrWatcherStopped + } + return nil, err - case <-w.exit: - return nil, source.ErrWatcherStopped } } diff --git a/config/source/file/watcher_test.go b/config/source/file/watcher_test.go new file mode 100644 index 0000000000..177b0f3c29 --- /dev/null +++ b/config/source/file/watcher_test.go @@ -0,0 +1,120 @@ +package file_test + +import ( + "bytes" + "errors" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "go-micro.dev/v4/config/source" + "go-micro.dev/v4/config/source/file" +) + +// createTestFile a local helper to creates a temporary file with the given data +func createTestFile(data []byte) (*os.File, func(), string, error) { + path := filepath.Join(os.TempDir(), fmt.Sprintf("file.%d", time.Now().UnixNano())) + fh, err := os.Create(path) + if err != nil { + return nil, func() {}, "", err + } + + _, err = fh.Write(data) + if err != nil { + return nil, func() {}, "", err + } + + return fh, func() { + fh.Close() + os.Remove(path) + }, path, err +} + +func TestWatcher(t *testing.T) { + data := []byte(`{"foo": "bar"}`) + fh, cleanup, path, err := createTestFile(data) + if err != nil { + t.Error(err) + } + defer cleanup() + + f := file.NewSource(file.WithPath(path)) + if err != nil { + t.Error(err) + } + + // create a watcher + w, err := f.Watch() + if err != nil { + t.Error(err) + } + + newdata := []byte(`{"foo": "baz"}`) + + go func() { + sc, err := w.Next() + if err != nil { + t.Error(err) + return + } + + if !bytes.Equal(sc.Data, newdata) { + t.Error("expected data to be different") + } + }() + + // rewrite to the file to trigger a change + _, err = fh.WriteAt(newdata, 0) + if err != nil { + t.Error(err) + } + + // wait for the underlying watcher to detect changes + time.Sleep(time.Second) +} + +func TestWatcherStop(t *testing.T) { + data := []byte(`{"foo": "bar"}`) + _, cleanup, path, err := createTestFile(data) + if err != nil { + t.Error(err) + } + defer cleanup() + + src := file.NewSource(file.WithPath(path)) + if err != nil { + t.Error(err) + } + + // create a watcher + w, err := src.Watch() + if err != nil { + t.Error(err) + } + + defer func() { + var err error + c := make(chan struct{}) + defer close(c) + + go func() { + _, err = w.Next() + c <- struct{}{} + }() + + select { + case <-time.After(2 * time.Second): + err = errors.New("timeout waiting for Watcher.Next() to return") + case <-c: + } + + if !errors.Is(err, source.ErrWatcherStopped) { + t.Error(err) + } + }() + + // stop the watcher + w.Stop() +} From d7eb0d667a7b3c42fb3cebf785fd16ebf0de28de Mon Sep 17 00:00:00 2001 From: Mohamed MHAMDI Date: Wed, 12 Apr 2023 17:23:19 +0200 Subject: [PATCH 2/2] fix(config): upgrade fsnotiy to v1.6.0 --- config/source/file/watcher.go | 2 +- config/source/file/watcher_linux.go | 2 +- go.mod | 2 +- go.sum | 3 +++ 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/config/source/file/watcher.go b/config/source/file/watcher.go index 3b9dc82163..5254a7a3e2 100644 --- a/config/source/file/watcher.go +++ b/config/source/file/watcher.go @@ -39,7 +39,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) { return nil, source.ErrWatcherStopped } - if event.Op == fsnotify.Rename { + if event.Has(fsnotify.Rename) { // check existence of file, and add watch again _, err := os.Stat(event.Name) if err == nil || os.IsExist(err) { diff --git a/config/source/file/watcher_linux.go b/config/source/file/watcher_linux.go index c08c5b8e43..0bebeef085 100644 --- a/config/source/file/watcher_linux.go +++ b/config/source/file/watcher_linux.go @@ -39,7 +39,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) { return nil, source.ErrWatcherStopped } - if event.Op == fsnotify.Rename { + if event.Has(fsnotify.Rename) { // check existence of file, and add watch again _, err := os.Stat(event.Name) if err == nil || os.IsExist(err) { diff --git a/go.mod b/go.mod index 3e8c71c14f..98930281c2 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/bitly/go-simplejson v0.5.0 github.com/ef-ds/deque v1.0.4 github.com/evanphx/json-patch/v5 v5.5.0 - github.com/fsnotify/fsnotify v1.4.9 + github.com/fsnotify/fsnotify v1.6.0 github.com/fsouza/go-dockerclient v1.7.3 github.com/go-acme/lego/v4 v4.4.0 github.com/go-git/go-git/v5 v5.4.2 diff --git a/go.sum b/go.sum index d9dd094360..1c1a925db2 100644 --- a/go.sum +++ b/go.sum @@ -339,6 +339,8 @@ github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/fsouza/go-dockerclient v1.7.3 h1:i6iMcktl688vsKUEExA6gU1UjPgIvmGtJeQ0mbuFqZo= github.com/fsouza/go-dockerclient v1.7.3/go.mod h1:8xfZB8o9SptLNJ13VoV5pMiRbZGWkU/Omu5VOu/KC9Y= github.com/fullsailor/pkcs7 v0.0.0-20190404230743-d7302db945fa/go.mod h1:KnogPXtdwXqoenmZCw6S+25EAm2MkxbG0deNDu4cbSA= @@ -1147,6 +1149,7 @@ golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201113234701-d7a72108b828/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=