From 2d5e3f7e1affe7b3cb7247f99d998e4f1df6aeca Mon Sep 17 00:00:00 2001 From: Aylei Date: Thu, 22 Aug 2019 12:05:17 +0900 Subject: [PATCH 1/6] Add stability test for multiple drainers and reparo restore Signed-off-by: Aylei --- go.mod | 1 + tests/actions.go | 61 +++++--- tests/backup.go | 297 +++++++++++++++++++++++++++++++----- tests/cluster_info.go | 1 + tests/cmd/e2e/main.go | 1 + tests/cmd/stability/main.go | 52 +++++-- tests/drainer_info.go | 77 ++++++++++ tests/util.go | 56 +++++++ 8 files changed, 483 insertions(+), 63 deletions(-) create mode 100644 tests/drainer_info.go diff --git a/go.mod b/go.mod index 5f67af6417..b362f3d8fd 100644 --- a/go.mod +++ b/go.mod @@ -102,6 +102,7 @@ require ( go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.9.1 // indirect golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421 // indirect + golang.org/x/sync v0.0.0-20190423024810-112230192c58 golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 // indirect google.golang.org/genproto v0.0.0-20180731170733-daca94659cb5 // indirect google.golang.org/grpc v1.12.0 // indirect diff --git a/tests/actions.go b/tests/actions.go index 7db94ae06f..a008bd0fd7 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -106,6 +106,7 @@ const ( operartorChartName = "tidb-operator" tidbClusterChartName = "tidb-cluster" backupChartName = "tidb-backup" + drainerChartName = "tidb-drainer" statbilityTestTag = "stability" ) @@ -140,8 +141,13 @@ type OperatorActions interface { CheckScheduledBackup(info *TidbClusterConfig) error DeployIncrementalBackup(from *TidbClusterConfig, to *TidbClusterConfig, withDrainer bool, ts string) error CheckIncrementalBackup(info *TidbClusterConfig, withDrainer bool) error + DeployDrainer(info *DrainerConfig, from *TidbClusterConfig) error + DeployDrainerOrDie(info *DrainerConfig, from *TidbClusterConfig) + CheckDrainer(info *DrainerConfig, source *TidbClusterConfig) error Restore(from *TidbClusterConfig, to *TidbClusterConfig) error CheckRestore(from *TidbClusterConfig, to *TidbClusterConfig) error + RestoreIncrementalFiles(from *DrainerConfig, to *TidbClusterConfig) error + CheckRestoreIncrementalFiles(from *DrainerConfig, to *TidbClusterConfig) error ForceDeploy(info *TidbClusterConfig) error CreateSecret(info *TidbClusterConfig) error GetPodUIDMap(info *TidbClusterConfig) (map[string]types.UID, error) @@ -174,6 +180,8 @@ type OperatorActions interface { EmitEvent(info *TidbClusterConfig, msg string) BackupRestore(from, to *TidbClusterConfig) error BackupRestoreOrDie(from, to *TidbClusterConfig) + BackupAndRestoreToMultipleClusters(source *TidbClusterConfig, targets []BackupTarget) error + BackupAndRestoreToMultipleClustersOrDie(source *TidbClusterConfig, targets []BackupTarget) LabelNodes() error LabelNodesOrDie() CheckDisasterTolerance(info *TidbClusterConfig) error @@ -250,6 +258,7 @@ type TidbClusterConfig struct { InitSecretName string BackupSecretName string EnableConfigMapRollout bool + ClusterVersion string PDPreStartScript string TiDBPreStartScript string @@ -573,7 +582,7 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterConfig) error { } for _, chartName := range charts { res, err := exec.Command("helm", "del", "--purge", chartName).CombinedOutput() - if err != nil && releaseIsNotFound(err) { + if err != nil && !releaseIsNotFound(err) { return fmt.Errorf("failed to delete chart: %s/%s, %v, %s", info.Namespace, chartName, err, string(res)) } @@ -862,6 +871,10 @@ func (oa *operatorActions) backupChartPath(tag string) string { return 
oa.chartPath(backupChartName, tag) } +func (oa *operatorActions) drainerChartPath(tag string) string { + return oa.chartPath(drainerChartName, tag) +} + func (oa *operatorActions) ScaleTidbCluster(info *TidbClusterConfig) error { oa.EmitEvent(info, fmt.Sprintf("ScaleTidbCluster to pd: %s, tikv: %s, tidb: %s", info.Args["pd.replicas"], info.Args["tikv.replicas"], info.Args["tidb.replicas"])) @@ -1766,11 +1779,12 @@ func (oa *operatorActions) checkoutTag(tagName string) error { cmd := fmt.Sprintf("cd %s && git stash -u && git checkout %s && "+ "mkdir -p %s && cp -rf charts/tidb-operator %s && "+ "cp -rf charts/tidb-cluster %s && cp -rf charts/tidb-backup %s &&"+ - "cp -rf manifests %s", + "cp -rf manifests %s &&"+ + "cp -rf charts/tidb-drainer %s", oa.cfg.OperatorRepoDir, tagName, filepath.Join(oa.cfg.ChartDir, tagName), oa.operatorChartPath(tagName), oa.tidbClusterChartPath(tagName), oa.backupChartPath(tagName), - oa.manifestPath(tagName)) + oa.manifestPath(tagName), oa.drainerChartPath(tagName)) glog.Info(cmd) res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput() if err != nil { @@ -1957,6 +1971,18 @@ func (oa *operatorActions) CheckRestore(from *TidbClusterConfig, to *TidbCluster return nil } +func (oa *operatorActions) CleanRestoreJob(from *TidbClusterConfig) error { + + releaseName := fmt.Sprintf("%s-restore", from.ClusterName) + res, err := exec.Command("helm", "del", "--purge", releaseName).CombinedOutput() + if err != nil && !releaseIsNotFound(err) { + return fmt.Errorf("failed to delete release: %s/%s, %v, %s", + from.Namespace, releaseName, err, string(res)) + } + + return nil +} + func (oa *operatorActions) ForceDeploy(info *TidbClusterConfig) error { if err := oa.CleanTidbCluster(info); err != nil { return err @@ -2296,26 +2322,25 @@ func (oa *operatorActions) DeployIncrementalBackup(from *TidbClusterConfig, to * } if withDrainer { sets["binlog.drainer.create"] = "true" + from.drainerConfig = []string{ + "worker-count = 16", + "detect-interval = 10", + "disable-dispatch = false", + `ignore-schemas = ""`, + `safe-mode = false`, + `txn-batch = 20`, + `db-type = "mysql"`, + `[syncer.to]`, + fmt.Sprintf(`host = "%s-tidb.%s"`, to.ClusterName, to.Namespace), + fmt.Sprintf(`user = "%s"`, "root"), + fmt.Sprintf(`password = "%s"`, to.Password), + fmt.Sprintf(`port = %d`, 4000), + } } if ts != "" { sets["binlog.drainer.initialCommitTs"] = ts } - from.drainerConfig = []string{ - "worker-count = 16", - "detect-interval = 10", - "disable-dispatch = false", - `ignore-schemas = ""`, - `safe-mode = false`, - `txn-batch = 20`, - `db-type = "mysql"`, - `[syncer.to]`, - fmt.Sprintf(`host = "%s-tidb.%s"`, to.ClusterName, to.Namespace), - fmt.Sprintf(`user = "%s"`, "root"), - fmt.Sprintf(`password = "%s"`, to.Password), - fmt.Sprintf(`port = %d`, 4000), - } - cmd, err := oa.getHelmUpgradeClusterCmd(from, sets) if err != nil { return err diff --git a/tests/backup.go b/tests/backup.go index c4bb07e169..5f5a894866 100644 --- a/tests/backup.go +++ b/tests/backup.go @@ -1,73 +1,215 @@ package tests import ( + "bytes" + "fmt" + "os/exec" + "strings" + "text/template" "time" "github.com/golang/glog" + "github.com/pingcap/tidb-operator/pkg/tkctl/util" "github.com/pingcap/tidb-operator/tests/slack" + "golang.org/x/sync/errgroup" + "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" ) -func (oa *operatorActions) BackupRestore(from, to *TidbClusterConfig) error { - var ts string - err := oa.DeployIncrementalBackup(from, to, false, ts) - if err != nil { - return err +const ( + 
DrainerReplicas int32 = 1 + RunReparoCommandTemplate = `kubectl exec -it {{ .PodName }} -- bash -c \ +"wget http://download.pingcap.org/tidb-binlog-cluster-latest-linux-amd64.tar.gz && \ +tar -xzf tidb-binlog-cluster-latest-linux-amd64.tar.gz && \ +cd tidb-binlog-cluster-latest-linux-amd64/bin && \ +echo '{{ .ReparoConfig }}' > reparo.toml && \ +./reparo -config reparo.toml" +` +) + +type BackupTarget struct { + IncrementalType DbType + TargetCluster *TidbClusterConfig + IsAdditional bool +} + +func (t *BackupTarget) GetDrainerConfig(source *TidbClusterConfig, ts string) *DrainerConfig { + drainerConfig := &DrainerConfig{ + DrainerName: fmt.Sprintf("to-%s-%s", t.TargetCluster.ClusterName, t.IncrementalType), + InitialCommitTs: ts, + OperatorTag: source.OperatorTag, + SourceClusterName: source.ClusterName, + Namespace: source.Namespace, + DbType: t.IncrementalType, } + if t.IncrementalType == DbTypeMySQL || t.IncrementalType == DbTypeTiDB { + drainerConfig.Host = fmt.Sprintf("%s.%s.svc.cluster.local", + t.TargetCluster.ClusterName, t.TargetCluster.Namespace) + drainerConfig.Port = "4000" + } + return drainerConfig +} - err = oa.CheckIncrementalBackup(from, false) +func (oa *operatorActions) BackupAndRestoreToMultipleClusters(source *TidbClusterConfig, targets []BackupTarget) error { + err := oa.DeployAndCheckPump(source) if err != nil { return err } - err = oa.DeployAdHocBackup(from) + err = oa.DeployAdHocBackup(source) if err != nil { - glog.Errorf("cluster:[%s] deploy happen error: %v", from.ClusterName, err) + glog.Errorf("cluster:[%s] deploy happen error: %v", source.ClusterName, err) return err } - ts, err = oa.CheckAdHocBackup(from) + ts, err := oa.CheckAdHocBackup(source) if err != nil { - glog.Errorf("cluster:[%s] deploy happen error: %v", from.ClusterName, err) + glog.Errorf("cluster:[%s] deploy happen error: %v", source.ClusterName, err) return err } - err = oa.CheckTidbClusterStatus(to) - if err != nil { - glog.Errorf("cluster:[%s] deploy faild error: %v", to.ClusterName, err) + // Restore can only be done serially due to name collision + for i := range targets { + err = oa.CheckTidbClusterStatus(targets[i].TargetCluster) + if err != nil { + glog.Errorf("cluster:[%s] deploy faild error: %v", targets[i].TargetCluster.ClusterName, err) + return err + } + + err = oa.Restore(source, targets[i].TargetCluster) + if err != nil { + glog.Errorf("from cluster:[%s] to cluster [%s] restore happen error: %v", + source.ClusterName, targets[i].TargetCluster.ClusterName, err) + return err + } + + err = oa.CleanRestoreJob(source) + if err != nil && !releaseIsNotFound(err) { + glog.Errorf("clean the from cluster:[%s] to cluster [%s] restore job happen error: %v", + source.ClusterName, targets[i].TargetCluster.ClusterName, err) + } + } + + prepareIncremental := func(source *TidbClusterConfig, target BackupTarget) error { + err = oa.CheckRestore(source, target.TargetCluster) + if err != nil { + glog.Errorf("from cluster:[%s] to cluster [%s] restore failed error: %v", + source.ClusterName, target.TargetCluster.ClusterName, err) + return err + } + + if target.IsAdditional { + // Deploy an additional drainer release + drainerConfig := target.GetDrainerConfig(source, ts) + if err := oa.DeployDrainer(drainerConfig, source); err != nil { + return nil + } + if err := oa.CheckDrainer(drainerConfig, source); err != nil { + return nil + } + } else { + // Enable drainer of the source TiDB cluster release + if err := oa.DeployAndCheckIncrementalBackup(source, target.TargetCluster, ts); err != nil { + return err 
+ } + } + return nil + } + + checkIncremental := func(source *TidbClusterConfig, target BackupTarget) error { + if target.IncrementalType == DbTypeFile { + if err := oa.RestoreIncrementalFiles(target.GetDrainerConfig(source, ts), target.TargetCluster); err != nil { + return err + } + } + + if err := oa.CheckDataConsistency(source, target.TargetCluster); err != nil { + return err + } + return nil + } + + var eg errgroup.Group + for i := range targets { + eg.Go(func() error { + return prepareIncremental(source, targets[i]) + }) + } + if err := eg.Wait(); err != nil { return err } - err = oa.Restore(from, to) - if err != nil { - glog.Errorf("from cluster:[%s] to cluster [%s] restore happen error: %v", - from.ClusterName, to.ClusterName, err) + glog.Infof("waiting 1 minutes to insert into more records") + time.Sleep(1 * time.Minute) + + glog.Infof("cluster[%s] stop insert data", source.ClusterName) + oa.StopInsertDataTo(source) + + for i := range targets { + eg.Go(func() error { + return checkIncremental(source, targets[i]) + }) + } + if err := eg.Wait(); err != nil { return err } - err = oa.CheckRestore(from, to) + go oa.BeginInsertDataToOrDie(source) + err = oa.DeployScheduledBackup(source) if err != nil { - glog.Errorf("from cluster:[%s] to cluster [%s] restore failed error: %v", - from.ClusterName, to.ClusterName, err) + glog.Errorf("cluster:[%s] scheduler happen error: %v", source.ClusterName, err) return err } - err = oa.DeployIncrementalBackup(from, to, true, ts) - if err != nil { + return oa.CheckScheduledBackup(source) +} + +func (oa *operatorActions) BackupAndRestoreToMultipleClustersOrDie(source *TidbClusterConfig, targets []BackupTarget) { + if err := oa.BackupAndRestoreToMultipleClusters(source, targets); err != nil { + slack.NotifyAndPanic(err) + } +} + +func (oa *operatorActions) BackupRestore(from, to *TidbClusterConfig) error { + + return oa.BackupAndRestoreToMultipleClusters(from, []BackupTarget{ + { + TargetCluster: to, + IncrementalType: DbTypeTiDB, + IsAdditional: false, + }, + }) +} + +func (oa *operatorActions) BackupRestoreOrDie(from, to *TidbClusterConfig) { + if err := oa.BackupRestore(from, to); err != nil { + slack.NotifyAndPanic(err) + } +} + +func (oa *operatorActions) DeployAndCheckPump(tc *TidbClusterConfig) error { + if err := oa.DeployIncrementalBackup(tc, nil, false, ""); err != nil { return err } - err = oa.CheckIncrementalBackup(from, true) - if err != nil { + if err := oa.CheckIncrementalBackup(tc, false); err != nil { return err } + return nil +} - glog.Infof("waiting 1 minutes to insert into more records") - time.Sleep(1 * time.Minute) +func (oa *operatorActions) DeployAndCheckIncrementalBackup(from, to *TidbClusterConfig, ts string) error { + if err := oa.DeployIncrementalBackup(from, to, true, ts); err != nil { + return err + } - glog.Infof("cluster[%s] stop insert data", from.ClusterName) - oa.StopInsertDataTo(from) + if err := oa.CheckIncrementalBackup(from, true); err != nil { + return err + } + return nil +} +func (oa *operatorActions) CheckDataConsistency(from, to *TidbClusterConfig) error { fn := func() (bool, error) { b, err := to.DataIsTheSameAs(from) if err != nil { @@ -83,19 +225,106 @@ func (oa *operatorActions) BackupRestore(from, to *TidbClusterConfig) error { if err := wait.Poll(DefaultPollInterval, 30*time.Minute, fn); err != nil { return err } + return nil +} + +func (oa *operatorActions) DeployDrainer(info *DrainerConfig, source *TidbClusterConfig) error { + oa.EmitEvent(source, "DeployDrainer") + glog.Infof("begin to deploy drainer 
[%s] namespace[%s], source cluster [%s]", info.DrainerName, + source.Namespace, source.ClusterName) - go oa.BeginInsertDataToOrDie(from) - err = oa.DeployScheduledBackup(from) + valuesPath, err := info.BuildSubValues(oa.drainerChartPath(source.OperatorTag)) if err != nil { - glog.Errorf("cluster:[%s] scheduler happen error: %v", from.ClusterName, err) return err } - return oa.CheckScheduledBackup(from) + cmd := fmt.Sprintf("helm install %s --name %s --namespace %s --set-string %s -f %s", + oa.drainerChartPath(source.OperatorTag), info.DrainerName, source.Namespace, info.DrainerHelmString(nil, source), valuesPath) + glog.Info(cmd) + + if res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput(); err != nil { + return fmt.Errorf("failed to deploy drainer [%s/%s], %v, %s", + source.Namespace, info.DrainerName, err, string(res)) + } + + return nil } -func (oa *operatorActions) BackupRestoreOrDie(from, to *TidbClusterConfig) { - if err := oa.BackupRestore(from, to); err != nil { +func (oa *operatorActions) DeployDrainerOrDie(info *DrainerConfig, source *TidbClusterConfig) { + if err := oa.DeployDrainer(info, source); err != nil { slack.NotifyAndPanic(err) } } + +func (oa *operatorActions) CheckDrainer(info *DrainerConfig, source *TidbClusterConfig) error { + glog.Infof("checking drainer [%s/%s]", info.DrainerName, source.Namespace) + + ns := source.Namespace + stsName := fmt.Sprintf("%s-%s-drainer", source.ClusterName, info.DrainerName) + fn := func() (bool, error) { + sts, err := oa.kubeCli.AppsV1().StatefulSets(source.Namespace).Get(stsName, v1.GetOptions{}) + if err != nil { + glog.Errorf("failed to get drainer StatefulSet %s ,%v", sts, err) + return false, nil + } + if *sts.Spec.Replicas != DrainerReplicas { + glog.Infof("StatefulSet: %s/%s .spec.Replicas(%d) != %d", + ns, sts.Name, *sts.Spec.Replicas, DrainerReplicas) + return false, nil + } + if sts.Status.ReadyReplicas != DrainerReplicas { + glog.Infof("StatefulSet: %s/%s .state.ReadyReplicas(%d) != %d", + ns, sts.Name, sts.Status.ReadyReplicas, DrainerReplicas) + } + return true, nil + } + + err := wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn) + if err != nil { + return fmt.Errorf("failed to install drainer [%s/%s], %v", source.Namespace, info.DrainerName) + } + + return nil +} + +func (oa *operatorActions) RestoreIncrementalFiles(from *DrainerConfig, to *TidbClusterConfig) error { + glog.Infof("restoring incremental data from drainer [%s/%s] to TiDB cluster [%s/%s]", + from.Namespace, from.DrainerName, to.Namespace, to.ClusterName) + + // TODO: better incremental files restore solution + reparoConfig := strings.Join([]string{ + `data-dir = "/data/pb"`, + `log-level = "info"`, + `dest-type = "mysql"`, + `[dest-db]`, + fmt.Sprintf(`host = "%s"`, util.GetTidbServiceName(to.ClusterName)), + "port = 4000", + `user = "root"`, + `password = ""`, + }, "\n") + + temp, err := template.New("reparo-command").Parse(RunReparoCommandTemplate) + if err != nil { + return err + } + buff := new(bytes.Buffer) + if err := temp.Execute(buff, &struct { + ReparoConfig string + PodName string + }{ + ReparoConfig: reparoConfig, + PodName: fmt.Sprintf("%s-%s-drainer-0", from.SourceClusterName, from.DrainerName), + }); err != nil { + return err + } + + if res, err := exec.Command("/bin/sh", "-c", buff.String()).CombinedOutput(); err != nil { + return fmt.Errorf("failed to restore incremental files from dainer [%s/%s] to TiDB cluster [%s/%s], %v, %s", + from.Namespace, from.DrainerName, to.Namespace, to.ClusterName, err, res) + } + return nil +} + 
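For reference, RestoreIncrementalFiles above only renders RunReparoCommandTemplate with text/template and hands the result to /bin/sh -c. A minimal, self-contained sketch of that rendering step is shown below so the generated kubectl command can be previewed outside the test harness; the pod name and the reparo settings in it are illustrative placeholders, not values taken from a real cluster.

    package main

    import (
        "bytes"
        "fmt"
        "text/template"
    )

    // Template copied from the PATCH 1/6 version above.
    const runReparoCommandTemplate = `kubectl exec -it {{ .PodName }} -- bash -c \
    "wget http://download.pingcap.org/tidb-binlog-cluster-latest-linux-amd64.tar.gz && \
    tar -xzf tidb-binlog-cluster-latest-linux-amd64.tar.gz && \
    cd tidb-binlog-cluster-latest-linux-amd64/bin && \
    echo '{{ .ReparoConfig }}' > reparo.toml && \
    ./reparo -config reparo.toml"
    `

    func main() {
        // Illustrative values only; a real run derives the pod name from
        // SourceClusterName and DrainerName, as RestoreIncrementalFiles does.
        data := struct {
            ReparoConfig string
            PodName      string
        }{
            ReparoConfig: `data-dir = "/data/pb"` + "\n" + `dest-type = "mysql"`,
            PodName:      "demo-cluster-demo-drainer-drainer-0",
        }

        tmpl := template.Must(template.New("reparo-command").Parse(runReparoCommandTemplate))
        var buf bytes.Buffer
        if err := tmpl.Execute(&buf, data); err != nil {
            panic(err)
        }
        // Print the exact shell command the test would pass to /bin/sh -c.
        fmt.Println(buf.String())
    }
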
+func (oa *operatorActions) CheckRestoreIncrementalFiles(from *DrainerConfig, to *TidbClusterConfig) error { + return nil +} diff --git a/tests/cluster_info.go b/tests/cluster_info.go index 853e3c14c1..140798ff45 100644 --- a/tests/cluster_info.go +++ b/tests/cluster_info.go @@ -49,6 +49,7 @@ func (tc *TidbClusterConfig) UpgradeTiDB(image string) *TidbClusterConfig { } func (tc *TidbClusterConfig) UpgradeAll(tag string) *TidbClusterConfig { + tc.ClusterVersion = tag return tc. UpgradePD("pingcap/pd:" + tag). UpgradeTiKV("pingcap/tikv:" + tag). diff --git a/tests/cmd/e2e/main.go b/tests/cmd/e2e/main.go index f0db2a828c..187e54f272 100644 --- a/tests/cmd/e2e/main.go +++ b/tests/cmd/e2e/main.go @@ -229,5 +229,6 @@ func newTidbClusterConfig(ns, clusterName, password string) *tests.TidbClusterCo }, TopologyKey: topologyKey, EnableConfigMapRollout: true, + ClusterVersion: tidbVersion, } } diff --git a/tests/cmd/stability/main.go b/tests/cmd/stability/main.go index 7cca68beee..d158a52807 100644 --- a/tests/cmd/stability/main.go +++ b/tests/cmd/stability/main.go @@ -75,8 +75,10 @@ func run() { cluster2 := newTidbClusterConfig("ns2", "cluster2") cluster3 := newTidbClusterConfig("ns2", "cluster3") - restoreCluster1 := newTidbClusterConfig("ns1", "restore1") - restoreCluster2 := newTidbClusterConfig("ns2", "restore2") + directRestoreCluster1 := newTidbClusterConfig("ns1", "restore1") + fileRestoreCluster1 := newTidbClusterConfig("ns1", "file-restore1") + directRestoreCluster2 := newTidbClusterConfig("ns2", "restore2") + fileRestoreCluster2 := newTidbClusterConfig("ns2", "file-restore2") onePDCluster1 := newTidbClusterConfig("ns1", "one-pd-cluster-1") onePDCluster2 := newTidbClusterConfig("ns2", "one-pd-cluster-2") @@ -87,8 +89,10 @@ func run() { cluster1, cluster2, cluster3, - restoreCluster1, - restoreCluster2, + directRestoreCluster1, + fileRestoreCluster1, + directRestoreCluster2, + fileRestoreCluster2, onePDCluster1, onePDCluster2, } @@ -118,7 +122,7 @@ func run() { oa.CleanTidbClusterOrDie(cluster) } - caseFn := func(clusters []*tests.TidbClusterConfig, onePDClsuter *tests.TidbClusterConfig, restoreCluster *tests.TidbClusterConfig, upgradeVersion string) { + caseFn := func(clusters []*tests.TidbClusterConfig, onePDClsuter *tests.TidbClusterConfig, backupTargets []tests.BackupTarget, upgradeVersion string) { // check env fta.CheckAndRecoverEnvOrDie() oa.CheckK8sAvailableOrDie(nil, nil) @@ -206,10 +210,12 @@ func run() { } // backup and restore - oa.DeployTidbClusterOrDie(restoreCluster) - addDeployedClusterFn(restoreCluster) - oa.CheckTidbClusterStatusOrDie(restoreCluster) - oa.BackupRestoreOrDie(clusters[0], restoreCluster) + for i := range backupTargets { + oa.DeployTidbClusterOrDie(backupTargets[i].TargetCluster) + addDeployedClusterFn(backupTargets[i].TargetCluster) + oa.CheckTidbClusterStatusOrDie(backupTargets[i].TargetCluster) + } + oa.BackupAndRestoreToMultipleClustersOrDie(clusters[0], backupTargets) // delete operator oa.CleanOperatorOrDie(ocfg) @@ -289,7 +295,19 @@ func run() { cluster1, cluster2, } - caseFn(preUpgrade, onePDCluster1, restoreCluster1, upgradeVersions[0]) + backupTargets := []tests.BackupTarget{ + { + TargetCluster: directRestoreCluster1, + IsAdditional: false, + IncrementalType: tests.DbTypeTiDB, + }, + { + TargetCluster: fileRestoreCluster1, + IsAdditional: false, + IncrementalType: tests.DbTypeFile, + }, + } + caseFn(preUpgrade, onePDCluster1, backupTargets, upgradeVersions[0]) // after operator upgrade if cfg.UpgradeOperatorImage != "" && cfg.UpgradeOperatorTag != 
"" { @@ -306,8 +324,20 @@ func run() { if len(upgradeVersions) == 2 { v = upgradeVersions[1] } + postUpgradeBackupTargets := []tests.BackupTarget{ + { + TargetCluster: directRestoreCluster2, + IsAdditional: false, + IncrementalType: tests.DbTypeTiDB, + }, + { + TargetCluster: fileRestoreCluster2, + IsAdditional: false, + IncrementalType: tests.DbTypeFile, + }, + } // caseFn(postUpgrade, restoreCluster2, tidbUpgradeVersion) - caseFn(postUpgrade, onePDCluster2, restoreCluster2, v) + caseFn(postUpgrade, onePDCluster2, postUpgradeBackupTargets, v) } for _, cluster := range allClusters { diff --git a/tests/drainer_info.go b/tests/drainer_info.go new file mode 100644 index 0000000000..c71d5d0443 --- /dev/null +++ b/tests/drainer_info.go @@ -0,0 +1,77 @@ +// Copyright 2019. PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tests + +import ( + "fmt" + "io/ioutil" + "strings" + + "github.com/golang/glog" +) + +type DbType string + +const ( + DbTypeMySQL DbType = "mysql" + DbTypeFile DbType = "file" + DbTypeTiDB DbType = "tidb" + // The following downstream types are not supported in e2e & stability tests yet + // DbTypeKafka DbType = "kafka" + // DbTypeFlash DbType = "flash" +) + +type DrainerConfig struct { + DrainerName string + InitialCommitTs string + OperatorTag string + SourceClusterName string + Namespace string + + DbType DbType + Host string + User string + Password string + // use string type in case of empty port (db-type=file) + Port string +} + +func (d *DrainerConfig) DrainerHelmString(m map[string]string, source *TidbClusterConfig) string { + + set := map[string]string{ + "clusterName": source.ClusterName, + "clusterVersion": source.ClusterVersion, + } + + for k, v := range m { + set[k] = v + } + + arr := make([]string, 0, len(set)) + for k, v := range set { + arr = append(arr, fmt.Sprintf("%s=%s", k, v)) + } + return strings.Join(arr, ",") +} + +func (d *DrainerConfig) BuildSubValues(dir string) (string, error) { + + values := GetDrainerSubValuesOrDie(d) + path := fmt.Sprintf("%s/%s.yaml", dir, d.DrainerName) + if err := ioutil.WriteFile(path, []byte(values), 0644); err != nil { + return "", err + } + glog.Infof("Values of drainer %s:\n %s", d.DrainerName, values) + return path, nil +} diff --git a/tests/util.go b/tests/util.go index 91356e0329..fc800ee3bd 100644 --- a/tests/util.go +++ b/tests/util.go @@ -145,6 +145,32 @@ var binlogTemp string = `binlog: {{end}}{{end}} ` +var drainerConfigCommon string = `config: | + detect-interval = 10 + compressor = "" + [syncer] + worker-count = 16 + disable-dispatch = false + ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql" + safe-mode = false + txn-batch = 20 +` + +var fileDrainerConfigTemp string = drainerConfigCommon + ` + db-type = "file" + [syncer.to] + dir = "/data/pb" +` + +var sqlDrainerConfigTemp string = drainerConfigCommon + ` + db-type = "{{ .DbType }}" + [syncer.to] + host = {{ .Host }} + user = {{ .User }} + password = {{ .Password }} + port = {{ .Port }} +` + type AffinityInfo struct { ClusterName string Kind string @@ -201,6 +227,36 @@ func 
GetSubValuesOrDie(clusterName, namespace, topologyKey string, pdConfig []st return subValues } +func GetDrainerSubValuesOrDie(info *DrainerConfig) string { + if info == nil { + slack.NotifyAndPanic(fmt.Errorf("Cannot get drainer sub values, the drainer config is nil")) + } + buff := new(bytes.Buffer) + switch info.DbType { + case DbTypeFile: + temp, err := template.New("file-drainer").Parse(fileDrainerConfigTemp) + if err != nil { + slack.NotifyAndPanic(err) + } + if err := temp.Execute(buff, &info); err != nil { + slack.NotifyAndPanic(err) + } + case DbTypeTiDB: + fallthrough + case DbTypeMySQL: + temp, err := template.New("sql-drainer").Parse(sqlDrainerConfigTemp) + if err != nil { + slack.NotifyAndPanic(err) + } + if err := temp.Execute(buff, &info); err != nil { + slack.NotifyAndPanic(err) + } + default: + slack.NotifyAndPanic(fmt.Errorf("db-type %s has not been suppored yet", info.DbType)) + } + return buff.String() +} + const ( PodPollInterval = 2 * time.Second // PodTimeout is how long to wait for the pod to be started or From 68f148d0af10729116961003c89a98f925c51778 Mon Sep 17 00:00:00 2001 From: Aylei Date: Mon, 26 Aug 2019 16:09:27 +0900 Subject: [PATCH 2/6] Run file restoring and binlog slave in parrelel Signed-off-by: Aylei --- charts/tidb-drainer/values.yaml | 2 +- tests/actions.go | 56 ++++++----- tests/backup.go | 122 +++++++++++++---------- tests/cmd/stability/main.go | 6 +- tests/config.go | 7 +- tests/manifests/stability/stability.yaml | 6 +- tests/pkg/util/db.go | 41 ++++++++ tests/util.go | 10 +- 8 files changed, 164 insertions(+), 86 deletions(-) diff --git a/charts/tidb-drainer/values.yaml b/charts/tidb-drainer/values.yaml index e327c0f750..354245785d 100644 --- a/charts/tidb-drainer/values.yaml +++ b/charts/tidb-drainer/values.yaml @@ -49,4 +49,4 @@ nodeSelector: {} tolerations: [] -affinity: {} \ No newline at end of file +affinity: {} diff --git a/tests/actions.go b/tests/actions.go index a008bd0fd7..09ddb108f8 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -146,8 +146,7 @@ type OperatorActions interface { CheckDrainer(info *DrainerConfig, source *TidbClusterConfig) error Restore(from *TidbClusterConfig, to *TidbClusterConfig) error CheckRestore(from *TidbClusterConfig, to *TidbClusterConfig) error - RestoreIncrementalFiles(from *DrainerConfig, to *TidbClusterConfig) error - CheckRestoreIncrementalFiles(from *DrainerConfig, to *TidbClusterConfig) error + RestoreIncrementalFiles(from *DrainerConfig, to *TidbClusterConfig, stopTSO int64) error ForceDeploy(info *TidbClusterConfig) error CreateSecret(info *TidbClusterConfig) error GetPodUIDMap(info *TidbClusterConfig) (map[string]types.UID, error) @@ -579,10 +578,11 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterConfig) error { fmt.Sprintf("%s-backup", info.ClusterName), fmt.Sprintf("%s-restore", info.ClusterName), fmt.Sprintf("%s-scheduler-backup", info.ClusterName), + fmt.Sprintf("%s-drainer", info.ClusterName), } for _, chartName := range charts { res, err := exec.Command("helm", "del", "--purge", chartName).CombinedOutput() - if err != nil && !releaseIsNotFound(err) { + if err != nil && !notFound(string(res)) { return fmt.Errorf("failed to delete chart: %s/%s, %v, %s", info.Namespace, chartName, err, string(res)) } @@ -637,14 +637,20 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterConfig) error { return fmt.Errorf("failed to delete jobs: %v, %s", err, string(res)) } - resources := []string{"pvc"} - for _, resource := range resources { - if res, err := 
exec.Command("kubectl", "delete", resource, "-n", info.Namespace, "-l", - setStr).CombinedOutput(); err != nil { - return fmt.Errorf("failed to delete %s: %v, %s", resource, err, string(res)) - } + // delete all pvcs + allPvcSet := label.Label{}.Instance(info.ClusterName).String() + if res, err := exec.Command("kubectl", "delete", "pvc", "-n", info.Namespace, "-l", allPvcSet).CombinedOutput(); err != nil { + return fmt.Errorf("failed to delete pvc: %v, %s", err, string(res)) } + //resources := []string{"pvc"} + //for _, resource := range resources { + // if res, err := exec.Command("kubectl", "delete", resource, "-n", info.Namespace, "-l", + // setStr).CombinedOutput(); err != nil { + // return fmt.Errorf("failed to delete %s: %v, %s", resource, err, string(res)) + // } + //} + // delete all configmaps allConfigMaps := label.New().Instance(info.ClusterName).String() if res, err := exec.Command("kubectl", "delete", "configmaps", "-n", info.Namespace, "-l", allConfigMaps).CombinedOutput(); err != nil { @@ -1752,6 +1758,10 @@ func (oa *operatorActions) checkGrafanaData(clusterInfo *TidbClusterConfig) erro return nil } +func GetD(ns, tcName, databaseName, password string) string { + return fmt.Sprintf("root:%s@(%s-tidb.%s:4000)/%s?charset=utf8", password, tcName, ns, databaseName) +} + func getDSN(ns, tcName, databaseName, password string) string { return fmt.Sprintf("root:%s@(%s-tidb.%s:4000)/%s?charset=utf8", password, tcName, ns, databaseName) } @@ -1917,7 +1927,8 @@ func (oa *operatorActions) CheckAdHocBackup(info *TidbClusterConfig) (string, er func (oa *operatorActions) Restore(from *TidbClusterConfig, to *TidbClusterConfig) error { oa.EmitEvent(from, fmt.Sprintf("RestoreBackup: target: %s", to.ClusterName)) oa.EmitEvent(to, fmt.Sprintf("RestoreBackup: source: %s", from.ClusterName)) - glog.Infof("deploying restore cluster[%s/%s]", from.Namespace, from.ClusterName) + glog.Infof("deploying restore, the data is from cluster[%s/%s] to cluster[%s/%s]", + from.Namespace, from.ClusterName, to.Namespace, to.ClusterName) sets := map[string]string{ "name": to.BackupName, @@ -1929,7 +1940,7 @@ func (oa *operatorActions) Restore(from *TidbClusterConfig, to *TidbClusterConfi setString := to.BackupHelmSetString(sets) - restoreName := fmt.Sprintf("%s-restore", from.ClusterName) + restoreName := fmt.Sprintf("%s-restore", to.ClusterName) cmd := fmt.Sprintf("helm install -n %s --namespace %s %s --set-string %s", restoreName, to.Namespace, oa.backupChartPath(to.OperatorTag), setString) glog.Infof("install restore [%s]", cmd) @@ -1971,18 +1982,6 @@ func (oa *operatorActions) CheckRestore(from *TidbClusterConfig, to *TidbCluster return nil } -func (oa *operatorActions) CleanRestoreJob(from *TidbClusterConfig) error { - - releaseName := fmt.Sprintf("%s-restore", from.ClusterName) - res, err := exec.Command("helm", "del", "--purge", releaseName).CombinedOutput() - if err != nil && !releaseIsNotFound(err) { - return fmt.Errorf("failed to delete release: %s/%s, %v, %s", - from.Namespace, releaseName, err, string(res)) - } - - return nil -} - func (oa *operatorActions) ForceDeploy(info *TidbClusterConfig) error { if err := oa.CleanTidbCluster(info); err != nil { return err @@ -2314,8 +2313,15 @@ func (tc *TidbClusterConfig) FullName() string { } func (oa *operatorActions) DeployIncrementalBackup(from *TidbClusterConfig, to *TidbClusterConfig, withDrainer bool, ts string) error { - oa.EmitEvent(from, fmt.Sprintf("DeployIncrementalBackup: slave: %s", to.ClusterName)) - glog.Infof("begin to deploy incremental 
backup cluster[%s] namespace[%s]", from.ClusterName, from.Namespace) + if withDrainer { + oa.EmitEvent(from, fmt.Sprintf("DeployIncrementalBackup: slave: %s", to.ClusterName)) + glog.Infof("begin to deploy incremental backup, source cluster[%s/%s], target cluster [%s/%s]", + from.Namespace, from.ClusterName, to.Namespace, to.ClusterName) + } else { + oa.EmitEvent(from, "Enable pump cluster") + glog.Infof("begin to enable pump for cluster[%s/%s], target cluster [%s/%s]", + from.Namespace, from.ClusterName) + } sets := map[string]string{ "binlog.pump.create": "true", diff --git a/tests/backup.go b/tests/backup.go index 5f5a894866..8c5f214472 100644 --- a/tests/backup.go +++ b/tests/backup.go @@ -10,6 +10,7 @@ import ( "github.com/golang/glog" "github.com/pingcap/tidb-operator/pkg/tkctl/util" + sql_util "github.com/pingcap/tidb-operator/tests/pkg/util" "github.com/pingcap/tidb-operator/tests/slack" "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -18,13 +19,11 @@ import ( const ( DrainerReplicas int32 = 1 - RunReparoCommandTemplate = `kubectl exec -it {{ .PodName }} -- bash -c \ -"wget http://download.pingcap.org/tidb-binlog-cluster-latest-linux-amd64.tar.gz && \ -tar -xzf tidb-binlog-cluster-latest-linux-amd64.tar.gz && \ -cd tidb-binlog-cluster-latest-linux-amd64/bin && \ -echo '{{ .ReparoConfig }}' > reparo.toml && \ -./reparo -config reparo.toml" -` + // TODO: better way to do incremental restore from pb files + RunReparoCommandTemplate = `kubectl exec -n={{ .Namespace }} {{ .PodName }} -- sh -c \ +"while [ \$(grep -r 'commitTS' /data.drainer/savepoint| awk '{print (\$3)}') -lt {{ .StopTSO }} ]; do echo 'wait end tso reached' && sleep 60; done; \ +printf '{{ .ReparoConfig }}' > reparo.toml && \ +./reparo -config reparo.toml > /data/reparo.log" ` ) type BackupTarget struct { @@ -35,7 +34,7 @@ type BackupTarget struct { func (t *BackupTarget) GetDrainerConfig(source *TidbClusterConfig, ts string) *DrainerConfig { drainerConfig := &DrainerConfig{ - DrainerName: fmt.Sprintf("to-%s-%s", t.TargetCluster.ClusterName, t.IncrementalType), + DrainerName: fmt.Sprintf("%s-drainer", t.TargetCluster.ClusterName), InitialCommitTs: ts, OperatorTag: source.OperatorTag, SourceClusterName: source.ClusterName, @@ -62,35 +61,28 @@ func (oa *operatorActions) BackupAndRestoreToMultipleClusters(source *TidbCluste return err } + // Restoring via reparo is slow, so we stop inserting data as early as possible to reduce the size of incremental data + oa.StopInsertDataTo(source) + ts, err := oa.CheckAdHocBackup(source) if err != nil { glog.Errorf("cluster:[%s] deploy happen error: %v", source.ClusterName, err) return err } - // Restore can only be done serially due to name collision - for i := range targets { - err = oa.CheckTidbClusterStatus(targets[i].TargetCluster) + prepareIncremental := func(source *TidbClusterConfig, target BackupTarget) error { + err = oa.CheckTidbClusterStatus(target.TargetCluster) if err != nil { - glog.Errorf("cluster:[%s] deploy faild error: %v", targets[i].TargetCluster.ClusterName, err) + glog.Errorf("cluster:[%s] deploy faild error: %v", target.TargetCluster.ClusterName, err) return err } - err = oa.Restore(source, targets[i].TargetCluster) + err = oa.Restore(source, target.TargetCluster) if err != nil { glog.Errorf("from cluster:[%s] to cluster [%s] restore happen error: %v", - source.ClusterName, targets[i].TargetCluster.ClusterName, err) + source.ClusterName, target.TargetCluster.ClusterName, err) return err } - - err = oa.CleanRestoreJob(source) - if err != nil 
&& !releaseIsNotFound(err) { - glog.Errorf("clean the from cluster:[%s] to cluster [%s] restore job happen error: %v", - source.ClusterName, targets[i].TargetCluster.ClusterName, err) - } - } - - prepareIncremental := func(source *TidbClusterConfig, target BackupTarget) error { err = oa.CheckRestore(source, target.TargetCluster) if err != nil { glog.Errorf("from cluster:[%s] to cluster [%s] restore failed error: %v", @@ -102,10 +94,10 @@ func (oa *operatorActions) BackupAndRestoreToMultipleClusters(source *TidbCluste // Deploy an additional drainer release drainerConfig := target.GetDrainerConfig(source, ts) if err := oa.DeployDrainer(drainerConfig, source); err != nil { - return nil + return err } if err := oa.CheckDrainer(drainerConfig, source); err != nil { - return nil + return err } } else { // Enable drainer of the source TiDB cluster release @@ -116,38 +108,58 @@ func (oa *operatorActions) BackupAndRestoreToMultipleClusters(source *TidbCluste return nil } - checkIncremental := func(source *TidbClusterConfig, target BackupTarget) error { + checkIncremental := func(source *TidbClusterConfig, target BackupTarget, stopWriteTS int64) error { if target.IncrementalType == DbTypeFile { - if err := oa.RestoreIncrementalFiles(target.GetDrainerConfig(source, ts), target.TargetCluster); err != nil { + var eg errgroup.Group + // Run reparo restoring and check concurrently to show restoring progress + eg.Go(func() error { + return oa.RestoreIncrementalFiles(target.GetDrainerConfig(source, ts), target.TargetCluster, stopWriteTS) + }) + eg.Go(func() error { + return oa.CheckDataConsistency(source, target.TargetCluster, 60 * time.Minute) + }) + if err := eg.Wait(); err != nil { + return err + } + } else { + if err := oa.CheckDataConsistency(source, target.TargetCluster, 30 * time.Minute); err != nil { return err } } - if err := oa.CheckDataConsistency(source, target.TargetCluster); err != nil { - return err - } return nil } var eg errgroup.Group - for i := range targets { + for _, target := range targets { + target := target eg.Go(func() error { - return prepareIncremental(source, targets[i]) + return prepareIncremental(source, target) }) } if err := eg.Wait(); err != nil { return err } - glog.Infof("waiting 1 minutes to insert into more records") + go oa.BeginInsertDataToOrDie(source) + if err != nil { + return err + } + glog.Infof("waiting 1 minute to insert into more records") time.Sleep(1 * time.Minute) glog.Infof("cluster[%s] stop insert data", source.ClusterName) oa.StopInsertDataTo(source) - for i := range targets { + stopWriteTS, err := sql_util.ShowMasterCommitTS(getDSN(source.Namespace, source.ClusterName, "test", source.Password)) + if err != nil { + return err + } + + for _, target := range targets { + target := target eg.Go(func() error { - return checkIncremental(source, targets[i]) + return checkIncremental(source, target, stopWriteTS) }) } if err := eg.Wait(); err != nil { @@ -209,7 +221,7 @@ func (oa *operatorActions) DeployAndCheckIncrementalBackup(from, to *TidbCluster return nil } -func (oa *operatorActions) CheckDataConsistency(from, to *TidbClusterConfig) error { +func (oa *operatorActions) CheckDataConsistency(from, to *TidbClusterConfig, timeout time.Duration) error { fn := func() (bool, error) { b, err := to.DataIsTheSameAs(from) if err != nil { @@ -222,7 +234,7 @@ func (oa *operatorActions) CheckDataConsistency(from, to *TidbClusterConfig) err return false, nil } - if err := wait.Poll(DefaultPollInterval, 30*time.Minute, fn); err != nil { + if err := 
wait.Poll(DefaultPollInterval, timeout, fn); err != nil { return err } return nil @@ -238,8 +250,13 @@ func (oa *operatorActions) DeployDrainer(info *DrainerConfig, source *TidbCluste return err } + override := map[string]string{} + if len(oa.cfg.AdditionalDrainerVersion) > 0 { + override["clusterVersion"] = oa.cfg.AdditionalDrainerVersion + } + cmd := fmt.Sprintf("helm install %s --name %s --namespace %s --set-string %s -f %s", - oa.drainerChartPath(source.OperatorTag), info.DrainerName, source.Namespace, info.DrainerHelmString(nil, source), valuesPath) + oa.drainerChartPath(source.OperatorTag), info.DrainerName, source.Namespace, info.DrainerHelmString(override, source), valuesPath) glog.Info(cmd) if res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput(); err != nil { @@ -287,20 +304,22 @@ func (oa *operatorActions) CheckDrainer(info *DrainerConfig, source *TidbCluster return nil } -func (oa *operatorActions) RestoreIncrementalFiles(from *DrainerConfig, to *TidbClusterConfig) error { +func (oa *operatorActions) RestoreIncrementalFiles(from *DrainerConfig, to *TidbClusterConfig, stopTSO int64) error { glog.Infof("restoring incremental data from drainer [%s/%s] to TiDB cluster [%s/%s]", from.Namespace, from.DrainerName, to.Namespace, to.ClusterName) // TODO: better incremental files restore solution reparoConfig := strings.Join([]string{ - `data-dir = "/data/pb"`, - `log-level = "info"`, - `dest-type = "mysql"`, + `data-dir = \"/data/pb\"`, + `log-level = \"info\"`, + `dest-type = \"mysql\"`, + `safe-mode = true`, + fmt.Sprintf(`stop-tso = %d`, stopTSO), `[dest-db]`, - fmt.Sprintf(`host = "%s"`, util.GetTidbServiceName(to.ClusterName)), + fmt.Sprintf(`host = \"%s\"`, util.GetTidbServiceName(to.ClusterName)), "port = 4000", - `user = "root"`, - `password = ""`, + `user = \"root\"`, + fmt.Sprintf(`password = \"%s\"`, to.Password), }, "\n") temp, err := template.New("reparo-command").Parse(RunReparoCommandTemplate) @@ -309,22 +328,25 @@ func (oa *operatorActions) RestoreIncrementalFiles(from *DrainerConfig, to *Tidb } buff := new(bytes.Buffer) if err := temp.Execute(buff, &struct { + Namespace string ReparoConfig string PodName string + StopTSO int64 }{ + Namespace: from.Namespace, ReparoConfig: reparoConfig, PodName: fmt.Sprintf("%s-%s-drainer-0", from.SourceClusterName, from.DrainerName), + StopTSO: stopTSO, }); err != nil { return err } - if res, err := exec.Command("/bin/sh", "-c", buff.String()).CombinedOutput(); err != nil { + cmd := buff.String() + glog.Infof("Restore incremental data, command: \n%s", cmd) + + if res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput(); err != nil { return fmt.Errorf("failed to restore incremental files from dainer [%s/%s] to TiDB cluster [%s/%s], %v, %s", from.Namespace, from.DrainerName, to.Namespace, to.ClusterName, err, res) } return nil } - -func (oa *operatorActions) CheckRestoreIncrementalFiles(from *DrainerConfig, to *TidbClusterConfig) error { - return nil -} diff --git a/tests/cmd/stability/main.go b/tests/cmd/stability/main.go index d158a52807..2d975445c8 100644 --- a/tests/cmd/stability/main.go +++ b/tests/cmd/stability/main.go @@ -70,6 +70,7 @@ func run() { cli, kubeCli := client.NewCliOrDie() ocfg := newOperatorConfig() + ocfg.Tag = "master" cluster1 := newTidbClusterConfig("ns1", "cluster1") cluster2 := newTidbClusterConfig("ns2", "cluster2") @@ -303,7 +304,7 @@ func run() { }, { TargetCluster: fileRestoreCluster1, - IsAdditional: false, + IsAdditional: true, IncrementalType: tests.DbTypeFile, }, } @@ -332,7 +333,7 @@ 
func run() { }, { TargetCluster: fileRestoreCluster2, - IsAdditional: false, + IsAdditional: true, IncrementalType: tests.DbTypeFile, }, } @@ -408,5 +409,6 @@ func newTidbClusterConfig(ns, clusterName string) *tests.TidbClusterConfig { Monitor: true, BlockWriteConfig: cfg.BlockWriter, TopologyKey: topologyKey, + ClusterVersion: tidbVersion, } } diff --git a/tests/config.go b/tests/config.go index 284834ef05..519ea9a697 100644 --- a/tests/config.go +++ b/tests/config.go @@ -43,6 +43,9 @@ type Config struct { TiKVGrpcConcurrency int `yaml:"tikv_grpc_concurrency" json:"tikv_grpc_concurrency"` TiDBTokenLimit int `yaml:"tidb_token_limit" json:"tidb_token_limit"` + // old versions of reparo does not support idempotent incremental recover, so we lock the version explicitly + AdditionalDrainerVersion string `yaml:"file_drainer_version" json:"file_drainer_version"` + // Block writer BlockWriter blockwriter.Config `yaml:"block_writer,omitempty"` @@ -64,6 +67,8 @@ type Nodes struct { // NewConfig creates a new config. func NewConfig() (*Config, error) { cfg := &Config{ + AdditionalDrainerVersion: "v3.0.2", + PDMaxReplicas: 5, TiDBTokenLimit: 1024, TiKVGrpcConcurrency: 8, @@ -78,7 +83,7 @@ func NewConfig() (*Config, error) { flag.StringVar(&cfg.configFile, "config", "", "Config file") flag.StringVar(&cfg.LogDir, "log-dir", "/logDir", "log directory") flag.IntVar(&cfg.FaultTriggerPort, "fault-trigger-port", 23332, "the http port of fault trigger service") - flag.StringVar(&cfg.TidbVersions, "tidb-versions", "v3.0.0-rc.1,v3.0.0-rc.2,v3.0.1", "tidb versions") + flag.StringVar(&cfg.TidbVersions, "tidb-versions", "v3.0.0,v3.0.1,v3.0.2", "tidb versions") flag.StringVar(&cfg.OperatorTag, "operator-tag", "master", "operator tag used to choose charts") flag.StringVar(&cfg.OperatorImage, "operator-image", "pingcap/tidb-operator:latest", "operator image") flag.StringVar(&cfg.UpgradeOperatorTag, "upgrade-operator-tag", "", "upgrade operator tag used to choose charts") diff --git a/tests/manifests/stability/stability.yaml b/tests/manifests/stability/stability.yaml index 4ce009dbb3..0e49301594 100644 --- a/tests/manifests/stability/stability.yaml +++ b/tests/manifests/stability/stability.yaml @@ -42,13 +42,13 @@ spec: serviceAccount: tidb-operator-stability containers: - name: tidb-operator-stability - image: "" + image: hub.pingcap.net/aylei/tidb-operator-stability-test:multiple-drainers21 imagePullPolicy: Always command: - /usr/local/bin/stability-test - --config=/etc/tidb-operator-stability/config.yaml - - --operator-image=pingcap/tidb-operator:v1.0.0 - - --operator-tag=v1.0.0 + - --operator-image=pingcap/tidb-operator:latest + - --operator-tag=master - --slack-webhook-url="" volumeMounts: - mountPath: /logDir diff --git a/tests/pkg/util/db.go b/tests/pkg/util/db.go index a31ab6c201..3e6e21ef43 100644 --- a/tests/pkg/util/db.go +++ b/tests/pkg/util/db.go @@ -2,6 +2,8 @@ package util import ( "database/sql" + "fmt" + "strings" "github.com/golang/glog" ) @@ -17,3 +19,42 @@ func OpenDB(dsn string, maxIdleConns int) (*sql.DB, error) { glog.V(4).Info("DB opens successfully") return db, nil } + +// Show master commit ts of TiDB +func ShowMasterCommitTS(dsn string) (int64, error) { + db, err := sql.Open("mysql", dsn) + if err != nil { + return 0, err + } + defer db.Close() + + rows, err := db.Query("SHOW MASTER STATUS") + defer rows.Close() + if err != nil { + return 0, err + } + cols, err := rows.Columns() + if err != nil { + return 0, err + } + idx := -1 + vals := make([]interface{}, len(cols)) + for i, _ := range 
cols { + if strings.ToLower(cols[i]) == "position" { + vals[i] = new(int64) + idx = i + } else { + vals[i] = new(sql.RawBytes) + } + } + if idx < 0 { + return 0, fmt.Errorf("Error show master commit ts of %s, cannot find 'Position' column", dsn) + } + if !rows.Next() { + return 0, fmt.Errorf("Error show master commit ts of %s, empty result set", dsn) + } + if err = rows.Scan(vals...); err != nil { + return 0, err + } + return *vals[idx].(*int64), nil +} diff --git a/tests/util.go b/tests/util.go index fc800ee3bd..86eafd6344 100644 --- a/tests/util.go +++ b/tests/util.go @@ -145,7 +145,9 @@ var binlogTemp string = `binlog: {{end}}{{end}} ` -var drainerConfigCommon string = `config: | +var drainerConfigCommon string = ` +initialCommitTs: "{{ .InitialCommitTs }}" +config: | detect-interval = 10 compressor = "" [syncer] @@ -210,9 +212,9 @@ func GetSubValuesOrDie(clusterName, namespace, topologyKey string, pdConfig []st } subValues := fmt.Sprintf("%s%s%s", pdbuff.String(), tikvbuff.String(), tidbbuff.String()) - if pumpConfig == nil && drainerConfig == nil { - return subValues - } + //if pumpConfig == nil && drainerConfig == nil { + // return subValues + //} btemp, err := template.New("binlog").Parse(binlogTemp) if err != nil { From 2edadd37051b4cc024a1f66c00ba1fd6f45f32a1 Mon Sep 17 00:00:00 2001 From: Aylei Date: Mon, 26 Aug 2019 22:35:57 +0900 Subject: [PATCH 3/6] Fix drainer cleanup Signed-off-by: Aylei --- tests/actions.go | 27 +++++++++++++----------- tests/backup.go | 2 +- tests/cmd/stability/main.go | 1 - tests/manifests/stability/stability.yaml | 6 +++--- 4 files changed, 19 insertions(+), 17 deletions(-) diff --git a/tests/actions.go b/tests/actions.go index 09ddb108f8..747a685bf0 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -578,7 +578,9 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterConfig) error { fmt.Sprintf("%s-backup", info.ClusterName), fmt.Sprintf("%s-restore", info.ClusterName), fmt.Sprintf("%s-scheduler-backup", info.ClusterName), - fmt.Sprintf("%s-drainer", info.ClusterName), + fmt.Sprintf("%s-%s-drainer", info.ClusterName, DbTypeFile), + fmt.Sprintf("%s-%s-drainer", info.ClusterName, DbTypeTiDB), + fmt.Sprintf("%s-%s-drainer", info.ClusterName, DbTypeMySQL), } for _, chartName := range charts { res, err := exec.Command("helm", "del", "--purge", chartName).CombinedOutput() @@ -637,19 +639,20 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterConfig) error { return fmt.Errorf("failed to delete jobs: %v, %s", err, string(res)) } - // delete all pvcs - allPvcSet := label.Label{}.Instance(info.ClusterName).String() - if res, err := exec.Command("kubectl", "delete", "pvc", "-n", info.Namespace, "-l", allPvcSet).CombinedOutput(); err != nil { - return fmt.Errorf("failed to delete pvc: %v, %s", err, string(res)) + resources := []string{"pvc"} + for _, resource := range resources { + if res, err := exec.Command("kubectl", "delete", resource, "-n", info.Namespace, "-l", + setStr).CombinedOutput(); err != nil { + return fmt.Errorf("failed to delete %s: %v, %s", resource, err, string(res)) + } } - //resources := []string{"pvc"} - //for _, resource := range resources { - // if res, err := exec.Command("kubectl", "delete", resource, "-n", info.Namespace, "-l", - // setStr).CombinedOutput(); err != nil { - // return fmt.Errorf("failed to delete %s: %v, %s", resource, err, string(res)) - // } - //} + // delete pvc of drainer + drainerPvcSet := label.Label{}.Instance(info.ClusterName).Component("drainer").String() + if res, err := 
exec.Command("kubectl", "delete", "pvc", "-n", info.Namespace, "-l", + drainerPvcSet).CombinedOutput(); err != nil { + return fmt.Errorf("failed to delete pvc: %v, %s", drainerPvcSet, err, string(res)) + } // delete all configmaps allConfigMaps := label.New().Instance(info.ClusterName).String() diff --git a/tests/backup.go b/tests/backup.go index 8c5f214472..d6f89c6296 100644 --- a/tests/backup.go +++ b/tests/backup.go @@ -34,7 +34,7 @@ type BackupTarget struct { func (t *BackupTarget) GetDrainerConfig(source *TidbClusterConfig, ts string) *DrainerConfig { drainerConfig := &DrainerConfig{ - DrainerName: fmt.Sprintf("%s-drainer", t.TargetCluster.ClusterName), + DrainerName: fmt.Sprintf("%s-%s-drainer", source.ClusterName, t.IncrementalType), InitialCommitTs: ts, OperatorTag: source.OperatorTag, SourceClusterName: source.ClusterName, diff --git a/tests/cmd/stability/main.go b/tests/cmd/stability/main.go index 2d975445c8..de5b6733db 100644 --- a/tests/cmd/stability/main.go +++ b/tests/cmd/stability/main.go @@ -70,7 +70,6 @@ func run() { cli, kubeCli := client.NewCliOrDie() ocfg := newOperatorConfig() - ocfg.Tag = "master" cluster1 := newTidbClusterConfig("ns1", "cluster1") cluster2 := newTidbClusterConfig("ns2", "cluster2") diff --git a/tests/manifests/stability/stability.yaml b/tests/manifests/stability/stability.yaml index 0e49301594..4ce009dbb3 100644 --- a/tests/manifests/stability/stability.yaml +++ b/tests/manifests/stability/stability.yaml @@ -42,13 +42,13 @@ spec: serviceAccount: tidb-operator-stability containers: - name: tidb-operator-stability - image: hub.pingcap.net/aylei/tidb-operator-stability-test:multiple-drainers21 + image: "" imagePullPolicy: Always command: - /usr/local/bin/stability-test - --config=/etc/tidb-operator-stability/config.yaml - - --operator-image=pingcap/tidb-operator:latest - - --operator-tag=master + - --operator-image=pingcap/tidb-operator:v1.0.0 + - --operator-tag=v1.0.0 - --slack-webhook-url="" volumeMounts: - mountPath: /logDir From 6527fd90548a6a7c8ab28f1e3807a3ccb2309c20 Mon Sep 17 00:00:00 2001 From: Aylei Date: Tue, 27 Aug 2019 11:27:20 +0900 Subject: [PATCH 4/6] Fix link errors Signed-off-by: Aylei --- tests/actions.go | 4 ++-- tests/backup.go | 25 ++++++++----------------- tests/config.go | 4 ++-- tests/pkg/util/db.go | 2 +- 4 files changed, 13 insertions(+), 22 deletions(-) diff --git a/tests/actions.go b/tests/actions.go index 747a685bf0..796d94b6d5 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -651,7 +651,7 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterConfig) error { drainerPvcSet := label.Label{}.Instance(info.ClusterName).Component("drainer").String() if res, err := exec.Command("kubectl", "delete", "pvc", "-n", info.Namespace, "-l", drainerPvcSet).CombinedOutput(); err != nil { - return fmt.Errorf("failed to delete pvc: %v, %s", drainerPvcSet, err, string(res)) + return fmt.Errorf("failed to delete drainer pvc: %v, %s", err, string(res)) } // delete all configmaps @@ -2322,7 +2322,7 @@ func (oa *operatorActions) DeployIncrementalBackup(from *TidbClusterConfig, to * from.Namespace, from.ClusterName, to.Namespace, to.ClusterName) } else { oa.EmitEvent(from, "Enable pump cluster") - glog.Infof("begin to enable pump for cluster[%s/%s], target cluster [%s/%s]", + glog.Infof("begin to enable pump for cluster[%s/%s]", from.Namespace, from.ClusterName) } diff --git a/tests/backup.go b/tests/backup.go index d6f89c6296..c22b92398a 100644 --- a/tests/backup.go +++ b/tests/backup.go @@ -18,9 +18,9 @@ import ( ) const ( - 
DrainerReplicas int32 = 1 + DrainerReplicas int32 = 1 // TODO: better way to do incremental restore from pb files - RunReparoCommandTemplate = `kubectl exec -n={{ .Namespace }} {{ .PodName }} -- sh -c \ + RunReparoCommandTemplate = `kubectl exec -n={{ .Namespace }} {{ .PodName }} -- sh -c \ "while [ \$(grep -r 'commitTS' /data.drainer/savepoint| awk '{print (\$3)}') -lt {{ .StopTSO }} ]; do echo 'wait end tso reached' && sleep 60; done; \ printf '{{ .ReparoConfig }}' > reparo.toml && \ ./reparo -config reparo.toml > /data/reparo.log" ` @@ -116,13 +116,13 @@ func (oa *operatorActions) BackupAndRestoreToMultipleClusters(source *TidbCluste return oa.RestoreIncrementalFiles(target.GetDrainerConfig(source, ts), target.TargetCluster, stopWriteTS) }) eg.Go(func() error { - return oa.CheckDataConsistency(source, target.TargetCluster, 60 * time.Minute) + return oa.CheckDataConsistency(source, target.TargetCluster, 60*time.Minute) }) if err := eg.Wait(); err != nil { return err } } else { - if err := oa.CheckDataConsistency(source, target.TargetCluster, 30 * time.Minute); err != nil { + if err := oa.CheckDataConsistency(source, target.TargetCluster, 30*time.Minute); err != nil { return err } } @@ -204,10 +204,7 @@ func (oa *operatorActions) DeployAndCheckPump(tc *TidbClusterConfig) error { return err } - if err := oa.CheckIncrementalBackup(tc, false); err != nil { - return err - } - return nil + return oa.CheckIncrementalBackup(tc, false) } func (oa *operatorActions) DeployAndCheckIncrementalBackup(from, to *TidbClusterConfig, ts string) error { @@ -215,10 +212,7 @@ func (oa *operatorActions) DeployAndCheckIncrementalBackup(from, to *TidbCluster return err } - if err := oa.CheckIncrementalBackup(from, true); err != nil { - return err - } - return nil + return oa.CheckIncrementalBackup(from, true) } func (oa *operatorActions) CheckDataConsistency(from, to *TidbClusterConfig, timeout time.Duration) error { @@ -234,10 +228,7 @@ func (oa *operatorActions) CheckDataConsistency(from, to *TidbClusterConfig, tim return false, nil } - if err := wait.Poll(DefaultPollInterval, timeout, fn); err != nil { - return err - } - return nil + return wait.Poll(DefaultPollInterval, timeout, fn) } func (oa *operatorActions) DeployDrainer(info *DrainerConfig, source *TidbClusterConfig) error { @@ -298,7 +289,7 @@ func (oa *operatorActions) CheckDrainer(info *DrainerConfig, source *TidbCluster err := wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn) if err != nil { - return fmt.Errorf("failed to install drainer [%s/%s], %v", source.Namespace, info.DrainerName) + return fmt.Errorf("failed to install drainer [%s/%s], %v", source.Namespace, info.DrainerName, err) } return nil diff --git a/tests/config.go b/tests/config.go index 519ea9a697..4ca340331d 100644 --- a/tests/config.go +++ b/tests/config.go @@ -44,7 +44,7 @@ type Config struct { TiDBTokenLimit int `yaml:"tidb_token_limit" json:"tidb_token_limit"` // old versions of reparo does not support idempotent incremental recover, so we lock the version explicitly - AdditionalDrainerVersion string `yaml:"file_drainer_version" json:"file_drainer_version"` + AdditionalDrainerVersion string `yaml:"file_drainer_version" json:"file_drainer_version"` // Block writer BlockWriter blockwriter.Config `yaml:"block_writer,omitempty"` @@ -67,7 +67,7 @@ type Nodes struct { // NewConfig creates a new config. 
func NewConfig() (*Config, error) { cfg := &Config{ - AdditionalDrainerVersion: "v3.0.2", + AdditionalDrainerVersion: "v3.0.2", PDMaxReplicas: 5, TiDBTokenLimit: 1024, diff --git a/tests/pkg/util/db.go b/tests/pkg/util/db.go index 3e6e21ef43..a2652ad245 100644 --- a/tests/pkg/util/db.go +++ b/tests/pkg/util/db.go @@ -39,7 +39,7 @@ func ShowMasterCommitTS(dsn string) (int64, error) { } idx := -1 vals := make([]interface{}, len(cols)) - for i, _ := range cols { + for i := range cols { if strings.ToLower(cols[i]) == "position" { vals[i] = new(int64) idx = i From ce6e25953279063319850fee33086f4c4d10ad8b Mon Sep 17 00:00:00 2001 From: Aylei Date: Tue, 27 Aug 2019 11:57:39 +0900 Subject: [PATCH 5/6] Fix error from merge Signed-off-by: Aylei --- tests/actions.go | 72 ++++++++++++++++++++---------------------------- 1 file changed, 30 insertions(+), 42 deletions(-) diff --git a/tests/actions.go b/tests/actions.go index bfc53be7ff..3d922ee2f3 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -2316,6 +2316,10 @@ func (tc *TidbClusterConfig) FullName() string { } func (oa *operatorActions) DeployIncrementalBackup(from *TidbClusterConfig, to *TidbClusterConfig, withDrainer bool, ts string) error { + + if withDrainer && to == nil { + return fmt.Errorf("Target cluster is nil when deploying drainer") + } if withDrainer { oa.EmitEvent(from, fmt.Sprintf("DeployIncrementalBackup: slave: %s", to.ClusterName)) glog.Infof("begin to deploy incremental backup, source cluster[%s/%s], target cluster [%s/%s]", @@ -2330,54 +2334,38 @@ func (oa *operatorActions) DeployIncrementalBackup(from *TidbClusterConfig, to * // https://github.com/pingcap/tidb-operator/pull/693 isv1 := from.OperatorTag == "v1.0.0" - var sets map[string]string - if isv1 { - sets = map[string]string{ - "binlog.pump.create": "true", - "binlog.drainer.destDBType": "mysql", - "binlog.drainer.mysql.host": fmt.Sprintf("%s-tidb.%s", to.ClusterName, to.Namespace), - "binlog.drainer.mysql.user": "root", - "binlog.drainer.mysql.password": to.Password, - "binlog.drainer.mysql.port": "4000", - "binlog.drainer.ignoreSchemas": "", - } - } else { - sets = map[string]string{ - "binlog.pump.create": "true", - } - from.drainerConfig = []string{ - "worker-count = 16", - "detect-interval = 10", - "disable-dispatch = false", - `ignore-schemas = ""`, - `safe-mode = false`, - `txn-batch = 20`, - `db-type = "mysql"`, - `[syncer.to]`, - fmt.Sprintf(`host = "%s-tidb.%s"`, to.ClusterName, to.Namespace), - fmt.Sprintf(`user = "%s"`, "root"), - fmt.Sprintf(`password = "%s"`, to.Password), - fmt.Sprintf(`port = %d`, 4000), - } + sets := map[string]string{ + "binlog.pump.create": "true", } if withDrainer { sets["binlog.drainer.create"] = "true" - from.drainerConfig = []string{ - "worker-count = 16", - "detect-interval = 10", - "disable-dispatch = false", - `ignore-schemas = ""`, - `safe-mode = false`, - `txn-batch = 20`, - `db-type = "mysql"`, - `[syncer.to]`, - fmt.Sprintf(`host = "%s-tidb.%s"`, to.ClusterName, to.Namespace), - fmt.Sprintf(`user = "%s"`, "root"), - fmt.Sprintf(`password = "%s"`, to.Password), - fmt.Sprintf(`port = %d`, 4000), + if isv1 { + sets["binlog.pump.create"] = "true" + sets["binlog.drainer.destDBType"] = "mysql" + sets["binlog.drainer.mysql.host"] = fmt.Sprintf("%s-tidb.%s", to.ClusterName, to.Namespace) + sets["binlog.drainer.mysql.user"] = "root" + sets["binlog.drainer.mysql.password"] = to.Password + sets["binlog.drainer.mysql.port"] = "4000" + sets["binlog.drainer.ignoreSchemas"] = "" + } else { + from.drainerConfig = []string{ + 
"worker-count = 16", + "detect-interval = 10", + "disable-dispatch = false", + `ignore-schemas = ""`, + `safe-mode = false`, + `txn-batch = 20`, + `db-type = "mysql"`, + `[syncer.to]`, + fmt.Sprintf(`host = "%s-tidb.%s"`, to.ClusterName, to.Namespace), + fmt.Sprintf(`user = "%s"`, "root"), + fmt.Sprintf(`password = "%s"`, to.Password), + fmt.Sprintf(`port = %d`, 4000), + } } } + if ts != "" { sets["binlog.drainer.initialCommitTs"] = ts } From 0d946d9c4be99e41791e0f20c04218801f29d57f Mon Sep 17 00:00:00 2001 From: Aylei Date: Tue, 27 Aug 2019 23:53:26 +0900 Subject: [PATCH 6/6] Address review comments Signed-off-by: Aylei --- tests/backup.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/backup.go b/tests/backup.go index c22b92398a..41ef9df5d8 100644 --- a/tests/backup.go +++ b/tests/backup.go @@ -61,15 +61,15 @@ func (oa *operatorActions) BackupAndRestoreToMultipleClusters(source *TidbCluste return err } - // Restoring via reparo is slow, so we stop inserting data as early as possible to reduce the size of incremental data - oa.StopInsertDataTo(source) - ts, err := oa.CheckAdHocBackup(source) if err != nil { glog.Errorf("cluster:[%s] deploy happen error: %v", source.ClusterName, err) return err } + // Restoring via reparo is slow, so we stop inserting data as early as possible to reduce the size of incremental data + oa.StopInsertDataTo(source) + prepareIncremental := func(source *TidbClusterConfig, target BackupTarget) error { err = oa.CheckTidbClusterStatus(target.TargetCluster) if err != nil {