fix: added tests and fixed some behaviour bugs
Signed-off-by: Lukas Wöhrl <lukas.woehrl@plentymarkets.com>
woehrl01 committed Feb 13, 2024
1 parent 08adee2 commit cabd99b
Showing 2 changed files with 353 additions and 12 deletions.
38 changes: 26 additions & 12 deletions controller/sharding/cache.go
@@ -26,7 +26,7 @@ type ClusterSharding struct {
 	getClusterShard DistributionFunction
 }
 
-func NewClusterSharding(db db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache {
+func NewClusterSharding(_ db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache {
 	log.Debugf("Processing clusters from shard %d: Using filter function: %s", shard, shardingAlgorithm)
 	clusterSharding := &ClusterSharding{
 		Shard: shard,
@@ -67,7 +67,8 @@ func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) {
 	defer sharding.lock.Unlock()
 	newClusters := make(map[string]*v1alpha1.Cluster, len(clusters.Items))
 	for _, c := range clusters.Items {
-		newClusters[c.Server] = &c
+		cluster := c
+		newClusters[c.Server] = &cluster
 	}
 	sharding.Clusters = newClusters
 	sharding.updateDistribution()
@@ -122,9 +123,7 @@ func (sharding *ClusterSharding) GetDistribution() map[string]int {
 }
 
 func (sharding *ClusterSharding) updateDistribution() {
-	log.Info("Updating cluster shards")
-
-	for _, c := range sharding.Clusters {
+	for k, c := range sharding.Clusters {
 		shard := 0
 		if c.Shard != nil {
 			requestedShard := int(*c.Shard)
@@ -136,17 +135,32 @@ func (sharding *ClusterSharding) updateDistribution() {
 		} else {
 			shard = sharding.getClusterShard(c)
 		}
-		var shard64 int64 = int64(shard)
-		c.Shard = &shard64
-		sharding.Shards[c.Server] = shard
+
+		existingShard, ok := sharding.Shards[k]
+		if ok && existingShard != shard {
+			log.Infof("Cluster %s has changed shard from %d to %d", k, existingShard, shard)
+		} else if !ok {
+			log.Infof("Cluster %s has been assigned to shard %d", k, shard)
+		} else {
+			log.Debugf("Cluster %s has not changed shard", k)
+		}
+		sharding.Shards[k] = shard
 	}
 }
 
-// hasShardingUpdates returns true if the sharding distribution has been updated.
-// nil checking is done for the corner case of the in-cluster cluster which may
-// have a nil shard assigned
+// hasShardingUpdates returns true if the sharding distribution has explicitly changed
 func hasShardingUpdates(old, new *v1alpha1.Cluster) bool {
-	if old == nil || new == nil || (old.Shard == nil && new.Shard == nil) {
+	if old == nil || new == nil {
 		return false
 	}
+
+	// returns true if the cluster id has changed because some sharding algorithms depend on it.
+	if old.ID != new.ID {
+		return true
+	}
+
+	// return false if the shard field has not been modified
+	if old.Shard == nil && new.Shard == nil {
+		return false
+	}
 	return old.Shard == nil || new.Shard == nil || int64(*old.Shard) != int64(*new.Shard)
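The cluster := c copy in Init addresses Go's range-variable aliasing pitfall: before Go 1.22 the loop variable c is a single variable reused on every iteration, so storing &c leaves every map entry pointing at the same Cluster (whichever item was iterated last). A minimal standalone sketch of the difference, using a hypothetical two-item slice instead of the real ClusterList type:

package main

import "fmt"

type Cluster struct{ Server string }

func main() {
	items := []Cluster{{Server: "a"}, {Server: "b"}}

	broken := map[string]*Cluster{}
	for _, c := range items {
		broken[c.Server] = &c // pre-Go 1.22: every entry aliases the one loop variable
	}

	fixed := map[string]*Cluster{}
	for _, c := range items {
		cluster := c // explicit copy, as in the patched Init
		fixed[cluster.Server] = &cluster
	}

	// With Go < 1.22 the first line prints "b b"; the second always prints "a b".
	fmt.Println(broken["a"].Server, broken["b"].Server)
	fmt.Println(fixed["a"].Server, fixed["b"].Server)
}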
327 changes: 327 additions & 0 deletions controller/sharding/cache_test.go
@@ -0,0 +1,327 @@
package sharding

import (
"testing"

"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
dbmocks "github.com/argoproj/argo-cd/v2/util/db/mocks"
"github.com/stretchr/testify/assert"
)

func setupTestSharding(shard int, replicas int) *ClusterSharding {
shardingAlgorithm := "legacy" // we are using the legacy algorithm as it is deterministic based on the cluster id

db := &dbmocks.ArgoDB{}

return NewClusterSharding(db, shard, replicas, shardingAlgorithm).(*ClusterSharding)
}

func TestNewClusterSharding(t *testing.T) {
shard := 1
replicas := 2
sharding := setupTestSharding(shard, replicas)

assert.NotNil(t, sharding)
assert.Equal(t, shard, sharding.Shard)
assert.Equal(t, replicas, sharding.Replicas)
assert.NotNil(t, sharding.Shards)
assert.NotNil(t, sharding.Clusters)
}

func TestClusterSharding_Add(t *testing.T) {
shard := 1
replicas := 2
sharding := setupTestSharding(shard, replicas) // Implement a helper function to setup the test environment

cluster := &v1alpha1.Cluster{
ID: "2",
Server: "https://127.0.0.1:6443",
}

sharding.Add(cluster)

myCluster := v1alpha1.Cluster{
ID: "1",
Server: "https://kubernetes.default.svc",
}

sharding.Add(&myCluster)

distribution := sharding.GetDistribution()

assert.Contains(t, sharding.Clusters, cluster.Server)
assert.Contains(t, sharding.Clusters, myCluster.Server)

clusterDistribution, ok := distribution[cluster.Server]
assert.True(t, ok)
assert.Equal(t, 1, clusterDistribution)

myClusterDistribution, ok := distribution[myCluster.Server]
assert.True(t, ok)
assert.Equal(t, 0, myClusterDistribution)

assert.Equal(t, 2, len(distribution))
}

func TestClusterSharding_AddRoundRobin(t *testing.T) {
shard := 1
replicas := 2

db := &dbmocks.ArgoDB{}

sharding := NewClusterSharding(db, shard, replicas, "round-robin").(*ClusterSharding)

firstCluster := &v1alpha1.Cluster{
ID: "1",
Server: "https://127.0.0.1:6443",
}
sharding.Add(firstCluster)

secondCluster := v1alpha1.Cluster{
ID: "2",
Server: "https://kubernetes.default.svc",
}
sharding.Add(&secondCluster)

distribution := sharding.GetDistribution()

assert.Contains(t, sharding.Clusters, firstCluster.Server)
assert.Contains(t, sharding.Clusters, secondCluster.Server)

clusterDistribution, ok := distribution[firstCluster.Server]
assert.True(t, ok)
assert.Equal(t, 0, clusterDistribution)

myClusterDistribution, ok := distribution[secondCluster.Server]
assert.True(t, ok)
assert.Equal(t, 1, myClusterDistribution)

assert.Equal(t, 2, len(distribution))
}

func TestClusterSharding_Delete(t *testing.T) {
shard := 1
replicas := 2
sharding := setupTestSharding(shard, replicas)

sharding.Init(
&v1alpha1.ClusterList{
Items: []v1alpha1.Cluster{
{
ID: "2",
Server: "https://127.0.0.1:6443",
},
{
ID: "1",
Server: "https://kubernetes.default.svc",
},
},
},
)

sharding.Delete("https://kubernetes.default.svc")
distribution := sharding.GetDistribution()
assert.Equal(t, 1, len(distribution))
}

func TestClusterSharding_Update(t *testing.T) {
shard := 1
replicas := 2
sharding := setupTestSharding(shard, replicas)

sharding.Init(
&v1alpha1.ClusterList{
Items: []v1alpha1.Cluster{
{
ID: "2",
Server: "https://127.0.0.1:6443",
},
{
ID: "1",
Server: "https://kubernetes.default.svc",
},
},
},
)

distribution := sharding.GetDistribution()
assert.Equal(t, 2, len(distribution))

myClusterDistribution, ok := distribution["https://kubernetes.default.svc"]
assert.True(t, ok)
assert.Equal(t, 0, myClusterDistribution)

sharding.Update(&v1alpha1.Cluster{
ID: "4",
Server: "https://kubernetes.default.svc",
})

distribution = sharding.GetDistribution()
assert.Equal(t, 2, len(distribution))

myClusterDistribution, ok = distribution["https://kubernetes.default.svc"]
assert.True(t, ok)
assert.Equal(t, 1, myClusterDistribution)
}

func TestClusterSharding_IsManagedCluster(t *testing.T) {
replicas := 2
sharding0 := setupTestSharding(0, replicas)

sharding0.Init(
&v1alpha1.ClusterList{
Items: []v1alpha1.Cluster{
{
ID: "1",
Server: "https://kubernetes.default.svc",
},
{
ID: "2",
Server: "https://127.0.0.1:6443",
},
},
},
)

assert.True(t, sharding0.IsManagedCluster(&v1alpha1.Cluster{
ID: "1",
Server: "https://kubernetes.default.svc",
}))

assert.False(t, sharding0.IsManagedCluster(&v1alpha1.Cluster{
ID: "2",
Server: "https://127.0.0.1:6443",
}))

sharding1 := setupTestSharding(1, replicas)

sharding1.Init(
&v1alpha1.ClusterList{
Items: []v1alpha1.Cluster{
{
ID: "2",
Server: "https://127.0.0.1:6443",
},
{
ID: "1",
Server: "https://kubernetes.default.svc",
},
},
},
)

assert.False(t, sharding1.IsManagedCluster(&v1alpha1.Cluster{
ID: "1",
Server: "https://kubernetes.default.svc",
}))

assert.True(t, sharding1.IsManagedCluster(&v1alpha1.Cluster{
ID: "2",
Server: "https://127.0.0.1:6443",
}))

}

func Int64Ptr(i int64) *int64 {
return &i
}

func TestHasShardingUpdates(t *testing.T) {
testCases := []struct {
name string
old *v1alpha1.Cluster
new *v1alpha1.Cluster
expected bool
}{
{
name: "No updates",
old: &v1alpha1.Cluster{
Server: "https://kubernetes.default.svc",
Shard: Int64Ptr(1),
},
new: &v1alpha1.Cluster{
Server: "https://kubernetes.default.svc",
Shard: Int64Ptr(1),
},
expected: false,
},
{
name: "Updates",
old: &v1alpha1.Cluster{
Server: "https://kubernetes.default.svc",
Shard: Int64Ptr(1),
},
new: &v1alpha1.Cluster{
Server: "https://kubernetes.default.svc",
Shard: Int64Ptr(2),
},
expected: true,
},
{
name: "Old is nil",
old: nil,
new: &v1alpha1.Cluster{
Server: "https://kubernetes.default.svc",
Shard: Int64Ptr(2),
},
expected: false,
},
{
name: "New is nil",
old: &v1alpha1.Cluster{
Server: "https://kubernetes.default.svc",
Shard: Int64Ptr(2),
},
new: nil,
expected: false,
},
{
name: "Both are nil",
old: nil,
new: nil,
expected: false,
},
{
name: "Both shards are nil",
old: &v1alpha1.Cluster{
Server: "https://kubernetes.default.svc",
Shard: nil,
},
new: &v1alpha1.Cluster{
Server: "https://kubernetes.default.svc",
Shard: nil,
},
expected: false,
},
{
name: "Old shard is nil",
old: &v1alpha1.Cluster{
Server: "https://kubernetes.default.svc",
Shard: nil,
},
new: &v1alpha1.Cluster{
Server: "https://kubernetes.default.svc",
Shard: Int64Ptr(2),
},
expected: true,
},
{
name: "New shard is nil",
old: &v1alpha1.Cluster{
Server: "https://kubernetes.default.svc",
Shard: Int64Ptr(2),
},
new: &v1alpha1.Cluster{
Server: "https://kubernetes.default.svc",
Shard: nil,
},
expected: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.expected, hasShardingUpdates(tc.old, tc.new))
})
}
}
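The new tests stub the ArgoDB dependency with util/db/mocks and exercise the sharding cache entirely in memory, so no live cluster is required. Assuming the standard Argo CD repository layout implied by the file paths above, they should run with the stock Go tooling:

go test ./controller/sharding/...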
