Skip to content

Commit

Permalink
Add Option for PeriodicalEnqueueSource.
Browse files Browse the repository at this point in the history
Signed-off-by: Xun Jiang <blackpiglet@gmail.com>
  • Loading branch information
Xun Jiang committed Aug 25, 2022
1 parent 3c3cfc6 commit f52e1f3
Show file tree
Hide file tree
Showing 9 changed files with 320 additions and 215 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/backup_deletion_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewBackupDeletionReconciler(

func (r *backupDeletionReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Make sure the expired requests can be deleted eventually
s := kube.NewPeriodicalEnqueueSource(r.logger, mgr.GetClient(), &velerov1api.DeleteBackupRequestList{}, time.Hour)
s := kube.NewPeriodicalEnqueueSource(r.logger, mgr.GetClient(), &velerov1api.DeleteBackupRequestList{}, time.Hour, kube.PeriodicalEnqueueSourceOption{})
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.DeleteBackupRequest{}).
Watches(s, nil).
Expand Down
11 changes: 7 additions & 4 deletions pkg/controller/backup_storage_location_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,13 @@ func (r *backupStorageLocationReconciler) SetupWithManager(mgr ctrl.Manager) err
mgr.GetClient(),
&velerov1api.BackupStorageLocationList{},
bslValidationEnqueuePeriod,
// Add filter function to enqueue BSL per ValidationFrequency setting.
func(object client.Object) bool {
location := object.(*velerov1api.BackupStorageLocation)
return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, r.defaultBackupLocationInfo.ServerValidationFrequency, r.log.WithField("controller", BackupStorageLocation))
kube.PeriodicalEnqueueSourceOption{
FilterFuncs: []func(object client.Object) bool{
func(object client.Object) bool {
location := object.(*velerov1api.BackupStorageLocation)
return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, r.defaultBackupLocationInfo.ServerValidationFrequency, r.log.WithField("controller", BackupStorageLocation))
},
},
},
)
return ctrl.NewControllerManagedBy(mgr).
Expand Down
377 changes: 185 additions & 192 deletions pkg/controller/backup_sync_controller.go

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions pkg/controller/backup_sync_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
. "github.com/onsi/gomega"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
Expand Down Expand Up @@ -650,4 +652,24 @@ var _ = Describe("Backup Sync Reconciler", func() {
Expect(expected).To(BeEquivalentTo(numBackups))
}
})

It("Test moving default BSL at the head of BSL array.", func() {
locationList := &velerov1api.BackupStorageLocationList{}
objArray := make([]runtime.Object, 0)

// Generate BSL array.
locations := defaultLocationsList("velero")
// Move default BSL to tail of array.
objArray = append(objArray, locations[1])
objArray = append(objArray, locations[0])

meta.SetList(locationList, objArray)

testObjList := backupSyncSourceOrderFunc(locationList)
testObjArray, err := meta.ExtractList(testObjList)
Expect(err).ShouldNot(HaveOccurred())

expectLocation := testObjArray[0].(*velerov1api.BackupStorageLocation)
Expect(expectLocation.Spec.Default).To(BeEquivalentTo(true))
})
})
2 changes: 1 addition & 1 deletion pkg/controller/gc_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewGCReconciler(
// Other Events will be filtered to decrease the number of reconcile call. Especially UpdateEvent must be filtered since we removed
// the backup status as the sub-resource of backup in v1.9, every change on it will be treated as UpdateEvent and trigger reconcile call.
func (c *gcReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1api.BackupList{}, defaultGCFrequency)
s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1api.BackupList{}, defaultGCFrequency, kube.PeriodicalEnqueueSourceOption{})
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.Backup{}).
WithEventFilter(predicate.Funcs{
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/restic_repository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewResticRepoReconciler(namespace string, logger logrus.FieldLogger, client
}

func (r *ResticRepoReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(r.logger, mgr.GetClient(), &velerov1api.BackupRepositoryList{}, repoSyncPeriod)
s := kube.NewPeriodicalEnqueueSource(r.logger, mgr.GetClient(), &velerov1api.BackupRepositoryList{}, repoSyncPeriod, kube.PeriodicalEnqueueSourceOption{})
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.BackupRepository{}).
Watches(s, nil).
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/schedule_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewScheduleReconciler(
}

func (c *scheduleReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1.ScheduleList{}, scheduleSyncPeriod)
s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1.ScheduleList{}, scheduleSyncPeriod, kube.PeriodicalEnqueueSourceOption{})
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1.Schedule{}).
Watches(s, nil).
Expand Down
35 changes: 24 additions & 11 deletions pkg/util/kube/periodical_enqueue_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,18 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

func NewPeriodicalEnqueueSource(logger logrus.FieldLogger, client client.Client, objList client.ObjectList, period time.Duration, filters ...func(object client.Object) bool) *PeriodicalEnqueueSource {
func NewPeriodicalEnqueueSource(
logger logrus.FieldLogger,
client client.Client,
objList client.ObjectList,
period time.Duration,
option PeriodicalEnqueueSourceOption) *PeriodicalEnqueueSource {
return &PeriodicalEnqueueSource{
logger: logger.WithField("resource", reflect.TypeOf(objList).String()),
Client: client,
objList: objList,
period: period,
filterFuncs: filters,
logger: logger.WithField("resource", reflect.TypeOf(objList).String()),
Client: client,
objList: objList,
period: period,
option: option,
}
}

Expand All @@ -49,10 +54,15 @@ func NewPeriodicalEnqueueSource(logger logrus.FieldLogger, client client.Client,
// the reconcile logic periodically
type PeriodicalEnqueueSource struct {
client.Client
logger logrus.FieldLogger
objList client.ObjectList
period time.Duration
filterFuncs []func(object client.Object) bool
logger logrus.FieldLogger
objList client.ObjectList
period time.Duration
option PeriodicalEnqueueSourceOption
}

type PeriodicalEnqueueSourceOption struct {
FilterFuncs []func(object client.Object) bool
OrderFunc func(objList client.ObjectList) client.ObjectList
}

func (p *PeriodicalEnqueueSource) Start(ctx context.Context, h handler.EventHandler, q workqueue.RateLimitingInterface, pre ...predicate.Predicate) error {
Expand All @@ -66,13 +76,16 @@ func (p *PeriodicalEnqueueSource) Start(ctx context.Context, h handler.EventHand
p.logger.Debug("no resources, skip")
return
}
if p.option.OrderFunc != nil {
p.objList = p.option.OrderFunc(p.objList)
}
if err := meta.EachListItem(p.objList, func(object runtime.Object) error {
obj, ok := object.(metav1.Object)
if !ok {
p.logger.Error("%s's type isn't metav1.Object", object.GetObjectKind().GroupVersionKind().String())
return nil
}
for _, filter := range p.filterFuncs {
for _, filter := range p.option.FilterFuncs {
if filter != nil {
if enqueueObj := filter(object.(client.Object)); !enqueueObj {
p.logger.Debugf("skip enqueue object %s/%s due to filter function.", obj.GetNamespace(), obj.GetName())
Expand Down
82 changes: 78 additions & 4 deletions pkg/util/kube/periodical_enqueue_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/util/workqueue"
crclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/vmware-tanzu/velero/internal/storage"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
Expand All @@ -39,7 +42,7 @@ func TestStart(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.TODO())
client := (&fake.ClientBuilder{}).Build()
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter())
source := NewPeriodicalEnqueueSource(logrus.WithContext(ctx), client, &velerov1.ScheduleList{}, 1*time.Second)
source := NewPeriodicalEnqueueSource(logrus.WithContext(ctx), client, &velerov1.ScheduleList{}, 1*time.Second, PeriodicalEnqueueSourceOption{})

require.Nil(t, source.Start(ctx, nil, queue))

Expand Down Expand Up @@ -76,9 +79,11 @@ func TestFilter(t *testing.T) {
client,
&velerov1.BackupStorageLocationList{},
1*time.Second,
func(object crclient.Object) bool {
location := object.(*velerov1.BackupStorageLocation)
return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, 1*time.Minute, logrus.WithContext(ctx).WithField("BackupStorageLocation", location.Name))
PeriodicalEnqueueSourceOption{
FilterFuncs: []func(object crclient.Object) bool{func(object crclient.Object) bool {
location := object.(*velerov1.BackupStorageLocation)
return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, 1*time.Minute, logrus.WithContext(ctx).WithField("BackupStorageLocation", location.Name))
}},
},
)

Expand All @@ -104,3 +109,72 @@ func TestFilter(t *testing.T) {

cancelFunc()
}

func TestOrder(t *testing.T) {
require.Nil(t, velerov1.AddToScheme(scheme.Scheme))

ctx, cancelFunc := context.WithCancel(context.TODO())
client := (&fake.ClientBuilder{}).Build()
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter())
source := NewPeriodicalEnqueueSource(
logrus.WithContext(ctx),
client,
&velerov1.BackupStorageLocationList{},
1*time.Second,
PeriodicalEnqueueSourceOption{
OrderFunc: func(objList crclient.ObjectList) crclient.ObjectList {
locationList := &velerov1.BackupStorageLocationList{}
objArray := make([]runtime.Object, 0)

// Generate BSL array.
locations, _ := meta.ExtractList(objList)
// Move default BSL to tail of array.
objArray = append(objArray, locations[1])
objArray = append(objArray, locations[0])

meta.SetList(locationList, objArray)

return locationList
},
},
)

require.Nil(t, source.Start(ctx, nil, queue))

// Should not patch a backup storage location object status phase
// if the location's validation frequency is specifically set to zero
require.Nil(t, client.Create(ctx, &velerov1.BackupStorageLocation{
ObjectMeta: metav1.ObjectMeta{
Name: "location1",
Namespace: "default",
},
Spec: velerov1.BackupStorageLocationSpec{
ValidationFrequency: &metav1.Duration{Duration: 0},
},
Status: velerov1.BackupStorageLocationStatus{
LastValidationTime: &metav1.Time{Time: time.Now()},
},
}))
require.Nil(t, client.Create(ctx, &velerov1.BackupStorageLocation{
ObjectMeta: metav1.ObjectMeta{
Name: "location2",
Namespace: "default",
},
Spec: velerov1.BackupStorageLocationSpec{
ValidationFrequency: &metav1.Duration{Duration: 0},
Default: true,
},
Status: velerov1.BackupStorageLocationStatus{
LastValidationTime: &metav1.Time{Time: time.Now()},
},
}))
time.Sleep(2 * time.Second)

first, _ := queue.Get()
bsl := &velerov1.BackupStorageLocation{}
require.Equal(t, "location2", first.(reconcile.Request).Name)
require.Nil(t, client.Get(ctx, first.(reconcile.Request).NamespacedName, bsl))
require.Equal(t, true, bsl.Spec.Default)

cancelFunc()
}

0 comments on commit f52e1f3

Please sign in to comment.