Skip to content

Commit

Permalink
cherry pick pingcap#2899 to release-1.1
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
lichunzhu authored and ti-srebot committed Jul 14, 2020
1 parent 8f6ad50 commit bbdf742
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 97 deletions.
34 changes: 1 addition & 33 deletions cmd/backup-manager/app/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ import (
"path"
"strings"

"github.com/gogo/protobuf/proto"
kvbackup "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
backupUtil "github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/util"
Expand Down Expand Up @@ -108,35 +105,6 @@ func (bo *Options) backupData(backup *v1alpha1.Backup) error {
return nil
}

// getCommitTs get backup position from `EndVersion` in BR backup meta
func getCommitTs(backup *v1alpha1.Backup) (uint64, error) {
var commitTs uint64
s, err := backupUtil.NewRemoteStorage(backup)
if err != nil {
return commitTs, err
}
defer s.Close()
ctx := context.Background()
exist, err := s.Exists(ctx, constants.MetaFile)
if err != nil {
return commitTs, err
}
if !exist {
return commitTs, fmt.Errorf("%s not exist", constants.MetaFile)

}
metaData, err := s.ReadAll(ctx, constants.MetaFile)
if err != nil {
return commitTs, err
}
backupMeta := &kvbackup.BackupMeta{}
err = proto.Unmarshal(metaData, backupMeta)
if err != nil {
return commitTs, err
}
return backupMeta.EndVersion, nil
}

// constructOptions constructs options for BR and also return the remote path
func constructOptions(backup *v1alpha1.Backup) ([]string, error) {
args, err := backupUtil.ConstructBRGlobalOptionsForBackup(backup)
Expand All @@ -162,7 +130,7 @@ func constructOptions(backup *v1alpha1.Backup) ([]string, error) {
// getBackupSize get the backup data size from remote
func getBackupSize(backup *v1alpha1.Backup) (int64, error) {
var size int64
s, err := backupUtil.NewRemoteStorage(backup)
s, err := backupUtil.NewRemoteStorage(backup.Spec.StorageProvider)
if err != nil {
return size, err
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/backup-manager/app/backup/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package backup
import (
"database/sql"
"fmt"
"strconv"
"time"

"github.com/dustin/go-humanize"
Expand Down Expand Up @@ -296,7 +297,7 @@ func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {
}
klog.Infof("Get size %d for backup files in %s of cluster %s success", size, backupFullPath, bm)

commitTs, err := getCommitTs(backup)
commitTs, err := util.GetCommitTsFromBRMetaData(backup.Spec.StorageProvider)
if err != nil {
errs = append(errs, err)
klog.Errorf("get cluster %s commitTs failed, err: %s", bm, err)
Expand All @@ -317,7 +318,7 @@ func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {
backup.Status.TimeCompleted = metav1.Time{Time: finish}
backup.Status.BackupSize = size
backup.Status.BackupSizeReadable = humanize.Bytes(uint64(size))
backup.Status.CommitTs = fmt.Sprintf("%d", commitTs)
backup.Status.CommitTs = strconv.FormatUint(commitTs, 10)

return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupComplete,
Expand Down
2 changes: 1 addition & 1 deletion cmd/backup-manager/app/clean/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (bo *Options) String() string {

// cleanBRRemoteBackupData clean the backup data from remote
func (bo *Options) cleanBRRemoteBackupData(backup *v1alpha1.Backup) error {
s, err := util.NewRemoteStorage(backup)
s, err := util.NewRemoteStorage(backup.Spec.StorageProvider)
if err != nil {
return err
}
Expand Down
40 changes: 0 additions & 40 deletions cmd/backup-manager/app/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package export

import (
"fmt"
"io/ioutil"
"os/exec"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -102,45 +101,6 @@ func (bo *Options) backupDataToRemote(source, bucketURI string, opts []string) e
return nil
}

/*
getCommitTsFromMetadata get commitTs from dumpling's metadata file
metadata file format is as follows:
Started dump at: 2019-06-13 10:00:04
SHOW MASTER STATUS:
Log: tidb-binlog
Pos: 409054741514944513
GTID:
Finished dump at: 2019-06-13 10:00:04
*/
func getCommitTsFromMetadata(backupPath string) (string, error) {
var commitTs string

metaFile := filepath.Join(backupPath, constants.MetaDataFile)
if exist := util.IsFileExist(metaFile); !exist {
return commitTs, fmt.Errorf("file %s does not exist or is not regular file", metaFile)
}
contents, err := ioutil.ReadFile(metaFile)
if err != nil {
return commitTs, fmt.Errorf("read metadata file %s failed, err: %v", metaFile, err)
}

for _, lineStr := range strings.Split(string(contents), "\n") {
if !strings.Contains(lineStr, "Pos") {
continue
}
lineStrSlice := strings.Split(lineStr, ":")
if len(lineStrSlice) != 2 {
return commitTs, fmt.Errorf("parse dumpling's metadata file %s failed, str: %s", metaFile, lineStr)
}
commitTs = strings.TrimSpace(lineStrSlice[1])
break
}
return commitTs, nil
}

// getBackupSize get the backup data size
func getBackupSize(backupPath string, opts []string) (int64, error) {
var size int64
Expand Down
31 changes: 16 additions & 15 deletions cmd/backup-manager/app/export/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,21 @@ func (bm *BackupManager) performBackup(backup *v1alpha1.Backup, db *sql.DB) erro
}
klog.Infof("dump cluster %s data to %s success", bm, backupFullPath)

commitTs, err := util.GetCommitTsFromMetadata(backupFullPath)
if err != nil {
errs = append(errs, err)
klog.Errorf("get cluster %s commitTs failed, err: %s", bm, err)
uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "GetCommitTsFailed",
Message: err.Error(),
})
errs = append(errs, uerr)
return errorutils.NewAggregate(errs)
}
klog.Infof("get cluster %s commitTs %s success", bm, commitTs)

// TODO: Concurrent get file size and upload backup data to speed up processing time
archiveBackupPath := backupFullPath + constants.DefaultArchiveExtention
err = archiveBackupData(backupFullPath, archiveBackupPath)
Expand Down Expand Up @@ -300,21 +315,7 @@ func (bm *BackupManager) performBackup(backup *v1alpha1.Backup, db *sql.DB) erro
}
klog.Infof("get cluster %s archived backup file %s size %d success", bm, archiveBackupPath, size)

commitTs, err := getCommitTsFromMetadata(backupFullPath)
if err != nil {
errs = append(errs, err)
klog.Errorf("get cluster %s commitTs failed, err: %s", bm, err)
uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "GetCommitTsFailed",
Message: err.Error(),
})
errs = append(errs, uerr)
return errorutils.NewAggregate(errs)
}
klog.Infof("get cluster %s commitTs %s success", bm, commitTs)
// get commitTs succeed, origin dir can be deleted safely
// archive backup data successfully, origin dir can be deleted safely
os.RemoveAll(backupFullPath)

remotePath := strings.TrimPrefix(archiveBackupPath, constants.BackupRootPath+"/")
Expand Down
16 changes: 16 additions & 0 deletions cmd/backup-manager/app/import/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,21 @@ func (rm *RestoreManager) performRestore(restore *v1alpha1.Restore) error {
}
klog.Infof("unarchive cluster %s backup %s data success", rm, restoreDataPath)

commitTs, err := util.GetCommitTsFromMetadata(unarchiveDataPath)
if err != nil {
errs = append(errs, err)
klog.Errorf("get cluster %s commitTs failed, err: %s", rm, err)
uerr := rm.StatusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreFailed,
Status: corev1.ConditionTrue,
Reason: "GetCommitTsFailed",
Message: err.Error(),
})
errs = append(errs, uerr)
return errorutils.NewAggregate(errs)
}
klog.Infof("get cluster %s commitTs %s success", rm, commitTs)

err = rm.loadTidbClusterData(unarchiveDataPath)
if err != nil {
errs = append(errs, err)
Expand All @@ -186,6 +201,7 @@ func (rm *RestoreManager) performRestore(restore *v1alpha1.Restore) error {

restore.Status.TimeStarted = metav1.Time{Time: started}
restore.Status.TimeCompleted = metav1.Time{Time: finish}
restore.Status.CommitTs = commitTs

return rm.StatusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreComplete,
Expand Down
17 changes: 17 additions & 0 deletions cmd/backup-manager/app/restore/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package restore
import (
"database/sql"
"fmt"
"strconv"
"time"

"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
Expand Down Expand Up @@ -135,6 +136,21 @@ func (rm *Manager) performRestore(restore *v1alpha1.Restore, db *sql.DB) error {
}

var errs []error

commitTs, err := util.GetCommitTsFromBRMetaData(restore.Spec.StorageProvider)
if err != nil {
errs = append(errs, err)
klog.Errorf("get cluster %s commitTs failed, err: %s", rm, err)
uerr := rm.StatusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreFailed,
Status: corev1.ConditionTrue,
Reason: "GetCommitTsFailed",
Message: err.Error(),
})
errs = append(errs, uerr)
return errorutils.NewAggregate(errs)
}

oldTikvGCTime, err := rm.GetTikvGCLifeTime(db)
if err != nil {
errs = append(errs, err)
Expand Down Expand Up @@ -250,6 +266,7 @@ func (rm *Manager) performRestore(restore *v1alpha1.Restore, db *sql.DB) error {
finish := time.Now()
restore.Status.TimeStarted = metav1.Time{Time: started}
restore.Status.TimeCompleted = metav1.Time{Time: finish}
restore.Status.CommitTs = strconv.FormatUint(commitTs, 10)

return rm.StatusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreComplete,
Expand Down
8 changes: 4 additions & 4 deletions cmd/backup-manager/app/util/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,18 @@ type gcsQuery struct {
}

// NewRemoteStorage creates new remote storage
func NewRemoteStorage(backup *v1alpha1.Backup) (*blob.Bucket, error) {
st := util.GetStorageType(backup.Spec.StorageProvider)
func NewRemoteStorage(provider v1alpha1.StorageProvider) (*blob.Bucket, error) {
st := util.GetStorageType(provider)
switch st {
case v1alpha1.BackupStorageTypeS3:
qs := checkS3Config(backup.Spec.S3, true)
qs := checkS3Config(provider.S3, true)
bucket, err := newS3Storage(qs)
if err != nil {
return nil, err
}
return bucket, nil
case v1alpha1.BackupStorageTypeGcs:
qs := checkGcsConfig(backup.Spec.Gcs, true)
qs := checkGcsConfig(provider.Gcs, true)
bucket, err := newGcsStorage(qs)
if err != nil {
return nil, err
Expand Down
73 changes: 73 additions & 0 deletions cmd/backup-manager/app/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@
package util

import (
"context"
"database/sql"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"

"github.com/Masterminds/semver"
"github.com/gogo/protobuf/proto"
kvbackup "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/backup/util"
Expand Down Expand Up @@ -266,6 +271,74 @@ func GetOptions(provider v1alpha1.StorageProvider) []string {
}
}

/*
GetCommitTsFromMetadata get commitTs from mydumper's metadata file
metadata file format is as follows:
Started dump at: 2019-06-13 10:00:04
SHOW MASTER STATUS:
Log: tidb-binlog
Pos: 409054741514944513
GTID:
Finished dump at: 2019-06-13 10:00:04
*/
func GetCommitTsFromMetadata(backupPath string) (string, error) {
var commitTs string

metaFile := filepath.Join(backupPath, constants.MetaDataFile)
if exist := IsFileExist(metaFile); !exist {
return commitTs, fmt.Errorf("file %s does not exist or is not regular file", metaFile)
}
contents, err := ioutil.ReadFile(metaFile)
if err != nil {
return commitTs, fmt.Errorf("read metadata file %s failed, err: %v", metaFile, err)
}

for _, lineStr := range strings.Split(string(contents), "\n") {
if !strings.Contains(lineStr, "Pos") {
continue
}
lineStrSlice := strings.Split(lineStr, ":")
if len(lineStrSlice) != 2 {
return commitTs, fmt.Errorf("parse mydumper's metadata file %s failed, str: %s", metaFile, lineStr)
}
commitTs = strings.TrimSpace(lineStrSlice[1])
break
}
return commitTs, nil
}

// GetCommitTsFromBRMetaData get backup position from `EndVersion` in BR backup meta
func GetCommitTsFromBRMetaData(provider v1alpha1.StorageProvider) (uint64, error) {
var commitTs uint64
s, err := NewRemoteStorage(provider)
if err != nil {
return commitTs, err
}
defer s.Close()
ctx := context.Background()
exist, err := s.Exists(ctx, constants.MetaFile)
if err != nil {
return commitTs, err
}
if !exist {
return commitTs, fmt.Errorf("%s not exist", constants.MetaFile)

}
metaData, err := s.ReadAll(ctx, constants.MetaFile)
if err != nil {
return commitTs, err
}
backupMeta := &kvbackup.BackupMeta{}
err = proto.Unmarshal(metaData, backupMeta)
if err != nil {
return commitTs, err
}
return backupMeta.EndVersion, nil
}

// ConstructArgs constructs the rclone args
func ConstructArgs(conf string, opts []string, command, source, dest string) []string {
var args []string
Expand Down
Loading

0 comments on commit bbdf742

Please sign in to comment.