Skip to content
This repository is currently being migrated. It's locked while the migration is in progress.

Add cron job that does defrag without considering threshold #37

Merged
merged 6 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions controllers/etcdcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ func nextOutdatedPeer(cluster *etcdv1alpha1.EtcdCluster, peers *etcdv1alpha1.Etc
}

func scheduleMapKeyFor(cluster *etcdv1alpha1.EtcdCluster) string {
return string(cluster.UID)
return string(cluster.UID) + "-defrag"
}

func scheduleMapKeyWithoutThresholdFor(cluster *etcdv1alpha1.EtcdCluster) string {
Expand Down Expand Up @@ -1423,13 +1423,16 @@ func expectedURLForPeer(cluster *etcdv1alpha1.EtcdCluster, peerName string) stri
}

func (r *EtcdClusterReconciler) defragWithThresholdCronJob(log logr.Logger, cluster *etcdv1alpha1.EtcdCluster) {
ctx, cancel, etcdClient, members, ok := r.setupDefragDeps(log, cluster)
etcdClient, members, ok := r.setupDefragDeps(log, cluster)
if !ok {
return
}
defer cancel()
defer etcdClient.Close()

// create a new context with a long timeout for the defrag operation.
ctx, cancel := context.WithTimeout(context.Background(), defragTimeout)
defer cancel()

err := defragger.DefragIfNecessary(ctx, r.DefragThreshold, members, etcdClient, etcdClient, log)
if err != nil {
log.Error(err, "failed to defrag if necessary")
Expand All @@ -1438,23 +1441,23 @@ func (r *EtcdClusterReconciler) defragWithThresholdCronJob(log logr.Logger, clus
}

func (r *EtcdClusterReconciler) defragCronJob(log logr.Logger, cluster *etcdv1alpha1.EtcdCluster) {
ctx, cancel, etcdClient, members, ok := r.setupDefragDeps(log, cluster)
etcdClient, members, ok := r.setupDefragDeps(log, cluster)
if !ok {
return
}
defer cancel()
defer etcdClient.Close()

ctx = context.Background()
// create a new context with a long timeout for the defrag operation.
ctx, cancel := context.WithTimeout(context.Background(), defragTimeout)
defer cancel()

err := defragger.Defrag(ctx, members, etcdClient, log)
if err != nil {
log.Error(err, "failed to defrag")
}
}

func (r *EtcdClusterReconciler) setupDefragDeps(log logr.Logger, cluster *etcdv1alpha1.EtcdCluster) (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth adding a comment that "it's the callers responsibility to close the return etcd client"

context.Context,
context.CancelFunc,
etcd.API,
[]etcd.Member,
bool,
Expand All @@ -1465,27 +1468,25 @@ func (r *EtcdClusterReconciler) setupDefragDeps(log logr.Logger, cluster *etcdv1
tlsConfig, err := r.getTLSConfig(ctx, cluster)
if err != nil {
log.Error(err, "failed to create tls config before defragging")
return nil, nil, nil, nil, false
return nil, nil, false
}
var c etcd.API
if c, err = r.Etcd.New(etcdClientConfig(cluster, tlsConfig)); err != nil {
log.Error(err, "Unable to connect to etcd")
return nil, nil, nil, nil, false
return nil, nil, false
}

var memberSlice []etcd.Member
if memberSlice, err = c.List(ctx); err != nil {
log.Error(err, "Unable to list etcd cluster members")
return nil, nil, nil, nil, false
return nil, nil, false
}
if memberSlice == nil {
log.Info("Cannot defrag, not aware of members yet")
return nil, nil, nil, nil, false
return nil, nil, false
}

longCtx, cancel := context.WithTimeout(context.Background(), defragTimeout)

return longCtx, cancel, c, memberSlice, true
return c, memberSlice, true
}

func (r *EtcdClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
Expand Down
2 changes: 1 addition & 1 deletion controllers/etcdcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func (s *controllerSuite) testClusterController(t *testing.T) {

t.Run("CreatesCronJob", func(t *testing.T) {
err = try.Eventually(func() error {
_, ok := s.clusterControllerSchedules.Read(string(etcdCluster.UID))
_, ok := s.clusterControllerSchedules.Read(string(etcdCluster.UID) + "-defrag")
if !ok {
return errors.New("cronjob not found")
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.46.0
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.7.2
// Pin specific etcd version via tag. See https://github.com/etcd-io/etcd/pull/11477
go.etcd.io/etcd v0.5.0-alpha.5.0.20200910180754-dd1b699fc489
gocloud.dev v0.17.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down Expand Up @@ -842,8 +843,9 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
31 changes: 15 additions & 16 deletions internal/interval/validate_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package interval

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func TestValidateForDefrag(t *testing.T) {
Expand All @@ -27,21 +28,19 @@ func TestValidateForDefrag(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
out, err := ValidateForDefrag(tc.input)

if tc.err != nil && err == nil {
t.Errorf("expected an error")
}
if err != nil && tc.err == nil {
t.Errorf("unexpected error: %s", err)
}
if tc.err != nil && err != nil {
if !errors.As(err, &tc.err) {
t.Errorf("expected (%s) got (%s)", tc.err, err)
}
}

if out != tc.output {
t.Errorf("expected (%s) got (%s)", tc.output, out)
}
errorAs(t, err, tc.err)
assert.Equal(t, tc.output, out)
})
}
}

// errorAs wraps assert.ErrorAs but handles nil target errors.
func errorAs(t *testing.T, err, target error) {
t.Helper()

if target == nil {
assert.Nil(t, err)
} else {
assert.ErrorAs(t, err, &target)
}
}
Loading