Skip to content

Commit

Permalink
*: support setting endKey for ScanRange (#1700)
Browse files Browse the repository at this point in the history
Signed-off-by: disksing <i@disksing.com>
  • Loading branch information
disksing authored Aug 22, 2019
1 parent 84f6a82 commit 864c221
Show file tree
Hide file tree
Showing 15 changed files with 44 additions and 88 deletions.
5 changes: 3 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Client interface {
// Limit limits the maximum number of regions returned.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error)
ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error)
// GetStore gets a store from PD by store id.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
Expand Down Expand Up @@ -689,7 +689,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Re
return resp.GetRegion(), resp.GetLeader(), nil
}

func (c *client) ScanRegions(ctx context.Context, key []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error) {
func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand All @@ -700,6 +700,7 @@ func (c *client) ScanRegions(ctx context.Context, key []byte, limit int) ([]*met
resp, err := c.leaderClient().ScanRegions(ctx, &pdpb.ScanRegionsRequest{
Header: c.requestHeader(),
StartKey: key,
EndKey: endKey,
Limit: int32(limit),
})
cancel()
Expand Down
14 changes: 8 additions & 6 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,16 @@ func (s *testClientSuite) TestScanRegions(c *C) {

// Wait for region heartbeats.
testutil.WaitUntil(c, func(c *C) bool {
scanRegions, _, err := s.client.ScanRegions(context.Background(), []byte{0}, 10)
scanRegions, _, err := s.client.ScanRegions(context.Background(), []byte{0}, nil, 10)
return err == nil && len(scanRegions) == 10
})

// Set leader of region3 to nil.
region3 := core.NewRegionInfo(regions[3], nil)
s.srv.GetRaftCluster().HandleRegionHeartbeat(region3)

check := func(start []byte, limit int, expect []*metapb.Region) {
scanRegions, leaders, err := s.client.ScanRegions(context.Background(), start, limit)
check := func(start, end []byte, limit int, expect []*metapb.Region) {
scanRegions, leaders, err := s.client.ScanRegions(context.Background(), start, end, limit)
c.Assert(err, IsNil)
c.Assert(scanRegions, HasLen, len(expect))
c.Assert(leaders, HasLen, len(expect))
Expand All @@ -305,9 +305,11 @@ func (s *testClientSuite) TestScanRegions(c *C) {
}
}

check([]byte{0}, 10, regions)
check([]byte{1}, 5, regions[1:6])
check([]byte{100}, 1, nil)
check([]byte{0}, nil, 10, regions)
check([]byte{1}, nil, 5, regions[1:6])
check([]byte{100}, nil, 1, nil)
check([]byte{1}, []byte{6}, 0, regions[1:6])
check([]byte{1}, []byte{6}, 2, regions[1:3])
}

func (s *testClientSuite) TestGetRegionByID(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.8.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c h1:hvQd3aOLKLF7xvRV6DzvPkKY4QXzfVbjU1BhW0d9yL8=
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c h1:pY/MQQ5UajEHfSnQS8rFAM9gw9bBKzqBl414cdfhpRQ=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7 h1:tt24R4cv6GlvnmvkHNC1OrS/ETvXxbJkJ1Nrk4prtWo=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
4 changes: 2 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func (mc *Cluster) allocID() (uint64, error) {
}

// ScanRegions scans region with start key, until number greater than limit.
func (mc *Cluster) ScanRegions(startKey []byte, limit int) []*core.RegionInfo {
return mc.Regions.ScanRange(startKey, limit)
func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
return mc.Regions.ScanRange(startKey, endKey, limit)
}

// LoadRegion puts region info without leader
Expand Down
4 changes: 2 additions & 2 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (h *regionsHandler) GetAll(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, regionsInfo)
}

func (h *regionsHandler) ScanRegionsByKey(w http.ResponseWriter, r *http.Request) {
func (h *regionsHandler) ScanRegions(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
Expand All @@ -170,7 +170,7 @@ func (h *regionsHandler) ScanRegionsByKey(w http.ResponseWriter, r *http.Request
if limit > maxRegionLimit {
limit = maxRegionLimit
}
regions := cluster.ScanRegionsByKey([]byte(startKey), limit)
regions := cluster.ScanRegions([]byte(startKey), nil, limit)
regionsInfo := convertToAPIRegions(regions)
h.rd.JSON(w, http.StatusOK, regionsInfo)
}
Expand Down
2 changes: 1 addition & 1 deletion server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {

regionsHandler := newRegionsHandler(svr, rd)
router.HandleFunc("/api/v1/regions", regionsHandler.GetAll).Methods("GET")
router.HandleFunc("/api/v1/regions/key", regionsHandler.ScanRegionsByKey).Methods("GET")
router.HandleFunc("/api/v1/regions/key", regionsHandler.ScanRegions).Methods("GET")
router.HandleFunc("/api/v1/regions/store/{id}", regionsHandler.GetStoreRegions).Methods("GET")
router.HandleFunc("/api/v1/regions/writeflow", regionsHandler.GetTopWriteFlow).Methods("GET")
router.HandleFunc("/api/v1/regions/readflow", regionsHandler.GetTopReadFlow).Methods("GET")
Expand Down
19 changes: 5 additions & 14 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,19 +557,10 @@ func (c *RaftCluster) GetRegionInfoByKey(regionKey []byte) *core.RegionInfo {
return c.core.SearchRegion(regionKey)
}

// ScanRegionsByKey scans region with start key, until number greater than limit.
func (c *RaftCluster) ScanRegionsByKey(startKey []byte, limit int) []*core.RegionInfo {
return c.core.ScanRange(startKey, limit)
}

// ScanRegions scans region with start key, until number greater than limit.
func (c *RaftCluster) ScanRegions(startKey []byte, limit int) []*core.RegionInfo {
return c.core.ScanRange(startKey, limit)
}

// ScanRangeWithEndKey scans regions intersecting [start key, end key).
func (c *RaftCluster) ScanRangeWithEndKey(startKey, endKey []byte) []*core.RegionInfo {
return c.core.ScanRangeWithEndKey(startKey, endKey)
// ScanRegions scans region with start key, until the region contains endKey, or
// total number greater than limit.
func (c *RaftCluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
return c.core.ScanRange(startKey, endKey, limit)
}

// GetRegionByID gets region and leader peer by regionID from cluster.
Expand Down Expand Up @@ -660,7 +651,7 @@ func (c *RaftCluster) GetAverageRegionSize() int64 {
func (c *RaftCluster) GetRegionStats(startKey, endKey []byte) *statistics.RegionStats {
c.RLock()
defer c.RUnlock()
return statistics.GetRegionStats(c.core.ScanRangeWithEndKey, startKey, endKey)
return statistics.GetRegionStats(c.core.ScanRange(startKey, endKey, -1))
}

// GetStoresStats returns stores' statistics from cluster.
Expand Down
2 changes: 1 addition & 1 deletion server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (c *coordinator) patrolRegions() {
return
}

regions := c.cluster.ScanRegions(key, patrolScanRegionLimit)
regions := c.cluster.ScanRegions(key, nil, patrolScanRegionLimit)
if len(regions) == 0 {
// Resets the scan key.
key = nil
Expand Down
17 changes: 5 additions & 12 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,19 +284,12 @@ func (bc *BasicCluster) SearchPrevRegion(regionKey []byte) *RegionInfo {
return bc.Regions.SearchPrevRegion(regionKey)
}

// ScanRange scans from the first region containing or behind start key,
// until number greater than limit.
func (bc *BasicCluster) ScanRange(startKey []byte, limit int) []*RegionInfo {
// ScanRange scans regions intersecting [start key, end key), returns at most
// `limit` regions. limit <= 0 means no limit.
func (bc *BasicCluster) ScanRange(startKey, endKey []byte, limit int) []*RegionInfo {
bc.RLock()
defer bc.RUnlock()
return bc.Regions.ScanRange(startKey, limit)
}

// ScanRangeWithEndKey scans regions intersecting [start key, end key).
func (bc *BasicCluster) ScanRangeWithEndKey(startKey, endKey []byte) []*RegionInfo {
bc.RLock()
defer bc.RUnlock()
return bc.Regions.ScanRangeWithEndKey(startKey, endKey)
return bc.Regions.ScanRange(startKey, endKey, limit)
}

// GetOverlaps returns the regions which are overlapped with the specified region range.
Expand All @@ -322,7 +315,7 @@ type RegionSetInformer interface {
GetStoreRegionCount(storeID uint64) int
GetRegion(id uint64) *RegionInfo
GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo)
ScanRegions(startKey []byte, limit int) []*RegionInfo
ScanRegions(startKey, endKey []byte, limit int) []*RegionInfo
}

// StoreSetInformer provides access to a shared informer of stores.
Expand Down
25 changes: 8 additions & 17 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,30 +709,21 @@ func (r *RegionsInfo) GetFollower(storeID uint64, regionID uint64) *RegionInfo {
return r.followers[storeID].Get(regionID)
}

// ScanRange scans from the first region containing or behind start key,
// until number greater than limit.
func (r *RegionsInfo) ScanRange(startKey []byte, limit int) []*RegionInfo {
res := make([]*RegionInfo, 0, limit)
r.tree.scanRange(startKey, func(metaRegion *metapb.Region) bool {
res = append(res, r.GetRegion(metaRegion.GetId()))
return len(res) < limit
})
return res
}

// ScanRangeWithEndKey scans regions intersecting [start key, end key).
func (r *RegionsInfo) ScanRangeWithEndKey(startKey, endKey []byte) []*RegionInfo {
var regions []*RegionInfo
// ScanRange scans regions intersecting [start key, end key), returns at most
// `limit` regions. limit <= 0 means no limit.
func (r *RegionsInfo) ScanRange(startKey, endKey []byte, limit int) []*RegionInfo {
var res []*RegionInfo
r.tree.scanRange(startKey, func(meta *metapb.Region) bool {
if len(endKey) > 0 && bytes.Compare(meta.StartKey, endKey) >= 0 {
return false
}
if region := r.GetRegion(meta.GetId()); region != nil {
regions = append(regions, region)
if limit > 0 && len(res) >= limit {
return false
}
res = append(res, r.GetRegion(meta.GetId()))
return true
})
return regions
return res
}

// ScanRangeWithIterator scans from the first region containing or behind start key,
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func (s *Server) ScanRegions(ctx context.Context, request *pdpb.ScanRegionsReque
if cluster == nil {
return &pdpb.ScanRegionsResponse{Header: s.notBootstrappedHeader()}, nil
}
regions := cluster.ScanRegionsByKey(request.GetStartKey(), int(request.GetLimit()))
regions := cluster.ScanRegions(request.GetStartKey(), request.GetEndKey(), int(request.GetLimit()))
resp := &pdpb.ScanRegionsResponse{Header: s.header()}
for _, r := range regions {
leader := r.GetLeader()
Expand Down
24 changes: 2 additions & 22 deletions server/schedule/range_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
package schedule

import (
"bytes"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/server/core"
Expand All @@ -34,26 +32,8 @@ const scanLimit = 128
// The cluster can only know the regions within [startKey, endKey].
func GenRangeCluster(cluster Cluster, startKey, endKey []byte) *RangeCluster {
regions := core.NewRegionsInfo()
scanKey := startKey
loopEnd := false
for !loopEnd {
collect := cluster.ScanRegions(scanKey, scanLimit)
if len(collect) == 0 {
break
}
for _, r := range collect {
if bytes.Compare(r.GetStartKey(), endKey) < 0 {
regions.SetRegion(r)
} else {
loopEnd = true
break
}
if string(r.GetEndKey()) == "" {
loopEnd = true
break
}
scanKey = r.GetEndKey()
}
for _, r := range cluster.ScanRegions(startKey, endKey, -1) {
regions.AddRegion(r)
}
return &RangeCluster{
Cluster: cluster,
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/adjacent_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (l *balanceAdjacentRegionScheduler) Schedule(cluster schedule.Cluster) []*o
}

l.cacheRegions.clear()
regions := cluster.ScanRegions(l.lastKey, scanLimit)
regions := cluster.ScanRegions(l.lastKey, nil, scanLimit)
// scan to the end
if len(regions) <= 1 {
schedulerStatus.WithLabelValues(l.GetName(), "adjacent_count").Set(float64(l.adjacentRegionsCount))
Expand Down
6 changes: 2 additions & 4 deletions server/statistics/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,9 @@ func (s *RegionStats) Observe(r *core.RegionInfo) {
}
}

// GetRegionStats scans regions that intersect range [startKey, endKey) and sums up
// their statistics.
func GetRegionStats(f func(startKey, endKey []byte) []*core.RegionInfo, startKey, endKey []byte) *RegionStats {
// GetRegionStats sums regions' statistics.
func GetRegionStats(regions []*core.RegionInfo) *RegionStats {
stats := newRegionStats()
regions := f(startKey, endKey)
for _, region := range regions {
stats.Observe(region)
}
Expand Down

0 comments on commit 864c221

Please sign in to comment.