Skip to content

Commit

Permalink
backup: refactor to parallel syn obj
Browse files Browse the repository at this point in the history
Signed-off-by: Xieql <xieqianglong@huawei.com>
  • Loading branch information
Xieql committed Oct 16, 2023
1 parent 139f0d2 commit 79cdd34
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 42 deletions.
2 changes: 2 additions & 0 deletions cmd/fleet-manager/scheme/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
kubescheme "k8s.io/client-go/kubernetes/scheme"

applicationapi "kurator.dev/kurator/pkg/apis/apps/v1alpha1"
backupapi "kurator.dev/kurator/pkg/apis/backups/v1alpha1"
clusterv1alpha1 "kurator.dev/kurator/pkg/apis/cluster/v1alpha1"
fleetapi "kurator.dev/kurator/pkg/apis/fleet/v1alpha1"
)
Expand All @@ -36,4 +37,5 @@ func init() {
_ = clusterv1alpha1.AddToScheme(Scheme)
_ = hrapiv2b1.AddToScheme(Scheme)
_ = applicationapi.AddToScheme(Scheme)
_ = backupapi.AddToScheme(Scheme)
}
5 changes: 3 additions & 2 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ type Client struct {
crd crdclientset.Interface
helm *helmclient.Client

karmada karmadaclientset.Interface
prom promclient.Interface
karmada karmadaclientset.Interface
prom promclient.Interface
// it currently only support k8s core API and velero API, because only these schemes are registered
ctrlRuntimeClient client.Client
}

Expand Down
81 changes: 44 additions & 37 deletions pkg/fleet-manager/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package fleet
import (
"context"
"fmt"
"strings"

"github.com/go-logr/logr"
"github.com/pkg/errors"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -35,6 +37,7 @@ import (
type BackupManager struct {
client.Client
Scheme *runtime.Scheme
logger logr.Logger
}

// SetupWithManager sets up the controller with the Manager.
Expand All @@ -46,36 +49,35 @@ func (b *BackupManager) SetupWithManager(ctx context.Context, mgr ctrl.Manager,
}

func (b *BackupManager) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
log := ctrl.LoggerFrom(ctx)
backup := &backupapi.Backup{}

if err := b.Client.Get(ctx, req.NamespacedName, backup); err != nil {
if apierrors.IsNotFound(err) {
log.Info("backup object not found", "backup", req)
b.logger.Info("backup object not found", "backup", req)
return ctrl.Result{}, nil
}

// Error reading the object - requeue the request.
return ctrl.Result{}, err
}

b.logger = ctrl.LoggerFrom(ctx).WithValues("backup", client.ObjectKeyFromObject(backup))

// Initialize patch helper
patchHelper, err := patch.NewHelper(backup, b.Client)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to init patch helper for backup %s", req.NamespacedName)
}
// Setup deferred function to handle patching the object at the end of the reconciler
defer func() {
patchOpts := []patch.Option{}
if err := patchHelper.Patch(ctx, backup, patchOpts...); err != nil {
if err := patchHelper.Patch(ctx, backup); err != nil {
reterr = utilerrors.NewAggregate([]error{reterr, errors.Wrapf(err, "failed to patch %s %s", backup.Name, req.NamespacedName)})
}
}()

// Check and add finalizer if not present
if !controllerutil.ContainsFinalizer(backup, BackupFinalizer) {
controllerutil.AddFinalizer(backup, BackupFinalizer)
return ctrl.Result{}, nil
}

// Handle deletion
Expand All @@ -89,17 +91,15 @@ func (b *BackupManager) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl

// reconcileBackup handles the main reconcile logic for a Backup object.
func (b *BackupManager) reconcileBackup(ctx context.Context, backup *backupapi.Backup) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

// Fetch destination clusters
destinationClusters, err := fetchDestinationClusters(ctx, b.Client, backup.Namespace, backup.Spec.Destination)
if err != nil {
log.Error(err, "failed to fetch destination clusters for backup", "backupName", backup.Name)
b.logger.Error(err, "failed to fetch destination clusters for backup", "backupName", backup.Name)
return ctrl.Result{}, err
}
// Apply velero backup resource in target clusters
result, err := b.reconcileBackupResources(ctx, backup, destinationClusters)
if err != nil || result.Requeue || result.RequeueAfter > 0 {
if err != nil {
return result, err
}
// Collect velero backup resource status to current backup
Expand All @@ -108,31 +108,44 @@ func (b *BackupManager) reconcileBackup(ctx context.Context, backup *backupapi.B

// reconcileBackupResources converts the backup resources into velero backup resources that can be used by Velero on the target clusters, and applies each of these backup resources to the respective target clusters.
func (b *BackupManager) reconcileBackupResources(ctx context.Context, backup *backupapi.Backup, destinationClusters map[ClusterKey]*fleetCluster) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

backupLabel := generateVeleroInstanceLabel(BackupNameLabel, backup.Name, backup.Spec.Destination.Fleet)

// Add tasks of syncVeleroObj func
var tasks []func() error
if isScheduleBackup(backup) {
// Handle scheduled backups
for clusterKey, clusterAccess := range destinationClusters {
veleroScheduleName := generateVeleroResourceName(clusterKey.Name, BackupKind, backup.Name)
veleroScheduleName := generateVeleroResourceName(clusterKey.Name, BackupKind, backup.Namespace, backup.Name)
veleroSchedule := buildVeleroScheduleInstance(&backup.Spec, backupLabel, veleroScheduleName)
if err := syncVeleroObj(ctx, clusterKey, clusterAccess, veleroSchedule); err != nil {
log.Error(err, "failed to create velero schedule instance for backup", "backupName", backup.Name)
return ctrl.Result{}, err
}
task := newSyncVeleroTaskFunc(ctx, clusterAccess, veleroSchedule)
tasks = append(tasks, task)
}
} else {
// Handle one time backups
for clusterKey, clusterAccess := range destinationClusters {
veleroBackupName := generateVeleroResourceName(clusterKey.Name, BackupKind, backup.Name)
veleroBackupName := generateVeleroResourceName(clusterKey.Name, BackupKind, backup.Namespace, backup.Name)
veleroBackup := buildVeleroBackupInstance(&backup.Spec, backupLabel, veleroBackupName)
if err := syncVeleroObj(ctx, clusterKey, clusterAccess, veleroBackup); err != nil {
log.Error(err, "failed to create velero backup instance for backup", "backupName", backup.Name)
return ctrl.Result{}, err
}
task := newSyncVeleroTaskFunc(ctx, clusterAccess, veleroBackup)
tasks = append(tasks, task)
}
}

// Parallel process syncVeleroObj func
errs := parallelProcess(tasks)
// Check for errors
var errorList []string
for _, err := range errs {
if err != nil {
b.logger.Error(err, "Error encountered during parallel processing", "backupName", backup.Name)
errorList = append(errorList, err.Error())
}
}

if len(errorList) > 0 {
// Return all errs
return ctrl.Result{}, fmt.Errorf("encountered %d errors during processing: %s", len(errorList), strings.Join(errorList, "; "))
}

return ctrl.Result{}, nil
}

Expand All @@ -154,17 +167,15 @@ func (b *BackupManager) reconcileBackupStatus(ctx context.Context, backup *backu
// reconcileOneTimeBackupStatus updates the status of a one-time Backup object by checking the status of corresponding Velero backup resources in each target cluster.
// It determines whether to requeue the reconciliation based on the completion status of all Velero backup resources.
func (b *BackupManager) reconcileOneTimeBackupStatus(ctx context.Context, backup *backupapi.Backup, destinationClusters map[ClusterKey]*fleetCluster, statusMap map[string]*backupapi.BackupDetails) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

// Loop through each target cluster to retrieve the status of Velero backup resources using the client associated with the respective target cluster.
for clusterKey, clusterAccess := range destinationClusters {
name := generateVeleroResourceName(clusterKey.Name, BackupKind, backup.Name)
name := generateVeleroResourceName(clusterKey.Name, BackupKind, backup.Namespace, backup.Name)
veleroBackup := &velerov1.Backup{}

// Use the client of the target cluster to get the status of Velero backup resources
err := getResourceFromClusterClient(ctx, name, VeleroNamespace, *clusterAccess, veleroBackup)
if err != nil {
log.Error(err, "failed to create velero backup instance for sync one time backup status", "backupName", backup.Name)
b.logger.Error(err, "failed to create velero backup instance for sync one time backup status", "backupName", backup.Name)
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -197,24 +208,22 @@ func (b *BackupManager) reconcileOneTimeBackupStatus(ctx context.Context, backup
// reconcileScheduleBackupStatus manages the status synchronization for scheduled Backup objects.
// If the backup type is "schedule", new backups will be continuously generated, hence the status synchronization will be executed continuously.
func (b *BackupManager) reconcileScheduleBackupStatus(ctx context.Context, schedule *backupapi.Backup, destinationClusters map[ClusterKey]*fleetCluster, statusMap map[string]*backupapi.BackupDetails) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

// Loop through each target cluster to retrieve the status of Velero backup created by schedule resources using the client associated with the respective target cluster.
for clusterKey, clusterAccess := range destinationClusters {
name := generateVeleroResourceName(clusterKey.Name, BackupKind, schedule.Name)
name := generateVeleroResourceName(clusterKey.Name, BackupKind, schedule.Namespace, schedule.Name)
veleroSchedule := &velerov1.Schedule{}
// Use the client of the target cluster to get the status of Velero backup resources
err := getResourceFromClusterClient(ctx, name, VeleroNamespace, *clusterAccess, veleroSchedule)
if err != nil {
log.Error(err, "Unable to get velero schedule", "scheduleName", name)
b.logger.Error(err, "Unable to get velero schedule", "scheduleName", name)
return ctrl.Result{}, err
}

// Fetch all velero backups created by velero schedule
backupList := &velerov1.BackupList{}
listErr := listResourcesFromClusterClient(ctx, VeleroNamespace, velerov1.ScheduleNameLabel, veleroSchedule.Name, *clusterAccess, backupList)
if listErr != nil {
log.Info("Unable to list velero backups for velero schedule", "scheduleName", veleroSchedule.Name)
b.logger.Info("Unable to list velero backups for velero schedule", "scheduleName", veleroSchedule.Name)
return ctrl.Result{}, listErr
}

Expand All @@ -224,7 +233,7 @@ func (b *BackupManager) reconcileScheduleBackupStatus(ctx context.Context, sched
// If a schedule backup record cannot be found, the potential reasons are:
// 1. The backup task hasn't been triggered by schedule.
// 2. An issue occurred, but we can not get information directly from the status of schedules.velero.io
log.Info("No completed backups found for schedule", "scheduleName", veleroSchedule.Name)
b.logger.Info("No completed backups found for schedule", "scheduleName", veleroSchedule.Name)
}

// Sync schedule backup status with most recent complete backup
Expand All @@ -248,7 +257,7 @@ func (b *BackupManager) reconcileScheduleBackupStatus(ctx context.Context, sched
// Get the next reconcile interval
cronInterval, err := GetCronInterval(schedule.Spec.Schedule)
if err != nil {
log.Error(err, "failed to get cron Interval of backup.spec.schedule", "backupName", schedule.Name, "cronExpression", schedule.Spec.Schedule)
b.logger.Error(err, "failed to get cron Interval of backup.spec.schedule", "backupName", schedule.Name, "cronExpression", schedule.Spec.Schedule)
return ctrl.Result{}, err
}
// If all backups are complete,requeue the reconciliation after a long cronInterval.
Expand All @@ -260,14 +269,12 @@ func (b *BackupManager) reconcileScheduleBackupStatus(ctx context.Context, sched

// reconcileDeleteBackup handles the deletion process of a Backup object.
func (b *BackupManager) reconcileDeleteBackup(ctx context.Context, backup *backupapi.Backup) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

// Fetch backup destination clusters
destinationClusters, err := fetchDestinationClusters(ctx, b.Client, backup.Namespace, backup.Spec.Destination)
if err != nil {
log.Error(err, "failed to fetch destination clusters when delete backup", "backupName", backup.Name)
b.logger.Error(err, "failed to fetch destination clusters when delete backup", "backupName", backup.Name)
controllerutil.RemoveFinalizer(backup, BackupFinalizer)
log.Info("Removed finalizer due to fetch destination clusters error", "backupName", backup.Name)
b.logger.Info("Removed finalizer due to fetch destination clusters error", "backupName", backup.Name)
return ctrl.Result{}, err
}

Expand All @@ -280,7 +287,7 @@ func (b *BackupManager) reconcileDeleteBackup(ctx context.Context, backup *backu

// Delete all related velero schedule or backup instance
if err := deleteResourcesInClusters(ctx, VeleroNamespace, BackupNameLabel, backup.Name, destinationClusters, objList); err != nil {
log.Error(err, "failed to delete velero schedule or backup Instances when delete backup", "backupName", backup.Name)
b.logger.Error(err, "failed to delete velero schedule or backup Instances when delete backup", "backupName", backup.Name)
return ctrl.Result{}, err
}

Expand Down
36 changes: 33 additions & 3 deletions pkg/fleet-manager/backup_restore_migrate_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"reflect"
"sort"
"sync"
"time"

"github.com/robfig/cron/v3"
Expand Down Expand Up @@ -141,7 +142,13 @@ func buildVeleroBackupSpec(backupPolicy *backupapi.BackupPolicy) velerov1.Backup
}
}

func syncVeleroObj(ctx context.Context, clusterKey ClusterKey, cluster *fleetCluster, veleroObj client.Object) error {
func newSyncVeleroTaskFunc(ctx context.Context, clusterAccess *fleetCluster, obj client.Object) func() error {
return func() error {
return syncVeleroObj(ctx, clusterAccess, obj)
}
}

func syncVeleroObj(ctx context.Context, cluster *fleetCluster, veleroObj client.Object) error {
// Get the client
clusterClient := cluster.client.CtrlRuntimeClient()

Expand Down Expand Up @@ -231,8 +238,9 @@ func generateVeleroResourceObjectMeta(veleroResourceName string, labels map[stri
}
}

func generateVeleroResourceName(clusterName, creatorKind, creatorName string) string {
return clusterName + "-" + creatorKind + "-" + creatorName
// generateVeleroResourceName generate a name uniquely across object store
func generateVeleroResourceName(clusterName, creatorKind, creatorNamespace, creatorName string) string {
return clusterName + "-" + creatorKind + "-" + creatorNamespace + "-" + creatorName
}

// MostRecentCompletedBackup returns the most recent backup that's completed from a list of backups.
Expand Down Expand Up @@ -304,3 +312,25 @@ func listResourcesFromClusterClient(ctx context.Context, namespace string, label
}
return clusterClient.List(ctx, objList, opts)
}

// parallelProcess runs the provided tasks concurrently and collects any errors.
func parallelProcess(tasks []func() error) []error {
var errs []error
var errMutex sync.Mutex
var wg sync.WaitGroup

for _, task := range tasks {
wg.Add(1)
go func(task func() error) {
defer wg.Done()
if err := task(); err != nil {
errMutex.Lock()
errs = append(errs, err)
errMutex.Unlock()
}
}(task)
}

wg.Wait()
return errs
}

0 comments on commit 79cdd34

Please sign in to comment.