Skip to content

Commit

Permalink
*: fix the issue that loadcluster does not remove overlap regio… (tik…
Browse files Browse the repository at this point in the history
…v#2039)

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Dec 22, 2019
1 parent 4934a65 commit 3193990
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 26 deletions.
31 changes: 18 additions & 13 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,19 @@ func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*cl
)

start = time.Now()
if err := kv.LoadRegions(c.core.Regions); err != nil {
// used to load region from kv storage to cache storage.
putRegion := func(region *core.RegionInfo) []*metapb.Region {
c.Lock()
defer c.Unlock()
origin, err := c.core.PreCheckPutRegion(region)
if err != nil {
log.Warn("region is stale", zap.Error(err), zap.Stringer("origin", origin.GetMeta()))
// return the state region to delete.
return []*metapb.Region{region.GetMeta()}
}
return c.core.Regions.SetRegion(region)
}
if err := kv.LoadRegions(putRegion); err != nil {
return nil, err
}
log.Info("load regions",
Expand Down Expand Up @@ -512,14 +524,10 @@ func (c *clusterInfo) updateStoreStatusLocked(id uint64) {
// handleRegionHeartbeat updates the region information.
func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
c.RLock()
origin := c.core.Regions.GetRegion(region.GetID())
if origin == nil {
for _, item := range c.core.Regions.GetOverlaps(region) {
if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() {
c.RUnlock()
return ErrRegionIsStale(region.GetMeta(), item)
}
}
origin, err := c.core.PreCheckPutRegion(region)
if err != nil {
c.RUnlock()
return err
}
isWriteUpdate, writeItem := c.CheckWriteStatus(region)
isReadUpdate, readItem := c.CheckReadStatus(region)
Expand All @@ -538,10 +546,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
// Region meta is stale, return an error.
if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() {
return ErrRegionIsStale(region.GetMeta(), origin.GetMeta())
}

if r.GetVersion() > o.GetVersion() {
log.Info("region Version changed",
zap.Uint64("region-id", region.GetID()),
Expand Down
2 changes: 1 addition & 1 deletion server/cluster_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func checkStaleRegion(origin *metapb.Region, region *metapb.Region) error {
e := region.GetRegionEpoch()

if e.GetVersion() < o.GetVersion() || e.GetConfVer() < o.GetConfVer() {
return ErrRegionIsStale(region, origin)
return core.ErrRegionIsStale(region, origin)
}

return nil
Expand Down
20 changes: 20 additions & 0 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,23 @@ func (bc *BasicCluster) DeleteStore(store *StoreInfo) {
func (bc *BasicCluster) PutRegion(region *RegionInfo) {
bc.Regions.SetRegion(region)
}

// PreCheckPutRegion checks if the region is valid to put.
func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) {
for _, item := range bc.Regions.GetOverlaps(region) {
if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() {
return nil, ErrRegionIsStale(region.GetMeta(), item)
}
}
origin := bc.Regions.GetRegion(region.GetID())
if origin == nil {
return nil, nil
}
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
// Region meta is stale, return an error.
if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() {
return origin, ErrRegionIsStale(region.GetMeta(), origin.GetMeta())
}
return origin, nil
}
7 changes: 7 additions & 0 deletions server/core/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"net/http"

"github.com/pingcap/errcode"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
)

var (
Expand Down Expand Up @@ -61,3 +63,8 @@ func (e StoreBlockedErr) Error() string {

// Code returns StoreBlockedCode
func (e StoreBlockedErr) Code() errcode.Code { return StoreBlockedCode }

// ErrRegionIsStale is error info for region is stale
var ErrRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error {
return errors.Errorf("region is stale: region %v origin %v", region, origin)
}
6 changes: 3 additions & 3 deletions server/core/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ func (kv *KV) LoadRegion(regionID uint64, region *metapb.Region) (bool, error) {
}

// LoadRegions loads all regions from KV to RegionsInfo.
func (kv *KV) LoadRegions(regions *RegionsInfo) error {
func (kv *KV) LoadRegions(f func(region *RegionInfo) []*metapb.Region) error {
if atomic.LoadInt32(&kv.useRegionKV) > 0 {
return loadRegions(kv.regionKV, regions)
return loadRegions(kv.regionKV, f)
}
return loadRegions(kv.KVBase, regions)
return loadRegions(kv.KVBase, f)
}

// SaveRegion saves one region to KV.
Expand Down
4 changes: 2 additions & 2 deletions server/core/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (s *testKVSuite) TestLoadRegions(c *C) {

n := 10
regions := mustSaveRegions(c, kv, n)
c.Assert(kv.LoadRegions(cache), IsNil)
c.Assert(kv.LoadRegions(cache.SetRegion), IsNil)

c.Assert(cache.GetRegionCount(), Equals, n)
for _, region := range cache.GetMetaRegions() {
Expand All @@ -151,7 +151,7 @@ func (s *testKVSuite) TestLoadRegionsExceedRangeLimit(c *C) {

n := 1000
regions := mustSaveRegions(c, kv, n)
c.Assert(kv.LoadRegions(cache), IsNil)
c.Assert(kv.LoadRegions(cache.SetRegion), IsNil)
c.Assert(cache.GetRegionCount(), Equals, n)
for _, region := range cache.GetMetaRegions() {
c.Assert(region, DeepEquals, regions[region.GetId()])
Expand Down
3 changes: 3 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@ func (r *RegionInfo) GetID() uint64 {

// GetMeta returns the meta information of the region.
func (r *RegionInfo) GetMeta() *metapb.Region {
if r == nil {
return nil
}
return r.meta
}

Expand Down
4 changes: 2 additions & 2 deletions server/core/region_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func deleteRegion(kv KVBase, region *metapb.Region) error {
return kv.Delete(regionPath(region.GetId()))
}

func loadRegions(kv KVBase, regions *RegionsInfo) error {
func loadRegions(kv KVBase, f func(region *RegionInfo) []*metapb.Region) error {
nextID := uint64(0)
endKey := regionPath(math.MaxUint64)

Expand All @@ -143,7 +143,7 @@ func loadRegions(kv KVBase, regions *RegionsInfo) error {
}

nextID = region.GetId() + 1
overlaps := regions.SetRegion(NewRegionInfo(region, nil))
overlaps := f(NewRegionInfo(region, nil))
for _, item := range overlaps {
if err := deleteRegion(kv, item); err != nil {
return err
Expand Down
5 changes: 0 additions & 5 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"time"

"github.com/pingcap/errcode"
"github.com/pingcap/kvproto/pkg/metapb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule"
Expand All @@ -45,10 +44,6 @@ var (
ErrRegionAbnormalPeer = func(regionID uint64) error {
return errors.Errorf("region %v has abnormal peer", regionID)
}
// ErrRegionIsStale is error info for region is stale
ErrRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error {
return errors.Errorf("region is stale: region %v origin %v", region, origin)
}
)

// Handler is a helper to export methods to handle API/RPC requests.
Expand Down
24 changes: 24 additions & 0 deletions tests/server/region_syncer/region_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,30 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)
}

// merge case
// region2 -> region1 -> region0
// merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver
regions[0] = regions[0].Clone(core.WithEndKey(regions[2].GetEndKey()), core.WithIncVersion(), core.WithIncVersion())
err = rc.HandleRegionHeartbeat(regions[2])
c.Assert(err, IsNil)

// merge case
// region3 -> region4
// merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver
regions[4] = regions[3].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion())
err = rc.HandleRegionHeartbeat(regions[4])
c.Assert(err, IsNil)

// merge case
// region0 -> region4
// merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver
regions[4] = regions[0].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion(), core.WithIncVersion())
err = rc.HandleRegionHeartbeat(regions[4])
c.Assert(err, IsNil)
regions = regions[4:]
regionLen = len(regions)

// ensure flush to region kv
time.Sleep(3 * time.Second)
err = leaderServer.Stop()
Expand Down

0 comments on commit 3193990

Please sign in to comment.