Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support backup to S3 with br #1280

Merged
merged 15 commits into from
Dec 30, 2019
2 changes: 0 additions & 2 deletions pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
type BackupManager interface {
// Sync implements the logic for syncing Backup.
Sync(backup *v1alpha1.Backup) error
// UpdateCondition updates condition for backup CR status
UpdateCondition(backup *v1alpha1.Backup, condition *v1alpha1.BackupCondition) error
}

// RestoreManager implements the logic for manage restore.
Expand Down
26 changes: 16 additions & 10 deletions pkg/backup/backup/backup_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
batchlisters "k8s.io/client-go/listers/batch/v1"
corelisters "k8s.io/client-go/listers/core/v1"
glog "k8s.io/klog"
)

type backupManager struct {
Expand Down Expand Up @@ -62,11 +63,6 @@ func NewBackupManager(
}
}

// UpdateCondition updates condition for backup CR status
func (bm *backupManager) UpdateCondition(backup *v1alpha1.Backup, condition *v1alpha1.BackupCondition) error {
return bm.statusUpdater.Update(backup, condition)
}

func (bm *backupManager) Sync(backup *v1alpha1.Backup) error {
if err := bm.backupCleaner.Clean(backup); err != nil {
return err
Expand All @@ -85,7 +81,21 @@ func (bm *backupManager) syncBackupJob(backup *v1alpha1.Backup) error {
name := backup.GetName()
backupJobName := backup.GetBackupJobName()

_, err := bm.jobLister.Jobs(ns).Get(backupJobName)
err := backuputil.ValidateBackup(backup)
if err != nil {
uErr := bm.statusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupInvalid,
Status: corev1.ConditionTrue,
Reason: "InvalidSpec",
Message: err.Error(),
})
if uErr != nil {
glog.Warningf("Update condition %s failed for backup %s/%s", v1alpha1.BackupInvalid, ns, name)
DanielZhangQD marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
DanielZhangQD marked this conversation as resolved.
Show resolved Hide resolved
}

_, err = bm.jobLister.Jobs(ns).Get(backupJobName)
if err == nil {
// already have a backup job running,return directly
return nil
Expand Down Expand Up @@ -368,8 +378,4 @@ func (fbm *FakeBackupManager) Sync(_ *v1alpha1.Backup) error {
return fbm.err
}

func (fbm *FakeBackupManager) UpdateCondition(_ *v1alpha1.Backup, _ *v1alpha1.BackupCondition) error {
return fbm.err
}

var _ backup.BackupManager = &FakeBackupManager{}
48 changes: 48 additions & 0 deletions pkg/backup/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package util
import (
"errors"
"fmt"
"net/url"
"strings"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
Expand Down Expand Up @@ -282,3 +283,50 @@ func GetBackupDataPath(provider v1alpha1.StorageProvider) (string, string, error
}
return fmt.Sprintf("%s://%s", string(storageType), backupPath), "", nil
}

func ValidateBackup(backup *v1alpha1.Backup) error {
ns := backup.Namespace
name := backup.Name
if backup.Spec.BR == nil {
if backup.Spec.From.Host == "" {
return fmt.Errorf("missing cluster config in spec of %s/%s", ns, name)
}
if backup.Spec.From.SecretName == "" {
return fmt.Errorf("missing tidbSecretName config in spec of %s/%s", ns, name)
}
if backup.Spec.StorageClassName == "" {
return fmt.Errorf("missing storageClassName config in spec of %s/%s", ns, name)
}
if backup.Spec.StorageSize == "" {
return fmt.Errorf("missing StorageSize config in spec of %s/%s", ns, name)
}
} else {
if backup.Spec.BR.PDAddress == "" {
return fmt.Errorf("pd address should be configured for BR in spec of %s/%s", ns, name)
}
if backup.Spec.Type != "" &&
backup.Spec.Type != v1alpha1.BackupTypeFull &&
backup.Spec.Type != v1alpha1.BackupTypeDB &&
backup.Spec.Type != v1alpha1.BackupTypeTable {
return fmt.Errorf("invalid backup type %s for BR in spec of %s/%s", backup.Spec.Type, ns, name)
}
if backup.Spec.S3 != nil {
if backup.Spec.S3.Bucket == "" {
return fmt.Errorf("bucket should be configured for BR in spec of %s/%s", ns, name)
}
if backup.Spec.S3.Endpoint != "" {
u, err := url.Parse(backup.Spec.S3.Endpoint)
if err != nil {
return fmt.Errorf("invalid endpoint %s is configured for BR in spec of %s/%s", backup.Spec.S3.Endpoint, ns, name)
}
if u.Scheme == "" {
return fmt.Errorf("scheme not found in endpoint %s configured for BR in spec of %s/%s", backup.Spec.S3.Endpoint, ns, name)
}
if u.Host == "" {
return fmt.Errorf("host not found in endpoint %s configured for BR in spec of %s/%s", backup.Spec.S3.Endpoint, ns, name)
}
}
}
}
return nil
}
29 changes: 2 additions & 27 deletions pkg/controller/backup/backup_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import (
type ControlInterface interface {
// UpdateBackup implements the control logic for backup job and backup clean job's creation, deletion
UpdateBackup(backup *v1alpha1.Backup) error
// UpdateCondition updates status for backup CR
UpdateCondition(backup *v1alpha1.Backup, condition *v1alpha1.BackupCondition) error
}

// NewDefaultBackupControl returns a new instance of the default implementation BackupControlInterface that
Expand Down Expand Up @@ -70,11 +68,6 @@ func (bc *defaultBackupControl) updateBackup(backup *v1alpha1.Backup) error {
return bc.backupManager.Sync(backup)
}

// UpdateCondition updates status for backup CR
func (bc *defaultBackupControl) UpdateCondition(backup *v1alpha1.Backup, condition *v1alpha1.BackupCondition) error {
return bc.backupManager.UpdateCondition(backup, condition)
}

func (bc *defaultBackupControl) addProtectionFinalizer(backup *v1alpha1.Backup) error {
ns := backup.GetNamespace()
name := backup.GetName()
Expand Down Expand Up @@ -116,17 +109,15 @@ var _ ControlInterface = &defaultBackupControl{}

// FakeBackupControl is a fake BackupControlInterface
type FakeBackupControl struct {
backupIndexer cache.Indexer
updateBackupTracker controller.RequestTracker
updateConditionTracker controller.RequestTracker
backupIndexer cache.Indexer
updateBackupTracker controller.RequestTracker
}

// NewFakeBackupControl returns a FakeBackupControl
func NewFakeBackupControl(backupInformer informers.BackupInformer) *FakeBackupControl {
return &FakeBackupControl{
backupInformer.Informer().GetIndexer(),
controller.RequestTracker{},
controller.RequestTracker{},
}
}

Expand All @@ -146,20 +137,4 @@ func (fbc *FakeBackupControl) UpdateBackup(backup *v1alpha1.Backup) error {
return fbc.backupIndexer.Add(backup)
}

// SetUpdateConditionError sets the error attributes of updateConditionTracker
func (fbc *FakeBackupControl) SetUpdateConditionError(err error, after int) {
fbc.updateConditionTracker.SetError(err).SetAfter(after)
}

// UpdateCondition adds the backup to BackupIndexer
func (fbc *FakeBackupControl) UpdateCondition(backup *v1alpha1.Backup, c *v1alpha1.BackupCondition) error {
defer fbc.updateConditionTracker.Inc()
if fbc.updateConditionTracker.ErrorReady() {
defer fbc.updateConditionTracker.Reset()
return fbc.updateConditionTracker.GetError()
}

return fbc.backupIndexer.Add(backup)
}

var _ ControlInterface = &FakeBackupControl{}
61 changes: 0 additions & 61 deletions pkg/controller/backup/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package backup

import (
"fmt"
"net/url"
"time"

perrors "github.com/pingcap/errors"
Expand Down Expand Up @@ -174,19 +173,6 @@ func (bkc *Controller) sync(key string) error {
return err
}

err = validateBackup(backup)
if err != nil {
uErr := bkc.control.UpdateCondition(backup.DeepCopy(), &v1alpha1.BackupCondition{
Type: v1alpha1.BackupInvalid,
Status: corev1.ConditionTrue,
Reason: "InvalidSpec",
Message: err.Error(),
})
if uErr != nil {
glog.Warningf("Update condition %s failed for backup %s/%s", v1alpha1.BackupInvalid, ns, name)
}
return nil
}
return bkc.syncBackup(backup.DeepCopy())
}

Expand Down Expand Up @@ -234,50 +220,3 @@ func (bkc *Controller) enqueueBackup(obj interface{}) {
}
bkc.queue.Add(key)
}

func validateBackup(backup *v1alpha1.Backup) error {
ns := backup.Namespace
name := backup.Name
if backup.Spec.BR == nil {
if backup.Spec.From.Host == "" {
return fmt.Errorf("missing cluster config in spec of %s/%s", ns, name)
}
if backup.Spec.From.SecretName == "" {
return fmt.Errorf("missing tidbSecretName config in spec of %s/%s", ns, name)
}
if backup.Spec.StorageClassName == "" {
return fmt.Errorf("missing storageClassName config in spec of %s/%s", ns, name)
}
if backup.Spec.StorageSize == "" {
return fmt.Errorf("missing StorageSize config in spec of %s/%s", ns, name)
}
} else {
if backup.Spec.BR.PDAddress == "" {
return fmt.Errorf("pd address should be configured for BR in spec of %s/%s", ns, name)
}
if backup.Spec.Type != "" &&
backup.Spec.Type != v1alpha1.BackupTypeFull &&
backup.Spec.Type != v1alpha1.BackupTypeDB &&
backup.Spec.Type != v1alpha1.BackupTypeTable {
return fmt.Errorf("invalid backup type %s for BR in spec of %s/%s", backup.Spec.Type, ns, name)
}
if backup.Spec.S3 != nil {
if backup.Spec.S3.Bucket == "" {
return fmt.Errorf("bucket should be configured for BR in spec of %s/%s", ns, name)
}
if backup.Spec.S3.Endpoint != "" {
u, err := url.Parse(backup.Spec.S3.Endpoint)
if err != nil {
return fmt.Errorf("invalid endpoint %s is configured for BR in spec of %s/%s", backup.Spec.S3.Endpoint, ns, name)
}
if u.Scheme == "" {
return fmt.Errorf("scheme not found in endpoint %s configured for BR in spec of %s/%s", backup.Spec.S3.Endpoint, ns, name)
}
if u.Host == "" {
return fmt.Errorf("host not found in endpoint %s configured for BR in spec of %s/%s", backup.Spec.S3.Endpoint, ns, name)
}
}
}
}
return nil
}