diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index c85b28e96bc..b7df61202f1 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -297,7 +297,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { zap.Duration("cost", time.Since(start)), ) for _, store := range c.GetStores() { - c.storesStats.CreateRollingStoreStats(store.GetID()) + c.storesStats.GetOrCreateRollingStoreStats(store.GetID()) } return c, nil } @@ -906,13 +906,22 @@ func (c *RaftCluster) UpdateStoreLabels(storeID uint64, labels []*metapb.StoreLa newStore := proto.Clone(store.GetMeta()).(*metapb.Store) newStore.Labels = labels // PutStore will perform label merge. - err := c.PutStore(newStore, force) - return err + return c.putStoreImpl(newStore, force) } // PutStore puts a store. +func (c *RaftCluster) PutStore(store *metapb.Store) error { + if err := c.putStoreImpl(store, false); err != nil { + return err + } + c.OnStoreVersionChange() + c.AddStoreLimit(store) + return nil +} + +// putStoreImpl puts a store. // If 'force' is true, then overwrite the store's labels. -func (c *RaftCluster) PutStore(store *metapb.Store, force bool) error { +func (c *RaftCluster) putStoreImpl(store *metapb.Store, force bool) error { c.Lock() defer c.Unlock() @@ -920,13 +929,8 @@ func (c *RaftCluster) PutStore(store *metapb.Store, force bool) error { return errors.Errorf("invalid put store %v", store) } - v, err := versioninfo.ParseVersion(store.GetVersion()) - if err != nil { - return errors.Errorf("invalid put store %v, error: %s", store, err) - } - clusterVersion := *c.opt.GetClusterVersion() - if !versioninfo.IsCompatible(clusterVersion, *v) { - return errors.Errorf("version should compatible with version %s, got %s", clusterVersion, v) + if err := c.checkStoreVersion(store); err != nil { + return err } // Store address can not be the same as other stores. @@ -960,12 +964,24 @@ func (c *RaftCluster) PutStore(store *metapb.Store, force bool) error { core.SetStoreDeployPath(store.DeployPath), ) } - if err = c.checkStoreLabels(s); err != nil { + if err := c.checkStoreLabels(s); err != nil { return err } return c.putStoreLocked(s) } +func (c *RaftCluster) checkStoreVersion(store *metapb.Store) error { + v, err := versioninfo.ParseVersion(store.GetVersion()) + if err != nil { + return errors.Errorf("invalid put store %v, error: %s", store, err) + } + clusterVersion := *c.opt.GetClusterVersion() + if !versioninfo.IsCompatible(clusterVersion, *v) { + return errors.Errorf("version should compatible with version %s, got %s", clusterVersion, v) + } + return nil +} + func (c *RaftCluster) checkStoreLabels(s *core.StoreInfo) error { if c.opt.IsPlacementRulesEnabled() { return nil @@ -1090,6 +1106,12 @@ func (c *RaftCluster) SetStoreState(storeID uint64, state metapb.StoreState) err return errs.ErrStoreNotFound.FastGenByArgs(storeID) } + if store.GetState() == metapb.StoreState_Tombstone && state != metapb.StoreState_Tombstone { + if err := c.checkStoreVersion(store.GetMeta()); err != nil { + return err + } + } + newStore := store.Clone(core.SetStoreState(state)) log.Warn("store update state", zap.Uint64("store-id", storeID), @@ -1126,7 +1148,7 @@ func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error { } } c.core.PutStore(store) - c.storesStats.CreateRollingStoreStats(store.GetID()) + c.storesStats.GetOrCreateRollingStoreStats(store.GetID()) return nil } @@ -1603,7 +1625,12 @@ func (c *RaftCluster) GetAllStoresLimit() map[uint64]config.StoreLimitConfig { // AddStoreLimit add a store limit for a given store ID. func (c *RaftCluster) AddStoreLimit(store *metapb.Store) { + storeID := store.GetId() cfg := c.opt.GetScheduleConfig().Clone() + if _, ok := cfg.StoreLimit[storeID]; ok { + return + } + sc := config.StoreLimitConfig{ AddPeer: config.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer), RemovePeer: config.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer), @@ -1614,7 +1641,7 @@ func (c *RaftCluster) AddStoreLimit(store *metapb.Store) { RemovePeer: config.DefaultTiFlashStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer), } } - storeID := store.GetId() + cfg.StoreLimit[storeID] = sc c.opt.SetScheduleConfig(cfg) } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index c510ca4aa72..407b5ab702b 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -50,7 +50,7 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) { cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster()) n, np := uint64(3), uint64(3) - stores := newTestStores(n) + stores := newTestStores(n, "2.0.0") storeMetasAfterHeartbeat := make([]*metapb.Store, 0, n) regions := newTestRegions(n, np) @@ -98,7 +98,7 @@ func (s *testClusterInfoSuite) TestFilterUnhealthyStore(c *C) { c.Assert(err, IsNil) cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster()) - stores := newTestStores(3) + stores := newTestStores(3, "2.0.0") for _, store := range stores { storeStats := &pdpb.StoreStats{ StoreId: store.GetID(), @@ -125,6 +125,30 @@ func (s *testClusterInfoSuite) TestFilterUnhealthyStore(c *C) { } } +func (s *testClusterInfoSuite) TestSetStoreState(c *C) { + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) + cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster()) + + // Put 4 stores. + for _, store := range newTestStores(4, "2.0.0") { + c.Assert(cluster.PutStore(store.GetMeta()), IsNil) + } + // Store 3 and 4 offline normally. + for _, id := range []uint64{3, 4} { + c.Assert(cluster.RemoveStore(id), IsNil) + c.Assert(cluster.BuryStore(id, false), IsNil) + } + // Change the status of 3 directly back to Up. + c.Assert(cluster.SetStoreState(3, metapb.StoreState_Up), IsNil) + // Update store 1 2 3 + for _, store := range newTestStores(3, "3.0.0") { + c.Assert(cluster.PutStore(store.GetMeta()), IsNil) + } + // Since the store 4 version is too low, setting it to Up should fail. + c.Assert(cluster.SetStoreState(4, metapb.StoreState_Up), NotNil) +} + func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) @@ -132,7 +156,7 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { n, np := uint64(3), uint64(3) - stores := newTestStores(3) + stores := newTestStores(3, "2.0.0") regions := newTestRegions(n, np) for _, store := range stores { @@ -514,7 +538,7 @@ func (s *testClusterInfoSuite) TestUpdateStorePendingPeerCount(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) tc := newTestCluster(opt) - stores := newTestStores(5) + stores := newTestStores(5, "2.0.0") for _, s := range stores { c.Assert(tc.putStoreLocked(s), IsNil) } @@ -551,7 +575,7 @@ type testStoresInfoSuite struct{} func (s *testStoresInfoSuite) TestStores(c *C) { n := uint64(10) cache := core.NewStoresInfo() - stores := newTestStores(n) + stores := newTestStores(n, "2.0.0") for i, store := range stores { id := store.GetID() @@ -700,7 +724,7 @@ func (s *testGetStoresSuite) SetUpSuite(c *C) { cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster()) s.cluster = cluster - stores := newTestStores(200) + stores := newTestStores(200, "2.0.0") for _, store := range stores { c.Assert(s.cluster.putStoreLocked(store), IsNil) @@ -750,11 +774,15 @@ func newTestRaftCluster(id id.Allocator, opt *config.PersistOptions, storage *co } // Create n stores (0..n). -func newTestStores(n uint64) []*core.StoreInfo { +func newTestStores(n uint64, version string) []*core.StoreInfo { stores := make([]*core.StoreInfo, 0, n) for i := uint64(1); i <= n; i++ { store := &metapb.Store{ - Id: i, + Id: i, + Address: fmt.Sprintf("127.0.0.1:%d", i), + State: metapb.StoreState_Up, + Version: version, + DeployPath: fmt.Sprintf("test/store%d", i), } stores = append(stores, core.NewStoreInfo(store)) } diff --git a/server/grpc_service.go b/server/grpc_service.go index cbfbdc4d086..bf6305946ea 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -242,14 +242,12 @@ func (s *Server) PutStore(ctx context.Context, request *pdpb.PutStoreRequest) (* return nil, status.Errorf(codes.FailedPrecondition, "placement rules is disabled") } - if err := rc.PutStore(store, false); err != nil { + if err := rc.PutStore(store); err != nil { return nil, status.Errorf(codes.Unknown, err.Error()) } log.Info("put store ok", zap.Stringer("store", store)) - rc.OnStoreVersionChange() CheckPDVersion(s.persistOptions) - rc.AddStoreLimit(store) return &pdpb.PutStoreResponse{ Header: s.header(), diff --git a/server/statistics/store.go b/server/statistics/store.go index 2e4c0bfc737..6d499a89c6d 100644 --- a/server/statistics/store.go +++ b/server/statistics/store.go @@ -41,13 +41,6 @@ func NewStoresStats() *StoresStats { } } -// CreateRollingStoreStats creates RollingStoreStats with a given store ID. -func (s *StoresStats) CreateRollingStoreStats(storeID uint64) { - s.Lock() - defer s.Unlock() - s.rollingStoresStats[storeID] = newRollingStoreStats() -} - // RemoveRollingStoreStats removes RollingStoreStats with a given store ID. func (s *StoresStats) RemoveRollingStoreStats(storeID uint64) { s.Lock() diff --git a/server/statistics/store_collection_test.go b/server/statistics/store_collection_test.go index 9f1dd702040..5bd3e665d12 100644 --- a/server/statistics/store_collection_test.go +++ b/server/statistics/store_collection_test.go @@ -47,7 +47,7 @@ func (t *testStoreStatisticsSuite) TestStoreStatistics(c *C) { stores := make([]*core.StoreInfo, 0, len(metaStores)) for _, m := range metaStores { s := core.NewStoreInfo(m, core.SetLastHeartbeatTS(time.Now())) - storesStats.CreateRollingStoreStats(m.GetId()) + storesStats.GetOrCreateRollingStoreStats(m.GetId()) stores = append(stores, s) } diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 9aa88e5fe5e..0af8d349d20 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -367,7 +367,7 @@ func (s *clusterTestSuite) TestRaftClusterMultipleRestart(c *C) { store := newMetaStore(storeID, "127.0.0.1:4", "2.1.0", metapb.StoreState_Offline, fmt.Sprintf("test/store%d", storeID)) rc := leaderServer.GetRaftCluster() c.Assert(rc, NotNil) - err = rc.PutStore(store, false) + err = rc.PutStore(store) c.Assert(err, IsNil) c.Assert(tc, NotNil)