Skip to content

Commit

Permalink
support backup with br
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielZhangQD committed Dec 19, 2019
1 parent 35db292 commit eca87b9
Show file tree
Hide file tree
Showing 8 changed files with 501 additions and 5 deletions.
93 changes: 93 additions & 0 deletions cmd/backup-manager/app/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package backup

import (
"context"
"database/sql"
"fmt"
"io"
"io/ioutil"
"os/exec"
"path/filepath"
Expand All @@ -26,6 +28,7 @@ import (
"github.com/mholt/archiver"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
glog "k8s.io/klog"
)

Expand Down Expand Up @@ -124,6 +127,51 @@ func (bo *BackupOpts) backupDataToRemote(source, bucketURI string) error {
return nil
}

func (bo *BackupOpts) brBackupData(backup *v1alpha1.Backup) (string, error) {
args, path, err := constructBROptions(backup)
if err != nil {
return "", err
}
fullArgs := []string{
"backup",
fmt.Sprintf("%s", backup.Spec.Type),
}
fullArgs = append(fullArgs, args...)
output, err := exec.Command("br", fullArgs...).CombinedOutput()
if err != nil {
return path, fmt.Errorf("cluster %s, execute br command %v failed, output: %s, err: %v", bo, args, string(output), err)
}
glog.Infof("backup data for cluster %s successfully, output: %s", bo, string(output))
return path, nil
}

// cleanBRRemoteBackupData clean the backup data from remote
func (bo *BackupOpts) cleanBRRemoteBackupData(backup *v1alpha1.Backup) error {
s, err := util.NewRemoteStorage(backup)
if err != nil {
return err
}
defer s.Close()
ctx := context.Background()
iter := s.List(nil)
for {
obj, err := iter.Next(ctx)
if err == io.EOF {
break
}
if err != nil {
return err
}
glog.Infof("Prepare to delete %s for cluster %s", obj.Key, bo)
err = s.Delete(context.Background(), obj.Key)
if err != nil {
return err
}
glog.Infof("Delete %s for cluster %s successfully", obj.Key, bo)
}
return nil
}

func (bo *BackupOpts) cleanRemoteBackupData(bucket string) error {
destBucket := util.NormalizeBucketURI(bucket)
output, err := exec.Command("rclone", constants.RcloneConfigArg, "deletefile", destBucket).CombinedOutput()
Expand Down Expand Up @@ -211,3 +259,48 @@ func archiveBackupData(backupDir, destFile string) error {
}
return nil
}

// constructBROptions constructs options for BR and also return the remote path
func constructBROptions(backup *v1alpha1.Backup) ([]string, string, error) {
args, path, err := util.ConstructBRGlobalOptions(backup)
if err != nil {
return args, path, err
}
config := backup.Spec.BR
if config.Concurrency != nil {
args = append(args, fmt.Sprintf("--concurrency=%d", *config.Concurrency))
}
if config.RateLimit != nil {
args = append(args, fmt.Sprintf("--ratelimit=%d", *config.RateLimit))
}
if config.TimeAgo != "" {
args = append(args, fmt.Sprintf("--timeago=%s", config.TimeAgo))
}
if config.Checksum != nil {
args = append(args, fmt.Sprintf("--checksum=%t", *config.Checksum))
}
return args, path, nil
}

// getBRBackupSize get the backup data size from remote
func getBRBackupSize(backupPath string, backup *v1alpha1.Backup) (int64, error) {
var size int64
s, err := util.NewRemoteStorage(backup)
if err != nil {
return size, err
}
defer s.Close()
ctx := context.Background()
iter := s.List(nil)
for {
obj, err := iter.Next(ctx)
if err == io.EOF {
break
}
if err != nil {
return size, err
}
size += obj.Size
}
return size, nil
}
77 changes: 76 additions & 1 deletion cmd/backup-manager/app/backup/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func (bm *BackupManager) ProcessBackup() error {
})
}

if backup.Spec.BR != nil {
return bm.performBRBackup(backup.DeepCopy())
}

var db *sql.DB
err = wait.PollImmediate(constants.PollInterval, constants.CheckTimeout, func() (done bool, err error) {
db, err = util.OpenDB(bm.getDSN(constants.TidbMetaDB))
Expand Down Expand Up @@ -91,6 +95,71 @@ func (bm *BackupManager) ProcessBackup() error {
return bm.performBackup(backup.DeepCopy(), db)
}

func (bm *BackupManager) performBRBackup(backup *v1alpha1.Backup) error {
started := time.Now()

err := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupRunning,
Status: corev1.ConditionTrue,
})
if err != nil {
return err
}

backupFullPath, err := bm.brBackupData(backup)
if err != nil {
glog.Errorf("backup cluster %s data to %s failed, err: %s", bm, bm.StorageType, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "BackupDataToRemoteFailed",
Message: err.Error(),
})
}
glog.Infof("backup cluster %s data to %s success", bm, backupFullPath)

// Note: The size get from remote may be incorrect because the blobs
// are eventually consistent.
size, err := getBRBackupSize(backupFullPath, backup)
if err != nil {
glog.Errorf("Get size for backup files in %s of cluster %s failed, err: %s", backupFullPath, bm, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "GetBackupSizeFailed",
Message: err.Error(),
})
}
glog.Infof("Get size %d for backup files in %s of cluster %s success", size, backupFullPath, bm)

// BR does not provide CommitTS yet
// https://github.com/pingcap/br/issues/76
// commitTs, err := getBRCommitTsFromMetadata(backupFullPath)
// if err != nil {
// glog.Errorf("get cluster %s commitTs failed, err: %s", bm, err)
// return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
// Type: v1alpha1.BackupFailed,
// Status: corev1.ConditionTrue,
// Reason: "GetCommitTsFailed",
// Message: err.Error(),
// })
// }
// glog.Infof("get cluster %s commitTs %s success", bm, commitTs)

finish := time.Now()

backup.Status.BackupPath = backupFullPath
backup.Status.TimeStarted = metav1.Time{Time: started}
backup.Status.TimeCompleted = metav1.Time{Time: finish}
backup.Status.BackupSize = size
backup.Status.CommitTs = ""

return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupComplete,
Status: corev1.ConditionTrue,
})
}

func (bm *BackupManager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {
started := time.Now()

Expand Down Expand Up @@ -260,7 +329,13 @@ func (bm *BackupManager) performCleanBackup(backup *v1alpha1.Backup) error {
})
}

err := bm.cleanRemoteBackupData(backup.Status.BackupPath)
var err error
if backup.Spec.BR != nil {
err = bm.cleanBRRemoteBackupData(backup)
} else {
err = bm.cleanRemoteBackupData(backup.Status.BackupPath)
}

if err != nil {
glog.Errorf("clean cluster %s backup %s failed, err: %s", bm, backup.Status.BackupPath, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Expand Down
Loading

0 comments on commit eca87b9

Please sign in to comment.