Skip to content

Commit

Permalink
Check if the cluster satisfies the version requirements before starti…
Browse files Browse the repository at this point in the history
…ng (pingcap#61)

* vendor: insert `go-semver` dependency

* tidb, util: extract the HTTP JSON fetching code into its own function

* *: check for cluster versions before starting

* restore_test: added logs to detect spurious failure

Also, make the mock import operation only wait while we're still opening
engines to speed up the test.

* util: document GetJSON()

* restore: use a more robust version parsing strategy for TiDB

In semver, '2.0.4-1 < 2.0.4', so we need to properly skip the unrelated
portions of the version.

* restore, kv: detect if SwitchMode exists instead of checking the version

TiKV and PD's versions are not exposed before 2.1, so we can't rely on their
APIs to verify for 2.0.4+. Since the only reason we need 2.0.4+ is the
SwitchMode gRPC interface, we instead inspect if SwitchMode is
unimplemented and display a user-friendly message on stderr.
  • Loading branch information
kennytm authored Sep 7, 2018
1 parent ddbb5e3 commit 6714e28
Show file tree
Hide file tree
Showing 12 changed files with 732 additions and 39 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a // indirect
github.com/codahale/hdrhistogram v0.0.0-20160425231609-f8ad88b59a58 // indirect
github.com/coreos/etcd v3.2.18+incompatible // indirect
github.com/coreos/go-semver v0.2.0
github.com/cznic/golex v0.0.0-20160422121650-da5a7153a510 // indirect
github.com/cznic/mathutil v0.0.0-20160613104831-78ad7f262603
github.com/cznic/parser v0.0.0-20160622100904-31edd927e5b1
Expand Down
33 changes: 33 additions & 0 deletions lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package common

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"strings"
"time"
Expand Down Expand Up @@ -210,3 +213,33 @@ func isRetryableError(err error) bool {
// UniqueTable builds the qualified name of a table by wrapping the schema and
// the table name in backquotes and joining them with a dot. The inputs are
// inserted verbatim; no escaping of embedded backquotes is performed.
func UniqueTable(schema string, table string) string {
	const qualifiedNameFormat = "`%s`.`%s`"
	return fmt.Sprintf(qualifiedNameFormat, schema, table)
}

// GetJSON fetches a page and parses it as JSON. The parsed result will be
// stored into the `v`. The variable `v` must be a pointer to a type that can be
// unmarshalled from JSON.
//
// If the request itself fails, the underlying error is returned. If the server
// answers with any status other than 200 OK, the response body is read and
// returned as part of the error together with the status code that was
// actually received.
//
// Example:
//
//	client := &http.Client{}
//	var resp struct{ IP string }
//	if err := common.GetJSON(client, "http://api.ipify.org/?format=json", &resp); err != nil {
//		return errors.Trace(err)
//	}
//	fmt.Println(resp.IP)
func GetJSON(client *http.Client, url string, v interface{}) error {
	resp, err := client.Get(url)
	if err != nil {
		return errors.Trace(err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// The body of a failed request usually carries the server's own
		// explanation, so surface it to the caller instead of decoding it.
		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			return errors.Trace(err)
		}
		return errors.Errorf("get %s http status code %d != 200, message %s", url, resp.StatusCode, string(body))
	}

	return errors.Trace(json.NewDecoder(resp.Body).Decode(v))
}
8 changes: 5 additions & 3 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ func (c *Config) String() string {

type Lightning struct {
common.LogConfig
TableConcurrency int `toml:"table-concurrency" json:"table-concurrency"`
RegionConcurrency int `toml:"region-concurrency" json:"region-concurrency"`
ProfilePort int `toml:"pprof-port" json:"pprof-port"`
TableConcurrency int `toml:"table-concurrency" json:"table-concurrency"`
RegionConcurrency int `toml:"region-concurrency" json:"region-concurrency"`
ProfilePort int `toml:"pprof-port" json:"pprof-port"`
CheckRequirements bool `toml:"check-requirements" json:"check-requirements"`
}

// PostRestore has some options which will be executed after kv restored.
Expand All @@ -90,6 +91,7 @@ func NewConfig() *Config {
App: Lightning{
RegionConcurrency: runtime.NumCPU(),
TableConcurrency: 8,
CheckRequirements: true,
},
TiDB: DBStore{
SQLMode: "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION",
Expand Down
6 changes: 6 additions & 0 deletions lightning/kv/kv-deliver.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package kv

import (
"fmt"
"math"
"os"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -639,6 +642,9 @@ func (c *KVDeliverClient) Switch(mode sstpb.SwitchMode) error {
}
_, err := c.cli.SwitchMode(c.ctx, req)
if err != nil {
if strings.Contains(err.Error(), "status: Unimplemented") {
fmt.Fprintln(os.Stderr, "Error: The TiKV instance does not support mode switching. Please make sure the TiKV version is 2.0.4 or above.")
}
return errors.Trace(err)
}
common.AppLogger.Infof("switch to tikv %s mode takes %v", mode, time.Since(timer))
Expand Down
119 changes: 119 additions & 0 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"database/sql"
"fmt"
"io"
"net/http"
"os"
"strings"
"sync"
"time"

"github.com/coreos/go-semver/semver"
"github.com/juju/errors"
sstpb "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/tidb-lightning/lightning/common"
Expand Down Expand Up @@ -40,6 +43,12 @@ const (
closeEngineMaxRetry = 5
)

// Minimum component versions used by the start-up requirement checks. Each
// value is handed to checkVersion as the `expected` lower bound, i.e. the
// version reported by the component must compare greater than or equal to it.
var (
requiredTiDBVersion = *semver.New("2.0.4")
requiredPDVersion = *semver.New("2.0.4")
requiredTiKVVersion = *semver.New("2.0.4")
)

func init() {
cfg := tidbcfg.GetGlobalConfig()
cfg.Log.SlowThreshold = 3000
Expand Down Expand Up @@ -78,6 +87,7 @@ func (rc *RestoreController) Close() {
func (rc *RestoreController) Run(ctx context.Context) {
timer := time.Now()
opts := []func(context.Context) error{
rc.checkRequirements,
rc.switchToImportMode,
rc.restoreSchema,
rc.restoreTables,
Expand All @@ -93,6 +103,7 @@ func (rc *RestoreController) Run(ctx context.Context) {
}
if err != nil {
common.AppLogger.Errorf("run cause error : %s", errors.ErrorStack(err))
fmt.Fprintf(os.Stderr, "Error: %s\n", err)
break // ps : not continue
}
}
Expand Down Expand Up @@ -378,6 +389,114 @@ func (rc *RestoreController) switchTiKVMode(ctx context.Context, mode sstpb.Swit
return errors.Trace(cli.Switch(mode))
}

// checkRequirements verifies that the cluster components report versions
// that satisfy the minimum requirements before any restore work starts.
//
// The check is skipped entirely when `check-requirements` is disabled in the
// configuration. A failed TiDB check aborts with an error; failed PD and TiKV
// checks are only logged for now (see the TODO below).
func (rc *RestoreController) checkRequirements(_ context.Context) error {
	// skip requirement check if explicitly turned off
	if !rc.cfg.App.CheckRequirements {
		return nil
	}

	// Bound every status request: without a timeout, an endpoint that accepts
	// the connection but never answers would block startup forever.
	const requestTimeout = 30 * time.Second
	client := &http.Client{Timeout: requestTimeout}

	if err := rc.checkTiDBVersion(client); err != nil {
		return errors.Trace(err)
	}
	// TODO: Reenable the PD/TiKV version check after we upgrade the dependency to 2.1.
	if err := rc.checkPDVersion(client); err != nil {
		// return errors.Trace(err)
		common.AppLogger.Infof("PD version check failed: %v", err)
	}
	if err := rc.checkTiKVVersion(client); err != nil {
		// return errors.Trace(err)
		common.AppLogger.Infof("TiKV version check failed: %v", err)
	}

	return nil
}

// checkTiDBVersion fetches the `/status` page of the configured TiDB server,
// extracts the TiDB release version from the reported version string and
// verifies that it is not older than requiredTiDBVersion.
func (rc *RestoreController) checkTiDBVersion(client *http.Client) error {
	url := fmt.Sprintf("http://%s:%d/status", rc.cfg.TiDB.Host, rc.cfg.TiDB.StatusPort)
	var status struct{ Version string }
	if err := common.GetJSON(client, url, &status); err != nil {
		return errors.Trace(err)
	}

	version, err := extractTiDBVersion(status.Version)
	if err != nil {
		return errors.Trace(err)
	}

	return checkVersion("TiDB", requiredTiDBVersion, *version)
}

// extractTiDBVersion isolates the semantic version embedded in the version
// string reported by TiDB and parses it.
//
//	version format: "5.7.10-TiDB-v2.1.0-rc.1-7-g38c939f"
//	                             ^~~~~~~~~^ we only want this part
//	version format: "5.7.10-TiDB-v2.0.4-1-g06a0bf5"
//	                             ^~~~^
//	version format: "5.7.10-TiDB-v2.0.4"
//	                             ^~~~^
//
// The first two dash-separated fields (MySQL version and the "TiDB" marker)
// are always dropped. With five or more fields the last two are dropped as
// well, since they are the trailing "<n>-g<hash>" pair (presumably a
// git-describe suffix) and would otherwise be read as a pre-release tag,
// making e.g. '2.0.4-1' compare lower than '2.0.4'. With three or four fields
// there is no room for that pair, so everything after the marker is kept;
// this accepts strings without the suffix instead of rejecting them.
func extractTiDBVersion(version string) (*semver.Version, error) {
	parts := strings.Split(version, "-")
	end := len(parts)
	switch {
	case end < 3:
		return nil, errors.Errorf("not a valid TiDB version: %s", version)
	case end >= 5:
		end -= 2
	}

	rawVersion := strings.TrimPrefix(strings.Join(parts[2:end], "-"), "v")
	parsed, err := semver.NewVersion(rawVersion)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return parsed, nil
}

// checkPDVersion reads the cluster version published by PD at
// `/pd/api/v1/config/cluster-version` (a bare JSON string), parses it as a
// semantic version and compares it against requiredPDVersion.
func (rc *RestoreController) checkPDVersion(client *http.Client) error {
	var clusterVersion string
	endpoint := fmt.Sprintf("http://%s/pd/api/v1/config/cluster-version", rc.cfg.TiDB.PdAddr)
	if fetchErr := common.GetJSON(client, endpoint, &clusterVersion); fetchErr != nil {
		return errors.Trace(fetchErr)
	}

	parsed, parseErr := semver.NewVersion(clusterVersion)
	if parseErr != nil {
		return errors.Trace(parseErr)
	}

	return checkVersion("PD", requiredPDVersion, *parsed)
}

// checkTiKVVersion asks PD for its store list (`/pd/api/v1/stores`) and
// checks the version of every listed store against requiredTiKVVersion.
// The first store whose version cannot be parsed (error annotated with the
// store address) or is too old ends the check with an error.
func (rc *RestoreController) checkTiKVVersion(client *http.Client) error {
	// Only the address and version of each store are decoded from the reply.
	var reply struct {
		Stores []struct {
			Store struct {
				Address string
				Version string
			}
		}
	}

	endpoint := fmt.Sprintf("http://%s/pd/api/v1/stores", rc.cfg.TiDB.PdAddr)
	if err := common.GetJSON(client, endpoint, &reply); err != nil {
		return errors.Trace(err)
	}

	for i := range reply.Stores {
		info := reply.Stores[i].Store

		parsed, err := semver.NewVersion(info.Version)
		if err != nil {
			return errors.Annotate(err, info.Address)
		}

		label := fmt.Sprintf("TiKV (at %s)", info.Address)
		if err := checkVersion(label, requiredTiKVVersion, *parsed); err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

// checkVersion returns nil when `actual` is at least `expected`, and
// otherwise an error naming the component together with both versions.
func checkVersion(component string, expected, actual semver.Version) error {
	if actual.Compare(expected) < 0 {
		return errors.Errorf(
			"%s version too old, expected '>=%s', found '%s'",
			component,
			expected,
			actual,
		)
	}
	return nil
}

func (rc *RestoreController) getTables() []string {
var numOfTables int
for _, dbMeta := range rc.dbMetas {
Expand Down
35 changes: 29 additions & 6 deletions lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"time"

pb "github.com/pingcap/kvproto/pkg/import_kvpb"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/mydump"
"github.com/pingcap/tidb-lightning/lightning/restore"
"github.com/satori/go.uuid"

. "github.com/pingcap/check"
"golang.org/x/net/context"
Expand All @@ -25,7 +27,7 @@ import (

const (
tablesCount = 35
importDelay = 150 * time.Millisecond
importDelay = 500 * time.Millisecond
)

var _ = Suite(&testRestoreSuite{})
Expand Down Expand Up @@ -69,16 +71,27 @@ type mockKVService struct {
engineOverflowErrorFunc func() error
}

// formatUuid renders the raw UUID bytes in their textual form. It panics if
// the bytes cannot be converted into a UUID.
func formatUuid(uuidBytes []byte) string {
	parsed, err := uuid.FromBytes(uuidBytes)
	if err == nil {
		return parsed.String()
	}
	panic(err)
}

// SwitchMode is a no-op in the mock importer: the request is ignored and an
// empty response is returned without error.
func (s *mockKVService) SwitchMode(context.Context, *pb.SwitchModeRequest) (*pb.SwitchModeResponse, error) {
	emptyResp := &pb.SwitchModeResponse{}
	return emptyResp, nil
}
func (s *mockKVService) OpenEngine(_ context.Context, req *pb.OpenEngineRequest) (*pb.OpenEngineResponse, error) {
s.engineLock.Lock()
defer s.engineLock.Unlock()
s.engineList[string(req.Uuid)] += 0
uuid := formatUuid(req.Uuid)
s.engineList[uuid] += 0
if len(s.engineList) > s.engineOverflowLimit {
common.AppLogger.Errorf("[mock-importer] more than %d engines open: %v", s.engineOverflowLimit, s.engineList)
return nil, s.engineOverflowErrorFunc()
}
common.AppLogger.Infof("[mock-importer] opened engine %s; %v", uuid, s.engineList)
return &pb.OpenEngineResponse{}, nil
}
func (s *mockKVService) WriteEngine(wes pb.ImportKV_WriteEngineServer) error {
Expand All @@ -88,7 +101,8 @@ func (s *mockKVService) WriteEngine(wes pb.ImportKV_WriteEngineServer) error {
switch err {
case nil:
if head := req.GetHead(); head != nil {
engine = string(head.Uuid)
engine = formatUuid(head.Uuid)
common.AppLogger.Infof("[mock-importer] start write to engine %s", engine)
s.engineLock.Lock()
s.engineList[engine] += 1
s.engineLock.Unlock()
Expand All @@ -99,6 +113,7 @@ func (s *mockKVService) WriteEngine(wes pb.ImportKV_WriteEngineServer) error {
panic("Unexpected event type?")
}
case io.EOF:
common.AppLogger.Infof("[mock-importer] end write to engine %s", engine)
s.engineLock.Lock()
s.engineList[engine] -= 1
s.engineLock.Unlock()
Expand All @@ -111,7 +126,7 @@ func (s *mockKVService) WriteEngine(wes pb.ImportKV_WriteEngineServer) error {
func (s *mockKVService) CloseEngine(_ context.Context, req *pb.CloseEngineRequest) (*pb.CloseEngineResponse, error) {
s.engineLock.Lock()
defer s.engineLock.Unlock()
uuid := string(req.Uuid)
uuid := formatUuid(req.Uuid)
writerCount, exists := s.engineList[uuid]
if !exists {
return nil, fmt.Errorf("Engine %s not found", uuid)
Expand All @@ -120,10 +135,17 @@ func (s *mockKVService) CloseEngine(_ context.Context, req *pb.CloseEngineReques
return nil, fmt.Errorf("Engine %s still in use with %d writers left (EngineInUse)", uuid, writerCount)
}
delete(s.engineList, uuid)
common.AppLogger.Infof("[mock-importer] removed engine %s; %v", uuid, s.engineList)
return &pb.CloseEngineResponse{}, nil
}
func (s *mockKVService) ImportEngine(context.Context, *pb.ImportEngineRequest) (*pb.ImportEngineResponse, error) {
time.Sleep(importDelay)
s.engineLock.Lock()
defer s.engineLock.Unlock()
if len(s.engineList) > 0 {
// only simulate the slow import when multiple engines are open.
// this should speed up the test while still catching the original problem.
time.Sleep(importDelay)
}
s.events <- importEvent
return &pb.ImportEngineResponse{}, nil
}
Expand All @@ -137,7 +159,7 @@ func (s *mockKVService) CompactCluster(context.Context, *pb.CompactClusterReques
// Runs the mock tikv-importer gRPC service. Returns the server and its listening address.
func runMockKVServer(c *C, limit int, errorFunc func() error) (*grpc.Server, string, <-chan mockKVEvent) {
server := grpc.NewServer()
events := make(chan mockKVEvent, tablesCount*3)
events := make(chan mockKVEvent, tablesCount*4)
pb.RegisterImportKVServer(server, &mockKVService{
engineList: make(map[string]int),
events: events,
Expand Down Expand Up @@ -186,6 +208,7 @@ func createAppConfig(serverAddr string, concurrency int) *config.Config {
cfg := config.NewConfig()
cfg.TikvImporter.Addr = serverAddr
cfg.App.TableConcurrency = concurrency
cfg.App.CheckRequirements = false
// TODO Get rid of the TiDB test dependency!
cfg.TiDB.Host = "127.0.0.1"
cfg.TiDB.Port = 3306
Expand Down
Loading

0 comments on commit 6714e28

Please sign in to comment.