From 571cefba9923466ba37bd161af023301379de300 Mon Sep 17 00:00:00 2001 From: Michael Bridgen Date: Fri, 13 Jul 2018 12:32:12 +0100 Subject: [PATCH 1/2] Rewrite helm-operator to use weaveworks/flux/git The go-git package seems to introduce a host of problems, especially with SSH, that are obviated by using the git binary. Given a couple of modest additions to github.com/weaveworks/flux/git, we can use that instead. Namely: read-only repos, and repo "exports" (that is, clones that come with only the ability to look at the files). It also turns out that it's possible to simplify the chartsync and releasesync packages. Given the whole picture in the existing code, it's possible to boild it down to two mechanically simple processes: - chartsync looks for the charts used by FluxHelmRelease resources, and checks whether they've changed since the last commit; - releasesync installs any FluxHelmRelease resources that don't have corresponding releases, and upgrades any that do and differ. (In fact, just the latter would be fine since new commits would result in release diffs; but it's more _eventual_ than having both). Since these are now fairly compact, I've just merged them into the chartsync package. I've removed the releasesync_tests.go here, since it tested internal machinery that no longer exists, but I ought to replace the test coverage forthwith. --- Gopkg.lock | 140 +----- Gopkg.toml | 4 - cmd/helm-operator/main.go | 125 ++--- git/operations_test.go | 2 + integrations/helm/chartsync/chartsync.go | 475 ++++++++---------- integrations/helm/chartsync/utils.go | 80 --- integrations/helm/git/git.go | 210 -------- integrations/helm/helm.go | 12 + integrations/helm/operator/operator.go | 20 +- integrations/helm/release/release.go | 56 +-- integrations/helm/releasesync/releasesync.go | 325 ------------ .../helm/releasesync/releasesync_test.go | 353 ------------- integrations/helm/releasesync/utils.go | 39 -- 13 files changed, 307 insertions(+), 1534 deletions(-) delete mode 100644 integrations/helm/chartsync/utils.go delete mode 100644 integrations/helm/git/git.go delete mode 100644 integrations/helm/releasesync/releasesync.go delete mode 100644 integrations/helm/releasesync/releasesync_test.go delete mode 100644 integrations/helm/releasesync/utils.go diff --git a/Gopkg.lock b/Gopkg.lock index c7bcdf5b4..77a5a4799 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -164,17 +164,6 @@ packages = ["."] revision = "e89373fe6b4a7413d7acd6da1725b83ef713e6e4" -[[projects]] - name = "github.com/google/go-cmp" - packages = [ - "cmp", - "cmp/internal/diff", - "cmp/internal/function", - "cmp/internal/value" - ] - revision = "3af367b6b30c263d47e8895973edcca9a49cf029" - version = "v0.2.0" - [[projects]] branch = "master" name = "github.com/google/gofuzz" @@ -245,12 +234,6 @@ revision = "76626ae9c91c4f2a10f34cad8ce83ea42c93bb75" version = "v1.0" -[[projects]] - branch = "master" - name = "github.com/jbenet/go-context" - packages = ["io"] - revision = "d14ea06fba99483203c19d92cfcd13ebe73135f4" - [[projects]] name = "github.com/json-iterator/go" packages = ["."] @@ -262,12 +245,6 @@ revision = "b691ffe85e96a3e4ce5de9d8a17f1f884190e6d0" version = "v1.0.0" -[[projects]] - name = "github.com/kevinburke/ssh_config" - packages = ["."] - revision = "0ff8514904a8ebfcfb3c32ad73e1f8498a7f81b4" - version = "0.3" - [[projects]] branch = "master" name = "github.com/kr/logfmt" @@ -280,12 +257,6 @@ revision = "3247c84500bff8d9fb6d579d800f20b3e091582c" version = "v1.0.0" -[[projects]] - branch = "master" - name = "github.com/mitchellh/go-homedir" - packages = ["."] - revision = "b8bc1bf767474819792c23f32d8286a45736f1c6" - [[projects]] name = "github.com/modern-go/concurrent" packages = ["."] @@ -304,12 +275,6 @@ revision = "279bed98673dd5bef374d3b6e4b09e2af76183bf" version = "v1.0.0-rc1" -[[projects]] - name = "github.com/pelletier/go-buffruneio" - packages = ["."] - revision = "c37440a7cf42ac63b919c752ca73a85067e05992" - version = "v0.2.0" - [[projects]] branch = "master" name = "github.com/petar/GoLLRB" @@ -385,12 +350,6 @@ revision = "572520ed46dbddaed19ea3d9541bdd0494163693" version = "v0.1" -[[projects]] - name = "github.com/sergi/go-diff" - packages = ["diffmatchpatch"] - revision = "1744e2970ca51c86172c8190fadad617561ed6e7" - version = "v1.0.0" - [[projects]] name = "github.com/sirupsen/logrus" packages = ["."] @@ -409,17 +368,6 @@ revision = "583c0c0531f06d5278b7d917446061adc344b5cd" version = "v1.0.1" -[[projects]] - name = "github.com/src-d/gcfg" - packages = [ - ".", - "scanner", - "token", - "types" - ] - revision = "f187355171c936ac84a82793659ebb4936bc1c23" - version = "v1.3.0" - [[projects]] name = "github.com/stretchr/testify" packages = ["assert"] @@ -450,33 +398,12 @@ revision = "0599d764e054d4e983bb120e30759179fafe3942" version = "v1.2.0" -[[projects]] - branch = "master" - name = "github.com/xanzy/ssh-agent" - packages = ["."] - revision = "ba9c9e33906f58169366275e3450db66139a31a9" - [[projects]] branch = "master" name = "golang.org/x/crypto" packages = [ - "cast5", - "curve25519", - "ed25519", - "ed25519/internal/edwards25519", - "internal/chacha20", - "openpgp", - "openpgp/armor", - "openpgp/elgamal", - "openpgp/errors", - "openpgp/packet", - "openpgp/s2k", "pbkdf2", - "poly1305", "scrypt", - "ssh", - "ssh/agent", - "ssh/knownhosts", "ssh/terminal" ] revision = "432090b8f568c018896cd8a0fb0345872bbac6ce" @@ -573,71 +500,6 @@ revision = "3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4" version = "v0.9.0" -[[projects]] - name = "gopkg.in/src-d/go-billy.v4" - packages = [ - ".", - "helper/chroot", - "helper/polyfill", - "osfs", - "util" - ] - revision = "027dceab1aa836eb310d92c2eb2266e4dc9f4b81" - version = "v4.1.0" - -[[projects]] - name = "gopkg.in/src-d/go-git.v4" - packages = [ - ".", - "config", - "internal/revision", - "plumbing", - "plumbing/cache", - "plumbing/filemode", - "plumbing/format/config", - "plumbing/format/diff", - "plumbing/format/gitignore", - "plumbing/format/idxfile", - "plumbing/format/index", - "plumbing/format/objfile", - "plumbing/format/packfile", - "plumbing/format/pktline", - "plumbing/object", - "plumbing/protocol/packp", - "plumbing/protocol/packp/capability", - "plumbing/protocol/packp/sideband", - "plumbing/revlist", - "plumbing/storer", - "plumbing/transport", - "plumbing/transport/client", - "plumbing/transport/file", - "plumbing/transport/git", - "plumbing/transport/http", - "plumbing/transport/internal/common", - "plumbing/transport/server", - "plumbing/transport/ssh", - "storage", - "storage/filesystem", - "storage/filesystem/internal/dotgit", - "storage/memory", - "utils/binary", - "utils/diff", - "utils/ioutil", - "utils/merkletrie", - "utils/merkletrie/filesystem", - "utils/merkletrie/index", - "utils/merkletrie/internal/frame", - "utils/merkletrie/noder" - ] - revision = "886dc83f3ed518a78772055497bcc7d7621b468e" - version = "v4.1.1" - -[[projects]] - name = "gopkg.in/warnings.v0" - packages = ["."] - revision = "ec4a0fea49c7b46c2aeb0b51aac55779c607e52b" - version = "v0.1.2" - [[projects]] branch = "v2" name = "gopkg.in/yaml.v2" @@ -883,6 +745,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "753bd801312bfb0aacc2d33e343346c5a73bbdab2cde843d0b31aabefe14895f" + inputs-digest = "39117abf5941771d8502f737e9680717e0d6659d66b5f3da5ec48a142cdc986a" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 3e2a6d966..ec587fc3b 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -42,10 +42,6 @@ required = ["k8s.io/code-generator/cmd/client-gen"] name = "k8s.io/helm" version = "v2.8.1" -[[constraint]] - name = "github.com/google/go-cmp" - version = "0.2.0" - [[constraint]] name = "github.com/justinbarrick/go-k8s-portforward" version = "v1.0.0" diff --git a/cmd/helm-operator/main.go b/cmd/helm-operator/main.go index aa2577577..9ba6d9734 100644 --- a/cmd/helm-operator/main.go +++ b/cmd/helm-operator/main.go @@ -1,35 +1,28 @@ package main import ( - "sync" - "syscall" - "time" - - "net/url" - - "github.com/spf13/pflag" - + "context" "fmt" "os" "os/signal" + "sync" + "syscall" + "time" "github.com/go-kit/kit/log" + "github.com/spf13/pflag" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + "github.com/weaveworks/flux/git" clientset "github.com/weaveworks/flux/integrations/client/clientset/versioned" ifinformers "github.com/weaveworks/flux/integrations/client/informers/externalversions" fluxhelm "github.com/weaveworks/flux/integrations/helm" + helmop "github.com/weaveworks/flux/integrations/helm" "github.com/weaveworks/flux/integrations/helm/chartsync" - "github.com/weaveworks/flux/integrations/helm/git" "github.com/weaveworks/flux/integrations/helm/operator" "github.com/weaveworks/flux/integrations/helm/release" - "github.com/weaveworks/flux/integrations/helm/releasesync" "github.com/weaveworks/flux/integrations/helm/status" - "github.com/weaveworks/flux/ssh" - - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" - - gitssh "gopkg.in/src-d/go-git.v4/plumbing/transport/ssh" ) var ( @@ -51,32 +44,25 @@ var ( tillerTLSCert *string tillerTLSCACert *string - chartsSyncInterval *time.Duration - chartsSyncTimeout *time.Duration - eventHandlerWorkers *uint - - customKubectl *string - gitURL *string - gitBranch *string - //gitConfigPath *string - gitChartsPath *string + chartsSyncInterval *time.Duration + chartsSyncTimeout *time.Duration - k8sSecretName *string - k8sSecretVolumeMountPath *string - k8sSecretDataKey *string + gitURL *string + gitBranch *string + gitChartsPath *string + gitPollInterval *time.Duration queueWorkerCount *int name *string listenAddr *string gcInterval *time.Duration - - ErrOperatorFailure = "Operator failure: %q" ) const ( - defaultGitConfigPath = "releaseconfig" defaultGitChartsPath = "charts" + + ErrOperatorFailure = "Operator failure: %q" ) func init() { @@ -84,7 +70,7 @@ func init() { fs = pflag.NewFlagSet("default", pflag.ExitOnError) fs.Usage = func() { fmt.Fprintf(os.Stderr, "DESCRIPTION\n") - fmt.Fprintf(os.Stderr, " helm-operator is a Kubernetes operator for Helm integration into flux.\n") + fmt.Fprintf(os.Stderr, " helm-operator releases Helm charts from git.\n") fmt.Fprintf(os.Stderr, "\n") fmt.Fprintf(os.Stderr, "FLAGS\n") fs.PrintDefaults() @@ -105,17 +91,11 @@ func init() { chartsSyncInterval = fs.Duration("charts-sync-interval", 3*time.Minute, "Interval at which to check for changed charts") chartsSyncTimeout = fs.Duration("charts-sync-timeout", 1*time.Minute, "Timeout when checking for changed charts") - eventHandlerWorkers = fs.Uint("event-handler-workers", 2, "Number of workers processing events for Flux-Helm custom resources") - customKubectl = fs.String("kubernetes-kubectl", "", "Optional, explicit path to kubectl tool") gitURL = fs.String("git-url", "", "URL of git repo with Helm Charts; e.g., git@github.com:weaveworks/flux-example") gitBranch = fs.String("git-branch", "master", "branch of git repo") gitChartsPath = fs.String("git-charts-path", defaultGitChartsPath, "path within git repo to locate Helm Charts (relative path)") - - // k8s-secret backed ssh keyring configuration - // generated by flux daemon - k8sSecretVolumeMountPath = fs.String("k8s-secret-volume-mount-path", "/etc/fluxd/ssh", "Mount location of the k8s secret storing the private SSH key") - k8sSecretDataKey = fs.String("k8s-secret-data-key", "identity", "Data key holding the private SSH key within the k8s secret") + gitPollInterval = fs.Duration("git-poll-interval", 5*time.Minute, "period on which to poll for changes to the git repo") queueWorkerCount = fs.Int("queue-worker-count", 2, "Number of workers to process queue with Chart release jobs. Two by default") } @@ -178,7 +158,6 @@ func main() { IP: *tillerIP, Port: *tillerPort, Namespace: *tillerNamespace, - TLSVerify: *tillerTLSVerify, TLSEnable: *tillerTLSEnable, TLSKey: *tillerTLSKey, @@ -191,48 +170,44 @@ func main() { statusUpdater := status.New(ifClient, kubeClient, helmClient) go statusUpdater.Loop(shutdown, log.With(logger, "component", "annotator")) - gitURLParsed, err := url.Parse(*gitURL) - if err != nil { - mainLogger.Log("error", fmt.Sprintf("Error parsing -git-url %q: %v", *gitURL, err)) - os.Exit(1) - } + gitRemote := git.Remote{URL: *gitURL} + repo := git.NewRepo(gitRemote, git.PollInterval(*gitPollInterval), git.ReadOnly) - // GIT REPO SETUP --------------------------------------------------------------------- - var gitAuth *gitssh.PublicKeys - for { - gitAuth, err = git.GetRepoAuth(gitURLParsed.User.Username(), *k8sSecretVolumeMountPath, *k8sSecretDataKey) + // Chart releases sync due to Custom Resources changes ------------------------------- + { + mainLogger.Log("info", "Attempting to clone repo ...", "url", gitRemote.URL) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + err := repo.Ready(ctx) + cancel() if err != nil { - mainLogger.Log("error", fmt.Sprintf("Failed to set up git authorization : %#v", err)) - time.Sleep(20 * time.Second) - continue - } - if err == nil { - break + mainLogger.Log("error", err) + os.Exit(2) } - } + mainLogger.Log("info", "Repo cloned", "url", gitRemote.URL) - gitRemoteConfig, err := git.NewGitRemoteConfig(*gitURL, *gitBranch, *gitChartsPath) - if err != nil { - mainLogger.Log("err", err) - os.Exit(1) + // Start the repo fetching from upstream + shutdownWg.Add(1) + go func() { + errc <- repo.Start(shutdown, shutdownWg) + }() } - gitLogger := log.With(logger, "component", "git") - - // Chart releases sync due to Custom Resources changes ------------------------------- - mainLogger.Log("info", "Starting to clone repo ...") - checkout := git.RepoSetup(gitLogger, gitAuth, gitRemoteConfig, git.ChangesClone) - defer checkout.Cleanup() - mainLogger.Log("info", "Repo cloned") - // release instance is needed during the sync of Charts changes and during the sync of FluxHelRelease changes - rel := release.New(log.With(logger, "component", "release"), helmClient, checkout) - relsync := releasesync.New(log.With(logger, "component", "releasesync"), rel) + releaseConfig := release.Config{ + ChartsPath: *gitChartsPath, + } + repoConfig := helmop.RepoConfig{ + Repo: repo, + Branch: *gitBranch, + ChartsPath: *gitChartsPath, + } + // release instance is needed during the sync of Charts changes and during the sync of FluxHelmRelease changes + rel := release.New(log.With(logger, "component", "release"), helmClient, releaseConfig) // CHARTS CHANGES SYNC ------------------------------------------------------------------ chartSync := chartsync.New(log.With(logger, "component", "chartsync"), chartsync.Polling{Interval: *chartsSyncInterval, Timeout: *chartsSyncTimeout}, chartsync.Clients{KubeClient: *kubeClient, IfClient: *ifClient}, - rel, *relsync) + rel, repoConfig) chartSync.Run(shutdown, errc, shutdownWg) // OPERATOR - CUSTOM RESOURCE CHANGE SYNC ----------------------------------------------- @@ -243,7 +218,7 @@ func main() { // Reference to shared index informers for the FluxHelmRelease fhrInformer := ifInformerFactory.Helm().V1alpha2().FluxHelmReleases() - opr := operator.New(log.With(logger, "component", "operator"), kubeClient, fhrInformer, rel) + opr := operator.New(log.With(logger, "component", "operator"), kubeClient, fhrInformer, rel, repoConfig) // Starts handling k8s events related to the given resource kind go ifInformerFactory.Start(shutdown) @@ -253,9 +228,3 @@ func main() { errc <- fmt.Errorf(ErrOperatorFailure, err) } } - -// Helper functions ----------------------------------------------------------------------- -func optionalVar(fs *pflag.FlagSet, value ssh.OptionalValue, name, usage string) ssh.OptionalValue { - fs.Var(value, name, usage) - return value -} diff --git a/git/operations_test.go b/git/operations_test.go index eab0553f8..f82509f33 100644 --- a/git/operations_test.go +++ b/git/operations_test.go @@ -216,6 +216,8 @@ func TestCheckPush(t *testing.T) { } } +// --- + func createRepo(dir string, subdirs []string) error { var ( err error diff --git a/integrations/helm/chartsync/chartsync.go b/integrations/helm/chartsync/chartsync.go index 3a50698ab..77a279c39 100644 --- a/integrations/helm/chartsync/chartsync.go +++ b/integrations/helm/chartsync/chartsync.go @@ -1,34 +1,59 @@ /* -Package chartsync provides the functionality for updating a Chart release -due to (git repo) changes of Charts, while no Custom Resource changes. -Helm operator regularly checks the Chart repo and if new commits are found -all Custom Resources related to the changed Charts are updates, resulting in new -Chart release(s). +This package has the algorithm for making sure the Helm releases in +the cluster match what are defined in the FluxHelmRelease resources. + +There are several ways they can be mismatched. Here's how they are +reconciled: + + 1a. There is a FluxHelmRelease resource, but no corresponding + release. This can happen when the helm operator is first run, for + example. The ChartChangeSync periodically checks for this by + running through the resources and installing any that aren't + released already. + + 1b. The release corresponding to a FluxHelmRelease has been updated by + some other means, perhaps while the operator wasn't running. This + is also checked periodically, by doing a dry-run release and + comparing the result to the release. + + 2. The chart has changed in git, meaning the release is out of + date. The ChartChangeSync responds to new git commits by looking at + each chart that's referenced by a FluxHelmRelease, and if it's + changed since the last seen commit, updating the release. + +1a.) and 1b.) run on the same schedule, and 2.) is run when the git +mirror reports it has fetched from upstream _and_ (upon checking) the +head of the branch has changed. + +Since both 1*.) and 2.) look at the charts in the git repo, but run on +different schedules (non-deterministically), there's a chance that +they can fight each other. For example, the git mirror may fetch new +commits which are used in 1), then treated as changes subsequently by +2). To keep consistency between the two, the current revision is used +by 1), and advanced only by 2). + */ package chartsync import ( "context" "fmt" - "io/ioutil" - "os" "path/filepath" "sync" "time" - "github.com/weaveworks/flux/integrations/helm/releasesync" - "github.com/go-kit/kit/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" + hapi_release "k8s.io/helm/pkg/proto/hapi/release" ifv1 "github.com/weaveworks/flux/apis/helm.integrations.flux.weave.works/v1alpha2" + "github.com/weaveworks/flux/git" ifclientset "github.com/weaveworks/flux/integrations/client/clientset/versioned" - helmgit "github.com/weaveworks/flux/integrations/helm/git" - chartrelease "github.com/weaveworks/flux/integrations/helm/release" + helmop "github.com/weaveworks/flux/integrations/helm" + "github.com/weaveworks/flux/integrations/helm/release" ) type Polling struct { @@ -44,84 +69,81 @@ type Clients struct { type ChartChangeSync struct { logger log.Logger Polling - kubeClient kubernetes.Clientset - ifClient ifclientset.Clientset - release *chartrelease.Release - relsync releasesync.ReleaseChangeSync - lastCheckedRevision string + kubeClient kubernetes.Clientset + ifClient ifclientset.Clientset + release *release.Release + config helmop.RepoConfig } -func New( - logger log.Logger, - polling Polling, - clients Clients, - release *chartrelease.Release, - relsync releasesync.ReleaseChangeSync) *ChartChangeSync { - - lastCheckedRevision := "" - gitRef, err := release.Repo.ConfigSync.GetRevision() - if err != nil { - // we shall try again later - } - lastCheckedRevision = gitRef.String() - +func New(logger log.Logger, polling Polling, clients Clients, release *release.Release, config helmop.RepoConfig) *ChartChangeSync { return &ChartChangeSync{ - logger: logger, - Polling: polling, - kubeClient: clients.KubeClient, - ifClient: clients.IfClient, - release: release, - relsync: relsync, - lastCheckedRevision: lastCheckedRevision, + logger: logger, + Polling: polling, + kubeClient: clients.KubeClient, + ifClient: clients.IfClient, + release: release, + config: config, } } -// Run creates a syncing loop monitoring repo chart changes +// Run creates a syncing loop monitoring repo chart changes. It is +// assumed that the *git.Repo given to the config is ready to use +// before this is invoked. +// +// The behaviour if the git mirror becomes unavailable while it's +// running is not defined (this could be tightened up). func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *sync.WaitGroup) { chs.logger.Log("info", "Starting charts sync loop") - wg.Add(1) go func() { defer runtime.HandleCrash() defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) + currentRevision, err := chs.config.Repo.Revision(ctx, chs.config.Branch) + cancel() + if err != nil { + errc <- err + return + } + ticker := time.NewTicker(chs.Polling.Interval) defer ticker.Stop() for { select { - case <-ticker.C: - ns, err := GetNamespaces(chs.logger, chs.kubeClient) + case <-chs.config.Repo.C: + ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) + head, err := chs.config.Repo.Revision(ctx, chs.config.Branch) + cancel() if err != nil { - errc <- err + chs.logger.Log("warning", "failure using git repo", "error", err.Error()) + continue } - var syncNeeded bool + if head == currentRevision { + chs.logger.Log("info", "no new commits on branch", "branch", chs.config.Branch, "head", head) + continue + } - // Syncing git repo Charts only changes + // Sync changes to charts in the git repo chs.logger.Log("info", fmt.Sprint("Start of chartsync")) - syncNeeded, err = chs.DoChartChangeSync(ns) + err = chs.ApplyChartChanges(currentRevision, head) if err != nil { chs.logger.Log("error", fmt.Sprintf("Failure to do chart sync: %#v", err)) - chs.logger.Log("info", fmt.Sprint("End of chartsync")) - } - if !syncNeeded { - chs.logger.Log("info", fmt.Sprint("No repo changes of Charts")) } + currentRevision = head chs.logger.Log("info", fmt.Sprint("End of chartsync")) - // Syncing manual Chart releases + case <-ticker.C: + // Re-release any chart releases that have apparently + // changed in the cluster. chs.logger.Log("info", fmt.Sprint("Start of releasesync")) - syncNeeded, err = chs.relsync.DoReleaseChangeSync(chs.ifClient, ns) + err = chs.ReapplyReleaseDefs(currentRevision) if err != nil { - chs.logger.Log("error", fmt.Sprintf("Failure to do manual release sync: %#v", err)) - chs.logger.Log("info", fmt.Sprint("End of releasesync")) - } - if !syncNeeded { - chs.logger.Log("info", fmt.Sprint("No manual changes of Chart releases")) + chs.logger.Log("error", fmt.Sprintf("Failure to do manual release sync: %s", err)) } chs.logger.Log("info", fmt.Sprint("End of releasesync")) - continue case <-stopCh: chs.logger.Log("stopping", "true") @@ -131,259 +153,192 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn }() } -func (chs *ChartChangeSync) DoChartChangeSync(ns []string) (bool, error) { - var exist bool - var newRev string - var err error - if exist, newRev, err = chs.newCommits(); err != nil { - return false, fmt.Errorf("Failure during retrieving commits: %#v", err) - } - if !exist { - return false, nil - } - - chartDirs, err := getChartDirs(chs.logger, chs.release.Repo.ConfigSync) +// ApplyChartChanges looks at the FluxHelmRelease resources in the +// cluster, figures out which refer to charts that have changed since +// the last commit, then re-releases those that have. +func (chs *ChartChangeSync) ApplyChartChanges(prevRef, head string) error { + resources, err := chs.getCustomResources() if err != nil { - return false, fmt.Errorf("Failure to get charts under the charts path: %#v", err) + return fmt.Errorf("Failure getting FHR custom resources: %s", err.Error()) } - chartFhrs := make(map[string][]ifv1.FluxHelmRelease) - for _, chart := range chartDirs { - err = chs.getCustomResources(ns, chart, chartFhrs) - if err != nil { - return false, fmt.Errorf("Failure during retrieving Custom Resources related to Chart [%s]: %#v", chart, err) + // Release all the resources whose charts have changed. More than + // one FluxHelmRelease resource can refer to a given chart, so to + // avoid repeated checking, keep track of which charts have + // changed or not changed. + chartHasChanged := map[string]bool{} + + // Lazily clone the repo if and when it turns out we need it + var clone *git.Export + defer func() { + if clone != nil { + clone.Clean() } - } + }() - ctx, cancel := context.WithTimeout(context.Background(), helmgit.DefaultCloneTimeout) - chartsToRelease, err := chs.releaseNeeded(ctx, newRev, chartDirs, chartFhrs) - cancel() - if err != nil { - return false, fmt.Errorf("Failure while establishing upgrade need of releases: %#v", err) - } - if len(chartsToRelease) == 0 { - chs.lastCheckedRevision = newRev - return false, nil - } + for _, fhr := range resources { + chartPath := filepath.Join(chs.config.ChartsPath, fhr.Spec.ChartGitPath) + changed, ok := chartHasChanged[chartPath] + if !ok { + ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) + commits, err := chs.config.Repo.CommitsBetween(ctx, prevRef, head, chartPath) + cancel() + if err != nil { + return fmt.Errorf("error while checking if chart at %q has changed in %s..%s: %s", chartPath, prevRef, head, err.Error()) + } + changed = len(commits) > 0 + chartHasChanged[chartPath] = changed + } + if changed { + if clone == nil { + clone, err = chs.exportAtRef(head) + if err != nil { + return fmt.Errorf("failed to clone repo at %s: %s", head, err.Error()) + } + } - if err = chs.releaseCharts(chartsToRelease, chartFhrs); err != nil { - return false, fmt.Errorf("Failure to release Chart(s): %#v", err) + rlsName := release.GetReleaseName(fhr) + opts := release.InstallOptions{DryRun: false} + if _, err = chs.release.Install(clone.Dir(), rlsName, fhr, release.UpgradeAction, opts); err != nil { + // NB in this step, failure to release is considered non-fatal, i.e,. we move on to the next rather than giving up entirely. + chs.logger.Log("warning", "failure to release chart with changes in git", "error", err, "chart", chartPath, "release", rlsName) + } + } } - return true, nil + return nil } -// GetNamespaces gets current kubernetes cluster namespaces -func GetNamespaces(logger log.Logger, kubeClient kubernetes.Clientset) ([]string, error) { - ns := []string{} +func (chs *ChartChangeSync) ReapplyReleaseDefs(ref string) error { + var clone *git.Export + defer func() { + if clone != nil { + clone.Clean() + } + }() - nso, err := kubeClient.CoreV1().Namespaces().List(metav1.ListOptions{}) + resources, err := chs.getCustomResources() if err != nil { - errm := fmt.Errorf("Failure while retrieving kubernetes namespaces: %#v", err) - logger.Log("error", errm.Error()) - return nil, errm + return fmt.Errorf("failed to get FluxHelmRelease resources from the API server: %s", err.Error()) } - for _, n := range nso.Items { - ns = append(ns, n.GetName()) - } - - return ns, nil -} - -// getChartDirs retrieves charts under the charts directory (under the repo root) -// go-git(.v4) does not implement finding commits under a specific path. This means -// the individual chart paths cannor be currently used with "git log" -func getChartDirs(logger log.Logger, checkout *helmgit.Checkout) ([]string, error) { - chartDirs := []string{} - - repoRoot := checkout.Dir - if repoRoot == "" { - return nil, helmgit.ErrNoRepoCloned - } - chartsFullPath := filepath.Join(repoRoot, checkout.Config.Path) - - files, err := ioutil.ReadDir(chartsFullPath) - if err != nil { - errm := fmt.Errorf("Failure to access directory %s: %#v", chartsFullPath, err) - logger.Log("error", errm.Error()) - return nil, errm - } + for _, fhr := range resources { + releaseName := release.GetReleaseName(fhr) + rel, err := chs.release.GetDeployedRelease(releaseName) + if err != nil { + return fmt.Errorf("failed to get release %q: %s", releaseName, err) + } - // We only choose subdirectories that represent Charts - for _, f := range files { - if f.IsDir() { - chartDir := filepath.Join(chartsFullPath, f.Name()) - chartMeta := filepath.Join(chartDir, "Chart.yaml") - if _, err := os.Stat(chartMeta); os.IsNotExist(err) { - continue + // At this point, one way or another, we are going to need a clone of the repo. + if clone == nil { + clone, err = chs.exportAtRef(ref) + if err != nil { + return err } - chartDirs = append(chartDirs, f.Name()) } - } - - return chartDirs, nil -} - -// newCommits determines if charts need to be released -// go-git.v4 does not provide a possibility to find commit for a particular path. -// So we find if there are any commits at all sice last time -func (chs *ChartChangeSync) newCommits() (bool, string, error) { - chs.logger.Log("info", "Getting new commits") - checkout := chs.release.Repo.ConfigSync - - chs.logger.Log("info", fmt.Sprintf("Repo dir = %s", checkout.Dir)) + opts := release.InstallOptions{DryRun: false} + if rel == nil { + _, err := chs.release.Install(clone.Dir(), releaseName, fhr, release.InstallAction, opts) + if err != nil { + chs.logger.Log("warning", "Failed to install chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + } + continue + } - if checkout.Dir == "" { - ctx, cancel := context.WithTimeout(context.Background(), helmgit.DefaultCloneTimeout) - err := checkout.Clone(ctx, helmgit.ChangesClone) - cancel() + changed, err := chs.shouldUpgrade(clone.Dir(), rel, fhr) if err != nil { - errm := fmt.Errorf("Failure while cloning repo : %#v", err) - chs.logger.Log("error", errm.Error()) - return false, "", errm + chs.logger.Log("warning", "Unable to determine if release has changed", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + continue + } + if changed { + _, err := chs.release.Install(clone.Dir(), releaseName, fhr, release.UpgradeAction, opts) + if err != nil { + chs.logger.Log("warning", "Failed to upgrade chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + } } - chs.logger.Log("info", "Cloned repo") } + return nil +} + +//--- - ctx, cancel := context.WithTimeout(context.Background(), helmgit.DefaultPullTimeout) - err := checkout.Pull(ctx) +func (chs *ChartChangeSync) exportAtRef(ref string) (*git.Export, error) { + ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) + clone, err := chs.config.Repo.Export(ctx, ref) cancel() if err != nil { - return false, "", fmt.Errorf("Failure while pulling repo: %#v", err) + return nil, fmt.Errorf("error cloning repo at ref %s for chart releases: %s", ref, err.Error()) } + return clone, nil +} - // get latest revision - newRev, err := checkout.GetRevision() +// GetNamespaces gets current kubernetes cluster namespaces +func (chs *ChartChangeSync) getNamespaces() ([]string, error) { + var ns []string + nso, err := chs.kubeClient.CoreV1().Namespaces().List(metav1.ListOptions{}) if err != nil { - return false, "", fmt.Errorf("Failure while getting repo revision: %s", err.Error()) + return nil, fmt.Errorf("Failure while retrieving kubernetes namespaces: %s", err) } - chs.logger.Log("info", fmt.Sprintf("Got revision %s", newRev.String())) - oldRev := chs.lastCheckedRevision - if oldRev == "" { - chs.lastCheckedRevision = newRev.String() - chs.logger.Log("debug", fmt.Sprintf("Populated lastCheckedRevision with %s", chs.lastCheckedRevision)) - - return false, "", nil + for _, n := range nso.Items { + ns = append(ns, n.GetName()) } - chs.logger.Log("info", fmt.Sprintf("lastCheckedRevision: %s", chs.lastCheckedRevision)) - - if oldRev != newRev.String() { - return true, newRev.String(), nil - } - return false, "", nil + return ns, nil } -// getCustomResources assembles custom resources referencing a particular chart -func (chs *ChartChangeSync) getCustomResources(namespaces []string, chart string, chartFhrs map[string][]ifv1.FluxHelmRelease) error { - chartSelector := map[string]string{ - "chart": chart, +// getCustomResources assembles all custom resources +func (chs *ChartChangeSync) getCustomResources() ([]ifv1.FluxHelmRelease, error) { + namespaces, err := chs.getNamespaces() + if err != nil { + return nil, err } - labelsSet := labels.Set(chartSelector) - listOptions := metav1.ListOptions{LabelSelector: labelsSet.AsSelector().String()} - fhrs := []ifv1.FluxHelmRelease{} + var fhrs []ifv1.FluxHelmRelease for _, ns := range namespaces { - list, err := chs.ifClient.HelmV1alpha2().FluxHelmReleases(ns).List(listOptions) + list, err := chs.ifClient.HelmV1alpha2().FluxHelmReleases(ns).List(metav1.ListOptions{}) if err != nil { - chs.logger.Log("error", fmt.Errorf("Failure while retrieving FluxHelmReleases: %#v", err)) - continue + return nil, err } for _, fhr := range list.Items { fhrs = append(fhrs, fhr) } } - - chartFhrs[chart] = fhrs - - return nil + return fhrs, nil } -// releaseCharts upgrades releases with changed Charts -// input: -// chartD ... provides chart name and its directory information -// fhr ...... provides chart name and all Custom Resources associated with this chart -func (chs *ChartChangeSync) releaseCharts(chartsToRelease []string, chartFhrs map[string][]ifv1.FluxHelmRelease) error { - checkout := chs.release.Repo.ConfigSync - - ctx, cancel := context.WithTimeout(context.Background(), helmgit.DefaultPullTimeout) - err := checkout.Pull(ctx) - cancel() - if err != nil { - return fmt.Errorf("Failure while pulling repo: %#v", err) +// shouldUpgrade returns true if the current running values or chart +// don't match what the repo says we ought to be running, based on +// doing a dry run install from the chart in the git repo. +func (chs *ChartChangeSync) shouldUpgrade(chartsRepo string, currRel *hapi_release.Release, fhr ifv1.FluxHelmRelease) (bool, error) { + if currRel == nil { + return false, fmt.Errorf("No Chart release provided for %v", fhr.GetName()) } - chartPathBase := filepath.Join(checkout.Dir, checkout.Config.Path) - for _, chart := range chartsToRelease { - var err error - fhrs := chartFhrs[chart] - for _, fhr := range fhrs { - - // sanity check - chartPath := filepath.Join(chartPathBase, fhr.Spec.ChartGitPath) - if _, err := os.Stat(chartPath); os.IsNotExist(err) { - chs.logger.Log("error", fmt.Sprintf("Missing Chart %s. No release can happen.", chartPath)) - continue - } - - rlsName := chartrelease.GetReleaseName(fhr) + currVals := currRel.GetConfig().GetRaw() + currChart := currRel.GetChart().String() - opts := chartrelease.InstallOptions{DryRun: false} - _, err = chs.release.Install(checkout, rlsName, fhr, chartrelease.UpgradeAction, opts) - if err != nil { - chs.logger.Log("info", fmt.Sprintf("Error to do upgrade of release of [%s]: %s. Skipping.", rlsName, err.Error())) - // TODO: collect errors and return them after looping through all - ? - continue - } - chs.logger.Log("info", fmt.Sprintf("Release [%s] upgraded due to chart only changes", rlsName)) - } - } - - // get latest revision - newRev, err := checkout.GetRevision() + // Get the desired release state + opts := release.InstallOptions{DryRun: true} + tempRelName := currRel.GetName() + "-temp" + desRel, err := chs.release.Install(chartsRepo, tempRelName, fhr, release.InstallAction, opts) if err != nil { - return fmt.Errorf("Failure while getting repo revision: %s", err.Error()) + return false, err } - chs.lastCheckedRevision = newRev.String() - chs.logger.Log("debug", fmt.Sprintf("Populated lastCheckedRevision with %s", chs.lastCheckedRevision)) + desVals := desRel.GetConfig().GetRaw() + desChart := desRel.GetChart().String() - return nil -} - -// releaseNeeded finds if there were commits related to Chart changes since last sync -// returns maps keys on chart name with value corresponding to the chart path -// (go-git.v4 does not provide a possibility to find commit for a particular path.) -func (chs *ChartChangeSync) releaseNeeded(ctx context.Context, newRev string, charts []string, chartFhrs map[string][]ifv1.FluxHelmRelease) ([]string, error) { - chartsToRelease := []string{} - var changed, ok bool - var err error - var fhrs []ifv1.FluxHelmRelease - - revRange := fmt.Sprintf("%s..%s", chs.lastCheckedRevision, newRev) - dir := fmt.Sprintf("%s/%s", chs.release.Repo.ConfigSync.Dir, chs.release.Repo.ConfigSync.Config.Path) - - for _, chart := range charts { - chs.logger.Log("debug", fmt.Sprintf("Testing if release needed for Chart [%s]", chart)) - - if fhrs, ok = chartFhrs[chart]; !ok { - continue - } - if len(fhrs) < 1 { - continue - } - - if changed, err = chs.chartChanged(ctx, dir, revRange, chart); err != nil { - chs.logger.Log("error", fmt.Sprintf("Failure to determine chart change for [%s]: %s", chart, err.Error())) - continue - } - if !changed { - continue - } - chartsToRelease = append(chartsToRelease, chart) + // compare values && Chart + if currVals != desVals { + chs.logger.Log("error", fmt.Sprintf("Release %s: values have diverged due to manual Chart release", currRel.GetName())) + return true, nil + } + if currChart != desChart { + chs.logger.Log("error", fmt.Sprintf("Release %s: Chart has diverged due to manual Chart release", currRel.GetName())) + return true, nil } - return chartsToRelease, nil + + return false, nil } diff --git a/integrations/helm/chartsync/utils.go b/integrations/helm/chartsync/utils.go deleted file mode 100644 index e79e205fb..000000000 --- a/integrations/helm/chartsync/utils.go +++ /dev/null @@ -1,80 +0,0 @@ -package chartsync - -import ( - "bytes" - "context" - "fmt" - "io" - "io/ioutil" - "os/exec" - "strings" -) - -func (chs *ChartChangeSync) chartChanged(ctx context.Context, dir, revRange, chart string) (bool, error) { - chs.release.Repo.ConfigSync.Lock() - defer chs.release.Repo.ConfigSync.Unlock() - - if len(dir) == 0 || dir[0] != '/' { - return false, fmt.Errorf("directory must provided and must be absolute: [%s]", dir) - } - if len(chart) == 0 { - return false, fmt.Errorf("chart must provided: [%s]", chart) - } - - out := &bytes.Buffer{} - if err := chs.execDiffCmd(ctx, dir, out, revRange, chart); err != nil { - return false, err - } - - lines := splitLines(out.String()) - if len(lines) < 1 { - return false, nil - } - return true, nil -} - -func splitLines(s string) []string { - outStr := strings.TrimSpace(s) - if outStr == "" { - return nil - } - return strings.Split(outStr, "\n") -} - -// execDiffCmd ... find if there is a change in a particular chart -// git diff revCurr..revNew chart -// (runs in the /repoRoot/charts dir) -// input: -// dir (/repoRoot/charts) -// args (revCurr..revNew, chart) -func (chs *ChartChangeSync) execDiffCmd(ctx context.Context, dir string, out io.Writer, args ...string) error { - args = append([]string{"diff"}, args...) - c := exec.CommandContext(ctx, "git", args...) - - chs.logger.Log("info", fmt.Sprintf("Running command: git %v", args)) - - c.Dir = dir - - c.Stdout = ioutil.Discard - if out != nil { - c.Stdout = out - } - errOut := &bytes.Buffer{} - c.Stderr = errOut - - err := c.Run() - if err != nil { - if exitError, ok := err.(*exec.ExitError); ok { - chs.logger.Log("error", fmt.Sprintf("Failure while running git diff: %#v", exitError.Error())) - return err - } - } - - if ctx.Err() == context.DeadlineExceeded { - return fmt.Errorf("running git diff command: %s %v", "git", args) - } else if ctx.Err() == context.Canceled { - return fmt.Errorf("context was unexpectedly cancelled when running command: %s %v", "gitt", args) - } - - return err -} diff --git a/integrations/helm/git/git.go b/integrations/helm/git/git.go deleted file mode 100644 index 6c2b4a35a..000000000 --- a/integrations/helm/git/git.go +++ /dev/null @@ -1,210 +0,0 @@ -package git - -import ( - "context" - "errors" - "fmt" - "io/ioutil" - "os" - "path" - "sync" - "time" - - "github.com/go-kit/kit/log" - "golang.org/x/crypto/ssh" - gogit "gopkg.in/src-d/go-git.v4" - "gopkg.in/src-d/go-git.v4/plumbing" - gitssh "gopkg.in/src-d/go-git.v4/plumbing/transport/ssh" -) - -const ( - DefaultCloneTimeout = 2 * time.Minute - DefaultPullTimeout = 2 * time.Minute - privateKeyFileMode = os.FileMode(0400) - ChangesClone = "sync_clone" -) - -var ( - ErrNoChanges = errors.New("no changes made in repo") - ErrNoChartsDir = errors.New("no Charts dir provided") - ErrNoRepo = errors.New("no repo provided") - ErrNoRepoCloned = errors.New("no repo cloned") -) - -type GitRemoteConfig struct { - URL string `json:"url"` - Branch string `json:"branch"` - Path string `json:"path"` -} - -// Checkout is a local clone of the remote repo. -type Checkout struct { - Logger log.Logger - Config GitRemoteConfig // remote repo info provided by the user - auth *gitssh.PublicKeys - Dir string // directory where the repo was cloned (repo root) - Repo *gogit.Repository // cloned repo info - worktree *gogit.Worktree - sync.RWMutex -} - -// NewGitRemoteConfig ... sets up git repo configuration. -func NewGitRemoteConfig(url, branch, path string) (GitRemoteConfig, error) { - if len(url) == 0 { - return GitRemoteConfig{}, errors.New("git repo URL must be provided") - } - if len(branch) == 0 { - branch = "master" - } - if len(path) == 0 || (len(path) != 0 && path[0] == '/') { - return GitRemoteConfig{}, errors.New("git subdirectory (--git-charts-path) must be provided and cannot have leading forward slash") - } - - return GitRemoteConfig{ - URL: url, - Branch: branch, - Path: path, - }, nil -} - -// SetupRepo creates a new checkout and clones repo until ready -func RepoSetup(logger log.Logger, auth *gitssh.PublicKeys, config GitRemoteConfig, cloneSubdir string) *Checkout { - checkout := &Checkout{ - Logger: logger, - Config: config, - auth: auth, - } - // If cloning not immediately possible, we wait until it is ----------------------------- - var err error - for { - ctx, cancel := context.WithTimeout(context.Background(), DefaultCloneTimeout) - err = checkout.Clone(ctx, cloneSubdir) - cancel() - if err == nil { - break - } - logger.Log("error", fmt.Sprintf("Failed to clone git repo [%s, %s, %s]: %v", config.URL, config.Path, config.Branch, err)) - time.Sleep(10 * time.Second) - } - - return checkout -} - -// Clone creates a local clone of a remote repo and -// checks out the relevant branch -// subdir reflects the purpose of the clone: -// * acting on Charts changes (syncing the cluster when there were only commits -// in the Charts parts of the repo which did not trigger Custom Resource changes) -func (ch *Checkout) Clone(ctx context.Context, cloneSubdir string) error { - ch.Lock() - defer ch.Unlock() - - if ch.Config.URL == "" { - return ErrNoRepo - } - - repoDir, err := ioutil.TempDir(os.TempDir(), cloneSubdir) - if err != nil { - return err - } - ch.Dir = repoDir - - repo, err := gogit.PlainCloneContext(ctx, repoDir, false, &gogit.CloneOptions{ - URL: ch.Config.URL, - Auth: ch.auth, - ReferenceName: plumbing.ReferenceName(fmt.Sprintf("refs/heads/%s", ch.Config.Branch)), - }) - if err != nil { - return err - } - - wt, err := repo.Worktree() - if err != nil { - return err - } - - ch.Repo = repo - ch.worktree = wt - - ch.Logger.Log("debug", fmt.Sprintf("repo cloned in into %s", ch.Dir)) - - return nil -} - -// Cleanup ... removes the temp repo directory -func (ch *Checkout) Cleanup() { - ch.Lock() - defer ch.Unlock() - - if ch.Dir != "" { - err := os.RemoveAll(ch.Dir) - if err != nil { - ch.Logger.Log("error", err.Error()) - } - } - ch.Dir = "" - ch.Repo = nil - ch.worktree = nil -} - -// GetRepoAuth ... provides git repo authentication based on private ssh key -func GetRepoAuth(sshUsername, k8sSecretVolumeMountPath, k8sSecretDataKey string) (*gitssh.PublicKeys, error) { - privateKeyPath := path.Join(k8sSecretVolumeMountPath, k8sSecretDataKey) - fileInfo, err := os.Stat(privateKeyPath) - switch { - case os.IsNotExist(err): - return &gitssh.PublicKeys{}, err - case err != nil: - return &gitssh.PublicKeys{}, err - case fileInfo.Mode() != privateKeyFileMode: - if err := os.Chmod(privateKeyPath, privateKeyFileMode); err != nil { - return &gitssh.PublicKeys{}, err - } - default: - } - - sshKey, err := ioutil.ReadFile(privateKeyPath) - if err != nil { - return nil, err - } - signer, err := ssh.ParsePrivateKey([]byte(sshKey)) - if err != nil { - return nil, err - } - auth := &gitssh.PublicKeys{User: sshUsername, Signer: signer} - - return auth, nil -} - -// Pull ... makes a git pull -func (ch *Checkout) Pull(ctx context.Context) error { - ch.Lock() - defer ch.Unlock() - - w := ch.worktree - if w == nil { - return ErrNoRepoCloned - } - err := w.Pull(&gogit.PullOptions{ - RemoteName: "origin", - Auth: ch.auth, - ReferenceName: plumbing.ReferenceName(fmt.Sprintf("refs/heads/%s", ch.Config.Branch)), - }) - if err != nil && err != gogit.NoErrAlreadyUpToDate { - return err - } - return nil -} - -// GetRevision returns string representation of the revision hash -func (ch *Checkout) GetRevision() (plumbing.Hash, error) { - if ch.Repo == nil { - return plumbing.Hash{}, ErrNoRepoCloned - } - ref, err := ch.Repo.Head() - if err != nil { - return plumbing.Hash{}, err - } - rev := ref.Hash() - return rev, nil -} diff --git a/integrations/helm/helm.go b/integrations/helm/helm.go index 3a4771f7b..070c786d2 100644 --- a/integrations/helm/helm.go +++ b/integrations/helm/helm.go @@ -11,8 +11,20 @@ import ( k8shelm "k8s.io/helm/pkg/helm" rls "k8s.io/helm/pkg/proto/hapi/services" "k8s.io/helm/pkg/tlsutil" + + "github.com/weaveworks/flux/git" +) + +const ( + GitOperationTimeout = 30 * time.Second ) +type RepoConfig struct { + Repo *git.Repo + Branch string + ChartsPath string +} + type TillerOptions struct { IP string Port string diff --git a/integrations/helm/operator/operator.go b/integrations/helm/operator/operator.go index 038773e24..88c67a226 100644 --- a/integrations/helm/operator/operator.go +++ b/integrations/helm/operator/operator.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/go-kit/kit/log" "github.com/golang/glog" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -19,13 +20,11 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" - "github.com/go-kit/kit/log" - ifv1 "github.com/weaveworks/flux/apis/helm.integrations.flux.weave.works/v1alpha2" ifscheme "github.com/weaveworks/flux/integrations/client/clientset/versioned/scheme" fhrv1 "github.com/weaveworks/flux/integrations/client/informers/externalversions/helm.integrations.flux.weave.works/v1alpha2" iflister "github.com/weaveworks/flux/integrations/client/listers/helm.integrations.flux.weave.works/v1alpha2" - helmgit "github.com/weaveworks/flux/integrations/helm/git" + helmop "github.com/weaveworks/flux/integrations/helm" chartrelease "github.com/weaveworks/flux/integrations/helm/release" ) @@ -58,6 +57,7 @@ type Controller struct { fhrSynced cache.InformerSynced release *chartrelease.Release + config helmop.RepoConfig // workqueue is a rate limited work queue. This is used to queue work to be // processed instead of performing it as soon as a change happens. This @@ -76,7 +76,8 @@ func New( logger log.Logger, kubeclientset kubernetes.Interface, fhrInformer fhrv1.FluxHelmReleaseInformer, - release *chartrelease.Release) *Controller { + release *chartrelease.Release, + config helmop.RepoConfig) *Controller { // Add helm-operator types to the default Kubernetes Scheme so Events can be // logged for helm-operator types. @@ -93,6 +94,7 @@ func New( release: release, releaseWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ChartRelease"), recorder: recorder, + config: config, } controller.logger.Log("info", "Setting up event handlers") @@ -263,15 +265,16 @@ func (c *Controller) syncHandler(key string) error { } // Chart installation of the appropriate type - ctx, cancel := context.WithTimeout(context.Background(), helmgit.DefaultCloneTimeout) - err = c.release.Repo.ConfigSync.Pull(ctx) + ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) + clone, err := c.config.Repo.Export(ctx, c.config.Branch) cancel() if err != nil { - return fmt.Errorf("Failure to do git pull: %s", err.Error()) + return fmt.Errorf("Failure to clone repo: %s", err.Error()) } + defer clone.Clean() opts := chartrelease.InstallOptions{DryRun: false} - _, err = c.release.Install(c.release.Repo.ConfigSync, releaseName, *fhr, syncType, opts) + _, err = c.release.Install(clone.Dir(), releaseName, *fhr, syncType, opts) if err != nil { return err } @@ -344,7 +347,6 @@ func (c *Controller) deleteRelease(fhr ifv1.FluxHelmRelease) { // needsUpdate compares two FluxHelmRelease and determines if any changes occurred func needsUpdate(old, new ifv1.FluxHelmRelease) bool { - oldValues, err := old.Spec.Values.YAML() if err != nil { return false diff --git a/integrations/helm/release/release.go b/integrations/helm/release/release.go index 2e215e020..50c40cec6 100644 --- a/integrations/helm/release/release.go +++ b/integrations/helm/release/release.go @@ -6,7 +6,6 @@ import ( "fmt" "os/exec" "path/filepath" - "sync" "time" "github.com/go-kit/kit/log" @@ -16,7 +15,6 @@ import ( "github.com/weaveworks/flux" ifv1 "github.com/weaveworks/flux/apis/helm.integrations.flux.weave.works/v1alpha2" fluxk8s "github.com/weaveworks/flux/cluster/kubernetes" - helmgit "github.com/weaveworks/flux/integrations/helm/git" ) var ( @@ -30,31 +28,23 @@ const ( UpgradeAction Action = "UPDATE" ) +type Config struct { + ChartsPath string +} + // Release contains clients needed to provide functionality related to helm releases type Release struct { - logger log.Logger + logger log.Logger + HelmClient *k8shelm.Client - Repo repo - sync.RWMutex -} -func (r *Release) ConfigSync() *helmgit.Checkout { - return r.Repo.ConfigSync + config Config } type Releaser interface { GetCurrent() (map[string][]DeployInfo, error) GetDeployedRelease(name string) (*hapi_release.Release, error) - Install(checkout *helmgit.Checkout, - releaseName string, - fhr ifv1.FluxHelmRelease, - action Action, - opts InstallOptions) (*hapi_release.Release, error) - ConfigSync() *helmgit.Checkout -} - -type repo struct { - ConfigSync *helmgit.Checkout + Install(dir string, releaseName string, fhr ifv1.FluxHelmRelease, action Action, opts InstallOptions) (*hapi_release.Release, error) } type DeployInfo struct { @@ -66,15 +56,13 @@ type InstallOptions struct { ReuseName bool } -// New creates a new Release instance -func New(logger log.Logger, helmClient *k8shelm.Client, configCheckout *helmgit.Checkout) *Release { - repo := repo{ - ConfigSync: configCheckout, - } +// New creates a new Release instance. +func New(logger log.Logger, helmClient *k8shelm.Client, config Config) *Release { + // TODO(michael): check we don't have nil values in the config r := &Release{ logger: logger, HelmClient: helmClient, - Repo: repo, + config: config, } return r } @@ -114,9 +102,7 @@ func (r *Release) Exists(name string) (bool, error) { } func (r *Release) canDelete(name string) (bool, error) { - r.Lock() rls, err := r.HelmClient.ReleaseStatus(name) - r.Unlock() if err != nil { r.logger.Log("error", fmt.Sprintf("Error finding status for release (%s): %#v", name, err)) @@ -148,9 +134,11 @@ func (r *Release) canDelete(name string) (bool, error) { } } -// Install performs Chart release. Depending on the release type, this is either a new release, -// or an upgrade of an existing one -func (r *Release) Install(checkout *helmgit.Checkout, releaseName string, fhr ifv1.FluxHelmRelease, action Action, opts InstallOptions) (*hapi_release.Release, error) { +// Install performs a Chart release given the directory containing the +// charts, and the FluxHelmRelease specifying the release. Depending +// on the release type, this is either a new release, or an upgrade of +// an existing one. +func (r *Release) Install(repoDir, releaseName string, fhr ifv1.FluxHelmRelease, action Action, opts InstallOptions) (*hapi_release.Release, error) { r.logger.Log("info", fmt.Sprintf("releaseName= %s, action=%s, install options: %+v", releaseName, action, opts)) chartPath := fhr.Spec.ChartGitPath @@ -164,7 +152,7 @@ func (r *Release) Install(checkout *helmgit.Checkout, releaseName string, fhr if namespace = "default" } - chartDir := filepath.Join(checkout.Dir, checkout.Config.Path, chartPath) + chartDir := filepath.Join(repoDir, r.config.ChartsPath, chartPath) strVals, err := fhr.Spec.Values.YAML() if err != nil { @@ -175,7 +163,6 @@ func (r *Release) Install(checkout *helmgit.Checkout, releaseName string, fhr if switch action { case InstallAction: - r.Lock() res, err := r.HelmClient.InstallRelease( chartDir, namespace, @@ -190,7 +177,6 @@ func (r *Release) Install(checkout *helmgit.Checkout, releaseName string, fhr if helm.InstallWait(i.wait) */ ) - r.Unlock() if err != nil { r.logger.Log("error", fmt.Sprintf("Chart release failed: %s: %#v", releaseName, err)) @@ -201,7 +187,6 @@ func (r *Release) Install(checkout *helmgit.Checkout, releaseName string, fhr if } return res.Release, err case UpgradeAction: - r.Lock() res, err := r.HelmClient.UpdateRelease( releaseName, chartDir, @@ -217,7 +202,6 @@ func (r *Release) Install(checkout *helmgit.Checkout, releaseName string, fhr if helm.UpgradeWait(u.wait)) */ ) - r.Unlock() if err != nil { r.logger.Log("error", fmt.Sprintf("Chart upgrade release failed: %s: %#v", releaseName, err)) @@ -244,9 +228,7 @@ func (r *Release) Delete(name string) error { return nil } - r.Lock() _, err = r.HelmClient.DeleteRelease(name, k8shelm.DeletePurge(true)) - r.Unlock() if err != nil { r.logger.Log("error", fmt.Sprintf("Release deletion error: %#v", err)) return err @@ -255,7 +237,7 @@ func (r *Release) Delete(name string) error { return nil } -// GetCurrentWithDate provides Chart releases (stored in tiller ConfigMaps) +// GetCurrent provides Chart releases (stored in tiller ConfigMaps) // output: // map[namespace][release name] = nil func (r *Release) GetCurrent() (map[string][]DeployInfo, error) { diff --git a/integrations/helm/releasesync/releasesync.go b/integrations/helm/releasesync/releasesync.go deleted file mode 100644 index 4aae14b6d..000000000 --- a/integrations/helm/releasesync/releasesync.go +++ /dev/null @@ -1,325 +0,0 @@ -/* -When a Chart release is manually deleted/upgraded/created, the cluster gets out of sync -with the prescribed state defined by FluxHelmRelease custom resources. The releasesync package -attemps to bring the cluster back to the prescribed state. -*/ -package releasesync - -import ( - "context" - "fmt" - "os" - "path/filepath" - "strings" - - "github.com/pkg/errors" - - protobuf "github.com/golang/protobuf/ptypes/timestamp" - hapi_release "k8s.io/helm/pkg/proto/hapi/release" - - "github.com/go-kit/kit/log" - - ifv1 "github.com/weaveworks/flux/apis/helm.integrations.flux.weave.works/v1alpha2" - ifclientset "github.com/weaveworks/flux/integrations/client/clientset/versioned" - "github.com/weaveworks/flux/integrations/helm/customresource" - helmgit "github.com/weaveworks/flux/integrations/helm/git" - chartrelease "github.com/weaveworks/flux/integrations/helm/release" -) - -const ( - syncDelay = 90 -) - -type releaseFhr struct { - RelName string - Fhr ifv1.FluxHelmRelease -} - -// ReleaseChangeSync implements DoReleaseChangeSync to return the cluster to the -// state dictated by Custom Resources after manual Chart release(s). -type ReleaseChangeSync struct { - logger log.Logger - release chartrelease.Releaser -} - -// New creates a ReleaseChangeSync. -func New( - logger log.Logger, - releaser chartrelease.Releaser) *ReleaseChangeSync { - - return &ReleaseChangeSync{ - logger: logger, - release: releaser, - } -} - -type customResourceInfo struct { - name, releaseName string - resource ifv1.FluxHelmRelease - lastUpdated protobuf.Timestamp -} - -type chartRelease struct { - releaseName string - action chartrelease.Action - desiredState ifv1.FluxHelmRelease -} - -// DoReleaseChangeSync returns the cluster to the state dictated by Custom Resources -// after manual Chart release(s). -func (rs *ReleaseChangeSync) DoReleaseChangeSync(ifClient ifclientset.Clientset, ns []string) (bool, error) { - ctx, cancel := context.WithTimeout(context.Background(), helmgit.DefaultCloneTimeout) - relsToSync, err := rs.releasesToSync(ctx, ifClient, ns) - cancel() - if err != nil { - err = errors.Wrap(err, "getting info about manual chart release changes") - rs.logger.Log("error", err) - return false, err - } - if len(relsToSync) == 0 { - return false, nil - } - - ctx, cancel = context.WithTimeout(context.Background(), helmgit.DefaultCloneTimeout) - err = rs.sync(ctx, relsToSync) - cancel() - if err != nil { - return false, errors.Wrap(err, "syncing cluster after manual chart release changes") - } - - return true, nil -} - -// getCustomResources retrieves FluxHelmRelease resources -// and returns them organised by namespace and chart release name. -// map[namespace] = []releaseFhr. -func (rs *ReleaseChangeSync) getCustomResources( - ifClient ifclientset.Clientset, - namespaces []string) (map[string][]releaseFhr, error) { - - relInfo := make(map[string][]releaseFhr) - - for _, ns := range namespaces { - list, err := customresource.GetNSCustomResources(ifClient, ns) - if err != nil { - return nil, errors.Wrap(err, - fmt.Sprintf("retrieving FluxHelmReleases in namespace %s", ns)) - } - - rf := []releaseFhr{} - for _, fhr := range list.Items { - relName := chartrelease.GetReleaseName(fhr) - rf = append(rf, releaseFhr{RelName: relName, Fhr: fhr}) - } - if len(rf) > 0 { - relInfo[ns] = rf - } - } - return relInfo, nil -} - -// shouldUpgrade returns true if the current running values or chart -// don't match what the repo says we ought to be running, based on -// doing a dry run install from the chart in the git repo. -func (rs *ReleaseChangeSync) shouldUpgrade( - currRel *hapi_release.Release, - fhr ifv1.FluxHelmRelease) (bool, error) { - - if currRel == nil { - return false, fmt.Errorf("No Chart release provided for %v", fhr.GetName()) - } - - currVals := currRel.GetConfig().GetRaw() - currChart := currRel.GetChart().String() - - // Get the desired release state - opts := chartrelease.InstallOptions{DryRun: true} - tempRelName := strings.Join([]string{currRel.GetName(), "temp"}, "-") - desRel, err := rs.release.Install(rs.release.ConfigSync(), tempRelName, fhr, "CREATE", opts) - if err != nil { - return false, err - } - desVals := desRel.GetConfig().GetRaw() - desChart := desRel.GetChart().String() - - // compare values && Chart - if currVals != desVals { - rs.logger.Log("error", fmt.Sprintf("Release %s: values have diverged due to manual Chart release", currRel.GetName())) - return true, nil - } - if currChart != desChart { - rs.logger.Log("error", fmt.Sprintf("Release %s: Chart has diverged due to manual Chart release", currRel.GetName())) - return true, nil - } - - return false, nil -} - -// addExistingReleasesToSync populates relsToSync (map from namespace -// to chartRelease) with the members of currentReleases that need -// updating because they're diverged from the desired state. Desired -// state is specified by customResources and what's in our git checkout. -func (rs *ReleaseChangeSync) addExistingReleasesToSync( - relsToSync map[string][]chartRelease, - currentReleases map[string]map[string]struct{}, - customResources map[string]map[string]ifv1.FluxHelmRelease) error { - - var chRels []chartRelease - for ns, nsRelsM := range currentReleases { - chRels = relsToSync[ns] - for relName := range nsRelsM { - if customResources[ns] == nil { - continue - } - // We are ignoring Charts that are not under flux/helm-operator control - fhr, ok := customResources[ns][relName] - if !ok { - continue - } - rel, err := rs.release.GetDeployedRelease(relName) - if err != nil { - return err - } - doUpgrade, err := rs.shouldUpgrade(rel, fhr) - if err != nil { - return err - } - if doUpgrade { - chr := chartRelease{ - releaseName: relName, - action: chartrelease.UpgradeAction, - desiredState: fhr, - } - chRels = append(chRels, chr) - } - } - if len(chRels) > 0 { - relsToSync[ns] = chRels - } - } - return nil -} - -// addDeletedReleasesToSync populates relsToSync (map from namespace -// to chartRelease) with chartReleases based on the charts referenced -// in customResources that are absent from currentReleases. -func (rs *ReleaseChangeSync) addDeletedReleasesToSync( - relsToSync map[string][]chartRelease, - currentReleases map[string]map[string]struct{}, - customResources map[string]map[string]ifv1.FluxHelmRelease) error { - - var chRels []chartRelease - for ns, nsFhrs := range customResources { - chRels = relsToSync[ns] - - for relName, fhr := range nsFhrs { - // there are Custom Resources (CRs) in this namespace - // missing Chart release even though there is a CR - if currentReleases[ns] == nil { - chr := chartRelease{ - releaseName: relName, - action: chartrelease.InstallAction, - desiredState: fhr, - } - chRels = append(chRels, chr) - continue - } - if _, ok := currentReleases[ns][relName]; !ok { - chr := chartRelease{ - releaseName: relName, - action: chartrelease.InstallAction, - desiredState: fhr, - } - chRels = append(chRels, chr) - } - } - if len(chRels) > 0 { - relsToSync[ns] = chRels - } - } - return nil -} - -// releasesToSync queries Tiller to get all current Helm releases, queries k8s -// custom resources to get all FluxHelmRelease(s), and returns a map from -// namespace to chartRelease(s) that need to be synced. -func (rs *ReleaseChangeSync) releasesToSync( - ctx context.Context, - ifClient ifclientset.Clientset, - ns []string) (map[string][]chartRelease, error) { - - relDepl, err := rs.release.GetCurrent() - if err != nil { - return nil, err - } - curRels := mappifyDeployInfo(relDepl) - - relCrs, err := rs.getCustomResources(ifClient, ns) - if err != nil { - return nil, err - } - crs := mappifyReleaseFhrInfo(relCrs) - - relsToSync := make(map[string][]chartRelease) - - // FIXME: we probably shouldn't be throwing away errors - _ = rs.addDeletedReleasesToSync(relsToSync, curRels, crs) - _ = rs.addExistingReleasesToSync(relsToSync, curRels, crs) - - return relsToSync, nil -} - -// sync takes a map from namespace to list of chartRelease(s) -// that need to be applied, and attempts to apply them. -// It returns the first error encountered. A chart missing -// from the repo doesn't count as an error, but will be logged. -func (rs *ReleaseChangeSync) sync( - ctx context.Context, - releases map[string][]chartRelease) error { - - // TODO it's weird that we do a pull here, after we've already decided - // what to do. Ask why. - ctx, cancel := context.WithTimeout(ctx, helmgit.DefaultPullTimeout) - err := rs.release.ConfigSync().Pull(ctx) - cancel() - if err != nil { - return fmt.Errorf("Failure while pulling repo: %#v", err) - } - - checkout := rs.release.ConfigSync() - chartPathBase := filepath.Join(checkout.Dir, checkout.Config.Path) - - opts := chartrelease.InstallOptions{DryRun: false} - - for ns, relsToProcess := range releases { - - for _, chr := range relsToProcess { - - // sanity check - chartPath := filepath.Join(chartPathBase, chr.desiredState.Spec.ChartGitPath) - if _, err := os.Stat(chartPath); os.IsNotExist(err) { - rs.logger.Log("error", fmt.Sprintf("Missing Chart %s. No release can happen.", chartPath)) - continue - } - - relName := chr.releaseName - switch chr.action { - case chartrelease.UpgradeAction: - rs.logger.Log("info", fmt.Sprintf("Resyncing manually upgraded Chart release %s (namespace %s)", relName, ns)) - _, err := rs.release.Install(checkout, relName, chr.desiredState, chartrelease.UpgradeAction, opts) - if err != nil { - return err - } - case chartrelease.InstallAction: - rs.logger.Log("info", fmt.Sprintf("Installing manually deleted Chart release %s (namespace %s)", relName, ns)) - _, err := rs.release.Install(checkout, relName, chr.desiredState, chartrelease.InstallAction, opts) - if err != nil { - return err - } - default: - panic(fmt.Sprintf("invalid action %q", chr.action)) - } - } - } - return nil -} diff --git a/integrations/helm/releasesync/releasesync_test.go b/integrations/helm/releasesync/releasesync_test.go deleted file mode 100644 index 6eede7c79..000000000 --- a/integrations/helm/releasesync/releasesync_test.go +++ /dev/null @@ -1,353 +0,0 @@ -package releasesync - -import ( - "fmt" - "os" - "testing" - - proto "github.com/golang/protobuf/proto" - "github.com/google/go-cmp/cmp" - "k8s.io/helm/pkg/helm" - hapi_chart "k8s.io/helm/pkg/proto/hapi/chart" - hapi_release "k8s.io/helm/pkg/proto/hapi/release" - - "github.com/go-kit/kit/log" - ifv1 "github.com/weaveworks/flux/apis/helm.integrations.flux.weave.works/v1alpha2" - helmgit "github.com/weaveworks/flux/integrations/helm/git" - "github.com/weaveworks/flux/integrations/helm/release" - chartrelease "github.com/weaveworks/flux/integrations/helm/release" -) - -type installReq struct { - checkoutDir string - releaseName string - fhr ifv1.FluxHelmRelease - action chartrelease.Action - opts chartrelease.InstallOptions -} - -type installResult struct { - release hapi_release.Release - err error -} - -type install struct { - installReq - installResult -} - -type mockReleaser struct { - current map[string][]chartrelease.DeployInfo - deployed map[string]*hapi_release.Release - configSync *helmgit.Checkout - installs []install -} - -func (r *mockReleaser) GetCurrent() (map[string][]chartrelease.DeployInfo, error) { - if r.current == nil { - return nil, fmt.Errorf("failed to fetch current releases") - } - return r.current, nil -} - -func (r *mockReleaser) GetDeployedRelease(name string) (*hapi_release.Release, error) { - if _, present := r.deployed[name]; !present { - return nil, fmt.Errorf("no release hamed %q", name) - } - return r.deployed[name], nil -} - -func (r *mockReleaser) ConfigSync() *helmgit.Checkout { - return r.configSync -} - -func (r *mockReleaser) Install(checkout *helmgit.Checkout, - releaseName string, - fhr ifv1.FluxHelmRelease, - action chartrelease.Action, - opts chartrelease.InstallOptions) (*hapi_release.Release, error) { - req := installReq{ - checkoutDir: checkout.Dir, - releaseName: releaseName, - fhr: fhr, - action: action, - opts: opts} - cmpopts := cmp.AllowUnexported(installReq{}) - for _, i := range r.installs { - if cmp.Equal(i.installReq, req, cmpopts) { - return &i.installResult.release, i.installResult.err - } - } - return nil, fmt.Errorf("unexpected request: %+v", req) -} - -func makeCurRel(ns string, relNames ...string) map[string]map[string]struct{} { - m := make(map[string]map[string]struct{}) - m[ns] = make(map[string]struct{}) - for _, relName := range relNames { - m[ns][relName] = struct{}{} - } - return m -} - -func mergeCurRels(a, b map[string]map[string]struct{}) map[string]map[string]struct{} { - m := make(map[string]map[string]struct{}) - for ns := range a { - m[ns] = a[ns] - } - for ns := range b { - if _, present := m[ns]; present { - panic("ns '" + ns + "' present in both a and b") - } - m[ns] = b[ns] - } - return m -} - -func makeCustRes(ns string, relNames ...string) map[string]map[string]ifv1.FluxHelmRelease { - m := make(map[string]map[string]ifv1.FluxHelmRelease) - m[ns] = make(map[string]ifv1.FluxHelmRelease) - for _, relName := range relNames { - m[ns][relName] = ifv1.FluxHelmRelease{} - } - return m -} - -func mergeCustRes(a, b map[string]map[string]ifv1.FluxHelmRelease) map[string]map[string]ifv1.FluxHelmRelease { - m := make(map[string]map[string]ifv1.FluxHelmRelease) - for ns := range a { - m[ns] = a[ns] - } - for ns := range b { - if _, present := m[ns]; present { - panic("ns '" + ns + "' present in both a and b") - } - m[ns] = b[ns] - } - return m -} - -func TestAddDeletedReleasesToSync(t *testing.T) { - var zeromap = make(map[string][]chartRelease) - var tests = []struct { - msg string - currentReleases map[string]map[string]struct{} - customResources map[string]map[string]ifv1.FluxHelmRelease - want map[string][]chartRelease - }{ - { - msg: "no-op, zero resources", - currentReleases: makeCurRel("ns1", "r1"), - customResources: make(map[string]map[string]ifv1.FluxHelmRelease), - want: zeromap, - }, - { - msg: "no-op, equality", - currentReleases: makeCurRel("ns1", "r1"), - customResources: makeCustRes("ns1", "r1"), - want: zeromap, - }, - { - msg: "add missing release", - currentReleases: makeCurRel("ns1"), - customResources: makeCustRes("ns1", "r1"), - want: map[string][]chartRelease{"ns1": []chartRelease{ - chartRelease{releaseName: "r1", action: release.InstallAction}}}, - }, - { - msg: "add missing release new namespace", - currentReleases: makeCurRel("ns1"), - customResources: makeCustRes("ns2", "r1"), - want: map[string][]chartRelease{"ns2": []chartRelease{ - chartRelease{releaseName: "r1", action: release.InstallAction}}}, - }, - { - msg: "add missing releases multi namespace", - currentReleases: mergeCurRels(makeCurRel("ns1"), - makeCurRel("ns2", "r2")), - customResources: mergeCustRes(makeCustRes("ns1", "r1"), - makeCustRes("ns2", "r2", "r3")), - want: map[string][]chartRelease{ - "ns1": []chartRelease{chartRelease{releaseName: "r1", action: release.InstallAction}}, - "ns2": []chartRelease{chartRelease{releaseName: "r3", action: release.InstallAction}}, - }, - }, - } - - opts := cmp.AllowUnexported(chartRelease{}) - rs := New(log.NewLogfmtLogger(os.Stdout), nil) - for i, test := range tests { - var got = make(map[string][]chartRelease) - err := rs.addDeletedReleasesToSync(got, test.currentReleases, test.customResources) - if err != nil { - t.Errorf("%d %s: got error: %v", i, test.msg, err) - } - if diff := cmp.Diff(got, test.want, opts); diff != "" { - t.Errorf("%d %s: diff (-got +want)\n%s", i, test.msg, diff) - } - - } -} - -func config(vals map[string]string) *hapi_chart.Config { - pv := make(map[string]*hapi_chart.Value) - for k, v := range vals { - pv[k] = &hapi_chart.Value{Value: v} - } - - c := &hapi_chart.Config{Values: pv} - // Marshalling to get c.Raw populated - data, _ := proto.Marshal(c) - _ = proto.Unmarshal(data, c) - return c -} - -func relvals(name string, vals string) *hapi_release.Release { - rel := helm.ReleaseMock(&helm.MockReleaseOptions{Name: name}) - rel.Config.Raw = vals - return rel -} - -func relchart(name string, chartname string, chartver string, tmplname string) *hapi_release.Release { - return helm.ReleaseMock(&helm.MockReleaseOptions{Name: name, Chart: &hapi_chart.Chart{ - Metadata: &hapi_chart.Metadata{ - Name: chartname, - Version: chartver, - }, - Templates: []*hapi_chart.Template{ - {Name: tmplname, Data: []byte(helm.MockManifest)}, - }, - }}) -} - -func TestAddExistingReleasesToSync(t *testing.T) { - var zeromap = make(map[string][]chartRelease) - var tests = []struct { - msg string - currentReleases map[string]map[string]struct{} - customResources map[string]map[string]ifv1.FluxHelmRelease - want map[string][]chartRelease - releaser chartrelease.Releaser - wanterror error - }{ - { - msg: "no-op, zero resources", - currentReleases: makeCurRel("ns1", "r1"), - customResources: make(map[string]map[string]ifv1.FluxHelmRelease), - want: zeromap, - }, - { - msg: "no-op, no overlap", - currentReleases: makeCurRel("ns1", "r1"), - customResources: mergeCustRes( - makeCustRes("ns1", "r2"), - makeCustRes("ns2", "r1")), - want: zeromap, - }, - { - msg: "get deployed release fails", - currentReleases: makeCurRel("ns1", "r1"), - customResources: makeCustRes("ns1", "r1"), - releaser: &mockReleaser{}, - wanterror: fmt.Errorf("no release hamed %q", "r1"), - }, - { - msg: "dry-run install fails", - currentReleases: makeCurRel("ns1", "r1"), - customResources: makeCustRes("ns1", "r1"), - releaser: &mockReleaser{ - configSync: &helmgit.Checkout{Dir: "dir"}, - deployed: map[string]*hapi_release.Release{ - "r1": relvals("r1", `k1: "v1"`), - }, - installs: []install{ - dryinst("r1", *relvals("", ""), fmt.Errorf("dry-run failed")), - }, - }, - wanterror: fmt.Errorf("dry-run failed"), - }, - { - msg: "r1 vals changed, r2 unchanged", - currentReleases: mergeCurRels( - makeCurRel("ns1", "r1"), - makeCurRel("ns2", "r2")), - customResources: mergeCustRes( - makeCustRes("ns1", "r1"), - makeCustRes("ns2", "r2")), - releaser: &mockReleaser{ - configSync: &helmgit.Checkout{Dir: "dir"}, - deployed: map[string]*hapi_release.Release{ - "r1": relvals("r1", `k1: "v1"`), - "r2": relvals("r2", `k1: "v1"`), - }, - installs: []install{ - dryinst("r1", *relvals("r1", `k1: "v2"`), nil), - dryinst("r2", *relvals("r2", `k1: "v1"`), nil), - }, - }, - want: map[string][]chartRelease{"ns1": []chartRelease{ - chartRelease{releaseName: "r1", action: release.Action("UPDATE")}, - }}, - }, - { - msg: "r1/r2/r3 charts changed, r4 unchanged", - currentReleases: mergeCurRels(mergeCurRels( - makeCurRel("ns1", "r1"), - makeCurRel("ns2", "r2")), - makeCurRel("ns3", "r3", "r4")), - customResources: mergeCustRes(mergeCustRes( - makeCustRes("ns1", "r1"), - makeCustRes("ns2", "r2")), - makeCustRes("ns3", "r3", "r4")), - releaser: &mockReleaser{ - configSync: &helmgit.Checkout{Dir: "dir"}, - deployed: map[string]*hapi_release.Release{ - "r1": relchart("r1", "c1", "v0.1.0", "templates/foo.tpl"), - "r2": relchart("r2", "c2", "v0.1.0", "templates/bar.tpl"), - "r3": relchart("r3", "c3", "v0.1.0", "templates/baz.tpl"), - "r4": relchart("r4", "c4", "v0.1.0", "templates/qux.tpl"), - }, - installs: []install{ - dryinst("r1", *relchart("r1", "c-", "v0.1.0", "templates/foo.tpl"), nil), - dryinst("r2", *relchart("r2", "c2", "v0.1.1", "templates/bar.tpl"), nil), - dryinst("r3", *relchart("r3", "c3", "v0.1.0", "templates/foo.tpl"), nil), - dryinst("r4", *relchart("r4", "c4", "v0.1.0", "templates/qux.tpl"), nil), - }, - }, - want: map[string][]chartRelease{ - "ns1": []chartRelease{chartRelease{releaseName: "r1", action: release.Action("UPDATE")}}, - "ns2": []chartRelease{chartRelease{releaseName: "r2", action: release.Action("UPDATE")}}, - "ns3": []chartRelease{chartRelease{releaseName: "r3", action: release.Action("UPDATE")}}, - }, - }, - } - - opts := cmp.AllowUnexported(chartRelease{}) - for i, test := range tests { - rs := New(log.NewLogfmtLogger(os.Stdout), test.releaser) - var got = make(map[string][]chartRelease) - err := rs.addExistingReleasesToSync(got, test.currentReleases, test.customResources) - if fmt.Sprintf("%v", err) != fmt.Sprintf("%v", test.wanterror) { - t.Errorf("%d %s: got error %q, want error %q", i, test.msg, err, test.wanterror) - } - if test.wanterror != nil { - continue - } - if diff := cmp.Diff(got, test.want, opts); diff != "" { - t.Errorf("%d %s: diff (-got +want)\n%s", i, test.msg, diff) - } - - } -} - -func dryinst(relname string, rel hapi_release.Release, err error) install { - return install{ - installReq{ - checkoutDir: "dir", - releaseName: relname + "-temp", - action: "CREATE", - opts: chartrelease.InstallOptions{DryRun: true}, - }, - installResult{rel, err}, - } -} diff --git a/integrations/helm/releasesync/utils.go b/integrations/helm/releasesync/utils.go deleted file mode 100644 index 3b77b5b07..000000000 --- a/integrations/helm/releasesync/utils.go +++ /dev/null @@ -1,39 +0,0 @@ -package releasesync - -import ( - ifv1 "github.com/weaveworks/flux/apis/helm.integrations.flux.weave.works/v1alpha2" - "github.com/weaveworks/flux/integrations/helm/release" -) - -// mappifyDeployInfo takes a map of namespace -> []DeployInfo, -// returning a map whose keys are the same namespaces -// and whose values are key-only maps holding the DeployInfo names. -func mappifyDeployInfo(releases map[string][]release.DeployInfo) map[string]map[string]struct{} { - deployM := make(map[string]map[string]struct{}) - - for ns, nsRels := range releases { - nsDeployM := make(map[string]struct{}) - for _, r := range nsRels { - nsDeployM[r.Name] = struct{}{} - } - deployM[ns] = nsDeployM - } - return deployM -} - -// mappifyReleaseFhrInfo takes a map of namespace -> []releaseFhr, -// returning a map whose keys are the same namespaces -// and whose values are maps of releaseName -> FluxHelmRelease. -func mappifyReleaseFhrInfo(fhrs map[string][]releaseFhr) map[string]map[string]ifv1.FluxHelmRelease { - relFhrM := make(map[string]map[string]ifv1.FluxHelmRelease) - - for ns, nsFhrs := range fhrs { - nsRels := make(map[string]ifv1.FluxHelmRelease) - for _, r := range nsFhrs { - nsRels[r.RelName] = r.Fhr - } - relFhrM[ns] = nsRels - } - - return relFhrM -} From 125a7e01f2ea1e0968146d52d72df42d0926f5d7 Mon Sep 17 00:00:00 2001 From: Michael Bridgen Date: Tue, 24 Jul 2018 17:16:54 +0100 Subject: [PATCH 2/2] Fix git.Export(sha1) `git clone -b ...` does not accept a revision -- it must be a branch or tag -- so we can't use that to export a repo at a revision, as we'd like to do for the helm operator's sync loop. Instead we'll have to clone without an argument, and checkout the desired revision. --- git/export.go | 5 ++++- git/export_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ git/operations.go | 4 ++++ 3 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 git/export_test.go diff --git a/git/export.go b/git/export.go index 968543e5b..1015987ac 100644 --- a/git/export.go +++ b/git/export.go @@ -21,9 +21,12 @@ func (e *Export) Clean() { // Export creates a minimal clone of the repo, at the ref given. func (r *Repo) Export(ctx context.Context, ref string) (*Export, error) { - dir, err := r.workingClone(ctx, ref) + dir, err := r.workingClone(ctx, "") if err != nil { return nil, err } + if err = checkout(ctx, dir, ref); err != nil { + return nil, err + } return &Export{dir}, nil } diff --git a/git/export_test.go b/git/export_test.go new file mode 100644 index 000000000..b5585bb09 --- /dev/null +++ b/git/export_test.go @@ -0,0 +1,44 @@ +package git + +import ( + "context" + "testing" + "time" + + "github.com/weaveworks/flux/cluster/kubernetes/testfiles" +) + +func TestExportAtRevision(t *testing.T) { + newDir, cleanup := testfiles.TempDir(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + err := createRepo(newDir, []string{"config"}) + if err != nil { + t.Fatal(err) + } + repo := NewRepo(Remote{URL: newDir}, ReadOnly) + if err := repo.Ready(ctx); err != nil { + t.Fatal(err) + } + + headMinusOne, err := repo.Revision(ctx, "HEAD^1") + if err != nil { + t.Fatal(err) + } + + export, err := repo.Export(ctx, headMinusOne) + if err != nil { + t.Fatal(err) + } + + exportHead, err := refRevision(ctx, export.dir, "HEAD") + if err != nil { + t.Fatal(err) + } + if headMinusOne != exportHead { + t.Errorf("exported %s, but head in export dir %s is %s", headMinusOne, export.dir, exportHead) + } +} diff --git a/git/operations.go b/git/operations.go index 8db508c56..19202b6d2 100644 --- a/git/operations.go +++ b/git/operations.go @@ -53,6 +53,10 @@ func mirror(ctx context.Context, workingDir, repoURL string) (path string, err e return repoPath, nil } +func checkout(ctx context.Context, workingDir, ref string) error { + return execGitCmd(ctx, workingDir, nil, "checkout", ref) +} + // checkPush sanity-checks that we can write to the upstream repo // (being able to `clone` is an adequate check that we can read the // upstream).