From c3fc0410b3fb2aa782bc8837ae654c375d89d26c Mon Sep 17 00:00:00 2001
From: glorv
Date: Wed, 23 Dec 2020 14:27:13 +0800
Subject: [PATCH] encode retry split key (#531)

Signed-off-by: glorv
---
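Notes:

Region boundaries returned by PD are memcomparable-encoded (the form produced
by codec.EncodeBytes), while the split keys lightning keeps for retry are raw
keys. The retry path previously fed the raw keys straight back into the scan
bounds and boundary checks, so comparisons against encoded region keys could
misorder. A minimal self-contained sketch of the mismatch (illustrative values
only, not part of the patch):

	package main

	import (
		"bytes"
		"fmt"

		"github.com/pingcap/tidb/util/codec"
	)

	func main() {
		raw := []byte("aay")
		// PD stores region boundaries in the encoded form.
		regionStart := codec.EncodeBytes([]byte{}, []byte("aay"))

		// Raw vs. encoded: the key appears to sort before its own region start.
		fmt.Println(bytes.Compare(raw, regionStart)) // -1 (misleading)

		// Encoding first, as this patch now does for retryKeys, restores order.
		fmt.Println(bytes.Compare(codec.EncodeBytes([]byte{}, raw), regionStart)) // 0
	}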
 lightning/backend/localhelper.go      |  45 ++++--
 lightning/backend/localhelper_test.go | 221 ++++++++++++++++++++++----
 2 files changed, 228 insertions(+), 38 deletions(-)

diff --git a/lightning/backend/localhelper.go b/lightning/backend/localhelper.go
index 6f54b915c..3f3cc5276 100644
--- a/lightning/backend/localhelper.go
+++ b/lightning/backend/localhelper.go
@@ -42,6 +42,9 @@ const (
 var (
 	// the max keys count in a batch to split one region
 	maxBatchSplitKeys = 4096
+	// the base exponential backoff time
+	// the variable is only changed in unit tests to make them run faster.
+	splitRegionBaseBackOffTime = time.Second
 )
 
 // TODO remove this file and use br internal functions
@@ -55,17 +58,16 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []
 
 	minKey := codec.EncodeBytes([]byte{}, ranges[0].start)
 	maxKey := codec.EncodeBytes([]byte{}, ranges[len(ranges)-1].end)
-
-	log.L().Info("split and scatter region",
-		zap.Binary("minKey", minKey),
-		zap.Binary("maxKey", maxKey),
-	)
-
 	var err error
 	scatterRegions := make([]*split.RegionInfo, 0)
 	var retryKeys [][]byte
-	waitTime := 1 * time.Second
+	waitTime := splitRegionBaseBackOffTime
 	for i := 0; i < SplitRetryTimes; i++ {
+		log.L().Info("split and scatter region",
+			zap.Binary("minKey", minKey),
+			zap.Binary("maxKey", maxKey),
+			zap.Int("retry", i),
+		)
 		if i > 0 {
 			select {
 			case <-time.After(waitTime):
@@ -85,6 +87,15 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []
 			continue
 		}
 
+		if len(regions) == 0 {
+			log.L().Warn("paginate scan region returns empty result", zap.Binary("minKey", minKey), zap.Binary("maxKey", maxKey),
+				zap.Int("retry", i))
+			return errors.New("paginate scan region returns empty result")
+		}
+
+		log.L().Info("paginate scan region finished", zap.Binary("minKey", minKey), zap.Binary("maxKey", maxKey),
+			zap.Int("regions", len(regions)))
+
 		regionMap := make(map[uint64]*split.RegionInfo)
 		for _, region := range regions {
 			regionMap[region.Region.GetId()] = region
@@ -92,6 +103,15 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []
 		}
 		var splitKeyMap map[uint64][][]byte
 		if len(retryKeys) > 0 {
+			firstKeyEnc := codec.EncodeBytes([]byte{}, retryKeys[0])
+			lastKeyEnc := codec.EncodeBytes([]byte{}, retryKeys[len(retryKeys)-1])
+			if bytes.Compare(firstKeyEnc, regions[0].Region.StartKey) < 0 || !beforeEnd(lastKeyEnc, regions[len(regions)-1].Region.EndKey) {
+				log.L().Warn("no valid key for split region",
+					zap.Binary("firstKey", firstKeyEnc), zap.Binary("lastKey", lastKeyEnc),
+					zap.Binary("firstRegionStart", regions[0].Region.StartKey),
+					zap.Binary("lastRegionEnd", regions[len(regions)-1].Region.EndKey))
+				return errors.New("check split keys failed")
+			}
 			splitKeyMap = getSplitKeys(retryKeys, regions)
 			retryKeys = retryKeys[:0]
 		} else {
@@ -109,7 +129,7 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []
 			end := utils.MinInt((j+1)*maxBatchSplitKeys, len(keys))
 			splitRegionStart := codec.EncodeBytes([]byte{}, keys[start])
 			splitRegionEnd := codec.EncodeBytes([]byte{}, keys[end-1])
-			if bytes.Compare(splitRegionStart, splitRegion.Region.StartKey) <= 0 || !beforeEnd(splitRegionEnd, splitRegion.Region.EndKey) {
+			if bytes.Compare(splitRegionStart, splitRegion.Region.StartKey) < 0 || !beforeEnd(splitRegionEnd, splitRegion.Region.EndKey) {
 				log.L().Fatal("no valid key in region",
 					zap.Binary("startKey", splitRegionStart), zap.Binary("endKey", splitRegionEnd),
 					zap.Binary("regionStart", splitRegion.Region.StartKey), zap.Binary("regionEnd", splitRegion.Region.EndKey),
@@ -131,6 +151,9 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []
 				retryKeys = append(retryKeys, keys[start:]...)
 				break
 			} else {
+				log.L().Info("batch split region", zap.Uint64("region_id", splitRegion.Region.Id),
+					zap.Int("keys", end-start), zap.Binary("firstKey", keys[start]),
+					zap.Binary("end", keys[end-1]))
 				sort.Slice(newRegions, func(i, j int) bool {
 					return bytes.Compare(newRegions[i].Region.StartKey, newRegions[j].Region.StartKey) < 0
 				})
@@ -148,8 +171,8 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []
 			sort.Slice(retryKeys, func(i, j int) bool {
 				return bytes.Compare(retryKeys[i], retryKeys[j]) < 0
 			})
-			minKey = retryKeys[0]
-			maxKey = nextKey(retryKeys[len(retryKeys)-1])
+			minKey = codec.EncodeBytes([]byte{}, retryKeys[0])
+			maxKey = codec.EncodeBytes([]byte{}, nextKey(retryKeys[len(retryKeys)-1]))
 		}
 	}
 	if err != nil {
@@ -214,7 +237,7 @@ func (local *local) BatchSplitRegions(ctx context.Context, region *split.RegionI
 	var failedErr error
 	retryRegions := make([]*split.RegionInfo, 0)
 	scatterRegions := newRegions
-	waitTime := time.Second
+	waitTime := splitRegionBaseBackOffTime
 	for i := 0; i < maxRetryTimes; i++ {
 		for _, region := range scatterRegions {
 			// Wait for a while until the regions successfully splits.
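Note: splitRegionBaseBackOffTime above is only the starting wait; the
surrounding retry loop backs off exponentially (its growth step lies outside
the hunks shown here), and extracting the base into a variable is what lets
the unit tests below shrink it to a millisecond. A self-contained sketch of
the pattern, with retries and doSplit as stand-in names:

	package main

	import (
		"context"
		"errors"
		"fmt"
		"time"
	)

	var splitRegionBaseBackOffTime = time.Second

	func splitWithRetry(ctx context.Context, retries int, doSplit func() error) error {
		waitTime := splitRegionBaseBackOffTime
		var err error
		for i := 0; i < retries; i++ {
			if i > 0 {
				select {
				case <-time.After(waitTime):
				case <-ctx.Done():
					return ctx.Err()
				}
				// assumed growth factor; the diff only names the base here
				waitTime *= 2
			}
			if err = doSplit(); err == nil {
				return nil
			}
		}
		return err
	}

	func main() {
		splitRegionBaseBackOffTime = time.Millisecond // as the tests do
		attempts := 0
		err := splitWithRetry(context.Background(), 3, func() error {
			attempts++
			if attempts < 3 {
				return errors.New("epoch not match")
			}
			return nil
		})
		fmt.Println(attempts, err) // 3 <nil>
	}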
"github.com/pingcap/check" @@ -39,12 +40,14 @@ type testClient struct { regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions nextRegionID uint64 splitCount int + hook clientHook } func newTestClient( stores map[uint64]*metapb.Store, regions map[uint64]*restore.RegionInfo, nextRegionID uint64, + hook clientHook, ) *testClient { regionsInfo := core.NewRegionsInfo() for _, regionInfo := range regions { @@ -55,6 +58,7 @@ func newTestClient( regions: regions, regionsInfo: regionsInfo, nextRegionID: nextRegionID, + hook: hook, } } @@ -120,12 +124,17 @@ func (c *testClient) SplitRegion( Id: c.nextRegionID, StartKey: target.Region.StartKey, EndKey: splitKey, + RegionEpoch: &metapb.RegionEpoch{ + Version: target.Region.RegionEpoch.Version, + ConfVer: target.Region.RegionEpoch.ConfVer + 1, + }, }, } c.regions[c.nextRegionID] = newRegion c.regionsInfo.SetRegion(core.NewRegionInfo(newRegion.Region, newRegion.Leader)) c.nextRegionID++ target.Region.StartKey = splitKey + target.Region.RegionEpoch.ConfVer++ c.regions[target.Region.Id] = target c.regionsInfo.SetRegion(core.NewRegionInfo(target.Region, target.Leader)) return newRegion, nil @@ -134,6 +143,10 @@ func (c *testClient) SplitRegion( func (c *testClient) BatchSplitRegionsWithOrigin( ctx context.Context, regionInfo *restore.RegionInfo, keys [][]byte, ) (*restore.RegionInfo, []*restore.RegionInfo, error) { + if c.hook != nil { + regionInfo, keys = c.hook.BeforeSplitRegion(ctx, regionInfo, keys) + } + c.splitCount++ c.mu.Lock() defer c.mu.Unlock() @@ -150,6 +163,10 @@ func (c *testClient) BatchSplitRegionsWithOrigin( if target == nil { continue } + if target.Region.RegionEpoch.Version != regionInfo.Region.RegionEpoch.Version || + target.Region.RegionEpoch.ConfVer != regionInfo.Region.RegionEpoch.ConfVer { + return regionInfo, nil, errors.New("epoch not match") + } newRegion := &restore.RegionInfo{ Region: &metapb.Region{ Peers: target.Region.Peers, @@ -169,8 +186,12 @@ func (c *testClient) BatchSplitRegionsWithOrigin( if region != nil { c.regionsInfo.SetRegion(core.NewRegionInfo(region.Region, region.Leader)) } + var err error + if c.hook != nil { + newRegions, err = c.hook.AfterSplitRegion(ctx, region, keys, newRegions, nil) + } - return region, newRegions, nil + return region, newRegions, err } func (c *testClient) BatchSplitRegions( @@ -191,6 +212,10 @@ func (c *testClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.Ge } func (c *testClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*restore.RegionInfo, error) { + if c.hook != nil { + key, endKey, limit = c.hook.BeforeScanRegions(ctx, key, endKey, limit) + } + infos := c.regionsInfo.ScanRange(key, endKey, limit) regions := make([]*restore.RegionInfo, 0, len(infos)) for _, info := range infos { @@ -199,7 +224,12 @@ func (c *testClient) ScanRegions(ctx context.Context, key, endKey []byte, limit Leader: info.GetLeader(), }) } - return regions, nil + + var err error + if c.hook != nil { + regions, err = c.hook.AfterScanRegions(regions, nil) + } + return regions, err } func (c *testClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (r placement.Rule, err error) { @@ -218,8 +248,23 @@ func (c *testClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK return nil } +func cloneRegion(region *restore.RegionInfo) *restore.RegionInfo { + r := &metapb.Region{} + if region.Region != nil { + b, _ := region.Region.Marshal() + _ = r.Unmarshal(b) + } + + l := &metapb.Peer{} + if region.Region != nil { + b, _ := 
@@ -218,8 +248,23 @@ func (c *testClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK
 	return nil
 }
 
+func cloneRegion(region *restore.RegionInfo) *restore.RegionInfo {
+	r := &metapb.Region{}
+	if region.Region != nil {
+		b, _ := region.Region.Marshal()
+		_ = r.Unmarshal(b)
+	}
+
+	l := &metapb.Peer{}
+	if region.Leader != nil {
+		b, _ := region.Leader.Marshal()
+		_ = l.Unmarshal(b)
+	}
+	return &restore.RegionInfo{Region: r, Leader: l}
+}
+
 // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
-func initTestClient(keys [][]byte) *testClient {
+func initTestClient(keys [][]byte, hook clientHook) *testClient {
 	peers := make([]*metapb.Peer, 1)
 	peers[0] = &metapb.Peer{
 		Id:      1,
@@ -237,10 +282,11 @@ func initTestClient(keys [][]byte) *testClient {
 		}
 		regions[i] = &restore.RegionInfo{
 			Region: &metapb.Region{
-				Id:       i,
-				Peers:    peers,
-				StartKey: startKey,
-				EndKey:   endKey,
+				Id:          i,
+				Peers:       peers,
+				StartKey:    startKey,
+				EndKey:      endKey,
+				RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
 			},
 		}
 	}
@@ -248,11 +294,11 @@ func initTestClient(keys [][]byte) *testClient {
 	stores[1] = &metapb.Store{
 		Id: 1,
 	}
-	return newTestClient(stores, regions, uint64(len(keys)))
+	return newTestClient(stores, regions, uint64(len(keys)), hook)
 }
 
 func checkRegionRanges(c *C, regions []*restore.RegionInfo, keys [][]byte) {
-	c.Assert(len(regions)+1, Equals, len(keys))
+	//c.Assert(len(regions)+1, Equals, len(keys))
 	for i, r := range regions {
 		_, regionStart, _ := codec.DecodeBytes(r.Region.StartKey, []byte{})
 		_, regionEnd, _ := codec.DecodeBytes(r.Region.EndKey, []byte{})
@@ -261,15 +307,40 @@ func checkRegionRanges(c *C, regions []*restore.RegionInfo, keys [][]byte) {
 	}
 }
 
-func (s *localSuite) TestBatchSplitAndIngestRegion(c *C) {
+type clientHook interface {
+	BeforeSplitRegion(ctx context.Context, regionInfo *restore.RegionInfo, keys [][]byte) (*restore.RegionInfo, [][]byte)
+	AfterSplitRegion(context.Context, *restore.RegionInfo, [][]byte, []*restore.RegionInfo, error) ([]*restore.RegionInfo, error)
+	BeforeScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]byte, []byte, int)
+	AfterScanRegions([]*restore.RegionInfo, error) ([]*restore.RegionInfo, error)
+}
+
+type noopHook struct{}
+
+func (h *noopHook) BeforeSplitRegion(ctx context.Context, regionInfo *restore.RegionInfo, keys [][]byte) (*restore.RegionInfo, [][]byte) {
+	return regionInfo, keys
+}
+func (h *noopHook) AfterSplitRegion(c context.Context, r *restore.RegionInfo, keys [][]byte, res []*restore.RegionInfo, err error) ([]*restore.RegionInfo, error) {
+	return res, err
+}
+func (h *noopHook) BeforeScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]byte, []byte, int) {
+	return key, endKey, limit
+}
+func (h *noopHook) AfterScanRegions(res []*restore.RegionInfo, err error) ([]*restore.RegionInfo, error) {
+	return res, err
+}
+
+func (s *localSuite) doTestBatchSplitRegionByRanges(c *C, hook clientHook, errPat string) {
 	oldLimit := maxBatchSplitKeys
+	oldSplitBackoffTime := splitRegionBaseBackOffTime
 	maxBatchSplitKeys = 4
+	splitRegionBaseBackOffTime = time.Millisecond
 	defer func() {
 		maxBatchSplitKeys = oldLimit
+		splitRegionBaseBackOffTime = oldSplitBackoffTime
 	}()
 
 	keys := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")}
-	client := initTestClient(keys)
+	client := initTestClient(keys, hook)
 	local := &local{
 		splitCli: client,
 	}
@@ -292,9 +363,13 @@ func (s *localSuite) TestBatchSplitAndIngestRegion(c *C) {
 		start = end
 	}
 
 	err := local.SplitAndScatterRegionByRanges(ctx, ranges)
-	// err = local.SplitAndScatterRegionByRanges(ctx, ranges)
-	c.Assert(err, IsNil)
+	if len(errPat) == 0 {
+		c.Assert(err, IsNil)
+	} else {
+		c.Assert(err, ErrorMatches, errPat)
+		return
+	}
 
 	// so with a batch split size of 4, there will be 7 time batch split
 	// 1. region: [aay, bba), keys: [b, ba, bb]
@@ -304,7 +379,9 @@ func (s *localSuite) TestBatchSplitAndIngestRegion(c *C) {
 	// 5. region: [bn, cca), keys: [bo, bp, bq, br]
 	// 6. region: [br, cca), keys: [bs, bt, bu, bv]
 	// 7. region: [bv, cca), keys: [bw, bx, by, bz]
-	c.Assert(client.splitCount, Equals, 7)
+
+	// since the split may be retried on errors, only check the lower bound here.
+	c.Assert(client.splitCount >= 7, IsTrue)
 
 	// check split ranges
 	regions, err = paginateScanRegion(ctx, client, rangeStart, rangeEnd, 5)
@@ -317,20 +394,98 @@ func (s *localSuite) TestBatchSplitAndIngestRegion(c *C) {
 	checkRegionRanges(c, regions, result)
 }
 
-func (s *localSuite) TestBatchSplitAndIngestWithClusteredIndex(c *C) {
+func (s *localSuite) TestBatchSplitRegionByRanges(c *C) {
+	s.doTestBatchSplitRegionByRanges(c, nil, "")
+}
+
+type scanRegionEmptyHook struct {
+	noopHook
+	cnt int
+}
+
+func (h *scanRegionEmptyHook) AfterScanRegions(res []*restore.RegionInfo, err error) ([]*restore.RegionInfo, error) {
+	h.cnt++
+	// skip the first call
+	if h.cnt == 1 {
+		return res, err
+	}
+	return nil, err
+}
+
+func (s *localSuite) TestBatchSplitRegionByRangesScanFailed(c *C) {
+	s.doTestBatchSplitRegionByRanges(c, &scanRegionEmptyHook{}, "paginate scan region returns empty result")
+}
+
+type splitRegionEpochNotMatchHook struct {
+	noopHook
+}
+
+func (h *splitRegionEpochNotMatchHook) BeforeSplitRegion(ctx context.Context, regionInfo *restore.RegionInfo, keys [][]byte) (*restore.RegionInfo, [][]byte) {
+	regionInfo = cloneRegion(regionInfo)
+	// decrease the region epoch, so the split request will fail
+	regionInfo.Region.RegionEpoch.Version--
+	return regionInfo, keys
+}
+
+func (s *localSuite) TestBatchSplitByRangesEpochNotMatch(c *C) {
+	s.doTestBatchSplitRegionByRanges(c, &splitRegionEpochNotMatchHook{}, "batch split regions failed: epoch not match")
+}
+
+// returns an epoch not match error on every second call
+type splitRegionEpochNotMatchHookRandom struct {
+	noopHook
+	cnt int
+}
+
+func (h *splitRegionEpochNotMatchHookRandom) BeforeSplitRegion(ctx context.Context, regionInfo *restore.RegionInfo, keys [][]byte) (*restore.RegionInfo, [][]byte) {
+	h.cnt++
+	if h.cnt%2 != 0 {
+		return regionInfo, keys
+	}
+	regionInfo = cloneRegion(regionInfo)
+	// decrease the region epoch, so the split request will fail
+	regionInfo.Region.RegionEpoch.Version--
+	return regionInfo, keys
+}
+
+func (s *localSuite) TestBatchSplitByRangesEpochNotMatchOnce(c *C) {
+	s.doTestBatchSplitRegionByRanges(c, &splitRegionEpochNotMatchHookRandom{}, "")
+}
+
+func (s *localSuite) doTestBatchSplitByRangesWithClusteredIndex(c *C, hook clientHook) {
+	oldLimit := maxBatchSplitKeys
+	oldSplitBackoffTime := splitRegionBaseBackOffTime
+	maxBatchSplitKeys = 10
+	splitRegionBaseBackOffTime = time.Millisecond
+	defer func() {
+		maxBatchSplitKeys = oldLimit
+		splitRegionBaseBackOffTime = oldSplitBackoffTime
+	}()
+
+	stmtCtx := new(stmtctx.StatementContext)
+
 	tableId := int64(1)
 	tableStartKey := tablecodec.EncodeTablePrefix(tableId)
 	tableEndKey := tablecodec.EncodeTablePrefix(tableId + 1)
-	keys := [][]byte{[]byte(""), tableStartKey, tableEndKey, []byte("")}
-	client := initTestClient(keys)
+	keys := [][]byte{[]byte(""), tableStartKey}
+	// pre-split 2 regions
+	for i := int64(0); i < 2; i++ {
+		keyBytes, err := codec.EncodeKey(stmtCtx, nil, types.NewIntDatum(i))
+		c.Assert(err, IsNil)
+		h, err := kv.NewCommonHandle(keyBytes)
+		c.Assert(err, IsNil)
+		key := tablecodec.EncodeRowKeyWithHandle(tableId, h)
+		keys = append(keys, key)
+	}
tableEndKey, []byte("")) + client := initTestClient(keys, hook) local := &local{ splitCli: client, } ctx := context.Background() // we batch generate a batch of row keys for table 1 with common handle - keys = make([][]byte, 0, 20+1) - stmtCtx := new(stmtctx.StatementContext) + rangeKeys := make([][]byte, 0, 20+1) for i := int64(0); i < 2; i++ { for j := int64(0); j < 10; j++ { keyBytes, err := codec.EncodeKey(stmtCtx, nil, types.NewIntDatum(i), types.NewIntDatum(j*10000)) @@ -338,13 +493,13 @@ func (s *localSuite) TestBatchSplitAndIngestWithClusteredIndex(c *C) { h, err := kv.NewCommonHandle(keyBytes) c.Assert(err, IsNil) key := tablecodec.EncodeRowKeyWithHandle(tableId, h) - keys = append(keys, key) + rangeKeys = append(rangeKeys, key) } } - start := keys[0] - ranges := make([]Range, 0, len(keys)-1) - for _, e := range keys[1:] { + start := rangeKeys[0] + ranges := make([]Range, 0, len(rangeKeys)-1) + for _, e := range rangeKeys[1:] { ranges = append(ranges, Range{start: start, end: e}) start = e } @@ -352,11 +507,23 @@ func (s *localSuite) TestBatchSplitAndIngestWithClusteredIndex(c *C) { err := local.SplitAndScatterRegionByRanges(ctx, ranges) c.Assert(err, IsNil) - startKey := codec.EncodeBytes([]byte{}, keys[0]) - endKey := codec.EncodeBytes([]byte{}, keys[len(keys)-1]) + startKey := codec.EncodeBytes([]byte{}, rangeKeys[0]) + endKey := codec.EncodeBytes([]byte{}, rangeKeys[len(rangeKeys)-1]) // check split ranges regions, err := paginateScanRegion(ctx, client, startKey, endKey, 5) c.Assert(err, IsNil) - c.Assert(len(regions), Equals, len(ranges)) - checkRegionRanges(c, regions, keys) + c.Assert(len(regions), Equals, len(ranges)+1) + + checkKeys := append([][]byte{}, rangeKeys[:10]...) + checkKeys = append(checkKeys, keys[3]) + checkKeys = append(checkKeys, rangeKeys[10:]...) + checkRegionRanges(c, regions, checkKeys) +} + +func (s *localSuite) TestBatchSplitByRangesWithClusteredIndex(c *C) { + s.doTestBatchSplitByRangesWithClusteredIndex(c, nil) +} + +func (s *localSuite) TestBatchSplitByRangesWithClusteredIndexEpochNotMatch(c *C) { + s.doTestBatchSplitByRangesWithClusteredIndex(c, &splitRegionEpochNotMatchHookRandom{}) }