From 5495dc245744b32b919c4f020c2c66ae54c354bc Mon Sep 17 00:00:00 2001 From: Aylei Date: Sun, 8 Sep 2019 19:01:13 +0900 Subject: [PATCH] Add stability test for multiple drainers and reparo restore (#803) * Add stability test for multiple drainers and reparo restore Signed-off-by: Aylei * Run file restoring and binlog slave in parrelel Signed-off-by: Aylei * Fix drainer cleanup Signed-off-by: Aylei * Fix link errors Signed-off-by: Aylei * Fix error from merge Signed-off-by: Aylei * Address review comments Signed-off-by: Aylei --- charts/tidb-drainer/values.yaml | 2 +- go.mod | 1 + tests/actions.go | 109 +++++++---- tests/backup.go | 310 ++++++++++++++++++++++++++++---- tests/cluster_info.go | 1 + tests/cmd/e2e/main.go | 1 + tests/cmd/stability/main.go | 53 ++++-- tests/config.go | 7 +- tests/drainer_info.go | 77 ++++++++ tests/pkg/util/db.go | 41 +++++ tests/util.go | 64 ++++++- 11 files changed, 580 insertions(+), 86 deletions(-) create mode 100644 tests/drainer_info.go diff --git a/charts/tidb-drainer/values.yaml b/charts/tidb-drainer/values.yaml index 602e5f0b49..910fc84198 100644 --- a/charts/tidb-drainer/values.yaml +++ b/charts/tidb-drainer/values.yaml @@ -49,4 +49,4 @@ nodeSelector: {} tolerations: [] -affinity: {} \ No newline at end of file +affinity: {} diff --git a/go.mod b/go.mod index e55b8192b6..e86b16c4a9 100644 --- a/go.mod +++ b/go.mod @@ -102,6 +102,7 @@ require ( go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.9.1 // indirect golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421 // indirect + golang.org/x/sync v0.0.0-20190423024810-112230192c58 golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 // indirect google.golang.org/genproto v0.0.0-20180731170733-daca94659cb5 // indirect google.golang.org/grpc v1.12.0 // indirect diff --git a/tests/actions.go b/tests/actions.go index 3110c1743b..0dcb63b24f 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -106,6 +106,7 @@ const ( operartorChartName = "tidb-operator" tidbClusterChartName = "tidb-cluster" backupChartName = "tidb-backup" + drainerChartName = "tidb-drainer" statbilityTestTag = "stability" ) @@ -140,8 +141,12 @@ type OperatorActions interface { CheckScheduledBackup(info *TidbClusterConfig) error DeployIncrementalBackup(from *TidbClusterConfig, to *TidbClusterConfig, withDrainer bool, ts string) error CheckIncrementalBackup(info *TidbClusterConfig, withDrainer bool) error + DeployDrainer(info *DrainerConfig, from *TidbClusterConfig) error + DeployDrainerOrDie(info *DrainerConfig, from *TidbClusterConfig) + CheckDrainer(info *DrainerConfig, source *TidbClusterConfig) error Restore(from *TidbClusterConfig, to *TidbClusterConfig) error CheckRestore(from *TidbClusterConfig, to *TidbClusterConfig) error + RestoreIncrementalFiles(from *DrainerConfig, to *TidbClusterConfig, stopTSO int64) error ForceDeploy(info *TidbClusterConfig) error CreateSecret(info *TidbClusterConfig) error GetPodUIDMap(info *TidbClusterConfig) (map[string]types.UID, error) @@ -174,6 +179,8 @@ type OperatorActions interface { EmitEvent(info *TidbClusterConfig, msg string) BackupRestore(from, to *TidbClusterConfig) error BackupRestoreOrDie(from, to *TidbClusterConfig) + BackupAndRestoreToMultipleClusters(source *TidbClusterConfig, targets []BackupTarget) error + BackupAndRestoreToMultipleClustersOrDie(source *TidbClusterConfig, targets []BackupTarget) LabelNodes() error LabelNodesOrDie() CheckDisasterTolerance(info *TidbClusterConfig) error @@ -250,6 +257,7 @@ type TidbClusterConfig struct { InitSecretName string BackupSecretName 
string EnableConfigMapRollout bool + ClusterVersion string PDPreStartScript string TiDBPreStartScript string @@ -570,10 +578,13 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterConfig) error { fmt.Sprintf("%s-backup", info.ClusterName), fmt.Sprintf("%s-restore", info.ClusterName), fmt.Sprintf("%s-scheduler-backup", info.ClusterName), + fmt.Sprintf("%s-%s-drainer", info.ClusterName, DbTypeFile), + fmt.Sprintf("%s-%s-drainer", info.ClusterName, DbTypeTiDB), + fmt.Sprintf("%s-%s-drainer", info.ClusterName, DbTypeMySQL), } for _, chartName := range charts { res, err := exec.Command("helm", "del", "--purge", chartName).CombinedOutput() - if err != nil && releaseIsNotFound(err) { + if err != nil && !notFound(string(res)) { return fmt.Errorf("failed to delete chart: %s/%s, %v, %s", info.Namespace, chartName, err, string(res)) } @@ -636,6 +647,13 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterConfig) error { } } + // delete pvc of drainer + drainerPvcSet := label.Label{}.Instance(info.ClusterName).Component("drainer").String() + if res, err := exec.Command("kubectl", "delete", "pvc", "-n", info.Namespace, "-l", + drainerPvcSet).CombinedOutput(); err != nil { + return fmt.Errorf("failed to delete drainer pvc: %v, %s", err, string(res)) + } + // delete all configmaps allConfigMaps := label.New().Instance(info.ClusterName).String() if res, err := exec.Command("kubectl", "delete", "configmaps", "-n", info.Namespace, "-l", allConfigMaps).CombinedOutput(); err != nil { @@ -862,6 +880,10 @@ func (oa *operatorActions) backupChartPath(tag string) string { return oa.chartPath(backupChartName, tag) } +func (oa *operatorActions) drainerChartPath(tag string) string { + return oa.chartPath(drainerChartName, tag) +} + func (oa *operatorActions) ScaleTidbCluster(info *TidbClusterConfig) error { oa.EmitEvent(info, fmt.Sprintf("ScaleTidbCluster to pd: %s, tikv: %s, tidb: %s", info.Args["pd.replicas"], info.Args["tikv.replicas"], info.Args["tidb.replicas"])) @@ -1739,6 +1761,10 @@ func (oa *operatorActions) checkGrafanaData(clusterInfo *TidbClusterConfig) erro return nil } +func GetD(ns, tcName, databaseName, password string) string { + return fmt.Sprintf("root:%s@(%s-tidb.%s:4000)/%s?charset=utf8", password, tcName, ns, databaseName) +} + func getDSN(ns, tcName, databaseName, password string) string { return fmt.Sprintf("root:%s@(%s-tidb.%s:4000)/%s?charset=utf8", password, tcName, ns, databaseName) } @@ -1766,11 +1792,12 @@ func (oa *operatorActions) checkoutTag(tagName string) error { cmd := fmt.Sprintf("cd %s && git stash -u && git checkout %s && "+ "mkdir -p %s && cp -rf charts/tidb-operator %s && "+ "cp -rf charts/tidb-cluster %s && cp -rf charts/tidb-backup %s &&"+ - "cp -rf manifests %s", + "cp -rf manifests %s &&"+ + "cp -rf charts/tidb-drainer %s", oa.cfg.OperatorRepoDir, tagName, filepath.Join(oa.cfg.ChartDir, tagName), oa.operatorChartPath(tagName), oa.tidbClusterChartPath(tagName), oa.backupChartPath(tagName), - oa.manifestPath(tagName)) + oa.manifestPath(tagName), oa.drainerChartPath(tagName)) glog.Info(cmd) res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput() if err != nil { @@ -1902,7 +1929,8 @@ func (oa *operatorActions) CheckAdHocBackup(info *TidbClusterConfig) (string, er func (oa *operatorActions) Restore(from *TidbClusterConfig, to *TidbClusterConfig) error { oa.EmitEvent(from, fmt.Sprintf("RestoreBackup: target: %s", to.ClusterName)) oa.EmitEvent(to, fmt.Sprintf("RestoreBackup: source: %s", from.ClusterName)) - glog.Infof("deploying restore 
cluster[%s/%s]", from.Namespace, from.ClusterName) + glog.Infof("deploying restore, the data is from cluster[%s/%s] to cluster[%s/%s]", + from.Namespace, from.ClusterName, to.Namespace, to.ClusterName) sets := map[string]string{ "name": to.BackupName, @@ -1914,7 +1942,7 @@ func (oa *operatorActions) Restore(from *TidbClusterConfig, to *TidbClusterConfi setString := to.BackupHelmSetString(sets) - restoreName := fmt.Sprintf("%s-restore", from.ClusterName) + restoreName := fmt.Sprintf("%s-restore", to.ClusterName) cmd := fmt.Sprintf("helm install -n %s --namespace %s %s --set-string %s", restoreName, to.Namespace, oa.backupChartPath(to.OperatorTag), setString) glog.Infof("install restore [%s]", cmd) @@ -2287,47 +2315,56 @@ func (tc *TidbClusterConfig) FullName() string { } func (oa *operatorActions) DeployIncrementalBackup(from *TidbClusterConfig, to *TidbClusterConfig, withDrainer bool, ts string) error { - oa.EmitEvent(from, fmt.Sprintf("DeployIncrementalBackup: slave: %s", to.ClusterName)) - glog.Infof("begin to deploy incremental backup cluster[%s] namespace[%s]", from.ClusterName, from.Namespace) + + if withDrainer && to == nil { + return fmt.Errorf("Target cluster is nil when deploying drainer") + } + if withDrainer { + oa.EmitEvent(from, fmt.Sprintf("DeployIncrementalBackup: slave: %s", to.ClusterName)) + glog.Infof("begin to deploy incremental backup, source cluster[%s/%s], target cluster [%s/%s]", + from.Namespace, from.ClusterName, to.Namespace, to.ClusterName) + } else { + oa.EmitEvent(from, "Enable pump cluster") + glog.Infof("begin to enable pump for cluster[%s/%s]", + from.Namespace, from.ClusterName) + } // v1.0.0 don't support `binlog.drainer.config` // https://github.com/pingcap/tidb-operator/pull/693 isv1 := from.OperatorTag == "v1.0.0" - var sets map[string]string - if isv1 { - sets = map[string]string{ - "binlog.pump.create": "true", - "binlog.drainer.destDBType": "mysql", - "binlog.drainer.mysql.host": fmt.Sprintf("%s-tidb.%s", to.ClusterName, to.Namespace), - "binlog.drainer.mysql.user": "root", - "binlog.drainer.mysql.password": to.Password, - "binlog.drainer.mysql.port": "4000", - "binlog.drainer.ignoreSchemas": "", - } - } else { - sets = map[string]string{ - "binlog.pump.create": "true", - } - from.drainerConfig = []string{ - "worker-count = 16", - "detect-interval = 10", - "disable-dispatch = false", - `ignore-schemas = ""`, - `safe-mode = false`, - `txn-batch = 20`, - `db-type = "mysql"`, - `[syncer.to]`, - fmt.Sprintf(`host = "%s-tidb.%s"`, to.ClusterName, to.Namespace), - fmt.Sprintf(`user = "%s"`, "root"), - fmt.Sprintf(`password = "%s"`, to.Password), - fmt.Sprintf(`port = %d`, 4000), - } + sets := map[string]string{ + "binlog.pump.create": "true", } if withDrainer { sets["binlog.drainer.create"] = "true" + if isv1 { + sets["binlog.pump.create"] = "true" + sets["binlog.drainer.destDBType"] = "mysql" + sets["binlog.drainer.mysql.host"] = fmt.Sprintf("%s-tidb.%s", to.ClusterName, to.Namespace) + sets["binlog.drainer.mysql.user"] = "root" + sets["binlog.drainer.mysql.password"] = to.Password + sets["binlog.drainer.mysql.port"] = "4000" + sets["binlog.drainer.ignoreSchemas"] = "" + } else { + from.drainerConfig = []string{ + "worker-count = 16", + "detect-interval = 10", + "disable-dispatch = false", + `ignore-schemas = ""`, + `safe-mode = false`, + `txn-batch = 20`, + `db-type = "mysql"`, + `[syncer.to]`, + fmt.Sprintf(`host = "%s-tidb.%s"`, to.ClusterName, to.Namespace), + fmt.Sprintf(`user = "%s"`, "root"), + fmt.Sprintf(`password = "%s"`, to.Password), + 
fmt.Sprintf(`port = %d`, 4000), + } + } } + if ts != "" { sets["binlog.drainer.initialCommitTs"] = ts } diff --git a/tests/backup.go b/tests/backup.go index c4bb07e169..41ef9df5d8 100644 --- a/tests/backup.go +++ b/tests/backup.go @@ -1,73 +1,221 @@ package tests import ( + "bytes" + "fmt" + "os/exec" + "strings" + "text/template" "time" "github.com/golang/glog" + "github.com/pingcap/tidb-operator/pkg/tkctl/util" + sql_util "github.com/pingcap/tidb-operator/tests/pkg/util" "github.com/pingcap/tidb-operator/tests/slack" + "golang.org/x/sync/errgroup" + "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" ) -func (oa *operatorActions) BackupRestore(from, to *TidbClusterConfig) error { - var ts string - err := oa.DeployIncrementalBackup(from, to, false, ts) +const ( + DrainerReplicas int32 = 1 + // TODO: better way to do incremental restore from pb files + RunReparoCommandTemplate = `kubectl exec -n={{ .Namespace }} {{ .PodName }} -- sh -c \ +"while [ \$(grep -r 'commitTS' /data.drainer/savepoint| awk '{print (\$3)}') -lt {{ .StopTSO }} ]; do echo 'wait end tso reached' && sleep 60; done; \ +printf '{{ .ReparoConfig }}' > reparo.toml && \ +./reparo -config reparo.toml > /data/reparo.log" ` +) + +type BackupTarget struct { + IncrementalType DbType + TargetCluster *TidbClusterConfig + IsAdditional bool +} + +func (t *BackupTarget) GetDrainerConfig(source *TidbClusterConfig, ts string) *DrainerConfig { + drainerConfig := &DrainerConfig{ + DrainerName: fmt.Sprintf("%s-%s-drainer", source.ClusterName, t.IncrementalType), + InitialCommitTs: ts, + OperatorTag: source.OperatorTag, + SourceClusterName: source.ClusterName, + Namespace: source.Namespace, + DbType: t.IncrementalType, + } + if t.IncrementalType == DbTypeMySQL || t.IncrementalType == DbTypeTiDB { + drainerConfig.Host = fmt.Sprintf("%s.%s.svc.cluster.local", + t.TargetCluster.ClusterName, t.TargetCluster.Namespace) + drainerConfig.Port = "4000" + } + return drainerConfig +} + +func (oa *operatorActions) BackupAndRestoreToMultipleClusters(source *TidbClusterConfig, targets []BackupTarget) error { + err := oa.DeployAndCheckPump(source) if err != nil { return err } - err = oa.CheckIncrementalBackup(from, false) + err = oa.DeployAdHocBackup(source) if err != nil { + glog.Errorf("cluster:[%s] deploy happen error: %v", source.ClusterName, err) return err } - err = oa.DeployAdHocBackup(from) + ts, err := oa.CheckAdHocBackup(source) if err != nil { - glog.Errorf("cluster:[%s] deploy happen error: %v", from.ClusterName, err) + glog.Errorf("cluster:[%s] deploy happen error: %v", source.ClusterName, err) return err } - ts, err = oa.CheckAdHocBackup(from) - if err != nil { - glog.Errorf("cluster:[%s] deploy happen error: %v", from.ClusterName, err) + // Restoring via reparo is slow, so we stop inserting data as early as possible to reduce the size of incremental data + oa.StopInsertDataTo(source) + + prepareIncremental := func(source *TidbClusterConfig, target BackupTarget) error { + err = oa.CheckTidbClusterStatus(target.TargetCluster) + if err != nil { + glog.Errorf("cluster:[%s] deploy faild error: %v", target.TargetCluster.ClusterName, err) + return err + } + + err = oa.Restore(source, target.TargetCluster) + if err != nil { + glog.Errorf("from cluster:[%s] to cluster [%s] restore happen error: %v", + source.ClusterName, target.TargetCluster.ClusterName, err) + return err + } + err = oa.CheckRestore(source, target.TargetCluster) + if err != nil { + glog.Errorf("from cluster:[%s] to cluster [%s] restore failed error: %v", + 
source.ClusterName, target.TargetCluster.ClusterName, err) + return err + } + + if target.IsAdditional { + // Deploy an additional drainer release + drainerConfig := target.GetDrainerConfig(source, ts) + if err := oa.DeployDrainer(drainerConfig, source); err != nil { + return err + } + if err := oa.CheckDrainer(drainerConfig, source); err != nil { + return err + } + } else { + // Enable drainer of the source TiDB cluster release + if err := oa.DeployAndCheckIncrementalBackup(source, target.TargetCluster, ts); err != nil { + return err + } + } + return nil + } + + checkIncremental := func(source *TidbClusterConfig, target BackupTarget, stopWriteTS int64) error { + if target.IncrementalType == DbTypeFile { + var eg errgroup.Group + // Run reparo restoring and check concurrently to show restoring progress + eg.Go(func() error { + return oa.RestoreIncrementalFiles(target.GetDrainerConfig(source, ts), target.TargetCluster, stopWriteTS) + }) + eg.Go(func() error { + return oa.CheckDataConsistency(source, target.TargetCluster, 60*time.Minute) + }) + if err := eg.Wait(); err != nil { + return err + } + } else { + if err := oa.CheckDataConsistency(source, target.TargetCluster, 30*time.Minute); err != nil { + return err + } + } + + return nil + } + + var eg errgroup.Group + for _, target := range targets { + target := target + eg.Go(func() error { + return prepareIncremental(source, target) + }) + } + if err := eg.Wait(); err != nil { return err } - err = oa.CheckTidbClusterStatus(to) + go oa.BeginInsertDataToOrDie(source) if err != nil { - glog.Errorf("cluster:[%s] deploy faild error: %v", to.ClusterName, err) return err } + glog.Infof("waiting 1 minute to insert into more records") + time.Sleep(1 * time.Minute) + + glog.Infof("cluster[%s] stop insert data", source.ClusterName) + oa.StopInsertDataTo(source) - err = oa.Restore(from, to) + stopWriteTS, err := sql_util.ShowMasterCommitTS(getDSN(source.Namespace, source.ClusterName, "test", source.Password)) if err != nil { - glog.Errorf("from cluster:[%s] to cluster [%s] restore happen error: %v", - from.ClusterName, to.ClusterName, err) return err } - err = oa.CheckRestore(from, to) - if err != nil { - glog.Errorf("from cluster:[%s] to cluster [%s] restore failed error: %v", - from.ClusterName, to.ClusterName, err) + for _, target := range targets { + target := target + eg.Go(func() error { + return checkIncremental(source, target, stopWriteTS) + }) + } + if err := eg.Wait(); err != nil { return err } - err = oa.DeployIncrementalBackup(from, to, true, ts) + go oa.BeginInsertDataToOrDie(source) + err = oa.DeployScheduledBackup(source) if err != nil { + glog.Errorf("cluster:[%s] scheduler happen error: %v", source.ClusterName, err) return err } - err = oa.CheckIncrementalBackup(from, true) - if err != nil { + return oa.CheckScheduledBackup(source) +} + +func (oa *operatorActions) BackupAndRestoreToMultipleClustersOrDie(source *TidbClusterConfig, targets []BackupTarget) { + if err := oa.BackupAndRestoreToMultipleClusters(source, targets); err != nil { + slack.NotifyAndPanic(err) + } +} + +func (oa *operatorActions) BackupRestore(from, to *TidbClusterConfig) error { + + return oa.BackupAndRestoreToMultipleClusters(from, []BackupTarget{ + { + TargetCluster: to, + IncrementalType: DbTypeTiDB, + IsAdditional: false, + }, + }) +} + +func (oa *operatorActions) BackupRestoreOrDie(from, to *TidbClusterConfig) { + if err := oa.BackupRestore(from, to); err != nil { + slack.NotifyAndPanic(err) + } +} + +func (oa *operatorActions) DeployAndCheckPump(tc 
*TidbClusterConfig) error { + if err := oa.DeployIncrementalBackup(tc, nil, false, ""); err != nil { return err } - glog.Infof("waiting 1 minutes to insert into more records") - time.Sleep(1 * time.Minute) + return oa.CheckIncrementalBackup(tc, false) +} + +func (oa *operatorActions) DeployAndCheckIncrementalBackup(from, to *TidbClusterConfig, ts string) error { + if err := oa.DeployIncrementalBackup(from, to, true, ts); err != nil { + return err + } - glog.Infof("cluster[%s] stop insert data", from.ClusterName) - oa.StopInsertDataTo(from) + return oa.CheckIncrementalBackup(from, true) +} +func (oa *operatorActions) CheckDataConsistency(from, to *TidbClusterConfig, timeout time.Duration) error { fn := func() (bool, error) { b, err := to.DataIsTheSameAs(from) if err != nil { @@ -80,22 +228,116 @@ func (oa *operatorActions) BackupRestore(from, to *TidbClusterConfig) error { return false, nil } - if err := wait.Poll(DefaultPollInterval, 30*time.Minute, fn); err != nil { - return err - } + return wait.Poll(DefaultPollInterval, timeout, fn) +} + +func (oa *operatorActions) DeployDrainer(info *DrainerConfig, source *TidbClusterConfig) error { + oa.EmitEvent(source, "DeployDrainer") + glog.Infof("begin to deploy drainer [%s] namespace[%s], source cluster [%s]", info.DrainerName, + source.Namespace, source.ClusterName) - go oa.BeginInsertDataToOrDie(from) - err = oa.DeployScheduledBackup(from) + valuesPath, err := info.BuildSubValues(oa.drainerChartPath(source.OperatorTag)) if err != nil { - glog.Errorf("cluster:[%s] scheduler happen error: %v", from.ClusterName, err) return err } - return oa.CheckScheduledBackup(from) + override := map[string]string{} + if len(oa.cfg.AdditionalDrainerVersion) > 0 { + override["clusterVersion"] = oa.cfg.AdditionalDrainerVersion + } + + cmd := fmt.Sprintf("helm install %s --name %s --namespace %s --set-string %s -f %s", + oa.drainerChartPath(source.OperatorTag), info.DrainerName, source.Namespace, info.DrainerHelmString(override, source), valuesPath) + glog.Info(cmd) + + if res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput(); err != nil { + return fmt.Errorf("failed to deploy drainer [%s/%s], %v, %s", + source.Namespace, info.DrainerName, err, string(res)) + } + + return nil } -func (oa *operatorActions) BackupRestoreOrDie(from, to *TidbClusterConfig) { - if err := oa.BackupRestore(from, to); err != nil { +func (oa *operatorActions) DeployDrainerOrDie(info *DrainerConfig, source *TidbClusterConfig) { + if err := oa.DeployDrainer(info, source); err != nil { slack.NotifyAndPanic(err) } } + +func (oa *operatorActions) CheckDrainer(info *DrainerConfig, source *TidbClusterConfig) error { + glog.Infof("checking drainer [%s/%s]", info.DrainerName, source.Namespace) + + ns := source.Namespace + stsName := fmt.Sprintf("%s-%s-drainer", source.ClusterName, info.DrainerName) + fn := func() (bool, error) { + sts, err := oa.kubeCli.AppsV1().StatefulSets(source.Namespace).Get(stsName, v1.GetOptions{}) + if err != nil { + glog.Errorf("failed to get drainer StatefulSet %s ,%v", sts, err) + return false, nil + } + if *sts.Spec.Replicas != DrainerReplicas { + glog.Infof("StatefulSet: %s/%s .spec.Replicas(%d) != %d", + ns, sts.Name, *sts.Spec.Replicas, DrainerReplicas) + return false, nil + } + if sts.Status.ReadyReplicas != DrainerReplicas { + glog.Infof("StatefulSet: %s/%s .state.ReadyReplicas(%d) != %d", + ns, sts.Name, sts.Status.ReadyReplicas, DrainerReplicas) + } + return true, nil + } + + err := wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn) + if err != nil 
{ + return fmt.Errorf("failed to install drainer [%s/%s], %v", source.Namespace, info.DrainerName, err) + } + + return nil +} + +func (oa *operatorActions) RestoreIncrementalFiles(from *DrainerConfig, to *TidbClusterConfig, stopTSO int64) error { + glog.Infof("restoring incremental data from drainer [%s/%s] to TiDB cluster [%s/%s]", + from.Namespace, from.DrainerName, to.Namespace, to.ClusterName) + + // TODO: better incremental files restore solution + reparoConfig := strings.Join([]string{ + `data-dir = \"/data/pb\"`, + `log-level = \"info\"`, + `dest-type = \"mysql\"`, + `safe-mode = true`, + fmt.Sprintf(`stop-tso = %d`, stopTSO), + `[dest-db]`, + fmt.Sprintf(`host = \"%s\"`, util.GetTidbServiceName(to.ClusterName)), + "port = 4000", + `user = \"root\"`, + fmt.Sprintf(`password = \"%s\"`, to.Password), + }, "\n") + + temp, err := template.New("reparo-command").Parse(RunReparoCommandTemplate) + if err != nil { + return err + } + buff := new(bytes.Buffer) + if err := temp.Execute(buff, &struct { + Namespace string + ReparoConfig string + PodName string + StopTSO int64 + }{ + Namespace: from.Namespace, + ReparoConfig: reparoConfig, + PodName: fmt.Sprintf("%s-%s-drainer-0", from.SourceClusterName, from.DrainerName), + StopTSO: stopTSO, + }); err != nil { + return err + } + + cmd := buff.String() + glog.Infof("Restore incremental data, command: \n%s", cmd) + + if res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput(); err != nil { + return fmt.Errorf("failed to restore incremental files from dainer [%s/%s] to TiDB cluster [%s/%s], %v, %s", + from.Namespace, from.DrainerName, to.Namespace, to.ClusterName, err, res) + } + return nil +} diff --git a/tests/cluster_info.go b/tests/cluster_info.go index e2acced3f4..5075f5cfc2 100644 --- a/tests/cluster_info.go +++ b/tests/cluster_info.go @@ -60,6 +60,7 @@ func (tc *TidbClusterConfig) UpgradeTiDB(image string) *TidbClusterConfig { } func (tc *TidbClusterConfig) UpgradeAll(tag string) *TidbClusterConfig { + tc.ClusterVersion = tag return tc. UpgradePD("pingcap/pd:" + tag). UpgradeTiKV("pingcap/tikv:" + tag). 
diff --git a/tests/cmd/e2e/main.go b/tests/cmd/e2e/main.go index 06c8c32af7..e068dda70c 100644 --- a/tests/cmd/e2e/main.go +++ b/tests/cmd/e2e/main.go @@ -239,5 +239,6 @@ func newTidbClusterConfig(ns, clusterName, password string) *tests.TidbClusterCo }, TopologyKey: topologyKey, EnableConfigMapRollout: true, + ClusterVersion: tidbVersion, } } diff --git a/tests/cmd/stability/main.go b/tests/cmd/stability/main.go index 7cca68beee..de5b6733db 100644 --- a/tests/cmd/stability/main.go +++ b/tests/cmd/stability/main.go @@ -75,8 +75,10 @@ func run() { cluster2 := newTidbClusterConfig("ns2", "cluster2") cluster3 := newTidbClusterConfig("ns2", "cluster3") - restoreCluster1 := newTidbClusterConfig("ns1", "restore1") - restoreCluster2 := newTidbClusterConfig("ns2", "restore2") + directRestoreCluster1 := newTidbClusterConfig("ns1", "restore1") + fileRestoreCluster1 := newTidbClusterConfig("ns1", "file-restore1") + directRestoreCluster2 := newTidbClusterConfig("ns2", "restore2") + fileRestoreCluster2 := newTidbClusterConfig("ns2", "file-restore2") onePDCluster1 := newTidbClusterConfig("ns1", "one-pd-cluster-1") onePDCluster2 := newTidbClusterConfig("ns2", "one-pd-cluster-2") @@ -87,8 +89,10 @@ func run() { cluster1, cluster2, cluster3, - restoreCluster1, - restoreCluster2, + directRestoreCluster1, + fileRestoreCluster1, + directRestoreCluster2, + fileRestoreCluster2, onePDCluster1, onePDCluster2, } @@ -118,7 +122,7 @@ func run() { oa.CleanTidbClusterOrDie(cluster) } - caseFn := func(clusters []*tests.TidbClusterConfig, onePDClsuter *tests.TidbClusterConfig, restoreCluster *tests.TidbClusterConfig, upgradeVersion string) { + caseFn := func(clusters []*tests.TidbClusterConfig, onePDClsuter *tests.TidbClusterConfig, backupTargets []tests.BackupTarget, upgradeVersion string) { // check env fta.CheckAndRecoverEnvOrDie() oa.CheckK8sAvailableOrDie(nil, nil) @@ -206,10 +210,12 @@ func run() { } // backup and restore - oa.DeployTidbClusterOrDie(restoreCluster) - addDeployedClusterFn(restoreCluster) - oa.CheckTidbClusterStatusOrDie(restoreCluster) - oa.BackupRestoreOrDie(clusters[0], restoreCluster) + for i := range backupTargets { + oa.DeployTidbClusterOrDie(backupTargets[i].TargetCluster) + addDeployedClusterFn(backupTargets[i].TargetCluster) + oa.CheckTidbClusterStatusOrDie(backupTargets[i].TargetCluster) + } + oa.BackupAndRestoreToMultipleClustersOrDie(clusters[0], backupTargets) // delete operator oa.CleanOperatorOrDie(ocfg) @@ -289,7 +295,19 @@ func run() { cluster1, cluster2, } - caseFn(preUpgrade, onePDCluster1, restoreCluster1, upgradeVersions[0]) + backupTargets := []tests.BackupTarget{ + { + TargetCluster: directRestoreCluster1, + IsAdditional: false, + IncrementalType: tests.DbTypeTiDB, + }, + { + TargetCluster: fileRestoreCluster1, + IsAdditional: true, + IncrementalType: tests.DbTypeFile, + }, + } + caseFn(preUpgrade, onePDCluster1, backupTargets, upgradeVersions[0]) // after operator upgrade if cfg.UpgradeOperatorImage != "" && cfg.UpgradeOperatorTag != "" { @@ -306,8 +324,20 @@ func run() { if len(upgradeVersions) == 2 { v = upgradeVersions[1] } + postUpgradeBackupTargets := []tests.BackupTarget{ + { + TargetCluster: directRestoreCluster2, + IsAdditional: false, + IncrementalType: tests.DbTypeTiDB, + }, + { + TargetCluster: fileRestoreCluster2, + IsAdditional: true, + IncrementalType: tests.DbTypeFile, + }, + } // caseFn(postUpgrade, restoreCluster2, tidbUpgradeVersion) - caseFn(postUpgrade, onePDCluster2, restoreCluster2, v) + caseFn(postUpgrade, onePDCluster2, postUpgradeBackupTargets, v) } 
for _, cluster := range allClusters { @@ -378,5 +408,6 @@ func newTidbClusterConfig(ns, clusterName string) *tests.TidbClusterConfig { Monitor: true, BlockWriteConfig: cfg.BlockWriter, TopologyKey: topologyKey, + ClusterVersion: tidbVersion, } } diff --git a/tests/config.go b/tests/config.go index 284834ef05..4ca340331d 100644 --- a/tests/config.go +++ b/tests/config.go @@ -43,6 +43,9 @@ type Config struct { TiKVGrpcConcurrency int `yaml:"tikv_grpc_concurrency" json:"tikv_grpc_concurrency"` TiDBTokenLimit int `yaml:"tidb_token_limit" json:"tidb_token_limit"` + // old versions of reparo does not support idempotent incremental recover, so we lock the version explicitly + AdditionalDrainerVersion string `yaml:"file_drainer_version" json:"file_drainer_version"` + // Block writer BlockWriter blockwriter.Config `yaml:"block_writer,omitempty"` @@ -64,6 +67,8 @@ type Nodes struct { // NewConfig creates a new config. func NewConfig() (*Config, error) { cfg := &Config{ + AdditionalDrainerVersion: "v3.0.2", + PDMaxReplicas: 5, TiDBTokenLimit: 1024, TiKVGrpcConcurrency: 8, @@ -78,7 +83,7 @@ func NewConfig() (*Config, error) { flag.StringVar(&cfg.configFile, "config", "", "Config file") flag.StringVar(&cfg.LogDir, "log-dir", "/logDir", "log directory") flag.IntVar(&cfg.FaultTriggerPort, "fault-trigger-port", 23332, "the http port of fault trigger service") - flag.StringVar(&cfg.TidbVersions, "tidb-versions", "v3.0.0-rc.1,v3.0.0-rc.2,v3.0.1", "tidb versions") + flag.StringVar(&cfg.TidbVersions, "tidb-versions", "v3.0.0,v3.0.1,v3.0.2", "tidb versions") flag.StringVar(&cfg.OperatorTag, "operator-tag", "master", "operator tag used to choose charts") flag.StringVar(&cfg.OperatorImage, "operator-image", "pingcap/tidb-operator:latest", "operator image") flag.StringVar(&cfg.UpgradeOperatorTag, "upgrade-operator-tag", "", "upgrade operator tag used to choose charts") diff --git a/tests/drainer_info.go b/tests/drainer_info.go new file mode 100644 index 0000000000..c71d5d0443 --- /dev/null +++ b/tests/drainer_info.go @@ -0,0 +1,77 @@ +// Copyright 2019. PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package tests + +import ( + "fmt" + "io/ioutil" + "strings" + + "github.com/golang/glog" +) + +type DbType string + +const ( + DbTypeMySQL DbType = "mysql" + DbTypeFile DbType = "file" + DbTypeTiDB DbType = "tidb" + // The following downstream types are not supported in e2e & stability tests yet + // DbTypeKafka DbType = "kafka" + // DbTypeFlash DbType = "flash" +) + +type DrainerConfig struct { + DrainerName string + InitialCommitTs string + OperatorTag string + SourceClusterName string + Namespace string + + DbType DbType + Host string + User string + Password string + // use string type in case of empty port (db-type=file) + Port string +} + +func (d *DrainerConfig) DrainerHelmString(m map[string]string, source *TidbClusterConfig) string { + + set := map[string]string{ + "clusterName": source.ClusterName, + "clusterVersion": source.ClusterVersion, + } + + for k, v := range m { + set[k] = v + } + + arr := make([]string, 0, len(set)) + for k, v := range set { + arr = append(arr, fmt.Sprintf("%s=%s", k, v)) + } + return strings.Join(arr, ",") +} + +func (d *DrainerConfig) BuildSubValues(dir string) (string, error) { + + values := GetDrainerSubValuesOrDie(d) + path := fmt.Sprintf("%s/%s.yaml", dir, d.DrainerName) + if err := ioutil.WriteFile(path, []byte(values), 0644); err != nil { + return "", err + } + glog.Infof("Values of drainer %s:\n %s", d.DrainerName, values) + return path, nil +} diff --git a/tests/pkg/util/db.go b/tests/pkg/util/db.go index a31ab6c201..a2652ad245 100644 --- a/tests/pkg/util/db.go +++ b/tests/pkg/util/db.go @@ -2,6 +2,8 @@ package util import ( "database/sql" + "fmt" + "strings" "github.com/golang/glog" ) @@ -17,3 +19,42 @@ func OpenDB(dsn string, maxIdleConns int) (*sql.DB, error) { glog.V(4).Info("DB opens successfully") return db, nil } + +// Show master commit ts of TiDB +func ShowMasterCommitTS(dsn string) (int64, error) { + db, err := sql.Open("mysql", dsn) + if err != nil { + return 0, err + } + defer db.Close() + + rows, err := db.Query("SHOW MASTER STATUS") + defer rows.Close() + if err != nil { + return 0, err + } + cols, err := rows.Columns() + if err != nil { + return 0, err + } + idx := -1 + vals := make([]interface{}, len(cols)) + for i := range cols { + if strings.ToLower(cols[i]) == "position" { + vals[i] = new(int64) + idx = i + } else { + vals[i] = new(sql.RawBytes) + } + } + if idx < 0 { + return 0, fmt.Errorf("Error show master commit ts of %s, cannot find 'Position' column", dsn) + } + if !rows.Next() { + return 0, fmt.Errorf("Error show master commit ts of %s, empty result set", dsn) + } + if err = rows.Scan(vals...); err != nil { + return 0, err + } + return *vals[idx].(*int64), nil +} diff --git a/tests/util.go b/tests/util.go index 91356e0329..86eafd6344 100644 --- a/tests/util.go +++ b/tests/util.go @@ -145,6 +145,34 @@ var binlogTemp string = `binlog: {{end}}{{end}} ` +var drainerConfigCommon string = ` +initialCommitTs: "{{ .InitialCommitTs }}" +config: | + detect-interval = 10 + compressor = "" + [syncer] + worker-count = 16 + disable-dispatch = false + ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql" + safe-mode = false + txn-batch = 20 +` + +var fileDrainerConfigTemp string = drainerConfigCommon + ` + db-type = "file" + [syncer.to] + dir = "/data/pb" +` + +var sqlDrainerConfigTemp string = drainerConfigCommon + ` + db-type = "{{ .DbType }}" + [syncer.to] + host = {{ .Host }} + user = {{ .User }} + password = {{ .Password }} + port = {{ .Port }} +` + type AffinityInfo struct { ClusterName string Kind string @@ -184,9 
+212,9 @@ func GetSubValuesOrDie(clusterName, namespace, topologyKey string, pdConfig []st } subValues := fmt.Sprintf("%s%s%s", pdbuff.String(), tikvbuff.String(), tidbbuff.String()) - if pumpConfig == nil && drainerConfig == nil { - return subValues - } + //if pumpConfig == nil && drainerConfig == nil { + // return subValues + //} btemp, err := template.New("binlog").Parse(binlogTemp) if err != nil { @@ -201,6 +229,36 @@ func GetSubValuesOrDie(clusterName, namespace, topologyKey string, pdConfig []st return subValues } +func GetDrainerSubValuesOrDie(info *DrainerConfig) string { + if info == nil { + slack.NotifyAndPanic(fmt.Errorf("Cannot get drainer sub values, the drainer config is nil")) + } + buff := new(bytes.Buffer) + switch info.DbType { + case DbTypeFile: + temp, err := template.New("file-drainer").Parse(fileDrainerConfigTemp) + if err != nil { + slack.NotifyAndPanic(err) + } + if err := temp.Execute(buff, &info); err != nil { + slack.NotifyAndPanic(err) + } + case DbTypeTiDB: + fallthrough + case DbTypeMySQL: + temp, err := template.New("sql-drainer").Parse(sqlDrainerConfigTemp) + if err != nil { + slack.NotifyAndPanic(err) + } + if err := temp.Execute(buff, &info); err != nil { + slack.NotifyAndPanic(err) + } + default: + slack.NotifyAndPanic(fmt.Errorf("db-type %s has not been suppored yet", info.DbType)) + } + return buff.String() +} + const ( PodPollInterval = 2 * time.Second // PodTimeout is how long to wait for the pod to be started or