Skip to content

Commit

Permalink
Merge branch 'release-5.3' into release-5.3-4947cf1f596d
Browse files Browse the repository at this point in the history
  • Loading branch information
LittleFall authored Sep 19, 2022
2 parents 469a831 + e380216 commit 8eb8835
Show file tree
Hide file tree
Showing 311 changed files with 7,545 additions and 1,648 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/br_compatible_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ on:
push:
branches:
- master
- 'release-[0-9].[0-9]*'
paths:
- 'br/**'
- '!**.html'
Expand All @@ -17,7 +16,6 @@ on:
pull_request:
branches:
- master
- 'release-[0-9].[0-9]*'
paths:
- 'br/**'
- '!**.html'
Expand Down
2 changes: 0 additions & 2 deletions .github/workflows/compile_br.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ on:
push:
branches:
- master
- 'release-[0-9].[0-9]*'
paths:
- 'br/**'
- '!**.html'
Expand All @@ -16,7 +15,6 @@ on:
pull_request:
branches:
- master
- 'release-[0-9].[0-9]*'
paths:
- 'br/**'
- '!**.html'
Expand Down
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ coverage.txt
var
fix.sql
export-20*/
*-coverage.xml
*-junit-report.xml
# Files generated when testing
out
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
run:
timeout: 7m
timeout: 10m
linters:
disable-all: true
enable:
Expand Down
21 changes: 17 additions & 4 deletions br/cmd/tidb-lightning/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@ import (
"syscall"

"github.com/pingcap/tidb/br/pkg/lightning"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"go.uber.org/zap"
)

func main() {
globalCfg := config.Must(config.LoadGlobalConfig(os.Args[1:], nil))
fmt.Fprintf(os.Stdout, "Verbose debug logs will be written to %s\n\n", globalCfg.App.Config.File)
logToFile := globalCfg.App.File != "" && globalCfg.App.File != "-"
if logToFile {
fmt.Fprintf(os.Stdout, "Verbose debug logs will be written to %s\n\n", globalCfg.App.Config.File)
}

app := lightning.New(globalCfg)

Expand Down Expand Up @@ -86,16 +90,25 @@ func main() {
return app.RunOnce(context.Background(), cfg, nil)
}()

finished := true
if common.IsContextCanceledError(err) {
err = nil
finished = false
}
if err != nil {
logger.Error("tidb lightning encountered error stack info", zap.Error(err))
fmt.Fprintln(os.Stderr, "tidb lightning encountered error: ", err)
} else {
logger.Info("tidb lightning exit")
fmt.Fprintln(os.Stdout, "tidb lightning exit")
logger.Info("tidb lightning exit", zap.Bool("finished", finished))
exitMsg := "tidb lightning exit successfully"
if !finished {
exitMsg = "tidb lightning canceled"
}
fmt.Fprintln(os.Stdout, exitMsg)
}

// call Sync() with log to stdout may return error in some case, so just skip it
if globalCfg.App.File != "" {
if logToFile {
syncErr := logger.Sync()
if syncErr != nil {
fmt.Fprintln(os.Stderr, "sync log failed", syncErr)
Expand Down
45 changes: 34 additions & 11 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (bc *Client) BackupRange(
// TODO: test fine grained backup.
err = bc.fineGrainedBackup(
ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType, req.CompressionLevel,
req.RateLimit, req.Concurrency, results, progressCallBack)
req.RateLimit, req.Concurrency, req.IsRawKv, req.CipherInfo, results, progressCallBack)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -550,10 +550,12 @@ func (bc *Client) BackupRange(
return nil
}

func (bc *Client) findRegionLeader(ctx context.Context, key []byte) (*metapb.Peer, error) {
func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool) (*metapb.Peer, error) {
// Keys are saved in encoded format in TiKV, so the key must be encoded
// in order to find the correct region.
key = codec.EncodeBytes([]byte{}, key)
if !isRawKv {
key = codec.EncodeBytes([]byte{}, key)
}
for i := 0; i < 5; i++ {
// better backoff.
region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
Expand Down Expand Up @@ -584,6 +586,8 @@ func (bc *Client) fineGrainedBackup(
compressLevel int32,
rateLimit uint64,
concurrency uint32,
isRawKv bool,
cipherInfo *backuppb.CipherInfo,
rangeTree rtree.RangeTree,
progressCallBack func(ProgressUnit),
) error {
Expand Down Expand Up @@ -634,7 +638,7 @@ func (bc *Client) fineGrainedBackup(
for rg := range retry {
backoffMs, err :=
bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS,
compressType, compressLevel, rateLimit, concurrency, respCh)
compressType, compressLevel, rateLimit, concurrency, isRawKv, cipherInfo, respCh)
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -779,9 +783,11 @@ func (bc *Client) handleFineGrained(
compressionLevel int32,
rateLimit uint64,
concurrency uint32,
isRawKv bool,
cipherInfo *backuppb.CipherInfo,
respCh chan<- *backuppb.BackupResponse,
) (int, error) {
leader, pderr := bc.findRegionLeader(ctx, rg.StartKey)
leader, pderr := bc.findRegionLeader(ctx, rg.StartKey, isRawKv)
if pderr != nil {
return 0, errors.Trace(pderr)
}
Expand All @@ -796,8 +802,10 @@ func (bc *Client) handleFineGrained(
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
IsRawKv: isRawKv,
CompressionType: compressType,
CompressionLevel: compressionLevel,
CipherInfo: cipherInfo,
}
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)
Expand Down Expand Up @@ -902,9 +910,17 @@ backupLoop:
})
bcli, err := client.Backup(ctx, &req)
failpoint.Inject("reset-retryable-error", func(val failpoint.Value) {
if val.(bool) {
logutil.CL(ctx).Debug("failpoint reset-retryable-error injected.")
err = status.Error(codes.Unavailable, "Unavailable error")
switch val.(string) {
case "Unavaiable":
{
logutil.CL(ctx).Debug("failpoint reset-retryable-error unavailable injected.")
err = status.Error(codes.Unavailable, "Unavailable error")
}
case "Internal":
{
logutil.CL(ctx).Debug("failpoint reset-retryable-error internal injected.")
err = status.Error(codes.Internal, "Internal error")
}
}
})
failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) {
Expand Down Expand Up @@ -970,16 +986,23 @@ const (

// isRetryableError represents whether we should retry reset grpc connection.
func isRetryableError(err error) bool {

if status.Code(err) == codes.Unavailable {
return true
// some errors can be retried
// https://github.com/pingcap/tidb/issues/34350
switch status.Code(err) {
case codes.Unavailable, codes.DeadlineExceeded,
codes.ResourceExhausted, codes.Aborted, codes.Internal:
{
log.Warn("backup met some errors, these errors can be retry 5 times", zap.Error(err))
return true
}
}

// At least, there are two possible cancel() call,
// one from backup range, another from gRPC, here we retry when gRPC cancel with connection closing
if status.Code(err) == codes.Canceled {
if s, ok := status.FromError(err); ok {
if strings.Contains(s.Message(), gRPC_Cancel) {
log.Warn("backup met grpc cancel error, this errors can be retry 5 times", zap.Error(err))
return true
}
}
Expand Down
2 changes: 0 additions & 2 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ func (r *testBackup) TestSendCreds(c *C) {
c.Assert(err, IsNil)
opts := &storage.ExternalStorageOptions{
SendCredentials: true,
SkipCheckPath: true,
}
_, err = storage.New(r.ctx, backend, opts)
c.Assert(err, IsNil)
Expand All @@ -277,7 +276,6 @@ func (r *testBackup) TestSendCreds(c *C) {
c.Assert(err, IsNil)
opts = &storage.ExternalStorageOptions{
SendCredentials: false,
SkipCheckPath: true,
}
_, err = storage.New(r.ctx, backend, opts)
c.Assert(err, IsNil)
Expand Down
17 changes: 17 additions & 0 deletions br/pkg/backup/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
Expand Down Expand Up @@ -116,6 +117,7 @@ func (push *pushDown) pushBackup(
close(push.respCh)
}()

regionErrorIngestedOnce := false
for {
select {
case respAndStore, ok := <-push.respCh:
Expand All @@ -139,6 +141,21 @@ func (push *pushDown) pushBackup(
Msg: msg,
}
})
failpoint.Inject("tikv-region-error", func(val failpoint.Value) {
if !regionErrorIngestedOnce {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint tikv-regionh-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
// Msg: msg,
Detail: &backuppb.Error_RegionError{
RegionError: &errorpb.Error{
Message: msg,
},
},
}
}
regionErrorIngestedOnce = true
})
if resp.GetError() == nil {
// None error means range has been backuped successfully.
res.Put(
Expand Down
1 change: 1 addition & 0 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Session interface {
CreateDatabase(ctx context.Context, schema *model.DBInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error
Close()
GetGlobalVariable(name string) (string, error)
}

// Progress is an interface recording the current execution progress.
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ func (gs *tidbSession) Close() {
gs.se.Close()
}

// GetGlobalVariables implements glue.Session.
func (gs *tidbSession) GetGlobalVariable(name string) (string, error) {
return gs.se.GetSessionVars().GlobalVarsAccessor.GetTiDBTableValue(name)
}

// showCreateTable shows the result of SHOW CREATE TABLE from a TableInfo.
func (gs *tidbSession) showCreateTable(tbl *model.TableInfo) (string, error) {
table := tbl.Clone()
Expand Down
16 changes: 12 additions & 4 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,11 @@ const (
gRPCBackOffMaxDelay = 10 * time.Minute

// See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
regionMaxKeyCount = 1_440_000
// lower the max-key-count to avoid tikv trigger region auto split
regionMaxKeyCount = 1_280_000
defaultRegionSplitSize = 96 * units.MiB
// The max ranges count in a batch to split and scatter.
maxBatchSplitRanges = 4096

propRangeIndex = "tikv.range_index"

Expand Down Expand Up @@ -915,7 +918,7 @@ func NewLocalBackend(
if err != nil {
return backend.MakeBackend(nil), errors.Annotate(err, "construct pd client failed")
}
splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig())
splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig(), false)

shouldCreate := true
if cfg.Checkpoint.Enable {
Expand Down Expand Up @@ -1513,7 +1516,12 @@ func (local *local) WriteToTiKV(
size := int64(0)
totalCount := int64(0)
firstLoop := true
regionMaxSize := regionSplitSize * 4 / 3
// if region-split-size <= 96MiB, we bump the threshold a bit to avoid too many retry split
// because the range-properties is not 100% accurate
regionMaxSize := regionSplitSize
if regionSplitSize <= defaultRegionSplitSize {
regionMaxSize = regionSplitSize * 4 / 3
}

for iter.First(); iter.Valid(); iter.Next() {
size += int64(len(iter.Key()) + len(iter.Value()))
Expand Down Expand Up @@ -2060,7 +2068,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
needSplit := len(unfinishedRanges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys
// split region by given ranges
for i := 0; i < maxRetryTimes; i++ {
err = local.SplitAndScatterRegionByRanges(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize)
err = local.SplitAndScatterRegionInBatches(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize, maxBatchSplitRanges)
if err == nil || common.IsContextCanceledError(err) {
break
}
Expand Down
Loading

0 comments on commit 8eb8835

Please sign in to comment.