From c8ab2f50fe5abea56114e6616241175c2921699b Mon Sep 17 00:00:00 2001 From: Alexander Matyushentsev Date: Fri, 25 Jan 2019 15:54:24 -0800 Subject: [PATCH] Issue #1128 - Use polling instead of fs notify to get annotation changes --- Gopkg.lock | 9 -------- workflow/executor/executor.go | 42 +++++++++++++++++++++++++---------- 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 08a10cc5dbfd..97d653bdbe02 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -152,14 +152,6 @@ revision = "afac545df32f2287a079e2dfb7ba2745a643747e" version = "v3.0.0" -[[projects]] - digest = "1:eb53021a8aa3f599d29c7102e65026242bdedce998a54837dc67f14b6a97c5fd" - name = "github.com/fsnotify/fsnotify" - packages = ["."] - pruneopts = "" - revision = "c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9" - version = "v1.4.7" - [[projects]] branch = "master" digest = "1:ac2bf6881c6a96d07773dee3b9b2b369bc209c988505bd6cb283a8d549cb8699" @@ -1168,7 +1160,6 @@ "github.com/argoproj/pkg/time", "github.com/colinmarc/hdfs", "github.com/evanphx/json-patch", - "github.com/fsnotify/fsnotify", "github.com/ghodss/yaml", "github.com/go-openapi/spec", "github.com/gorilla/websocket", diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index cc55964e265c..bba57373675c 100644 --- a/workflow/executor/executor.go +++ b/workflow/executor/executor.go @@ -31,7 +31,6 @@ import ( "github.com/argoproj/argo/workflow/artifacts/s3" "github.com/argoproj/argo/workflow/common" argofile "github.com/argoproj/pkg/file" - "github.com/fsnotify/fsnotify" log "github.com/sirupsen/logrus" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -794,19 +793,38 @@ func (we *WorkflowExecutor) waitMainContainerStart() (string, error) { } } +func watchFileChanges(ctx context.Context, pollInterval time.Duration, filePath string) <-chan struct{} { + res := make(chan struct{}) + go func() { + defer close(res) + + var modTime *time.Time + for { + select { + case <-ctx.Done(): + return + default: + } + + file, err := os.Stat(filePath) + if err != nil { + log.Fatal(err) + } + newModTime := file.ModTime() + if modTime != nil && !modTime.Equal(file.ModTime()) { + res <- struct{}{} + } + modTime = &newModTime + time.Sleep(pollInterval) + } + }() + return res +} + // monitorAnnotations starts a goroutine which monitors for any changes to the pod annotations. // Emits an event on the returned channel upon any updates func (we *WorkflowExecutor) monitorAnnotations(ctx context.Context) <-chan struct{} { log.Infof("Starting annotations monitor") - // Create a fsnotify watcher on the local annotations file to listen for updates from the Downward API - watcher, err := fsnotify.NewWatcher() - if err != nil { - log.Fatal(err) - } - err = watcher.Add(we.PodAnnotationsPath) - if err != nil { - log.Fatal(err) - } // Create a channel to listen for a SIGUSR2. Upon receiving of the signal, we force reload our annotations // directly from kubernetes API. The controller uses this to fast-track notification of annotations @@ -819,12 +837,12 @@ func (we *WorkflowExecutor) monitorAnnotations(ctx context.Context) <-chan struc // Create a channel which will notify a listener on new updates to the annotations annotationUpdateCh := make(chan struct{}) + annotationChanges := watchFileChanges(ctx, 10*time.Second, we.PodAnnotationsPath) go func() { for { select { case <-ctx.Done(): log.Infof("Annotations monitor stopped") - _ = watcher.Close() signal.Stop(sigs) close(sigs) close(annotationUpdateCh) @@ -833,7 +851,7 @@ func (we *WorkflowExecutor) monitorAnnotations(ctx context.Context) <-chan struc log.Infof("Received update signal. Reloading annotations from API") annotationUpdateCh <- struct{}{} we.setExecutionControl() - case <-watcher.Events: + case <-annotationChanges: log.Infof("%s updated", we.PodAnnotationsPath) err := we.LoadExecutionControl() if err != nil {