Skip to content

Commit

Permalink
Merge pull request pingcap#5 from lichunzhu/refactorCodeDir
Browse files Browse the repository at this point in the history
LGTM
  • Loading branch information
knull-cn authored Dec 30, 2021
2 parents 94c454c + 6a912e7 commit 03dae47
Show file tree
Hide file tree
Showing 84 changed files with 2,231 additions and 2,158 deletions.
32 changes: 18 additions & 14 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,17 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/tidb/br/pkg/conn"
connutil "github.com/pingcap/tidb/br/pkg/conn/util"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
Expand All @@ -30,22 +40,16 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/br/pkg/utils/utildb"
"github.com/pingcap/tidb/br/pkg/utils/utilpool"
"github.com/pingcap/tidb/distsql/request"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/ranger"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// ClientMgr manages connections needed by backup.
Expand Down Expand Up @@ -226,7 +230,7 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
ranges = ranger.FullIntRange(false)
}

kvRanges, err := distsql.TableHandleRangesToKVRanges(nil, []int64{tblID}, tbl.IsCommonHandle, ranges, nil)
kvRanges, err := request.TableHandleRangesToKVRanges(nil, []int64{tblID}, tbl.IsCommonHandle, ranges, nil)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -236,7 +240,7 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
continue
}
ranges = ranger.FullRange()
idxRanges, err := distsql.IndexRangesToKVRanges(nil, tblID, index.ID, ranges, nil)
idxRanges, err := request.IndexRangesToKVRanges(nil, tblID, index.ID, ranges, nil)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -443,7 +447,7 @@ func (bc *Client) BackupRanges(
}

// we collect all files in a single goroutine to avoid thread safety issues.
workerPool := utils.NewWorkerPool(concurrency, "Ranges")
workerPool := utilpool.NewWorkerPool(concurrency, "Ranges")
eg, ectx := errgroup.WithContext(ctx)
for id, r := range ranges {
id := id
Expand Down Expand Up @@ -484,7 +488,7 @@ func (bc *Client) BackupRange(
zap.Uint32("concurrency", req.Concurrency))

var allStores []*metapb.Store
allStores, err = conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), conn.SkipTiFlash)
allStores, err = conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -758,7 +762,7 @@ func OnBackupResponse(
return nil, 0, errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v on storeID: %d", resp.Error, storeID)
default:
// UNSAFE! TODO: use meaningful error code instead of unstructured message to find failed to write error.
if utils.MessageIsRetryableStorageError(resp.GetError().GetMsg()) {
if utildb.MessageIsRetryableStorageError(resp.GetError().GetMsg()) {
log.Warn("backup occur storage error", zap.String("error", resp.GetError().GetMsg()))
// back off 3000ms, for S3 is 99.99% available (i.e. the max outage time would less than 52.56mins per year),
// this time would be probably enough for s3 to resume.
Expand Down
5 changes: 4 additions & 1 deletion br/pkg/backup/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import (
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/metapb"

berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/utils/utildb"

"go.uber.org/zap"
)

Expand Down Expand Up @@ -159,7 +162,7 @@ func (push *pushDown) pushBackup(
logutil.CL(ctx).Error("backup occur cluster ID error", zap.Reflect("error", v))
return res, errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v", errPb)
default:
if utils.MessageIsRetryableStorageError(errPb.GetMsg()) {
if utildb.MessageIsRetryableStorageError(errPb.GetMsg()) {
logutil.CL(ctx).Warn("backup occur storage error", zap.String("error", errPb.GetMsg()))
continue
}
Expand Down
8 changes: 5 additions & 3 deletions br/pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@ import (
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/pingcap/tidb/br/pkg/checksum"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/utils/utilpool"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/statistics/handle"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -81,7 +83,7 @@ func (ss *Schemas) BackupSchemas(
ctx = opentracing.ContextWithSpan(ctx, span1)
}

workerPool := utils.NewWorkerPool(concurrency, "Schemas")
workerPool := utilpool.NewWorkerPool(concurrency, "Schemas")
errg, ectx := errgroup.WithContext(ctx)
startAll := time.Now()
op := metautil.AppendSchema
Expand Down
82 changes: 17 additions & 65 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,6 @@ import (
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pd "github.com/tikv/pd/client"
Expand All @@ -34,6 +26,17 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

"github.com/pingcap/tidb/br/pkg/conn/util"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/utils/utildb"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
)

const (
Expand Down Expand Up @@ -115,68 +118,17 @@ type Mgr struct {
ownsStorage bool
}

// StoreBehavior is the action to do in GetAllTiKVStores when a non-TiKV
// store (e.g. TiFlash store) is found.
type StoreBehavior uint8

const (
// ErrorOnTiFlash causes GetAllTiKVStores to return error when the store is
// found to be a TiFlash node.
ErrorOnTiFlash StoreBehavior = 0
// SkipTiFlash causes GetAllTiKVStores to skip the store when it is found to
// be a TiFlash node.
SkipTiFlash StoreBehavior = 1
// TiFlashOnly caused GetAllTiKVStores to skip the store which is not a
// TiFlash node.
TiFlashOnly StoreBehavior = 2
)

// GetAllTiKVStores returns all TiKV stores registered to the PD client. The
// stores must not be a tombstone and must never contain a label `engine=tiflash`.
func GetAllTiKVStores(
ctx context.Context,
pdClient pd.Client,
storeBehavior StoreBehavior,
) ([]*metapb.Store, error) {
// get all live stores.
stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return nil, errors.Trace(err)
}

// filter out all stores which are TiFlash.
j := 0
for _, store := range stores {
isTiFlash := false
if version.IsTiFlash(store) {
if storeBehavior == SkipTiFlash {
continue
} else if storeBehavior == ErrorOnTiFlash {
return nil, errors.Annotatef(berrors.ErrPDInvalidResponse,
"cannot restore to a cluster with active TiFlash stores (store %d at %s)", store.Id, store.Address)
}
isTiFlash = true
}
if !isTiFlash && storeBehavior == TiFlashOnly {
continue
}
stores[j] = store
j++
}
return stores[:j], nil
}

func GetAllTiKVStoresWithRetry(ctx context.Context,
pdClient pd.Client,
storeBehavior StoreBehavior,
storeBehavior util.StoreBehavior,
) ([]*metapb.Store, error) {
stores := make([]*metapb.Store, 0)
var err error

errRetry := utils.WithRetry(
errRetry := utildb.WithRetry(
ctx,
func() error {
stores, err = GetAllTiKVStores(ctx, pdClient, storeBehavior)
stores, err = util.GetAllTiKVStores(ctx, pdClient, storeBehavior)
failpoint.Inject("hint-GetAllTiKVStores-error", func(val failpoint.Value) {
if val.(bool) {
logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-error injected.")
Expand All @@ -201,9 +153,9 @@ func GetAllTiKVStoresWithRetry(ctx context.Context,

func checkStoresAlive(ctx context.Context,
pdclient pd.Client,
storeBehavior StoreBehavior) error {
storeBehavior util.StoreBehavior) error {
// Check live tikv.
stores, err := GetAllTiKVStores(ctx, pdclient, storeBehavior)
stores, err := util.GetAllTiKVStores(ctx, pdclient, storeBehavior)
if err != nil {
log.Error("fail to get store", zap.Error(err))
return errors.Trace(err)
Expand Down Expand Up @@ -231,7 +183,7 @@ func NewMgr(
tlsConf *tls.Config,
securityOption pd.SecurityOption,
keepalive keepalive.ClientParameters,
storeBehavior StoreBehavior,
storeBehavior util.StoreBehavior,
checkRequirements bool,
needDomain bool,
) (*Mgr, error) {
Expand Down
30 changes: 16 additions & 14 deletions br/pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/tidb/br/pkg/conn/util"
"github.com/pingcap/tidb/br/pkg/pdutil"
)

type fakePDClient struct {
Expand Down Expand Up @@ -60,7 +62,7 @@ func TestGetAllTiKVStoresWithRetryCancel(t *testing.T) {
stores: stores,
}

_, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash)
_, err := GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash)
require.Error(t, err)
require.Equal(t, codes.Canceled, status.Code(errors.Cause(err)))
}
Expand Down Expand Up @@ -100,7 +102,7 @@ func TestGetAllTiKVStoresWithUnknown(t *testing.T) {
stores: stores,
}

_, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash)
_, err := GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash)
require.Error(t, err)
require.Equal(t, codes.Unknown, status.Code(errors.Cause(err)))
}
Expand Down Expand Up @@ -155,12 +157,12 @@ func TestCheckStoresAlive(t *testing.T) {
stores: stores,
}

kvStores, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash)
kvStores, err := GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash)
require.NoError(t, err)
require.Len(t, kvStores, 2)
require.Equal(t, stores[2:], kvStores)

err = checkStoresAlive(ctx, fpdc, SkipTiFlash)
err = checkStoresAlive(ctx, fpdc, util.SkipTiFlash)
require.NoError(t, err)
}

Expand All @@ -169,38 +171,38 @@ func TestGetAllTiKVStores(t *testing.T) {

testCases := []struct {
stores []*metapb.Store
storeBehavior StoreBehavior
storeBehavior util.StoreBehavior
expectedStores map[uint64]int
expectedError string
}{
{
stores: []*metapb.Store{
{Id: 1},
},
storeBehavior: SkipTiFlash,
storeBehavior: util.SkipTiFlash,
expectedStores: map[uint64]int{1: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
},
storeBehavior: ErrorOnTiFlash,
storeBehavior: util.ErrorOnTiFlash,
expectedStores: map[uint64]int{1: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
},
storeBehavior: SkipTiFlash,
storeBehavior: util.SkipTiFlash,
expectedStores: map[uint64]int{1: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
},
storeBehavior: ErrorOnTiFlash,
storeBehavior: util.ErrorOnTiFlash,
expectedError: "cannot restore to a cluster with active TiFlash stores.*",
},
{
Expand All @@ -212,7 +214,7 @@ func TestGetAllTiKVStores(t *testing.T) {
{Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}},
{Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}},
},
storeBehavior: SkipTiFlash,
storeBehavior: util.SkipTiFlash,
expectedStores: map[uint64]int{1: 1, 3: 1, 4: 1, 6: 1},
},
{
Expand All @@ -224,7 +226,7 @@ func TestGetAllTiKVStores(t *testing.T) {
{Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}},
{Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}},
},
storeBehavior: ErrorOnTiFlash,
storeBehavior: util.ErrorOnTiFlash,
expectedError: "cannot restore to a cluster with active TiFlash stores.*",
},
{
Expand All @@ -236,14 +238,14 @@ func TestGetAllTiKVStores(t *testing.T) {
{Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}},
{Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}},
},
storeBehavior: TiFlashOnly,
storeBehavior: util.TiFlashOnly,
expectedStores: map[uint64]int{2: 1, 5: 1},
},
}

for _, testCase := range testCases {
pdClient := fakePDClient{stores: testCase.stores}
stores, err := GetAllTiKVStores(context.Background(), pdClient, testCase.storeBehavior)
stores, err := util.GetAllTiKVStores(context.Background(), pdClient, testCase.storeBehavior)
if len(testCase.expectedError) != 0 {
require.Error(t, err)
require.Regexp(t, testCase.expectedError, err.Error())
Expand Down
Loading

0 comments on commit 03dae47

Please sign in to comment.