From 840aca28588ce1827212a1970bfcd4342a7737f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wenkai=20Yin=28=E5=B0=B9=E6=96=87=E5=BC=80=29?= Date: Mon, 25 Mar 2024 11:46:09 +0800 Subject: [PATCH] Improve the concurrency for PVBs in different pods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Improve the concurrency for PVBs in different pods Fixes #6676 Signed-off-by: Wenkai Yin(尹文开) --- pkg/backup/backup.go | 3 + pkg/backup/item_backupper.go | 1 - pkg/podvolume/backupper.go | 92 ++++++++------- pkg/podvolume/backupper_test.go | 194 ++++++++++++++++---------------- 4 files changed, 144 insertions(+), 146 deletions(-) diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go index 67e20acb0e..bd79eefed2 100644 --- a/pkg/backup/backup.go +++ b/pkg/backup/backup.go @@ -419,6 +419,9 @@ func (kb *kubernetesBackupper) BackupWithResolvers(log logrus.FieldLogger, } } + processedPVBs := itemBackupper.podVolumeBackupper.WaitAllPodVolumesProcessed(log) + backupRequest.PodVolumeBackups = append(backupRequest.PodVolumeBackups, processedPVBs...) + // do a final update on progress since we may have just added some CRDs and may not have updated // for the last few processed items. updated = backupRequest.Backup.DeepCopy() diff --git a/pkg/backup/item_backupper.go b/pkg/backup/item_backupper.go index 7f1d826857..4b589fae31 100644 --- a/pkg/backup/item_backupper.go +++ b/pkg/backup/item_backupper.go @@ -270,7 +270,6 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti // even if there are errors. podVolumeBackups, podVolumePVCBackupSummary, errs := ib.backupPodVolumes(log, pod, pvbVolumes) - ib.backupRequest.PodVolumeBackups = append(ib.backupRequest.PodVolumeBackups, podVolumeBackups...) backupErrs = append(backupErrs, errs...) // Mark the volumes that has been processed by pod volume backup as Taken in the tracker. diff --git a/pkg/podvolume/backupper.go b/pkg/podvolume/backupper.go index c40a198d20..6929cfa64d 100644 --- a/pkg/podvolume/backupper.go +++ b/pkg/podvolume/backupper.go @@ -45,17 +45,19 @@ import ( type Backupper interface { // BackupPodVolumes backs up all specified volumes in a pod. BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error) + WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup } type backupper struct { - ctx context.Context - repoLocker *repository.RepoLocker - repoEnsurer *repository.Ensurer - crClient ctrlclient.Client - uploaderType string - - results map[string]chan *velerov1api.PodVolumeBackup - resultsLock sync.Mutex + ctx context.Context + repoLocker *repository.RepoLocker + repoEnsurer *repository.Ensurer + crClient ctrlclient.Client + uploaderType string + pvbInformer ctrlcache.Informer + handlerRegistration cache.ResourceEventHandlerRegistration + wg sync.WaitGroup + result []*velerov1api.PodVolumeBackup } type skippedPVC struct { @@ -113,11 +115,12 @@ func newBackupper( repoEnsurer: repoEnsurer, crClient: crClient, uploaderType: uploaderType, - - results: make(map[string]chan *velerov1api.PodVolumeBackup), + pvbInformer: pvbInformer, + wg: sync.WaitGroup{}, + result: []*velerov1api.PodVolumeBackup{}, } - _, _ = pvbInformer.AddEventHandler( + b.handlerRegistration, _ = pvbInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ UpdateFunc: func(_, obj interface{}) { pvb := obj.(*velerov1api.PodVolumeBackup) @@ -126,17 +129,13 @@ func newBackupper( return } - if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted || pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed { - b.resultsLock.Lock() - defer b.resultsLock.Unlock() - - resChan, ok := b.results[resultsKey(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name)] - if !ok { - log.Errorf("No results channel found for pod %s/%s to send pod volume backup %s/%s on", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name, pvb.Namespace, pvb.Name) - return - } - resChan <- pvb + if pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseCompleted && + pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseFailed { + return } + + b.wg.Done() + b.result = append(b.result, pvb) }, }, ) @@ -217,12 +216,6 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api. b.repoLocker.Lock(repo.Name) defer b.repoLocker.Unlock(repo.Name) - resultsChan := make(chan *velerov1api.PodVolumeBackup) - - b.resultsLock.Lock() - b.results[resultsKey(pod.Namespace, pod.Name)] = resultsChan - b.resultsLock.Unlock() - var ( podVolumeBackups []*velerov1api.PodVolumeBackup mountedPodVolumes = sets.Set[string]{} @@ -243,7 +236,6 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api. repoIdentifier = repo.Spec.ResticIdentifier } - var numVolumeSnapshots int for _, volumeName := range volumesToBackup { volume, ok := podVolumes[volumeName] if !ok { @@ -305,32 +297,36 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api. errs = append(errs, err) continue } + b.wg.Add(1) + podVolumeBackups = append(podVolumeBackups, volumeBackup) pvcSummary.addBackedup(volumeName) - numVolumeSnapshots++ } -ForEachVolume: - for i, count := 0, numVolumeSnapshots; i < count; i++ { - select { - case <-b.ctx.Done(): - errs = append(errs, errors.New("timed out waiting for all PodVolumeBackups to complete")) - break ForEachVolume - case res := <-resultsChan: - switch res.Status.Phase { - case velerov1api.PodVolumeBackupPhaseCompleted: - podVolumeBackups = append(podVolumeBackups, res) - case velerov1api.PodVolumeBackupPhaseFailed: - errs = append(errs, errors.Errorf("pod volume backup failed: %s", res.Status.Message)) - podVolumeBackups = append(podVolumeBackups, res) + return podVolumeBackups, pvcSummary, errs +} + +func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup { + defer b.pvbInformer.RemoveEventHandler(b.handlerRegistration) + + done := make(chan struct{}) + go func() { + defer close(done) + b.wg.Wait() + }() + + var podVolumeBackups []*velerov1api.PodVolumeBackup + select { + case <-b.ctx.Done(): + log.Error("timed out waiting for all PodVolumeBackups to complete") + case <-done: + for _, pvb := range b.result { + podVolumeBackups = append(podVolumeBackups, pvb) + if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed { + log.Errorf("pod volume backup failed: %s", pvb.Status.Message) } } } - - b.resultsLock.Lock() - delete(b.results, resultsKey(pod.Namespace, pod.Name)) - b.resultsLock.Unlock() - - return podVolumeBackups, pvcSummary, errs + return podVolumeBackups } func skipAllPodVolumes(pod *corev1api.Pod, volumesToBackup []string, err error, pvcSummary *PVCBackupSummary, log logrus.FieldLogger) { diff --git a/pkg/podvolume/backupper_test.go b/pkg/podvolume/backupper_test.go index e08b772dd9..1f39a22c0f 100644 --- a/pkg/podvolume/backupper_test.go +++ b/pkg/podvolume/backupper_test.go @@ -29,6 +29,7 @@ import ( corev1api "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake" clientTesting "k8s.io/client-go/testing" @@ -308,15 +309,8 @@ func TestBackupPodVolumes(t *testing.T) { velerov1api.AddToScheme(scheme) corev1api.AddToScheme(scheme) - ctxWithCancel, cancel := context.WithCancel(context.Background()) - defer cancel() - - failedPVB := createPVBObj(true, false, 1, "") - completedPVB := createPVBObj(false, false, 1, "") - tests := []struct { name string - ctx context.Context bsl string uploaderType string volumes []string @@ -326,7 +320,6 @@ func TestBackupPodVolumes(t *testing.T) { veleroClientObj []runtime.Object veleroReactors []reactor runtimeScheme *runtime.Scheme - retPVBs []*velerov1api.PodVolumeBackup pvbs []*velerov1api.PodVolumeBackup errs []string }{ @@ -494,87 +487,11 @@ func TestBackupPodVolumes(t *testing.T) { uploaderType: "kopia", bsl: "fake-bsl", }, - { - name: "context cancelled", - ctx: ctxWithCancel, - volumes: []string{ - "fake-volume-1", - }, - sourcePod: createPodObj(true, true, true, 1), - kubeClientObj: []runtime.Object{ - createNodeAgentPodObj(true), - createPVCObj(1), - createPVObj(1, false), - }, - ctlClientObj: []runtime.Object{ - createBackupRepoObj(), - }, - runtimeScheme: scheme, - uploaderType: "kopia", - bsl: "fake-bsl", - errs: []string{ - "timed out waiting for all PodVolumeBackups to complete", - }, - }, - { - name: "return failed pvbs", - volumes: []string{ - "fake-volume-1", - }, - sourcePod: createPodObj(true, true, true, 1), - kubeClientObj: []runtime.Object{ - createNodeAgentPodObj(true), - createPVCObj(1), - createPVObj(1, false), - }, - ctlClientObj: []runtime.Object{ - createBackupRepoObj(), - }, - runtimeScheme: scheme, - uploaderType: "kopia", - bsl: "fake-bsl", - retPVBs: []*velerov1api.PodVolumeBackup{ - failedPVB, - }, - pvbs: []*velerov1api.PodVolumeBackup{ - failedPVB, - }, - errs: []string{ - "pod volume backup failed: fake-message", - }, - }, - { - name: "return completed pvbs", - volumes: []string{ - "fake-volume-1", - }, - sourcePod: createPodObj(true, true, true, 1), - kubeClientObj: []runtime.Object{ - createNodeAgentPodObj(true), - createPVCObj(1), - createPVObj(1, false), - }, - ctlClientObj: []runtime.Object{ - createBackupRepoObj(), - }, - runtimeScheme: scheme, - uploaderType: "kopia", - bsl: "fake-bsl", - retPVBs: []*velerov1api.PodVolumeBackup{ - completedPVB, - }, - pvbs: []*velerov1api.PodVolumeBackup{ - completedPVB, - }, - }, } // TODO add more verification around PVCBackupSummary returned by "BackupPodVolumes" for _, test := range tests { t.Run(test.name, func(t *testing.T) { ctx := context.Background() - if test.ctx != nil { - ctx = test.ctx - } fakeClientBuilder := ctrlfake.NewClientBuilder() if test.runtimeScheme != nil { @@ -607,19 +524,6 @@ func TestBackupPodVolumes(t *testing.T) { require.NoError(t, err) - go func() { - if test.ctx != nil { - time.Sleep(time.Second) - cancel() - } else if test.retPVBs != nil { - time.Sleep(time.Second) - for _, pvb := range test.retPVBs { - bp.(*backupper).results[resultsKey(test.sourcePod.Namespace, test.sourcePod.Name)] <- pvb - } - - } - }() - pvbs, _, errs := bp.BackupPodVolumes(backupObj, test.sourcePod, test.volumes, nil, velerotest.NewLogger()) if errs == nil { @@ -635,6 +539,102 @@ func TestBackupPodVolumes(t *testing.T) { } } +type logHook struct { + entry *logrus.Entry +} + +func (l *logHook) Levels() []logrus.Level { + return []logrus.Level{logrus.ErrorLevel} +} +func (l *logHook) Fire(entry *logrus.Entry) error { + l.entry = entry + return nil +} + +func TestWaitAllPodVolumesProcessed(t *testing.T) { + timeoutCtx, _ := context.WithTimeout(context.Background(), 1*time.Second) + cases := []struct { + name string + ctx context.Context + statusToBeUpdated *velerov1api.PodVolumeBackupStatus + expectedErr string + expectedPVBPhase velerov1api.PodVolumeBackupPhase + }{ + { + name: "context cancelled", + ctx: timeoutCtx, + expectedErr: "timed out waiting for all PodVolumeBackups to complete", + }, + { + name: "failed pvbs", + ctx: context.Background(), + statusToBeUpdated: &velerov1api.PodVolumeBackupStatus{ + Phase: velerov1api.PodVolumeBackupPhaseFailed, + Message: "failed", + }, + expectedPVBPhase: velerov1api.PodVolumeBackupPhaseFailed, + expectedErr: "pod volume backup failed: failed", + }, + { + name: "completed pvbs", + ctx: context.Background(), + statusToBeUpdated: &velerov1api.PodVolumeBackupStatus{ + Phase: velerov1api.PodVolumeBackupPhaseCompleted, + Message: "completed", + }, + expectedPVBPhase: velerov1api.PodVolumeBackupPhaseCompleted, + }, + } + + for _, c := range cases { + newPVB := builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb").Result() + scheme := runtime.NewScheme() + velerov1api.AddToScheme(scheme) + client := ctrlfake.NewClientBuilder().WithScheme(scheme).WithObjects(newPVB).Build() + + lw := kube.InternalLW{ + Client: client, + Namespace: velerov1api.DefaultNamespace, + ObjectList: new(velerov1api.PodVolumeBackupList), + } + + informer := cache.NewSharedIndexInformer(&lw, &velerov1api.PodVolumeBackup{}, 0, cache.Indexers{}) + + ctx := context.Background() + go informer.Run(ctx.Done()) + require.True(t, cache.WaitForCacheSync(ctx.Done(), informer.HasSynced)) + + logger := logrus.New() + logHook := &logHook{} + logger.Hooks.Add(logHook) + + backuper := newBackupper(c.ctx, nil, nil, informer, nil, "", &velerov1api.Backup{}, logger) + backuper.wg.Add(1) + + if c.statusToBeUpdated != nil { + pvb := &velerov1api.PodVolumeBackup{} + err := client.Get(context.Background(), ctrlclient.ObjectKey{Namespace: newPVB.Namespace, Name: newPVB.Name}, pvb) + require.Nil(t, err) + + pvb.Status = *c.statusToBeUpdated + err = client.Update(context.Background(), pvb) + require.Nil(t, err) + } + + pvbs := backuper.WaitAllPodVolumesProcessed(logger) + + if c.expectedErr != "" { + assert.Equal(t, c.expectedErr, logHook.entry.Message) + } + + if c.expectedPVBPhase != "" { + require.Len(t, pvbs, 1) + assert.Equal(t, c.expectedPVBPhase, pvbs[0].Status.Phase) + } + + } +} + func TestPVCBackupSummary(t *testing.T) { pbs := NewPVCBackupSummary() pbs.pvcMap["vol-1"] = builder.ForPersistentVolumeClaim("ns-1", "pvc-1").VolumeName("pv-1").Result()