Skip to content

Commit

Permalink
Merge pull request #6407 from deads2k/parallel-image-import
Browse files Browse the repository at this point in the history
Merged by openshift-bot
  • Loading branch information
OpenShift Bot committed Jan 7, 2016
2 parents a626bb6 + a33813f commit 315002e
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 79 deletions.
6 changes: 2 additions & 4 deletions pkg/cmd/server/origin/run_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
buildclient "github.com/openshift/origin/pkg/build/client"
buildcontrollerfactory "github.com/openshift/origin/pkg/build/controller/factory"
buildstrategy "github.com/openshift/origin/pkg/build/controller/strategy"
"github.com/openshift/origin/pkg/client"
cmdutil "github.com/openshift/origin/pkg/cmd/util"
"github.com/openshift/origin/pkg/cmd/util/clientcmd"
configchangecontroller "github.com/openshift/origin/pkg/deploy/controller/configchange"
Expand Down Expand Up @@ -345,10 +346,7 @@ func (c *MasterConfig) RunSDNController() {
// RunImageImportController starts the image import trigger controller process.
func (c *MasterConfig) RunImageImportController() {
osclient := c.ImageImportControllerClient()
factory := imagecontroller.ImportControllerFactory{
Client: osclient,
}
controller := factory.Create()
controller := imagecontroller.NewImportController(client.ImageStreamsNamespacer(osclient), client.ImageStreamMappingsNamespacer(osclient), 10, 2*time.Minute)
controller.Run()
}

Expand Down
191 changes: 175 additions & 16 deletions pkg/image/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,24 @@ package controller

import (
"fmt"
"sync"
"time"

"github.com/golang/glog"

kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
kapierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
kclient "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
kerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"

"github.com/openshift/origin/pkg/client"
"github.com/openshift/origin/pkg/dockerregistry"
Expand All @@ -22,15 +31,173 @@ type ImportController struct {
mappings client.ImageStreamMappingsNamespacer
// injected for testing
client dockerregistry.Client

stopChan chan struct{}

imageStreamController *framework.Controller

work chan *api.ImageStream
workingSet sets.String
workingSetLock sync.Mutex

// this should not be larger the capacity of the work channel
numParallelImports int
}

func NewImportController(isNamespacer client.ImageStreamsNamespacer, ismNamespacer client.ImageStreamMappingsNamespacer, parallelImports int, resyncInterval time.Duration) *ImportController {
c := &ImportController{
streams: isNamespacer,
mappings: ismNamespacer,

numParallelImports: parallelImports,
work: make(chan *api.ImageStream, 20*parallelImports),
workingSet: sets.String{},
}

_, c.imageStreamController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return c.streams.ImageStreams(kapi.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return c.streams.ImageStreams(kapi.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion)
},
},
&api.ImageStream{},
resyncInterval,
framework.ResourceEventHandlerFuncs{
AddFunc: c.imageStreamAdded,
UpdateFunc: c.imageStreamUpdated,
},
)

return c
}

// Runs controller loops and returns immediately
func (c *ImportController) Run() {
if c.stopChan == nil {
c.stopChan = make(chan struct{})
go c.imageStreamController.Run(c.stopChan)

for i := 0; i < c.numParallelImports; i++ {
go util.Until(c.handleImport, time.Second, c.stopChan)
}
}
}

// Stop gracefully shuts down this controller
func (c *ImportController) Stop() {
if c.stopChan != nil {
close(c.stopChan)
c.stopChan = nil
}
}

func (c *ImportController) imageStreamAdded(obj interface{}) {
imageStream := obj.(*api.ImageStream)
if needsImport(imageStream) {
glog.V(5).Infof("trying to add %s to the worklist", workingSetKey(imageStream))
c.work <- imageStream
glog.V(3).Infof("added %s to the worklist", workingSetKey(imageStream))

} else {
glog.V(5).Infof("not adding %s to the worklist", workingSetKey(imageStream))
}
}

func (c *ImportController) imageStreamUpdated(oldObj interface{}, newObj interface{}) {
newImageStream := newObj.(*api.ImageStream)
if needsImport(newImageStream) {
glog.V(5).Infof("trying to add %s to the worklist", workingSetKey(newImageStream))
c.work <- newImageStream
glog.V(3).Infof("added %s to the worklist", workingSetKey(newImageStream))

} else {
glog.V(5).Infof("not adding %s to the worklist", workingSetKey(newImageStream))
}
}

// needsImport returns true if the provided image stream should have its tags imported.
func needsImport(stream *api.ImageStream) bool {
return stream.Annotations == nil || len(stream.Annotations[api.DockerImageRepositoryCheckAnnotation]) == 0
}

// retryCount is the number of times to retry on a conflict when updating an image stream
const retryCount = 2
func (c *ImportController) handleImport() {
for {
select {
case <-c.stopChan:
return

case staleImageStream := <-c.work:
glog.V(1).Infof("popped %s from the worklist", workingSetKey(staleImageStream))

c.importImageStream(staleImageStream)
}
}
}

func (c *ImportController) importImageStream(staleImageStream *api.ImageStream) {
// if we're already in the workingset, that means that some thread is already trying to do an import for this.
// This does NOT mean that we shouldn't attempt to do this work, only that we shouldn't attempt to do it now.
if !c.addToWorkingSet(staleImageStream) {
// If there isn't any other work in the queue, wait for a while so that we don't hot loop.
// Then requeue to the end of the channel. That allows other work to continue without delay
if len(c.work) == 0 {
time.Sleep(100 * time.Millisecond)
}
glog.V(5).Infof("requeuing %s to the worklist", workingSetKey(staleImageStream))
c.work <- staleImageStream

return
}
defer c.removeFromWorkingSet(staleImageStream)

err := kclient.RetryOnConflict(kclient.DefaultBackoff, func() error {
liveImageStream, err := c.streams.ImageStreams(staleImageStream.Namespace).Get(staleImageStream.Name)
// no work to do here
if kapierrors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
if !needsImport(liveImageStream) {
return nil
}

// if we're notified, do work and then start waiting again.
return c.Next(liveImageStream)
})

if err != nil {
util.HandleError(err)
}
}

func workingSetKey(imageStream *api.ImageStream) string {
return imageStream.Namespace + "/" + imageStream.Name
}

// addToWorkingSet returns true if the image stream was added, false if it was
// already present
func (c *ImportController) addToWorkingSet(imageStream *api.ImageStream) bool {
c.workingSetLock.Lock()
defer c.workingSetLock.Unlock()

if c.workingSet.Has(workingSetKey(imageStream)) {
return false
}

c.workingSet.Insert(workingSetKey(imageStream))
return true
}

func (c *ImportController) removeFromWorkingSet(imageStream *api.ImageStream) {
c.workingSetLock.Lock()
defer c.workingSetLock.Unlock()
c.workingSet.Delete(workingSetKey(imageStream))
}

// Next processes the given image stream, looking for streams that have DockerImageRepository
// set but have not yet been marked as "ready". If transient errors occur, err is returned but
Expand All @@ -55,9 +222,6 @@ const retryCount = 2
// 4. ImageStreamMapping save error
// 5. error when marking ImageStream as imported
func (c *ImportController) Next(stream *api.ImageStream) error {
if !needsImport(stream) {
return nil
}
glog.V(4).Infof("Importing stream %s/%s...", stream.Namespace, stream.Name)

insecure := stream.Annotations[api.InsecureRepositoryAnnotation] == "true"
Expand All @@ -73,7 +237,7 @@ func (c *ImportController) Next(stream *api.ImageStream) error {
if retry {
return err
}
return c.done(stream, err.Error(), retryCount)
return c.done(stream, err.Error())
}
if err != nil {
errlist = append(errlist, err)
Expand All @@ -88,10 +252,10 @@ func (c *ImportController) Next(stream *api.ImageStream) error {
}

if len(errlist) > 0 {
return c.done(stream, kerrors.NewAggregate(errlist).Error(), retryCount)
return c.done(stream, kerrors.NewAggregate(errlist).Error())
}

return c.done(stream, "", retryCount)
return c.done(stream, "")
}

// getTags returns a map of tags to be imported, a flag saying if we should retry
Expand Down Expand Up @@ -254,7 +418,7 @@ func (c *ImportController) importTag(stream *api.ImageStream, tag string, ref ap
}

// done marks the stream as being processed due to an error or failure condition.
func (c *ImportController) done(stream *api.ImageStream, reason string, retry int) error {
func (c *ImportController) done(stream *api.ImageStream, reason string) error {
if len(reason) == 0 {
reason = unversioned.Now().UTC().Format(time.RFC3339)
} else if len(reason) > 300 {
Expand All @@ -265,12 +429,7 @@ func (c *ImportController) done(stream *api.ImageStream, reason string, retry in
stream.Annotations = make(map[string]string)
}
stream.Annotations[api.DockerImageRepositoryCheckAnnotation] = reason
if _, err := c.streams.ImageStreams(stream.Namespace).Update(stream); err != nil && !errors.IsNotFound(err) {
if errors.IsConflict(err) && retry > 0 {
if stream, err := c.streams.ImageStreams(stream.Namespace).Get(stream.Name); err == nil {
return c.done(stream, reason, retry-1)
}
}
if _, err := c.streams.ImageStreams(stream.Namespace).Update(stream); err != nil {
return err
}
return nil
Expand Down
59 changes: 0 additions & 59 deletions pkg/image/controller/factory.go

This file was deleted.

2 changes: 2 additions & 0 deletions test/cmd/images.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ os::cmd::expect_success_and_text "oc get imageStreams postgresql --template='{{.
os::cmd::expect_success_and_text "oc get imageStreams mongodb --template='{{.status.dockerImageRepository}}'" 'mongodb'
# verify the image repository had its tags populated
os::cmd::try_until_success 'oc get imagestreamtags wildfly:latest'
os::cmd::try_until_success "oc get imageStreams wildfly --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'"
os::cmd::expect_success_and_text "oc get imageStreams wildfly --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'" '[0-9]{4}\-[0-9]{2}\-[0-9]{2}' # expect a date like YYYY-MM-DD
os::cmd::expect_success_and_text 'oc get istag' 'wildfly'
os::cmd::expect_success 'oc annotate istag/wildfly:latest foo=bar'
Expand All @@ -65,6 +66,7 @@ os::cmd::expect_failure 'oc get imageStreams mongodb'
os::cmd::expect_failure 'oc get imageStreams wildfly'
os::cmd::try_until_success 'oc get imagestreamTags mysql:5.5'
os::cmd::try_until_success 'oc get imagestreamTags mysql:5.6'
os::cmd::try_until_success "oc get imagestreams mysql --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'"
os::cmd::expect_success_and_text "oc get imagestreams mysql --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'" '[0-9]{4}\-[0-9]{2}\-[0-9]{2}' # expect a date like YYYY-MM-DD
os::cmd::expect_success 'oc describe istag/mysql:latest'
os::cmd::expect_success_and_text 'oc describe istag/mysql:latest' 'Environment:'
Expand Down

0 comments on commit 315002e

Please sign in to comment.