Skip to content

Commit

Permalink
*: fix the issue that loadcluster does not remove overlap regions (#2022
Browse files Browse the repository at this point in the history
)

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch authored and sre-bot committed Dec 17, 2019
1 parent 54974eb commit 8198108
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 19 deletions.
35 changes: 17 additions & 18 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ const (
defaultChangedRegionsLimit = 10000
)

// ErrRegionIsStale is error info for region is stale.
var ErrRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error {
return errors.Errorf("region is stale: region %v origin %v", region, origin)
}

// Server is the interface for cluster.
type Server interface {
GetAllocator() *id.AllocatorImpl
Expand Down Expand Up @@ -242,7 +237,17 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {

start = time.Now()

if err := c.storage.LoadRegions(c.core.PutRegion); err != nil {
// used to load region from kv storage to cache storage.
putRegion := func(region *core.RegionInfo) []*core.RegionInfo {
origin, err := c.core.PreCheckPutRegion(region)
if err != nil {
log.Warn("region is stale", zap.Error(err), zap.Stringer("origin", origin.GetMeta()))
// return the state region to delete.
return []*core.RegionInfo{region}
}
return c.core.PutRegion(region)
}
if err := c.storage.LoadRegions(putRegion); err != nil {
return nil, err
}
log.Info("load regions",
Expand Down Expand Up @@ -394,14 +399,10 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
// processRegionHeartbeat updates the region information.
func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
c.RLock()
origin := c.GetRegion(region.GetID())
if origin == nil {
for _, item := range c.core.GetOverlaps(region) {
if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() {
c.RUnlock()
return ErrRegionIsStale(region.GetMeta(), item.GetMeta())
}
}
origin, err := c.core.PreCheckPutRegion(region)
if err != nil {
c.RUnlock()
return err
}
writeItems := c.CheckWriteStatus(region)
readItems := c.CheckReadStatus(region)
Expand All @@ -420,10 +421,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
// Region meta is stale, return an error.
if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() {
return ErrRegionIsStale(region.GetMeta(), origin.GetMeta())
}
if r.GetVersion() > o.GetVersion() {
log.Info("region Version changed",
zap.Uint64("region-id", region.GetID()),
Expand Down Expand Up @@ -1454,6 +1451,8 @@ func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.Hot
return c.hotSpotCache.CheckRead(region, c.storesStats)
}

// TODO: remove me.
// only used in test.
func (c *RaftCluster) putRegion(region *core.RegionInfo) error {
c.Lock()
defer c.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ func checkStaleRegion(origin *metapb.Region, region *metapb.Region) error {
e := region.GetRegionEpoch()

if e.GetVersion() < o.GetVersion() || e.GetConfVer() < o.GetConfVer() {
return ErrRegionIsStale(region, origin)
return core.ErrRegionIsStale(region, origin)
}

return nil
Expand Down
23 changes: 23 additions & 0 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,29 @@ func (bc *BasicCluster) TakeStore(storeID uint64) *StoreInfo {
return bc.Stores.TakeStore(storeID)
}

// PreCheckPutRegion checks if the region is valid to put.
func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) {
bc.RLock()
for _, item := range bc.Regions.GetOverlaps(region) {
if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() {
bc.RUnlock()
return nil, ErrRegionIsStale(region.GetMeta(), item.GetMeta())
}
}
origin := bc.Regions.GetRegion(region.GetID())
bc.RUnlock()
if origin == nil {
return nil, nil
}
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
// Region meta is stale, return an error.
if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() {
return origin, ErrRegionIsStale(region.GetMeta(), origin.GetMeta())
}
return origin, nil
}

// PutRegion put a region.
func (bc *BasicCluster) PutRegion(region *RegionInfo) []*RegionInfo {
bc.Lock()
Expand Down
7 changes: 7 additions & 0 deletions server/core/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"net/http"

"github.com/pingcap/errcode"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
)

var (
Expand Down Expand Up @@ -61,3 +63,8 @@ func (e StoreBlockedErr) Error() string {

// Code returns StoreBlockedCode
func (e StoreBlockedErr) Code() errcode.Code { return StoreBlockedCode }

// ErrRegionIsStale is error info for region is stale.
var ErrRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error {
return errors.Errorf("region is stale: region %v origin %v", region, origin)
}
3 changes: 3 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ func (r *RegionInfo) GetID() uint64 {

// GetMeta returns the meta information of the region.
func (r *RegionInfo) GetMeta() *metapb.Region {
if r == nil {
return nil
}
return r.meta
}

Expand Down
26 changes: 26 additions & 0 deletions tests/server/region_syncer/region_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,29 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)
}
// merge case
// region2 -> region1 -> region0
// merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver
regions[0] = regions[0].Clone(core.WithEndKey(regions[2].GetEndKey()), core.WithIncVersion(), core.WithIncVersion())
err = rc.HandleRegionHeartbeat(regions[2])
c.Assert(err, IsNil)

// merge case
// region3 -> region4
// merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver
regions[4] = regions[3].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion())
err = rc.HandleRegionHeartbeat(regions[4])
c.Assert(err, IsNil)

// merge case
// region0 -> region4
// merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver
regions[4] = regions[0].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion(), core.WithIncVersion())
err = rc.HandleRegionHeartbeat(regions[4])
c.Assert(err, IsNil)
regions = regions[4:]
regionLen = len(regions)

// ensure flush to region storage, we use a duration larger than the
// region storage flush rate limit (3s).
time.Sleep(4 * time.Second)
Expand All @@ -104,6 +127,9 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) {
c.Assert(leaderServer, NotNil)
loadRegions := leaderServer.GetServer().GetRaftCluster().GetRegions()
c.Assert(len(loadRegions), Equals, regionLen)
for _, region := range regions {
c.Assert(leaderServer.GetRegionInfoByID(region.GetID()).GetMeta(), DeepEquals, region.GetMeta())
}
}

func (s *serverTestSuite) TestFullSyncWithAddMember(c *C) {
Expand Down

0 comments on commit 8198108

Please sign in to comment.