diff --git a/Gopkg.lock b/Gopkg.lock index c7bcdf5b4..77a5a4799 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -164,17 +164,6 @@ packages = ["."] revision = "e89373fe6b4a7413d7acd6da1725b83ef713e6e4" -[[projects]] - name = "github.com/google/go-cmp" - packages = [ - "cmp", - "cmp/internal/diff", - "cmp/internal/function", - "cmp/internal/value" - ] - revision = "3af367b6b30c263d47e8895973edcca9a49cf029" - version = "v0.2.0" - [[projects]] branch = "master" name = "github.com/google/gofuzz" @@ -245,12 +234,6 @@ revision = "76626ae9c91c4f2a10f34cad8ce83ea42c93bb75" version = "v1.0" -[[projects]] - branch = "master" - name = "github.com/jbenet/go-context" - packages = ["io"] - revision = "d14ea06fba99483203c19d92cfcd13ebe73135f4" - [[projects]] name = "github.com/json-iterator/go" packages = ["."] @@ -262,12 +245,6 @@ revision = "b691ffe85e96a3e4ce5de9d8a17f1f884190e6d0" version = "v1.0.0" -[[projects]] - name = "github.com/kevinburke/ssh_config" - packages = ["."] - revision = "0ff8514904a8ebfcfb3c32ad73e1f8498a7f81b4" - version = "0.3" - [[projects]] branch = "master" name = "github.com/kr/logfmt" @@ -280,12 +257,6 @@ revision = "3247c84500bff8d9fb6d579d800f20b3e091582c" version = "v1.0.0" -[[projects]] - branch = "master" - name = "github.com/mitchellh/go-homedir" - packages = ["."] - revision = "b8bc1bf767474819792c23f32d8286a45736f1c6" - [[projects]] name = "github.com/modern-go/concurrent" packages = ["."] @@ -304,12 +275,6 @@ revision = "279bed98673dd5bef374d3b6e4b09e2af76183bf" version = "v1.0.0-rc1" -[[projects]] - name = "github.com/pelletier/go-buffruneio" - packages = ["."] - revision = "c37440a7cf42ac63b919c752ca73a85067e05992" - version = "v0.2.0" - [[projects]] branch = "master" name = "github.com/petar/GoLLRB" @@ -385,12 +350,6 @@ revision = "572520ed46dbddaed19ea3d9541bdd0494163693" version = "v0.1" -[[projects]] - name = "github.com/sergi/go-diff" - packages = ["diffmatchpatch"] - revision = "1744e2970ca51c86172c8190fadad617561ed6e7" - version = "v1.0.0" - [[projects]] name = "github.com/sirupsen/logrus" packages = ["."] @@ -409,17 +368,6 @@ revision = "583c0c0531f06d5278b7d917446061adc344b5cd" version = "v1.0.1" -[[projects]] - name = "github.com/src-d/gcfg" - packages = [ - ".", - "scanner", - "token", - "types" - ] - revision = "f187355171c936ac84a82793659ebb4936bc1c23" - version = "v1.3.0" - [[projects]] name = "github.com/stretchr/testify" packages = ["assert"] @@ -450,33 +398,12 @@ revision = "0599d764e054d4e983bb120e30759179fafe3942" version = "v1.2.0" -[[projects]] - branch = "master" - name = "github.com/xanzy/ssh-agent" - packages = ["."] - revision = "ba9c9e33906f58169366275e3450db66139a31a9" - [[projects]] branch = "master" name = "golang.org/x/crypto" packages = [ - "cast5", - "curve25519", - "ed25519", - "ed25519/internal/edwards25519", - "internal/chacha20", - "openpgp", - "openpgp/armor", - "openpgp/elgamal", - "openpgp/errors", - "openpgp/packet", - "openpgp/s2k", "pbkdf2", - "poly1305", "scrypt", - "ssh", - "ssh/agent", - "ssh/knownhosts", "ssh/terminal" ] revision = "432090b8f568c018896cd8a0fb0345872bbac6ce" @@ -573,71 +500,6 @@ revision = "3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4" version = "v0.9.0" -[[projects]] - name = "gopkg.in/src-d/go-billy.v4" - packages = [ - ".", - "helper/chroot", - "helper/polyfill", - "osfs", - "util" - ] - revision = "027dceab1aa836eb310d92c2eb2266e4dc9f4b81" - version = "v4.1.0" - -[[projects]] - name = "gopkg.in/src-d/go-git.v4" - packages = [ - ".", - "config", - "internal/revision", - "plumbing", - "plumbing/cache", - "plumbing/filemode", - "plumbing/format/config", - "plumbing/format/diff", - "plumbing/format/gitignore", - "plumbing/format/idxfile", - "plumbing/format/index", - "plumbing/format/objfile", - "plumbing/format/packfile", - "plumbing/format/pktline", - "plumbing/object", - "plumbing/protocol/packp", - "plumbing/protocol/packp/capability", - "plumbing/protocol/packp/sideband", - "plumbing/revlist", - "plumbing/storer", - "plumbing/transport", - "plumbing/transport/client", - "plumbing/transport/file", - "plumbing/transport/git", - "plumbing/transport/http", - "plumbing/transport/internal/common", - "plumbing/transport/server", - "plumbing/transport/ssh", - "storage", - "storage/filesystem", - "storage/filesystem/internal/dotgit", - "storage/memory", - "utils/binary", - "utils/diff", - "utils/ioutil", - "utils/merkletrie", - "utils/merkletrie/filesystem", - "utils/merkletrie/index", - "utils/merkletrie/internal/frame", - "utils/merkletrie/noder" - ] - revision = "886dc83f3ed518a78772055497bcc7d7621b468e" - version = "v4.1.1" - -[[projects]] - name = "gopkg.in/warnings.v0" - packages = ["."] - revision = "ec4a0fea49c7b46c2aeb0b51aac55779c607e52b" - version = "v0.1.2" - [[projects]] branch = "v2" name = "gopkg.in/yaml.v2" @@ -883,6 +745,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "753bd801312bfb0aacc2d33e343346c5a73bbdab2cde843d0b31aabefe14895f" + inputs-digest = "39117abf5941771d8502f737e9680717e0d6659d66b5f3da5ec48a142cdc986a" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 3e2a6d966..ec587fc3b 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -42,10 +42,6 @@ required = ["k8s.io/code-generator/cmd/client-gen"] name = "k8s.io/helm" version = "v2.8.1" -[[constraint]] - name = "github.com/google/go-cmp" - version = "0.2.0" - [[constraint]] name = "github.com/justinbarrick/go-k8s-portforward" version = "v1.0.0" diff --git a/cmd/helm-operator/main.go b/cmd/helm-operator/main.go index aa2577577..9ba6d9734 100644 --- a/cmd/helm-operator/main.go +++ b/cmd/helm-operator/main.go @@ -1,35 +1,28 @@ package main import ( - "sync" - "syscall" - "time" - - "net/url" - - "github.com/spf13/pflag" - + "context" "fmt" "os" "os/signal" + "sync" + "syscall" + "time" "github.com/go-kit/kit/log" + "github.com/spf13/pflag" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + "github.com/weaveworks/flux/git" clientset "github.com/weaveworks/flux/integrations/client/clientset/versioned" ifinformers "github.com/weaveworks/flux/integrations/client/informers/externalversions" fluxhelm "github.com/weaveworks/flux/integrations/helm" + helmop "github.com/weaveworks/flux/integrations/helm" "github.com/weaveworks/flux/integrations/helm/chartsync" - "github.com/weaveworks/flux/integrations/helm/git" "github.com/weaveworks/flux/integrations/helm/operator" "github.com/weaveworks/flux/integrations/helm/release" - "github.com/weaveworks/flux/integrations/helm/releasesync" "github.com/weaveworks/flux/integrations/helm/status" - "github.com/weaveworks/flux/ssh" - - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" - - gitssh "gopkg.in/src-d/go-git.v4/plumbing/transport/ssh" ) var ( @@ -51,32 +44,25 @@ var ( tillerTLSCert *string tillerTLSCACert *string - chartsSyncInterval *time.Duration - chartsSyncTimeout *time.Duration - eventHandlerWorkers *uint - - customKubectl *string - gitURL *string - gitBranch *string - //gitConfigPath *string - gitChartsPath *string + chartsSyncInterval *time.Duration + chartsSyncTimeout *time.Duration - k8sSecretName *string - k8sSecretVolumeMountPath *string - k8sSecretDataKey *string + gitURL *string + gitBranch *string + gitChartsPath *string + gitPollInterval *time.Duration queueWorkerCount *int name *string listenAddr *string gcInterval *time.Duration - - ErrOperatorFailure = "Operator failure: %q" ) const ( - defaultGitConfigPath = "releaseconfig" defaultGitChartsPath = "charts" + + ErrOperatorFailure = "Operator failure: %q" ) func init() { @@ -84,7 +70,7 @@ func init() { fs = pflag.NewFlagSet("default", pflag.ExitOnError) fs.Usage = func() { fmt.Fprintf(os.Stderr, "DESCRIPTION\n") - fmt.Fprintf(os.Stderr, " helm-operator is a Kubernetes operator for Helm integration into flux.\n") + fmt.Fprintf(os.Stderr, " helm-operator releases Helm charts from git.\n") fmt.Fprintf(os.Stderr, "\n") fmt.Fprintf(os.Stderr, "FLAGS\n") fs.PrintDefaults() @@ -105,17 +91,11 @@ func init() { chartsSyncInterval = fs.Duration("charts-sync-interval", 3*time.Minute, "Interval at which to check for changed charts") chartsSyncTimeout = fs.Duration("charts-sync-timeout", 1*time.Minute, "Timeout when checking for changed charts") - eventHandlerWorkers = fs.Uint("event-handler-workers", 2, "Number of workers processing events for Flux-Helm custom resources") - customKubectl = fs.String("kubernetes-kubectl", "", "Optional, explicit path to kubectl tool") gitURL = fs.String("git-url", "", "URL of git repo with Helm Charts; e.g., git@github.com:weaveworks/flux-example") gitBranch = fs.String("git-branch", "master", "branch of git repo") gitChartsPath = fs.String("git-charts-path", defaultGitChartsPath, "path within git repo to locate Helm Charts (relative path)") - - // k8s-secret backed ssh keyring configuration - // generated by flux daemon - k8sSecretVolumeMountPath = fs.String("k8s-secret-volume-mount-path", "/etc/fluxd/ssh", "Mount location of the k8s secret storing the private SSH key") - k8sSecretDataKey = fs.String("k8s-secret-data-key", "identity", "Data key holding the private SSH key within the k8s secret") + gitPollInterval = fs.Duration("git-poll-interval", 5*time.Minute, "period on which to poll for changes to the git repo") queueWorkerCount = fs.Int("queue-worker-count", 2, "Number of workers to process queue with Chart release jobs. Two by default") } @@ -178,7 +158,6 @@ func main() { IP: *tillerIP, Port: *tillerPort, Namespace: *tillerNamespace, - TLSVerify: *tillerTLSVerify, TLSEnable: *tillerTLSEnable, TLSKey: *tillerTLSKey, @@ -191,48 +170,44 @@ func main() { statusUpdater := status.New(ifClient, kubeClient, helmClient) go statusUpdater.Loop(shutdown, log.With(logger, "component", "annotator")) - gitURLParsed, err := url.Parse(*gitURL) - if err != nil { - mainLogger.Log("error", fmt.Sprintf("Error parsing -git-url %q: %v", *gitURL, err)) - os.Exit(1) - } + gitRemote := git.Remote{URL: *gitURL} + repo := git.NewRepo(gitRemote, git.PollInterval(*gitPollInterval), git.ReadOnly) - // GIT REPO SETUP --------------------------------------------------------------------- - var gitAuth *gitssh.PublicKeys - for { - gitAuth, err = git.GetRepoAuth(gitURLParsed.User.Username(), *k8sSecretVolumeMountPath, *k8sSecretDataKey) + // Chart releases sync due to Custom Resources changes ------------------------------- + { + mainLogger.Log("info", "Attempting to clone repo ...", "url", gitRemote.URL) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + err := repo.Ready(ctx) + cancel() if err != nil { - mainLogger.Log("error", fmt.Sprintf("Failed to set up git authorization : %#v", err)) - time.Sleep(20 * time.Second) - continue - } - if err == nil { - break + mainLogger.Log("error", err) + os.Exit(2) } - } + mainLogger.Log("info", "Repo cloned", "url", gitRemote.URL) - gitRemoteConfig, err := git.NewGitRemoteConfig(*gitURL, *gitBranch, *gitChartsPath) - if err != nil { - mainLogger.Log("err", err) - os.Exit(1) + // Start the repo fetching from upstream + shutdownWg.Add(1) + go func() { + errc <- repo.Start(shutdown, shutdownWg) + }() } - gitLogger := log.With(logger, "component", "git") - - // Chart releases sync due to Custom Resources changes ------------------------------- - mainLogger.Log("info", "Starting to clone repo ...") - checkout := git.RepoSetup(gitLogger, gitAuth, gitRemoteConfig, git.ChangesClone) - defer checkout.Cleanup() - mainLogger.Log("info", "Repo cloned") - // release instance is needed during the sync of Charts changes and during the sync of FluxHelRelease changes - rel := release.New(log.With(logger, "component", "release"), helmClient, checkout) - relsync := releasesync.New(log.With(logger, "component", "releasesync"), rel) + releaseConfig := release.Config{ + ChartsPath: *gitChartsPath, + } + repoConfig := helmop.RepoConfig{ + Repo: repo, + Branch: *gitBranch, + ChartsPath: *gitChartsPath, + } + // release instance is needed during the sync of Charts changes and during the sync of FluxHelmRelease changes + rel := release.New(log.With(logger, "component", "release"), helmClient, releaseConfig) // CHARTS CHANGES SYNC ------------------------------------------------------------------ chartSync := chartsync.New(log.With(logger, "component", "chartsync"), chartsync.Polling{Interval: *chartsSyncInterval, Timeout: *chartsSyncTimeout}, chartsync.Clients{KubeClient: *kubeClient, IfClient: *ifClient}, - rel, *relsync) + rel, repoConfig) chartSync.Run(shutdown, errc, shutdownWg) // OPERATOR - CUSTOM RESOURCE CHANGE SYNC ----------------------------------------------- @@ -243,7 +218,7 @@ func main() { // Reference to shared index informers for the FluxHelmRelease fhrInformer := ifInformerFactory.Helm().V1alpha2().FluxHelmReleases() - opr := operator.New(log.With(logger, "component", "operator"), kubeClient, fhrInformer, rel) + opr := operator.New(log.With(logger, "component", "operator"), kubeClient, fhrInformer, rel, repoConfig) // Starts handling k8s events related to the given resource kind go ifInformerFactory.Start(shutdown) @@ -253,9 +228,3 @@ func main() { errc <- fmt.Errorf(ErrOperatorFailure, err) } } - -// Helper functions ----------------------------------------------------------------------- -func optionalVar(fs *pflag.FlagSet, value ssh.OptionalValue, name, usage string) ssh.OptionalValue { - fs.Var(value, name, usage) - return value -} diff --git a/git/operations_test.go b/git/operations_test.go index eab0553f8..f82509f33 100644 --- a/git/operations_test.go +++ b/git/operations_test.go @@ -216,6 +216,8 @@ func TestCheckPush(t *testing.T) { } } +// --- + func createRepo(dir string, subdirs []string) error { var ( err error diff --git a/integrations/helm/chartsync/chartsync.go b/integrations/helm/chartsync/chartsync.go index 3a50698ab..77a279c39 100644 --- a/integrations/helm/chartsync/chartsync.go +++ b/integrations/helm/chartsync/chartsync.go @@ -1,34 +1,59 @@ /* -Package chartsync provides the functionality for updating a Chart release -due to (git repo) changes of Charts, while no Custom Resource changes. -Helm operator regularly checks the Chart repo and if new commits are found -all Custom Resources related to the changed Charts are updates, resulting in new -Chart release(s). +This package has the algorithm for making sure the Helm releases in +the cluster match what are defined in the FluxHelmRelease resources. + +There are several ways they can be mismatched. Here's how they are +reconciled: + + 1a. There is a FluxHelmRelease resource, but no corresponding + release. This can happen when the helm operator is first run, for + example. The ChartChangeSync periodically checks for this by + running through the resources and installing any that aren't + released already. + + 1b. The release corresponding to a FluxHelmRelease has been updated by + some other means, perhaps while the operator wasn't running. This + is also checked periodically, by doing a dry-run release and + comparing the result to the release. + + 2. The chart has changed in git, meaning the release is out of + date. The ChartChangeSync responds to new git commits by looking at + each chart that's referenced by a FluxHelmRelease, and if it's + changed since the last seen commit, updating the release. + +1a.) and 1b.) run on the same schedule, and 2.) is run when the git +mirror reports it has fetched from upstream _and_ (upon checking) the +head of the branch has changed. + +Since both 1*.) and 2.) look at the charts in the git repo, but run on +different schedules (non-deterministically), there's a chance that +they can fight each other. For example, the git mirror may fetch new +commits which are used in 1), then treated as changes subsequently by +2). To keep consistency between the two, the current revision is used +by 1), and advanced only by 2). + */ package chartsync import ( "context" "fmt" - "io/ioutil" - "os" "path/filepath" "sync" "time" - "github.com/weaveworks/flux/integrations/helm/releasesync" - "github.com/go-kit/kit/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" + hapi_release "k8s.io/helm/pkg/proto/hapi/release" ifv1 "github.com/weaveworks/flux/apis/helm.integrations.flux.weave.works/v1alpha2" + "github.com/weaveworks/flux/git" ifclientset "github.com/weaveworks/flux/integrations/client/clientset/versioned" - helmgit "github.com/weaveworks/flux/integrations/helm/git" - chartrelease "github.com/weaveworks/flux/integrations/helm/release" + helmop "github.com/weaveworks/flux/integrations/helm" + "github.com/weaveworks/flux/integrations/helm/release" ) type Polling struct { @@ -44,84 +69,81 @@ type Clients struct { type ChartChangeSync struct { logger log.Logger Polling - kubeClient kubernetes.Clientset - ifClient ifclientset.Clientset - release *chartrelease.Release - relsync releasesync.ReleaseChangeSync - lastCheckedRevision string + kubeClient kubernetes.Clientset + ifClient ifclientset.Clientset + release *release.Release + config helmop.RepoConfig } -func New( - logger log.Logger, - polling Polling, - clients Clients, - release *chartrelease.Release, - relsync releasesync.ReleaseChangeSync) *ChartChangeSync { - - lastCheckedRevision := "" - gitRef, err := release.Repo.ConfigSync.GetRevision() - if err != nil { - // we shall try again later - } - lastCheckedRevision = gitRef.String() - +func New(logger log.Logger, polling Polling, clients Clients, release *release.Release, config helmop.RepoConfig) *ChartChangeSync { return &ChartChangeSync{ - logger: logger, - Polling: polling, - kubeClient: clients.KubeClient, - ifClient: clients.IfClient, - release: release, - relsync: relsync, - lastCheckedRevision: lastCheckedRevision, + logger: logger, + Polling: polling, + kubeClient: clients.KubeClient, + ifClient: clients.IfClient, + release: release, + config: config, } } -// Run creates a syncing loop monitoring repo chart changes +// Run creates a syncing loop monitoring repo chart changes. It is +// assumed that the *git.Repo given to the config is ready to use +// before this is invoked. +// +// The behaviour if the git mirror becomes unavailable while it's +// running is not defined (this could be tightened up). func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *sync.WaitGroup) { chs.logger.Log("info", "Starting charts sync loop") - wg.Add(1) go func() { defer runtime.HandleCrash() defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) + currentRevision, err := chs.config.Repo.Revision(ctx, chs.config.Branch) + cancel() + if err != nil { + errc <- err + return + } + ticker := time.NewTicker(chs.Polling.Interval) defer ticker.Stop() for { select { - case <-ticker.C: - ns, err := GetNamespaces(chs.logger, chs.kubeClient) + case <-chs.config.Repo.C: + ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) + head, err := chs.config.Repo.Revision(ctx, chs.config.Branch) + cancel() if err != nil { - errc <- err + chs.logger.Log("warning", "failure using git repo", "error", err.Error()) + continue } - var syncNeeded bool + if head == currentRevision { + chs.logger.Log("info", "no new commits on branch", "branch", chs.config.Branch, "head", head) + continue + } - // Syncing git repo Charts only changes + // Sync changes to charts in the git repo chs.logger.Log("info", fmt.Sprint("Start of chartsync")) - syncNeeded, err = chs.DoChartChangeSync(ns) + err = chs.ApplyChartChanges(currentRevision, head) if err != nil { chs.logger.Log("error", fmt.Sprintf("Failure to do chart sync: %#v", err)) - chs.logger.Log("info", fmt.Sprint("End of chartsync")) - } - if !syncNeeded { - chs.logger.Log("info", fmt.Sprint("No repo changes of Charts")) } + currentRevision = head chs.logger.Log("info", fmt.Sprint("End of chartsync")) - // Syncing manual Chart releases + case <-ticker.C: + // Re-release any chart releases that have apparently + // changed in the cluster. chs.logger.Log("info", fmt.Sprint("Start of releasesync")) - syncNeeded, err = chs.relsync.DoReleaseChangeSync(chs.ifClient, ns) + err = chs.ReapplyReleaseDefs(currentRevision) if err != nil { - chs.logger.Log("error", fmt.Sprintf("Failure to do manual release sync: %#v", err)) - chs.logger.Log("info", fmt.Sprint("End of releasesync")) - } - if !syncNeeded { - chs.logger.Log("info", fmt.Sprint("No manual changes of Chart releases")) + chs.logger.Log("error", fmt.Sprintf("Failure to do manual release sync: %s", err)) } chs.logger.Log("info", fmt.Sprint("End of releasesync")) - continue case <-stopCh: chs.logger.Log("stopping", "true") @@ -131,259 +153,192 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn }() } -func (chs *ChartChangeSync) DoChartChangeSync(ns []string) (bool, error) { - var exist bool - var newRev string - var err error - if exist, newRev, err = chs.newCommits(); err != nil { - return false, fmt.Errorf("Failure during retrieving commits: %#v", err) - } - if !exist { - return false, nil - } - - chartDirs, err := getChartDirs(chs.logger, chs.release.Repo.ConfigSync) +// ApplyChartChanges looks at the FluxHelmRelease resources in the +// cluster, figures out which refer to charts that have changed since +// the last commit, then re-releases those that have. +func (chs *ChartChangeSync) ApplyChartChanges(prevRef, head string) error { + resources, err := chs.getCustomResources() if err != nil { - return false, fmt.Errorf("Failure to get charts under the charts path: %#v", err) + return fmt.Errorf("Failure getting FHR custom resources: %s", err.Error()) } - chartFhrs := make(map[string][]ifv1.FluxHelmRelease) - for _, chart := range chartDirs { - err = chs.getCustomResources(ns, chart, chartFhrs) - if err != nil { - return false, fmt.Errorf("Failure during retrieving Custom Resources related to Chart [%s]: %#v", chart, err) + // Release all the resources whose charts have changed. More than + // one FluxHelmRelease resource can refer to a given chart, so to + // avoid repeated checking, keep track of which charts have + // changed or not changed. + chartHasChanged := map[string]bool{} + + // Lazily clone the repo if and when it turns out we need it + var clone *git.Export + defer func() { + if clone != nil { + clone.Clean() } - } + }() - ctx, cancel := context.WithTimeout(context.Background(), helmgit.DefaultCloneTimeout) - chartsToRelease, err := chs.releaseNeeded(ctx, newRev, chartDirs, chartFhrs) - cancel() - if err != nil { - return false, fmt.Errorf("Failure while establishing upgrade need of releases: %#v", err) - } - if len(chartsToRelease) == 0 { - chs.lastCheckedRevision = newRev - return false, nil - } + for _, fhr := range resources { + chartPath := filepath.Join(chs.config.ChartsPath, fhr.Spec.ChartGitPath) + changed, ok := chartHasChanged[chartPath] + if !ok { + ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) + commits, err := chs.config.Repo.CommitsBetween(ctx, prevRef, head, chartPath) + cancel() + if err != nil { + return fmt.Errorf("error while checking if chart at %q has changed in %s..%s: %s", chartPath, prevRef, head, err.Error()) + } + changed = len(commits) > 0 + chartHasChanged[chartPath] = changed + } + if changed { + if clone == nil { + clone, err = chs.exportAtRef(head) + if err != nil { + return fmt.Errorf("failed to clone repo at %s: %s", head, err.Error()) + } + } - if err = chs.releaseCharts(chartsToRelease, chartFhrs); err != nil { - return false, fmt.Errorf("Failure to release Chart(s): %#v", err) + rlsName := release.GetReleaseName(fhr) + opts := release.InstallOptions{DryRun: false} + if _, err = chs.release.Install(clone.Dir(), rlsName, fhr, release.UpgradeAction, opts); err != nil { + // NB in this step, failure to release is considered non-fatal, i.e,. we move on to the next rather than giving up entirely. + chs.logger.Log("warning", "failure to release chart with changes in git", "error", err, "chart", chartPath, "release", rlsName) + } + } } - return true, nil + return nil } -// GetNamespaces gets current kubernetes cluster namespaces -func GetNamespaces(logger log.Logger, kubeClient kubernetes.Clientset) ([]string, error) { - ns := []string{} +func (chs *ChartChangeSync) ReapplyReleaseDefs(ref string) error { + var clone *git.Export + defer func() { + if clone != nil { + clone.Clean() + } + }() - nso, err := kubeClient.CoreV1().Namespaces().List(metav1.ListOptions{}) + resources, err := chs.getCustomResources() if err != nil { - errm := fmt.Errorf("Failure while retrieving kubernetes namespaces: %#v", err) - logger.Log("error", errm.Error()) - return nil, errm + return fmt.Errorf("failed to get FluxHelmRelease resources from the API server: %s", err.Error()) } - for _, n := range nso.Items { - ns = append(ns, n.GetName()) - } - - return ns, nil -} - -// getChartDirs retrieves charts under the charts directory (under the repo root) -// go-git(.v4) does not implement finding commits under a specific path. This means -// the individual chart paths cannor be currently used with "git log" -func getChartDirs(logger log.Logger, checkout *helmgit.Checkout) ([]string, error) { - chartDirs := []string{} - - repoRoot := checkout.Dir - if repoRoot == "" { - return nil, helmgit.ErrNoRepoCloned - } - chartsFullPath := filepath.Join(repoRoot, checkout.Config.Path) - - files, err := ioutil.ReadDir(chartsFullPath) - if err != nil { - errm := fmt.Errorf("Failure to access directory %s: %#v", chartsFullPath, err) - logger.Log("error", errm.Error()) - return nil, errm - } + for _, fhr := range resources { + releaseName := release.GetReleaseName(fhr) + rel, err := chs.release.GetDeployedRelease(releaseName) + if err != nil { + return fmt.Errorf("failed to get release %q: %s", releaseName, err) + } - // We only choose subdirectories that represent Charts - for _, f := range files { - if f.IsDir() { - chartDir := filepath.Join(chartsFullPath, f.Name()) - chartMeta := filepath.Join(chartDir, "Chart.yaml") - if _, err := os.Stat(chartMeta); os.IsNotExist(err) { - continue + // At this point, one way or another, we are going to need a clone of the repo. + if clone == nil { + clone, err = chs.exportAtRef(ref) + if err != nil { + return err } - chartDirs = append(chartDirs, f.Name()) } - } - - return chartDirs, nil -} - -// newCommits determines if charts need to be released -// go-git.v4 does not provide a possibility to find commit for a particular path. -// So we find if there are any commits at all sice last time -func (chs *ChartChangeSync) newCommits() (bool, string, error) { - chs.logger.Log("info", "Getting new commits") - checkout := chs.release.Repo.ConfigSync - - chs.logger.Log("info", fmt.Sprintf("Repo dir = %s", checkout.Dir)) + opts := release.InstallOptions{DryRun: false} + if rel == nil { + _, err := chs.release.Install(clone.Dir(), releaseName, fhr, release.InstallAction, opts) + if err != nil { + chs.logger.Log("warning", "Failed to install chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + } + continue + } - if checkout.Dir == "" { - ctx, cancel := context.WithTimeout(context.Background(), helmgit.DefaultCloneTimeout) - err := checkout.Clone(ctx, helmgit.ChangesClone) - cancel() + changed, err := chs.shouldUpgrade(clone.Dir(), rel, fhr) if err != nil { - errm := fmt.Errorf("Failure while cloning repo : %#v", err) - chs.logger.Log("error", errm.Error()) - return false, "", errm + chs.logger.Log("warning", "Unable to determine if release has changed", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + continue + } + if changed { + _, err := chs.release.Install(clone.Dir(), releaseName, fhr, release.UpgradeAction, opts) + if err != nil { + chs.logger.Log("warning", "Failed to upgrade chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + } } - chs.logger.Log("info", "Cloned repo") } + return nil +} + +//--- - ctx, cancel := context.WithTimeout(context.Background(), helmgit.DefaultPullTimeout) - err := checkout.Pull(ctx) +func (chs *ChartChangeSync) exportAtRef(ref string) (*git.Export, error) { + ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) + clone, err := chs.config.Repo.Export(ctx, ref) cancel() if err != nil { - return false, "", fmt.Errorf("Failure while pulling repo: %#v", err) + return nil, fmt.Errorf("error cloning repo at ref %s for chart releases: %s", ref, err.Error()) } + return clone, nil +} - // get latest revision - newRev, err := checkout.GetRevision() +// GetNamespaces gets current kubernetes cluster namespaces +func (chs *ChartChangeSync) getNamespaces() ([]string, error) { + var ns []string + nso, err := chs.kubeClient.CoreV1().Namespaces().List(metav1.ListOptions{}) if err != nil { - return false, "", fmt.Errorf("Failure while getting repo revision: %s", err.Error()) + return nil, fmt.Errorf("Failure while retrieving kubernetes namespaces: %s", err) } - chs.logger.Log("info", fmt.Sprintf("Got revision %s", newRev.String())) - oldRev := chs.lastCheckedRevision - if oldRev == "" { - chs.lastCheckedRevision = newRev.String() - chs.logger.Log("debug", fmt.Sprintf("Populated lastCheckedRevision with %s", chs.lastCheckedRevision)) - - return false, "", nil + for _, n := range nso.Items { + ns = append(ns, n.GetName()) } - chs.logger.Log("info", fmt.Sprintf("lastCheckedRevision: %s", chs.lastCheckedRevision)) - - if oldRev != newRev.String() { - return true, newRev.String(), nil - } - return false, "", nil + return ns, nil } -// getCustomResources assembles custom resources referencing a particular chart -func (chs *ChartChangeSync) getCustomResources(namespaces []string, chart string, chartFhrs map[string][]ifv1.FluxHelmRelease) error { - chartSelector := map[string]string{ - "chart": chart, +// getCustomResources assembles all custom resources +func (chs *ChartChangeSync) getCustomResources() ([]ifv1.FluxHelmRelease, error) { + namespaces, err := chs.getNamespaces() + if err != nil { + return nil, err } - labelsSet := labels.Set(chartSelector) - listOptions := metav1.ListOptions{LabelSelector: labelsSet.AsSelector().String()} - fhrs := []ifv1.FluxHelmRelease{} + var fhrs []ifv1.FluxHelmRelease for _, ns := range namespaces { - list, err := chs.ifClient.HelmV1alpha2().FluxHelmReleases(ns).List(listOptions) + list, err := chs.ifClient.HelmV1alpha2().FluxHelmReleases(ns).List(metav1.ListOptions{}) if err != nil { - chs.logger.Log("error", fmt.Errorf("Failure while retrieving FluxHelmReleases: %#v", err)) - continue + return nil, err } for _, fhr := range list.Items { fhrs = append(fhrs, fhr) } } - - chartFhrs[chart] = fhrs - - return nil + return fhrs, nil } -// releaseCharts upgrades releases with changed Charts -// input: -// chartD ... provides chart name and its directory information -// fhr ...... provides chart name and all Custom Resources associated with this chart -func (chs *ChartChangeSync) releaseCharts(chartsToRelease []string, chartFhrs map[string][]ifv1.FluxHelmRelease) error { - checkout := chs.release.Repo.ConfigSync - - ctx, cancel := context.WithTimeout(context.Background(), helmgit.DefaultPullTimeout) - err := checkout.Pull(ctx) - cancel() - if err != nil { - return fmt.Errorf("Failure while pulling repo: %#v", err) +// shouldUpgrade returns true if the current running values or chart +// don't match what the repo says we ought to be running, based on +// doing a dry run install from the chart in the git repo. +func (chs *ChartChangeSync) shouldUpgrade(chartsRepo string, currRel *hapi_release.Release, fhr ifv1.FluxHelmRelease) (bool, error) { + if currRel == nil { + return false, fmt.Errorf("No Chart release provided for %v", fhr.GetName()) } - chartPathBase := filepath.Join(checkout.Dir, checkout.Config.Path) - for _, chart := range chartsToRelease { - var err error - fhrs := chartFhrs[chart] - for _, fhr := range fhrs { - - // sanity check - chartPath := filepath.Join(chartPathBase, fhr.Spec.ChartGitPath) - if _, err := os.Stat(chartPath); os.IsNotExist(err) { - chs.logger.Log("error", fmt.Sprintf("Missing Chart %s. No release can happen.", chartPath)) - continue - } - - rlsName := chartrelease.GetReleaseName(fhr) + currVals := currRel.GetConfig().GetRaw() + currChart := currRel.GetChart().String() - opts := chartrelease.InstallOptions{DryRun: false} - _, err = chs.release.Install(checkout, rlsName, fhr, chartrelease.UpgradeAction, opts) - if err != nil { - chs.logger.Log("info", fmt.Sprintf("Error to do upgrade of release of [%s]: %s. Skipping.", rlsName, err.Error())) - // TODO: collect errors and return them after looping through all - ? - continue - } - chs.logger.Log("info", fmt.Sprintf("Release [%s] upgraded due to chart only changes", rlsName)) - } - } - - // get latest revision - newRev, err := checkout.GetRevision() + // Get the desired release state + opts := release.InstallOptions{DryRun: true} + tempRelName := currRel.GetName() + "-temp" + desRel, err := chs.release.Install(chartsRepo, tempRelName, fhr, release.InstallAction, opts) if err != nil { - return fmt.Errorf("Failure while getting repo revision: %s", err.Error()) + return false, err } - chs.lastCheckedRevision = newRev.String() - chs.logger.Log("debug", fmt.Sprintf("Populated lastCheckedRevision with %s", chs.lastCheckedRevision)) + desVals := desRel.GetConfig().GetRaw() + desChart := desRel.GetChart().String() - return nil -} - -// releaseNeeded finds if there were commits related to Chart changes since last sync -// returns maps keys on chart name with value corresponding to the chart path -// (go-git.v4 does not provide a possibility to find commit for a particular path.) -func (chs *ChartChangeSync) releaseNeeded(ctx context.Context, newRev string, charts []string, chartFhrs map[string][]ifv1.FluxHelmRelease) ([]string, error) { - chartsToRelease := []string{} - var changed, ok bool - var err error - var fhrs []ifv1.FluxHelmRelease - - revRange := fmt.Sprintf("%s..%s", chs.lastCheckedRevision, newRev) - dir := fmt.Sprintf("%s/%s", chs.release.Repo.ConfigSync.Dir, chs.release.Repo.ConfigSync.Config.Path) - - for _, chart := range charts { - chs.logger.Log("debug", fmt.Sprintf("Testing if release needed for Chart [%s]", chart)) - - if fhrs, ok = chartFhrs[chart]; !ok { - continue - } - if len(fhrs) < 1 { - continue - } - - if changed, err = chs.chartChanged(ctx, dir, revRange, chart); err != nil { - chs.logger.Log("error", fmt.Sprintf("Failure to determine chart change for [%s]: %s", chart, err.Error())) - continue - } - if !changed { - continue - } - chartsToRelease = append(chartsToRelease, chart) + // compare values && Chart + if currVals != desVals { + chs.logger.Log("error", fmt.Sprintf("Release %s: values have diverged due to manual Chart release", currRel.GetName())) + return true, nil + } + if currChart != desChart { + chs.logger.Log("error", fmt.Sprintf("Release %s: Chart has diverged due to manual Chart release", currRel.GetName())) + return true, nil } - return chartsToRelease, nil + + return false, nil } diff --git a/integrations/helm/chartsync/utils.go b/integrations/helm/chartsync/utils.go deleted file mode 100644 index e79e205fb..000000000 --- a/integrations/helm/chartsync/utils.go +++ /dev/null @@ -1,80 +0,0 @@ -package chartsync - -import ( - "bytes" - "context" - "fmt" - "io" - "io/ioutil" - "os/exec" - "strings" -) - -func (chs *ChartChangeSync) chartChanged(ctx context.Context, dir, revRange, chart string) (bool, error) { - chs.release.Repo.ConfigSync.Lock() - defer chs.release.Repo.ConfigSync.Unlock() - - if len(dir) == 0 || dir[0] != '/' { - return false, fmt.Errorf("directory must provided and must be absolute: [%s]", dir) - } - if len(chart) == 0 { - return false, fmt.Errorf("chart must provided: [%s]", chart) - } - - out := &bytes.Buffer{} - if err := chs.execDiffCmd(ctx, dir, out, revRange, chart); err != nil { - return false, err - } - - lines := splitLines(out.String()) - if len(lines) < 1 { - return false, nil - } - return true, nil -} - -func splitLines(s string) []string { - outStr := strings.TrimSpace(s) - if outStr == "" { - return nil - } - return strings.Split(outStr, "\n") -} - -// execDiffCmd ... find if there is a change in a particular chart -// git diff revCurr..revNew chart -// (runs in the /repoRoot/charts dir) -// input: -// dir (/repoRoot/charts) -// args (revCurr..revNew, chart) -func (chs *ChartChangeSync) execDiffCmd(ctx context.Context, dir string, out io.Writer, args ...string) error { - args = append([]string{"diff"}, args...) - c := exec.CommandContext(ctx, "git", args...) - - chs.logger.Log("info", fmt.Sprintf("Running command: git %v", args)) - - c.Dir = dir - - c.Stdout = ioutil.Discard - if out != nil { - c.Stdout = out - } - errOut := &bytes.Buffer{} - c.Stderr = errOut - - err := c.Run() - if err != nil { - if exitError, ok := err.(*exec.ExitError); ok { - chs.logger.Log("error", fmt.Sprintf("Failure while running git diff: %#v", exitError.Error())) - return err - } - } - - if ctx.Err() == context.DeadlineExceeded { - return fmt.Errorf("running git diff command: %s %v", "git", args) - } else if ctx.Err() == context.Canceled { - return fmt.Errorf("context was unexpectedly cancelled when running command: %s %v", "gitt", args) - } - - return err -} diff --git a/integrations/helm/git/git.go b/integrations/helm/git/git.go deleted file mode 100644 index 6c2b4a35a..000000000 --- a/integrations/helm/git/git.go +++ /dev/null @@ -1,210 +0,0 @@ -package git - -import ( - "context" - "errors" - "fmt" - "io/ioutil" - "os" - "path" - "sync" - "time" - - "github.com/go-kit/kit/log" - "golang.org/x/crypto/ssh" - gogit "gopkg.in/src-d/go-git.v4" - "gopkg.in/src-d/go-git.v4/plumbing" - gitssh "gopkg.in/src-d/go-git.v4/plumbing/transport/ssh" -) - -const ( - DefaultCloneTimeout = 2 * time.Minute - DefaultPullTimeout = 2 * time.Minute - privateKeyFileMode = os.FileMode(0400) - ChangesClone = "sync_clone" -) - -var ( - ErrNoChanges = errors.New("no changes made in repo") - ErrNoChartsDir = errors.New("no Charts dir provided") - ErrNoRepo = errors.New("no repo provided") - ErrNoRepoCloned = errors.New("no repo cloned") -) - -type GitRemoteConfig struct { - URL string `json:"url"` - Branch string `json:"branch"` - Path string `json:"path"` -} - -// Checkout is a local clone of the remote repo. -type Checkout struct { - Logger log.Logger - Config GitRemoteConfig // remote repo info provided by the user - auth *gitssh.PublicKeys - Dir string // directory where the repo was cloned (repo root) - Repo *gogit.Repository // cloned repo info - worktree *gogit.Worktree - sync.RWMutex -} - -// NewGitRemoteConfig ... sets up git repo configuration. -func NewGitRemoteConfig(url, branch, path string) (GitRemoteConfig, error) { - if len(url) == 0 { - return GitRemoteConfig{}, errors.New("git repo URL must be provided") - } - if len(branch) == 0 { - branch = "master" - } - if len(path) == 0 || (len(path) != 0 && path[0] == '/') { - return GitRemoteConfig{}, errors.New("git subdirectory (--git-charts-path) must be provided and cannot have leading forward slash") - } - - return GitRemoteConfig{ - URL: url, - Branch: branch, - Path: path, - }, nil -} - -// SetupRepo creates a new checkout and clones repo until ready -func RepoSetup(logger log.Logger, auth *gitssh.PublicKeys, config GitRemoteConfig, cloneSubdir string) *Checkout { - checkout := &Checkout{ - Logger: logger, - Config: config, - auth: auth, - } - // If cloning not immediately possible, we wait until it is ----------------------------- - var err error - for { - ctx, cancel := context.WithTimeout(context.Background(), DefaultCloneTimeout) - err = checkout.Clone(ctx, cloneSubdir) - cancel() - if err == nil { - break - } - logger.Log("error", fmt.Sprintf("Failed to clone git repo [%s, %s, %s]: %v", config.URL, config.Path, config.Branch, err)) - time.Sleep(10 * time.Second) - } - - return checkout -} - -// Clone creates a local clone of a remote repo and -// checks out the relevant branch -// subdir reflects the purpose of the clone: -// * acting on Charts changes (syncing the cluster when there were only commits -// in the Charts parts of the repo which did not trigger Custom Resource changes) -func (ch *Checkout) Clone(ctx context.Context, cloneSubdir string) error { - ch.Lock() - defer ch.Unlock() - - if ch.Config.URL == "" { - return ErrNoRepo - } - - repoDir, err := ioutil.TempDir(os.TempDir(), cloneSubdir) - if err != nil { - return err - } - ch.Dir = repoDir - - repo, err := gogit.PlainCloneContext(ctx, repoDir, false, &gogit.CloneOptions{ - URL: ch.Config.URL, - Auth: ch.auth, - ReferenceName: plumbing.ReferenceName(fmt.Sprintf("refs/heads/%s", ch.Config.Branch)), - }) - if err != nil { - return err - } - - wt, err := repo.Worktree() - if err != nil { - return err - } - - ch.Repo = repo - ch.worktree = wt - - ch.Logger.Log("debug", fmt.Sprintf("repo cloned in into %s", ch.Dir)) - - return nil -} - -// Cleanup ... removes the temp repo directory -func (ch *Checkout) Cleanup() { - ch.Lock() - defer ch.Unlock() - - if ch.Dir != "" { - err := os.RemoveAll(ch.Dir) - if err != nil { - ch.Logger.Log("error", err.Error()) - } - } - ch.Dir = "" - ch.Repo = nil - ch.worktree = nil -} - -// GetRepoAuth ... provides git repo authentication based on private ssh key -func GetRepoAuth(sshUsername, k8sSecretVolumeMountPath, k8sSecretDataKey string) (*gitssh.PublicKeys, error) { - privateKeyPath := path.Join(k8sSecretVolumeMountPath, k8sSecretDataKey) - fileInfo, err := os.Stat(privateKeyPath) - switch { - case os.IsNotExist(err): - return &gitssh.PublicKeys{}, err - case err != nil: - return &gitssh.PublicKeys{}, err - case fileInfo.Mode() != privateKeyFileMode: - if err := os.Chmod(privateKeyPath, privateKeyFileMode); err != nil { - return &gitssh.PublicKeys{}, err - } - default: - } - - sshKey, err := ioutil.ReadFile(privateKeyPath) - if err != nil { - return nil, err - } - signer, err := ssh.ParsePrivateKey([]byte(sshKey)) - if err != nil { - return nil, err - } - auth := &gitssh.PublicKeys{User: sshUsername, Signer: signer} - - return auth, nil -} - -// Pull ... makes a git pull -func (ch *Checkout) Pull(ctx context.Context) error { - ch.Lock() - defer ch.Unlock() - - w := ch.worktree - if w == nil { - return ErrNoRepoCloned - } - err := w.Pull(&gogit.PullOptions{ - RemoteName: "origin", - Auth: ch.auth, - ReferenceName: plumbing.ReferenceName(fmt.Sprintf("refs/heads/%s", ch.Config.Branch)), - }) - if err != nil && err != gogit.NoErrAlreadyUpToDate { - return err - } - return nil -} - -// GetRevision returns string representation of the revision hash -func (ch *Checkout) GetRevision() (plumbing.Hash, error) { - if ch.Repo == nil { - return plumbing.Hash{}, ErrNoRepoCloned - } - ref, err := ch.Repo.Head() - if err != nil { - return plumbing.Hash{}, err - } - rev := ref.Hash() - return rev, nil -} diff --git a/integrations/helm/helm.go b/integrations/helm/helm.go index 3a4771f7b..070c786d2 100644 --- a/integrations/helm/helm.go +++ b/integrations/helm/helm.go @@ -11,8 +11,20 @@ import ( k8shelm "k8s.io/helm/pkg/helm" rls "k8s.io/helm/pkg/proto/hapi/services" "k8s.io/helm/pkg/tlsutil" + + "github.com/weaveworks/flux/git" +) + +const ( + GitOperationTimeout = 30 * time.Second ) +type RepoConfig struct { + Repo *git.Repo + Branch string + ChartsPath string +} + type TillerOptions struct { IP string Port string diff --git a/integrations/helm/operator/operator.go b/integrations/helm/operator/operator.go index 038773e24..88c67a226 100644 --- a/integrations/helm/operator/operator.go +++ b/integrations/helm/operator/operator.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/go-kit/kit/log" "github.com/golang/glog" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -19,13 +20,11 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" - "github.com/go-kit/kit/log" - ifv1 "github.com/weaveworks/flux/apis/helm.integrations.flux.weave.works/v1alpha2" ifscheme "github.com/weaveworks/flux/integrations/client/clientset/versioned/scheme" fhrv1 "github.com/weaveworks/flux/integrations/client/informers/externalversions/helm.integrations.flux.weave.works/v1alpha2" iflister "github.com/weaveworks/flux/integrations/client/listers/helm.integrations.flux.weave.works/v1alpha2" - helmgit "github.com/weaveworks/flux/integrations/helm/git" + helmop "github.com/weaveworks/flux/integrations/helm" chartrelease "github.com/weaveworks/flux/integrations/helm/release" ) @@ -58,6 +57,7 @@ type Controller struct { fhrSynced cache.InformerSynced release *chartrelease.Release + config helmop.RepoConfig // workqueue is a rate limited work queue. This is used to queue work to be // processed instead of performing it as soon as a change happens. This @@ -76,7 +76,8 @@ func New( logger log.Logger, kubeclientset kubernetes.Interface, fhrInformer fhrv1.FluxHelmReleaseInformer, - release *chartrelease.Release) *Controller { + release *chartrelease.Release, + config helmop.RepoConfig) *Controller { // Add helm-operator types to the default Kubernetes Scheme so Events can be // logged for helm-operator types. @@ -93,6 +94,7 @@ func New( release: release, releaseWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ChartRelease"), recorder: recorder, + config: config, } controller.logger.Log("info", "Setting up event handlers") @@ -263,15 +265,16 @@ func (c *Controller) syncHandler(key string) error { } // Chart installation of the appropriate type - ctx, cancel := context.WithTimeout(context.Background(), helmgit.DefaultCloneTimeout) - err = c.release.Repo.ConfigSync.Pull(ctx) + ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) + clone, err := c.config.Repo.Export(ctx, c.config.Branch) cancel() if err != nil { - return fmt.Errorf("Failure to do git pull: %s", err.Error()) + return fmt.Errorf("Failure to clone repo: %s", err.Error()) } + defer clone.Clean() opts := chartrelease.InstallOptions{DryRun: false} - _, err = c.release.Install(c.release.Repo.ConfigSync, releaseName, *fhr, syncType, opts) + _, err = c.release.Install(clone.Dir(), releaseName, *fhr, syncType, opts) if err != nil { return err } @@ -344,7 +347,6 @@ func (c *Controller) deleteRelease(fhr ifv1.FluxHelmRelease) { // needsUpdate compares two FluxHelmRelease and determines if any changes occurred func needsUpdate(old, new ifv1.FluxHelmRelease) bool { - oldValues, err := old.Spec.Values.YAML() if err != nil { return false diff --git a/integrations/helm/release/release.go b/integrations/helm/release/release.go index 2e215e020..50c40cec6 100644 --- a/integrations/helm/release/release.go +++ b/integrations/helm/release/release.go @@ -6,7 +6,6 @@ import ( "fmt" "os/exec" "path/filepath" - "sync" "time" "github.com/go-kit/kit/log" @@ -16,7 +15,6 @@ import ( "github.com/weaveworks/flux" ifv1 "github.com/weaveworks/flux/apis/helm.integrations.flux.weave.works/v1alpha2" fluxk8s "github.com/weaveworks/flux/cluster/kubernetes" - helmgit "github.com/weaveworks/flux/integrations/helm/git" ) var ( @@ -30,31 +28,23 @@ const ( UpgradeAction Action = "UPDATE" ) +type Config struct { + ChartsPath string +} + // Release contains clients needed to provide functionality related to helm releases type Release struct { - logger log.Logger + logger log.Logger + HelmClient *k8shelm.Client - Repo repo - sync.RWMutex -} -func (r *Release) ConfigSync() *helmgit.Checkout { - return r.Repo.ConfigSync + config Config } type Releaser interface { GetCurrent() (map[string][]DeployInfo, error) GetDeployedRelease(name string) (*hapi_release.Release, error) - Install(checkout *helmgit.Checkout, - releaseName string, - fhr ifv1.FluxHelmRelease, - action Action, - opts InstallOptions) (*hapi_release.Release, error) - ConfigSync() *helmgit.Checkout -} - -type repo struct { - ConfigSync *helmgit.Checkout + Install(dir string, releaseName string, fhr ifv1.FluxHelmRelease, action Action, opts InstallOptions) (*hapi_release.Release, error) } type DeployInfo struct { @@ -66,15 +56,13 @@ type InstallOptions struct { ReuseName bool } -// New creates a new Release instance -func New(logger log.Logger, helmClient *k8shelm.Client, configCheckout *helmgit.Checkout) *Release { - repo := repo{ - ConfigSync: configCheckout, - } +// New creates a new Release instance. +func New(logger log.Logger, helmClient *k8shelm.Client, config Config) *Release { + // TODO(michael): check we don't have nil values in the config r := &Release{ logger: logger, HelmClient: helmClient, - Repo: repo, + config: config, } return r } @@ -114,9 +102,7 @@ func (r *Release) Exists(name string) (bool, error) { } func (r *Release) canDelete(name string) (bool, error) { - r.Lock() rls, err := r.HelmClient.ReleaseStatus(name) - r.Unlock() if err != nil { r.logger.Log("error", fmt.Sprintf("Error finding status for release (%s): %#v", name, err)) @@ -148,9 +134,11 @@ func (r *Release) canDelete(name string) (bool, error) { } } -// Install performs Chart release. Depending on the release type, this is either a new release, -// or an upgrade of an existing one -func (r *Release) Install(checkout *helmgit.Checkout, releaseName string, fhr ifv1.FluxHelmRelease, action Action, opts InstallOptions) (*hapi_release.Release, error) { +// Install performs a Chart release given the directory containing the +// charts, and the FluxHelmRelease specifying the release. Depending +// on the release type, this is either a new release, or an upgrade of +// an existing one. +func (r *Release) Install(repoDir, releaseName string, fhr ifv1.FluxHelmRelease, action Action, opts InstallOptions) (*hapi_release.Release, error) { r.logger.Log("info", fmt.Sprintf("releaseName= %s, action=%s, install options: %+v", releaseName, action, opts)) chartPath := fhr.Spec.ChartGitPath @@ -164,7 +152,7 @@ func (r *Release) Install(checkout *helmgit.Checkout, releaseName string, fhr if namespace = "default" } - chartDir := filepath.Join(checkout.Dir, checkout.Config.Path, chartPath) + chartDir := filepath.Join(repoDir, r.config.ChartsPath, chartPath) strVals, err := fhr.Spec.Values.YAML() if err != nil { @@ -175,7 +163,6 @@ func (r *Release) Install(checkout *helmgit.Checkout, releaseName string, fhr if switch action { case InstallAction: - r.Lock() res, err := r.HelmClient.InstallRelease( chartDir, namespace, @@ -190,7 +177,6 @@ func (r *Release) Install(checkout *helmgit.Checkout, releaseName string, fhr if helm.InstallWait(i.wait) */ ) - r.Unlock() if err != nil { r.logger.Log("error", fmt.Sprintf("Chart release failed: %s: %#v", releaseName, err)) @@ -201,7 +187,6 @@ func (r *Release) Install(checkout *helmgit.Checkout, releaseName string, fhr if } return res.Release, err case UpgradeAction: - r.Lock() res, err := r.HelmClient.UpdateRelease( releaseName, chartDir, @@ -217,7 +202,6 @@ func (r *Release) Install(checkout *helmgit.Checkout, releaseName string, fhr if helm.UpgradeWait(u.wait)) */ ) - r.Unlock() if err != nil { r.logger.Log("error", fmt.Sprintf("Chart upgrade release failed: %s: %#v", releaseName, err)) @@ -244,9 +228,7 @@ func (r *Release) Delete(name string) error { return nil } - r.Lock() _, err = r.HelmClient.DeleteRelease(name, k8shelm.DeletePurge(true)) - r.Unlock() if err != nil { r.logger.Log("error", fmt.Sprintf("Release deletion error: %#v", err)) return err @@ -255,7 +237,7 @@ func (r *Release) Delete(name string) error { return nil } -// GetCurrentWithDate provides Chart releases (stored in tiller ConfigMaps) +// GetCurrent provides Chart releases (stored in tiller ConfigMaps) // output: // map[namespace][release name] = nil func (r *Release) GetCurrent() (map[string][]DeployInfo, error) { diff --git a/integrations/helm/releasesync/releasesync.go b/integrations/helm/releasesync/releasesync.go deleted file mode 100644 index 4aae14b6d..000000000 --- a/integrations/helm/releasesync/releasesync.go +++ /dev/null @@ -1,325 +0,0 @@ -/* -When a Chart release is manually deleted/upgraded/created, the cluster gets out of sync -with the prescribed state defined by FluxHelmRelease custom resources. The releasesync package -attemps to bring the cluster back to the prescribed state. -*/ -package releasesync - -import ( - "context" - "fmt" - "os" - "path/filepath" - "strings" - - "github.com/pkg/errors" - - protobuf "github.com/golang/protobuf/ptypes/timestamp" - hapi_release "k8s.io/helm/pkg/proto/hapi/release" - - "github.com/go-kit/kit/log" - - ifv1 "github.com/weaveworks/flux/apis/helm.integrations.flux.weave.works/v1alpha2" - ifclientset "github.com/weaveworks/flux/integrations/client/clientset/versioned" - "github.com/weaveworks/flux/integrations/helm/customresource" - helmgit "github.com/weaveworks/flux/integrations/helm/git" - chartrelease "github.com/weaveworks/flux/integrations/helm/release" -) - -const ( - syncDelay = 90 -) - -type releaseFhr struct { - RelName string - Fhr ifv1.FluxHelmRelease -} - -// ReleaseChangeSync implements DoReleaseChangeSync to return the cluster to the -// state dictated by Custom Resources after manual Chart release(s). -type ReleaseChangeSync struct { - logger log.Logger - release chartrelease.Releaser -} - -// New creates a ReleaseChangeSync. -func New( - logger log.Logger, - releaser chartrelease.Releaser) *ReleaseChangeSync { - - return &ReleaseChangeSync{ - logger: logger, - release: releaser, - } -} - -type customResourceInfo struct { - name, releaseName string - resource ifv1.FluxHelmRelease - lastUpdated protobuf.Timestamp -} - -type chartRelease struct { - releaseName string - action chartrelease.Action - desiredState ifv1.FluxHelmRelease -} - -// DoReleaseChangeSync returns the cluster to the state dictated by Custom Resources -// after manual Chart release(s). -func (rs *ReleaseChangeSync) DoReleaseChangeSync(ifClient ifclientset.Clientset, ns []string) (bool, error) { - ctx, cancel := context.WithTimeout(context.Background(), helmgit.DefaultCloneTimeout) - relsToSync, err := rs.releasesToSync(ctx, ifClient, ns) - cancel() - if err != nil { - err = errors.Wrap(err, "getting info about manual chart release changes") - rs.logger.Log("error", err) - return false, err - } - if len(relsToSync) == 0 { - return false, nil - } - - ctx, cancel = context.WithTimeout(context.Background(), helmgit.DefaultCloneTimeout) - err = rs.sync(ctx, relsToSync) - cancel() - if err != nil { - return false, errors.Wrap(err, "syncing cluster after manual chart release changes") - } - - return true, nil -} - -// getCustomResources retrieves FluxHelmRelease resources -// and returns them organised by namespace and chart release name. -// map[namespace] = []releaseFhr. -func (rs *ReleaseChangeSync) getCustomResources( - ifClient ifclientset.Clientset, - namespaces []string) (map[string][]releaseFhr, error) { - - relInfo := make(map[string][]releaseFhr) - - for _, ns := range namespaces { - list, err := customresource.GetNSCustomResources(ifClient, ns) - if err != nil { - return nil, errors.Wrap(err, - fmt.Sprintf("retrieving FluxHelmReleases in namespace %s", ns)) - } - - rf := []releaseFhr{} - for _, fhr := range list.Items { - relName := chartrelease.GetReleaseName(fhr) - rf = append(rf, releaseFhr{RelName: relName, Fhr: fhr}) - } - if len(rf) > 0 { - relInfo[ns] = rf - } - } - return relInfo, nil -} - -// shouldUpgrade returns true if the current running values or chart -// don't match what the repo says we ought to be running, based on -// doing a dry run install from the chart in the git repo. -func (rs *ReleaseChangeSync) shouldUpgrade( - currRel *hapi_release.Release, - fhr ifv1.FluxHelmRelease) (bool, error) { - - if currRel == nil { - return false, fmt.Errorf("No Chart release provided for %v", fhr.GetName()) - } - - currVals := currRel.GetConfig().GetRaw() - currChart := currRel.GetChart().String() - - // Get the desired release state - opts := chartrelease.InstallOptions{DryRun: true} - tempRelName := strings.Join([]string{currRel.GetName(), "temp"}, "-") - desRel, err := rs.release.Install(rs.release.ConfigSync(), tempRelName, fhr, "CREATE", opts) - if err != nil { - return false, err - } - desVals := desRel.GetConfig().GetRaw() - desChart := desRel.GetChart().String() - - // compare values && Chart - if currVals != desVals { - rs.logger.Log("error", fmt.Sprintf("Release %s: values have diverged due to manual Chart release", currRel.GetName())) - return true, nil - } - if currChart != desChart { - rs.logger.Log("error", fmt.Sprintf("Release %s: Chart has diverged due to manual Chart release", currRel.GetName())) - return true, nil - } - - return false, nil -} - -// addExistingReleasesToSync populates relsToSync (map from namespace -// to chartRelease) with the members of currentReleases that need -// updating because they're diverged from the desired state. Desired -// state is specified by customResources and what's in our git checkout. -func (rs *ReleaseChangeSync) addExistingReleasesToSync( - relsToSync map[string][]chartRelease, - currentReleases map[string]map[string]struct{}, - customResources map[string]map[string]ifv1.FluxHelmRelease) error { - - var chRels []chartRelease - for ns, nsRelsM := range currentReleases { - chRels = relsToSync[ns] - for relName := range nsRelsM { - if customResources[ns] == nil { - continue - } - // We are ignoring Charts that are not under flux/helm-operator control - fhr, ok := customResources[ns][relName] - if !ok { - continue - } - rel, err := rs.release.GetDeployedRelease(relName) - if err != nil { - return err - } - doUpgrade, err := rs.shouldUpgrade(rel, fhr) - if err != nil { - return err - } - if doUpgrade { - chr := chartRelease{ - releaseName: relName, - action: chartrelease.UpgradeAction, - desiredState: fhr, - } - chRels = append(chRels, chr) - } - } - if len(chRels) > 0 { - relsToSync[ns] = chRels - } - } - return nil -} - -// addDeletedReleasesToSync populates relsToSync (map from namespace -// to chartRelease) with chartReleases based on the charts referenced -// in customResources that are absent from currentReleases. -func (rs *ReleaseChangeSync) addDeletedReleasesToSync( - relsToSync map[string][]chartRelease, - currentReleases map[string]map[string]struct{}, - customResources map[string]map[string]ifv1.FluxHelmRelease) error { - - var chRels []chartRelease - for ns, nsFhrs := range customResources { - chRels = relsToSync[ns] - - for relName, fhr := range nsFhrs { - // there are Custom Resources (CRs) in this namespace - // missing Chart release even though there is a CR - if currentReleases[ns] == nil { - chr := chartRelease{ - releaseName: relName, - action: chartrelease.InstallAction, - desiredState: fhr, - } - chRels = append(chRels, chr) - continue - } - if _, ok := currentReleases[ns][relName]; !ok { - chr := chartRelease{ - releaseName: relName, - action: chartrelease.InstallAction, - desiredState: fhr, - } - chRels = append(chRels, chr) - } - } - if len(chRels) > 0 { - relsToSync[ns] = chRels - } - } - return nil -} - -// releasesToSync queries Tiller to get all current Helm releases, queries k8s -// custom resources to get all FluxHelmRelease(s), and returns a map from -// namespace to chartRelease(s) that need to be synced. -func (rs *ReleaseChangeSync) releasesToSync( - ctx context.Context, - ifClient ifclientset.Clientset, - ns []string) (map[string][]chartRelease, error) { - - relDepl, err := rs.release.GetCurrent() - if err != nil { - return nil, err - } - curRels := mappifyDeployInfo(relDepl) - - relCrs, err := rs.getCustomResources(ifClient, ns) - if err != nil { - return nil, err - } - crs := mappifyReleaseFhrInfo(relCrs) - - relsToSync := make(map[string][]chartRelease) - - // FIXME: we probably shouldn't be throwing away errors - _ = rs.addDeletedReleasesToSync(relsToSync, curRels, crs) - _ = rs.addExistingReleasesToSync(relsToSync, curRels, crs) - - return relsToSync, nil -} - -// sync takes a map from namespace to list of chartRelease(s) -// that need to be applied, and attempts to apply them. -// It returns the first error encountered. A chart missing -// from the repo doesn't count as an error, but will be logged. -func (rs *ReleaseChangeSync) sync( - ctx context.Context, - releases map[string][]chartRelease) error { - - // TODO it's weird that we do a pull here, after we've already decided - // what to do. Ask why. - ctx, cancel := context.WithTimeout(ctx, helmgit.DefaultPullTimeout) - err := rs.release.ConfigSync().Pull(ctx) - cancel() - if err != nil { - return fmt.Errorf("Failure while pulling repo: %#v", err) - } - - checkout := rs.release.ConfigSync() - chartPathBase := filepath.Join(checkout.Dir, checkout.Config.Path) - - opts := chartrelease.InstallOptions{DryRun: false} - - for ns, relsToProcess := range releases { - - for _, chr := range relsToProcess { - - // sanity check - chartPath := filepath.Join(chartPathBase, chr.desiredState.Spec.ChartGitPath) - if _, err := os.Stat(chartPath); os.IsNotExist(err) { - rs.logger.Log("error", fmt.Sprintf("Missing Chart %s. No release can happen.", chartPath)) - continue - } - - relName := chr.releaseName - switch chr.action { - case chartrelease.UpgradeAction: - rs.logger.Log("info", fmt.Sprintf("Resyncing manually upgraded Chart release %s (namespace %s)", relName, ns)) - _, err := rs.release.Install(checkout, relName, chr.desiredState, chartrelease.UpgradeAction, opts) - if err != nil { - return err - } - case chartrelease.InstallAction: - rs.logger.Log("info", fmt.Sprintf("Installing manually deleted Chart release %s (namespace %s)", relName, ns)) - _, err := rs.release.Install(checkout, relName, chr.desiredState, chartrelease.InstallAction, opts) - if err != nil { - return err - } - default: - panic(fmt.Sprintf("invalid action %q", chr.action)) - } - } - } - return nil -} diff --git a/integrations/helm/releasesync/releasesync_test.go b/integrations/helm/releasesync/releasesync_test.go deleted file mode 100644 index 6eede7c79..000000000 --- a/integrations/helm/releasesync/releasesync_test.go +++ /dev/null @@ -1,353 +0,0 @@ -package releasesync - -import ( - "fmt" - "os" - "testing" - - proto "github.com/golang/protobuf/proto" - "github.com/google/go-cmp/cmp" - "k8s.io/helm/pkg/helm" - hapi_chart "k8s.io/helm/pkg/proto/hapi/chart" - hapi_release "k8s.io/helm/pkg/proto/hapi/release" - - "github.com/go-kit/kit/log" - ifv1 "github.com/weaveworks/flux/apis/helm.integrations.flux.weave.works/v1alpha2" - helmgit "github.com/weaveworks/flux/integrations/helm/git" - "github.com/weaveworks/flux/integrations/helm/release" - chartrelease "github.com/weaveworks/flux/integrations/helm/release" -) - -type installReq struct { - checkoutDir string - releaseName string - fhr ifv1.FluxHelmRelease - action chartrelease.Action - opts chartrelease.InstallOptions -} - -type installResult struct { - release hapi_release.Release - err error -} - -type install struct { - installReq - installResult -} - -type mockReleaser struct { - current map[string][]chartrelease.DeployInfo - deployed map[string]*hapi_release.Release - configSync *helmgit.Checkout - installs []install -} - -func (r *mockReleaser) GetCurrent() (map[string][]chartrelease.DeployInfo, error) { - if r.current == nil { - return nil, fmt.Errorf("failed to fetch current releases") - } - return r.current, nil -} - -func (r *mockReleaser) GetDeployedRelease(name string) (*hapi_release.Release, error) { - if _, present := r.deployed[name]; !present { - return nil, fmt.Errorf("no release hamed %q", name) - } - return r.deployed[name], nil -} - -func (r *mockReleaser) ConfigSync() *helmgit.Checkout { - return r.configSync -} - -func (r *mockReleaser) Install(checkout *helmgit.Checkout, - releaseName string, - fhr ifv1.FluxHelmRelease, - action chartrelease.Action, - opts chartrelease.InstallOptions) (*hapi_release.Release, error) { - req := installReq{ - checkoutDir: checkout.Dir, - releaseName: releaseName, - fhr: fhr, - action: action, - opts: opts} - cmpopts := cmp.AllowUnexported(installReq{}) - for _, i := range r.installs { - if cmp.Equal(i.installReq, req, cmpopts) { - return &i.installResult.release, i.installResult.err - } - } - return nil, fmt.Errorf("unexpected request: %+v", req) -} - -func makeCurRel(ns string, relNames ...string) map[string]map[string]struct{} { - m := make(map[string]map[string]struct{}) - m[ns] = make(map[string]struct{}) - for _, relName := range relNames { - m[ns][relName] = struct{}{} - } - return m -} - -func mergeCurRels(a, b map[string]map[string]struct{}) map[string]map[string]struct{} { - m := make(map[string]map[string]struct{}) - for ns := range a { - m[ns] = a[ns] - } - for ns := range b { - if _, present := m[ns]; present { - panic("ns '" + ns + "' present in both a and b") - } - m[ns] = b[ns] - } - return m -} - -func makeCustRes(ns string, relNames ...string) map[string]map[string]ifv1.FluxHelmRelease { - m := make(map[string]map[string]ifv1.FluxHelmRelease) - m[ns] = make(map[string]ifv1.FluxHelmRelease) - for _, relName := range relNames { - m[ns][relName] = ifv1.FluxHelmRelease{} - } - return m -} - -func mergeCustRes(a, b map[string]map[string]ifv1.FluxHelmRelease) map[string]map[string]ifv1.FluxHelmRelease { - m := make(map[string]map[string]ifv1.FluxHelmRelease) - for ns := range a { - m[ns] = a[ns] - } - for ns := range b { - if _, present := m[ns]; present { - panic("ns '" + ns + "' present in both a and b") - } - m[ns] = b[ns] - } - return m -} - -func TestAddDeletedReleasesToSync(t *testing.T) { - var zeromap = make(map[string][]chartRelease) - var tests = []struct { - msg string - currentReleases map[string]map[string]struct{} - customResources map[string]map[string]ifv1.FluxHelmRelease - want map[string][]chartRelease - }{ - { - msg: "no-op, zero resources", - currentReleases: makeCurRel("ns1", "r1"), - customResources: make(map[string]map[string]ifv1.FluxHelmRelease), - want: zeromap, - }, - { - msg: "no-op, equality", - currentReleases: makeCurRel("ns1", "r1"), - customResources: makeCustRes("ns1", "r1"), - want: zeromap, - }, - { - msg: "add missing release", - currentReleases: makeCurRel("ns1"), - customResources: makeCustRes("ns1", "r1"), - want: map[string][]chartRelease{"ns1": []chartRelease{ - chartRelease{releaseName: "r1", action: release.InstallAction}}}, - }, - { - msg: "add missing release new namespace", - currentReleases: makeCurRel("ns1"), - customResources: makeCustRes("ns2", "r1"), - want: map[string][]chartRelease{"ns2": []chartRelease{ - chartRelease{releaseName: "r1", action: release.InstallAction}}}, - }, - { - msg: "add missing releases multi namespace", - currentReleases: mergeCurRels(makeCurRel("ns1"), - makeCurRel("ns2", "r2")), - customResources: mergeCustRes(makeCustRes("ns1", "r1"), - makeCustRes("ns2", "r2", "r3")), - want: map[string][]chartRelease{ - "ns1": []chartRelease{chartRelease{releaseName: "r1", action: release.InstallAction}}, - "ns2": []chartRelease{chartRelease{releaseName: "r3", action: release.InstallAction}}, - }, - }, - } - - opts := cmp.AllowUnexported(chartRelease{}) - rs := New(log.NewLogfmtLogger(os.Stdout), nil) - for i, test := range tests { - var got = make(map[string][]chartRelease) - err := rs.addDeletedReleasesToSync(got, test.currentReleases, test.customResources) - if err != nil { - t.Errorf("%d %s: got error: %v", i, test.msg, err) - } - if diff := cmp.Diff(got, test.want, opts); diff != "" { - t.Errorf("%d %s: diff (-got +want)\n%s", i, test.msg, diff) - } - - } -} - -func config(vals map[string]string) *hapi_chart.Config { - pv := make(map[string]*hapi_chart.Value) - for k, v := range vals { - pv[k] = &hapi_chart.Value{Value: v} - } - - c := &hapi_chart.Config{Values: pv} - // Marshalling to get c.Raw populated - data, _ := proto.Marshal(c) - _ = proto.Unmarshal(data, c) - return c -} - -func relvals(name string, vals string) *hapi_release.Release { - rel := helm.ReleaseMock(&helm.MockReleaseOptions{Name: name}) - rel.Config.Raw = vals - return rel -} - -func relchart(name string, chartname string, chartver string, tmplname string) *hapi_release.Release { - return helm.ReleaseMock(&helm.MockReleaseOptions{Name: name, Chart: &hapi_chart.Chart{ - Metadata: &hapi_chart.Metadata{ - Name: chartname, - Version: chartver, - }, - Templates: []*hapi_chart.Template{ - {Name: tmplname, Data: []byte(helm.MockManifest)}, - }, - }}) -} - -func TestAddExistingReleasesToSync(t *testing.T) { - var zeromap = make(map[string][]chartRelease) - var tests = []struct { - msg string - currentReleases map[string]map[string]struct{} - customResources map[string]map[string]ifv1.FluxHelmRelease - want map[string][]chartRelease - releaser chartrelease.Releaser - wanterror error - }{ - { - msg: "no-op, zero resources", - currentReleases: makeCurRel("ns1", "r1"), - customResources: make(map[string]map[string]ifv1.FluxHelmRelease), - want: zeromap, - }, - { - msg: "no-op, no overlap", - currentReleases: makeCurRel("ns1", "r1"), - customResources: mergeCustRes( - makeCustRes("ns1", "r2"), - makeCustRes("ns2", "r1")), - want: zeromap, - }, - { - msg: "get deployed release fails", - currentReleases: makeCurRel("ns1", "r1"), - customResources: makeCustRes("ns1", "r1"), - releaser: &mockReleaser{}, - wanterror: fmt.Errorf("no release hamed %q", "r1"), - }, - { - msg: "dry-run install fails", - currentReleases: makeCurRel("ns1", "r1"), - customResources: makeCustRes("ns1", "r1"), - releaser: &mockReleaser{ - configSync: &helmgit.Checkout{Dir: "dir"}, - deployed: map[string]*hapi_release.Release{ - "r1": relvals("r1", `k1: "v1"`), - }, - installs: []install{ - dryinst("r1", *relvals("", ""), fmt.Errorf("dry-run failed")), - }, - }, - wanterror: fmt.Errorf("dry-run failed"), - }, - { - msg: "r1 vals changed, r2 unchanged", - currentReleases: mergeCurRels( - makeCurRel("ns1", "r1"), - makeCurRel("ns2", "r2")), - customResources: mergeCustRes( - makeCustRes("ns1", "r1"), - makeCustRes("ns2", "r2")), - releaser: &mockReleaser{ - configSync: &helmgit.Checkout{Dir: "dir"}, - deployed: map[string]*hapi_release.Release{ - "r1": relvals("r1", `k1: "v1"`), - "r2": relvals("r2", `k1: "v1"`), - }, - installs: []install{ - dryinst("r1", *relvals("r1", `k1: "v2"`), nil), - dryinst("r2", *relvals("r2", `k1: "v1"`), nil), - }, - }, - want: map[string][]chartRelease{"ns1": []chartRelease{ - chartRelease{releaseName: "r1", action: release.Action("UPDATE")}, - }}, - }, - { - msg: "r1/r2/r3 charts changed, r4 unchanged", - currentReleases: mergeCurRels(mergeCurRels( - makeCurRel("ns1", "r1"), - makeCurRel("ns2", "r2")), - makeCurRel("ns3", "r3", "r4")), - customResources: mergeCustRes(mergeCustRes( - makeCustRes("ns1", "r1"), - makeCustRes("ns2", "r2")), - makeCustRes("ns3", "r3", "r4")), - releaser: &mockReleaser{ - configSync: &helmgit.Checkout{Dir: "dir"}, - deployed: map[string]*hapi_release.Release{ - "r1": relchart("r1", "c1", "v0.1.0", "templates/foo.tpl"), - "r2": relchart("r2", "c2", "v0.1.0", "templates/bar.tpl"), - "r3": relchart("r3", "c3", "v0.1.0", "templates/baz.tpl"), - "r4": relchart("r4", "c4", "v0.1.0", "templates/qux.tpl"), - }, - installs: []install{ - dryinst("r1", *relchart("r1", "c-", "v0.1.0", "templates/foo.tpl"), nil), - dryinst("r2", *relchart("r2", "c2", "v0.1.1", "templates/bar.tpl"), nil), - dryinst("r3", *relchart("r3", "c3", "v0.1.0", "templates/foo.tpl"), nil), - dryinst("r4", *relchart("r4", "c4", "v0.1.0", "templates/qux.tpl"), nil), - }, - }, - want: map[string][]chartRelease{ - "ns1": []chartRelease{chartRelease{releaseName: "r1", action: release.Action("UPDATE")}}, - "ns2": []chartRelease{chartRelease{releaseName: "r2", action: release.Action("UPDATE")}}, - "ns3": []chartRelease{chartRelease{releaseName: "r3", action: release.Action("UPDATE")}}, - }, - }, - } - - opts := cmp.AllowUnexported(chartRelease{}) - for i, test := range tests { - rs := New(log.NewLogfmtLogger(os.Stdout), test.releaser) - var got = make(map[string][]chartRelease) - err := rs.addExistingReleasesToSync(got, test.currentReleases, test.customResources) - if fmt.Sprintf("%v", err) != fmt.Sprintf("%v", test.wanterror) { - t.Errorf("%d %s: got error %q, want error %q", i, test.msg, err, test.wanterror) - } - if test.wanterror != nil { - continue - } - if diff := cmp.Diff(got, test.want, opts); diff != "" { - t.Errorf("%d %s: diff (-got +want)\n%s", i, test.msg, diff) - } - - } -} - -func dryinst(relname string, rel hapi_release.Release, err error) install { - return install{ - installReq{ - checkoutDir: "dir", - releaseName: relname + "-temp", - action: "CREATE", - opts: chartrelease.InstallOptions{DryRun: true}, - }, - installResult{rel, err}, - } -} diff --git a/integrations/helm/releasesync/utils.go b/integrations/helm/releasesync/utils.go deleted file mode 100644 index 3b77b5b07..000000000 --- a/integrations/helm/releasesync/utils.go +++ /dev/null @@ -1,39 +0,0 @@ -package releasesync - -import ( - ifv1 "github.com/weaveworks/flux/apis/helm.integrations.flux.weave.works/v1alpha2" - "github.com/weaveworks/flux/integrations/helm/release" -) - -// mappifyDeployInfo takes a map of namespace -> []DeployInfo, -// returning a map whose keys are the same namespaces -// and whose values are key-only maps holding the DeployInfo names. -func mappifyDeployInfo(releases map[string][]release.DeployInfo) map[string]map[string]struct{} { - deployM := make(map[string]map[string]struct{}) - - for ns, nsRels := range releases { - nsDeployM := make(map[string]struct{}) - for _, r := range nsRels { - nsDeployM[r.Name] = struct{}{} - } - deployM[ns] = nsDeployM - } - return deployM -} - -// mappifyReleaseFhrInfo takes a map of namespace -> []releaseFhr, -// returning a map whose keys are the same namespaces -// and whose values are maps of releaseName -> FluxHelmRelease. -func mappifyReleaseFhrInfo(fhrs map[string][]releaseFhr) map[string]map[string]ifv1.FluxHelmRelease { - relFhrM := make(map[string]map[string]ifv1.FluxHelmRelease) - - for ns, nsFhrs := range fhrs { - nsRels := make(map[string]ifv1.FluxHelmRelease) - for _, r := range nsFhrs { - nsRels[r.RelName] = r.Fhr - } - relFhrM[ns] = nsRels - } - - return relFhrM -}