From 51e02a2ebea6a88203b323bd57c84e56c9d8c33f Mon Sep 17 00:00:00 2001 From: Hidde Beydals Date: Fri, 5 Apr 2019 19:05:19 +0200 Subject: [PATCH 1/6] Move sync of all HelmReleases to operator Before this change there was a ticker in the ChartChangeSync loop which 'rereleased' all HelmReleases on the set interval, to prevent mutations due to e.g. manual chart releases. This commit moves this logic to the operator by utilizing the resync interval that can be set on the shared informer. On every resync interval, all objects known at that time to the informer are redelivered as 'updates', causing the same effect as the ticker process described above, but with several improvements. 1. The syncHandler validates if a HelmRelease still exists before attempting to run the release. This is an improvement compared to the blind run we did before, where in case a user had many HelmReleases, the Helm release could be removed at the time the release attempt was made. 2. We now act on information from one source (except for the `reconcileReleaseDef()` call on repository changes, see added comment). --- cmd/helm-operator/main.go | 4 +-- integrations/helm/chartsync/chartsync.go | 43 ++++-------------------- integrations/helm/operator/operator.go | 14 ++++---- 3 files changed, 14 insertions(+), 47 deletions(-) diff --git a/cmd/helm-operator/main.go b/cmd/helm-operator/main.go index c63c05231..29f901b8a 100644 --- a/cmd/helm-operator/main.go +++ b/cmd/helm-operator/main.go @@ -173,17 +173,15 @@ func main() { rel := release.New(log.With(logger, "component", "release"), helmClient) chartSync := chartsync.New( log.With(logger, "component", "chartsync"), - chartsync.Polling{Interval: *chartsSyncInterval}, chartsync.Clients{KubeClient: *kubeClient, IfClient: *ifClient}, rel, chartsync.Config{LogDiffs: *logReleaseDiffs, UpdateDeps: *updateDependencies, GitTimeout: *gitTimeout, GitPollInterval: *gitPollInterval}, *namespace, - statusUpdater, ) chartSync.Run(shutdown, errc, shutdownWg) nsOpt := ifinformers.WithNamespace(*namespace) - ifInformerFactory := ifinformers.NewSharedInformerFactoryWithOptions(ifClient, 30*time.Second, nsOpt) + ifInformerFactory := ifinformers.NewSharedInformerFactoryWithOptions(ifClient, *chartsSyncInterval, nsOpt) fhrInformer := ifInformerFactory.Flux().V1beta1().HelmReleases() // start FluxRelease informer diff --git a/integrations/helm/chartsync/chartsync.go b/integrations/helm/chartsync/chartsync.go index f437e1e9d..609d15e2d 100644 --- a/integrations/helm/chartsync/chartsync.go +++ b/integrations/helm/chartsync/chartsync.go @@ -75,10 +75,6 @@ const ( ReasonSuccess = "HelmSuccess" ) -type Polling struct { - Interval time.Duration -} - type Clients struct { KubeClient kubernetes.Clientset IfClient ifclientset.Clientset @@ -109,7 +105,6 @@ type clone struct { } type ChartChangeSync struct { - Polling logger log.Logger kubeClient kubernetes.Clientset ifClient ifclientset.Clientset @@ -124,10 +119,9 @@ type ChartChangeSync struct { namespace string } -func New(logger log.Logger, polling Polling, clients Clients, release *release.Release, config Config, namespace string, statusUpdater *status.Updater) *ChartChangeSync { +func New(logger log.Logger, clients Clients, release *release.Release, config Config, namespace string) *ChartChangeSync { return &ChartChangeSync{ logger: logger, - Polling: polling, kubeClient: clients.KubeClient, ifClient: clients.IfClient, release: release, @@ -151,9 +145,6 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn wg.Done() }() - ticker := time.NewTicker(chs.Polling.Interval) - defer ticker.Stop() - for { select { case reposChanged := <-chs.mirrors.Changes(): @@ -242,18 +233,14 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn } } + // TODO(hidde): if we could somehow signal the + // operator an 'update' has happened we can control + // everything through the queue it maintains... + // Annotating the FHR could be an option as those + // count as updates but I am not sure if Flux would + // see those as in-cluster mutations. chs.reconcileReleaseDef(fhr) } - case <-ticker.C: - // Re-release any chart releases that have apparently - // changed in the cluster. - chs.logger.Log("info", fmt.Sprint("Start of releasesync")) - err := chs.reapplyReleaseDefs() - if err != nil { - chs.logger.Log("error", fmt.Sprintf("Failure to do manual release sync: %s", err)) - } - chs.logger.Log("info", fmt.Sprint("End of releasesync")) - case <-stopCh: chs.logger.Log("stopping", "true") return @@ -400,22 +387,6 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { } } -// reapplyReleaseDefs goes through the resource definitions and -// reconciles them with Helm releases. This is a "backstop" for the -// other sync processes, to cover the case of a release being changed -// out-of-band (e.g., by someone using `helm upgrade`). -func (chs *ChartChangeSync) reapplyReleaseDefs() error { - resources, err := chs.getCustomResources() - if err != nil { - return fmt.Errorf("failed to get HelmRelease resources from the API server: %s", err.Error()) - } - - for _, fhr := range resources { - chs.reconcileReleaseDef(fhr) - } - return nil -} - // DeleteRelease deletes the helm release associated with a // HelmRelease. This exists mainly so that the operator code can // call it when it is handling a resource deletion. diff --git a/integrations/helm/operator/operator.go b/integrations/helm/operator/operator.go index 22f1cdddb..921f6de0e 100644 --- a/integrations/helm/operator/operator.go +++ b/integrations/helm/operator/operator.go @@ -297,15 +297,13 @@ func (c *Controller) enqueueUpdateJob(old, new interface{}) { return } - if diff := cmp.Diff(oldFhr.Spec, newFhr.Spec); diff != "" { - c.logger.Log("info", "UPGRADING release") - if c.logDiffs { - c.logger.Log("info", "Custom Resource driven release upgrade", "diff", diff) - } else { - c.logger.Log("info", "Custom Resource driven release upgrade") - } - c.enqueueJob(new) + c.logger.Log("info", "UPGRADING release") + if diff := cmp.Diff(oldFhr.Spec, newFhr.Spec); diff != "" && c.logDiffs { + c.logger.Log("info", "Custom Resource driven release upgrade", "diff", diff) + } else { + c.logger.Log("info", "Custom Resource driven release upgrade") } + c.enqueueJob(new) } func (c *Controller) deleteRelease(fhr flux_v1beta1.HelmRelease) { From c8d6cde6659ac59c600c14d32b01d2b30cb99039 Mon Sep 17 00:00:00 2001 From: Hidde Beydals Date: Mon, 8 Apr 2019 19:43:11 +0200 Subject: [PATCH 2/6] Do not trigger update event on status changes Before this change, and due to the fact that we no longer compare the spec of the given HelmRelease against the spec of the old one, every status and condition update would enqueue a new release job. By updating the subresource instead of the resource, we no longer trigger the update event, causing no unwanted trigger of a release. --- chart/flux/templates/helm-operator-crd.yaml | 2 ++ deploy-helm/flux-helm-release-crd.yaml | 2 ++ .../apis/flux.weave.works/v1beta1/types.go | 1 - .../v1beta1/fake/fake_helmrelease.go | 12 +++++++ .../flux.weave.works/v1beta1/helmrelease.go | 17 +++++++++ integrations/helm/status/conditions.go | 24 +++++-------- integrations/helm/status/status.go | 36 +++++-------------- 7 files changed, 50 insertions(+), 44 deletions(-) diff --git a/chart/flux/templates/helm-operator-crd.yaml b/chart/flux/templates/helm-operator-crd.yaml index fbb2658a4..c35661750 100644 --- a/chart/flux/templates/helm-operator-crd.yaml +++ b/chart/flux/templates/helm-operator-crd.yaml @@ -21,6 +21,8 @@ spec: shortNames: - hr scope: Namespaced + subresources: + status: {} version: v1beta1 versions: - name: v1beta1 diff --git a/deploy-helm/flux-helm-release-crd.yaml b/deploy-helm/flux-helm-release-crd.yaml index 7c2b9c0e5..e12535d5a 100644 --- a/deploy-helm/flux-helm-release-crd.yaml +++ b/deploy-helm/flux-helm-release-crd.yaml @@ -12,6 +12,8 @@ spec: shortNames: - hr scope: Namespaced + subresources: + status: {} version: v1beta1 versions: - name: v1beta1 diff --git a/integrations/apis/flux.weave.works/v1beta1/types.go b/integrations/apis/flux.weave.works/v1beta1/types.go index 8dd744b16..bf67d5642 100644 --- a/integrations/apis/flux.weave.works/v1beta1/types.go +++ b/integrations/apis/flux.weave.works/v1beta1/types.go @@ -12,7 +12,6 @@ import ( ) // +genclient -// +genclient:noStatus // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // FluxHelmRelease represents custom resource associated with a Helm Chart diff --git a/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1/fake/fake_helmrelease.go b/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1/fake/fake_helmrelease.go index 338b18bd1..d8f8a0b56 100644 --- a/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1/fake/fake_helmrelease.go +++ b/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1/fake/fake_helmrelease.go @@ -97,6 +97,18 @@ func (c *FakeHelmReleases) Update(helmRelease *v1beta1.HelmRelease) (result *v1b return obj.(*v1beta1.HelmRelease), err } +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeHelmReleases) UpdateStatus(helmRelease *v1beta1.HelmRelease) (*v1beta1.HelmRelease, error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateSubresourceAction(helmreleasesResource, "status", c.ns, helmRelease), &v1beta1.HelmRelease{}) + + if obj == nil { + return nil, err + } + return obj.(*v1beta1.HelmRelease), err +} + // Delete takes name of the helmRelease and deletes it. Returns an error if one occurs. func (c *FakeHelmReleases) Delete(name string, options *v1.DeleteOptions) error { _, err := c.Fake. diff --git a/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1/helmrelease.go b/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1/helmrelease.go index fef6fdc51..e149b286d 100644 --- a/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1/helmrelease.go +++ b/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1/helmrelease.go @@ -34,6 +34,7 @@ type HelmReleasesGetter interface { type HelmReleaseInterface interface { Create(*v1beta1.HelmRelease) (*v1beta1.HelmRelease, error) Update(*v1beta1.HelmRelease) (*v1beta1.HelmRelease, error) + UpdateStatus(*v1beta1.HelmRelease) (*v1beta1.HelmRelease, error) Delete(name string, options *v1.DeleteOptions) error DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error Get(name string, options v1.GetOptions) (*v1beta1.HelmRelease, error) @@ -117,6 +118,22 @@ func (c *helmReleases) Update(helmRelease *v1beta1.HelmRelease) (result *v1beta1 return } +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). + +func (c *helmReleases) UpdateStatus(helmRelease *v1beta1.HelmRelease) (result *v1beta1.HelmRelease, err error) { + result = &v1beta1.HelmRelease{} + err = c.client.Put(). + Namespace(c.ns). + Resource("helmreleases"). + Name(helmRelease.Name). + SubResource("status"). + Body(helmRelease). + Do(). + Into(result) + return +} + // Delete takes name of the helmRelease and deletes it. Returns an error if one occurs. func (c *helmReleases) Delete(name string, options *v1.DeleteOptions) error { return c.client.Delete(). diff --git a/integrations/helm/status/conditions.go b/integrations/helm/status/conditions.go index 0d154b588..6715675ed 100644 --- a/integrations/helm/status/conditions.go +++ b/integrations/helm/status/conditions.go @@ -1,17 +1,14 @@ package status import ( - "encoding/json" - "github.com/weaveworks/flux/integrations/apis/flux.weave.works/v1beta1" v1beta1client "github.com/weaveworks/flux/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1" - "k8s.io/apimachinery/pkg/types" ) // We can't rely on having UpdateStatus, or strategic merge patching // for custom resources. So we have to create an object which // represents the merge path or JSON patch to apply. -func UpdateConditionsPatch(status *v1beta1.HelmReleaseStatus, updates ...v1beta1.HelmReleaseCondition) (types.PatchType, interface{}) { +func UpdateConditionsPatch(status *v1beta1.HelmReleaseStatus, updates ...v1beta1.HelmReleaseCondition) { newConditions := make([]v1beta1.HelmReleaseCondition, len(status.Conditions)) oldConditions := status.Conditions for i, c := range oldConditions { @@ -28,20 +25,15 @@ updates: newConditions = append(newConditions, up) } status.Conditions = newConditions - return types.MergePatchType, map[string]interface{}{ - "status": map[string]interface{}{ - "conditions": newConditions, - }, - } } -// UpdateConditions applies the updates to the HelmRelease given, and patches the resource in the cluster. +// UpdateConditions applies the updates to the HelmRelease given, and +// updates the resource in the cluster. func UpdateConditions(client v1beta1client.HelmReleaseInterface, fhr *v1beta1.HelmRelease, updates ...v1beta1.HelmReleaseCondition) error { - t, obj := UpdateConditionsPatch(&fhr.Status, updates...) - bytes, err := json.Marshal(obj) - if err != nil { - return err - } - _, err = client.Patch(fhr.Name, t, bytes) + fhrCopy := fhr.DeepCopy() + + UpdateConditionsPatch(&fhrCopy.Status, updates...) + _, err := client.UpdateStatus(fhrCopy) + return err } diff --git a/integrations/helm/status/status.go b/integrations/helm/status/status.go index 17b6151f3..d21a49b08 100644 --- a/integrations/helm/status/status.go +++ b/integrations/helm/status/status.go @@ -14,12 +14,10 @@ correspond. Specifically, package status import ( - "encoding/json" "time" "github.com/go-kit/kit/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" kube "k8s.io/client-go/kubernetes" "k8s.io/helm/pkg/helm" @@ -104,34 +102,18 @@ bail: } func UpdateReleaseStatus(client v1beta1client.HelmReleaseInterface, fhr v1beta1.HelmRelease, releaseName, releaseStatus string) error { - patchBytes, err := json.Marshal(map[string]interface{}{ - "status": map[string]interface{}{ - "releaseName": releaseName, - "releaseStatus": releaseStatus, - }, - }) - if err == nil { - // CustomResources don't get - // StrategicMergePatch, for now, but since we - // want to unconditionally set the value, this - // is OK. - _, err = client.Patch(fhr.Name, types.MergePatchType, patchBytes) - } + fhr.Status.ReleaseName = releaseName + fhr.Status.ReleaseStatus = releaseStatus + + _, err := client.UpdateStatus(&fhr) + return err } func UpdateReleaseRevision(client v1beta1client.HelmReleaseInterface, fhr v1beta1.HelmRelease, revision string) error { - patchBytes, err := json.Marshal(map[string]interface{}{ - "status": map[string]interface{}{ - "revision": revision, - }, - }) - if err == nil { - // CustomResources don't get - // StrategicMergePatch, for now, but since we - // want to unconditionally set the value, this - // is OK. - _, err = client.Patch(fhr.Name, types.MergePatchType, patchBytes) - } + fhr.Status.Revision = revision + + _, err := client.UpdateStatus(&fhr) + return err } From 5893fa0a3cf0ddbfc7759ae42ab234db3431f647 Mon Sep 17 00:00:00 2001 From: Hidde Beydals Date: Mon, 8 Apr 2019 23:05:26 +0200 Subject: [PATCH 3/6] Enqueue Helm releases on git chart source changes This commit changes a couple of things in how the operator processes and acts on mirror changes. 1. On mirror changes, instead of first requesting all namespaces, and then collecting all HelmReleases (per namespace) from the API, all HelmReleases are now collected from the informer (which caches all HelmReleases the operator has seen), which already takes the namespace restriction into account. 2. Instead of looping over a set of _all_ HelmReleases, determining if they make use of one of the changed mirrors, and calculating what we should do based upon the changed mirror state; we now loop over the changed mirrors we received and request HelmReleases for each one of them, we then calculate what we should do once per mirror and execute it for each HelmRelease we requested earlier. This greatly reduces the amount of calculations we make without changing the outcome. 3. We no longer call the blocking `chs.reconcileReleaseDef()` for each HelmRelease but instead enqueue the HelmReleases in the working queue the operator consumes from. As the operator consuming items from the queue validates if a HelmRelease still exists just before executing release, this prevents cases where we would attempt to release a HelmRelease we still had in memory (due to collecting all HelmReleases at once), while the HelmRelease already had been removed. --- cmd/helm-operator/main.go | 21 ++- integrations/helm/chartsync/chartsync.go | 226 +++++++++++------------ integrations/helm/operator/operator.go | 3 +- 3 files changed, 124 insertions(+), 126 deletions(-) diff --git a/cmd/helm-operator/main.go b/cmd/helm-operator/main.go index 29f901b8a..270ede645 100644 --- a/cmd/helm-operator/main.go +++ b/cmd/helm-operator/main.go @@ -13,6 +13,7 @@ import ( "github.com/spf13/pflag" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/workqueue" "github.com/weaveworks/flux/checkpoint" clientset "github.com/weaveworks/flux/integrations/client/clientset/versioned" @@ -169,25 +170,27 @@ func main() { statusUpdater := status.New(ifClient, kubeClient, helmClient, *namespace) go statusUpdater.Loop(shutdown, log.With(logger, "component", "annotator")) - // release instance is needed during the sync of Charts changes and during the sync of HelmRelease changes + nsOpt := ifinformers.WithNamespace(*namespace) + ifInformerFactory := ifinformers.NewSharedInformerFactoryWithOptions(ifClient, *chartsSyncInterval, nsOpt) + fhrInformer := ifInformerFactory.Flux().V1beta1().HelmReleases() + go ifInformerFactory.Start(shutdown) + + queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ChartRelease") + + // release instance is needed during the sync of git chart changes and during the sync of HelmRelease changes rel := release.New(log.With(logger, "component", "release"), helmClient) chartSync := chartsync.New( log.With(logger, "component", "chartsync"), - chartsync.Clients{KubeClient: *kubeClient, IfClient: *ifClient}, + chartsync.Clients{KubeClient: *kubeClient, IfClient: *ifClient, FhrLister: fhrInformer.Lister()}, rel, + queue, chartsync.Config{LogDiffs: *logReleaseDiffs, UpdateDeps: *updateDependencies, GitTimeout: *gitTimeout, GitPollInterval: *gitPollInterval}, *namespace, ) chartSync.Run(shutdown, errc, shutdownWg) - nsOpt := ifinformers.WithNamespace(*namespace) - ifInformerFactory := ifinformers.NewSharedInformerFactoryWithOptions(ifClient, *chartsSyncInterval, nsOpt) - fhrInformer := ifInformerFactory.Flux().V1beta1().HelmReleases() - // start FluxRelease informer - opr := operator.New(log.With(logger, "component", "operator"), *logReleaseDiffs, kubeClient, fhrInformer, chartSync) - go ifInformerFactory.Start(shutdown) - + opr := operator.New(log.With(logger, "component", "operator"), *logReleaseDiffs, kubeClient, fhrInformer, queue, chartSync) checkpoint.CheckForUpdates(product, version, nil, log.With(logger, "component", "checkpoint")) // start HTTP server diff --git a/integrations/helm/chartsync/chartsync.go b/integrations/helm/chartsync/chartsync.go index 609d15e2d..ba1c867b5 100644 --- a/integrations/helm/chartsync/chartsync.go +++ b/integrations/helm/chartsync/chartsync.go @@ -8,19 +8,17 @@ reconciled: 1a. There is a HelmRelease resource, but no corresponding release. This can happen when the helm operator is first run, for - example. The ChartChangeSync periodically checks for this by - running through the resources and installing any that aren't - released already. + example. 1b. The release corresponding to a HelmRelease has been updated by some other means, perhaps while the operator wasn't running. This - is also checked periodically, by doing a dry-run release and - comparing the result to the release. + is also checked, by doing a dry-run release and comparing the result + to the release. 2. The chart has changed in git, meaning the release is out of - date. The ChartChangeSync responds to new git commits by looking at - each chart that's referenced by a HelmRelease, and if it's - changed since the last seen commit, updating the release. + date. The ChartChangeSync responds to new git commits by looking up + each chart that makes use of the mirror that has new commits, + replacing the clone for that chart, and scheduling a new release. 1a.) and 1b.) run on the same schedule, and 2.) is run when a git mirror reports it has fetched from upstream _and_ (upon checking) the @@ -44,6 +42,8 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/labels" + "github.com/go-kit/kit/log" google_protobuf "github.com/golang/protobuf/ptypes/any" "github.com/google/go-cmp/cmp" @@ -52,12 +52,15 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" hapi_chart "k8s.io/helm/pkg/proto/hapi/chart" hapi_release "k8s.io/helm/pkg/proto/hapi/release" "github.com/weaveworks/flux/git" + "github.com/weaveworks/flux/integrations/apis/flux.weave.works/v1beta1" fluxv1beta1 "github.com/weaveworks/flux/integrations/apis/flux.weave.works/v1beta1" ifclientset "github.com/weaveworks/flux/integrations/client/clientset/versioned" + iflister "github.com/weaveworks/flux/integrations/client/listers/flux.weave.works/v1beta1" helmop "github.com/weaveworks/flux/integrations/helm" "github.com/weaveworks/flux/integrations/helm/release" "github.com/weaveworks/flux/integrations/helm/status" @@ -78,6 +81,7 @@ const ( type Clients struct { KubeClient kubernetes.Clientset IfClient ifclientset.Clientset + FhrLister iflister.HelmReleaseLister } type Config struct { @@ -104,12 +108,19 @@ type clone struct { head string } +// ReleaseQueue is an add-only workqueue.RateLimitingInterface +type ReleaseQueue interface { + AddRateLimited(item interface{}) +} + type ChartChangeSync struct { - logger log.Logger - kubeClient kubernetes.Clientset - ifClient ifclientset.Clientset - release *release.Release - config Config + logger log.Logger + kubeClient kubernetes.Clientset + ifClient ifclientset.Clientset + fhrLister iflister.HelmReleaseLister + release *release.Release + releaseQueue ReleaseQueue + config Config mirrors *git.Mirrors @@ -119,16 +130,18 @@ type ChartChangeSync struct { namespace string } -func New(logger log.Logger, clients Clients, release *release.Release, config Config, namespace string) *ChartChangeSync { +func New(logger log.Logger, clients Clients, release *release.Release, releaseQueue ReleaseQueue, config Config, namespace string) *ChartChangeSync { return &ChartChangeSync{ - logger: logger, - kubeClient: clients.KubeClient, - ifClient: clients.IfClient, - release: release, - config: config.WithDefaults(), - mirrors: git.NewMirrors(), - clones: make(map[string]clone), - namespace: namespace, + logger: logger, + kubeClient: clients.KubeClient, + ifClient: clients.IfClient, + fhrLister: clients.FhrLister, + release: release, + releaseQueue: releaseQueue, + config: config.WithDefaults(), + mirrors: git.NewMirrors(), + clones: make(map[string]clone), + namespace: namespace, } } @@ -136,7 +149,8 @@ func New(logger log.Logger, clients Clients, release *release.Release, config Co // Helm releases in the cluster, what HelmRelease declare, and // changes in the git repos mentioned by any HelmRelease. func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *sync.WaitGroup) { - chs.logger.Log("info", "Starting charts sync loop") + chs.logger.Log("info", "Starting git chart sync loop") + wg.Add(1) go func() { defer runtime.HandleCrash() @@ -147,99 +161,91 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn for { select { - case reposChanged := <-chs.mirrors.Changes(): - // TODO(michael): the inefficient way, for now, until - // it's clear how to better optimalise it - resources, err := chs.getCustomResources() - if err != nil { - chs.logger.Log("warning", "failed to get custom resources", "err", err) - continue - } - for _, fhr := range resources { - if fhr.Spec.ChartSource.GitChartSource == nil { - continue - } - - repoURL := fhr.Spec.ChartSource.GitChartSource.GitURL - repoName := mirrorName(fhr.Spec.ChartSource.GitChartSource) - - if _, ok := reposChanged[repoName]; !ok { + case mirrorsChanged := <-chs.mirrors.Changes(): + for mirror := range mirrorsChanged { + resources, err := chs.getCustomResourcesForMirror(mirror) + if err != nil { + chs.logger.Log("warning", "failed to get custom resources", "err", err) continue } - repo, ok := chs.mirrors.Get(repoName) + repo, ok := chs.mirrors.Get(mirror) if !ok { - // Then why .. did you say .. it had changed? It may have been removed. Add it back and let it signal again. - chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git mirror missing; starting mirroring again") - chs.logger.Log("warning", "mirrored git repo disappeared after signalling change", "repo", repoName) - chs.maybeMirror(fhr) + for _, fhr := range resources { + // Then why .. did you say .. it had changed? It may have been removed. Add it back and let it signal again. + chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git mirror missing; starting mirroring again") + chs.logger.Log("warning", "mirrored git repo disappeared after signalling change", "repo", mirror) + chs.maybeMirror(fhr) + } continue } status, err := repo.Status() if status != git.RepoReady { - chs.logger.Log("info", "repo not ready yet, while attempting chart sync", "repo", repoURL, "status", string(status)) - // TODO(michael) log if there's a problem with the following? - chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, err.Error()) - continue - } - - ref := fhr.Spec.ChartSource.GitChartSource.RefOrDefault() - path := fhr.Spec.ChartSource.GitChartSource.Path - releaseName := release.GetReleaseName(fhr) - - ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) - refHead, err := repo.Revision(ctx, ref) - cancel() - if err != nil { - chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error()) - chs.logger.Log("warning", "could not get revision for ref while checking for changes", "repo", repoURL, "ref", ref, "err", err) + for _, fhr := range resources { + chs.logger.Log("info", "repo not ready yet, while attempting chart sync", "repo", mirror, "status", string(status)) + // TODO(michael) log if there's a problem with the following? + chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, err.Error()) + } continue } - // This FHR is using a git repo; and, it appears to have had commits since we last saw it. - // Check explicitly whether we should update its clone. - chs.clonesMu.Lock() - cloneForChart, ok := chs.clones[releaseName] - chs.clonesMu.Unlock() + for _, fhr := range resources { + ref := fhr.Spec.ChartSource.GitChartSource.RefOrDefault() + path := fhr.Spec.ChartSource.GitChartSource.Path + releaseName := release.GetReleaseName(fhr) - if ok { // found clone ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) - commits, err := repo.CommitsBetween(ctx, cloneForChart.head, refHead, path) + refHead, err := repo.Revision(ctx, ref) cancel() if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error()) - chs.logger.Log("warning", "could not get revision for ref while checking for changes", "repo", repoURL, "ref", ref, "err", err) + chs.logger.Log("warning", "could not get revision for ref while checking for changes", "repo", mirror, "ref", ref, "err", err) continue } - ok = len(commits) == 0 - } - if !ok { // didn't find clone, or it needs updating - ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) - newClone, err := repo.Export(ctx, refHead) - cancel() - if err != nil { - chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error()) - chs.logger.Log("warning", "could not clone from mirror while checking for changes", "repo", repoURL, "ref", ref, "err", err) - continue - } - newCloneForChart := clone{remote: repoURL, ref: ref, head: refHead, export: newClone} + // The git repo of this appears to have had commits since we last saw it, + // check explicitly whether we should update its clone. chs.clonesMu.Lock() - chs.clones[releaseName] = newCloneForChart + cloneForChart, ok := chs.clones[releaseName] chs.clonesMu.Unlock() - if cloneForChart.export != nil { - cloneForChart.export.Clean() + + if ok { // found clone + ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) + commits, err := repo.CommitsBetween(ctx, cloneForChart.head, refHead, path) + cancel() + if err != nil { + chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error()) + chs.logger.Log("warning", "could not get revision for ref while checking for changes", "repo", mirror, "ref", ref, "err", err) + continue + } + ok = len(commits) == 0 + } + + if !ok { // didn't find clone, or it needs updating + ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) + newClone, err := repo.Export(ctx, refHead) + cancel() + if err != nil { + chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error()) + chs.logger.Log("warning", "could not clone from mirror while checking for changes", "repo", mirror, "ref", ref, "err", err) + continue + } + newCloneForChart := clone{remote: mirror, ref: ref, head: refHead, export: newClone} + chs.clonesMu.Lock() + chs.clones[releaseName] = newCloneForChart + chs.clonesMu.Unlock() + if cloneForChart.export != nil { + cloneForChart.export.Clean() + } } - } - // TODO(hidde): if we could somehow signal the - // operator an 'update' has happened we can control - // everything through the queue it maintains... - // Annotating the FHR could be an option as those - // count as updates but I am not sure if Flux would - // see those as in-cluster mutations. - chs.reconcileReleaseDef(fhr) + cacheKey, err := cache.MetaNamespaceKeyFunc(fhr.GetObjectMeta()) + if err != nil { + continue + } + chs.releaseQueue.AddRateLimited(cacheKey) + } } case <-stopCh: chs.logger.Log("stopping", "true") @@ -302,7 +308,7 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { // is being referenced in the chart source. if ok { ok = chartClone.remote == chartSource.GitURL && chartClone.ref == chartSource.RefOrDefault() - if !ok { + if !ok { if chartClone.export != nil { chartClone.export.Clean() } @@ -419,32 +425,20 @@ func (chs *ChartChangeSync) SyncMirrors() { chs.logger.Log("info", "Finished syncing mirrors") } -// getCustomResources assembles all custom resources in all namespaces -// or in the allowed namespace if specified -func (chs *ChartChangeSync) getCustomResources() ([]fluxv1beta1.HelmRelease, error) { - var namespaces []string - if chs.namespace != "" { - namespaces = append(namespaces, chs.namespace) - } else { - nso, err := chs.kubeClient.CoreV1().Namespaces().List(metav1.ListOptions{}) - if err != nil { - return nil, fmt.Errorf("Failure while retrieving kubernetes namespaces: %s", err) - } - for _, n := range nso.Items { - namespaces = append(namespaces, n.GetName()) - } +// getCustomResourcesForMirror retrieves all the resources that make +// use of the given mirror from the lister. +func (chs *ChartChangeSync) getCustomResourcesForMirror(mirror string) ([]fluxv1beta1.HelmRelease, error) { + var fhrs []v1beta1.HelmRelease + list, err := chs.fhrLister.List(labels.Everything()) + if err != nil { + return nil, err } - var fhrs []fluxv1beta1.HelmRelease - for _, ns := range namespaces { - list, err := chs.ifClient.FluxV1beta1().HelmReleases(ns).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - - for _, fhr := range list.Items { - fhrs = append(fhrs, fhr) + for _, fhr := range list { + if mirror != mirrorName(fhr.Spec.GitChartSource) { + continue } + fhrs = append(fhrs, *fhr) } return fhrs, nil } diff --git a/integrations/helm/operator/operator.go b/integrations/helm/operator/operator.go index 921f6de0e..0965a488f 100644 --- a/integrations/helm/operator/operator.go +++ b/integrations/helm/operator/operator.go @@ -76,6 +76,7 @@ func New( logReleaseDiffs bool, kubeclientset kubernetes.Interface, fhrInformer fhrv1.HelmReleaseInformer, + releaseWorkqueue workqueue.RateLimitingInterface, sync *chartsync.ChartChangeSync) *Controller { // Add helm-operator types to the default Kubernetes Scheme so Events can be @@ -91,7 +92,7 @@ func New( logDiffs: logReleaseDiffs, fhrLister: fhrInformer.Lister(), fhrSynced: fhrInformer.Informer().HasSynced, - releaseWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ChartRelease"), + releaseWorkqueue: releaseWorkqueue, recorder: recorder, sync: sync, } From 34badffba3ab8342a8f057fab0e146ad435ce8ac Mon Sep 17 00:00:00 2001 From: Hidde Beydals Date: Tue, 9 Apr 2019 16:12:24 +0200 Subject: [PATCH 4/6] Do not upgrade Helm release if spec has diverged This is a optimistic lock and prevents the operator from potentially releasing an old revision of a `HelmRelease`. As there is a (small) chance the spec is changed between the time we started preparing and calculating the upgrade, and the actual upgrade itself. --- integrations/helm/chartsync/chartsync.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/integrations/helm/chartsync/chartsync.go b/integrations/helm/chartsync/chartsync.go index ba1c867b5..a6cbe72e2 100644 --- a/integrations/helm/chartsync/chartsync.go +++ b/integrations/helm/chartsync/chartsync.go @@ -379,7 +379,16 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { return } if changed { - _, err := chs.release.Install(chartPath, releaseName, fhr, release.UpgradeAction, opts, &chs.kubeClient) + cFhr, err := chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace).Get(fhr.Name, metav1.GetOptions{}) + if err != nil { + chs.logger.Log("warning", "Failed to retrieve HelmRelease scheduled for upgrade", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + return + } + if diff := cmp.Diff(fhr.Spec, cFhr.Spec); diff != "" { + chs.logger.Log("warning", "HelmRelease spec has diverged since we calculated if we should upgrade, skipping upgrade", "namespace", fhr.Namespace, "name", fhr.Name) + return + } + _, err = chs.release.Install(chartPath, releaseName, fhr, release.UpgradeAction, opts, &chs.kubeClient) if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonUpgradeFailed, err.Error()) chs.logger.Log("warning", "Failed to upgrade chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) From a02f152be24bce757b8167ac4e0508c957d220f3 Mon Sep 17 00:00:00 2001 From: Hidde Beydals Date: Tue, 9 Apr 2019 18:09:57 +0200 Subject: [PATCH 5/6] Revise and uniform all Helm operator logs --- cmd/helm-operator/main.go | 6 +-- integrations/helm/chartsync/chartsync.go | 59 +++++++++++++----------- integrations/helm/helm.go | 2 +- integrations/helm/http/daemon/server.go | 9 ++-- integrations/helm/operator/operator.go | 51 +++++++++----------- 5 files changed, 65 insertions(+), 62 deletions(-) diff --git a/cmd/helm-operator/main.go b/cmd/helm-operator/main.go index 270ede645..3b76a4e8c 100644 --- a/cmd/helm-operator/main.go +++ b/cmd/helm-operator/main.go @@ -137,19 +137,19 @@ func main() { cfg, err := clientcmd.BuildConfigFromFlags(*master, *kubeconfig) if err != nil { - mainLogger.Log("error", fmt.Sprintf("Error building kubeconfig: %v", err)) + mainLogger.Log("error", fmt.Sprintf("error building kubeconfig: %v", err)) os.Exit(1) } kubeClient, err := kubernetes.NewForConfig(cfg) if err != nil { - mainLogger.Log("error", fmt.Sprintf("Error building kubernetes clientset: %v", err)) + mainLogger.Log("error", fmt.Sprintf("error building kubernetes clientset: %v", err)) os.Exit(1) } ifClient, err := clientset.NewForConfig(cfg) if err != nil { - mainLogger.Log("error", fmt.Sprintf("Error building integrations clientset: %v", err)) + mainLogger.Log("error", fmt.Sprintf("error building integrations clientset: %v", err)) os.Exit(1) } diff --git a/integrations/helm/chartsync/chartsync.go b/integrations/helm/chartsync/chartsync.go index a6cbe72e2..681f578d5 100644 --- a/integrations/helm/chartsync/chartsync.go +++ b/integrations/helm/chartsync/chartsync.go @@ -149,7 +149,7 @@ func New(logger log.Logger, clients Clients, release *release.Release, releaseQu // Helm releases in the cluster, what HelmRelease declare, and // changes in the git repos mentioned by any HelmRelease. func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *sync.WaitGroup) { - chs.logger.Log("info", "Starting git chart sync loop") + chs.logger.Log("info", "starting git chart sync loop") wg.Add(1) go func() { @@ -169,27 +169,32 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn continue } + // Retrieve the mirror we got a change signal for repo, ok := chs.mirrors.Get(mirror) if !ok { + // Then why .. did you say .. it had changed? It may have been removed. Add it back and let it signal again. + chs.logger.Log("warning", "mirrored git repo disappeared after signalling change", "repo", mirror) for _, fhr := range resources { - // Then why .. did you say .. it had changed? It may have been removed. Add it back and let it signal again. chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git mirror missing; starting mirroring again") - chs.logger.Log("warning", "mirrored git repo disappeared after signalling change", "repo", mirror) chs.maybeMirror(fhr) } continue } + // Ensure the repo is ready status, err := repo.Status() if status != git.RepoReady { + chs.logger.Log("info", "repo not ready yet, while attempting chart sync", "repo", mirror, "status", string(status)) for _, fhr := range resources { - chs.logger.Log("info", "repo not ready yet, while attempting chart sync", "repo", mirror, "status", string(status)) // TODO(michael) log if there's a problem with the following? chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, err.Error()) } continue } + // Determine if we need to update the clone and + // schedule an upgrade for every HelmRelease that + // makes use of the mirror for _, fhr := range resources { ref := fhr.Spec.ChartSource.GitChartSource.RefOrDefault() path := fhr.Spec.ChartSource.GitChartSource.Path @@ -200,7 +205,7 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn cancel() if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error()) - chs.logger.Log("warning", "could not get revision for ref while checking for changes", "repo", mirror, "ref", ref, "err", err) + chs.logger.Log("warning", "could not get revision for ref while checking for changes", "resource", fhr.ResourceID().String(), "repo", mirror, "ref", ref, "err", err) continue } @@ -216,7 +221,7 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn cancel() if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error()) - chs.logger.Log("warning", "could not get revision for ref while checking for changes", "repo", mirror, "ref", ref, "err", err) + chs.logger.Log("warning", "could not get revision for ref while checking for changes", "resource", fhr.ResourceID().String(), "repo", mirror, "ref", ref, "err", err) continue } ok = len(commits) == 0 @@ -228,7 +233,7 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn cancel() if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error()) - chs.logger.Log("warning", "could not clone from mirror while checking for changes", "repo", mirror, "ref", ref, "err", err) + chs.logger.Log("warning", "could not clone from mirror while checking for changes", "resource", fhr.ResourceID().String(), "repo", mirror, "ref", ref, "err", err) continue } newCloneForChart := clone{remote: mirror, ref: ref, head: refHead, export: newClone} @@ -240,10 +245,12 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn } } + // Enqueue release cacheKey, err := cache.MetaNamespaceKeyFunc(fhr.GetObjectMeta()) if err != nil { continue } + chs.logger.Log("info", "enqueing release upgrade due to change in git chart source", "resource", fhr.ResourceID().String()) chs.releaseQueue.AddRateLimited(cacheKey) } } @@ -325,12 +332,12 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { if !ok { chs.maybeMirror(fhr) chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git repo "+chartSource.GitURL+" not mirrored yet") - chs.logger.Log("info", "chart repo not cloned yet", "releaseName", releaseName, "resource", fmt.Sprintf("%s:%s/%s", fhr.Namespace, fhr.Kind, fhr.Name)) + chs.logger.Log("info", "chart repo not cloned yet", "resource", fhr.ResourceID().String()) } else { status, err := repo.Status() if status != git.RepoReady { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git repo not mirrored yet: "+err.Error()) - chs.logger.Log("info", "chart repo not ready yet", "releaseName", releaseName, "resource", fmt.Sprintf("%s:%s/%s", fhr.Namespace, fhr.Kind, fhr.Name), "status", string(status), "err", err) + chs.logger.Log("info", "chart repo not ready yet", "resource", fhr.ResourceID().String(), "status", string(status), "err", err) } } return @@ -342,7 +349,7 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { if chs.config.UpdateDeps && !fhr.Spec.ChartSource.GitChartSource.SkipDepUpdate { if err := updateDependencies(chartPath, ""); err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonDependencyFailed, err.Error()) - chs.logger.Log("warning", "Failed to update chart dependencies", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + chs.logger.Log("warning", "failed to update chart dependencies", "resource", fhr.ResourceID().String(), "err", err) return } } @@ -351,7 +358,7 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { path, err := ensureChartFetched(chs.config.ChartCache, chartSource) if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonDownloadFailed, "chart download failed: "+err.Error()) - chs.logger.Log("info", "chart download failed", "releaseName", releaseName, "resource", fhr.ResourceID().String(), "err", err) + chs.logger.Log("info", "chart download failed", "resource", fhr.ResourceID().String(), "err", err) return } chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionTrue, ReasonDownloaded, "chart fetched: "+filepath.Base(path)) @@ -363,7 +370,7 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { _, err := chs.release.Install(chartPath, releaseName, fhr, release.InstallAction, opts, &chs.kubeClient) if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonInstallFailed, err.Error()) - chs.logger.Log("warning", "Failed to install chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + chs.logger.Log("warning", "failed to install chart", "resource", fhr.ResourceID().String(), "err", err) return } chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionTrue, ReasonSuccess, "helm install succeeded") @@ -375,28 +382,28 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { changed, err := chs.shouldUpgrade(chartPath, rel, fhr) if err != nil { - chs.logger.Log("warning", "Unable to determine if release has changed", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + chs.logger.Log("warning", "unable to determine if release has changed", "resource", fhr.ResourceID().String(), "err", err) return } if changed { cFhr, err := chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace).Get(fhr.Name, metav1.GetOptions{}) if err != nil { - chs.logger.Log("warning", "Failed to retrieve HelmRelease scheduled for upgrade", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + chs.logger.Log("warning", "failed to retrieve HelmRelease scheduled for upgrade", "resource", fhr.ResourceID().String(), "err", err) return } if diff := cmp.Diff(fhr.Spec, cFhr.Spec); diff != "" { - chs.logger.Log("warning", "HelmRelease spec has diverged since we calculated if we should upgrade, skipping upgrade", "namespace", fhr.Namespace, "name", fhr.Name) + chs.logger.Log("warning", "HelmRelease spec has diverged since we calculated if we should upgrade, skipping upgrade", "resource", fhr.ResourceID().String()) return } _, err = chs.release.Install(chartPath, releaseName, fhr, release.UpgradeAction, opts, &chs.kubeClient) if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonUpgradeFailed, err.Error()) - chs.logger.Log("warning", "Failed to upgrade chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + chs.logger.Log("warning", "failed to upgrade chart", "resource", fhr.ResourceID().String(), "err", err) return } chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionTrue, ReasonSuccess, "helm upgrade succeeded") if err = status.UpdateReleaseRevision(chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace), fhr, chartRevision); err != nil { - chs.logger.Log("warning", "could not update the release revision", "namespace", fhr.Namespace, "resource", fhr.Name, "err", err) + chs.logger.Log("warning", "could not update the release revision", "resource", fhr.ResourceID().String(), "err", err) } return } @@ -410,7 +417,7 @@ func (chs *ChartChangeSync) DeleteRelease(fhr fluxv1beta1.HelmRelease) { name := release.GetReleaseName(fhr) err := chs.release.Delete(name) if err != nil { - chs.logger.Log("warning", "Chart release not deleted", "release", name, "error", err) + chs.logger.Log("warning", "chart release not deleted", "resource", fhr.ResourceID().String(), "release", name, "err", err) } // Remove the clone we may have for this HelmRelease @@ -427,11 +434,11 @@ func (chs *ChartChangeSync) DeleteRelease(fhr fluxv1beta1.HelmRelease) { // SyncMirrors instructs all mirrors to refresh from their upstream. func (chs *ChartChangeSync) SyncMirrors() { - chs.logger.Log("info", "Starting mirror sync") + chs.logger.Log("info", "starting mirror sync") for _, err := range chs.mirrors.RefreshAll(chs.config.GitTimeout) { - chs.logger.Log("error", fmt.Sprintf("Failure while syncing mirror: %s", err)) + chs.logger.Log("error", fmt.Sprintf("failure while syncing mirror: %s", err)) } - chs.logger.Log("info", "Finished syncing mirrors") + chs.logger.Log("info", "finished syncing mirrors") } // getCustomResourcesForMirror retrieves all the resources that make @@ -521,7 +528,7 @@ func sortChartFields(c *hapi_chart.Chart) *hapi_chart.Chart { // doing a dry run install from the chart in the git repo. func (chs *ChartChangeSync) shouldUpgrade(chartsRepo string, currRel *hapi_release.Release, fhr fluxv1beta1.HelmRelease) (bool, error) { if currRel == nil { - return false, fmt.Errorf("No Chart release provided for %v", fhr.GetName()) + return false, fmt.Errorf("no chart release provided for %v", fhr.GetName()) } currVals := currRel.GetConfig() @@ -540,18 +547,18 @@ func (chs *ChartChangeSync) shouldUpgrade(chartsRepo string, currRel *hapi_relea // compare values && Chart if diff := cmp.Diff(currVals, desVals); diff != "" { if chs.config.LogDiffs { - chs.logger.Log("error", fmt.Sprintf("Release %s: values have diverged due to manual Chart release", currRel.GetName()), "diff", diff) + chs.logger.Log("error", fmt.Sprintf("release %s: values have diverged due to manual chart release", currRel.GetName()), "resource", fhr.ResourceID().String(), "diff", diff) } else { - chs.logger.Log("error", fmt.Sprintf("Release %s: values have diverged due to manual Chart release", currRel.GetName())) + chs.logger.Log("error", fmt.Sprintf("release %s: values have diverged due to manual chart release", currRel.GetName()), "resource", fhr.ResourceID().String()) } return true, nil } if diff := cmp.Diff(sortChartFields(currChart), sortChartFields(desChart)); diff != "" { if chs.config.LogDiffs { - chs.logger.Log("error", fmt.Sprintf("Release %s: Chart has diverged due to manual Chart release", currRel.GetName()), "diff", diff) + chs.logger.Log("error", fmt.Sprintf("release %s: chart has diverged due to manual chart release", currRel.GetName()), "resource", fhr.ResourceID().String(), "diff", diff) } else { - chs.logger.Log("error", fmt.Sprintf("Release %s: Chart has diverged due to manual Chart release", currRel.GetName())) + chs.logger.Log("error", fmt.Sprintf("release %s: chart has diverged due to manual chart release", currRel.GetName()), "resource", fhr.ResourceID().String()) } return true, nil } diff --git a/integrations/helm/helm.go b/integrations/helm/helm.go index afd8e4403..f803ee8c3 100644 --- a/integrations/helm/helm.go +++ b/integrations/helm/helm.go @@ -75,7 +75,7 @@ func ClientSetup(logger log.Logger, kubeClient *kubernetes.Clientset, tillerOpts for { helmClient, host, err = newClient(kubeClient, tillerOpts) if err != nil { - logger.Log("error", fmt.Sprintf("Error creating helm client: %s", err.Error())) + logger.Log("error", fmt.Sprintf("error creating helm client: %s", err.Error())) time.Sleep(20 * time.Second) continue } diff --git a/integrations/helm/http/daemon/server.go b/integrations/helm/http/daemon/server.go index f254d9981..d7a1cedc7 100644 --- a/integrations/helm/http/daemon/server.go +++ b/integrations/helm/http/daemon/server.go @@ -3,14 +3,15 @@ package daemon import ( "context" "fmt" + "net/http" + "sync/atomic" + "time" + "github.com/go-kit/kit/log" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/weaveworks/flux/integrations/helm/api" transport "github.com/weaveworks/flux/integrations/helm/http" - "net/http" - "sync/atomic" - "time" ) // ListenAndServe starts a HTTP server instrumented with Prometheus metrics, @@ -37,7 +38,7 @@ func ListenAndServe(listenAddr string, apiServer api.Server, logger log.Logger, IdleTimeout: 15 * time.Second, } - logger.Log("info", fmt.Sprintf("Starting HTTP server on %s", listenAddr)) + logger.Log("info", fmt.Sprintf("starting HTTP server on %s", listenAddr)) // run server in background go func() { diff --git a/integrations/helm/operator/operator.go b/integrations/helm/operator/operator.go index 0965a488f..cc6dcb9fd 100644 --- a/integrations/helm/operator/operator.go +++ b/integrations/helm/operator/operator.go @@ -97,13 +97,11 @@ func New( sync: sync, } - controller.logger.Log("info", "Setting up event handlers") + controller.logger.Log("info", "setting up event handlers") // ----- EVENT HANDLERS for HelmRelease resources change --------- fhrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(new interface{}) { - controller.logger.Log("info", "CREATING release") - controller.logger.Log("info", "Custom Resource driven release install") _, ok := checkCustomResourceType(controller.logger, new) if ok { controller.enqueueJob(new) @@ -119,7 +117,7 @@ func New( } }, }) - controller.logger.Log("info", "Event handlers set up") + controller.logger.Log("info", "event handlers set up") return controller } @@ -132,16 +130,16 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitG defer runtime.HandleCrash() defer c.releaseWorkqueue.ShutDown() - c.logger.Log("info", "Starting operator") + c.logger.Log("info", "starting operator") // Wait for the caches to be synced before starting workers - c.logger.Log("info", "Waiting for informer caches to sync") + c.logger.Log("info", "waiting for informer caches to sync") if ok := cache.WaitForCacheSync(stopCh, c.fhrSynced); !ok { return errors.New("failed to wait for caches to sync") } - c.logger.Log("info", "Informer caches synced") + c.logger.Log("info", "unformer caches synced") - c.logger.Log("info", "Starting workers") + c.logger.Log("info", "starting workers") for i := 0; i < threadiness; i++ { wg.Add(1) go wait.Until(c.runWorker, time.Second, stopCh) @@ -151,7 +149,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitG for i := 0; i < threadiness; i++ { wg.Done() } - c.logger.Log("info", "Stopping workers") + c.logger.Log("info", "stopping workers") return nil } @@ -167,11 +165,7 @@ func (c *Controller) runWorker() { // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the syncHandler. func (c *Controller) processNextWorkItem() bool { - c.logger.Log("debug", "Processing next work queue job ...") - obj, shutdown := c.releaseWorkqueue.Get() - c.logger.Log("debug", fmt.Sprintf("PROCESSING item [%#v]", obj)) - if shutdown { return false } @@ -198,7 +192,7 @@ func (c *Controller) processNextWorkItem() bool { // Forget not to get into a loop of attempting to // process a work item that is invalid. c.releaseWorkqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("Expected string in workqueue but got %#v", obj)) + runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) return nil } @@ -206,14 +200,11 @@ func (c *Controller) processNextWorkItem() bool { // HelmRelease resource to sync the corresponding Chart release. // If the sync failed, then we return while the item will get requeued if err := c.syncHandler(key); err != nil { - return fmt.Errorf("error syncing '%s': %s", key, err.Error()) + return fmt.Errorf("errored syncing HelmRelease '%s': %s", key, err.Error()) } // If no error occurs we Forget this item so it does not // get queued again until another change happens. c.releaseWorkqueue.Forget(obj) - - c.logger.Log("info", fmt.Sprintf("Successfully synced '%s'", key)) - return nil }(obj) @@ -227,13 +218,11 @@ func (c *Controller) processNextWorkItem() bool { // syncHandler acts according to the action // Deletes/creates or updates a Chart release func (c *Controller) syncHandler(key string) error { - c.logger.Log("debug", fmt.Sprintf("Starting to sync cache key %s", key)) - // Retrieve namespace and Custom Resource name from the key namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - c.logger.Log("info", fmt.Sprintf("Invalid cache key: %v", err)) - runtime.HandleError(fmt.Errorf("Invalid cache key: %s", key)) + c.logger.Log("error", fmt.Sprintf("key '%s' is invalid: %v", key, err)) + runtime.HandleError(fmt.Errorf("key '%s' is invalid", key)) return nil } @@ -298,17 +287,23 @@ func (c *Controller) enqueueUpdateJob(old, new interface{}) { return } - c.logger.Log("info", "UPGRADING release") + log := []string{"info", "enqueuing release upgrade"} if diff := cmp.Diff(oldFhr.Spec, newFhr.Spec); diff != "" && c.logDiffs { - c.logger.Log("info", "Custom Resource driven release upgrade", "diff", diff) - } else { - c.logger.Log("info", "Custom Resource driven release upgrade") + log = append(log, "diff", diff) } + log = append(log, "resource", newFhr.ResourceID().String()) + + l := make([]interface{}, len(log)) + for i, v := range log { + l[i] = v + } + + c.logger.Log(l...) + c.enqueueJob(new) } func (c *Controller) deleteRelease(fhr flux_v1beta1.HelmRelease) { - c.logger.Log("info", "DELETING release") - c.logger.Log("info", "Custom Resource driven release deletion") + c.logger.Log("info", "deleting release", "resource", fhr.ResourceID().String()) c.sync.DeleteRelease(fhr) } From 439787a983e77d2bbde3a43ceed1b07dd2454b0d Mon Sep 17 00:00:00 2001 From: Hidde Beydals Date: Wed, 10 Apr 2019 15:57:03 +0200 Subject: [PATCH 6/6] Stop logging broadcasted events in the operator And just broadcast the chart was synced, without implying success, as the event will always fire regardless of the sync outcome. --- integrations/helm/operator/operator.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/integrations/helm/operator/operator.go b/integrations/helm/operator/operator.go index cc6dcb9fd..30761ab67 100644 --- a/integrations/helm/operator/operator.go +++ b/integrations/helm/operator/operator.go @@ -7,7 +7,6 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/golang/glog" "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -40,12 +39,9 @@ const ( // a HelmRelease fails to be released/updated ErrChartSync = "ErrChartSync" - // MessageChartSynced - the message used for Events when a resource - // fails to sync due to failing to release the Chart - MessageChartSynced = "Chart managed by HelmRelease processed successfully" - // MessageErrChartSync - the message used for an Event fired when a HelmRelease - // is synced successfully - MessageErrChartSync = "Chart %s managed by HelmRelease failed to be processed" + // MessageChartSynced - the message used for an Event fired when a HelmRelease + // is synced. + MessageChartSynced = "Chart managed by HelmRelease processed" ) // Controller is the operator implementation for HelmRelease resources @@ -83,7 +79,6 @@ func New( // logged for helm-operator types. ifscheme.AddToScheme(scheme.Scheme) eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})