Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Reorder boot and verify informer cache sync #2103

Merged
merged 1 commit into from
May 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 30 additions & 16 deletions cmd/helm-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/spf13/pflag"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
Expand Down Expand Up @@ -166,19 +167,16 @@ func main() {
TLSHostname: *tillerTLSHostname,
})

// The status updater, to keep track the release status for each
// HelmRelease. It runs as a separate loop for now.
statusUpdater := status.New(ifClient, kubeClient, helmClient, *namespace)
go statusUpdater.Loop(shutdown, log.With(logger, "component", "annotator"))

// setup shared informer for HelmReleases
nsOpt := ifinformers.WithNamespace(*namespace)
ifInformerFactory := ifinformers.NewSharedInformerFactoryWithOptions(ifClient, *chartsSyncInterval, nsOpt)
fhrInformer := ifInformerFactory.Flux().V1beta1().HelmReleases()
go ifInformerFactory.Start(shutdown)

// setup workqueue for HelmReleases
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ChartRelease")

// release instance is needed during the sync of git chart changes and during the sync of HelmRelease changes
// release instance is needed during the sync of git chart changes
// and during the sync of HelmRelease changes
rel := release.New(log.With(logger, "component", "release"), helmClient)
chartSync := chartsync.New(
log.With(logger, "component", "chartsync"),
Expand All @@ -188,21 +186,37 @@ func main() {
chartsync.Config{LogDiffs: *logReleaseDiffs, UpdateDeps: *updateDependencies, GitTimeout: *gitTimeout, GitPollInterval: *gitPollInterval},
*namespace,
)
chartSync.Run(shutdown, errc, shutdownWg)

// start FluxRelease informer
// prepare operator and start FluxRelease informer
// NB: the operator needs to do its magic with the informer
// _before_ starting it or else the cache sync seems to hang at
// random
opr := operator.New(log.With(logger, "component", "operator"), *logReleaseDiffs, kubeClient, fhrInformer, queue, chartSync)
checkpoint.CheckForUpdates(product, version, nil, log.With(logger, "component", "checkpoint"))
go ifInformerFactory.Start(shutdown)

// wait for the caches to be synced before starting _any_ workers
mainLogger.Log("info", "waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(shutdown, fhrInformer.Informer().HasSynced); !ok {
mainLogger.Log("error", "failed to wait for caches to sync")
os.Exit(1)
}
mainLogger.Log("info", "informer caches synced")

// start operator
go opr.Run(1, shutdown, shutdownWg)

// start git sync loop
go chartSync.Run(shutdown, errc, shutdownWg)

// the status updater, to keep track of the release status for
// every HelmRelease
statusUpdater := status.New(ifClient, kubeClient, helmClient, *namespace)
go statusUpdater.Loop(shutdown, log.With(logger, "component", "annotator"))

// start HTTP server
go daemonhttp.ListenAndServe(*listenAddr, chartSync, log.With(logger, "component", "daemonhttp"), shutdown)

// start operator
go func() {
if err = opr.Run(1, shutdown, shutdownWg); err != nil {
errc <- fmt.Errorf(ErrOperatorFailure, err)
}
}()
checkpoint.CheckForUpdates(product, version, nil, log.With(logger, "component", "checkpoint"))

shutdownErr := <-errc
logger.Log("exiting...", shutdownErr)
Expand Down
19 changes: 4 additions & 15 deletions integrations/helm/operator/operator.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package operator

import (
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -117,22 +116,14 @@ func New(
return controller
}

// Run sets up the event handlers for our Custom Resource, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitGroup) error {
// Run starts workers handling the enqueued events. It will block until
// stopCh is closed, at which point it will shutdown the workqueue and
// wait for workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer runtime.HandleCrash()
defer c.releaseWorkqueue.ShutDown()

c.logger.Log("info", "starting operator")
// Wait for the caches to be synced before starting workers
c.logger.Log("info", "waiting for informer caches to sync")

if ok := cache.WaitForCacheSync(stopCh, c.fhrSynced); !ok {
return errors.New("failed to wait for caches to sync")
}
c.logger.Log("info", "unformer caches synced")

c.logger.Log("info", "starting workers")
for i := 0; i < threadiness; i++ {
Expand All @@ -145,8 +136,6 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitG
wg.Done()
}
c.logger.Log("info", "stopping workers")

return nil
}

// runWorker is a long-running function calling the
Expand Down