
Reorder boot and verify informer cache sync
Before this change the operator would randomly hang on "waiting for
informer caches to sync", without making any further progress or ever
timing out.

After an extensive search through recently merged PRs, this appears to
be related to a change of order in #1906, where the informer was
started before the operator had registered its event handlers.

In addition to reverting this change and ensuring the informer cache is
synced before it is used, reorder everything so that the operator is
started before anything else that may depend on it, allowing it to
start processing changes from the queue as soon as possible.
hiddeco committed May 29, 2019
1 parent 123e661 commit 38fa0ec
Showing 2 changed files with 34 additions and 31 deletions.
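For reference, below is a minimal, hypothetical sketch of the ordering this commit establishes, written directly against client-go. A Pod informer and a single inline worker stand in for the HelmRelease informer and the operator's workers; the essential point is that event handlers are registered before the factory is started, and the cache is fully synced before any worker consumes the queue.

```go
package sketch

import (
	"fmt"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// runController wires things up in the order this commit establishes:
//  1. create the informer and register event handlers on it,
//  2. start the informer factory,
//  3. wait for the informer cache to sync,
//  4. start the workers that consume the queue.
func runController(client kubernetes.Interface, stopCh <-chan struct{}) error {
	queue := workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(), "Example")

	factory := informers.NewSharedInformerFactory(client, 0)
	informer := factory.Core().V1().Pods().Informer()

	// Register handlers _before_ the factory is started, so the informer
	// never runs without anything subscribed to it.
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { queue.Add(obj) },
		UpdateFunc: func(_, newObj interface{}) { queue.Add(newObj) },
	})

	factory.Start(stopCh)

	// Block until the initial list has been loaded into the cache;
	// only then start anything that reads from it.
	if ok := cache.WaitForCacheSync(stopCh, informer.HasSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	// A single worker draining the queue; the real operator starts
	// `threadiness` workers via opr.Run.
	go func() {
		for {
			item, shutdown := queue.Get()
			if shutdown {
				return
			}
			// ... reconcile the item here ...
			queue.Done(item)
		}
	}()

	<-stopCh
	queue.ShutDown()
	return nil
}
```

Starting workers only after WaitForCacheSync returns means they never act on a partial view of the cluster, while registering handlers first prevents the informer from running with no subscribers, which is the hang described above.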
46 changes: 30 additions & 16 deletions cmd/helm-operator/main.go
@@ -11,6 +11,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/spf13/pflag"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
@@ -166,19 +167,16 @@ func main() {
TLSHostname: *tillerTLSHostname,
})

// The status updater, to keep track the release status for each
// HelmRelease. It runs as a separate loop for now.
statusUpdater := status.New(ifClient, kubeClient, helmClient, *namespace)
go statusUpdater.Loop(shutdown, log.With(logger, "component", "annotator"))

// setup shared informer for HelmReleases
nsOpt := ifinformers.WithNamespace(*namespace)
ifInformerFactory := ifinformers.NewSharedInformerFactoryWithOptions(ifClient, *chartsSyncInterval, nsOpt)
fhrInformer := ifInformerFactory.Flux().V1beta1().HelmReleases()
go ifInformerFactory.Start(shutdown)

// setup workqueue for HelmReleases
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ChartRelease")

// release instance is needed during the sync of git chart changes and during the sync of HelmRelease changes
// release instance is needed during the sync of git chart changes
// and during the sync of HelmRelease changes
rel := release.New(log.With(logger, "component", "release"), helmClient)
chartSync := chartsync.New(
log.With(logger, "component", "chartsync"),
@@ -188,21 +186,37 @@ func main() {
chartsync.Config{LogDiffs: *logReleaseDiffs, UpdateDeps: *updateDependencies, GitTimeout: *gitTimeout, GitPollInterval: *gitPollInterval},
*namespace,
)
chartSync.Run(shutdown, errc, shutdownWg)

// start FluxRelease informer
// prepare operator and start FluxRelease informer
// NB: the operator needs to do its magic with the informer
// _before_ starting it or else the cache sync seems to hang at
// random
opr := operator.New(log.With(logger, "component", "operator"), *logReleaseDiffs, kubeClient, fhrInformer, queue, chartSync)
checkpoint.CheckForUpdates(product, version, nil, log.With(logger, "component", "checkpoint"))
go ifInformerFactory.Start(shutdown)

// wait for the caches to be synced before starting _any_ workers
mainLogger.Log("info", "waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(shutdown, fhrInformer.Informer().HasSynced); !ok {
mainLogger.Log("error", "failed to wait for caches to sync")
os.Exit(1)
}
mainLogger.Log("info", "informer caches synced")

// start operator
go opr.Run(1, shutdown, shutdownWg)

// start git sync loop
go chartSync.Run(shutdown, errc, shutdownWg)

// the status updater, to keep track of the release status for
// every HelmRelease
statusUpdater := status.New(ifClient, kubeClient, helmClient, *namespace)
go statusUpdater.Loop(shutdown, log.With(logger, "component", "annotator"))

// start HTTP server
go daemonhttp.ListenAndServe(*listenAddr, chartSync, log.With(logger, "component", "daemonhttp"), shutdown)

// start operator
go func() {
if err = opr.Run(1, shutdown, shutdownWg); err != nil {
errc <- fmt.Errorf(ErrOperatorFailure, err)
}
}()
checkpoint.CheckForUpdates(product, version, nil, log.With(logger, "component", "checkpoint"))

shutdownErr := <-errc
logger.Log("exiting...", shutdownErr)
19 changes: 4 additions & 15 deletions integrations/helm/operator/operator.go
@@ -1,7 +1,6 @@
package operator

import (
"errors"
"fmt"
"sync"
"time"
@@ -117,22 +116,14 @@ func New(
return controller
}

// Run sets up the event handlers for our Custom Resource, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitGroup) error {
// Run starts workers handling the enqueued events. It will block until
// stopCh is closed, at which point it will shutdown the workqueue and
// wait for workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer runtime.HandleCrash()
defer c.releaseWorkqueue.ShutDown()

c.logger.Log("info", "starting operator")
// Wait for the caches to be synced before starting workers
c.logger.Log("info", "waiting for informer caches to sync")

if ok := cache.WaitForCacheSync(stopCh, c.fhrSynced); !ok {
return errors.New("failed to wait for caches to sync")
}
c.logger.Log("info", "unformer caches synced")

c.logger.Log("info", "starting workers")
for i := 0; i < threadiness; i++ {
@@ -145,8 +136,6 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitG
wg.Done()
}
c.logger.Log("info", "stopping workers")

return nil
}

// runWorker is a long-running function calling the
