Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: infer correct shard in statefulset setup (#17124, #17016) #17167

Merged
merged 15 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

Expand All @@ -26,7 +24,6 @@ import (
cacheutil "github.com/argoproj/argo-cd/v2/util/cache"
appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
"github.com/argoproj/argo-cd/v2/util/cli"
"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/env"
"github.com/argoproj/argo-cd/v2/util/errors"
kubeutil "github.com/argoproj/argo-cd/v2/util/kube"
Expand Down Expand Up @@ -147,7 +144,7 @@ func NewCommand() *cobra.Command {
appController.InvalidateProjectsCache()
}))
kubectl := kubeutil.NewKubectl()
clusterSharding, err := getClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution)
clusterSharding, err := sharding.GetClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution)
errors.CheckError(err)
appController, err = controller.NewApplicationController(
namespace,
Expand Down Expand Up @@ -239,64 +236,3 @@ func NewCommand() *cobra.Command {
})
return &command
}

func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) (sharding.ClusterShardingCache, error) {
var (
replicasCount int
)
// StatefulSet mode and Deployment mode uses different default values for shard number.
defaultShardNumberValue := 0

if enableDynamicClusterDistribution {
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})

// if app controller deployment is not found when dynamic cluster distribution is enabled error out
if err != nil {
return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment: %v", err)
}

if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
replicasCount = int(*appControllerDeployment.Spec.Replicas)
defaultShardNumberValue = -1
} else {
return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment replica count")
}

} else {
replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
}
shardNumber := env.ParseNumFromEnv(common.EnvControllerShard, defaultShardNumberValue, -math.MaxInt32, math.MaxInt32)
if replicasCount > 1 {
// check for shard mapping using configmap if application-controller is a deployment
// else use existing logic to infer shard from pod name if application-controller is a statefulset
if enableDynamicClusterDistribution {
var err error
// retry 3 times if we find a conflict while updating shard mapping configMap.
// If we still see conflicts after the retries, wait for next iteration of heartbeat process.
for i := 0; i <= common.AppControllerHeartbeatUpdateRetryCount; i++ {
shardNumber, err = sharding.GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicasCount, shardNumber)
if err != nil && !kubeerrors.IsConflict(err) {
err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %s", err)
break
}
log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i)
}
errors.CheckError(err)
} else {
if shardNumber < 0 {
var err error
shardNumber, err = sharding.InferShard()
errors.CheckError(err)
}
if shardNumber > replicasCount {
log.Warnf("Calculated shard number %d is greated than the number of replicas count. Defaulting to 0", shardNumber)
shardNumber = 0
}
}
} else {
log.Info("Processing all cluster shards")
}
db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm), nil
}
2 changes: 1 addition & 1 deletion controller/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ func (c *liveStateCache) handleAddEvent(cluster *appv1.Cluster) {
}

func (c *liveStateCache) handleModEvent(oldCluster *appv1.Cluster, newCluster *appv1.Cluster) {
c.clusterSharding.Update(newCluster)
c.clusterSharding.Update(oldCluster, newCluster)
c.lock.Lock()
cluster, ok := c.clusters[newCluster.Server]
c.lock.Unlock()
Expand Down
60 changes: 41 additions & 19 deletions controller/sharding/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type ClusterShardingCache interface {
Init(clusters *v1alpha1.ClusterList)
Add(c *v1alpha1.Cluster)
Delete(clusterServer string)
Update(c *v1alpha1.Cluster)
Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster)
IsManagedCluster(c *v1alpha1.Cluster) bool
GetDistribution() map[string]int
}
Expand All @@ -26,7 +26,7 @@ type ClusterSharding struct {
getClusterShard DistributionFunction
}

func NewClusterSharding(db db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache {
func NewClusterSharding(_ db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache {
log.Debugf("Processing clusters from shard %d: Using filter function: %s", shard, shardingAlgorithm)
clusterSharding := &ClusterSharding{
Shard: shard,
Expand Down Expand Up @@ -67,7 +67,8 @@ func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) {
defer sharding.lock.Unlock()
newClusters := make(map[string]*v1alpha1.Cluster, len(clusters.Items))
for _, c := range clusters.Items {
newClusters[c.Server] = &c
cluster := c
newClusters[c.Server] = &cluster
}
sharding.Clusters = newClusters
sharding.updateDistribution()
Expand Down Expand Up @@ -96,13 +97,16 @@ func (sharding *ClusterSharding) Delete(clusterServer string) {
}
}

func (sharding *ClusterSharding) Update(c *v1alpha1.Cluster) {
func (sharding *ClusterSharding) Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster) {
sharding.lock.Lock()
defer sharding.lock.Unlock()

old, ok := sharding.Clusters[c.Server]
sharding.Clusters[c.Server] = c
if !ok || hasShardingUpdates(old, c) {
if _, ok := sharding.Clusters[oldCluster.Server]; ok && oldCluster.Server != newCluster.Server {
delete(sharding.Clusters, oldCluster.Server)
delete(sharding.Shards, oldCluster.Server)
}
sharding.Clusters[newCluster.Server] = newCluster
if hasShardingUpdates(oldCluster, newCluster) {
sharding.updateDistribution()
} else {
log.Debugf("Skipping sharding distribution update. No relevant changes")
Expand All @@ -111,8 +115,8 @@ func (sharding *ClusterSharding) Update(c *v1alpha1.Cluster) {

func (sharding *ClusterSharding) GetDistribution() map[string]int {
sharding.lock.RLock()
defer sharding.lock.RUnlock()
shards := sharding.Shards
sharding.lock.RUnlock()

distribution := make(map[string]int, len(shards))
for k, v := range shards {
Expand All @@ -122,9 +126,7 @@ func (sharding *ClusterSharding) GetDistribution() map[string]int {
}

func (sharding *ClusterSharding) updateDistribution() {
log.Info("Updating cluster shards")

for _, c := range sharding.Clusters {
for k, c := range sharding.Clusters {
shard := 0
if c.Shard != nil {
requestedShard := int(*c.Shard)
Expand All @@ -136,24 +138,44 @@ func (sharding *ClusterSharding) updateDistribution() {
} else {
shard = sharding.getClusterShard(c)
}
var shard64 int64 = int64(shard)
c.Shard = &shard64
sharding.Shards[c.Server] = shard

existingShard, ok := sharding.Shards[k]
if ok && existingShard != shard {
log.Infof("Cluster %s has changed shard from %d to %d", k, existingShard, shard)
} else if !ok {
log.Infof("Cluster %s has been assigned to shard %d", k, shard)
} else {
log.Debugf("Cluster %s has not changed shard", k)
}
sharding.Shards[k] = shard
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous PR seemed to update the cluster shard value in the cache and not only the shard number in the Shards object.

I think the goal was to persist the computed shard in the cache so we do not need to call sharding.getClusterShard(c) and recompute the shard number every time updateDistribution() is called. Basically, once the controller starts managing a cluster, it will manage it forever.

Now with the dynamic cluster sharding, I think you are right. We do have to recompute all the cluster shards whenever a cluster is added/modified/deleted.

Copy link
Contributor Author

@woehrl01 woehrl01 Feb 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@agaudreault Thanks for confirming, the problem with "controller starts managing a cluster, it will manage it forever." is that this does not work properly as every shard could have a different view on the cluster. If clusters are added/removed and stateful set controllers are restarting at different time intervals (e.g. spot interruptions)

}
}

// hasShardingUpdates returns true if the sharding distribution has been updated.
// nil checking is done for the corner case of the in-cluster cluster which may
// have a nil shard assigned
// hasShardingUpdates returns true if the sharding distribution has explicitly changed
func hasShardingUpdates(old, new *v1alpha1.Cluster) bool {
if old == nil || new == nil || (old.Shard == nil && new.Shard == nil) {
if old == nil || new == nil {
return false
}

// returns true if the cluster id has changed because some sharding algorithms depend on it.
if old.ID != new.ID {
return true
}

if old.Server != new.Server {
return true
}

// return false if the shard field has not been modified
if old.Shard == nil && new.Shard == nil {
return false
}
return old.Shard != new.Shard
return old.Shard == nil || new.Shard == nil || int64(*old.Shard) != int64(*new.Shard)
leoluz marked this conversation as resolved.
Show resolved Hide resolved
}

func (d *ClusterSharding) GetClusterAccessor() clusterAccessor {
return func() []*v1alpha1.Cluster {
// no need to lock, as this is only called from the updateDistribution function
woehrl01 marked this conversation as resolved.
Show resolved Hide resolved
clusters := make([]*v1alpha1.Cluster, 0, len(d.Clusters))
for _, c := range d.Clusters {
clusters = append(clusters, c)
Expand Down
Loading
Loading