diff --git a/cmd/backup-manager/app/backup/backup.go b/cmd/backup-manager/app/backup/backup.go
index f2e535f09c6..440252f5eda 100644
--- a/cmd/backup-manager/app/backup/backup.go
+++ b/cmd/backup-manager/app/backup/backup.go
@@ -15,280 +15,88 @@ package backup
 
 import (
 	"context"
-	"database/sql"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"os/exec"
-	"path/filepath"
-	"strconv"
-	"strings"
-	"time"
 
-	"github.com/mholt/archiver"
-	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
+	"github.com/gogo/protobuf/proto"
+	glog "k8s.io/klog"
+
+	kvbackup "github.com/pingcap/kvproto/pkg/backup"
 	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
 	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
-	glog "k8s.io/klog"
 )
 
-// BackupOpts contains the input arguments to the backup command
-type BackupOpts struct {
-	Namespace   string
-	BackupName  string
-	Bucket      string
-	Host        string
-	Port        int32
-	Password    string
-	User        string
-	StorageType string
-}
-
-func (bo *BackupOpts) String() string {
-	return fmt.Sprintf("%s/%s", bo.Namespace, bo.BackupName)
-}
-
-func (bo *BackupOpts) getBackupFullPath() string {
-	return filepath.Join(constants.BackupRootPath, bo.getBackupRelativePath())
-}
-
-func (bo *BackupOpts) getBackupRelativePath() string {
-	backupName := fmt.Sprintf("backup-%s", time.Now().UTC().Format(time.RFC3339))
-	return fmt.Sprintf("%s/%s", bo.Bucket, backupName)
-}
-
-func (bo *BackupOpts) getDestBucketURI(remotePath string) string {
-	return fmt.Sprintf("%s://%s", bo.StorageType, remotePath)
-}
+const (
+	// MetaFile is the file name of the BR backup meta
+	MetaFile = "backupmeta"
+)
 
-func (bo *BackupOpts) getTikvGCLifeTime(db *sql.DB) (string, error) {
-	var tikvGCTime string
-	sql := fmt.Sprintf("select variable_value from %s where variable_name= ?", constants.TidbMetaTable)
-	row := db.QueryRow(sql, constants.TikvGCVariable)
-	err := row.Scan(&tikvGCTime)
-	if err != nil {
-		return tikvGCTime, fmt.Errorf("query cluster %s %s failed, sql: %s, err: %v", bo, constants.TikvGCVariable, sql, err)
-	}
-	return tikvGCTime, nil
+// Options contains the input arguments to the backup command
+type Options struct {
+	Namespace  string
+	BackupName string
 }
 
-func (bo *BackupOpts) setTikvGCLifeTime(db *sql.DB, gcTime string) error {
-	sql := fmt.Sprintf("update %s set variable_value = ? where variable_name = ?", constants.TidbMetaTable)
-	_, err := db.Exec(sql, gcTime, constants.TikvGCVariable)
-	if err != nil {
-		return fmt.Errorf("set cluster %s %s failed, sql: %s, err: %v", bo, constants.TikvGCVariable, sql, err)
-	}
-	return nil
+func (bo *Options) String() string {
+	return fmt.Sprintf("%s/%s", bo.Namespace, bo.BackupName)
 }
 
-func (bo *BackupOpts) dumpTidbClusterData() (string, error) {
-	bfPath := bo.getBackupFullPath()
-	err := util.EnsureDirectoryExist(bfPath)
+func (bo *Options) backupData(backup *v1alpha1.Backup) (string, error) {
+	args, path, err := constructOptions(backup)
 	if err != nil {
 		return "", err
 	}
-	args := []string{
-		fmt.Sprintf("--outputdir=%s", bfPath),
-		fmt.Sprintf("--host=%s", bo.Host),
-		fmt.Sprintf("--port=%d", bo.Port),
-		fmt.Sprintf("--user=%s", bo.User),
-		fmt.Sprintf("--password=%s", bo.Password),
-		"--long-query-guard=3600",
-		"--tidb-force-priority=LOW_PRIORITY",
-		"--verbose=3",
-		"--regex",
-		"^(?!(mysql|test|INFORMATION_SCHEMA|PERFORMANCE_SCHEMA))",
-	}
-
-	output, err := exec.Command("/mydumper", args...).CombinedOutput()
-	if err != nil {
-		return bfPath, fmt.Errorf("cluster %s, execute mydumper command %v failed, output: %s, err: %v", bo, args, string(output), err)
-	}
-	return bfPath, nil
-}
-
-func (bo *BackupOpts) backupDataToRemote(source, bucketURI string) error {
-	destBucket := util.NormalizeBucketURI(bucketURI)
-	tmpDestBucket := fmt.Sprintf("%s.tmp", destBucket)
-	// TODO: We may need to use exec.CommandContext to control timeouts.
-	output, err := exec.Command("rclone", constants.RcloneConfigArg, "copyto", source, tmpDestBucket).CombinedOutput()
-	if err != nil {
-		return fmt.Errorf("cluster %s, execute rclone copyto command for upload backup data %s failed, output: %s, err: %v", bo, bucketURI, string(output), err)
-	}
-
-	glog.Infof("upload cluster %s backup data to %s successfully, now move it to permanent URL %s", bo, tmpDestBucket, destBucket)
-
-	// the backup was a success
-	// remove .tmp extension
-	output, err = exec.Command("rclone", constants.RcloneConfigArg, "moveto", tmpDestBucket, destBucket).CombinedOutput()
-	if err != nil {
-		return fmt.Errorf("cluster %s, execute rclone moveto command failed, output: %s, err: %v", bo, string(output), err)
-	}
-	return nil
-}
-
-func (bo *BackupOpts) brBackupData(backup *v1alpha1.Backup) (string, string, error) {
-	args, path, err := constructBROptions(backup)
-	if err != nil {
-		return "", "", err
+	var btype string
+	if backup.Spec.Type == "" {
+		btype = string(v1alpha1.BackupTypeFull)
+	} else {
+		btype = string(backup.Spec.Type)
 	}
 	fullArgs := []string{
 		"backup",
-		fmt.Sprintf("%s", backup.Spec.Type),
+		btype,
 	}
 	fullArgs = append(fullArgs, args...)
 	output, err := exec.Command("br", fullArgs...).CombinedOutput()
 	if err != nil {
-		return path, "", fmt.Errorf("cluster %s, execute br command %v failed, output: %s, err: %v", bo, args, string(output), err)
+		return path, fmt.Errorf("cluster %s, execute br command %v failed, output: %s, err: %v", bo, args, string(output), err)
 	}
 	glog.Infof("Backup data for cluster %s successfully, output: %s", bo, string(output))
-	return path, string(output), nil
+	return path, nil
 }
 
-// cleanBRRemoteBackupData clean the backup data from remote
-func (bo *BackupOpts) cleanBRRemoteBackupData(backup *v1alpha1.Backup) error {
+// getCommitTs gets the backup position from `EndVersion` in the BR backup meta
+func getCommitTs(backup *v1alpha1.Backup) (uint64, error) {
+	var commitTs uint64
 	s, err := util.NewRemoteStorage(backup)
 	if err != nil {
-		return err
+		return commitTs, err
 	}
 	defer s.Close()
 	ctx := context.Background()
-	iter := s.List(nil)
-	for {
-		obj, err := iter.Next(ctx)
-		if err == io.EOF {
-			break
-		}
-		if err != nil {
-			return err
-		}
-		glog.Infof("Prepare to delete %s for cluster %s", obj.Key, bo)
-		err = s.Delete(context.Background(), obj.Key)
-		if err != nil {
-			return err
-		}
-		glog.Infof("Delete %s for cluster %s successfully", obj.Key, bo)
-	}
-	return nil
-}
-
-func (bo *BackupOpts) cleanRemoteBackupData(bucket string) error {
-	destBucket := util.NormalizeBucketURI(bucket)
-	output, err := exec.Command("rclone", constants.RcloneConfigArg, "deletefile", destBucket).CombinedOutput()
+	exist, err := s.Exists(ctx, MetaFile)
 	if err != nil {
-		return fmt.Errorf("cluster %s, execute rclone deletefile command failed, output: %s, err: %v", bo, string(output), err)
+		return commitTs, err
 	}
+	if !exist {
+		return commitTs, fmt.Errorf("%s does not exist", MetaFile)
-	glog.Infof("cluster %s backup %s was deleted successfully", bo, bucket)
-	return nil
-}
-
-func (bo *BackupOpts) getDSN(db string) string {
-	return fmt.Sprintf("%s:%s@(%s:%d)/%s?charset=utf8", bo.User, bo.Password, bo.Host, bo.Port, db)
-}
-
-/*
-	getCommitTsFromMetadata get commitTs from mydumper's metadata file
-
-	metadata file format is as follows:
-
-	Started dump at: 2019-06-13 10:00:04
-	SHOW MASTER STATUS:
-		Log: tidb-binlog
-		Pos: 409054741514944513
-		GTID:
-
-	Finished dump at: 2019-06-13 10:00:04
-*/
-func getCommitTsFromMetadata(backupPath string) (string, error) {
-	var commitTs string
-
-	metaFile := filepath.Join(backupPath, constants.MetaDataFile)
-	if exist := util.IsFileExist(metaFile); !exist {
-		return commitTs, fmt.Errorf("file %s does not exist or is not regular file", metaFile)
 	}
-	contents, err := ioutil.ReadFile(metaFile)
+	metaData, err := s.ReadAll(ctx, MetaFile)
 	if err != nil {
-		return commitTs, fmt.Errorf("read metadata file %s failed, err: %v", metaFile, err)
+		return commitTs, err
 	}
-
-	for _, lineStr := range strings.Split(string(contents), "\n") {
-		if !strings.Contains(lineStr, "Pos") {
-			continue
-		}
-		lineStrSlice := strings.Split(lineStr, ":")
-		if len(lineStrSlice) != 2 {
-			return commitTs, fmt.Errorf("parse mydumper's metadata file %s failed, str: %s", metaFile, lineStr)
-		}
-		commitTs = strings.TrimSpace(lineStrSlice[1])
-		break
-	}
-	return commitTs, nil
-}
-
-// getBackupSize get the backup data size
-func getBackupSize(backupPath string) (int64, error) {
-	var size int64
-	if exist := util.IsFileExist(backupPath); !exist {
-		return size, fmt.Errorf("file %s does not exist or is not regular file", backupPath)
-	}
-	out, err := exec.Command("rclone", constants.RcloneConfigArg, "ls", backupPath).CombinedOutput()
-	if err != nil {
-		return size, fmt.Errorf("failed to get backup %s size, err: %v", backupPath, err)
-	}
-	sizeStr := strings.Fields(string(out))[0]
-	size, err = strconv.ParseInt(sizeStr, 10, 64)
+	backupMeta := &kvbackup.BackupMeta{}
+	err = proto.Unmarshal(metaData, backupMeta)
 	if err != nil {
-		return size, fmt.Errorf("failed to parse size string %s, err: %v", sizeStr, err)
-	}
-	return size, nil
-}
-
-// archiveBackupData archive backup data by destFile's extension name
-func archiveBackupData(backupDir, destFile string) error {
-	if exist := util.IsDirExist(backupDir); !exist {
-		return fmt.Errorf("dir %s does not exist or is not a dir", backupDir)
-	}
-	destDir := filepath.Dir(destFile)
-	if err := util.EnsureDirectoryExist(destDir); err != nil {
-		return err
-	}
-	err := archiver.Archive([]string{backupDir}, destFile)
-	if err != nil {
-		return fmt.Errorf("archive backup data %s to %s failed, err: %v", backupDir, destFile, err)
-	}
-	return nil
-}
-
-// getBRCommitTs get backup position from the log output of BR
-// It really depends on the format in BR log,
-// currently, it's in the format of [BackupTS=412992336769581057]
-func getBRCommitTs(out string) (string, error) {
-	var commitTs string
-	for _, lineStr := range strings.Split(out, "\n") {
-		if !strings.Contains(lineStr, "BackupTS=") {
-			continue
-		}
-		lineStrSlice := strings.Split(lineStr, " ")
-		for _, s := range lineStrSlice {
-			if !strings.Contains(s, "BackupTS=") {
-				continue
-			}
-			kv := strings.Split(s, "=")
-			if len(kv) != 2 {
-				return commitTs, fmt.Errorf("get pos from %s failed", lineStr)
-			}
-			cs := strings.TrimSpace(kv[1])
-			commitTs = strings.TrimSuffix(cs, "]")
-			break
-		}
-
+		return commitTs, err
 	}
-	return commitTs, nil
+	return backupMeta.EndVersion, nil
 }
 
-// constructBROptions constructs options for BR and also return the remote path
-func constructBROptions(backup *v1alpha1.Backup) ([]string, string, error) {
+// constructOptions constructs options for BR and also returns the remote path
+func constructOptions(backup *v1alpha1.Backup) ([]string, string, error) {
 	args, path, err := util.ConstructBRGlobalOptions(backup)
 	if err != nil {
 		return args, path, err
@@ -309,8 +117,8 @@ func constructBROptions(backup *v1alpha1.Backup) ([]string, string, error) {
 	return args, path, nil
 }
 
-// getBRBackupSize get the backup data size from remote
-func getBRBackupSize(backupPath string, backup *v1alpha1.Backup) (int64, error) {
+// getBackupSize gets the backup data size from the remote storage
+func getBackupSize(backup *v1alpha1.Backup) (int64, error) {
 	var size int64
 	s, err := util.NewRemoteStorage(backup)
 	if err != nil {
diff --git a/cmd/backup-manager/app/backup/manager.go b/cmd/backup-manager/app/backup/manager.go
index 385f4f22281..78f652126be 100644
--- a/cmd/backup-manager/app/backup/manager.go
+++ b/cmd/backup-manager/app/backup/manager.go
@@ -14,35 +14,30 @@ package backup
 
 import (
-	"database/sql"
 	"fmt"
-	"strings"
 	"time"
 
-	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
-	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
 	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
 	listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
 	"github.com/pingcap/tidb-operator/pkg/controller"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/util/wait"
 	glog "k8s.io/klog"
 )
 
-// BackupManager mainly used to manage backup related work
-type BackupManager struct {
+// Manager is mainly used to manage backup related work
+type Manager struct {
 	backupLister  listers.BackupLister
 	StatusUpdater controller.BackupConditionUpdaterInterface
-	BackupOpts
+	Options
 }
 
-// NewBackupManager return a BackupManager
-func NewBackupManager(
+// NewManager returns a Manager
+func NewManager(
 	backupLister listers.BackupLister,
 	statusUpdater controller.BackupConditionUpdaterInterface,
-	backupOpts BackupOpts) *BackupManager {
-	return &BackupManager{
+	backupOpts Options) *Manager {
+	return &Manager{
 		backupLister,
 		statusUpdater,
 		backupOpts,
@@ -50,7 +45,7 @@ func NewBackupManager(
 }
 
 // ProcessBackup used to process the backup logic
-func (bm *BackupManager) ProcessBackup() error {
+func (bm *Manager) ProcessBackup() error {
 	backup, err := bm.backupLister.Backups(bm.Namespace).Get(bm.BackupName)
 	if err != nil {
 		glog.Errorf("can't find cluster %s backup %s CRD object, err: %v", bm, bm.BackupName, err)
@@ -62,40 +57,13 @@ func (bm *Manager) ProcessBackup() error {
 		})
 	}
 
-	if backup.Spec.BR != nil {
-		return bm.performBRBackup(backup.DeepCopy())
+	if backup.Spec.BR == nil {
+		return fmt.Errorf("no BR config in %s", bm)
 	}
-
-	var db *sql.DB
-	err = wait.PollImmediate(constants.PollInterval, constants.CheckTimeout, func() (done bool, err error) {
-		db, err = util.OpenDB(bm.getDSN(constants.TidbMetaDB))
-		if err != nil {
-			glog.Warningf("can't open connection to tidb cluster %s, err: %v", bm, err)
-			return false, nil
-		}
-
-		if err := db.Ping(); err != nil {
-			glog.Warningf("can't connect to tidb cluster %s, err: %s", bm, err)
-			return false, nil
-		}
-		return true, nil
-	})
-
-	if err != nil {
-		glog.Errorf("cluster %s connect failed, err: %s", bm, err)
-		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
-			Type:    v1alpha1.BackupFailed,
-			Status:  corev1.ConditionTrue,
-			Reason:  "ConnectTidbFailed",
-			Message: err.Error(),
-		})
-	}
-
-	defer db.Close()
-	return bm.performBackup(backup.DeepCopy(), db)
+	return bm.performBackup(backup.DeepCopy())
 }
 
-func (bm *BackupManager) performBRBackup(backup *v1alpha1.Backup) error {
+func (bm *Manager) performBackup(backup *v1alpha1.Backup) error {
 	started := time.Now()
 
 	err := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
@@ -106,9 +74,9 @@ func (bm *BackupManager) performBRBackup(backup *v1alpha1.Backup) error {
 		return err
 	}
 
-	backupFullPath, out, err := bm.brBackupData(backup)
+	backupFullPath, err := bm.backupData(backup)
 	if err != nil {
-		glog.Errorf("backup cluster %s data to %s failed, err: %s", bm, bm.StorageType, err)
+		glog.Errorf("backup cluster %s data failed, err: %s", bm, err)
 		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
 			Type:    v1alpha1.BackupFailed,
 			Status:  corev1.ConditionTrue,
@@ -120,7 +88,7 @@ func (bm *BackupManager) performBRBackup(backup *v1alpha1.Backup) error {
 
 	// Note: The size get from remote may be incorrect because the blobs
 	// are eventually consistent.
-	size, err := getBRBackupSize(backupFullPath, backup)
+	size, err := getBackupSize(backup)
 	if err != nil {
 		glog.Errorf("Get size for backup files in %s of cluster %s failed, err: %s", backupFullPath, bm, err)
 		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
@@ -132,7 +100,7 @@ func (bm *BackupManager) performBRBackup(backup *v1alpha1.Backup) error {
 	}
 	glog.Infof("Get size %d for backup files in %s of cluster %s success", size, backupFullPath, bm)
 
-	commitTs, err := getBRCommitTs(out)
+	commitTs, err := getCommitTs(backup)
 	if err != nil {
 		glog.Errorf("get cluster %s commitTs failed, err: %s", bm, err)
 		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
@@ -142,7 +110,7 @@ func (bm *BackupManager) performBRBackup(backup *v1alpha1.Backup) error {
 			Message: err.Error(),
 		})
 	}
-	glog.Infof("get cluster %s commitTs %s success", bm, commitTs)
+	glog.Infof("get cluster %s commitTs %d success", bm, commitTs)
 
 	finish := time.Now()
 
@@ -150,203 +118,10 @@ func (bm *BackupManager) performBRBackup(backup *v1alpha1.Backup) error {
 	backup.Status.TimeStarted = metav1.Time{Time: started}
 	backup.Status.TimeCompleted = metav1.Time{Time: finish}
 	backup.Status.BackupSize = size
-	backup.Status.CommitTs = commitTs
-
-	return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
-		Type:   v1alpha1.BackupComplete,
-		Status: corev1.ConditionTrue,
-	})
-}
-
-func (bm *BackupManager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {
-	started := time.Now()
-
-	err := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
-		Type:   v1alpha1.BackupRunning,
-		Status: corev1.ConditionTrue,
-	})
-	if err != nil {
-		return err
-	}
-
-	oldTikvGCTime, err := bm.getTikvGCLifeTime(db)
-	if err != nil {
-		glog.Errorf("cluster %s get %s failed, err: %s", bm, constants.TikvGCVariable, err)
-		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
-			Type:    v1alpha1.BackupFailed,
-			Status:  corev1.ConditionTrue,
-			Reason:  "GetTikvGCLifeTimeFailed",
-			Message: err.Error(),
-		})
-	}
-	glog.Infof("cluster %s %s is %s", bm, constants.TikvGCVariable, oldTikvGCTime)
-
-	oldTikvGCTimeDuration, err := time.ParseDuration(oldTikvGCTime)
-	if err != nil {
-		glog.Errorf("cluster %s parse old %s failed, err: %s", bm, constants.TikvGCVariable, err)
-		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
-			Type:    v1alpha1.BackupFailed,
-			Status:  corev1.ConditionTrue,
-			Reason:  "ParseOldTikvGCLifeTimeFailed",
-			Message: err.Error(),
-		})
-	}
-	tikvGCTimeDuration, err := time.ParseDuration(constants.TikvGCLifeTime)
-	if err != nil {
-		glog.Errorf("cluster %s parse default %s failed, err: %s", bm, constants.TikvGCVariable, err)
-		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
-			Type:    v1alpha1.BackupFailed,
-			Status:  corev1.ConditionTrue,
-			Reason:  "ParseDefaultTikvGCLifeTimeFailed",
-			Message: err.Error(),
-		})
-	}
-	if oldTikvGCTimeDuration < tikvGCTimeDuration {
-		err = bm.setTikvGCLifeTime(db, constants.TikvGCLifeTime)
-		if err != nil {
-			glog.Errorf("cluster %s set tikv GC life time to %s failed, err: %s", bm, constants.TikvGCLifeTime, err)
-			return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
-				Type:    v1alpha1.BackupFailed,
-				Status:  corev1.ConditionTrue,
-				Reason:  "SetTikvGCLifeTimeFailed",
-				Message: err.Error(),
-			})
-		}
-		glog.Infof("set cluster %s %s to %s success", bm, constants.TikvGCVariable, constants.TikvGCLifeTime)
-	}
-
-	backupFullPath, err := bm.dumpTidbClusterData()
-	if err != nil {
-		glog.Errorf("dump cluster %s data failed, err: %s", bm, err)
-		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
-			Type:    v1alpha1.BackupFailed,
-			Status:  corev1.ConditionTrue,
-			Reason:  "DumpTidbClusterFailed",
-			Message: err.Error(),
-		})
-	}
-	glog.Infof("dump cluster %s data to %s success", bm, backupFullPath)
-
-	if oldTikvGCTimeDuration < tikvGCTimeDuration {
-		err = bm.setTikvGCLifeTime(db, oldTikvGCTime)
-		if err != nil {
-			glog.Errorf("cluster %s reset tikv GC life time to %s failed, err: %s", bm, oldTikvGCTime, err)
-			return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
-				Type:    v1alpha1.BackupFailed,
-				Status:  corev1.ConditionTrue,
-				Reason:  "ResetTikvGCLifeTimeFailed",
-				Message: err.Error(),
-			})
-		}
-		glog.Infof("reset cluster %s %s to %s success", bm, constants.TikvGCVariable, oldTikvGCTime)
-	}
-	// TODO: Concurrent get file size and upload backup data to speed up processing time
-	archiveBackupPath := backupFullPath + constants.DefaultArchiveExtention
-	err = archiveBackupData(backupFullPath, archiveBackupPath)
-	if err != nil {
-		glog.Errorf("archive cluster %s backup data %s failed, err: %s", bm, archiveBackupPath, err)
-		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
-			Type:    v1alpha1.BackupFailed,
-			Status:  corev1.ConditionTrue,
-			Reason:  "ArchiveBackupDataFailed",
-			Message: err.Error(),
-		})
-	}
-	glog.Infof("archive cluster %s backup data %s success", bm, archiveBackupPath)
-
-	size, err := getBackupSize(archiveBackupPath)
-	if err != nil {
-		glog.Errorf("get cluster %s archived backup file %s size %d failed, err: %s", bm, archiveBackupPath, size, err)
-		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
-			Type:    v1alpha1.BackupFailed,
-			Status:  corev1.ConditionTrue,
-			Reason:  "GetBackupSizeFailed",
-			Message: err.Error(),
-		})
-	}
-	glog.Infof("get cluster %s archived backup file %s size %d success", bm, archiveBackupPath, size)
-
-	commitTs, err := getCommitTsFromMetadata(backupFullPath)
-	if err != nil {
-		glog.Errorf("get cluster %s commitTs failed, err: %s", bm, err)
-		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
-			Type:    v1alpha1.BackupFailed,
-			Status:  corev1.ConditionTrue,
-			Reason:  "GetCommitTsFailed",
-			Message: err.Error(),
-		})
-	}
-	glog.Infof("get cluster %s commitTs %s success", bm, commitTs)
-
-	remotePath := strings.TrimPrefix(archiveBackupPath, constants.BackupRootPath+"/")
-	bucketURI := bm.getDestBucketURI(remotePath)
-	err = bm.backupDataToRemote(archiveBackupPath, bucketURI)
-	if err != nil {
-		glog.Errorf("backup cluster %s data to %s failed, err: %s", bm, bm.StorageType, err)
-		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
-			Type:    v1alpha1.BackupFailed,
-			Status:  corev1.ConditionTrue,
-			Reason:  "BackupDataToRemoteFailed",
-			Message: err.Error(),
-		})
-	}
-	glog.Infof("backup cluster %s data to %s success", bm, bm.StorageType)
-
-	finish := time.Now()
-
-	backup.Status.BackupPath = bucketURI
-	backup.Status.TimeStarted = metav1.Time{Time: started}
-	backup.Status.TimeCompleted = metav1.Time{Time: finish}
-	backup.Status.BackupSize = size
-	backup.Status.CommitTs = commitTs
+	backup.Status.CommitTs = fmt.Sprintf("%d", commitTs)
 
 	return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
 		Type:   v1alpha1.BackupComplete,
 		Status: corev1.ConditionTrue,
 	})
 }
-
-// ProcessCleanBackup used to clean the specific backup
-func (bm *BackupManager) ProcessCleanBackup() error {
-	backup, err := bm.backupLister.Backups(bm.Namespace).Get(bm.BackupName)
-	if err != nil {
-		return fmt.Errorf("can't find cluster %s backup %s CRD object, err: %v", bm, bm.BackupName, err)
-	}
-
-	return bm.performCleanBackup(backup.DeepCopy())
-}
-
-func (bm *BackupManager) performCleanBackup(backup *v1alpha1.Backup) error {
-	if backup.Status.BackupPath == "" {
-		glog.Errorf("cluster %s backup path is empty", bm)
-		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
-			Type:    v1alpha1.BackupFailed,
-			Status:  corev1.ConditionTrue,
-			Reason:  "BackupPathIsEmpty",
-			Message: fmt.Sprintf("the cluster %s backup path is empty", bm),
-		})
-	}
-
-	var err error
-	if backup.Spec.BR != nil {
-		err = bm.cleanBRRemoteBackupData(backup)
-	} else {
-		err = bm.cleanRemoteBackupData(backup.Status.BackupPath)
-	}
-
-	if err != nil {
-		glog.Errorf("clean cluster %s backup %s failed, err: %s", bm, backup.Status.BackupPath, err)
-		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
-			Type:    v1alpha1.BackupFailed,
-			Status:  corev1.ConditionTrue,
-			Reason:  "CleanBackupDataFailed",
-			Message: err.Error(),
-		})
-	}
-
-	glog.Infof("clean cluster %s backup %s success", bm, backup.Status.BackupPath)
-	return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
-		Type:   v1alpha1.BackupClean,
-		Status: corev1.ConditionTrue,
-	})
-}
diff --git a/cmd/backup-manager/app/clean/clean.go b/cmd/backup-manager/app/clean/clean.go
new file mode 100644
index 00000000000..136866b0a95
--- /dev/null
+++ b/cmd/backup-manager/app/clean/clean.go
@@ -0,0 +1,75 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clean
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os/exec"
+
+	glog "k8s.io/klog"
+
+	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
+	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
+	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
+)
+
+// Options contains the input arguments to the clean command
+type Options struct {
+	Namespace  string
+	BackupName string
+}
+
+func (bo *Options) String() string {
+	return fmt.Sprintf("%s/%s", bo.Namespace, bo.BackupName)
+}
+
+// cleanBRRemoteBackupData cleans the backup data from the remote storage
+func (bo *Options) cleanBRRemoteBackupData(backup *v1alpha1.Backup) error {
+	s, err := util.NewRemoteStorage(backup)
+	if err != nil {
+		return err
+	}
+	defer s.Close()
+	ctx := context.Background()
+	iter := s.List(nil)
+	for {
+		obj, err := iter.Next(ctx)
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return err
+		}
+		glog.Infof("Prepare to delete %s for cluster %s", obj.Key, bo)
+		err = s.Delete(context.Background(), obj.Key)
+		if err != nil {
+			return err
+		}
+		glog.Infof("Delete %s for cluster %s successfully", obj.Key, bo)
+	}
+	return nil
+}
+
+func (bo *Options) cleanRemoteBackupData(bucket string) error {
+	destBucket := util.NormalizeBucketURI(bucket)
+	output, err := exec.Command("rclone", constants.RcloneConfigArg, "deletefile", destBucket).CombinedOutput()
+	if err != nil {
+		return fmt.Errorf("cluster %s, execute rclone deletefile command failed, output: %s, err: %v", bo, string(output), err)
+	}
+
+	glog.Infof("cluster %s backup %s was deleted successfully", bo, bucket)
+	return nil
+}
diff --git a/cmd/backup-manager/app/clean/manager.go b/cmd/backup-manager/app/clean/manager.go
new file mode 100644
index 00000000000..01380b5eec8
--- /dev/null
+++ b/cmd/backup-manager/app/clean/manager.go
@@ -0,0 +1,89 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clean
+
+import (
+	"fmt"
+
+	corev1 "k8s.io/api/core/v1"
+	glog "k8s.io/klog"
+
+	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
+	listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
+	"github.com/pingcap/tidb-operator/pkg/controller"
+)
+
+// Manager is mainly used to manage the backup cleanup work
+type Manager struct {
+	backupLister  listers.BackupLister
+	StatusUpdater controller.BackupConditionUpdaterInterface
+	Options
+}
+
+// NewManager returns a Manager
+func NewManager(
+	backupLister listers.BackupLister,
+	statusUpdater controller.BackupConditionUpdaterInterface,
+	backupOpts Options) *Manager {
+	return &Manager{
+		backupLister,
+		statusUpdater,
+		backupOpts,
+	}
+}
+
+// ProcessCleanBackup is used to clean the specific backup
+func (bm *Manager) ProcessCleanBackup() error {
+	backup, err := bm.backupLister.Backups(bm.Namespace).Get(bm.BackupName)
+	if err != nil {
+		return fmt.Errorf("can't find cluster %s backup %s CRD object, err: %v", bm, bm.BackupName, err)
+	}
+
+	return bm.performCleanBackup(backup.DeepCopy())
+}
+
+func (bm *Manager) performCleanBackup(backup *v1alpha1.Backup) error {
+	if backup.Status.BackupPath == "" {
+		glog.Errorf("cluster %s backup path is empty", bm)
+		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+			Type:    v1alpha1.BackupFailed,
+			Status:  corev1.ConditionTrue,
+			Reason:  "BackupPathIsEmpty",
+			Message: fmt.Sprintf("the cluster %s backup path is empty", bm),
+		})
+	}
+
+	var err error
+	if backup.Spec.BR != nil {
+		err = bm.cleanBRRemoteBackupData(backup)
+	} else {
+		err = bm.cleanRemoteBackupData(backup.Status.BackupPath)
+	}
+
+	if err != nil {
+		glog.Errorf("clean cluster %s backup %s failed, err: %s", bm, backup.Status.BackupPath, err)
+		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+			Type:    v1alpha1.BackupFailed,
+			Status:  corev1.ConditionTrue,
+			Reason:  "CleanBackupDataFailed",
+			Message: err.Error(),
+		})
+	}
+
+	glog.Infof("clean cluster %s backup %s success", bm, backup.Status.BackupPath)
+	return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+		Type:   v1alpha1.BackupClean,
+		Status: corev1.ConditionTrue,
+	})
+}
diff --git a/cmd/backup-manager/app/cmd/backup.go b/cmd/backup-manager/app/cmd/backup.go
index a6f4f17b026..d0e2fce1651 100644
--- a/cmd/backup-manager/app/cmd/backup.go
+++ b/cmd/backup-manager/app/cmd/backup.go
@@ -16,12 +16,9 @@ package cmd
 
 import (
 	"context"
 
-	// registry mysql drive
-	_ "github.com/go-sql-driver/mysql"
 	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/backup"
 	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
 	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
-	bkconstants "github.com/pingcap/tidb-operator/pkg/backup/constants"
 	informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
 	"github.com/pingcap/tidb-operator/pkg/controller"
 	"github.com/spf13/cobra"
@@ -32,7 +29,7 @@ import (
 
 // NewBackupCommand implements the backup command
 func NewBackupCommand() *cobra.Command {
-	bo := backup.BackupOpts{}
+	bo := backup.Options{}
 
 	cmd := &cobra.Command{
 		Use:   "backup",
@@ -44,18 +41,11 @@ func NewBackupCommand() *cobra.Command {
 	}
 
 	cmd.Flags().StringVar(&bo.Namespace, "namespace", "", "Backup CR's namespace")
-	cmd.Flags().StringVar(&bo.Host, "host", "", "Tidb cluster access address")
-	cmd.Flags().Int32Var(&bo.Port, "port", bkconstants.DefaultTidbPort, "Port number to use for connecting tidb cluster")
-	cmd.Flags().StringVar(&bo.Bucket, "bucket", "", "Bucket in which to store the backup data")
-	cmd.Flags().StringVar(&bo.Password, bkconstants.TidbPasswordKey, "", "Password to use when connecting to tidb cluster")
-	cmd.Flags().StringVar(&bo.User, "user", "", "User for login tidb cluster")
-	cmd.Flags().StringVar(&bo.StorageType, "storageType", "", "Backend storage type")
 	cmd.Flags().StringVar(&bo.BackupName, "backupName", "", "Backup CRD object name")
-	util.SetFlagsFromEnv(cmd.Flags(), bkconstants.BackupManagerEnvVarPrefix)
 	return cmd
 }
 
-func runBackup(backupOpts backup.BackupOpts, kubecfg string) error {
+func runBackup(backupOpts backup.Options, kubecfg string) error {
 	kubeCli, cli, err := util.NewKubeAndCRCli(kubecfg)
 	cmdutil.CheckErr(err)
 	options := []informers.SharedInformerOption{
@@ -74,6 +64,6 @@ func runBackup(backupOpts backup.BackupOpts, kubecfg string) error {
 	cache.WaitForCacheSync(ctx.Done(), backupInformer.Informer().HasSynced)
 
 	glog.Infof("start to process backup %s", backupOpts.String())
-	bm := backup.NewBackupManager(backupInformer.Lister(), statusUpdater, backupOpts)
+	bm := backup.NewManager(backupInformer.Lister(), statusUpdater, backupOpts)
 	return bm.ProcessBackup()
 }
diff --git a/cmd/backup-manager/app/cmd/clean.go b/cmd/backup-manager/app/cmd/clean.go
index a2f62cbfdc1..35095223deb 100644
--- a/cmd/backup-manager/app/cmd/clean.go
+++ b/cmd/backup-manager/app/cmd/clean.go
@@ -16,9 +16,7 @@ package cmd
 
 import (
 	"context"
 
-	// registry mysql drive
-	_ "github.com/go-sql-driver/mysql"
-	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/backup"
+	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/clean"
 	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
 	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
 	informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
@@ -31,7 +29,7 @@ import (
 
 // NewCleanCommand implements the clean command
 func NewCleanCommand() *cobra.Command {
-	bo := backup.BackupOpts{}
+	bo := clean.Options{}
 
 	cmd := &cobra.Command{
 		Use:   "clean",
@@ -47,7 +45,7 @@ func NewCleanCommand() *cobra.Command {
 	return cmd
 }
 
-func runClean(backupOpts backup.BackupOpts, kubecfg string) error {
+func runClean(backupOpts clean.Options, kubecfg string) error {
 	kubeCli, cli, err := util.NewKubeAndCRCli(kubecfg)
 	cmdutil.CheckErr(err)
 	options := []informers.SharedInformerOption{
@@ -67,6 +65,6 @@ func runClean(backupOpts backup.BackupOpts, kubecfg string) error {
 	cache.WaitForCacheSync(ctx.Done(), backupInformer.Informer().HasSynced)
 
 	glog.Infof("start to clean backup %s", backupOpts.String())
-	bm := backup.NewBackupManager(backupInformer.Lister(), statusUpdater, backupOpts)
+	bm := clean.NewManager(backupInformer.Lister(), statusUpdater, backupOpts)
 	return bm.ProcessCleanBackup()
 }
diff --git a/cmd/backup-manager/app/cmd/cmd.go b/cmd/backup-manager/app/cmd/cmd.go
index 015ac047661..9b5644e15e0 100644
--- a/cmd/backup-manager/app/cmd/cmd.go
+++ b/cmd/backup-manager/app/cmd/cmd.go
@@ -30,6 +30,7 @@ func NewBackupMgrCommand() *cobra.Command {
 	cmds.PersistentFlags().StringVarP(&kubecfg, "kubeconfig", "k", "", "Path to kubeconfig file, omit this if run in cluster.")
 
 	cmds.AddCommand(NewBackupCommand())
+	cmds.AddCommand(NewExportCommand())
 	cmds.AddCommand(NewRestoreCommand())
 	cmds.AddCommand(NewCleanCommand())
 	return cmds
diff --git a/cmd/backup-manager/app/cmd/export.go b/cmd/backup-manager/app/cmd/export.go
new file mode 100644
index 00000000000..437506c8d16
--- /dev/null
+++ b/cmd/backup-manager/app/cmd/export.go
@@ -0,0 +1,79 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cmd
+
+import (
+	"context"
+
+	// register mysql driver
+	_ "github.com/go-sql-driver/mysql"
+	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
+	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/export"
+	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
+	bkconstants "github.com/pingcap/tidb-operator/pkg/backup/constants"
+	informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
+	"github.com/pingcap/tidb-operator/pkg/controller"
+	"github.com/spf13/cobra"
+	"k8s.io/client-go/tools/cache"
+	glog "k8s.io/klog"
+	cmdutil "k8s.io/kubectl/pkg/cmd/util"
+)
+
+// NewExportCommand implements the export command
+func NewExportCommand() *cobra.Command {
+	bo := export.BackupOpts{}
+
+	cmd := &cobra.Command{
+		Use:   "export",
+		Short: "Export data of a specific tidb cluster.",
+		Run: func(cmd *cobra.Command, args []string) {
+			util.ValidCmdFlags(cmd.CommandPath(), cmd.LocalFlags())
+			cmdutil.CheckErr(runExport(bo, kubecfg))
+		},
+	}
+
+	cmd.Flags().StringVar(&bo.Namespace, "namespace", "", "Backup CR's namespace")
+	cmd.Flags().StringVar(&bo.Host, "host", "", "Tidb cluster access address")
+	cmd.Flags().Int32Var(&bo.Port, "port", bkconstants.DefaultTidbPort, "Port number to use for connecting tidb cluster")
+	cmd.Flags().StringVar(&bo.Bucket, "bucket", "", "Bucket in which to store the backup data")
+	cmd.Flags().StringVar(&bo.Password, bkconstants.TidbPasswordKey, "", "Password to use when connecting to tidb cluster")
+	cmd.Flags().StringVar(&bo.User, "user", "", "User for login tidb cluster")
+	cmd.Flags().StringVar(&bo.StorageType, "storageType", "", "Backend storage type")
+	cmd.Flags().StringVar(&bo.BackupName, "backupName", "", "Backup CRD object name")
+	util.SetFlagsFromEnv(cmd.Flags(), bkconstants.BackupManagerEnvVarPrefix)
+	return cmd
+}
+
+func runExport(backupOpts export.BackupOpts, kubecfg string) error {
+	kubeCli, cli, err := util.NewKubeAndCRCli(kubecfg)
+	cmdutil.CheckErr(err)
+	options := []informers.SharedInformerOption{
+		informers.WithNamespace(backupOpts.Namespace),
+	}
+	informerFactory := informers.NewSharedInformerFactoryWithOptions(cli, constants.ResyncDuration, options...)
+	recorder := util.NewEventRecorder(kubeCli, "backup")
+	backupInformer := informerFactory.Pingcap().V1alpha1().Backups()
+	statusUpdater := controller.NewRealBackupConditionUpdater(cli, backupInformer.Lister(), recorder)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go informerFactory.Start(ctx.Done())
+
+	// wait for the shared informer's store to sync
+	cache.WaitForCacheSync(ctx.Done(), backupInformer.Informer().HasSynced)
+
+	glog.Infof("start to process backup %s", backupOpts.String())
+	bm := export.NewBackupManager(backupInformer.Lister(), statusUpdater, backupOpts)
+	return bm.ProcessBackup()
+}
diff --git a/cmd/backup-manager/app/export/export.go b/cmd/backup-manager/app/export/export.go
new file mode 100644
index 00000000000..194463ef623
--- /dev/null
+++ b/cmd/backup-manager/app/export/export.go
@@ -0,0 +1,203 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package export
+
+import (
+	"database/sql"
+	"fmt"
+	"io/ioutil"
+	"os/exec"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/mholt/archiver"
+	glog "k8s.io/klog"
+
+	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
+	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
+)
+
+// BackupOpts contains the input arguments to the backup command
+type BackupOpts struct {
+	Namespace   string
+	BackupName  string
+	Bucket      string
+	Host        string
+	Port        int32
+	Password    string
+	User        string
+	StorageType string
+}
+
+func (bo *BackupOpts) String() string {
+	return fmt.Sprintf("%s/%s", bo.Namespace, bo.BackupName)
+}
+
+func (bo *BackupOpts) getBackupFullPath() string {
+	return filepath.Join(constants.BackupRootPath, bo.getBackupRelativePath())
+}
+
+func (bo *BackupOpts) getBackupRelativePath() string {
+	backupName := fmt.Sprintf("backup-%s", time.Now().UTC().Format(time.RFC3339))
+	return fmt.Sprintf("%s/%s", bo.Bucket, backupName)
+}
+
+func (bo *BackupOpts) getDestBucketURI(remotePath string) string {
+	return fmt.Sprintf("%s://%s", bo.StorageType, remotePath)
+}
+
+func (bo *BackupOpts) getTikvGCLifeTime(db *sql.DB) (string, error) {
+	var tikvGCTime string
+	sql := fmt.Sprintf("select variable_value from %s where variable_name= ?", constants.TidbMetaTable)
+	row := db.QueryRow(sql, constants.TikvGCVariable)
+	err := row.Scan(&tikvGCTime)
+	if err != nil {
+		return tikvGCTime, fmt.Errorf("query cluster %s %s failed, sql: %s, err: %v", bo, constants.TikvGCVariable, sql, err)
+	}
+	return tikvGCTime, nil
+}
+
+func (bo *BackupOpts) setTikvGCLifeTime(db *sql.DB, gcTime string) error {
+	sql := fmt.Sprintf("update %s set variable_value = ? where variable_name = ?", constants.TidbMetaTable)
+	_, err := db.Exec(sql, gcTime, constants.TikvGCVariable)
+	if err != nil {
+		return fmt.Errorf("set cluster %s %s failed, sql: %s, err: %v", bo, constants.TikvGCVariable, sql, err)
+	}
+	return nil
+}
+
+func (bo *BackupOpts) dumpTidbClusterData() (string, error) {
+	bfPath := bo.getBackupFullPath()
+	err := util.EnsureDirectoryExist(bfPath)
+	if err != nil {
+		return "", err
+	}
+	args := []string{
+		fmt.Sprintf("--outputdir=%s", bfPath),
+		fmt.Sprintf("--host=%s", bo.Host),
+		fmt.Sprintf("--port=%d", bo.Port),
+		fmt.Sprintf("--user=%s", bo.User),
+		fmt.Sprintf("--password=%s", bo.Password),
+		"--long-query-guard=3600",
+		"--tidb-force-priority=LOW_PRIORITY",
+		"--verbose=3",
+		"--regex",
+		"^(?!(mysql|test|INFORMATION_SCHEMA|PERFORMANCE_SCHEMA))",
+	}
+
+	output, err := exec.Command("/mydumper", args...).CombinedOutput()
+	if err != nil {
+		return bfPath, fmt.Errorf("cluster %s, execute mydumper command %v failed, output: %s, err: %v", bo, args, string(output), err)
+	}
+	return bfPath, nil
+}
+
+func (bo *BackupOpts) backupDataToRemote(source, bucketURI string) error {
+	destBucket := util.NormalizeBucketURI(bucketURI)
+	tmpDestBucket := fmt.Sprintf("%s.tmp", destBucket)
+	// TODO: We may need to use exec.CommandContext to control timeouts.
+	output, err := exec.Command("rclone", constants.RcloneConfigArg, "copyto", source, tmpDestBucket).CombinedOutput()
+	if err != nil {
+		return fmt.Errorf("cluster %s, execute rclone copyto command for upload backup data %s failed, output: %s, err: %v", bo, bucketURI, string(output), err)
+	}
+
+	glog.Infof("upload cluster %s backup data to %s successfully, now move it to permanent URL %s", bo, tmpDestBucket, destBucket)
+
+	// the backup was a success
+	// remove .tmp extension
+	output, err = exec.Command("rclone", constants.RcloneConfigArg, "moveto", tmpDestBucket, destBucket).CombinedOutput()
+	if err != nil {
+		return fmt.Errorf("cluster %s, execute rclone moveto command failed, output: %s, err: %v", bo, string(output), err)
+	}
+	return nil
+}
+
+func (bo *BackupOpts) getDSN(db string) string {
+	return fmt.Sprintf("%s:%s@(%s:%d)/%s?charset=utf8", bo.User, bo.Password, bo.Host, bo.Port, db)
+}
+
+/*
+	getCommitTsFromMetadata gets commitTs from mydumper's metadata file
+
+	metadata file format is as follows:
+
+	Started dump at: 2019-06-13 10:00:04
+	SHOW MASTER STATUS:
+		Log: tidb-binlog
+		Pos: 409054741514944513
+		GTID:
+
+	Finished dump at: 2019-06-13 10:00:04
+*/
+func getCommitTsFromMetadata(backupPath string) (string, error) {
+	var commitTs string
+
+	metaFile := filepath.Join(backupPath, constants.MetaDataFile)
+	if exist := util.IsFileExist(metaFile); !exist {
+		return commitTs, fmt.Errorf("file %s does not exist or is not regular file", metaFile)
+	}
+	contents, err := ioutil.ReadFile(metaFile)
+	if err != nil {
+		return commitTs, fmt.Errorf("read metadata file %s failed, err: %v", metaFile, err)
+	}
+
+	for _, lineStr := range strings.Split(string(contents), "\n") {
+		if !strings.Contains(lineStr, "Pos") {
+			continue
+		}
+		lineStrSlice := strings.Split(lineStr, ":")
+		if len(lineStrSlice) != 2 {
+			return commitTs, fmt.Errorf("parse mydumper's metadata file %s failed, str: %s", metaFile, lineStr)
+		}
+		commitTs = strings.TrimSpace(lineStrSlice[1])
+		break
+	}
+	return commitTs, nil
+}
+
+// getBackupSize gets the backup data size
+func getBackupSize(backupPath string) (int64, error) {
+	var size int64
+	if exist := util.IsFileExist(backupPath); !exist {
+		return size, fmt.Errorf("file %s does not exist or is not regular file", backupPath)
+	}
+	out, err := exec.Command("rclone", constants.RcloneConfigArg, "ls", backupPath).CombinedOutput()
+	if err != nil {
+		return size, fmt.Errorf("failed to get backup %s size, err: %v", backupPath, err)
+	}
+	sizeStr := strings.Fields(string(out))[0]
+	size, err = strconv.ParseInt(sizeStr, 10, 64)
+	if err != nil {
+		return size, fmt.Errorf("failed to parse size string %s, err: %v", sizeStr, err)
+	}
+	return size, nil
+}
+
+// archiveBackupData archives the backup data according to destFile's extension
+func archiveBackupData(backupDir, destFile string) error {
+	if exist := util.IsDirExist(backupDir); !exist {
+		return fmt.Errorf("dir %s does not exist or is not a dir", backupDir)
+	}
+	destDir := filepath.Dir(destFile)
+	if err := util.EnsureDirectoryExist(destDir); err != nil {
+		return err
+	}
+	err := archiver.Archive([]string{backupDir}, destFile)
+	if err != nil {
+		return fmt.Errorf("archive backup data %s to %s failed, err: %v", backupDir, destFile, err)
+	}
+	return nil
+}
diff --git a/cmd/backup-manager/app/export/manager.go b/cmd/backup-manager/app/export/manager.go
new file mode 100644
index 00000000000..f4316c6f5fc
--- /dev/null
+++ b/cmd/backup-manager/app/export/manager.go
@@ -0,0 +1,239 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package export
+
+import (
+	"database/sql"
+	"strings"
+	"time"
+
+	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
+	"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
+	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
+	listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
+	"github.com/pingcap/tidb-operator/pkg/controller"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	glog "k8s.io/klog"
+)
+
+// BackupManager is mainly used to manage backup related work
+type BackupManager struct {
+	backupLister  listers.BackupLister
+	StatusUpdater controller.BackupConditionUpdaterInterface
+	BackupOpts
+}
+
+// NewBackupManager returns a BackupManager
+func NewBackupManager(
+	backupLister listers.BackupLister,
+	statusUpdater controller.BackupConditionUpdaterInterface,
+	backupOpts BackupOpts) *BackupManager {
+	return &BackupManager{
+		backupLister,
+		statusUpdater,
+		backupOpts,
+	}
+}
+
+// ProcessBackup is used to process the backup logic
+func (bm *BackupManager) ProcessBackup() error {
+	backup, err := bm.backupLister.Backups(bm.Namespace).Get(bm.BackupName)
+	if err != nil {
+		glog.Errorf("can't find cluster %s backup %s CRD object, err: %v", bm, bm.BackupName, err)
+		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+			Type:    v1alpha1.BackupFailed,
+			Status:  corev1.ConditionTrue,
+			Reason:  "GetBackupCRFailed",
+			Message: err.Error(),
+		})
+	}
+
+	var db *sql.DB
+	err = wait.PollImmediate(constants.PollInterval, constants.CheckTimeout, func() (done bool, err error) {
+		db, err = util.OpenDB(bm.getDSN(constants.TidbMetaDB))
+		if err != nil {
+			glog.Warningf("can't open connection to tidb cluster %s, err: %v", bm, err)
+			return false, nil
+		}
+
+		if err := db.Ping(); err != nil {
+			glog.Warningf("can't connect to tidb cluster %s, err: %s", bm, err)
+			return false, nil
+		}
+		return true, nil
+	})
+
+	if err != nil {
+		glog.Errorf("cluster %s connect failed, err: %s", bm, err)
+		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+			Type:    v1alpha1.BackupFailed,
+			Status:  corev1.ConditionTrue,
+			Reason:  "ConnectTidbFailed",
+			Message: err.Error(),
+		})
+	}
+
+	defer db.Close()
+	return bm.performBackup(backup.DeepCopy(), db)
+}
+
+func (bm *BackupManager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {
+	started := time.Now()
+
+	err := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+		Type:   v1alpha1.BackupRunning,
+		Status: corev1.ConditionTrue,
+	})
+	if err != nil {
+		return err
+	}
+
+	oldTikvGCTime, err := bm.getTikvGCLifeTime(db)
+	if err != nil {
+		glog.Errorf("cluster %s get %s failed, err: %s", bm, constants.TikvGCVariable, err)
+		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+			Type:    v1alpha1.BackupFailed,
+			Status:  corev1.ConditionTrue,
+			Reason:  "GetTikvGCLifeTimeFailed",
+			Message: err.Error(),
+		})
+	}
+	glog.Infof("cluster %s %s is %s", bm, constants.TikvGCVariable, oldTikvGCTime)
+
+	oldTikvGCTimeDuration, err := time.ParseDuration(oldTikvGCTime)
+	if err != nil {
+		glog.Errorf("cluster %s parse old %s failed, err: %s", bm, constants.TikvGCVariable, err)
+		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+			Type:    v1alpha1.BackupFailed,
+			Status:  corev1.ConditionTrue,
+			Reason:  "ParseOldTikvGCLifeTimeFailed",
+			Message: err.Error(),
+		})
+	}
+	tikvGCTimeDuration, err := time.ParseDuration(constants.TikvGCLifeTime)
+	if err != nil {
+		glog.Errorf("cluster %s parse default %s failed, err: %s", bm, constants.TikvGCVariable, err)
+		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+			Type:    v1alpha1.BackupFailed,
+			Status:  corev1.ConditionTrue,
+			Reason:  "ParseDefaultTikvGCLifeTimeFailed",
+			Message: err.Error(),
+		})
+	}
+	if oldTikvGCTimeDuration < tikvGCTimeDuration {
+		err = bm.setTikvGCLifeTime(db, constants.TikvGCLifeTime)
+		if err != nil {
+			glog.Errorf("cluster %s set tikv GC life time to %s failed, err: %s", bm, constants.TikvGCLifeTime, err)
+			return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+				Type:    v1alpha1.BackupFailed,
+				Status:  corev1.ConditionTrue,
+				Reason:  "SetTikvGCLifeTimeFailed",
+				Message: err.Error(),
+			})
+		}
+		glog.Infof("set cluster %s %s to %s success", bm, constants.TikvGCVariable, constants.TikvGCLifeTime)
+	}
+
+	backupFullPath, err := bm.dumpTidbClusterData()
+	if err != nil {
+		glog.Errorf("dump cluster %s data failed, err: %s", bm, err)
+		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+			Type:    v1alpha1.BackupFailed,
+			Status:  corev1.ConditionTrue,
+			Reason:  "DumpTidbClusterFailed",
+			Message: err.Error(),
+		})
+	}
+	glog.Infof("dump cluster %s data to %s success", bm, backupFullPath)
+
+	if oldTikvGCTimeDuration < tikvGCTimeDuration {
+		err = bm.setTikvGCLifeTime(db, oldTikvGCTime)
+		if err != nil {
+			glog.Errorf("cluster %s reset tikv GC life time to %s failed, err: %s", bm, oldTikvGCTime, err)
+			return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+				Type:    v1alpha1.BackupFailed,
+				Status:  corev1.ConditionTrue,
+				Reason:  "ResetTikvGCLifeTimeFailed",
+				Message: err.Error(),
+			})
+		}
+		glog.Infof("reset cluster %s %s to %s success", bm, constants.TikvGCVariable, oldTikvGCTime)
+	}
+	// TODO: Concurrent get file size and upload backup data to speed up processing time
+	archiveBackupPath := backupFullPath + constants.DefaultArchiveExtention
+	err = archiveBackupData(backupFullPath, archiveBackupPath)
+	if err != nil {
+		glog.Errorf("archive cluster %s backup data %s failed, err: %s", bm, archiveBackupPath, err)
+		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+			Type:    v1alpha1.BackupFailed,
+			Status:  corev1.ConditionTrue,
+			Reason:  "ArchiveBackupDataFailed",
+			Message: err.Error(),
+		})
+	}
+	glog.Infof("archive cluster %s backup data %s success", bm, archiveBackupPath)
+
+	size, err := getBackupSize(archiveBackupPath)
+	if err != nil {
+		glog.Errorf("get cluster %s archived backup file %s size %d failed, err: %s", bm, archiveBackupPath, size, err)
+		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+			Type:    v1alpha1.BackupFailed,
+			Status:  corev1.ConditionTrue,
+			Reason:  "GetBackupSizeFailed",
+			Message: err.Error(),
+		})
+	}
+	glog.Infof("get cluster %s archived backup file %s size %d success", bm, archiveBackupPath, size)
+
+	commitTs, err := getCommitTsFromMetadata(backupFullPath)
+	if err != nil {
+		glog.Errorf("get cluster %s commitTs failed, err: %s", bm, err)
+		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+			Type:    v1alpha1.BackupFailed,
+			Status:  corev1.ConditionTrue,
+			Reason:  "GetCommitTsFailed",
+			Message: err.Error(),
+		})
+	}
+	glog.Infof("get cluster %s commitTs %s success", bm, commitTs)
+
+	remotePath := strings.TrimPrefix(archiveBackupPath, constants.BackupRootPath+"/")
+	bucketURI := bm.getDestBucketURI(remotePath)
+	err = bm.backupDataToRemote(archiveBackupPath, bucketURI)
+	if err != nil {
+		glog.Errorf("backup cluster %s data to %s failed, err: %s", bm, bm.StorageType, err)
+		return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+			Type:    v1alpha1.BackupFailed,
+			Status:  corev1.ConditionTrue,
+			Reason:  "BackupDataToRemoteFailed",
+			Message: err.Error(),
+		})
+	}
+	glog.Infof("backup cluster %s data to %s success", bm, bm.StorageType)
+
+	finish := time.Now()
+
+	backup.Status.BackupPath = bucketURI
+	backup.Status.TimeStarted = metav1.Time{Time: started}
+	backup.Status.TimeCompleted = metav1.Time{Time: finish}
+	backup.Status.BackupSize = size
+	backup.Status.CommitTs = commitTs
+
+	return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
+		Type:   v1alpha1.BackupComplete,
+		Status: corev1.ConditionTrue,
+	})
+}
diff --git a/cmd/backup-manager/app/util/remote.go b/cmd/backup-manager/app/util/remote.go
index f69c583d327..71e75f6857b 100644
--- a/cmd/backup-manager/app/util/remote.go
+++ b/cmd/backup-manager/app/util/remote.go
@@ -46,7 +46,6 @@ const (
 type s3Query struct {
 	region         string
 	endpoint       string
-	oriEndpoint    string
 	bucket         string
 	prefix         string
 	provider       string
@@ -124,12 +123,13 @@ func newS3StorageOption(qs *s3Query) ([]string, string) {
 
 // newS3Storage initialize a new s3 storage
 func newS3Storage(qs *s3Query) (*blob.Bucket, error) {
-	awsConfig := aws.NewConfig().WithMaxRetries(maxRetries).WithS3ForcePathStyle(qs.forcePathStyle)
+	awsConfig := aws.NewConfig().WithMaxRetries(maxRetries).
+		WithS3ForcePathStyle(qs.forcePathStyle)
 	if qs.region != "" {
 		awsConfig.WithRegion(qs.region)
 	}
-	if qs.oriEndpoint != "" {
-		awsConfig.WithEndpoint(qs.oriEndpoint)
+	if qs.endpoint != "" {
+		awsConfig.WithEndpoint(qs.endpoint)
 	}
 	// awsConfig.WithLogLevel(aws.LogDebugWithSigning)
 	awsSessionOpts := session.Options{
@@ -161,13 +161,14 @@ func checkS3Config(backup *v1alpha1.Backup, fakeRegion bool) (*s3Query, error) {
 	sqs.provider = string(backup.Spec.S3.Provider)
 	sqs.prefix = backup.Spec.S3.Prefix
 	sqs.endpoint = backup.Spec.S3.Endpoint
-	sqs.oriEndpoint = backup.Spec.S3.Endpoint
 	sqs.sse = backup.Spec.S3.SSE
 	sqs.acl = backup.Spec.S3.Acl
 	sqs.storageClass = backup.Spec.S3.StorageClass
 	sqs.forcePathStyle = true
 	// In some cases, we need to set ForcePathStyle to false.
 	// Refer to: https://rclone.org/s3/#s3-force-path-style
+	// if UseAccelerateEndpoint is supported for AWS s3 in the future,
+	// forcePathStyle needs to be set to false too.
 	if sqs.provider == "alibaba" || sqs.provider == "netease" {
 		sqs.forcePathStyle = false
 	}
@@ -179,37 +180,3 @@ func checkS3Config(backup *v1alpha1.Backup, fakeRegion bool) (*s3Query, error) {
 
 	return &sqs, nil
 }
-
-// ConstructBRGlobalOptions constructs global options for BR and also return the remote path
-func ConstructBRGlobalOptions(backup *v1alpha1.Backup) ([]string, string, error) {
-	var args []string
-	config := backup.Spec.BR
-	if config == nil {
-		return nil, "", fmt.Errorf("no config for br in backup %s/%s", backup.Namespace, backup.Name)
-	}
-	args = append(args, fmt.Sprintf("--pd=%s", config.PDAddress))
-	if config.CA != "" {
-		args = append(args, fmt.Sprintf("--ca=%s", config.CA))
-	}
-	if config.Cert != "" {
-		args = append(args, fmt.Sprintf("--cert=%s", config.Cert))
-	}
-	if config.Key != "" {
-		args = append(args, fmt.Sprintf("--key=%s", config.Key))
-	}
-	// Do not set log-file, backup-manager needs to get backup
-	// position from the output of BR with info log-level
-	// if config.LogFile != "" {
-	// 	args = append(args, fmt.Sprintf("--log-file=%s", config.LogFile))
-	// }
-	args = append(args, "--log-level=info")
-	if config.StatusAddr != "" {
-		args = append(args, fmt.Sprintf("--status-addr=%s", config.StatusAddr))
-	}
-	s, path, err := getRemoteStorage(backup)
-	if err != nil {
-		return nil, "", err
-	}
-	args = append(args, s...)
-	return args, path, nil
-}
diff --git a/cmd/backup-manager/app/util/util.go b/cmd/backup-manager/app/util/util.go
index 7fadc756bb9..aef71adc4ef 100644
--- a/cmd/backup-manager/app/util/util.go
+++ b/cmd/backup-manager/app/util/util.go
@@ -21,6 +21,8 @@ import (
 
 	"github.com/spf13/pflag"
 	cmdutil "k8s.io/kubectl/pkg/cmd/util"
+
+	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
 )
 
 var (
@@ -108,3 +110,37 @@ func SetFlagsFromEnv(flags *pflag.FlagSet, prefix string) error {
 
 	return nil
 }
+
+// ConstructBRGlobalOptions constructs global options for BR and also returns the remote path
+func ConstructBRGlobalOptions(backup *v1alpha1.Backup) ([]string, string, error) {
+	var args []string
+	config := backup.Spec.BR
+	if config == nil {
+		return nil, "", fmt.Errorf("no config for br in backup %s/%s", backup.Namespace, backup.Name)
+	}
+	args = append(args, fmt.Sprintf("--pd=%s", config.PDAddress))
+	if config.CA != "" {
+		args = append(args, fmt.Sprintf("--ca=%s", config.CA))
+	}
+	if config.Cert != "" {
+		args = append(args, fmt.Sprintf("--cert=%s", config.Cert))
+	}
+	if config.Key != "" {
+		args = append(args, fmt.Sprintf("--key=%s", config.Key))
+	}
+	// Do not set log-file, backup-manager needs to get backup
+	// position from the output of BR with info log-level
+	// if config.LogFile != "" {
+	// 	args = append(args, fmt.Sprintf("--log-file=%s", config.LogFile))
+	// }
+	args = append(args, "--log-level=info")
+	if config.StatusAddr != "" {
+		args = append(args, fmt.Sprintf("--status-addr=%s", config.StatusAddr))
+	}
+	s, path, err := getRemoteStorage(backup)
+	if err != nil {
+		return nil, "", err
+	}
+	args = append(args, s...)
+	return args, path, nil
+}
diff --git a/go.mod b/go.mod
index 408d17e2654..aa7527b1374 100644
--- a/go.mod
+++ b/go.mod
@@ -28,7 +28,7 @@ require (
 	github.com/go-openapi/loads v0.19.4
 	github.com/go-openapi/spec v0.19.3
 	github.com/go-sql-driver/mysql v1.4.1
-	github.com/gogo/protobuf v1.3.1 // indirect
+	github.com/gogo/protobuf v1.3.1
 	github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff // indirect
 	github.com/golang/snappy v0.0.1 // indirect
 	github.com/google/go-cmp v0.3.1
@@ -49,7 +49,7 @@ require (
 	github.com/pierrec/lz4 v2.0.5+incompatible // indirect
 	github.com/pingcap/advanced-statefulset v0.1.0
 	github.com/pingcap/errors v0.11.0
-	github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c
+	github.com/pingcap/kvproto v0.0.0-20191217072959-393e6c0fd4b7
 	github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
 	github.com/pingcap/pd v2.1.17+incompatible
 	github.com/pingcap/tidb v2.1.0-beta+incompatible
diff --git a/go.sum b/go.sum
index 390ed49e6d5..43c72f7d804 100644
--- a/go.sum
+++ b/go.sum
@@ -652,6 +652,8 @@ github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4
 github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
 github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c h1:pY/MQQ5UajEHfSnQS8rFAM9gw9bBKzqBl414cdfhpRQ=
 github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
+github.com/pingcap/kvproto v0.0.0-20191217072959-393e6c0fd4b7 h1:thLL2vFObG8vxBCkAmfAbLVBPfXUkBSXaVxppStCrL0=
+github.com/pingcap/kvproto v0.0.0-20191217072959-393e6c0fd4b7/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
 github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
 github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
 github.com/pingcap/pd v2.1.17+incompatible h1:mpfJYffRC14jeAfiq0jbHkqXVc8ZGNV0Lr2xG1sJslw=
diff --git a/images/backup-manager/entrypoint.sh b/images/backup-manager/entrypoint.sh
index be366714ff0..126f4e6e45d 100755
--- a/images/backup-manager/entrypoint.sh
+++ b/images/backup-manager/entrypoint.sh
@@ -45,6 +45,11 @@ case "$1" in
         echo "$BACKUP_BIN backup $@"
         exec $BACKUP_BIN backup "$@"
         ;;
+    export)
+        shift 1
+        echo "$BACKUP_BIN export $@"
+        exec $BACKUP_BIN export "$@"
+        ;;
     restore)
         shift 1
         echo "$BACKUP_BIN restore $@"
diff --git a/pkg/backup/backup/backup_manager.go b/pkg/backup/backup/backup_manager.go
index 0fb7524a925..588aa405c4e 100644
--- a/pkg/backup/backup/backup_manager.go
+++ b/pkg/backup/backup/backup_manager.go
@@ -90,27 +90,44 @@ func (bm *backupManager) syncBackupJob(backup *v1alpha1.Backup) error {
 		return fmt.Errorf("backup %s/%s get job %s failed, err: %v", ns, name, backupJobName, err)
 	}
 
-	// not found backup job, so we need to create it
-	job, reason, err := bm.makeBackupJob(backup)
-	if err != nil {
-		bm.statusUpdater.Update(backup, &v1alpha1.BackupCondition{
-			Type:    v1alpha1.BackupRetryFailed,
-			Status:  corev1.ConditionTrue,
-			Reason:  reason,
-			Message: err.Error(),
-		})
-		return err
-	}
-
-	reason, err = bm.ensureBackupPVCExist(backup)
-	if err != nil {
-		bm.statusUpdater.Update(backup, &v1alpha1.BackupCondition{
-			Type:    v1alpha1.BackupRetryFailed,
-			Status:  corev1.ConditionTrue,
-			Reason:  reason,
-			Message: err.Error(),
-		})
-		return err
+	var job *batchv1.Job
+	var reason string
+	if backup.Spec.BR == nil {
+		// the backup job does not exist, so we need to create it
+		job, reason, err = bm.makeExportJob(backup)
+		if err != nil {
+			bm.statusUpdater.Update(backup, &v1alpha1.BackupCondition{
+				Type:    v1alpha1.BackupRetryFailed,
+				Status:  corev1.ConditionTrue,
+				Reason:  reason,
+				Message: err.Error(),
+			})
+			return err
+		}
+
+		reason, err = bm.ensureBackupPVCExist(backup)
+		if err != nil {
+			bm.statusUpdater.Update(backup, &v1alpha1.BackupCondition{
+				Type:    v1alpha1.BackupRetryFailed,
+				Status:  corev1.ConditionTrue,
+				Reason:  reason,
+				Message: err.Error(),
+			})
+			return err
+		}
+
+	} else {
+		// the backup job does not exist, so we need to create it
+		job, reason, err = bm.makeBackupJob(backup)
+		if err != nil {
+			bm.statusUpdater.Update(backup, &v1alpha1.BackupCondition{
+				Type:    v1alpha1.BackupRetryFailed,
+				Status:  corev1.ConditionTrue,
+				Reason:  reason,
+				Message: err.Error(),
+			})
+			return err
+		}
 	}
 
 	if err := bm.jobControl.CreateJob(backup, job); err != nil {
@@ -130,7 +147,7 @@ func (bm *backupManager) syncBackupJob(backup *v1alpha1.Backup) error {
 	})
 }
 
-func (bm *backupManager) makeBackupJob(backup *v1alpha1.Backup) (*batchv1.Job, string, error) {
+func (bm *backupManager) makeExportJob(backup *v1alpha1.Backup) (*batchv1.Job, string, error) {
 	ns := backup.GetNamespace()
 	name := backup.GetName()
 
@@ -157,7 +174,7 @@ func (bm *backupManager) makeBackupJob(backup *v1alpha1.Backup) (*batchv1.Job, s
 	}
 
 	args := []string{
-		"backup",
+		"export",
 		fmt.Sprintf("--namespace=%s", ns),
 		fmt.Sprintf("--host=%s", backup.Spec.From.Host),
 		fmt.Sprintf("--port=%d", backup.Spec.From.Port),
@@ -219,6 +236,59 @@ func (bm *backupManager) makeBackupJob(backup *v1alpha1.Backup) (*batchv1.Job, s
 	return job, "", nil
 }
 
+func (bm *backupManager) makeBackupJob(backup *v1alpha1.Backup) (*batchv1.Job, string, error) {
+	ns := backup.GetNamespace()
+	name := backup.GetName()
+
+	envVars, reason, err := backuputil.GenerateStorageCertEnv(ns, backup.Spec.StorageProvider, bm.secretLister)
+	if err != nil {
+		return nil, reason, fmt.Errorf("backup %s/%s, %v", ns, name, err)
+	}
+
+	args := []string{
+		"backup",
+		fmt.Sprintf("--namespace=%s", ns),
+		fmt.Sprintf("--backupName=%s", name),
+	}
+
+	backupLabel := label.NewBackup().Instance(backup.Spec.From.GetTidbEndpoint()).BackupJob().Backup(name)
+
+	podSpec := &corev1.PodTemplateSpec{
+		ObjectMeta: metav1.ObjectMeta{
+			Labels: backupLabel.Labels(),
+		},
+		Spec: corev1.PodSpec{
+			ServiceAccountName: constants.DefaultServiceAccountName,
+			Containers: []corev1.Container{
+				{
+					Name:            label.BackupJobLabelVal,
+					Image:           controller.TidbBackupManagerImage,
+					Args:            args,
+					ImagePullPolicy: corev1.PullAlways,
+					Env:             envVars,
+				},
+			},
+			RestartPolicy: corev1.RestartPolicyNever,
+		},
+	}
+
+	job := &batchv1.Job{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      backup.GetBackupJobName(),
+			Namespace: ns,
+			Labels:    backupLabel,
+			OwnerReferences: []metav1.OwnerReference{
+				controller.GetBackupOwnerRef(backup),
+			},
+		},
+		Spec: batchv1.JobSpec{
+			BackoffLimit: controller.Int32Ptr(0),
+			Template:     *podSpec,
+		},
+	}
+
+	return job, "", nil
+}
 
 func (bm *backupManager) ensureBackupPVCExist(backup *v1alpha1.Backup) (string, error) {
 	ns := backup.GetNamespace()