Skip to content

Commit

Permalink
backup: update sync handling and logging
Browse files Browse the repository at this point in the history
Signed-off-by: Xieql <xieqianglong@huawei.com>
  • Loading branch information
Xieql committed Oct 18, 2023
1 parent ddbf7a7 commit 1d4ab8b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 50 deletions.
58 changes: 31 additions & 27 deletions pkg/fleet-manager/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ package fleet
import (
"context"
"fmt"
"strings"

"github.com/go-logr/logr"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -37,7 +36,6 @@ import (
type BackupManager struct {
client.Client
Scheme *runtime.Scheme
logger logr.Logger
}

// SetupWithManager sets up the controller with the Manager.
Expand All @@ -49,13 +47,13 @@ func (b *BackupManager) SetupWithManager(ctx context.Context, mgr ctrl.Manager,
}

func (b *BackupManager) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
b.logger = ctrl.LoggerFrom(ctx).WithValues("backup", req.NamespacedName)
log := ctrl.LoggerFrom(ctx).WithValues("backup", req.NamespacedName)

backup := &backupapi.Backup{}

if err := b.Client.Get(ctx, req.NamespacedName, backup); err != nil {
if apierrors.IsNotFound(err) {
b.logger.Info("backup object not found")
log.Info("backup object not found")
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -91,10 +89,12 @@ func (b *BackupManager) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl

// reconcileBackup handles the main reconcile logic for a Backup object.
func (b *BackupManager) reconcileBackup(ctx context.Context, backup *backupapi.Backup) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

// Fetch destination clusters
destinationClusters, err := fetchDestinationClusters(ctx, b.Client, backup.Namespace, backup.Spec.Destination)
if err != nil {
b.logger.Error(err, "failed to fetch destination clusters for backup")
log.Error(err, "failed to fetch destination clusters for backup")
return ctrl.Result{}, err
}
// Apply velero backup resource in target clusters
Expand All @@ -108,6 +108,8 @@ func (b *BackupManager) reconcileBackup(ctx context.Context, backup *backupapi.B

// reconcileBackupResources converts the backup resources into velero backup resources that can be used by Velero on the target clusters, and applies each of these backup resources to the respective target clusters.
func (b *BackupManager) reconcileBackupResources(ctx context.Context, backup *backupapi.Backup, destinationClusters map[ClusterKey]*fleetCluster) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

backupLabel := generateVeleroInstanceLabel(BackupNameLabel, backup.Name, backup.Spec.Destination.Fleet)

// Add tasks of syncVeleroObj func
Expand All @@ -130,20 +132,16 @@ func (b *BackupManager) reconcileBackupResources(ctx context.Context, backup *ba
}
}

// Parallel process syncVeleroObj func
errs := parallelProcess(tasks)
// Check for errors
var errorList []string
for _, err := range errs {
if err != nil {
b.logger.Error(err, "Error encountered during parallel processing")
errorList = append(errorList, err.Error())
}
g := &multierror.Group{}
for _, task := range tasks {
g.Go(task)
}

if len(errorList) > 0 {
// Return all errs
return ctrl.Result{}, fmt.Errorf("encountered %d errors during processing: %s", len(errorList), strings.Join(errorList, "; "))
err := g.Wait().ErrorOrNil()

if err != nil {
log.Error(err, "Error encountered during sync velero obj when backup")
return ctrl.Result{}, fmt.Errorf("encountered errors during processing: %v", err)
}

return ctrl.Result{}, nil
Expand All @@ -167,6 +165,8 @@ func (b *BackupManager) reconcileBackupStatus(ctx context.Context, backup *backu
// reconcileOneTimeBackupStatus updates the status of a one-time Backup object by checking the status of corresponding Velero backup resources in each target cluster.
// It determines whether to requeue the reconciliation based on the completion status of all Velero backup resources.
func (b *BackupManager) reconcileOneTimeBackupStatus(ctx context.Context, backup *backupapi.Backup, destinationClusters map[ClusterKey]*fleetCluster, statusMap map[string]*backupapi.BackupDetails) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

// Loop through each target cluster to retrieve the status of Velero backup resources using the client associated with the respective target cluster.
for clusterKey, clusterAccess := range destinationClusters {
name := generateVeleroResourceName(clusterKey.Name, BackupKind, backup.Namespace, backup.Name)
Expand All @@ -175,7 +175,7 @@ func (b *BackupManager) reconcileOneTimeBackupStatus(ctx context.Context, backup
// Use the client of the target cluster to get the status of Velero backup resources
err := getResourceFromClusterClient(ctx, name, VeleroNamespace, *clusterAccess, veleroBackup)
if err != nil {
b.logger.Error(err, "failed to create velero backup instance for sync one time backup status")
log.Error(err, "failed to create velero backup instance for sync one time backup status")
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -208,22 +208,24 @@ func (b *BackupManager) reconcileOneTimeBackupStatus(ctx context.Context, backup
// reconcileScheduleBackupStatus manages the status synchronization for scheduled Backup objects.
// If the backup type is "schedule", new backups will be continuously generated, hence the status synchronization will be executed continuously.
func (b *BackupManager) reconcileScheduleBackupStatus(ctx context.Context, schedule *backupapi.Backup, destinationClusters map[ClusterKey]*fleetCluster, statusMap map[string]*backupapi.BackupDetails) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

// Loop through each target cluster to retrieve the status of Velero backup created by schedule resources using the client associated with the respective target cluster.
for clusterKey, clusterAccess := range destinationClusters {
name := generateVeleroResourceName(clusterKey.Name, BackupKind, schedule.Namespace, schedule.Name)
veleroSchedule := &velerov1.Schedule{}
// Use the client of the target cluster to get the status of Velero backup resources
err := getResourceFromClusterClient(ctx, name, VeleroNamespace, *clusterAccess, veleroSchedule)
if err != nil {
b.logger.Error(err, "Unable to get velero schedule", "scheduleName", name)
log.Error(err, "Unable to get velero schedule", "scheduleName", name)
return ctrl.Result{}, err
}

// Fetch all velero backups created by velero schedule
backupList := &velerov1.BackupList{}
listErr := listResourcesFromClusterClient(ctx, VeleroNamespace, velerov1.ScheduleNameLabel, veleroSchedule.Name, *clusterAccess, backupList)
if listErr != nil {
b.logger.Info("Unable to list velero backups for velero schedule", "scheduleName", veleroSchedule.Name)
log.Info("Unable to list velero backups for velero schedule", "scheduleName", veleroSchedule.Name)
return ctrl.Result{}, listErr
}

Expand All @@ -233,7 +235,7 @@ func (b *BackupManager) reconcileScheduleBackupStatus(ctx context.Context, sched
// If a schedule backup record cannot be found, the potential reasons are:
// 1. The backup task hasn't been triggered by schedule.
// 2. An issue occurred, but we can not get information directly from the status of schedules.velero.io
b.logger.Info("No completed backups found for schedule", "scheduleName", veleroSchedule.Name)
log.Info("No completed backups found for schedule", "scheduleName", veleroSchedule.Name)
}

// Sync schedule backup status with most recent complete backup
Expand All @@ -257,7 +259,7 @@ func (b *BackupManager) reconcileScheduleBackupStatus(ctx context.Context, sched
// Get the next reconcile interval
cronInterval, err := GetCronInterval(schedule.Spec.Schedule)
if err != nil {
b.logger.Error(err, "failed to get cron Interval of backup.spec.schedule", "backupName", schedule.Name, "cronExpression", schedule.Spec.Schedule)
log.Error(err, "failed to get cron Interval of backup.spec.schedule", "backupName", schedule.Name, "cronExpression", schedule.Spec.Schedule)
return ctrl.Result{}, err
}
// If all backups are complete, requeue the reconciliation after a long cronInterval.
Expand All @@ -269,12 +271,14 @@ func (b *BackupManager) reconcileScheduleBackupStatus(ctx context.Context, sched

// reconcileDeleteBackup handles the deletion process of a Backup object.
func (b *BackupManager) reconcileDeleteBackup(ctx context.Context, backup *backupapi.Backup) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

// Fetch backup destination clusters
destinationClusters, err := fetchDestinationClusters(ctx, b.Client, backup.Namespace, backup.Spec.Destination)
if err != nil {
b.logger.Error(err, "failed to fetch destination clusters when delete backup")
log.Error(err, "failed to fetch destination clusters when delete backup")
controllerutil.RemoveFinalizer(backup, BackupFinalizer)
b.logger.Info("Removed finalizer due to fetch destination clusters error")
log.Info("Removed finalizer due to fetch destination clusters error")
return ctrl.Result{}, err
}

Expand All @@ -287,13 +291,13 @@ func (b *BackupManager) reconcileDeleteBackup(ctx context.Context, backup *backu

// Delete all related velero schedule or backup instance
if err := deleteResourcesInClusters(ctx, VeleroNamespace, BackupNameLabel, backup.Name, destinationClusters, objList); err != nil {
b.logger.Error(err, "failed to delete velero schedule or backup Instances when delete backup")
log.Error(err, "failed to delete velero schedule or backup Instances when delete backup")
return ctrl.Result{}, err
}

// Remove finalizer
controllerutil.RemoveFinalizer(backup, BackupFinalizer)

b.logger.Info("Delete Backup successful")
log.Info("Delete Backup successful")
return ctrl.Result{}, nil
}
23 changes: 0 additions & 23 deletions pkg/fleet-manager/backup_restore_migrate_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"fmt"
"reflect"
"sort"
"sync"
"time"

"github.com/robfig/cron/v3"
Expand Down Expand Up @@ -312,25 +311,3 @@ func listResourcesFromClusterClient(ctx context.Context, namespace string, label
}
return clusterClient.List(ctx, objList, opts)
}

// parallelProcess runs the provided tasks concurrently and collects any errors.
func parallelProcess(tasks []func() error) []error {
var errs []error
var errMutex sync.Mutex
var wg sync.WaitGroup

for _, task := range tasks {
wg.Add(1)
go func(task func() error) {
defer wg.Done()
if err := task(); err != nil {
errMutex.Lock()
errs = append(errs, err)
errMutex.Unlock()
}
}(task)
}

wg.Wait()
return errs
}

0 comments on commit 1d4ab8b

Please sign in to comment.