Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backup: fix remote file not clean when job is failed (#2669) #2674

Merged
merged 2 commits into from
Jun 10, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions cmd/backup-manager/app/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ type Options struct {
backupUtil.GenericOptions
}

func (bo *Options) backupData(backup *v1alpha1.Backup) (string, error) {
func (bo *Options) backupData(backup *v1alpha1.Backup) error {
clusterNamespace := backup.Spec.BR.ClusterNamespace
if backup.Spec.BR.ClusterNamespace == "" {
clusterNamespace = backup.Namespace
}
args, remotePath, err := constructOptions(backup)
args, err := constructOptions(backup)
if err != nil {
return "", err
return err
}
args = append(args, fmt.Sprintf("--pd=%s-pd.%s:2379", backup.Spec.BR.Cluster, clusterNamespace))
if bo.TLSCluster {
Expand All @@ -71,15 +71,15 @@ func (bo *Options) backupData(backup *v1alpha1.Backup) (string, error) {

stdOut, err := cmd.StdoutPipe()
if err != nil {
return remotePath, fmt.Errorf("cluster %s, create stdout pipe failed, err: %v", bo, err)
return fmt.Errorf("cluster %s, create stdout pipe failed, err: %v", bo, err)
}
stdErr, err := cmd.StderrPipe()
if err != nil {
return remotePath, fmt.Errorf("cluster %s, create stderr pipe failed, err: %v", bo, err)
return fmt.Errorf("cluster %s, create stderr pipe failed, err: %v", bo, err)
}
err = cmd.Start()
if err != nil {
return remotePath, fmt.Errorf("cluster %s, execute br command failed, args: %s, err: %v", bo, fullArgs, err)
return fmt.Errorf("cluster %s, execute br command failed, args: %s, err: %v", bo, fullArgs, err)
}
var errMsg string
reader := bufio.NewReader(stdOut)
Expand All @@ -101,11 +101,11 @@ func (bo *Options) backupData(backup *v1alpha1.Backup) (string, error) {
}
err = cmd.Wait()
if err != nil {
return remotePath, fmt.Errorf("cluster %s, wait pipe message failed, errMsg %s, err: %v", bo, errMsg, err)
return fmt.Errorf("cluster %s, wait pipe message failed, errMsg %s, err: %v", bo, errMsg, err)
}

klog.Infof("Backup data for cluster %s successfully", bo)
return remotePath, nil
return nil
}

// getCommitTs get backup position from `EndVersion` in BR backup meta
Expand Down Expand Up @@ -138,10 +138,10 @@ func getCommitTs(backup *v1alpha1.Backup) (uint64, error) {
}

// constructOptions constructs options for BR
func constructOptions(backup *v1alpha1.Backup) ([]string, string, error) {
args, remotePath, err := backupUtil.ConstructBRGlobalOptionsForBackup(backup)
func constructOptions(backup *v1alpha1.Backup) ([]string, error) {
args, err := backupUtil.ConstructBRGlobalOptionsForBackup(backup)
if err != nil {
return args, remotePath, err
return args, err
}
config := backup.Spec.BR
if config.Concurrency != nil {
Expand All @@ -156,7 +156,7 @@ func constructOptions(backup *v1alpha1.Backup) ([]string, string, error) {
if config.Checksum != nil {
args = append(args, fmt.Sprintf("--checksum=%t", *config.Checksum))
}
return args, remotePath, nil
return args, nil
}

// getBackupSize get the backup data size from remote
Expand Down
35 changes: 32 additions & 3 deletions cmd/backup-manager/app/backup/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (bm *Manager) ProcessBackup() error {
func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {
started := time.Now()

var errs []error
err := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupRunning,
Status: corev1.ConditionTrue,
Expand All @@ -135,7 +136,36 @@ func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {
return err
}

var errs []error
backupFullPath, err := util.GetRemotePath(backup)
if err != nil {
errs = append(errs, err)
uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "GetBackupRemotePathFailed",
Message: err.Error(),
})
errs = append(errs, uerr)
return errorutils.NewAggregate(errs)
}

backup.Status.BackupPath = backupFullPath
err = bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupPrepare,
Status: corev1.ConditionTrue,
})
if err != nil {
errs = append(errs, err)
uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "UpdatePrepareBackupFailed",
Message: err.Error(),
})
errs = append(errs, uerr)
return errorutils.NewAggregate(errs)
}

oldTikvGCTime, err := bm.GetTikvGCLifeTime(db)
if err != nil {
errs = append(errs, err)
Expand Down Expand Up @@ -216,7 +246,7 @@ func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {
klog.Infof("set cluster %s %s to %s success", bm, constants.TikvGCVariable, tikvGCLifeTime)
}

backupFullPath, backupErr := bm.backupData(backup)
backupErr := bm.backupData(backup)
if oldTikvGCTimeDuration < tikvGCTimeDuration {
err = bm.SetTikvGCLifeTime(db, oldTikvGCTime)
if err != nil {
Expand Down Expand Up @@ -282,7 +312,6 @@ func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {

finish := time.Now()

backup.Status.BackupPath = backupFullPath
backup.Status.TimeStarted = metav1.Time{Time: started}
backup.Status.TimeCompleted = metav1.Time{Time: finish}
backup.Status.BackupSize = size
Expand Down
20 changes: 10 additions & 10 deletions cmd/backup-manager/app/util/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,24 @@ func NewRemoteStorage(backup *v1alpha1.Backup) (*blob.Bucket, error) {
}

// getRemoteStorage returns the args for the --storage option for br
func getRemoteStorage(provider v1alpha1.StorageProvider) ([]string, string, error) {
func getRemoteStorage(provider v1alpha1.StorageProvider) ([]string, error) {
st := util.GetStorageType(provider)
switch st {
case v1alpha1.BackupStorageTypeS3:
qs := checkS3Config(provider.S3, false)
s, path := newS3StorageOption(qs)
return s, path, nil
s := newS3StorageOption(qs)
return s, nil
case v1alpha1.BackupStorageTypeGcs:
qs := checkGcsConfig(provider.Gcs, false)
s, path := newGcsStorageOption(qs)
return s, path, nil
s := newGcsStorageOption(qs)
return s, nil
default:
return nil, "", fmt.Errorf("storage %s not support yet", st)
return nil, fmt.Errorf("storage %s not support yet", st)
}
}

// newS3StorageOption constructs the args for the --storage option for br
func newS3StorageOption(qs *s3Query) ([]string, string) {
func newS3StorageOption(qs *s3Query) []string {
var s3options []string
var path string
if qs.prefix == "/" {
Expand All @@ -125,7 +125,7 @@ func newS3StorageOption(qs *s3Query) ([]string, string) {
if qs.storageClass != "" {
s3options = append(s3options, fmt.Sprintf("--s3.storage-class=%s", qs.storageClass))
}
return s3options, path
return s3options
}

// newS3Storage initialize a new s3 storage
Expand Down Expand Up @@ -183,7 +183,7 @@ func newGcsStorage(qs *gcsQuery) (*blob.Bucket, error) {
}

// newGcsStorageOption constructs the args for the --storage option for br
func newGcsStorageOption(qs *gcsQuery) ([]string, string) {
func newGcsStorageOption(qs *gcsQuery) []string {
var gcsoptions []string
var path string
if qs.prefix == "/" {
Expand All @@ -198,7 +198,7 @@ func newGcsStorageOption(qs *gcsQuery) ([]string, string) {
if qs.objectAcl != "" {
gcsoptions = append(gcsoptions, fmt.Sprintf("--gcs.predefined-acl=%s", qs.objectAcl))
}
return gcsoptions, path
return gcsoptions
}

// checkS3Config constructs s3Query parameters
Expand Down
43 changes: 37 additions & 6 deletions cmd/backup-manager/app/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,37 @@ func EnsureDirectoryExist(dirName string) error {
return nil
}

// GetRemotePath returns the remote storage URL that the backup data is
// written to, built from the bucket and prefix of the backup's storage
// provider.
//
// The result always ends with a single "/": "s3://bucket/" when the prefix is
// empty (or consists only of slashes) and "s3://bucket/prefix/" otherwise;
// GCS paths use the "gcs://" scheme in the same way. An error is returned for
// any storage type other than S3 or GCS.
func GetRemotePath(backup *v1alpha1.Backup) (string, error) {
	st := util.GetStorageType(backup.Spec.StorageProvider)
	switch st {
	case v1alpha1.BackupStorageTypeS3:
		// NOTE(review): assumes GetStorageType only reports S3 when the S3
		// provider config is set — confirm against its implementation.
		s3 := backup.Spec.StorageProvider.S3
		return buildRemotePath("s3", s3.Bucket, s3.Prefix), nil
	case v1alpha1.BackupStorageTypeGcs:
		// NOTE(review): same assumption as above for the Gcs provider config.
		gcs := backup.Spec.StorageProvider.Gcs
		return buildRemotePath("gcs", gcs.Bucket, gcs.Prefix), nil
	default:
		return "", fmt.Errorf("storage %s not support yet", st)
	}
}

// buildRemotePath joins scheme, bucket and prefix into "<scheme>://<bucket>/"
// or "<scheme>://<bucket>/<prefix>/". Leading and trailing slashes of prefix
// are stripped first so the result never contains doubled separators at the
// joins and always ends with exactly one "/".
func buildRemotePath(scheme, bucket, prefix string) string {
	prefix = strings.Trim(prefix, "/")
	if prefix == "" {
		return fmt.Sprintf("%s://%s/", scheme, bucket)
	}
	return fmt.Sprintf("%s://%s/%s/", scheme, bucket, prefix)
}

// OpenDB opens db
func OpenDB(dsn string) (*sql.DB, error) {
db, err := sql.Open("mysql", dsn)
Expand Down Expand Up @@ -127,16 +158,16 @@ func GetOptionValueFromEnv(option, envPrefix string) string {
}

// ConstructBRGlobalOptionsForBackup constructs BR global options for backup.
func ConstructBRGlobalOptionsForBackup(backup *v1alpha1.Backup) ([]string, string, error) {
func ConstructBRGlobalOptionsForBackup(backup *v1alpha1.Backup) ([]string, error) {
var args []string
config := backup.Spec.BR
if config == nil {
return nil, "", fmt.Errorf("no config for br in backup %s/%s", backup.Namespace, backup.Name)
return nil, fmt.Errorf("no config for br in backup %s/%s", backup.Namespace, backup.Name)
}
args = append(args, constructBRGlobalOptions(config)...)
storageArgs, remotePath, err := getRemoteStorage(backup.Spec.StorageProvider)
storageArgs, err := getRemoteStorage(backup.Spec.StorageProvider)
if err != nil {
return nil, "", err
return nil, err
}
args = append(args, storageArgs...)
if (backup.Spec.Type == v1alpha1.BackupTypeDB || backup.Spec.Type == v1alpha1.BackupTypeTable) && config.DB != "" {
Expand All @@ -145,7 +176,7 @@ func ConstructBRGlobalOptionsForBackup(backup *v1alpha1.Backup) ([]string, strin
if backup.Spec.Type == v1alpha1.BackupTypeTable && config.Table != "" {
args = append(args, fmt.Sprintf("--table=%s", config.Table))
}
return args, remotePath, nil
return args, nil
}

// ConstructMydumperOptionsForBackup constructs mydumper options for backup
Expand Down Expand Up @@ -180,7 +211,7 @@ func ConstructBRGlobalOptionsForRestore(restore *v1alpha1.Restore) ([]string, er
return nil, fmt.Errorf("no config for br in restore %s/%s", restore.Namespace, restore.Name)
}
args = append(args, constructBRGlobalOptions(config)...)
storageArgs, _, err := getRemoteStorage(restore.Spec.StorageProvider)
storageArgs, err := getRemoteStorage(restore.Spec.StorageProvider)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/pingcap/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,8 @@ const (
BackupRetryFailed BackupConditionType = "RetryFailed"
// BackupInvalid means invalid backup CR
BackupInvalid BackupConditionType = "Invalid"
// BackupPrepare means the backup is in the prepare phase of the backup process
BackupPrepare BackupConditionType = "Prepare"
)

// BackupCondition describes the observed state of a Backup at a certain point.
Expand Down