Backup: split batch store backup requests (#54151)
ref #52534
3pointer authored Jun 25, 2024
1 parent 7211d3d commit 99abc28
Showing 4 changed files with 167 additions and 57 deletions.
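Overview: instead of sending one batched request per store, startBackup now splits the request's SubRanges into smaller requests via SplitBackupReqRanges and sends them to each store concurrently through a worker pool; the new Concurrency knob is threaded from MainBackupLoop through SendAsync (see br/pkg/backup/store.go below). A minimal standalone sketch of the splitting arithmetic, with illustrative values rather than code from the patch:

    package main

    import "fmt"

    func main() {
        rangeCount, count := 10, 3
        splitStep := rangeCount / count // floor division, as in SplitBackupReqRanges
        if splitStep == 0 {
            splitStep = 1 // count >= rangeCount: one sub-range per request
        }
        for i := 0; i < rangeCount; i += splitStep {
            end := min(i+splitStep, rangeCount) // built-in min, Go 1.21+
            fmt.Printf("request covers sub-ranges [%d, %d)\n", i, end)
        }
        // prints [0, 3), [3, 6), [6, 9), [9, 10): four requests, one more than count
    }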
2 changes: 1 addition & 1 deletion br/pkg/backup/BUILD.bazel
@@ -67,7 +67,7 @@ go_test(
embed = [":backup"],
flaky = True,
race = "on",
shard_count = 11,
shard_count = 12,
deps = [
"//br/pkg/conn",
"//br/pkg/gluetidb/mock",
12 changes: 9 additions & 3 deletions br/pkg/backup/client.go
@@ -75,6 +75,8 @@ type MainBackupLoop struct {
// backup requests for all stores.
// the subRanges may change every round.
BackupReq backuppb.BackupRequest
// the number of concurrent backup requests to send to each store.
Concurrency uint
// record the whole backup progress in an infinite loop.
GlobalProgressTree *rtree.ProgressRangeTree
ReplicaReadLabel map[string]string
@@ -91,6 +93,7 @@ func (s *MainBackupSender) SendAsync(
round uint64,
storeID uint64,
request backuppb.BackupRequest,
concurrency uint,
cli backuppb.BackupClient,
respCh chan *ResponseAndStore,
StateNotifier chan BackupRetryPolicy,
@@ -100,7 +103,7 @@
logutil.CL(ctx).Info("store backup goroutine exits", zap.Uint64("store", storeID))
close(respCh)
}()
err := startBackup(ctx, storeID, request, cli, respCh)
err := startBackup(ctx, storeID, request, cli, concurrency, respCh)
if err != nil {
// only 2 kinds of errors will occur here.
// 1. grpc connection error (already retried internally)
@@ -176,6 +179,8 @@ func (bc *Client) RunLoop(ctx context.Context, loop *MainBackupLoop) error {
mainLoop:
for {
round += 1
// sleep 200ms so that an abnormal TiKV cluster state does not trigger too many backup rounds.
time.Sleep(200 * time.Millisecond)
logutil.CL(ctx).Info("This round of backup starts...", zap.Uint64("round", round))
// initialize the error context every round
errContext := utils.NewErrorContext("MainBackupLoop", 10)
@@ -250,7 +255,7 @@
}
ch := make(chan *ResponseAndStore)
storeBackupResultChMap[storeID] = ch
loop.SendAsync(mainCtx, round, storeID, loop.BackupReq, cli, ch, loop.StateNotifier)
loop.SendAsync(mainCtx, round, storeID, loop.BackupReq, loop.Concurrency, cli, ch, loop.StateNotifier)
}
// infinite loop to collect region backup responses into the global channel
loop.CollectStoreBackupsAsync(handleCtx, round, storeBackupResultChMap, globalBackupResultCh)
@@ -309,7 +314,7 @@

storeBackupResultChMap[storeID] = ch
// start backup for this store
loop.SendAsync(mainCtx, round, storeID, loop.BackupReq, cli, ch, loop.StateNotifier)
loop.SendAsync(mainCtx, round, storeID, loop.BackupReq, loop.Concurrency, cli, ch, loop.StateNotifier)
// re-create context for new handler loop
handleCtx, handleCancel = context.WithCancel(mainCtx)
// handleCancel makes the former collect goroutine exit
@@ -1136,6 +1141,7 @@ func (bc *Client) BackupRanges(
mainBackupLoop := &MainBackupLoop{
BackupSender: &MainBackupSender{},
BackupReq: request,
Concurrency: concurrency,
GlobalProgressTree: &globalProgressTree,
ReplicaReadLabel: replicaReadLabel,
StateNotifier: stateNotifier,
65 changes: 64 additions & 1 deletion br/pkg/backup/client_test.go
@@ -75,6 +75,7 @@ func (m *mockBackupBackupSender) SendAsync(
round uint64,
storeID uint64,
request backuppb.BackupRequest,
concurrency uint,
cli backuppb.BackupClient,
respCh chan *backup.ResponseAndStore,
StateNotifier chan backup.BackupRetryPolicy,
@@ -552,7 +553,7 @@ func TestMainBackupLoop(t *testing.T) {
// cancel the backup in another goroutine
ctx, cancel := context.WithCancel(backgroundCtx)
go func() {
time.Sleep(500 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
cancel()
}()
connectedStore = make(map[uint64]int)
@@ -807,3 +808,65 @@ func TestObserveStoreChangesAsync(t *testing.T) {

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/backup-store-change-tick"))
}

func TestSplitBackupReqRanges(t *testing.T) {
req := backuppb.BackupRequest{
SubRanges: []*kvrpcpb.KeyRange{},
}

// case #1: empty ranges
res := backup.SplitBackupReqRanges(req, 1)
require.Len(t, res, 1)
// case #2: empty ranges and limit 0
res = backup.SplitBackupReqRanges(req, 0)
require.Len(t, res, 1)

genSubRanges := func(req *backuppb.BackupRequest, count int) {
for i := 0; i < count; i++ {
req.SubRanges = append(req.SubRanges, &kvrpcpb.KeyRange{
StartKey: []byte{byte(i)},
EndKey: []byte{byte(i + 1)},
})
}
}

genSubRanges(&req, 10)
// case #3: 10 subranges split into 10 parts
res = backup.SplitBackupReqRanges(req, 10)
require.Len(t, res, 10)
for i := 0; i < 10; i++ {
require.Equal(t, res[i].SubRanges[0].StartKey, req.SubRanges[i].StartKey)
require.Equal(t, res[i].SubRanges[0].EndKey, req.SubRanges[i].EndKey)
}

// case #3.1: 10 subranges split into 11 parts (no different from splitting into 10)
res = backup.SplitBackupReqRanges(req, 11)
require.Len(t, res, 10)
for i := 0; i < 10; i++ {
require.Equal(t, res[i].SubRanges[0].StartKey, req.SubRanges[i].StartKey)
require.Equal(t, res[i].SubRanges[0].EndKey, req.SubRanges[i].EndKey)
}

// case #3.2: 10 subranges split into 9 parts (splitStep floors to 1, so the result matches 10 parts)
res = backup.SplitBackupReqRanges(req, 9)
require.Len(t, res, 10)
for i := 0; i < 10; i++ {
require.Equal(t, res[i].SubRanges[0].StartKey, req.SubRanges[i].StartKey)
require.Equal(t, res[i].SubRanges[0].EndKey, req.SubRanges[i].EndKey)
}

// case #4: 10 subranges split into parts of 3 each; since 10 is not divisible by 3,
// this actually generates 4 parts: three with 3 subranges and one with the remainder.
res = backup.SplitBackupReqRanges(req, 3)
require.Len(t, res, 4)
for i := 0; i < 3; i++ {
require.Len(t, res[i].SubRanges, 3)
for j := 0; j < 3; j++ {
require.Equal(t, res[i].SubRanges[j].StartKey, req.SubRanges[i*3+j].StartKey)
require.Equal(t, res[i].SubRanges[j].EndKey, req.SubRanges[i*3+j].EndKey)
}
}
require.Len(t, res[3].SubRanges, 1)
require.Equal(t, res[3].SubRanges[0].StartKey, req.SubRanges[9].StartKey)
require.Equal(t, res[3].SubRanges[0].EndKey, req.SubRanges[9].EndKey)
}
145 changes: 93 additions & 52 deletions br/pkg/backup/store.go
@@ -19,8 +19,10 @@ import (
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/utils/storewatch"
tidbutil "github.com/pingcap/tidb/pkg/util"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -36,6 +38,7 @@ type BackupSender interface {
round uint64,
storeID uint64,
request backuppb.BackupRequest,
concurrency uint,
cli backuppb.BackupClient,
respCh chan *ResponseAndStore,
StateNotifier chan BackupRetryPolicy)
@@ -130,66 +133,79 @@ func startBackup(
storeID uint64,
backupReq backuppb.BackupRequest,
backupCli backuppb.BackupClient,
concurrency uint,
respCh chan *ResponseAndStore,
) error {
// this goroutine handles the responses from a single store
select {
case <-ctx.Done():
return ctx.Err()
default:
retry := -1
return utils.WithRetry(ctx, func() error {
retry += 1
logutil.CL(ctx).Info("try backup", zap.Uint64("storeID", storeID), zap.Int("retry time", retry))
// Send backup request to the store.
// handle the backup response or internal error here.
// handle the store error (reboot or network partition) outside.
return doSendBackup(ctx, backupCli, backupReq, func(resp *backuppb.BackupResponse) error {
// Forward all responses (including error).
failpoint.Inject("backup-timeout-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Info("failpoint backup-timeout-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("backup-storage-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("tikv-rw-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("tikv-region-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint tikv-region-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
// Msg: msg,
Detail: &backuppb.Error_RegionError{
RegionError: &errorpb.Error{
Message: msg,
},
},
}
})
select {
case <-ctx.Done():
return ctx.Err()
case respCh <- &ResponseAndStore{
Resp: resp,
StoreID: storeID,
}:
}
return nil
logutil.CL(ctx).Info("try backup", zap.Uint64("storeID", storeID))
// Send backup request to the store.
// handle the backup response or internal error here.
// handle the store error (reboot or network partition) outside.
reqs := SplitBackupReqRanges(backupReq, concurrency)
pool := tidbutil.NewWorkerPool(concurrency, "store_backup")
eg, ectx := errgroup.WithContext(ctx)
for i, req := range reqs {
bkReq := req
reqIndex := i
pool.ApplyOnErrorGroup(eg, func() error {
retry := -1
return utils.WithRetry(ectx, func() error {
retry += 1
logutil.CL(ectx).Info("backup to store", zap.Uint64("storeID", storeID),
zap.Int("retry", retry), zap.Int("reqIndex", reqIndex))
return doSendBackup(ectx, backupCli, bkReq, func(resp *backuppb.BackupResponse) error {
// Forward all responses (including error).
failpoint.Inject("backup-timeout-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ectx).Info("failpoint backup-timeout-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("backup-storage-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ectx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("tikv-rw-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ectx).Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("tikv-region-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ectx).Debug("failpoint tikv-region-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
// Msg: msg,
Detail: &backuppb.Error_RegionError{
RegionError: &errorpb.Error{
Message: msg,
},
},
}
})
select {
case <-ectx.Done():
return ectx.Err()
case respCh <- &ResponseAndStore{
Resp: resp,
StoreID: storeID,
}:
}
return nil
})
}, utils.NewBackupSSTBackoffer())
})
}, utils.NewBackupSSTBackoffer())
}
return eg.Wait()
}
}
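The fan-out above pairs tidbutil.NewWorkerPool with an errgroup, so the first failing sub-request cancels its siblings through ectx while WithRetry handles per-request backoff. A minimal sketch of the same shape, using errgroup's built-in limiter in place of the worker pool (hypothetical string payloads, not BR's API):

    package main

    import (
        "context"
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    func fanOut(ctx context.Context, reqs []string, concurrency int) error {
        eg, ectx := errgroup.WithContext(ctx)
        eg.SetLimit(concurrency) // bounds parallelism, like the worker pool
        for i, r := range reqs {
            i, r := i, r // capture loop variables (pre-Go 1.22 semantics)
            eg.Go(func() error {
                select {
                case <-ectx.Done():
                    return ectx.Err() // a sibling failed; stop early
                default:
                }
                fmt.Printf("sending sub-request %d: %s\n", i, r)
                return nil
            })
        }
        return eg.Wait() // returns the first non-nil error, if any
    }

    func main() {
        _ = fanOut(context.Background(), []string{"a", "b", "c"}, 2)
    }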

@@ -264,3 +280,28 @@ func ObserveStoreChangesAsync(ctx context.Context, stateNotifier chan BackupRetr
}
}()
}

func SplitBackupReqRanges(req backuppb.BackupRequest, count uint) []backuppb.BackupRequest {
rangeCount := len(req.SubRanges)
if rangeCount == 0 {
return []backuppb.BackupRequest{req}
}
if count <= 1 {
// a count of 0 or 1 means no split is needed; send a single batch request
return []backuppb.BackupRequest{req}
}
splitRequests := make([]backuppb.BackupRequest, 0, count)
splitStep := rangeCount / int(count)
if splitStep == 0 {
// splitStep must be at least 1;
// if count >= rangeCount, every subrange becomes its own request
splitStep = 1
}
subRanges := req.SubRanges
for i := 0; i < rangeCount; i += splitStep {
splitReq := req
splitReq.SubRanges = subRanges[i:min(i+splitStep, rangeCount)]
splitRequests = append(splitRequests, splitReq)
}
return splitRequests
}
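Because splitStep is floored, the number of parts can exceed count: with 10 sub-ranges and count = 3, splitStep is 3 and the loop yields parts of sizes 3, 3, 3 and 1, the four parts checked by case #4 in client_test.go above. Note also that each split request is a shallow copy of req, so every field other than SubRanges is shared across the sub-requests.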
