Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' into lost_err
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Dec 23, 2020
2 parents e75ceed + c3fc041 commit 449ed1a
Show file tree
Hide file tree
Showing 2 changed files with 228 additions and 38 deletions.
45 changes: 34 additions & 11 deletions lightning/backend/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ const (
var (
// the max keys count in a batch to split one region
maxBatchSplitKeys = 4096
// the base exponential backoff time
// the variable is only changed in unit test for running test faster.
splitRegionBaseBackOffTime = time.Second
)

// TODO remove this file and use br internal functions
Expand All @@ -55,17 +58,16 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []

minKey := codec.EncodeBytes([]byte{}, ranges[0].start)
maxKey := codec.EncodeBytes([]byte{}, ranges[len(ranges)-1].end)

log.L().Info("split and scatter region",
zap.Binary("minKey", minKey),
zap.Binary("maxKey", maxKey),
)

var err error
scatterRegions := make([]*split.RegionInfo, 0)
var retryKeys [][]byte
waitTime := 1 * time.Second
waitTime := splitRegionBaseBackOffTime
for i := 0; i < SplitRetryTimes; i++ {
log.L().Info("split and scatter region",
zap.Binary("minKey", minKey),
zap.Binary("maxKey", maxKey),
zap.Int("retry", i),
)
if i > 0 {
select {
case <-time.After(waitTime):
Expand All @@ -85,13 +87,31 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []
continue
}

if len(regions) == 0 {
log.L().Warn("paginate scan region returns empty result", zap.Binary("minKey", minKey), zap.Binary("maxKey", maxKey),
zap.Int("retry", i))
return errors.New("paginate scan region returns empty result")
}

log.L().Info("paginate scan region finished", zap.Binary("minKey", minKey), zap.Binary("maxKey", maxKey),
zap.Int("regions", len(regions)))

regionMap := make(map[uint64]*split.RegionInfo)
for _, region := range regions {
regionMap[region.Region.GetId()] = region
}

var splitKeyMap map[uint64][][]byte
if len(retryKeys) > 0 {
firstKeyEnc := codec.EncodeBytes([]byte{}, retryKeys[0])
lastKeyEnc := codec.EncodeBytes([]byte{}, retryKeys[len(retryKeys)-1])
if bytes.Compare(firstKeyEnc, regions[0].Region.StartKey) < 0 || !beforeEnd(lastKeyEnc, regions[len(regions)-1].Region.EndKey) {
log.L().Warn("no valid key for split region",
zap.Binary("firstKey", firstKeyEnc), zap.Binary("lastKey", lastKeyEnc),
zap.Binary("firstRegionStart", regions[0].Region.StartKey),
zap.Binary("lastRegionEnd", regions[len(regions)-1].Region.EndKey))
return errors.New("check split keys failed")
}
splitKeyMap = getSplitKeys(retryKeys, regions)
retryKeys = retryKeys[:0]
} else {
Expand All @@ -109,7 +129,7 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []
end := utils.MinInt((j+1)*maxBatchSplitKeys, len(keys))
splitRegionStart := codec.EncodeBytes([]byte{}, keys[start])
splitRegionEnd := codec.EncodeBytes([]byte{}, keys[end-1])
if bytes.Compare(splitRegionStart, splitRegion.Region.StartKey) <= 0 || !beforeEnd(splitRegionEnd, splitRegion.Region.EndKey) {
if bytes.Compare(splitRegionStart, splitRegion.Region.StartKey) < 0 || !beforeEnd(splitRegionEnd, splitRegion.Region.EndKey) {
log.L().Fatal("no valid key in region",
zap.Binary("startKey", splitRegionStart), zap.Binary("endKey", splitRegionEnd),
zap.Binary("regionStart", splitRegion.Region.StartKey), zap.Binary("regionEnd", splitRegion.Region.EndKey),
Expand All @@ -131,6 +151,9 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []
retryKeys = append(retryKeys, keys[start:]...)
break
} else {
log.L().Info("batch split region", zap.Uint64("region_id", splitRegion.Region.Id),
zap.Int("keys", end-start), zap.Binary("firstKey", keys[start]),
zap.Binary("end", keys[end-1]))
sort.Slice(newRegions, func(i, j int) bool {
return bytes.Compare(newRegions[i].Region.StartKey, newRegions[j].Region.StartKey) < 0
})
Expand All @@ -148,8 +171,8 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []
sort.Slice(retryKeys, func(i, j int) bool {
return bytes.Compare(retryKeys[i], retryKeys[j]) < 0
})
minKey = retryKeys[0]
maxKey = nextKey(retryKeys[len(retryKeys)-1])
minKey = codec.EncodeBytes([]byte{}, retryKeys[0])
maxKey = codec.EncodeBytes([]byte{}, nextKey(retryKeys[len(retryKeys)-1]))
}
}
if err != nil {
Expand Down Expand Up @@ -214,7 +237,7 @@ func (local *local) BatchSplitRegions(ctx context.Context, region *split.RegionI
var failedErr error
retryRegions := make([]*split.RegionInfo, 0)
scatterRegions := newRegions
waitTime := time.Second
waitTime := splitRegionBaseBackOffTime
for i := 0; i < maxRetryTimes; i++ {
for _, region := range scatterRegions {
// Wait for a while until the regions successfully splits.
Expand Down
Loading

0 comments on commit 449ed1a

Please sign in to comment.