Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add commit ts to restore status #2899

Merged
merged 20 commits into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 1 addition & 33 deletions cmd/backup-manager/app/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ import (
"path"
"strings"

"github.com/gogo/protobuf/proto"
kvbackup "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
backupUtil "github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/util"
Expand Down Expand Up @@ -108,35 +105,6 @@ func (bo *Options) backupData(backup *v1alpha1.Backup) error {
return nil
}

// getCommitTs get backup position from `EndVersion` in BR backup meta
func getCommitTs(backup *v1alpha1.Backup) (uint64, error) {
var commitTs uint64
s, err := backupUtil.NewRemoteStorage(backup)
if err != nil {
return commitTs, err
}
defer s.Close()
ctx := context.Background()
exist, err := s.Exists(ctx, constants.MetaFile)
if err != nil {
return commitTs, err
}
if !exist {
return commitTs, fmt.Errorf("%s not exist", constants.MetaFile)

}
metaData, err := s.ReadAll(ctx, constants.MetaFile)
if err != nil {
return commitTs, err
}
backupMeta := &kvbackup.BackupMeta{}
err = proto.Unmarshal(metaData, backupMeta)
if err != nil {
return commitTs, err
}
return backupMeta.EndVersion, nil
}

// constructOptions constructs options for BR and also return the remote path
func constructOptions(backup *v1alpha1.Backup) ([]string, error) {
args, err := backupUtil.ConstructBRGlobalOptionsForBackup(backup)
Expand All @@ -162,7 +130,7 @@ func constructOptions(backup *v1alpha1.Backup) ([]string, error) {
// getBackupSize get the backup data size from remote
func getBackupSize(backup *v1alpha1.Backup) (int64, error) {
var size int64
s, err := backupUtil.NewRemoteStorage(backup)
s, err := backupUtil.NewRemoteStorage(backup.Spec.StorageProvider)
if err != nil {
return size, err
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/backup-manager/app/backup/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package backup
import (
"database/sql"
"fmt"
"strconv"
"time"

"github.com/dustin/go-humanize"
Expand Down Expand Up @@ -296,7 +297,7 @@ func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {
}
klog.Infof("Get size %d for backup files in %s of cluster %s success", size, backupFullPath, bm)

commitTs, err := getCommitTs(backup)
commitTs, err := util.GetCommitTsFromBRMetaData(backup.Spec.StorageProvider)
if err != nil {
errs = append(errs, err)
klog.Errorf("get cluster %s commitTs failed, err: %s", bm, err)
Expand All @@ -317,7 +318,7 @@ func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {
backup.Status.TimeCompleted = metav1.Time{Time: finish}
backup.Status.BackupSize = size
backup.Status.BackupSizeReadable = humanize.Bytes(uint64(size))
backup.Status.CommitTs = fmt.Sprintf("%d", commitTs)
backup.Status.CommitTs = strconv.FormatUint(commitTs, 10)

return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupComplete,
Expand Down
2 changes: 1 addition & 1 deletion cmd/backup-manager/app/clean/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (bo *Options) String() string {

// cleanBRRemoteBackupData clean the backup data from remote
func (bo *Options) cleanBRRemoteBackupData(backup *v1alpha1.Backup) error {
s, err := util.NewRemoteStorage(backup)
s, err := util.NewRemoteStorage(backup.Spec.StorageProvider)
if err != nil {
return err
}
Expand Down
40 changes: 0 additions & 40 deletions cmd/backup-manager/app/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package export

import (
"fmt"
"io/ioutil"
"os/exec"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -102,45 +101,6 @@ func (bo *Options) backupDataToRemote(source, bucketURI string, opts []string) e
return nil
}

/*
getCommitTsFromMetadata get commitTs from dumpling's metadata file

metadata file format is as follows:

Started dump at: 2019-06-13 10:00:04
SHOW MASTER STATUS:
Log: tidb-binlog
Pos: 409054741514944513
GTID:

Finished dump at: 2019-06-13 10:00:04
*/
func getCommitTsFromMetadata(backupPath string) (string, error) {
var commitTs string

metaFile := filepath.Join(backupPath, constants.MetaDataFile)
if exist := util.IsFileExist(metaFile); !exist {
return commitTs, fmt.Errorf("file %s does not exist or is not regular file", metaFile)
}
contents, err := ioutil.ReadFile(metaFile)
if err != nil {
return commitTs, fmt.Errorf("read metadata file %s failed, err: %v", metaFile, err)
}

for _, lineStr := range strings.Split(string(contents), "\n") {
if !strings.Contains(lineStr, "Pos") {
continue
}
lineStrSlice := strings.Split(lineStr, ":")
if len(lineStrSlice) != 2 {
return commitTs, fmt.Errorf("parse dumpling's metadata file %s failed, str: %s", metaFile, lineStr)
}
commitTs = strings.TrimSpace(lineStrSlice[1])
break
}
return commitTs, nil
}

// getBackupSize get the backup data size
func getBackupSize(backupPath string, opts []string) (int64, error) {
var size int64
Expand Down
2 changes: 1 addition & 1 deletion cmd/backup-manager/app/export/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (bm *BackupManager) performBackup(backup *v1alpha1.Backup, db *sql.DB) erro
}
klog.Infof("get cluster %s archived backup file %s size %d success", bm, archiveBackupPath, size)

commitTs, err := getCommitTsFromMetadata(backupFullPath)
commitTs, err := util.GetCommitTsFromMetadata(backupFullPath)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please wait for #2897 which removes the temp data after the compression.

if err != nil {
errs = append(errs, err)
klog.Errorf("get cluster %s commitTs failed, err: %s", bm, err)
Expand Down
16 changes: 16 additions & 0 deletions cmd/backup-manager/app/import/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,26 @@ func (rm *RestoreManager) performRestore(restore *v1alpha1.Restore) error {
}
klog.Infof("restore cluster %s from backup %s success", rm, rm.BackupPath)

commitTs, err := util.GetCommitTsFromMetadata(unarchiveDataPath)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per discussion, please move this block after the uncompression code block.

if err != nil {
errs = append(errs, err)
klog.Errorf("get cluster %s commitTs failed, err: %s", rm, err)
uerr := rm.StatusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreFailed,
Status: corev1.ConditionTrue,
Reason: "GetCommitTsFailed",
Message: err.Error(),
})
errs = append(errs, uerr)
return errorutils.NewAggregate(errs)
}
klog.Infof("get cluster %s commitTs %s success", rm, commitTs)

finish := time.Now()

restore.Status.TimeStarted = metav1.Time{Time: started}
restore.Status.TimeCompleted = metav1.Time{Time: finish}
restore.Status.CommitTs = commitTs
Copy link
Contributor

@cofyc cofyc Jul 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

always update commitTs when we get it successfully (edit: ignore)

Copy link
Contributor

@cofyc cofyc Jul 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, maybe it's better not to update it when the restore fails as the commitTs indicates the commit timestamp or position of the restored backup

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which means current implementation is correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changes in this file LGTM

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see


return rm.StatusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreComplete,
Expand Down
16 changes: 16 additions & 0 deletions cmd/backup-manager/app/restore/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package restore
import (
"database/sql"
"fmt"
"strconv"
"time"

"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
Expand Down Expand Up @@ -247,9 +248,24 @@ func (rm *Manager) performRestore(restore *v1alpha1.Restore, db *sql.DB) error {
}
klog.Infof("restore cluster %s from %s succeed", rm, restore.Spec.Type)

commitTs, err := util.GetCommitTsFromBRMetaData(restore.Spec.StorageProvider)
if err != nil {
errs = append(errs, err)
klog.Errorf("get cluster %s commitTs failed, err: %s", rm, err)
uerr := rm.StatusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreFailed,
Status: corev1.ConditionTrue,
Reason: "GetCommitTsFailed",
Message: err.Error(),
})
errs = append(errs, uerr)
return errorutils.NewAggregate(errs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest not to return here.

Copy link
Contributor

@cofyc cofyc Jul 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's ok. I don't think it's necessary to continue the restore operation with the backup from which we can't even get the commitTs information.

btw, should we move this before the restore operation like we do in import/manager.go file?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, then i think we can move this block before the GC time setting code block.

}

finish := time.Now()
restore.Status.TimeStarted = metav1.Time{Time: started}
restore.Status.TimeCompleted = metav1.Time{Time: finish}
restore.Status.CommitTs = strconv.FormatUint(commitTs, 10)

return rm.StatusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreComplete,
Expand Down
8 changes: 4 additions & 4 deletions cmd/backup-manager/app/util/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,18 @@ type gcsQuery struct {
}

// NewRemoteStorage creates new remote storage
func NewRemoteStorage(backup *v1alpha1.Backup) (*blob.Bucket, error) {
st := util.GetStorageType(backup.Spec.StorageProvider)
func NewRemoteStorage(provider v1alpha1.StorageProvider) (*blob.Bucket, error) {
st := util.GetStorageType(provider)
switch st {
case v1alpha1.BackupStorageTypeS3:
qs := checkS3Config(backup.Spec.S3, true)
qs := checkS3Config(provider.S3, true)
bucket, err := newS3Storage(qs)
if err != nil {
return nil, err
}
return bucket, nil
case v1alpha1.BackupStorageTypeGcs:
qs := checkGcsConfig(backup.Spec.Gcs, true)
qs := checkGcsConfig(provider.Gcs, true)
bucket, err := newGcsStorage(qs)
if err != nil {
return nil, err
Expand Down
73 changes: 73 additions & 0 deletions cmd/backup-manager/app/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@
package util

import (
"context"
"database/sql"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"

"github.com/Masterminds/semver"
"github.com/gogo/protobuf/proto"
kvbackup "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/backup/util"
Expand Down Expand Up @@ -266,6 +271,74 @@ func GetOptions(provider v1alpha1.StorageProvider) []string {
}
}

/*
GetCommitTsFromMetadata get commitTs from mydumper's metadata file

metadata file format is as follows:

Started dump at: 2019-06-13 10:00:04
SHOW MASTER STATUS:
Log: tidb-binlog
Pos: 409054741514944513
GTID:

Finished dump at: 2019-06-13 10:00:04
*/
func GetCommitTsFromMetadata(backupPath string) (string, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a unit test function for this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in b6e3120

var commitTs string

metaFile := filepath.Join(backupPath, constants.MetaDataFile)
if exist := IsFileExist(metaFile); !exist {
return commitTs, fmt.Errorf("file %s does not exist or is not regular file", metaFile)
}
contents, err := ioutil.ReadFile(metaFile)
if err != nil {
return commitTs, fmt.Errorf("read metadata file %s failed, err: %v", metaFile, err)
}

for _, lineStr := range strings.Split(string(contents), "\n") {
if !strings.Contains(lineStr, "Pos") {
continue
}
lineStrSlice := strings.Split(lineStr, ":")
if len(lineStrSlice) != 2 {
return commitTs, fmt.Errorf("parse mydumper's metadata file %s failed, str: %s", metaFile, lineStr)
}
commitTs = strings.TrimSpace(lineStrSlice[1])
break
}
return commitTs, nil
}

// GetCommitTsFromBRMetaData get backup position from `EndVersion` in BR backup meta
func GetCommitTsFromBRMetaData(provider v1alpha1.StorageProvider) (uint64, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, better to have a unit test for this function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function get TS from S3 and ts is decoded by proto.Unmarshal. Maybe later we can mock it in another PR. This PR won't add this.

var commitTs uint64
s, err := NewRemoteStorage(provider)
if err != nil {
return commitTs, err
}
defer s.Close()
ctx := context.Background()
exist, err := s.Exists(ctx, constants.MetaFile)
if err != nil {
return commitTs, err
}
if !exist {
return commitTs, fmt.Errorf("%s not exist", constants.MetaFile)

}
metaData, err := s.ReadAll(ctx, constants.MetaFile)
if err != nil {
return commitTs, err
}
backupMeta := &kvbackup.BackupMeta{}
err = proto.Unmarshal(metaData, backupMeta)
if err != nil {
return commitTs, err
}
return backupMeta.EndVersion, nil
}

// ConstructArgs constructs the rclone args
func ConstructArgs(conf string, opts []string, command, source, dest string) []string {
var args []string
Expand Down
28 changes: 28 additions & 0 deletions cmd/backup-manager/app/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
package util

import (
"io/ioutil"
"os"
"path/filepath"
"testing"

. "github.com/onsi/gomega"
appconstant "github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/backup/constants"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -91,6 +95,30 @@ func TestConstructDumplingOptionsForBackup(t *testing.T) {
}
}

func TestGetCommitTsFromMetadata(t *testing.T) {
g := NewGomegaWithT(t)
tmpdir, err := ioutil.TempDir("", "test-get-commitTs-metadata")
g.Expect(err).To(Succeed())

defer os.RemoveAll(tmpdir)
metaDataFileName := filepath.Join(tmpdir, appconstant.MetaDataFile)
metaDataFile, err := os.Open(metaDataFileName)
g.Expect(err).To(Succeed())

_, err = metaDataFile.WriteString(`Started dump at: 2019-06-13 10:00:04
SHOW MASTER STATUS:
Log: tidb-binlog
Pos: 409054741514944513
GTID:

Finished dump at: 2019-06-13 10:00:04`)
g.Expect(err).To(Succeed())

commitTs, err := GetCommitTsFromMetadata(tmpdir)
g.Expect(err).To(Succeed())
g.Expect(commitTs).To(Equal("409054741514944513"))
}

func newBackup() *v1alpha1.Backup {
return &v1alpha1.Backup{
TypeMeta: metav1.TypeMeta{
Expand Down
11 changes: 11 additions & 0 deletions docs/api-references/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -8266,6 +8266,17 @@ Kubernetes meta/v1.Time
</tr>
<tr>
<td>
<code>commitTs</code></br>
<em>
string
</em>
</td>
<td>
<p>CommitTs is the snapshot time point of tidb cluster.</p>
</td>
</tr>
<tr>
<td>
<code>conditions</code></br>
<em>
<a href="#restorecondition">
Expand Down
Loading