Skip to content

Commit

Permalink
Backup: fix remote file not clean when job is failed (#2669) (#2674)
Browse files Browse the repository at this point in the history
Signed-off-by: sre-bot <sre-bot@pingcap.com>

Co-authored-by: 尹亮 <30903849+shuijing198799@users.noreply.github.com>
  • Loading branch information
sre-bot and shuijing198799 authored Jun 10, 2020
1 parent b88761b commit d1b5c9b
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 31 deletions.
24 changes: 12 additions & 12 deletions cmd/backup-manager/app/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ type Options struct {
backupUtil.GenericOptions
}

func (bo *Options) backupData(backup *v1alpha1.Backup) (string, error) {
func (bo *Options) backupData(backup *v1alpha1.Backup) error {
clusterNamespace := backup.Spec.BR.ClusterNamespace
if backup.Spec.BR.ClusterNamespace == "" {
clusterNamespace = backup.Namespace
}
args, remotePath, err := constructOptions(backup)
args, err := constructOptions(backup)
if err != nil {
return "", err
return err
}
args = append(args, fmt.Sprintf("--pd=%s-pd.%s:2379", backup.Spec.BR.Cluster, clusterNamespace))
if bo.TLSCluster {
Expand All @@ -71,15 +71,15 @@ func (bo *Options) backupData(backup *v1alpha1.Backup) (string, error) {

stdOut, err := cmd.StdoutPipe()
if err != nil {
return remotePath, fmt.Errorf("cluster %s, create stdout pipe failed, err: %v", bo, err)
return fmt.Errorf("cluster %s, create stdout pipe failed, err: %v", bo, err)
}
stdErr, err := cmd.StderrPipe()
if err != nil {
return remotePath, fmt.Errorf("cluster %s, create stderr pipe failed, err: %v", bo, err)
return fmt.Errorf("cluster %s, create stderr pipe failed, err: %v", bo, err)
}
err = cmd.Start()
if err != nil {
return remotePath, fmt.Errorf("cluster %s, execute br command failed, args: %s, err: %v", bo, fullArgs, err)
return fmt.Errorf("cluster %s, execute br command failed, args: %s, err: %v", bo, fullArgs, err)
}
var errMsg string
reader := bufio.NewReader(stdOut)
Expand All @@ -101,11 +101,11 @@ func (bo *Options) backupData(backup *v1alpha1.Backup) (string, error) {
}
err = cmd.Wait()
if err != nil {
return remotePath, fmt.Errorf("cluster %s, wait pipe message failed, errMsg %s, err: %v", bo, errMsg, err)
return fmt.Errorf("cluster %s, wait pipe message failed, errMsg %s, err: %v", bo, errMsg, err)
}

klog.Infof("Backup data for cluster %s successfully", bo)
return remotePath, nil
return nil
}

// getCommitTs gets the backup position from `EndVersion` in the BR backup meta
Expand Down Expand Up @@ -138,10 +138,10 @@ func getCommitTs(backup *v1alpha1.Backup) (uint64, error) {
}

// constructOptions constructs the command-line options for BR
func constructOptions(backup *v1alpha1.Backup) ([]string, string, error) {
args, remotePath, err := backupUtil.ConstructBRGlobalOptionsForBackup(backup)
func constructOptions(backup *v1alpha1.Backup) ([]string, error) {
args, err := backupUtil.ConstructBRGlobalOptionsForBackup(backup)
if err != nil {
return args, remotePath, err
return args, err
}
config := backup.Spec.BR
if config.Concurrency != nil {
Expand All @@ -156,7 +156,7 @@ func constructOptions(backup *v1alpha1.Backup) ([]string, string, error) {
if config.Checksum != nil {
args = append(args, fmt.Sprintf("--checksum=%t", *config.Checksum))
}
return args, remotePath, nil
return args, nil
}

// getBackupSize gets the backup data size from remote
Expand Down
35 changes: 32 additions & 3 deletions cmd/backup-manager/app/backup/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (bm *Manager) ProcessBackup() error {
func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {
started := time.Now()

var errs []error
err := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupRunning,
Status: corev1.ConditionTrue,
Expand All @@ -135,7 +136,36 @@ func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {
return err
}

var errs []error
backupFullPath, err := util.GetRemotePath(backup)
if err != nil {
errs = append(errs, err)
uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "GetBackupRemotePathFailed",
Message: err.Error(),
})
errs = append(errs, uerr)
return errorutils.NewAggregate(errs)
}

backup.Status.BackupPath = backupFullPath
err = bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupPrepare,
Status: corev1.ConditionTrue,
})
if err != nil {
errs = append(errs, err)
uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "UpdatePrepareBackupFailed",
Message: err.Error(),
})
errs = append(errs, uerr)
return errorutils.NewAggregate(errs)
}

oldTikvGCTime, err := bm.GetTikvGCLifeTime(db)
if err != nil {
errs = append(errs, err)
Expand Down Expand Up @@ -216,7 +246,7 @@ func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {
klog.Infof("set cluster %s %s to %s success", bm, constants.TikvGCVariable, tikvGCLifeTime)
}

backupFullPath, backupErr := bm.backupData(backup)
backupErr := bm.backupData(backup)
if oldTikvGCTimeDuration < tikvGCTimeDuration {
err = bm.SetTikvGCLifeTime(db, oldTikvGCTime)
if err != nil {
Expand Down Expand Up @@ -282,7 +312,6 @@ func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {

finish := time.Now()

backup.Status.BackupPath = backupFullPath
backup.Status.TimeStarted = metav1.Time{Time: started}
backup.Status.TimeCompleted = metav1.Time{Time: finish}
backup.Status.BackupSize = size
Expand Down
20 changes: 10 additions & 10 deletions cmd/backup-manager/app/util/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,24 @@ func NewRemoteStorage(backup *v1alpha1.Backup) (*blob.Bucket, error) {
}

// getRemoteStorage returns the args for the --storage option for br
func getRemoteStorage(provider v1alpha1.StorageProvider) ([]string, string, error) {
func getRemoteStorage(provider v1alpha1.StorageProvider) ([]string, error) {
st := util.GetStorageType(provider)
switch st {
case v1alpha1.BackupStorageTypeS3:
qs := checkS3Config(provider.S3, false)
s, path := newS3StorageOption(qs)
return s, path, nil
s := newS3StorageOption(qs)
return s, nil
case v1alpha1.BackupStorageTypeGcs:
qs := checkGcsConfig(provider.Gcs, false)
s, path := newGcsStorageOption(qs)
return s, path, nil
s := newGcsStorageOption(qs)
return s, nil
default:
return nil, "", fmt.Errorf("storage %s not support yet", st)
return nil, fmt.Errorf("storage %s not support yet", st)
}
}

// newS3StorageOption constructs the args for the --storage option for br
func newS3StorageOption(qs *s3Query) ([]string, string) {
func newS3StorageOption(qs *s3Query) []string {
var s3options []string
var path string
if qs.prefix == "/" {
Expand All @@ -125,7 +125,7 @@ func newS3StorageOption(qs *s3Query) ([]string, string) {
if qs.storageClass != "" {
s3options = append(s3options, fmt.Sprintf("--s3.storage-class=%s", qs.storageClass))
}
return s3options, path
return s3options
}

// newS3Storage initialize a new s3 storage
Expand Down Expand Up @@ -183,7 +183,7 @@ func newGcsStorage(qs *gcsQuery) (*blob.Bucket, error) {
}

// newGcsStorageOption constructs the args for the --storage option for br
func newGcsStorageOption(qs *gcsQuery) ([]string, string) {
func newGcsStorageOption(qs *gcsQuery) []string {
var gcsoptions []string
var path string
if qs.prefix == "/" {
Expand All @@ -198,7 +198,7 @@ func newGcsStorageOption(qs *gcsQuery) ([]string, string) {
if qs.objectAcl != "" {
gcsoptions = append(gcsoptions, fmt.Sprintf("--gcs.predefined-acl=%s", qs.objectAcl))
}
return gcsoptions, path
return gcsoptions
}

// checkS3Config constructs s3Query parameters
Expand Down
43 changes: 37 additions & 6 deletions cmd/backup-manager/app/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,37 @@ func EnsureDirectoryExist(dirName string) error {
return nil
}

// GetRemotePath builds the remote storage URL of the backup from the bucket
// and prefix configured in its storage provider.
//
// Leading and trailing slashes are stripped from the prefix and the result
// always ends with a single "/": "s3://<bucket>/<prefix>/" for S3 and
// "gcs://<bucket>/<prefix>/" for GCS, collapsing to "<scheme>://<bucket>/"
// when nothing is left of the prefix after trimming. Any other storage type
// yields an error.
func GetRemotePath(backup *v1alpha1.Backup) (string, error) {
	provider := backup.Spec.StorageProvider

	var scheme, bucketName, rawPrefix string
	switch storageType := util.GetStorageType(provider); storageType {
	case v1alpha1.BackupStorageTypeS3:
		scheme = "s3"
		rawPrefix = provider.S3.Prefix
		bucketName = provider.S3.Bucket
	case v1alpha1.BackupStorageTypeGcs:
		scheme = "gcs"
		rawPrefix = provider.Gcs.Prefix
		bucketName = provider.Gcs.Bucket
	default:
		return "", fmt.Errorf("storage %s not support yet", storageType)
	}

	// The bucket root always carries a trailing slash; a non-empty prefix is
	// appended as one more slash-terminated segment.
	location := scheme + "://" + bucketName + "/"
	if trimmed := strings.Trim(rawPrefix, "/"); trimmed != "" {
		location += trimmed + "/"
	}
	return location, nil
}

// OpenDB opens db
func OpenDB(dsn string) (*sql.DB, error) {
db, err := sql.Open("mysql", dsn)
Expand Down Expand Up @@ -127,16 +158,16 @@ func GetOptionValueFromEnv(option, envPrefix string) string {
}

// ConstructBRGlobalOptionsForBackup constructs BR global options for backup.
func ConstructBRGlobalOptionsForBackup(backup *v1alpha1.Backup) ([]string, string, error) {
func ConstructBRGlobalOptionsForBackup(backup *v1alpha1.Backup) ([]string, error) {
var args []string
config := backup.Spec.BR
if config == nil {
return nil, "", fmt.Errorf("no config for br in backup %s/%s", backup.Namespace, backup.Name)
return nil, fmt.Errorf("no config for br in backup %s/%s", backup.Namespace, backup.Name)
}
args = append(args, constructBRGlobalOptions(config)...)
storageArgs, remotePath, err := getRemoteStorage(backup.Spec.StorageProvider)
storageArgs, err := getRemoteStorage(backup.Spec.StorageProvider)
if err != nil {
return nil, "", err
return nil, err
}
args = append(args, storageArgs...)
if (backup.Spec.Type == v1alpha1.BackupTypeDB || backup.Spec.Type == v1alpha1.BackupTypeTable) && config.DB != "" {
Expand All @@ -145,7 +176,7 @@ func ConstructBRGlobalOptionsForBackup(backup *v1alpha1.Backup) ([]string, strin
if backup.Spec.Type == v1alpha1.BackupTypeTable && config.Table != "" {
args = append(args, fmt.Sprintf("--table=%s", config.Table))
}
return args, remotePath, nil
return args, nil
}

// ConstructMydumperOptionsForBackup constructs mydumper options for backup
Expand Down Expand Up @@ -180,7 +211,7 @@ func ConstructBRGlobalOptionsForRestore(restore *v1alpha1.Restore) ([]string, er
return nil, fmt.Errorf("no config for br in restore %s/%s", restore.Namespace, restore.Name)
}
args = append(args, constructBRGlobalOptions(config)...)
storageArgs, _, err := getRemoteStorage(restore.Spec.StorageProvider)
storageArgs, err := getRemoteStorage(restore.Spec.StorageProvider)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/pingcap/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,8 @@ const (
BackupRetryFailed BackupConditionType = "RetryFailed"
// BackupInvalid means invalid backup CR
BackupInvalid BackupConditionType = "Invalid"
// BackupPrepare means the backup is being prepared
BackupPrepare BackupConditionType = "Prepare"
)

// BackupCondition describes the observed state of a Backup at a certain point.
Expand Down

0 comments on commit d1b5c9b

Please sign in to comment.