diff --git a/cmd/backup-manager/app/backup/backup.go b/cmd/backup-manager/app/backup/backup.go index 8e3b3748cc..e2c6ddd69e 100644 --- a/cmd/backup-manager/app/backup/backup.go +++ b/cmd/backup-manager/app/backup/backup.go @@ -38,14 +38,14 @@ type Options struct { backupUtil.GenericOptions } -func (bo *Options) backupData(backup *v1alpha1.Backup) (string, error) { +func (bo *Options) backupData(backup *v1alpha1.Backup) error { clusterNamespace := backup.Spec.BR.ClusterNamespace if backup.Spec.BR.ClusterNamespace == "" { clusterNamespace = backup.Namespace } - args, remotePath, err := constructOptions(backup) + args, err := constructOptions(backup) if err != nil { - return "", err + return err } args = append(args, fmt.Sprintf("--pd=%s-pd.%s:2379", backup.Spec.BR.Cluster, clusterNamespace)) if bo.TLSCluster { @@ -71,15 +71,15 @@ func (bo *Options) backupData(backup *v1alpha1.Backup) (string, error) { stdOut, err := cmd.StdoutPipe() if err != nil { - return remotePath, fmt.Errorf("cluster %s, create stdout pipe failed, err: %v", bo, err) + return fmt.Errorf("cluster %s, create stdout pipe failed, err: %v", bo, err) } stdErr, err := cmd.StderrPipe() if err != nil { - return remotePath, fmt.Errorf("cluster %s, create stderr pipe failed, err: %v", bo, err) + return fmt.Errorf("cluster %s, create stderr pipe failed, err: %v", bo, err) } err = cmd.Start() if err != nil { - return remotePath, fmt.Errorf("cluster %s, execute br command failed, args: %s, err: %v", bo, fullArgs, err) + return fmt.Errorf("cluster %s, execute br command failed, args: %s, err: %v", bo, fullArgs, err) } var errMsg string reader := bufio.NewReader(stdOut) @@ -101,11 +101,11 @@ func (bo *Options) backupData(backup *v1alpha1.Backup) (string, error) { } err = cmd.Wait() if err != nil { - return remotePath, fmt.Errorf("cluster %s, wait pipe message failed, errMsg %s, err: %v", bo, errMsg, err) + return fmt.Errorf("cluster %s, wait pipe message failed, errMsg %s, err: %v", bo, errMsg, err) } klog.Infof("Backup data for cluster %s successfully", bo) - return remotePath, nil + return nil } // getCommitTs get backup position from `EndVersion` in BR backup meta @@ -138,10 +138,10 @@ func getCommitTs(backup *v1alpha1.Backup) (uint64, error) { } // constructOptions constructs options for BR and also return the remote path -func constructOptions(backup *v1alpha1.Backup) ([]string, string, error) { - args, remotePath, err := backupUtil.ConstructBRGlobalOptionsForBackup(backup) +func constructOptions(backup *v1alpha1.Backup) ([]string, error) { + args, err := backupUtil.ConstructBRGlobalOptionsForBackup(backup) if err != nil { - return args, remotePath, err + return args, err } config := backup.Spec.BR if config.Concurrency != nil { @@ -156,7 +156,7 @@ func constructOptions(backup *v1alpha1.Backup) ([]string, string, error) { if config.Checksum != nil { args = append(args, fmt.Sprintf("--checksum=%t", *config.Checksum)) } - return args, remotePath, nil + return args, nil } // getBackupSize get the backup data size from remote diff --git a/cmd/backup-manager/app/backup/manager.go b/cmd/backup-manager/app/backup/manager.go index a666ca4640..d24e451dab 100644 --- a/cmd/backup-manager/app/backup/manager.go +++ b/cmd/backup-manager/app/backup/manager.go @@ -127,6 +127,7 @@ func (bm *Manager) ProcessBackup() error { func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error { started := time.Now() + var errs []error err := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{ Type: v1alpha1.BackupRunning, Status: corev1.ConditionTrue, @@ -135,7 +136,36 @@ func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error { return err } - var errs []error + backupFullPath, err := util.GetRemotePath(backup) + if err != nil { + errs = append(errs, err) + uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{ + Type: v1alpha1.BackupFailed, + Status: corev1.ConditionTrue, + Reason: "GetBackupRemotePathFailed", + Message: err.Error(), + }) + errs = append(errs, uerr) + return errorutils.NewAggregate(errs) + } + + backup.Status.BackupPath = backupFullPath + err = bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{ + Type: v1alpha1.BackupPrepare, + Status: corev1.ConditionTrue, + }) + if err != nil { + errs = append(errs, err) + uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{ + Type: v1alpha1.BackupFailed, + Status: corev1.ConditionTrue, + Reason: "UpdatePrepareBackupFailed", + Message: err.Error(), + }) + errs = append(errs, uerr) + return errorutils.NewAggregate(errs) + } + oldTikvGCTime, err := bm.GetTikvGCLifeTime(db) if err != nil { errs = append(errs, err) @@ -216,7 +246,7 @@ func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error { klog.Infof("set cluster %s %s to %s success", bm, constants.TikvGCVariable, tikvGCLifeTime) } - backupFullPath, backupErr := bm.backupData(backup) + backupErr := bm.backupData(backup) if oldTikvGCTimeDuration < tikvGCTimeDuration { err = bm.SetTikvGCLifeTime(db, oldTikvGCTime) if err != nil { @@ -282,7 +312,6 @@ func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error { finish := time.Now() - backup.Status.BackupPath = backupFullPath backup.Status.TimeStarted = metav1.Time{Time: started} backup.Status.TimeCompleted = metav1.Time{Time: finish} backup.Status.BackupSize = size diff --git a/cmd/backup-manager/app/util/remote.go b/cmd/backup-manager/app/util/remote.go index 3381258683..61669abcd2 100644 --- a/cmd/backup-manager/app/util/remote.go +++ b/cmd/backup-manager/app/util/remote.go @@ -81,24 +81,24 @@ func NewRemoteStorage(backup *v1alpha1.Backup) (*blob.Bucket, error) { } // getRemoteStorage returns the arg for --storage option and the remote path for br -func getRemoteStorage(provider v1alpha1.StorageProvider) ([]string, string, error) { +func getRemoteStorage(provider v1alpha1.StorageProvider) ([]string, error) { st := util.GetStorageType(provider) switch st { case v1alpha1.BackupStorageTypeS3: qs := checkS3Config(provider.S3, false) - s, path := newS3StorageOption(qs) - return s, path, nil + s := newS3StorageOption(qs) + return s, nil case v1alpha1.BackupStorageTypeGcs: qs := checkGcsConfig(provider.Gcs, false) - s, path := newGcsStorageOption(qs) - return s, path, nil + s := newGcsStorageOption(qs) + return s, nil default: - return nil, "", fmt.Errorf("storage %s not support yet", st) + return nil, fmt.Errorf("storage %s not support yet", st) } } // newS3StorageOption constructs the arg for --storage option and the remote path for br -func newS3StorageOption(qs *s3Query) ([]string, string) { +func newS3StorageOption(qs *s3Query) []string { var s3options []string var path string if qs.prefix == "/" { @@ -125,7 +125,7 @@ func newS3StorageOption(qs *s3Query) ([]string, string) { if qs.storageClass != "" { s3options = append(s3options, fmt.Sprintf("--s3.storage-class=%s", qs.storageClass)) } - return s3options, path + return s3options } // newS3Storage initialize a new s3 storage @@ -183,7 +183,7 @@ func newGcsStorage(qs *gcsQuery) (*blob.Bucket, error) { } // newGcsStorageOption constructs the arg for --storage option and the remote path for br -func newGcsStorageOption(qs *gcsQuery) ([]string, string) { +func newGcsStorageOption(qs *gcsQuery) []string { var gcsoptions []string var path string if qs.prefix == "/" { @@ -198,7 +198,7 @@ func newGcsStorageOption(qs *gcsQuery) ([]string, string) { if qs.objectAcl != "" { gcsoptions = append(gcsoptions, fmt.Sprintf("--gcs.predefined-acl=%s", qs.objectAcl)) } - return gcsoptions, path + return gcsoptions } // checkS3Config constructs s3Query parameters diff --git a/cmd/backup-manager/app/util/util.go b/cmd/backup-manager/app/util/util.go index 8f1ebca140..7af922ee46 100644 --- a/cmd/backup-manager/app/util/util.go +++ b/cmd/backup-manager/app/util/util.go @@ -84,6 +84,37 @@ func EnsureDirectoryExist(dirName string) error { return nil } +func GetRemotePath(backup *v1alpha1.Backup) (string, error) { + var path, bucket, prefix string + st := util.GetStorageType(backup.Spec.StorageProvider) + switch st { + case v1alpha1.BackupStorageTypeS3: + prefix = backup.Spec.StorageProvider.S3.Prefix + bucket = backup.Spec.StorageProvider.S3.Bucket + prefix = strings.Trim(prefix, "/") + prefix += "/" + if prefix == "/" { + path = fmt.Sprintf("s3://%s%s", bucket, prefix) + } else { + path = fmt.Sprintf("s3://%s/%s", bucket, prefix) + } + return path, nil + case v1alpha1.BackupStorageTypeGcs: + prefix = backup.Spec.StorageProvider.Gcs.Prefix + bucket = backup.Spec.StorageProvider.Gcs.Bucket + prefix = strings.Trim(prefix, "/") + prefix += "/" + if prefix == "/" { + path = fmt.Sprintf("gcs://%s%s", bucket, prefix) + } else { + path = fmt.Sprintf("gcs://%s/%s", bucket, prefix) + } + return path, nil + default: + return "", fmt.Errorf("storage %s not support yet", st) + } +} + // OpenDB opens db func OpenDB(dsn string) (*sql.DB, error) { db, err := sql.Open("mysql", dsn) @@ -127,16 +158,16 @@ func GetOptionValueFromEnv(option, envPrefix string) string { } // ConstructBRGlobalOptionsForBackup constructs BR global options for backup and also return the remote path. -func ConstructBRGlobalOptionsForBackup(backup *v1alpha1.Backup) ([]string, string, error) { +func ConstructBRGlobalOptionsForBackup(backup *v1alpha1.Backup) ([]string, error) { var args []string config := backup.Spec.BR if config == nil { - return nil, "", fmt.Errorf("no config for br in backup %s/%s", backup.Namespace, backup.Name) + return nil, fmt.Errorf("no config for br in backup %s/%s", backup.Namespace, backup.Name) } args = append(args, constructBRGlobalOptions(config)...) - storageArgs, remotePath, err := getRemoteStorage(backup.Spec.StorageProvider) + storageArgs, err := getRemoteStorage(backup.Spec.StorageProvider) if err != nil { - return nil, "", err + return nil, err } args = append(args, storageArgs...) if (backup.Spec.Type == v1alpha1.BackupTypeDB || backup.Spec.Type == v1alpha1.BackupTypeTable) && config.DB != "" { @@ -145,7 +176,7 @@ func ConstructBRGlobalOptionsForBackup(backup *v1alpha1.Backup) ([]string, strin if backup.Spec.Type == v1alpha1.BackupTypeTable && config.Table != "" { args = append(args, fmt.Sprintf("--table=%s", config.Table)) } - return args, remotePath, nil + return args, nil } // ConstructMydumperOptionsForBackup constructs mydumper options for backup @@ -180,7 +211,7 @@ func ConstructBRGlobalOptionsForRestore(restore *v1alpha1.Restore) ([]string, er return nil, fmt.Errorf("no config for br in restore %s/%s", restore.Namespace, restore.Name) } args = append(args, constructBRGlobalOptions(config)...) - storageArgs, _, err := getRemoteStorage(restore.Spec.StorageProvider) + storageArgs, err := getRemoteStorage(restore.Spec.StorageProvider) if err != nil { return nil, err } diff --git a/pkg/apis/pingcap/v1alpha1/types.go b/pkg/apis/pingcap/v1alpha1/types.go index 80ec484457..1882ec4f57 100644 --- a/pkg/apis/pingcap/v1alpha1/types.go +++ b/pkg/apis/pingcap/v1alpha1/types.go @@ -1124,6 +1124,8 @@ const ( BackupRetryFailed BackupConditionType = "RetryFailed" // BackupInvalid means invalid backup CR BackupInvalid BackupConditionType = "Invalid" + // BackupPrepare means the backup prepare backup process + BackupPrepare BackupConditionType = "Prepare" ) // BackupCondition describes the observed state of a Backup at a certain point.