Skip to content

Commit

Permalink
server: check the store version when changing tombstone status (#3200)
Browse files Browse the repository at this point in the history
* server: check the store version when changing a store whose status is tombstone

Signed-off-by: Zheng Xiangsheng <hundundm@gmail.com>

* update newTestStores

Signed-off-by: Zheng Xiangsheng <hundundm@gmail.com>

* refine PutStore

Signed-off-by: Zheng Xiangsheng <hundundm@gmail.com>

* refine store limit and stats

Signed-off-by: Zheng Xiangsheng <hundundm@gmail.com>

* add test

Signed-off-by: Zheng Xiangsheng <hundundm@gmail.com>

* remove CreateRollingStoreStats

Signed-off-by: Zheng Xiangsheng <hundundm@gmail.com>

Co-authored-by: Ti Prow Robot <71242396+ti-community-prow-bot@users.noreply.github.com>
  • Loading branch information
HunDunDM and ti-chi-bot authored Nov 24, 2020
1 parent 2d3f865 commit 57bfeb7
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 34 deletions.
55 changes: 41 additions & 14 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
zap.Duration("cost", time.Since(start)),
)
for _, store := range c.GetStores() {
c.storesStats.CreateRollingStoreStats(store.GetID())
c.storesStats.GetOrCreateRollingStoreStats(store.GetID())
}
return c, nil
}
Expand Down Expand Up @@ -906,27 +906,31 @@ func (c *RaftCluster) UpdateStoreLabels(storeID uint64, labels []*metapb.StoreLa
newStore := proto.Clone(store.GetMeta()).(*metapb.Store)
newStore.Labels = labels
// PutStore will perform label merge.
err := c.PutStore(newStore, force)
return err
return c.putStoreImpl(newStore, force)
}

// PutStore puts a store without forcing a label overwrite.
// When the store is saved successfully it also calls OnStoreVersionChange
// and AddStoreLimit for the store; on failure neither is called and the
// error from putStoreImpl is returned unchanged.
func (c *RaftCluster) PutStore(store *metapb.Store) error {
	err := c.putStoreImpl(store, false)
	if err == nil {
		c.OnStoreVersionChange()
		c.AddStoreLimit(store)
	}
	return err
}

// putStoreImpl puts a store.
// If 'force' is true, then overwrite the store's labels.
func (c *RaftCluster) PutStore(store *metapb.Store, force bool) error {
func (c *RaftCluster) putStoreImpl(store *metapb.Store, force bool) error {
c.Lock()
defer c.Unlock()

if store.GetId() == 0 {
return errors.Errorf("invalid put store %v", store)
}

v, err := versioninfo.ParseVersion(store.GetVersion())
if err != nil {
return errors.Errorf("invalid put store %v, error: %s", store, err)
}
clusterVersion := *c.opt.GetClusterVersion()
if !versioninfo.IsCompatible(clusterVersion, *v) {
return errors.Errorf("version should compatible with version %s, got %s", clusterVersion, v)
if err := c.checkStoreVersion(store); err != nil {
return err
}

// Store address can not be the same as other stores.
Expand Down Expand Up @@ -960,12 +964,24 @@ func (c *RaftCluster) PutStore(store *metapb.Store, force bool) error {
core.SetStoreDeployPath(store.DeployPath),
)
}
if err = c.checkStoreLabels(s); err != nil {
if err := c.checkStoreLabels(s); err != nil {
return err
}
return c.putStoreLocked(s)
}

// checkStoreVersion parses the version carried by store and checks it against
// the current cluster version. It returns an error when the version string
// cannot be parsed or when versioninfo.IsCompatible reports the two versions
// as incompatible, and nil otherwise.
func (c *RaftCluster) checkStoreVersion(store *metapb.Store) error {
	storeVersion, parseErr := versioninfo.ParseVersion(store.GetVersion())
	if parseErr != nil {
		return errors.Errorf("invalid put store %v, error: %s", store, parseErr)
	}
	if current := *c.opt.GetClusterVersion(); !versioninfo.IsCompatible(current, *storeVersion) {
		return errors.Errorf("version should compatible with version %s, got %s", current, storeVersion)
	}
	return nil
}

func (c *RaftCluster) checkStoreLabels(s *core.StoreInfo) error {
if c.opt.IsPlacementRulesEnabled() {
return nil
Expand Down Expand Up @@ -1090,6 +1106,12 @@ func (c *RaftCluster) SetStoreState(storeID uint64, state metapb.StoreState) err
return errs.ErrStoreNotFound.FastGenByArgs(storeID)
}

if store.GetState() == metapb.StoreState_Tombstone && state != metapb.StoreState_Tombstone {
if err := c.checkStoreVersion(store.GetMeta()); err != nil {
return err
}
}

newStore := store.Clone(core.SetStoreState(state))
log.Warn("store update state",
zap.Uint64("store-id", storeID),
Expand Down Expand Up @@ -1126,7 +1148,7 @@ func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error {
}
}
c.core.PutStore(store)
c.storesStats.CreateRollingStoreStats(store.GetID())
c.storesStats.GetOrCreateRollingStoreStats(store.GetID())
return nil
}

Expand Down Expand Up @@ -1603,7 +1625,12 @@ func (c *RaftCluster) GetAllStoresLimit() map[uint64]config.StoreLimitConfig {

// AddStoreLimit adds a store limit for the given store.
func (c *RaftCluster) AddStoreLimit(store *metapb.Store) {
storeID := store.GetId()
cfg := c.opt.GetScheduleConfig().Clone()
if _, ok := cfg.StoreLimit[storeID]; ok {
return
}

sc := config.StoreLimitConfig{
AddPeer: config.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: config.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
Expand All @@ -1614,7 +1641,7 @@ func (c *RaftCluster) AddStoreLimit(store *metapb.Store) {
RemovePeer: config.DefaultTiFlashStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
}
}
storeID := store.GetId()

cfg.StoreLimit[storeID] = sc
c.opt.SetScheduleConfig(cfg)
}
Expand Down
44 changes: 36 additions & 8 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

n, np := uint64(3), uint64(3)
stores := newTestStores(n)
stores := newTestStores(n, "2.0.0")
storeMetasAfterHeartbeat := make([]*metapb.Store, 0, n)
regions := newTestRegions(n, np)

Expand Down Expand Up @@ -98,7 +98,7 @@ func (s *testClusterInfoSuite) TestFilterUnhealthyStore(c *C) {
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

stores := newTestStores(3)
stores := newTestStores(3, "2.0.0")
for _, store := range stores {
storeStats := &pdpb.StoreStats{
StoreId: store.GetID(),
Expand All @@ -125,14 +125,38 @@ func (s *testClusterInfoSuite) TestFilterUnhealthyStore(c *C) {
}
}

// TestSetStoreState checks that SetStoreState succeeds when a tombstone store
// is moved back to Up before the other stores are upgraded, and fails for a
// tombstone store once the remaining stores have been re-put with a newer
// version.
func (s *testClusterInfoSuite) TestSetStoreState(c *C) {
	_, opt, err := newTestScheduleConfig()
	c.Assert(err, IsNil)
	cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

	// putAll puts the first n test stores, all reporting the given version.
	putAll := func(n uint64, version string) {
		for _, store := range newTestStores(n, version) {
			c.Assert(cluster.PutStore(store.GetMeta()), IsNil)
		}
	}

	// Start with 4 stores on the old version.
	putAll(4, "2.0.0")
	// Take stores 3 and 4 through the normal offline path: remove, then bury.
	for id := uint64(3); id <= 4; id++ {
		c.Assert(cluster.RemoveStore(id), IsNil)
		c.Assert(cluster.BuryStore(id, false), IsNil)
	}
	// Store 3 is switched straight back to Up before any upgrade happens.
	c.Assert(cluster.SetStoreState(3, metapb.StoreState_Up), IsNil)
	// Stores 1, 2 and 3 are re-put with the newer version.
	putAll(3, "3.0.0")
	// Store 4 still carries the old version, so bringing it back to Up must fail.
	c.Assert(cluster.SetStoreState(4, metapb.StoreState_Up), NotNil)
}

func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

n, np := uint64(3), uint64(3)

stores := newTestStores(3)
stores := newTestStores(3, "2.0.0")
regions := newTestRegions(n, np)

for _, store := range stores {
Expand Down Expand Up @@ -514,7 +538,7 @@ func (s *testClusterInfoSuite) TestUpdateStorePendingPeerCount(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestCluster(opt)
stores := newTestStores(5)
stores := newTestStores(5, "2.0.0")
for _, s := range stores {
c.Assert(tc.putStoreLocked(s), IsNil)
}
Expand Down Expand Up @@ -551,7 +575,7 @@ type testStoresInfoSuite struct{}
func (s *testStoresInfoSuite) TestStores(c *C) {
n := uint64(10)
cache := core.NewStoresInfo()
stores := newTestStores(n)
stores := newTestStores(n, "2.0.0")

for i, store := range stores {
id := store.GetID()
Expand Down Expand Up @@ -700,7 +724,7 @@ func (s *testGetStoresSuite) SetUpSuite(c *C) {
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
s.cluster = cluster

stores := newTestStores(200)
stores := newTestStores(200, "2.0.0")

for _, store := range stores {
c.Assert(s.cluster.putStoreLocked(store), IsNil)
Expand Down Expand Up @@ -750,11 +774,15 @@ func newTestRaftCluster(id id.Allocator, opt *config.PersistOptions, storage *co
}

// Create n stores with IDs 1..n.
func newTestStores(n uint64) []*core.StoreInfo {
func newTestStores(n uint64, version string) []*core.StoreInfo {
stores := make([]*core.StoreInfo, 0, n)
for i := uint64(1); i <= n; i++ {
store := &metapb.Store{
Id: i,
Id: i,
Address: fmt.Sprintf("127.0.0.1:%d", i),
State: metapb.StoreState_Up,
Version: version,
DeployPath: fmt.Sprintf("test/store%d", i),
}
stores = append(stores, core.NewStoreInfo(store))
}
Expand Down
4 changes: 1 addition & 3 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,12 @@ func (s *Server) PutStore(ctx context.Context, request *pdpb.PutStoreRequest) (*
return nil, status.Errorf(codes.FailedPrecondition, "placement rules is disabled")
}

if err := rc.PutStore(store, false); err != nil {
if err := rc.PutStore(store); err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
}

log.Info("put store ok", zap.Stringer("store", store))
rc.OnStoreVersionChange()
CheckPDVersion(s.persistOptions)
rc.AddStoreLimit(store)

return &pdpb.PutStoreResponse{
Header: s.header(),
Expand Down
7 changes: 0 additions & 7 deletions server/statistics/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@ func NewStoresStats() *StoresStats {
}
}

// CreateRollingStoreStats creates RollingStoreStats with a given store ID.
// The new value is stored unconditionally, so any RollingStoreStats already
// registered under storeID is replaced.
func (s *StoresStats) CreateRollingStoreStats(storeID uint64) {
	s.Lock()
	defer s.Unlock()
	fresh := newRollingStoreStats()
	s.rollingStoresStats[storeID] = fresh
}

// RemoveRollingStoreStats removes RollingStoreStats with a given store ID.
func (s *StoresStats) RemoveRollingStoreStats(storeID uint64) {
s.Lock()
Expand Down
2 changes: 1 addition & 1 deletion server/statistics/store_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (t *testStoreStatisticsSuite) TestStoreStatistics(c *C) {
stores := make([]*core.StoreInfo, 0, len(metaStores))
for _, m := range metaStores {
s := core.NewStoreInfo(m, core.SetLastHeartbeatTS(time.Now()))
storesStats.CreateRollingStoreStats(m.GetId())
storesStats.GetOrCreateRollingStoreStats(m.GetId())
stores = append(stores, s)
}

Expand Down
2 changes: 1 addition & 1 deletion tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (s *clusterTestSuite) TestRaftClusterMultipleRestart(c *C) {
store := newMetaStore(storeID, "127.0.0.1:4", "2.1.0", metapb.StoreState_Offline, fmt.Sprintf("test/store%d", storeID))
rc := leaderServer.GetRaftCluster()
c.Assert(rc, NotNil)
err = rc.PutStore(store, false)
err = rc.PutStore(store)
c.Assert(err, IsNil)
c.Assert(tc, NotNil)

Expand Down

0 comments on commit 57bfeb7

Please sign in to comment.