-
Notifications
You must be signed in to change notification settings - Fork 500
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add commit ts to restore status #2899
Changes from 6 commits
769e3cf
4a967ae
ae25bc3
55fcd61
889aac5
6c089c1
3140409
42d2926
3d6d55f
48d0d02
a42470c
9ab33cd
139f983
c9c8c6d
b6e3120
3924503
eb958e9
6f88e7b
1ee7648
dc29e84
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -167,6 +167,21 @@ func (rm *RestoreManager) performRestore(restore *v1alpha1.Restore) error { | |
} | ||
klog.Infof("unarchive cluster %s backup %s data success", rm, restoreDataPath) | ||
|
||
commitTs, err := util.GetCommitTsFromMetadata(unarchiveDataPath) | ||
if err != nil { | ||
errs = append(errs, err) | ||
klog.Errorf("get cluster %s commitTs failed, err: %s", rm, err) | ||
uerr := rm.StatusUpdater.Update(restore, &v1alpha1.RestoreCondition{ | ||
Type: v1alpha1.RestoreFailed, | ||
Status: corev1.ConditionTrue, | ||
Reason: "GetCommitTsFailed", | ||
Message: err.Error(), | ||
}) | ||
errs = append(errs, uerr) | ||
return errorutils.NewAggregate(errs) | ||
} | ||
klog.Infof("get cluster %s commitTs %s success", rm, commitTs) | ||
|
||
DanielZhangQD marked this conversation as resolved.
Show resolved
Hide resolved
|
||
err = rm.loadTidbClusterData(unarchiveDataPath) | ||
if err != nil { | ||
errs = append(errs, err) | ||
|
@@ -186,6 +201,7 @@ func (rm *RestoreManager) performRestore(restore *v1alpha1.Restore) error { | |
|
||
restore.Status.TimeStarted = metav1.Time{Time: started} | ||
restore.Status.TimeCompleted = metav1.Time{Time: finish} | ||
restore.Status.CommitTs = commitTs | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry, maybe it's better not to update it when the restore fails as the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. which means current implementation is correct? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changes in this file LGTM There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see |
||
|
||
return rm.StatusUpdater.Update(restore, &v1alpha1.RestoreCondition{ | ||
Type: v1alpha1.RestoreComplete, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -247,9 +247,24 @@ func (rm *Manager) performRestore(restore *v1alpha1.Restore, db *sql.DB) error { | |
} | ||
klog.Infof("restore cluster %s from %s succeed", rm, restore.Spec.Type) | ||
|
||
commitTs, err := util.GetCommitTsFromBRMetaData(restore.Spec.StorageProvider, false) | ||
if err != nil { | ||
errs = append(errs, err) | ||
klog.Errorf("get cluster %s commitTs failed, err: %s", rm, err) | ||
uerr := rm.StatusUpdater.Update(restore, &v1alpha1.RestoreCondition{ | ||
Type: v1alpha1.RestoreFailed, | ||
Status: corev1.ConditionTrue, | ||
Reason: "GetCommitTsFailed", | ||
Message: err.Error(), | ||
}) | ||
errs = append(errs, uerr) | ||
return errorutils.NewAggregate(errs) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggest not to return here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's ok. I don't think it's necessary to continue the restore operation with the backup from which we can't even get the commitTs information. btw, should we move this before the restore operation like we do in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, then i think we can move this block before the GC time setting code block. |
||
} | ||
|
||
finish := time.Now() | ||
restore.Status.TimeStarted = metav1.Time{Time: started} | ||
restore.Status.TimeCompleted = metav1.Time{Time: finish} | ||
restore.Status.CommitTs = fmt.Sprintf("%d", commitTs) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. revised. |
||
|
||
return rm.StatusUpdater.Update(restore, &v1alpha1.RestoreCondition{ | ||
Type: v1alpha1.RestoreComplete, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,18 +58,18 @@ type gcsQuery struct { | |
} | ||
|
||
// NewRemoteStorage creates new remote storage | ||
func NewRemoteStorage(backup *v1alpha1.Backup) (*blob.Bucket, error) { | ||
st := util.GetStorageType(backup.Spec.StorageProvider) | ||
func NewRemoteStorage(provider v1alpha1.StorageProvider, fakeRegion bool) (*blob.Bucket, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why add |
||
st := util.GetStorageType(provider) | ||
switch st { | ||
case v1alpha1.BackupStorageTypeS3: | ||
qs := checkS3Config(backup.Spec.S3, true) | ||
qs := checkS3Config(provider.S3, fakeRegion) | ||
bucket, err := newS3Storage(qs) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return bucket, nil | ||
case v1alpha1.BackupStorageTypeGcs: | ||
qs := checkGcsConfig(backup.Spec.Gcs, true) | ||
qs := checkGcsConfig(provider.Gcs, fakeRegion) | ||
bucket, err := newGcsStorage(qs) | ||
if err != nil { | ||
return nil, err | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,12 +14,17 @@ | |
package util | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"fmt" | ||
"io/ioutil" | ||
"os" | ||
"path/filepath" | ||
"strings" | ||
|
||
"github.com/Masterminds/semver" | ||
"github.com/gogo/protobuf/proto" | ||
kvbackup "github.com/pingcap/kvproto/pkg/backup" | ||
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants" | ||
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" | ||
"github.com/pingcap/tidb-operator/pkg/backup/util" | ||
|
@@ -268,6 +273,74 @@ func GetOptions(provider v1alpha1.StorageProvider) []string { | |
} | ||
} | ||
|
||
/* | ||
GetCommitTsFromMetadata get commitTs from mydumper's metadata file | ||
|
||
metadata file format is as follows: | ||
|
||
Started dump at: 2019-06-13 10:00:04 | ||
SHOW MASTER STATUS: | ||
Log: tidb-binlog | ||
Pos: 409054741514944513 | ||
GTID: | ||
|
||
Finished dump at: 2019-06-13 10:00:04 | ||
*/ | ||
func GetCommitTsFromMetadata(backupPath string) (string, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a unit test function for this function? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added in b6e3120 |
||
var commitTs string | ||
|
||
metaFile := filepath.Join(backupPath, constants.MetaDataFile) | ||
if exist := IsFileExist(metaFile); !exist { | ||
return commitTs, fmt.Errorf("file %s does not exist or is not regular file", metaFile) | ||
} | ||
contents, err := ioutil.ReadFile(metaFile) | ||
if err != nil { | ||
return commitTs, fmt.Errorf("read metadata file %s failed, err: %v", metaFile, err) | ||
} | ||
|
||
for _, lineStr := range strings.Split(string(contents), "\n") { | ||
if !strings.Contains(lineStr, "Pos") { | ||
continue | ||
} | ||
lineStrSlice := strings.Split(lineStr, ":") | ||
if len(lineStrSlice) != 2 { | ||
return commitTs, fmt.Errorf("parse mydumper's metadata file %s failed, str: %s", metaFile, lineStr) | ||
} | ||
commitTs = strings.TrimSpace(lineStrSlice[1]) | ||
break | ||
} | ||
return commitTs, nil | ||
} | ||
|
||
// GetCommitTsFromBRMetaData get backup position from `EndVersion` in BR backup meta | ||
func GetCommitTsFromBRMetaData(provider v1alpha1.StorageProvider, fakeRegion bool) (uint64, error) { | ||
var commitTs uint64 | ||
s, err := NewRemoteStorage(provider, fakeRegion) | ||
if err != nil { | ||
return commitTs, err | ||
} | ||
defer s.Close() | ||
ctx := context.Background() | ||
exist, err := s.Exists(ctx, constants.MetaFile) | ||
if err != nil { | ||
return commitTs, err | ||
} | ||
if !exist { | ||
return commitTs, fmt.Errorf("%s not exist", constants.MetaFile) | ||
|
||
} | ||
metaData, err := s.ReadAll(ctx, constants.MetaFile) | ||
if err != nil { | ||
return commitTs, err | ||
} | ||
backupMeta := &kvbackup.BackupMeta{} | ||
err = proto.Unmarshal(metaData, backupMeta) | ||
if err != nil { | ||
return commitTs, err | ||
} | ||
return backupMeta.EndVersion, nil | ||
} | ||
|
||
// ConstructArgs constructs the rclone args | ||
func ConstructArgs(conf string, opts []string, command, source, dest string) []string { | ||
var args []string | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please wait for #2897 which removes the temp data after the compression.