diff --git a/pkg/cmd/server/origin/run_components.go b/pkg/cmd/server/origin/run_components.go index 4af15dafb213..116e1dd91eea 100644 --- a/pkg/cmd/server/origin/run_components.go +++ b/pkg/cmd/server/origin/run_components.go @@ -20,6 +20,7 @@ import ( buildclient "github.com/openshift/origin/pkg/build/client" buildcontrollerfactory "github.com/openshift/origin/pkg/build/controller/factory" buildstrategy "github.com/openshift/origin/pkg/build/controller/strategy" + "github.com/openshift/origin/pkg/client" cmdutil "github.com/openshift/origin/pkg/cmd/util" "github.com/openshift/origin/pkg/cmd/util/clientcmd" configchangecontroller "github.com/openshift/origin/pkg/deploy/controller/configchange" @@ -345,10 +346,7 @@ func (c *MasterConfig) RunSDNController() { // RunImageImportController starts the image import trigger controller process. func (c *MasterConfig) RunImageImportController() { osclient := c.ImageImportControllerClient() - factory := imagecontroller.ImportControllerFactory{ - Client: osclient, - } - controller := factory.Create() + controller := imagecontroller.NewImportController(client.ImageStreamsNamespacer(osclient), client.ImageStreamMappingsNamespacer(osclient), 10, 2*time.Minute) controller.Run() } diff --git a/pkg/image/controller/controller.go b/pkg/image/controller/controller.go index 7f6f14379e14..3ed64058550c 100644 --- a/pkg/image/controller/controller.go +++ b/pkg/image/controller/controller.go @@ -2,15 +2,24 @@ package controller import ( "fmt" + "sync" "time" "github.com/golang/glog" kapi "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/errors" + kapierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/cache" + kclient "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" kerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/watch" "github.com/openshift/origin/pkg/client" "github.com/openshift/origin/pkg/dockerregistry" @@ -22,6 +31,91 @@ type ImportController struct { mappings client.ImageStreamMappingsNamespacer // injected for testing client dockerregistry.Client + + stopChan chan struct{} + + imageStreamController *framework.Controller + + work chan *api.ImageStream + workingSet sets.String + workingSetLock sync.Mutex + + // this should not be larger the capacity of the work channel + numParallelImports int +} + +func NewImportController(isNamespacer client.ImageStreamsNamespacer, ismNamespacer client.ImageStreamMappingsNamespacer, parallelImports int, resyncInterval time.Duration) *ImportController { + c := &ImportController{ + streams: isNamespacer, + mappings: ismNamespacer, + + numParallelImports: parallelImports, + work: make(chan *api.ImageStream, 20*parallelImports), + workingSet: sets.String{}, + } + + _, c.imageStreamController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return c.streams.ImageStreams(kapi.NamespaceAll).List(labels.Everything(), fields.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return c.streams.ImageStreams(kapi.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion) + }, + }, + &api.ImageStream{}, + resyncInterval, + framework.ResourceEventHandlerFuncs{ + AddFunc: c.imageStreamAdded, + UpdateFunc: c.imageStreamUpdated, + }, + ) + + return c +} + +// Runs controller loops and returns immediately +func (c *ImportController) Run() { + if c.stopChan == nil { + c.stopChan = make(chan struct{}) + go c.imageStreamController.Run(c.stopChan) + + for i := 0; i < c.numParallelImports; i++ { + go util.Until(c.handleImport, time.Second, c.stopChan) + } + } +} + +// Stop gracefully shuts down this controller +func (c *ImportController) Stop() { + if c.stopChan != nil { + close(c.stopChan) + c.stopChan = nil + } +} + +func (c *ImportController) imageStreamAdded(obj interface{}) { + imageStream := obj.(*api.ImageStream) + if needsImport(imageStream) { + glog.V(5).Infof("trying to add %s to the worklist", workingSetKey(imageStream)) + c.work <- imageStream + glog.V(3).Infof("added %s to the worklist", workingSetKey(imageStream)) + + } else { + glog.V(5).Infof("not adding %s to the worklist", workingSetKey(imageStream)) + } +} + +func (c *ImportController) imageStreamUpdated(oldObj interface{}, newObj interface{}) { + newImageStream := newObj.(*api.ImageStream) + if needsImport(newImageStream) { + glog.V(5).Infof("trying to add %s to the worklist", workingSetKey(newImageStream)) + c.work <- newImageStream + glog.V(3).Infof("added %s to the worklist", workingSetKey(newImageStream)) + + } else { + glog.V(5).Infof("not adding %s to the worklist", workingSetKey(newImageStream)) + } } // needsImport returns true if the provided image stream should have its tags imported. @@ -29,8 +123,81 @@ func needsImport(stream *api.ImageStream) bool { return stream.Annotations == nil || len(stream.Annotations[api.DockerImageRepositoryCheckAnnotation]) == 0 } -// retryCount is the number of times to retry on a conflict when updating an image stream -const retryCount = 2 +func (c *ImportController) handleImport() { + for { + select { + case <-c.stopChan: + return + + case staleImageStream := <-c.work: + glog.V(1).Infof("popped %s from the worklist", workingSetKey(staleImageStream)) + + c.importImageStream(staleImageStream) + } + } +} + +func (c *ImportController) importImageStream(staleImageStream *api.ImageStream) { + // if we're already in the workingset, that means that some thread is already trying to do an import for this. + // This does NOT mean that we shouldn't attempt to do this work, only that we shouldn't attempt to do it now. + if !c.addToWorkingSet(staleImageStream) { + // If there isn't any other work in the queue, wait for a while so that we don't hot loop. + // Then requeue to the end of the channel. That allows other work to continue without delay + if len(c.work) == 0 { + time.Sleep(100 * time.Millisecond) + } + glog.V(5).Infof("requeuing %s to the worklist", workingSetKey(staleImageStream)) + c.work <- staleImageStream + + return + } + defer c.removeFromWorkingSet(staleImageStream) + + err := kclient.RetryOnConflict(kclient.DefaultBackoff, func() error { + liveImageStream, err := c.streams.ImageStreams(staleImageStream.Namespace).Get(staleImageStream.Name) + // no work to do here + if kapierrors.IsNotFound(err) { + return nil + } + if err != nil { + return err + } + if !needsImport(liveImageStream) { + return nil + } + + // if we're notified, do work and then start waiting again. + return c.Next(liveImageStream) + }) + + if err != nil { + util.HandleError(err) + } +} + +func workingSetKey(imageStream *api.ImageStream) string { + return imageStream.Namespace + "/" + imageStream.Name +} + +// addToWorkingSet returns true if the image stream was added, false if it was +// already present +func (c *ImportController) addToWorkingSet(imageStream *api.ImageStream) bool { + c.workingSetLock.Lock() + defer c.workingSetLock.Unlock() + + if c.workingSet.Has(workingSetKey(imageStream)) { + return false + } + + c.workingSet.Insert(workingSetKey(imageStream)) + return true +} + +func (c *ImportController) removeFromWorkingSet(imageStream *api.ImageStream) { + c.workingSetLock.Lock() + defer c.workingSetLock.Unlock() + c.workingSet.Delete(workingSetKey(imageStream)) +} // Next processes the given image stream, looking for streams that have DockerImageRepository // set but have not yet been marked as "ready". If transient errors occur, err is returned but @@ -55,9 +222,6 @@ const retryCount = 2 // 4. ImageStreamMapping save error // 5. error when marking ImageStream as imported func (c *ImportController) Next(stream *api.ImageStream) error { - if !needsImport(stream) { - return nil - } glog.V(4).Infof("Importing stream %s/%s...", stream.Namespace, stream.Name) insecure := stream.Annotations[api.InsecureRepositoryAnnotation] == "true" @@ -73,7 +237,7 @@ func (c *ImportController) Next(stream *api.ImageStream) error { if retry { return err } - return c.done(stream, err.Error(), retryCount) + return c.done(stream, err.Error()) } if err != nil { errlist = append(errlist, err) @@ -88,10 +252,10 @@ func (c *ImportController) Next(stream *api.ImageStream) error { } if len(errlist) > 0 { - return c.done(stream, kerrors.NewAggregate(errlist).Error(), retryCount) + return c.done(stream, kerrors.NewAggregate(errlist).Error()) } - return c.done(stream, "", retryCount) + return c.done(stream, "") } // getTags returns a map of tags to be imported, a flag saying if we should retry @@ -254,7 +418,7 @@ func (c *ImportController) importTag(stream *api.ImageStream, tag string, ref ap } // done marks the stream as being processed due to an error or failure condition. -func (c *ImportController) done(stream *api.ImageStream, reason string, retry int) error { +func (c *ImportController) done(stream *api.ImageStream, reason string) error { if len(reason) == 0 { reason = unversioned.Now().UTC().Format(time.RFC3339) } else if len(reason) > 300 { @@ -265,12 +429,7 @@ func (c *ImportController) done(stream *api.ImageStream, reason string, retry in stream.Annotations = make(map[string]string) } stream.Annotations[api.DockerImageRepositoryCheckAnnotation] = reason - if _, err := c.streams.ImageStreams(stream.Namespace).Update(stream); err != nil && !errors.IsNotFound(err) { - if errors.IsConflict(err) && retry > 0 { - if stream, err := c.streams.ImageStreams(stream.Namespace).Get(stream.Name); err == nil { - return c.done(stream, reason, retry-1) - } - } + if _, err := c.streams.ImageStreams(stream.Namespace).Update(stream); err != nil { return err } return nil diff --git a/pkg/image/controller/factory.go b/pkg/image/controller/factory.go deleted file mode 100644 index 181d1dc524e0..000000000000 --- a/pkg/image/controller/factory.go +++ /dev/null @@ -1,59 +0,0 @@ -package controller - -import ( - "time" - - kapi "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/cache" - "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" - kutil "k8s.io/kubernetes/pkg/util" - "k8s.io/kubernetes/pkg/watch" - - "github.com/openshift/origin/pkg/client" - "github.com/openshift/origin/pkg/controller" - "github.com/openshift/origin/pkg/image/api" -) - -// ImportControllerFactory can create an ImportController. -type ImportControllerFactory struct { - Client client.Interface -} - -// Create creates an ImportController. -func (f *ImportControllerFactory) Create() controller.RunnableController { - lw := &cache.ListWatch{ - ListFunc: func() (runtime.Object, error) { - return f.Client.ImageStreams(kapi.NamespaceAll).List(labels.Everything(), fields.Everything()) - }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return f.Client.ImageStreams(kapi.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion) - }, - } - q := cache.NewFIFO(cache.MetaNamespaceKeyFunc) - cache.NewReflector(lw, &api.ImageStream{}, q, 2*time.Minute).Run() - - c := &ImportController{ - streams: f.Client, - mappings: f.Client, - } - - return &controller.RetryController{ - Queue: q, - RetryManager: controller.NewQueueRetryManager( - q, - cache.MetaNamespaceKeyFunc, - func(obj interface{}, err error, retries controller.Retry) bool { - util.HandleError(err) - return retries.Count < 5 - }, - kutil.NewTokenBucketRateLimiter(1, 10), - ), - Handle: func(obj interface{}) error { - r := obj.(*api.ImageStream) - return c.Next(r) - }, - } -} diff --git a/test/cmd/images.sh b/test/cmd/images.sh index eb3618053f51..5e094dfa27e1 100755 --- a/test/cmd/images.sh +++ b/test/cmd/images.sh @@ -47,6 +47,7 @@ os::cmd::expect_success_and_text "oc get imageStreams postgresql --template='{{. os::cmd::expect_success_and_text "oc get imageStreams mongodb --template='{{.status.dockerImageRepository}}'" 'mongodb' # verify the image repository had its tags populated os::cmd::try_until_success 'oc get imagestreamtags wildfly:latest' +os::cmd::try_until_success "oc get imageStreams wildfly --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'" os::cmd::expect_success_and_text "oc get imageStreams wildfly --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'" '[0-9]{4}\-[0-9]{2}\-[0-9]{2}' # expect a date like YYYY-MM-DD os::cmd::expect_success_and_text 'oc get istag' 'wildfly' os::cmd::expect_success 'oc annotate istag/wildfly:latest foo=bar' @@ -65,6 +66,7 @@ os::cmd::expect_failure 'oc get imageStreams mongodb' os::cmd::expect_failure 'oc get imageStreams wildfly' os::cmd::try_until_success 'oc get imagestreamTags mysql:5.5' os::cmd::try_until_success 'oc get imagestreamTags mysql:5.6' +os::cmd::try_until_success "oc get imagestreams mysql --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'" os::cmd::expect_success_and_text "oc get imagestreams mysql --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'" '[0-9]{4}\-[0-9]{2}\-[0-9]{2}' # expect a date like YYYY-MM-DD os::cmd::expect_success 'oc describe istag/mysql:latest' os::cmd::expect_success_and_text 'oc describe istag/mysql:latest' 'Environment:'