From 5d6111b7459af8b1cf1c49559f1ec001410a6e9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Tue, 13 Feb 2024 17:51:41 +0100 Subject: [PATCH] fix: infer correct shard in statefulset setup (#17124, #17016) (#17167) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: infer correct shard in statefulset setup Signed-off-by: Lukas Wöhrl * fix the case if only a single replica Signed-off-by: Lukas Wöhrl * fix: resolving pointer on shard compare Signed-off-by: Lukas Wöhrl * fix: add readlock for cluster accessor Signed-off-by: Lukas Wöhrl * fix: use defer to protect access of 'shard' Signed-off-by: Lukas Wöhrl * fix: revert locking in getclusteraccessor Signed-off-by: Lukas Wöhrl * fix: handle nil shard case Signed-off-by: Lukas Wöhrl * fix: handle any nil shard value as false Signed-off-by: Lukas Wöhrl * fix: handle nil case and fix another missing pointer dereference Signed-off-by: Lukas Wöhrl * revert Signed-off-by: Lukas Wöhrl * fix: added tests and fixed some behaviour bugs Signed-off-by: Lukas Wöhrl * test: add test to validate that Shard value is not overriden Signed-off-by: Lukas Wöhrl * fix: added tests and fixe the case when server is changed inside a secret Signed-off-by: Lukas Wöhrl * tests: add test cases for infering the shard logic Signed-off-by: Lukas Wöhrl --------- Signed-off-by: Lukas Wöhrl --- .../commands/argocd_application_controller.go | 66 +-- controller/cache/cache.go | 2 +- controller/sharding/cache.go | 60 ++- controller/sharding/cache_test.go | 475 ++++++++++++++++++ controller/sharding/sharding.go | 60 ++- controller/sharding/sharding_test.go | 189 +++++++ 6 files changed, 766 insertions(+), 86 deletions(-) create mode 100644 controller/sharding/cache_test.go diff --git a/cmd/argocd-application-controller/commands/argocd_application_controller.go b/cmd/argocd-application-controller/commands/argocd_application_controller.go index c38a2113e2b34..a5fec90f6b972 100644 --- a/cmd/argocd-application-controller/commands/argocd_application_controller.go +++ b/cmd/argocd-application-controller/commands/argocd_application_controller.go @@ -10,8 +10,6 @@ import ( "github.com/redis/go-redis/v9" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - kubeerrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" @@ -26,7 +24,6 @@ import ( cacheutil "github.com/argoproj/argo-cd/v2/util/cache" appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate" "github.com/argoproj/argo-cd/v2/util/cli" - "github.com/argoproj/argo-cd/v2/util/db" "github.com/argoproj/argo-cd/v2/util/env" "github.com/argoproj/argo-cd/v2/util/errors" kubeutil "github.com/argoproj/argo-cd/v2/util/kube" @@ -147,7 +144,7 @@ func NewCommand() *cobra.Command { appController.InvalidateProjectsCache() })) kubectl := kubeutil.NewKubectl() - clusterSharding, err := getClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution) + clusterSharding, err := sharding.GetClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution) errors.CheckError(err) appController, err = controller.NewApplicationController( namespace, @@ -239,64 +236,3 @@ func NewCommand() *cobra.Command { }) return &command } - -func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) (sharding.ClusterShardingCache, 
error) { - var ( - replicasCount int - ) - // StatefulSet mode and Deployment mode uses different default values for shard number. - defaultShardNumberValue := 0 - - if enableDynamicClusterDistribution { - applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName) - appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{}) - - // if app controller deployment is not found when dynamic cluster distribution is enabled error out - if err != nil { - return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment: %v", err) - } - - if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil { - replicasCount = int(*appControllerDeployment.Spec.Replicas) - defaultShardNumberValue = -1 - } else { - return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment replica count") - } - - } else { - replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32) - } - shardNumber := env.ParseNumFromEnv(common.EnvControllerShard, defaultShardNumberValue, -math.MaxInt32, math.MaxInt32) - if replicasCount > 1 { - // check for shard mapping using configmap if application-controller is a deployment - // else use existing logic to infer shard from pod name if application-controller is a statefulset - if enableDynamicClusterDistribution { - var err error - // retry 3 times if we find a conflict while updating shard mapping configMap. - // If we still see conflicts after the retries, wait for next iteration of heartbeat process. - for i := 0; i <= common.AppControllerHeartbeatUpdateRetryCount; i++ { - shardNumber, err = sharding.GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicasCount, shardNumber) - if err != nil && !kubeerrors.IsConflict(err) { - err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %s", err) - break - } - log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i) - } - errors.CheckError(err) - } else { - if shardNumber < 0 { - var err error - shardNumber, err = sharding.InferShard() - errors.CheckError(err) - } - if shardNumber > replicasCount { - log.Warnf("Calculated shard number %d is greated than the number of replicas count. 
Defaulting to 0", shardNumber) - shardNumber = 0 - } - } - } else { - log.Info("Processing all cluster shards") - } - db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient) - return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm), nil -} diff --git a/controller/cache/cache.go b/controller/cache/cache.go index e3b1d7b77f19d..d1ae8989cd8e6 100644 --- a/controller/cache/cache.go +++ b/controller/cache/cache.go @@ -751,7 +751,7 @@ func (c *liveStateCache) handleAddEvent(cluster *appv1.Cluster) { } func (c *liveStateCache) handleModEvent(oldCluster *appv1.Cluster, newCluster *appv1.Cluster) { - c.clusterSharding.Update(newCluster) + c.clusterSharding.Update(oldCluster, newCluster) c.lock.Lock() cluster, ok := c.clusters[newCluster.Server] c.lock.Unlock() diff --git a/controller/sharding/cache.go b/controller/sharding/cache.go index d16574accdf8a..3818e7381f3ab 100644 --- a/controller/sharding/cache.go +++ b/controller/sharding/cache.go @@ -12,7 +12,7 @@ type ClusterShardingCache interface { Init(clusters *v1alpha1.ClusterList) Add(c *v1alpha1.Cluster) Delete(clusterServer string) - Update(c *v1alpha1.Cluster) + Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster) IsManagedCluster(c *v1alpha1.Cluster) bool GetDistribution() map[string]int } @@ -26,7 +26,7 @@ type ClusterSharding struct { getClusterShard DistributionFunction } -func NewClusterSharding(db db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache { +func NewClusterSharding(_ db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache { log.Debugf("Processing clusters from shard %d: Using filter function: %s", shard, shardingAlgorithm) clusterSharding := &ClusterSharding{ Shard: shard, @@ -67,7 +67,8 @@ func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) { defer sharding.lock.Unlock() newClusters := make(map[string]*v1alpha1.Cluster, len(clusters.Items)) for _, c := range clusters.Items { - newClusters[c.Server] = &c + cluster := c + newClusters[c.Server] = &cluster } sharding.Clusters = newClusters sharding.updateDistribution() @@ -96,13 +97,16 @@ func (sharding *ClusterSharding) Delete(clusterServer string) { } } -func (sharding *ClusterSharding) Update(c *v1alpha1.Cluster) { +func (sharding *ClusterSharding) Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster) { sharding.lock.Lock() defer sharding.lock.Unlock() - old, ok := sharding.Clusters[c.Server] - sharding.Clusters[c.Server] = c - if !ok || hasShardingUpdates(old, c) { + if _, ok := sharding.Clusters[oldCluster.Server]; ok && oldCluster.Server != newCluster.Server { + delete(sharding.Clusters, oldCluster.Server) + delete(sharding.Shards, oldCluster.Server) + } + sharding.Clusters[newCluster.Server] = newCluster + if hasShardingUpdates(oldCluster, newCluster) { sharding.updateDistribution() } else { log.Debugf("Skipping sharding distribution update. 
No relevant changes") @@ -111,8 +115,8 @@ func (sharding *ClusterSharding) Update(c *v1alpha1.Cluster) { func (sharding *ClusterSharding) GetDistribution() map[string]int { sharding.lock.RLock() + defer sharding.lock.RUnlock() shards := sharding.Shards - sharding.lock.RUnlock() distribution := make(map[string]int, len(shards)) for k, v := range shards { @@ -122,9 +126,7 @@ func (sharding *ClusterSharding) GetDistribution() map[string]int { } func (sharding *ClusterSharding) updateDistribution() { - log.Info("Updating cluster shards") - - for _, c := range sharding.Clusters { + for k, c := range sharding.Clusters { shard := 0 if c.Shard != nil { requestedShard := int(*c.Shard) @@ -136,24 +138,44 @@ func (sharding *ClusterSharding) updateDistribution() { } else { shard = sharding.getClusterShard(c) } - var shard64 int64 = int64(shard) - c.Shard = &shard64 - sharding.Shards[c.Server] = shard + + existingShard, ok := sharding.Shards[k] + if ok && existingShard != shard { + log.Infof("Cluster %s has changed shard from %d to %d", k, existingShard, shard) + } else if !ok { + log.Infof("Cluster %s has been assigned to shard %d", k, shard) + } else { + log.Debugf("Cluster %s has not changed shard", k) + } + sharding.Shards[k] = shard } } -// hasShardingUpdates returns true if the sharding distribution has been updated. -// nil checking is done for the corner case of the in-cluster cluster which may -// have a nil shard assigned +// hasShardingUpdates returns true if the sharding distribution has explicitly changed func hasShardingUpdates(old, new *v1alpha1.Cluster) bool { - if old == nil || new == nil || (old.Shard == nil && new.Shard == nil) { + if old == nil || new == nil { + return false + } + + // returns true if the cluster id has changed because some sharding algorithms depend on it. 
+ if old.ID != new.ID { + return true + } + + if old.Server != new.Server { + return true + } + + // return false if the shard field has not been modified + if old.Shard == nil && new.Shard == nil { return false } - return old.Shard != new.Shard + return old.Shard == nil || new.Shard == nil || int64(*old.Shard) != int64(*new.Shard) } func (d *ClusterSharding) GetClusterAccessor() clusterAccessor { return func() []*v1alpha1.Cluster { + // no need to lock, as this is only called from the updateDistribution function clusters := make([]*v1alpha1.Cluster, 0, len(d.Clusters)) for _, c := range d.Clusters { clusters = append(clusters, c) diff --git a/controller/sharding/cache_test.go b/controller/sharding/cache_test.go new file mode 100644 index 0000000000000..ed3da752e7279 --- /dev/null +++ b/controller/sharding/cache_test.go @@ -0,0 +1,475 @@ +package sharding + +import ( + "testing" + + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + dbmocks "github.com/argoproj/argo-cd/v2/util/db/mocks" + "github.com/stretchr/testify/assert" +) + +func setupTestSharding(shard int, replicas int) *ClusterSharding { + shardingAlgorithm := "legacy" // we are using the legacy algorithm as it is deterministic based on the cluster id which is easier to test + db := &dbmocks.ArgoDB{} + return NewClusterSharding(db, shard, replicas, shardingAlgorithm).(*ClusterSharding) +} + +func TestNewClusterSharding(t *testing.T) { + shard := 1 + replicas := 2 + sharding := setupTestSharding(shard, replicas) + + assert.NotNil(t, sharding) + assert.Equal(t, shard, sharding.Shard) + assert.Equal(t, replicas, sharding.Replicas) + assert.NotNil(t, sharding.Shards) + assert.NotNil(t, sharding.Clusters) +} + +func TestClusterSharding_Add(t *testing.T) { + shard := 1 + replicas := 2 + sharding := setupTestSharding(shard, replicas) + + clusterA := &v1alpha1.Cluster{ + ID: "2", + Server: "https://127.0.0.1:6443", + } + + sharding.Add(clusterA) + + clusterB := v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + } + + sharding.Add(&clusterB) + + distribution := sharding.GetDistribution() + + assert.Contains(t, sharding.Clusters, clusterA.Server) + assert.Contains(t, sharding.Clusters, clusterB.Server) + + clusterDistribution, ok := distribution[clusterA.Server] + assert.True(t, ok) + assert.Equal(t, 1, clusterDistribution) + + myClusterDistribution, ok := distribution[clusterB.Server] + assert.True(t, ok) + assert.Equal(t, 0, myClusterDistribution) + + assert.Equal(t, 2, len(distribution)) +} + +func TestClusterSharding_AddRoundRobin_Redistributes(t *testing.T) { + shard := 1 + replicas := 2 + + db := &dbmocks.ArgoDB{} + + sharding := NewClusterSharding(db, shard, replicas, "round-robin").(*ClusterSharding) + + clusterA := &v1alpha1.Cluster{ + ID: "1", + Server: "https://127.0.0.1:6443", + } + sharding.Add(clusterA) + + clusterB := v1alpha1.Cluster{ + ID: "3", + Server: "https://kubernetes.default.svc", + } + sharding.Add(&clusterB) + + distributionBefore := sharding.GetDistribution() + + assert.Contains(t, sharding.Clusters, clusterA.Server) + assert.Contains(t, sharding.Clusters, clusterB.Server) + + clusterDistributionA, ok := distributionBefore[clusterA.Server] + assert.True(t, ok) + assert.Equal(t, 0, clusterDistributionA) + + clusterDistributionB, ok := distributionBefore[clusterB.Server] + assert.True(t, ok) + assert.Equal(t, 1, clusterDistributionB) + + assert.Equal(t, 2, len(distributionBefore)) + + clusterC := v1alpha1.Cluster{ + ID: "2", + Server: "https://1.1.1.1", + } + 
sharding.Add(&clusterC) + + distributionAfter := sharding.GetDistribution() + + assert.Contains(t, sharding.Clusters, clusterA.Server) + assert.Contains(t, sharding.Clusters, clusterB.Server) + assert.Contains(t, sharding.Clusters, clusterC.Server) + + clusterDistributionA, ok = distributionAfter[clusterA.Server] + assert.True(t, ok) + assert.Equal(t, 0, clusterDistributionA) + + clusterDistributionC, ok := distributionAfter[clusterC.Server] + assert.True(t, ok) + assert.Equal(t, 1, clusterDistributionC) // will be assigned to shard 1 because the .ID is smaller then the "B" cluster + + clusterDistributionB, ok = distributionAfter[clusterB.Server] + assert.True(t, ok) + assert.Equal(t, 0, clusterDistributionB) // will be reassigned to shard 0 because the .ID is bigger then the "C" cluster +} + +func TestClusterSharding_Delete(t *testing.T) { + shard := 1 + replicas := 2 + sharding := setupTestSharding(shard, replicas) + + sharding.Init( + &v1alpha1.ClusterList{ + Items: []v1alpha1.Cluster{ + { + ID: "2", + Server: "https://127.0.0.1:6443", + }, + { + ID: "1", + Server: "https://kubernetes.default.svc", + }, + }, + }, + ) + + sharding.Delete("https://kubernetes.default.svc") + distribution := sharding.GetDistribution() + assert.Equal(t, 1, len(distribution)) +} + +func TestClusterSharding_Update(t *testing.T) { + shard := 1 + replicas := 2 + sharding := setupTestSharding(shard, replicas) + + sharding.Init( + &v1alpha1.ClusterList{ + Items: []v1alpha1.Cluster{ + { + ID: "2", + Server: "https://127.0.0.1:6443", + }, + { + ID: "1", + Server: "https://kubernetes.default.svc", + }, + }, + }, + ) + + distributionBefore := sharding.GetDistribution() + assert.Equal(t, 2, len(distributionBefore)) + + distributionA, ok := distributionBefore["https://kubernetes.default.svc"] + assert.True(t, ok) + assert.Equal(t, 0, distributionA) + + sharding.Update(&v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + }, &v1alpha1.Cluster{ + ID: "4", + Server: "https://kubernetes.default.svc", + }) + + distributionAfter := sharding.GetDistribution() + assert.Equal(t, 2, len(distributionAfter)) + + distributionA, ok = distributionAfter["https://kubernetes.default.svc"] + assert.True(t, ok) + assert.Equal(t, 1, distributionA) +} + +func TestClusterSharding_UpdateServerName(t *testing.T) { + shard := 1 + replicas := 2 + sharding := setupTestSharding(shard, replicas) + + sharding.Init( + &v1alpha1.ClusterList{ + Items: []v1alpha1.Cluster{ + { + ID: "2", + Server: "https://127.0.0.1:6443", + }, + { + ID: "1", + Server: "https://kubernetes.default.svc", + }, + }, + }, + ) + + distributionBefore := sharding.GetDistribution() + assert.Equal(t, 2, len(distributionBefore)) + + distributionA, ok := distributionBefore["https://kubernetes.default.svc"] + assert.True(t, ok) + assert.Equal(t, 0, distributionA) + + sharding.Update(&v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + }, &v1alpha1.Cluster{ + ID: "1", + Server: "https://server2", + }) + + distributionAfter := sharding.GetDistribution() + assert.Equal(t, 2, len(distributionAfter)) + + _, ok = distributionAfter["https://kubernetes.default.svc"] + assert.False(t, ok) // the old server name should not be present anymore + + _, ok = distributionAfter["https://server2"] + assert.True(t, ok) // the new server name should be present +} + +func TestClusterSharding_IsManagedCluster(t *testing.T) { + replicas := 2 + sharding0 := setupTestSharding(0, replicas) + + sharding0.Init( + &v1alpha1.ClusterList{ + Items: []v1alpha1.Cluster{ + { 
+ ID: "1", + Server: "https://kubernetes.default.svc", + }, + { + ID: "2", + Server: "https://127.0.0.1:6443", + }, + }, + }, + ) + + assert.True(t, sharding0.IsManagedCluster(&v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + })) + + assert.False(t, sharding0.IsManagedCluster(&v1alpha1.Cluster{ + ID: "2", + Server: "https://127.0.0.1:6443", + })) + + sharding1 := setupTestSharding(1, replicas) + + sharding1.Init( + &v1alpha1.ClusterList{ + Items: []v1alpha1.Cluster{ + { + ID: "2", + Server: "https://127.0.0.1:6443", + }, + { + ID: "1", + Server: "https://kubernetes.default.svc", + }, + }, + }, + ) + + assert.False(t, sharding1.IsManagedCluster(&v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + })) + + assert.True(t, sharding1.IsManagedCluster(&v1alpha1.Cluster{ + ID: "2", + Server: "https://127.0.0.1:6443", + })) + +} + +func TestClusterSharding_ClusterShardOfResourceShouldNotBeChanged(t *testing.T) { + shard := 1 + replicas := 2 + sharding := setupTestSharding(shard, replicas) + + Int64Ptr := func(i int64) *int64 { + return &i + } + + clusterWithNil := &v1alpha1.Cluster{ + ID: "2", + Server: "https://127.0.0.1:6443", + Shard: nil, + } + + clusterWithValue := &v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(1), + } + + clusterWithToBigValue := &v1alpha1.Cluster{ + ID: "3", + Server: "https://1.1.1.1", + Shard: Int64Ptr(999), // shard value is explicitly bigger than the number of replicas + } + + sharding.Init( + &v1alpha1.ClusterList{ + Items: []v1alpha1.Cluster{ + *clusterWithNil, + *clusterWithValue, + *clusterWithToBigValue, + }, + }, + ) + distribution := sharding.GetDistribution() + assert.Equal(t, 3, len(distribution)) + + assert.Nil(t, sharding.Clusters[clusterWithNil.Server].Shard) + + assert.NotNil(t, sharding.Clusters[clusterWithValue.Server].Shard) + assert.Equal(t, int64(1), *sharding.Clusters[clusterWithValue.Server].Shard) + assert.Equal(t, 1, distribution[clusterWithValue.Server]) + + assert.NotNil(t, sharding.Clusters[clusterWithToBigValue.Server].Shard) + assert.Equal(t, int64(999), *sharding.Clusters[clusterWithToBigValue.Server].Shard) + assert.Equal(t, 0, distribution[clusterWithToBigValue.Server]) // will be assigned to shard 0 because the value is bigger than the number of replicas +} + +func TestHasShardingUpdates(t *testing.T) { + Int64Ptr := func(i int64) *int64 { + return &i + } + + testCases := []struct { + name string + old *v1alpha1.Cluster + new *v1alpha1.Cluster + expected bool + }{ + { + name: "No updates", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(1), + }, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(1), + }, + expected: false, + }, + { + name: "Updates", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(1), + }, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + expected: true, + }, + { + name: "Old is nil", + old: nil, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + expected: false, + }, + { + name: "New is nil", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + new: nil, + expected: false, + }, + { + name: "Both are nil", + old: nil, + new: nil, + expected: false, + }, + { + name: "Both shards are nil", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: nil, + }, + 
new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: nil, + }, + expected: false, + }, + { + name: "Old shard is nil", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: nil, + }, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + expected: true, + }, + { + name: "New shard is nil", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: nil, + }, + expected: true, + }, + { + name: "Cluster ID has changed", + old: &v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + new: &v1alpha1.Cluster{ + ID: "2", + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + expected: true, + }, + { + name: "Server has changed", + old: &v1alpha1.Cluster{ + ID: "1", + Server: "https://server1", + Shard: Int64Ptr(2), + }, + new: &v1alpha1.Cluster{ + ID: "1", + Server: "https://server2", + Shard: Int64Ptr(2), + }, + expected: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, hasShardingUpdates(tc.old, tc.new)) + }) + } +} diff --git a/controller/sharding/sharding.go b/controller/sharding/sharding.go index 2b86ed3f82bc6..49d38711a74f6 100644 --- a/controller/sharding/sharding.go +++ b/controller/sharding/sharding.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "hash/fnv" + "math" "os" "sort" "strconv" @@ -20,6 +21,7 @@ import ( "github.com/argoproj/argo-cd/v2/util/db" "github.com/argoproj/argo-cd/v2/util/env" + "github.com/argoproj/argo-cd/v2/util/errors" "github.com/argoproj/argo-cd/v2/util/settings" log "github.com/sirupsen/logrus" kubeerrors "k8s.io/apimachinery/pkg/api/errors" @@ -206,7 +208,7 @@ func createClusterIndexByClusterIdMap(getCluster clusterAccessor) map[string]int // The function takes the shard number from the environment variable (default value -1, if not set) and passes it to this function. // If the shard value passed to this function is -1, that is, the shard was not set as an environment variable, // we default the shard number to 0 for computing the default config map. 
-func GetOrUpdateShardFromConfigMap(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, replicas, shard int) (int, error) { +func GetOrUpdateShardFromConfigMap(kubeClient kubernetes.Interface, settingsMgr *settings.SettingsManager, replicas, shard int) (int, error) { hostname, err := osHostnameFunction() if err != nil { return -1, err @@ -363,3 +365,59 @@ func getDefaultShardMappingData(replicas int) []shardApplicationControllerMappin } return shardMappingData } + +func GetClusterSharding(kubeClient kubernetes.Interface, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) (ClusterShardingCache, error) { + var replicasCount int + if enableDynamicClusterDistribution { + applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName) + appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{}) + + // if app controller deployment is not found when dynamic cluster distribution is enabled error out + if err != nil { + return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment: %v", err) + } + + if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil { + replicasCount = int(*appControllerDeployment.Spec.Replicas) + } else { + return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment replica count") + } + + } else { + replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32) + } + shardNumber := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32) + if replicasCount > 1 { + // check for shard mapping using configmap if application-controller is a deployment + // else use existing logic to infer shard from pod name if application-controller is a statefulset + if enableDynamicClusterDistribution { + var err error + // retry 3 times if we find a conflict while updating shard mapping configMap. + // If we still see conflicts after the retries, wait for next iteration of heartbeat process. + for i := 0; i <= common.AppControllerHeartbeatUpdateRetryCount; i++ { + shardNumber, err = GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicasCount, shardNumber) + if err != nil && !kubeerrors.IsConflict(err) { + err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %s", err) + break + } + log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i) + } + errors.CheckError(err) + } else { + if shardNumber < 0 { + var err error + shardNumber, err = InferShard() + errors.CheckError(err) + } + if shardNumber > replicasCount { + log.Warnf("Calculated shard number %d is greated than the number of replicas count. 
Defaulting to 0", shardNumber) + shardNumber = 0 + } + } + } else { + log.Info("Processing all cluster shards") + shardNumber = 0 + } + db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient) + return NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm), nil +} diff --git a/controller/sharding/sharding_test.go b/controller/sharding/sharding_test.go index 0992f7a9dfd7f..15f834f190259 100644 --- a/controller/sharding/sharding_test.go +++ b/controller/sharding/sharding_test.go @@ -1,6 +1,7 @@ package sharding import ( + "context" "encoding/json" "errors" "fmt" @@ -12,10 +13,14 @@ import ( "github.com/argoproj/argo-cd/v2/common" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" dbmocks "github.com/argoproj/argo-cd/v2/util/db/mocks" + "github.com/argoproj/argo-cd/v2/util/settings" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + kubefake "k8s.io/client-go/kubernetes/fake" ) func TestGetShardByID_NotEmptyID(t *testing.T) { @@ -681,3 +686,187 @@ func Test_getOrUpdateShardNumberForController(t *testing.T) { }) } } + +func TestGetClusterSharding(t *testing.T) { + IntPtr := func(i int32) *int32 { + return &i + } + + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: common.DefaultApplicationControllerName, + Namespace: "argocd", + }, + Spec: appsv1.DeploymentSpec{ + Replicas: IntPtr(1), + }, + } + + deploymentMultiReplicas := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "argocd-application-controller-multi-replicas", + Namespace: "argocd", + }, + Spec: appsv1.DeploymentSpec{ + Replicas: IntPtr(3), + }, + } + + objects := append([]runtime.Object{}, deployment, deploymentMultiReplicas) + kubeclientset := kubefake.NewSimpleClientset(objects...) 
+ + settingsMgr := settings.NewSettingsManager(context.TODO(), kubeclientset, "argocd", settings.WithRepoOrClusterChangedHandler(func() { + })) + + testCases := []struct { + name string + useDynamicSharding bool + envsSetter func(t *testing.T) + cleanup func() + expectedShard int + expectedReplicas int + expectedErr error + }{ + { + name: "Default sharding with statefulset", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvControllerReplicas, "1") + }, + cleanup: func() {}, + useDynamicSharding: false, + expectedShard: 0, + expectedReplicas: 1, + expectedErr: nil, + }, + { + name: "Default sharding with deployment", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvAppControllerName, common.DefaultApplicationControllerName) + }, + cleanup: func() {}, + useDynamicSharding: true, + expectedShard: 0, + expectedReplicas: 1, + expectedErr: nil, + }, + { + name: "Default sharding with deployment and multiple replicas", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvAppControllerName, "argocd-application-controller-multi-replicas") + }, + cleanup: func() {}, + useDynamicSharding: true, + expectedShard: 0, + expectedReplicas: 3, + expectedErr: nil, + }, + { + name: "Statefulset multiple replicas", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvControllerReplicas, "3") + osHostnameFunction = func() (string, error) { return "example-shard-3", nil } + }, + cleanup: func() { + osHostnameFunction = os.Hostname + }, + useDynamicSharding: false, + expectedShard: 3, + expectedReplicas: 3, + expectedErr: nil, + }, + { + name: "Explicit shard with statefulset and 1 replica", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvControllerReplicas, "1") + t.Setenv(common.EnvControllerShard, "3") + }, + cleanup: func() {}, + useDynamicSharding: false, + expectedShard: 0, + expectedReplicas: 1, + expectedErr: nil, + }, + { + name: "Explicit shard with statefulset and 2 replica - and to high shard", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvControllerReplicas, "2") + t.Setenv(common.EnvControllerShard, "3") + }, + cleanup: func() {}, + useDynamicSharding: false, + expectedShard: 0, + expectedReplicas: 2, + expectedErr: nil, + }, + { + name: "Explicit shard with statefulset and 2 replica", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvControllerReplicas, "2") + t.Setenv(common.EnvControllerShard, "1") + }, + cleanup: func() {}, + useDynamicSharding: false, + expectedShard: 1, + expectedReplicas: 2, + expectedErr: nil, + }, + { + name: "Explicit shard with deployment", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvControllerShard, "3") + }, + cleanup: func() {}, + useDynamicSharding: true, + expectedShard: 0, + expectedReplicas: 1, + expectedErr: nil, + }, + { + name: "Explicit shard with deployment and multiple replicas will read from configmap", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvAppControllerName, "argocd-application-controller-multi-replicas") + t.Setenv(common.EnvControllerShard, "3") + }, + cleanup: func() {}, + useDynamicSharding: true, + expectedShard: 0, + expectedReplicas: 3, + expectedErr: nil, + }, + { + name: "Dynamic sharding but missing deployment", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvAppControllerName, "missing-deployment") + }, + cleanup: func() {}, + useDynamicSharding: true, + expectedShard: 0, + expectedReplicas: 1, + expectedErr: fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment: deployments.apps \"missing-deployment\" not found"), + }, + } + + for _, tc 
:= range testCases { + t.Run(tc.name, func(t *testing.T) { + tc.envsSetter(t) + defer tc.cleanup() + shardingCache, err := GetClusterSharding(kubeclientset, settingsMgr, "round-robin", tc.useDynamicSharding) + + if shardingCache != nil { + clusterSharding := shardingCache.(*ClusterSharding) + assert.Equal(t, tc.expectedShard, clusterSharding.Shard) + assert.Equal(t, tc.expectedReplicas, clusterSharding.Replicas) + } + + if tc.expectedErr != nil { + if err != nil { + assert.Equal(t, tc.expectedErr.Error(), err.Error()) + } else { + t.Errorf("Expected error %v but got nil", tc.expectedErr) + } + } else { + assert.Nil(t, err) + } + }) + } +}
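
Usage sketch (illustrative only, not part of the patch): TestClusterSharding_UpdateServerName above exercises the new two-argument Update; the standalone program below mirrors that test, assuming the dbmocks helper and the "legacy" algorithm used in cache_test.go. When the server URL of a cluster changes, the entry keyed by the old URL is removed from both Clusters and Shards before the new cluster is stored, and the distribution is recomputed because hasShardingUpdates treats a change of ID, Server, or Shard as relevant.

package main

import (
	"fmt"

	"github.com/argoproj/argo-cd/v2/controller/sharding"
	"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
	dbmocks "github.com/argoproj/argo-cd/v2/util/db/mocks"
)

func main() {
	// Shard 0 of 2 replicas; the db argument is ignored by NewClusterSharding,
	// so the mock from util/db/mocks is enough, exactly as in cache_test.go.
	cache := sharding.NewClusterSharding(&dbmocks.ArgoDB{}, 0, 2, "legacy")

	oldCluster := &v1alpha1.Cluster{ID: "1", Server: "https://kubernetes.default.svc"}
	cache.Add(oldCluster)

	// The cluster secret was edited and now points at a different server URL.
	// Update receives both versions, drops the stale key for the old URL and
	// re-runs the distribution because the Server field changed.
	newCluster := &v1alpha1.Cluster{ID: "1", Server: "https://server2"}
	cache.Update(oldCluster, newCluster)

	// Only the new server remains in the distribution map.
	fmt.Println(cache.GetDistribution())
}

Passing both the old and the new cluster is what makes the server-rename case from the commit message (server changed inside a cluster secret) observable to the cache; with the previous single-argument Update the entry keyed by the old server URL would have lingered in Shards and GetDistribution.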