From 99abc284f53edb6d864de1336ada07061b9aa0ce Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 25 Jun 2024 12:58:20 +0800 Subject: [PATCH] Backup: split batch store backup requests (#54151) ref pingcap/tidb#52534 --- br/pkg/backup/BUILD.bazel | 2 +- br/pkg/backup/client.go | 12 ++- br/pkg/backup/client_test.go | 65 +++++++++++++++- br/pkg/backup/store.go | 145 ++++++++++++++++++++++------------- 4 files changed, 167 insertions(+), 57 deletions(-) diff --git a/br/pkg/backup/BUILD.bazel b/br/pkg/backup/BUILD.bazel index 760e37c44d26b..29ee9a06a654a 100644 --- a/br/pkg/backup/BUILD.bazel +++ b/br/pkg/backup/BUILD.bazel @@ -67,7 +67,7 @@ go_test( embed = [":backup"], flaky = True, race = "on", - shard_count = 11, + shard_count = 12, deps = [ "//br/pkg/conn", "//br/pkg/gluetidb/mock", diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 606f6cf4d67be..7790dc0ee3858 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -75,6 +75,8 @@ type MainBackupLoop struct { // backup requests for all stores. // the subRanges may changed every round. BackupReq backuppb.BackupRequest + // the number of backup clients to send backup requests per store. + Concurrency uint // record the whole backup progress in infinite loop. GlobalProgressTree *rtree.ProgressRangeTree ReplicaReadLabel map[string]string @@ -91,6 +93,7 @@ func (s *MainBackupSender) SendAsync( round uint64, storeID uint64, request backuppb.BackupRequest, + concurrency uint, cli backuppb.BackupClient, respCh chan *ResponseAndStore, StateNotifier chan BackupRetryPolicy, @@ -100,7 +103,7 @@ func (s *MainBackupSender) SendAsync( logutil.CL(ctx).Info("store backup goroutine exits", zap.Uint64("store", storeID)) close(respCh) }() - err := startBackup(ctx, storeID, request, cli, respCh) + err := startBackup(ctx, storeID, request, cli, concurrency, respCh) if err != nil { // only 2 kinds of errors will occur here. // 1. grpc connection error(already retry inside) @@ -176,6 +179,8 @@ func (bc *Client) RunLoop(ctx context.Context, loop *MainBackupLoop) error { mainLoop: for { round += 1 + // sleep 200ms. in case of tikv cluster abnormal state trigger too many backup rounds. + time.Sleep(200 * time.Millisecond) logutil.CL(ctx).Info("This round of backup starts...", zap.Uint64("round", round)) // initialize the error context every round errContext := utils.NewErrorContext("MainBackupLoop", 10) @@ -250,7 +255,7 @@ mainLoop: } ch := make(chan *ResponseAndStore) storeBackupResultChMap[storeID] = ch - loop.SendAsync(mainCtx, round, storeID, loop.BackupReq, cli, ch, loop.StateNotifier) + loop.SendAsync(mainCtx, round, storeID, loop.BackupReq, loop.Concurrency, cli, ch, loop.StateNotifier) } // infinite loop to collect region backup response to global channel loop.CollectStoreBackupsAsync(handleCtx, round, storeBackupResultChMap, globalBackupResultCh) @@ -309,7 +314,7 @@ mainLoop: storeBackupResultChMap[storeID] = ch // start backup for this store - loop.SendAsync(mainCtx, round, storeID, loop.BackupReq, cli, ch, loop.StateNotifier) + loop.SendAsync(mainCtx, round, storeID, loop.BackupReq, loop.Concurrency, cli, ch, loop.StateNotifier) // re-create context for new handler loop handleCtx, handleCancel = context.WithCancel(mainCtx) // handleCancel makes the former collect goroutine exits @@ -1136,6 +1141,7 @@ func (bc *Client) BackupRanges( mainBackupLoop := &MainBackupLoop{ BackupSender: &MainBackupSender{}, BackupReq: request, + Concurrency: concurrency, GlobalProgressTree: &globalProgressTree, ReplicaReadLabel: replicaReadLabel, StateNotifier: stateNotifier, diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index 55ddcf2e64a74..cc955bd430607 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -75,6 +75,7 @@ func (m *mockBackupBackupSender) SendAsync( round uint64, storeID uint64, request backuppb.BackupRequest, + concurrency uint, cli backuppb.BackupClient, respCh chan *backup.ResponseAndStore, StateNotifier chan backup.BackupRetryPolicy, @@ -552,7 +553,7 @@ func TestMainBackupLoop(t *testing.T) { // cancel the backup in another goroutine ctx, cancel := context.WithCancel(backgroundCtx) go func() { - time.Sleep(500 * time.Millisecond) + time.Sleep(100 * time.Millisecond) cancel() }() connectedStore = make(map[uint64]int) @@ -807,3 +808,65 @@ func TestObserveStoreChangesAsync(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/backup-store-change-tick")) } + +func TestSplitBackupReqRanges(t *testing.T) { + req := backuppb.BackupRequest{ + SubRanges: []*kvrpcpb.KeyRange{}, + } + + // case #1 empty ranges + res := backup.SplitBackupReqRanges(req, 1) + require.Len(t, res, 1) + // case #2 empty ranges and limit is 0 + res = backup.SplitBackupReqRanges(req, 0) + require.Len(t, res, 1) + + genSubRanges := func(req *backuppb.BackupRequest, count int) { + for i := 0; i < count; i++ { + req.SubRanges = append(req.SubRanges, &kvrpcpb.KeyRange{ + StartKey: []byte{byte(i)}, + EndKey: []byte{byte(i + 1)}, + }) + } + } + + genSubRanges(&req, 10) + // case #3: 10 subranges and split into 10 parts + res = backup.SplitBackupReqRanges(req, 10) + require.Len(t, res, 10) + for i := 0; i < 10; i++ { + require.Equal(t, res[i].SubRanges[0].StartKey, req.SubRanges[i].StartKey) + require.Equal(t, res[i].SubRanges[0].EndKey, req.SubRanges[i].EndKey) + } + + // case #3.1: 10 subranges and split into 11 parts(has no difference with 10 parts) + res = backup.SplitBackupReqRanges(req, 11) + require.Len(t, res, 10) + for i := 0; i < 10; i++ { + require.Equal(t, res[i].SubRanges[0].StartKey, req.SubRanges[i].StartKey) + require.Equal(t, res[i].SubRanges[0].EndKey, req.SubRanges[i].EndKey) + } + + // case #3.2: 10 subranges and split into 9 parts(has no difference with 10 parts) + res = backup.SplitBackupReqRanges(req, 9) + require.Len(t, res, 10) + for i := 0; i < 10; i++ { + require.Equal(t, res[i].SubRanges[0].StartKey, req.SubRanges[i].StartKey) + require.Equal(t, res[i].SubRanges[0].EndKey, req.SubRanges[i].EndKey) + } + + // case #4: 10 subranges and split into 3 parts, each part has 3 subranges + // but actually it will generate 4 parts due to not divisible. + res = backup.SplitBackupReqRanges(req, 3) + require.Len(t, res, 4) + for i := 0; i < 3; i++ { + require.Len(t, res[i].SubRanges, 3) + for j := 0; j < 3; j++ { + require.Equal(t, res[i].SubRanges[j].StartKey, req.SubRanges[i*3+j].StartKey) + require.Equal(t, res[i].SubRanges[j].EndKey, req.SubRanges[i*3+j].EndKey) + } + } + require.Len(t, res[3].SubRanges, 1) + require.Equal(t, res[3].SubRanges[0].StartKey, req.SubRanges[9].StartKey) + require.Equal(t, res[3].SubRanges[0].EndKey, req.SubRanges[9].EndKey) +} diff --git a/br/pkg/backup/store.go b/br/pkg/backup/store.go index be2c34a6413a2..02f7166193918 100644 --- a/br/pkg/backup/store.go +++ b/br/pkg/backup/store.go @@ -19,8 +19,10 @@ import ( "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/utils/storewatch" + tidbutil "github.com/pingcap/tidb/pkg/util" pd "github.com/tikv/pd/client" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -36,6 +38,7 @@ type BackupSender interface { round uint64, storeID uint64, request backuppb.BackupRequest, + concurrency uint, cli backuppb.BackupClient, respCh chan *ResponseAndStore, StateNotifier chan BackupRetryPolicy) @@ -130,6 +133,7 @@ func startBackup( storeID uint64, backupReq backuppb.BackupRequest, backupCli backuppb.BackupClient, + concurrency uint, respCh chan *ResponseAndStore, ) error { // this goroutine handle the response from a single store @@ -137,59 +141,71 @@ func startBackup( case <-ctx.Done(): return ctx.Err() default: - retry := -1 - return utils.WithRetry(ctx, func() error { - retry += 1 - logutil.CL(ctx).Info("try backup", zap.Uint64("storeID", storeID), zap.Int("retry time", retry)) - // Send backup request to the store. - // handle the backup response or internal error here. - // handle the store error(reboot or network partition) outside. - return doSendBackup(ctx, backupCli, backupReq, func(resp *backuppb.BackupResponse) error { - // Forward all responses (including error). - failpoint.Inject("backup-timeout-error", func(val failpoint.Value) { - msg := val.(string) - logutil.CL(ctx).Info("failpoint backup-timeout-error injected.", zap.String("msg", msg)) - resp.Error = &backuppb.Error{ - Msg: msg, - } - }) - failpoint.Inject("backup-storage-error", func(val failpoint.Value) { - msg := val.(string) - logutil.CL(ctx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg)) - resp.Error = &backuppb.Error{ - Msg: msg, - } - }) - failpoint.Inject("tikv-rw-error", func(val failpoint.Value) { - msg := val.(string) - logutil.CL(ctx).Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg)) - resp.Error = &backuppb.Error{ - Msg: msg, - } - }) - failpoint.Inject("tikv-region-error", func(val failpoint.Value) { - msg := val.(string) - logutil.CL(ctx).Debug("failpoint tikv-region-error injected.", zap.String("msg", msg)) - resp.Error = &backuppb.Error{ - // Msg: msg, - Detail: &backuppb.Error_RegionError{ - RegionError: &errorpb.Error{ - Message: msg, - }, - }, - } - }) - select { - case <-ctx.Done(): - return ctx.Err() - case respCh <- &ResponseAndStore{ - Resp: resp, - StoreID: storeID, - }: - } - return nil + logutil.CL(ctx).Info("try backup", zap.Uint64("storeID", storeID)) + // Send backup request to the store. + // handle the backup response or internal error here. + // handle the store error(reboot or network partition) outside. + reqs := SplitBackupReqRanges(backupReq, concurrency) + pool := tidbutil.NewWorkerPool(concurrency, "store_backup") + eg, ectx := errgroup.WithContext(ctx) + for i, req := range reqs { + bkReq := req + reqIndex := i + pool.ApplyOnErrorGroup(eg, func() error { + retry := -1 + return utils.WithRetry(ectx, func() error { + retry += 1 + logutil.CL(ectx).Info("backup to store", zap.Uint64("storeID", storeID), + zap.Int("retry", retry), zap.Int("reqIndex", reqIndex)) + return doSendBackup(ectx, backupCli, bkReq, func(resp *backuppb.BackupResponse) error { + // Forward all responses (including error). + failpoint.Inject("backup-timeout-error", func(val failpoint.Value) { + msg := val.(string) + logutil.CL(ectx).Info("failpoint backup-timeout-error injected.", zap.String("msg", msg)) + resp.Error = &backuppb.Error{ + Msg: msg, + } + }) + failpoint.Inject("backup-storage-error", func(val failpoint.Value) { + msg := val.(string) + logutil.CL(ectx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg)) + resp.Error = &backuppb.Error{ + Msg: msg, + } + }) + failpoint.Inject("tikv-rw-error", func(val failpoint.Value) { + msg := val.(string) + logutil.CL(ectx).Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg)) + resp.Error = &backuppb.Error{ + Msg: msg, + } + }) + failpoint.Inject("tikv-region-error", func(val failpoint.Value) { + msg := val.(string) + logutil.CL(ectx).Debug("failpoint tikv-region-error injected.", zap.String("msg", msg)) + resp.Error = &backuppb.Error{ + // Msg: msg, + Detail: &backuppb.Error_RegionError{ + RegionError: &errorpb.Error{ + Message: msg, + }, + }, + } + }) + select { + case <-ectx.Done(): + return ectx.Err() + case respCh <- &ResponseAndStore{ + Resp: resp, + StoreID: storeID, + }: + } + return nil + }) + }, utils.NewBackupSSTBackoffer()) }) - }, utils.NewBackupSSTBackoffer()) + } + return eg.Wait() } } @@ -264,3 +280,28 @@ func ObserveStoreChangesAsync(ctx context.Context, stateNotifier chan BackupRetr } }() } + +func SplitBackupReqRanges(req backuppb.BackupRequest, count uint) []backuppb.BackupRequest { + rangeCount := len(req.SubRanges) + if rangeCount == 0 { + return []backuppb.BackupRequest{req} + } + splitRequests := make([]backuppb.BackupRequest, 0, count) + if count <= 1 { + // 0/1 means no need to split, just send one batch request + return []backuppb.BackupRequest{req} + } + splitStep := rangeCount / int(count) + if splitStep == 0 { + // splitStep should be at least 1 + // if count >= rangeCount, means no batch, split them all + splitStep = 1 + } + subRanges := req.SubRanges + for i := 0; i < rangeCount; i += splitStep { + splitReq := req + splitReq.SubRanges = subRanges[i:min(i+splitStep, rangeCount)] + splitRequests = append(splitRequests, splitReq) + } + return splitRequests +}