From ce91ee946799e85a4f5be1e6328eada3801e1cdd Mon Sep 17 00:00:00 2001 From: ShuNing Date: Wed, 18 Dec 2019 18:05:18 -0800 Subject: [PATCH] =?UTF-8?q?*:=20fix=20the=20issue=20that=20loadcluster=20d?= =?UTF-8?q?oes=20not=20remove=20overlap=20regio=E2=80=A6=20(#2039)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: nolouch --- server/cluster_info.go | 31 +++++++++++-------- server/cluster_info_test.go | 2 +- server/core/basic_cluster.go | 20 ++++++++++++ server/core/errors.go | 7 +++++ server/core/kv.go | 6 ++-- server/core/kv_test.go | 4 +-- server/core/region.go | 3 ++ server/core/region_kv.go | 4 +-- server/handler.go | 5 --- .../region_syncer/region_syncer_test.go | 24 ++++++++++++++ 10 files changed, 80 insertions(+), 26 deletions(-) diff --git a/server/cluster_info.go b/server/cluster_info.go index 07ee7a87dc7..a8185e32616 100644 --- a/server/cluster_info.go +++ b/server/cluster_info.go @@ -82,7 +82,19 @@ func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*cl ) start = time.Now() - if err := kv.LoadRegions(c.core.Regions); err != nil { + // used to load region from kv storage to cache storage. + putRegion := func(region *core.RegionInfo) []*metapb.Region { + c.Lock() + defer c.Unlock() + origin, err := c.core.PreCheckPutRegion(region) + if err != nil { + log.Warn("region is stale", zap.Error(err), zap.Stringer("origin", origin.GetMeta())) + // return the state region to delete. + return []*metapb.Region{region.GetMeta()} + } + return c.core.Regions.SetRegion(region) + } + if err := kv.LoadRegions(putRegion); err != nil { return nil, err } log.Info("load regions", @@ -512,14 +524,10 @@ func (c *clusterInfo) updateStoreStatusLocked(id uint64) { // handleRegionHeartbeat updates the region information. func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { c.RLock() - origin := c.core.Regions.GetRegion(region.GetID()) - if origin == nil { - for _, item := range c.core.Regions.GetOverlaps(region) { - if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() { - c.RUnlock() - return ErrRegionIsStale(region.GetMeta(), item) - } - } + origin, err := c.core.PreCheckPutRegion(region) + if err != nil { + c.RUnlock() + return err } isWriteUpdate, writeItem := c.CheckWriteStatus(region) isReadUpdate, readItem := c.CheckReadStatus(region) @@ -538,10 +546,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { } else { r := region.GetRegionEpoch() o := origin.GetRegionEpoch() - // Region meta is stale, return an error. - if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() { - return ErrRegionIsStale(region.GetMeta(), origin.GetMeta()) - } + if r.GetVersion() > o.GetVersion() { log.Info("region Version changed", zap.Uint64("region-id", region.GetID()), diff --git a/server/cluster_info_test.go b/server/cluster_info_test.go index 1849213b738..ee5ea056ae2 100644 --- a/server/cluster_info_test.go +++ b/server/cluster_info_test.go @@ -32,7 +32,7 @@ func checkStaleRegion(origin *metapb.Region, region *metapb.Region) error { e := region.GetRegionEpoch() if e.GetVersion() < o.GetVersion() || e.GetConfVer() < o.GetConfVer() { - return ErrRegionIsStale(region, origin) + return core.ErrRegionIsStale(region, origin) } return nil diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index af6109de4fc..1cf174961b8 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -123,3 +123,23 @@ func (bc *BasicCluster) DeleteStore(store *StoreInfo) { func (bc *BasicCluster) PutRegion(region *RegionInfo) { bc.Regions.SetRegion(region) } + +// PreCheckPutRegion checks if the region is valid to put. +func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) { + for _, item := range bc.Regions.GetOverlaps(region) { + if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() { + return nil, ErrRegionIsStale(region.GetMeta(), item) + } + } + origin := bc.Regions.GetRegion(region.GetID()) + if origin == nil { + return nil, nil + } + r := region.GetRegionEpoch() + o := origin.GetRegionEpoch() + // Region meta is stale, return an error. + if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() { + return origin, ErrRegionIsStale(region.GetMeta(), origin.GetMeta()) + } + return origin, nil +} diff --git a/server/core/errors.go b/server/core/errors.go index a58b9adcc53..5260af9802c 100644 --- a/server/core/errors.go +++ b/server/core/errors.go @@ -21,6 +21,8 @@ import ( "net/http" "github.com/pingcap/errcode" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pkg/errors" ) var ( @@ -61,3 +63,8 @@ func (e StoreBlockedErr) Error() string { // Code returns StoreBlockedCode func (e StoreBlockedErr) Code() errcode.Code { return StoreBlockedCode } + +// ErrRegionIsStale is error info for region is stale +var ErrRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error { + return errors.Errorf("region is stale: region %v origin %v", region, origin) +} diff --git a/server/core/kv.go b/server/core/kv.go index fb407197218..968bcf5ceaa 100644 --- a/server/core/kv.go +++ b/server/core/kv.go @@ -128,11 +128,11 @@ func (kv *KV) LoadRegion(regionID uint64, region *metapb.Region) (bool, error) { } // LoadRegions loads all regions from KV to RegionsInfo. -func (kv *KV) LoadRegions(regions *RegionsInfo) error { +func (kv *KV) LoadRegions(f func(region *RegionInfo) []*metapb.Region) error { if atomic.LoadInt32(&kv.useRegionKV) > 0 { - return loadRegions(kv.regionKV, regions) + return loadRegions(kv.regionKV, f) } - return loadRegions(kv.KVBase, regions) + return loadRegions(kv.KVBase, f) } // SaveRegion saves one region to KV. diff --git a/server/core/kv_test.go b/server/core/kv_test.go index f671a3b0b1d..9aed59889c3 100644 --- a/server/core/kv_test.go +++ b/server/core/kv_test.go @@ -137,7 +137,7 @@ func (s *testKVSuite) TestLoadRegions(c *C) { n := 10 regions := mustSaveRegions(c, kv, n) - c.Assert(kv.LoadRegions(cache), IsNil) + c.Assert(kv.LoadRegions(cache.SetRegion), IsNil) c.Assert(cache.GetRegionCount(), Equals, n) for _, region := range cache.GetMetaRegions() { @@ -151,7 +151,7 @@ func (s *testKVSuite) TestLoadRegionsExceedRangeLimit(c *C) { n := 1000 regions := mustSaveRegions(c, kv, n) - c.Assert(kv.LoadRegions(cache), IsNil) + c.Assert(kv.LoadRegions(cache.SetRegion), IsNil) c.Assert(cache.GetRegionCount(), Equals, n) for _, region := range cache.GetMetaRegions() { c.Assert(region, DeepEquals, regions[region.GetId()]) diff --git a/server/core/region.go b/server/core/region.go index cf967cc1082..e0379628cca 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -296,6 +296,9 @@ func (r *RegionInfo) GetID() uint64 { // GetMeta returns the meta information of the region. func (r *RegionInfo) GetMeta() *metapb.Region { + if r == nil { + return nil + } return r.meta } diff --git a/server/core/region_kv.go b/server/core/region_kv.go index 06306edabff..5371f2563b2 100644 --- a/server/core/region_kv.go +++ b/server/core/region_kv.go @@ -118,7 +118,7 @@ func deleteRegion(kv KVBase, region *metapb.Region) error { return kv.Delete(regionPath(region.GetId())) } -func loadRegions(kv KVBase, regions *RegionsInfo) error { +func loadRegions(kv KVBase, f func(region *RegionInfo) []*metapb.Region) error { nextID := uint64(0) endKey := regionPath(math.MaxUint64) @@ -143,7 +143,7 @@ func loadRegions(kv KVBase, regions *RegionsInfo) error { } nextID = region.GetId() + 1 - overlaps := regions.SetRegion(NewRegionInfo(region, nil)) + overlaps := f(NewRegionInfo(region, nil)) for _, item := range overlaps { if err := deleteRegion(kv, item); err != nil { return err diff --git a/server/handler.go b/server/handler.go index bdcbea5fba2..7105c99e738 100644 --- a/server/handler.go +++ b/server/handler.go @@ -19,7 +19,6 @@ import ( "time" "github.com/pingcap/errcode" - "github.com/pingcap/kvproto/pkg/metapb" log "github.com/pingcap/log" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/schedule" @@ -47,10 +46,6 @@ var ( ErrRegionAbnormalPeer = func(regionID uint64) error { return errors.Errorf("region %v has abnormal peer", regionID) } - // ErrRegionIsStale is error info for region is stale - ErrRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error { - return errors.Errorf("region is stale: region %v origin %v", region, origin) - } ) // Handler is a helper to export methods to handle API/RPC requests. diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index b72c147e6ff..bdba10ce1bc 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -80,6 +80,30 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) { err = rc.HandleRegionHeartbeat(region) c.Assert(err, IsNil) } + + // merge case + // region2 -> region1 -> region0 + // merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver + regions[0] = regions[0].Clone(core.WithEndKey(regions[2].GetEndKey()), core.WithIncVersion(), core.WithIncVersion()) + err = rc.HandleRegionHeartbeat(regions[2]) + c.Assert(err, IsNil) + + // merge case + // region3 -> region4 + // merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver + regions[4] = regions[3].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion()) + err = rc.HandleRegionHeartbeat(regions[4]) + c.Assert(err, IsNil) + + // merge case + // region0 -> region4 + // merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver + regions[4] = regions[0].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion(), core.WithIncVersion()) + err = rc.HandleRegionHeartbeat(regions[4]) + c.Assert(err, IsNil) + regions = regions[4:] + regionLen = len(regions) + // ensure flush to region kv time.Sleep(3 * time.Second) err = leaderServer.Stop()