diff --git a/pkg/autoscaling/calculation_test.go b/pkg/autoscaling/calculation_test.go index 98e27ea8d82..86aa937be3d 100644 --- a/pkg/autoscaling/calculation_test.go +++ b/pkg/autoscaling/calculation_test.go @@ -14,6 +14,7 @@ package autoscaling import ( + "context" "encoding/json" "fmt" "math" @@ -32,11 +33,22 @@ func Test(t *testing.T) { var _ = Suite(&calculationTestSuite{}) -type calculationTestSuite struct{} +type calculationTestSuite struct { + ctx context.Context + cancel context.CancelFunc +} + +func (s *calculationTestSuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) +} + +func (s *calculationTestSuite) TearDownTest(c *C) { + s.cancel() +} func (s *calculationTestSuite) TestGetScaledTiKVGroups(c *C) { // case1 indicates the tikv cluster with not any group existed - case1 := mockcluster.NewCluster(config.NewTestOptions()) + case1 := mockcluster.NewCluster(s.ctx, config.NewTestOptions()) case1.AddLabelsStore(1, 1, map[string]string{}) case1.AddLabelsStore(2, 1, map[string]string{ "foo": "bar", @@ -46,7 +58,7 @@ func (s *calculationTestSuite) TestGetScaledTiKVGroups(c *C) { }) // case2 indicates the tikv cluster with 1 auto-scaling group existed - case2 := mockcluster.NewCluster(config.NewTestOptions()) + case2 := mockcluster.NewCluster(s.ctx, config.NewTestOptions()) case2.AddLabelsStore(1, 1, map[string]string{}) case2.AddLabelsStore(2, 1, map[string]string{ groupLabelKey: fmt.Sprintf("%s-%s-0", autoScalingGroupLabelKeyPrefix, TiKV.String()), @@ -58,7 +70,7 @@ func (s *calculationTestSuite) TestGetScaledTiKVGroups(c *C) { }) // case3 indicates the tikv cluster with other group existed - case3 := mockcluster.NewCluster(config.NewTestOptions()) + case3 := mockcluster.NewCluster(s.ctx, config.NewTestOptions()) case3.AddLabelsStore(1, 1, map[string]string{}) case3.AddLabelsStore(2, 1, map[string]string{ groupLabelKey: "foo", @@ -323,7 +335,7 @@ func (s *calculationTestSuite) TestStrategyChangeCount(c *C) { } // tikv cluster with 1 auto-scaling group existed - cluster := mockcluster.NewCluster(config.NewTestOptions()) + cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions()) cluster.AddLabelsStore(1, 1, map[string]string{}) cluster.AddLabelsStore(2, 1, map[string]string{ groupLabelKey: fmt.Sprintf("%s-%s-0", autoScalingGroupLabelKeyPrefix, TiKV.String()), diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 8036f9d926d..8f1ca3ffb69 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -14,6 +14,7 @@ package mockcluster import ( + "context" "fmt" "strconv" "time" @@ -52,11 +53,11 @@ type Cluster struct { } // NewCluster creates a new Cluster -func NewCluster(opts *config.PersistOptions) *Cluster { +func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster { clus := &Cluster{ BasicCluster: core.NewBasicCluster(), IDAllocator: mockid.NewIDAllocator(), - HotStat: statistics.NewHotStat(), + HotStat: statistics.NewHotStat(ctx), PersistOptions: opts, suspectRegions: map[uint64]struct{}{}, disabledFeatures: make(map[versioninfo.Feature]struct{}), @@ -339,7 +340,7 @@ func (mc *Cluster) AddLeaderRegionWithReadInfo( var items []*statistics.HotPeerStat for i := 0; i < filledNum; i++ { - items = mc.HotCache.CheckRead(r) + items = mc.HotCache.CheckReadSync(r) for _, item := range items { mc.HotCache.Update(item) } @@ -366,7 +367,7 @@ func (mc *Cluster) AddLeaderRegionWithWriteInfo( var items []*statistics.HotPeerStat for i := 0; i < filledNum; i++ { - items 
= mc.HotCache.CheckWrite(r) + items = mc.HotCache.CheckWriteSync(r) for _, item := range items { mc.HotCache.Update(item) } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 2b62f6c8df2..ed1aa8abc16 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -205,7 +205,7 @@ func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.PersistOptions, s c.storage = storage c.id = id c.labelLevelStats = statistics.NewLabelStatistics() - c.hotStat = statistics.NewHotStat() + c.hotStat = statistics.NewHotStat(c.ctx) c.prepareChecker = newPrepareChecker() c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit) c.suspectRegions = cache.NewIDTTL(c.ctx, time.Minute, 3*time.Minute) @@ -546,8 +546,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { c.RUnlock() return err } - writeItems := c.CheckWriteStatus(region) - readItems := c.CheckReadStatus(region) + c.CheckRWStatus(region) c.RUnlock() // Save to storage if meta is updated. @@ -623,7 +622,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } } - if len(writeItems) == 0 && len(readItems) == 0 && !saveKV && !saveCache && !isNew { + if !saveKV && !saveCache && !isNew { return nil } @@ -682,13 +681,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { if c.regionStats != nil { c.regionStats.Observe(region, c.getRegionStoresLocked(region)) } - - for _, writeItem := range writeItems { - c.hotStat.Update(writeItem) - } - for _, readItem := range readItems { - c.hotStat.Update(readItem) - } c.Unlock() // If there are concurrent heartbeats from the same region, the last write will win even if @@ -1475,14 +1467,9 @@ func (c *RaftCluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { return c.hotStat.RegionStats(statistics.WriteFlow, c.GetOpts().GetHotRegionCacheHitsThreshold()) } -// CheckWriteStatus checks the write status, returns whether need update statistics and item. -func (c *RaftCluster) CheckWriteStatus(region *core.RegionInfo) []*statistics.HotPeerStat { - return c.hotStat.CheckWrite(region) -} - -// CheckReadStatus checks the read status, returns whether need update statistics and item. -func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.HotPeerStat { - return c.hotStat.CheckRead(region) +// CheckRWStatus checks the read/write status. +func (c *RaftCluster) CheckRWStatus(region *core.RegionInfo) { + c.hotStat.CheckRWAsync(region) } // TODO: remove me. 
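The heartbeat path above no longer pulls read/write items back and applies them under the cluster lock; it hands the region to the hot cache through hotStat.CheckRWAsync and returns. The hot-cache internals are not part of this excerpt, so the following is only a minimal sketch, assuming a channel-fed worker tied to the cluster context; the names asyncHotCache and flowItem are invented for illustration and are not the real statistics API.

package main

import (
	"context"
	"fmt"
	"time"
)

// flowItem stands in for the per-region flow information the real cache receives.
type flowItem struct {
	regionID  uint64
	readBytes uint64
}

// asyncHotCache sketches a cache whose updates are applied by one background worker.
type asyncHotCache struct {
	checkCh chan flowItem
}

func newAsyncHotCache(ctx context.Context) *asyncHotCache {
	c := &asyncHotCache{checkCh: make(chan flowItem, 1024)}
	go c.run(ctx) // the worker stops when the cluster context is canceled
	return c
}

// CheckRWAsync enqueues the work and returns immediately, so the heartbeat
// handler no longer blocks on hot-statistics bookkeeping.
func (c *asyncHotCache) CheckRWAsync(item flowItem) {
	select {
	case c.checkCh <- item:
	default: // drop under backpressure rather than stall the heartbeat
	}
}

func (c *asyncHotCache) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case item := <-c.checkCh:
			// in the real code this is where HotPeerStat items would be
			// computed and applied to the cache
			fmt.Printf("update hot stats for region %d (%d read bytes)\n", item.regionID, item.readBytes)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cache := newAsyncHotCache(ctx)
	cache.CheckRWAsync(flowItem{regionID: 1, readBytes: 4096})
	time.Sleep(10 * time.Millisecond) // let the worker drain before exit
}

This also suggests why mockcluster and the tests now need a context: NewHotStat(ctx) presumably bounds such background work with it, and the test suites cancel it in their teardown hooks.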
diff --git a/server/core/kind.go b/server/core/kind.go index c28de0624c8..a35f6749870 100644 --- a/server/core/kind.go +++ b/server/core/kind.go @@ -129,3 +129,11 @@ func StringToKeyType(input string) KeyType { panic("invalid key type: " + input) } } + +// FlowStat indicates the stats of the flow +type FlowStat interface { + GetKeysWritten() uint64 + GetBytesWritten() uint64 + GetBytesRead() uint64 + GetKeysRead() uint64 +} diff --git a/server/core/peer.go b/server/core/peer.go index f92d4a246e7..3ae4e906a04 100644 --- a/server/core/peer.go +++ b/server/core/peer.go @@ -72,3 +72,53 @@ func CountInJointState(peers ...*metapb.Peer) int { } return count } + +// PeerInfo provides peer information +type PeerInfo struct { + *metapb.Peer + writtenBytes uint64 + writtenKeys uint64 + readBytes uint64 + readKeys uint64 +} + +// NewPeerInfo creates PeerInfo +func NewPeerInfo(meta *metapb.Peer, writtenBytes, writtenKeys, readBytes, readKeys uint64) *PeerInfo { + return &PeerInfo{ + Peer: meta, + writtenBytes: writtenBytes, + writtenKeys: writtenKeys, + readBytes: readBytes, + readKeys: readKeys, + } +} + +// GetKeysWritten provides peer written keys +func (p *PeerInfo) GetKeysWritten() uint64 { + return p.writtenKeys +} + +// GetBytesWritten provides peer written bytes +func (p *PeerInfo) GetBytesWritten() uint64 { + return p.writtenBytes +} + +// GetBytesRead provides peer read bytes +func (p *PeerInfo) GetBytesRead() uint64 { + return p.readBytes +} + +// GetKeysRead provides read keys +func (p *PeerInfo) GetKeysRead() uint64 { + return p.readKeys +} + +// GetStoreID provides located storeID +func (p *PeerInfo) GetStoreID() uint64 { + return p.GetStoreId() +} + +// GetPeerID provides peer id +func (p *PeerInfo) GetPeerID() uint64 { + return p.GetId() +} diff --git a/server/replication/replication_mode_test.go b/server/replication/replication_mode_test.go index 7b4c2138524..8f07390e0fb 100644 --- a/server/replication/replication_mode_test.go +++ b/server/replication/replication_mode_test.go @@ -34,12 +34,23 @@ func TestReplicationMode(t *testing.T) { var _ = Suite(&testReplicationMode{}) -type testReplicationMode struct{} +type testReplicationMode struct { + ctx context.Context + cancel context.CancelFunc +} + +func (s *testReplicationMode) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) +} + +func (s *testReplicationMode) TearDownTest(c *C) { + s.cancel() +} func (s *testReplicationMode) TestInitial(c *C) { store := core.NewStorage(kv.NewMemoryKV()) conf := config.ReplicationModeConfig{ReplicationMode: modeMajority} - cluster := mockcluster.NewCluster(config.NewTestOptions()) + cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions()) rep, err := NewReplicationModeManager(conf, store, cluster, nil) c.Assert(err, IsNil) c.Assert(rep.GetReplicationStatus(), DeepEquals, &pb.ReplicationStatus{Mode: pb.ReplicationMode_MAJORITY}) @@ -72,7 +83,7 @@ func (s *testReplicationMode) TestStatus(c *C) { LabelKey: "dr-label", WaitSyncTimeout: typeutil.Duration{Duration: time.Minute}, }} - cluster := mockcluster.NewCluster(config.NewTestOptions()) + cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions()) rep, err := NewReplicationModeManager(conf, store, cluster, nil) c.Assert(err, IsNil) c.Assert(rep.GetReplicationStatus(), DeepEquals, &pb.ReplicationStatus{ @@ -147,7 +158,7 @@ func (s *testReplicationMode) TestStateSwitch(c *C) { WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, WaitSyncTimeout: typeutil.Duration{Duration: time.Minute}, }} - cluster := 
mockcluster.NewCluster(config.NewTestOptions()) + cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions()) var replicator mockFileReplicator rep, err := NewReplicationModeManager(conf, store, cluster, &replicator) c.Assert(err, IsNil) @@ -251,7 +262,7 @@ func (s *testReplicationMode) TestAsynctimeout(c *C) { WaitSyncTimeout: typeutil.Duration{Duration: time.Minute}, WaitAsyncTimeout: typeutil.Duration{Duration: 2 * time.Minute}, }} - cluster := mockcluster.NewCluster(config.NewTestOptions()) + cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions()) var replicator mockFileReplicator rep, err := NewReplicationModeManager(conf, store, cluster, &replicator) c.Assert(err, IsNil) @@ -302,7 +313,7 @@ func (s *testReplicationMode) TestRecoverProgress(c *C) { WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, WaitSyncTimeout: typeutil.Duration{Duration: time.Minute}, }} - cluster := mockcluster.NewCluster(config.NewTestOptions()) + cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions()) cluster.AddLabelsStore(1, 1, map[string]string{}) rep, err := NewReplicationModeManager(conf, store, cluster, nil) c.Assert(err, IsNil) diff --git a/server/schedule/checker/joint_state_checker_test.go b/server/schedule/checker/joint_state_checker_test.go index ef6a007c448..ce4ec9d1ccb 100644 --- a/server/schedule/checker/joint_state_checker_test.go +++ b/server/schedule/checker/joint_state_checker_test.go @@ -14,6 +14,8 @@ package checker import ( + "context" + . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/mock/mockcluster" @@ -27,10 +29,20 @@ var _ = Suite(&testJointStateCheckerSuite{}) type testJointStateCheckerSuite struct { cluster *mockcluster.Cluster jsc *JointStateChecker + ctx context.Context + cancel context.CancelFunc +} + +func (s *testJointStateCheckerSuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) +} + +func (s *testJointStateCheckerSuite) TearDownTest(c *C) { + s.cancel() } func (s *testJointStateCheckerSuite) SetUpTest(c *C) { - s.cluster = mockcluster.NewCluster(config.NewTestOptions()) + s.cluster = mockcluster.NewCluster(s.ctx, config.NewTestOptions()) s.jsc = NewJointStateChecker(s.cluster) for id := uint64(1); id <= 10; id++ { s.cluster.PutStoreWithLabels(id) diff --git a/server/schedule/checker/learner_checker_test.go b/server/schedule/checker/learner_checker_test.go index 0f6f4b09d89..2964b82a30c 100644 --- a/server/schedule/checker/learner_checker_test.go +++ b/server/schedule/checker/learner_checker_test.go @@ -14,6 +14,8 @@ package checker import ( + "context" + . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/mock/mockcluster" @@ -28,10 +30,13 @@ var _ = Suite(&testLearnerCheckerSuite{}) type testLearnerCheckerSuite struct { cluster *mockcluster.Cluster lc *LearnerChecker + ctx context.Context + cancel context.CancelFunc } func (s *testLearnerCheckerSuite) SetUpTest(c *C) { - s.cluster = mockcluster.NewCluster(config.NewTestOptions()) + s.ctx, s.cancel = context.WithCancel(context.Background()) + s.cluster = mockcluster.NewCluster(s.ctx, config.NewTestOptions()) s.cluster.DisableFeature(versioninfo.JointConsensus) s.lc = NewLearnerChecker(s.cluster) for id := uint64(1); id <= 10; id++ { @@ -39,6 +44,10 @@ func (s *testLearnerCheckerSuite) SetUpTest(c *C) { } } +func (s *testLearnerCheckerSuite) TearDownTest(c *C) { + s.cancel() +} + func (s *testLearnerCheckerSuite) TestPromoteLearner(c *C) { lc := s.lc diff --git a/server/schedule/checker/merge_checker_test.go b/server/schedule/checker/merge_checker_test.go index 9339bf6999d..5b173522cc0 100644 --- a/server/schedule/checker/merge_checker_test.go +++ b/server/schedule/checker/merge_checker_test.go @@ -50,9 +50,17 @@ type testMergeCheckerSuite struct { regions []*core.RegionInfo } +func (s *testMergeCheckerSuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) +} + +func (s *testMergeCheckerSuite) TearDownTest(c *C) { + s.cancel() +} + func (s *testMergeCheckerSuite) SetUpTest(c *C) { cfg := config.NewTestOptions() - s.cluster = mockcluster.NewCluster(cfg) + s.cluster = mockcluster.NewCluster(s.ctx, cfg) s.cluster.SetMaxMergeRegionSize(2) s.cluster.SetMaxMergeRegionKeys(2) s.cluster.SetLabelPropertyConfig(config.LabelPropertyConfig{ @@ -130,14 +138,9 @@ func (s *testMergeCheckerSuite) SetUpTest(c *C) { for _, region := range s.regions { s.cluster.PutRegion(region) } - s.ctx, s.cancel = context.WithCancel(context.Background()) s.mc = NewMergeChecker(s.ctx, s.cluster) } -func (s *testMergeCheckerSuite) TearDownTest(c *C) { - s.cancel() -} - func (s *testMergeCheckerSuite) TestBasic(c *C) { s.cluster.SetSplitMergeInterval(0) @@ -434,7 +437,7 @@ type testSplitMergeSuite struct{} func (s *testMergeCheckerSuite) TestCache(c *C) { cfg := config.NewTestOptions() - s.cluster = mockcluster.NewCluster(cfg) + s.cluster = mockcluster.NewCluster(s.ctx, cfg) s.cluster.SetMaxMergeRegionSize(2) s.cluster.SetMaxMergeRegionKeys(2) s.cluster.SetSplitMergeInterval(time.Hour) diff --git a/server/schedule/checker/replica_checker_test.go b/server/schedule/checker/replica_checker_test.go index b582b0fcb54..38c5259cb6f 100644 --- a/server/schedule/checker/replica_checker_test.go +++ b/server/schedule/checker/replica_checker_test.go @@ -14,6 +14,7 @@ package checker import ( + "context" "time" . 
"github.com/pingcap/check" @@ -39,11 +40,21 @@ var _ = Suite(&testReplicaCheckerSuite{}) type testReplicaCheckerSuite struct { cluster *mockcluster.Cluster rc *ReplicaChecker + ctx context.Context + cancel context.CancelFunc +} + +func (s *testReplicaCheckerSuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) +} + +func (s *testReplicaCheckerSuite) TearDownTest(c *C) { + s.cancel() } func (s *testReplicaCheckerSuite) SetUpTest(c *C) { cfg := config.NewTestOptions() - s.cluster = mockcluster.NewCluster(cfg) + s.cluster = mockcluster.NewCluster(s.ctx, cfg) s.cluster.DisableFeature(versioninfo.JointConsensus) s.rc = NewReplicaChecker(s.cluster, cache.NewDefaultCache(10)) stats := &pdpb.StoreStats{ @@ -196,7 +207,7 @@ func (s *testReplicaCheckerSuite) downPeerAndCheck(c *C, aliveRole metapb.PeerRo func (s *testReplicaCheckerSuite) TestBasic(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.SetMaxSnapshotCount(2) tc.DisableFeature(versioninfo.JointConsensus) rc := NewReplicaChecker(tc, cache.NewDefaultCache(10)) @@ -269,7 +280,7 @@ func (s *testReplicaCheckerSuite) TestBasic(c *C) { func (s *testReplicaCheckerSuite) TestLostStore(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.DisableFeature(versioninfo.JointConsensus) tc.AddRegionStore(1, 1) @@ -288,7 +299,7 @@ func (s *testReplicaCheckerSuite) TestLostStore(c *C) { func (s *testReplicaCheckerSuite) TestOffline(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.DisableFeature(versioninfo.JointConsensus) tc.SetMaxReplicas(3) tc.SetLocationLabels([]string{"zone", "rack", "host"}) @@ -340,7 +351,7 @@ func (s *testReplicaCheckerSuite) TestOffline(c *C) { func (s *testReplicaCheckerSuite) TestDistinctScore(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.DisableFeature(versioninfo.JointConsensus) tc.SetMaxReplicas(3) tc.SetLocationLabels([]string{"zone", "rack", "host"}) @@ -419,7 +430,7 @@ func (s *testReplicaCheckerSuite) TestDistinctScore(c *C) { func (s *testReplicaCheckerSuite) TestDistinctScore2(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.DisableFeature(versioninfo.JointConsensus) tc.SetMaxReplicas(5) tc.SetLocationLabels([]string{"zone", "host"}) @@ -449,7 +460,7 @@ func (s *testReplicaCheckerSuite) TestDistinctScore2(c *C) { func (s *testReplicaCheckerSuite) TestStorageThreshold(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.SetLocationLabels([]string{"zone"}) tc.DisableFeature(versioninfo.JointConsensus) rc := NewReplicaChecker(tc, cache.NewDefaultCache(10)) @@ -485,7 +496,7 @@ func (s *testReplicaCheckerSuite) TestStorageThreshold(c *C) { func (s *testReplicaCheckerSuite) TestOpts(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.DisableFeature(versioninfo.JointConsensus) rc := NewReplicaChecker(tc, cache.NewDefaultCache(10)) diff --git a/server/schedule/checker/rule_checker_test.go b/server/schedule/checker/rule_checker_test.go index fe2a55ee71f..c615a0b55e1 100644 --- a/server/schedule/checker/rule_checker_test.go +++ b/server/schedule/checker/rule_checker_test.go @@ -14,6 +14,7 @@ package checker import 
( + "context" "encoding/hex" . "github.com/pingcap/check" @@ -34,11 +35,21 @@ type testRuleCheckerSuite struct { cluster *mockcluster.Cluster ruleManager *placement.RuleManager rc *RuleChecker + ctx context.Context + cancel context.CancelFunc +} + +func (s *testRuleCheckerSuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) +} + +func (s *testRuleCheckerSuite) TearDownTest(c *C) { + s.cancel() } func (s *testRuleCheckerSuite) SetUpTest(c *C) { cfg := config.NewTestOptions() - s.cluster = mockcluster.NewCluster(cfg) + s.cluster = mockcluster.NewCluster(s.ctx, cfg) s.cluster.DisableFeature(versioninfo.JointConsensus) s.cluster.SetEnablePlacementRules(true) s.ruleManager = s.cluster.RuleManager diff --git a/server/schedule/filter/filters_test.go b/server/schedule/filter/filters_test.go index ecde84bad79..7b4be0d54cd 100644 --- a/server/schedule/filter/filters_test.go +++ b/server/schedule/filter/filters_test.go @@ -13,6 +13,7 @@ package filter import ( + "context" "testing" "time" @@ -31,7 +32,18 @@ func Test(t *testing.T) { var _ = Suite(&testFiltersSuite{}) -type testFiltersSuite struct{} +type testFiltersSuite struct { + ctx context.Context + cancel context.CancelFunc +} + +func (s *testFiltersSuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) +} + +func (s *testFiltersSuite) TearDownTest(c *C) { + s.cancel() +} func (s *testFiltersSuite) TestDistinctScoreFilter(c *C) { labels := []string{"zone", "rack", "host"} @@ -69,7 +81,7 @@ func (s *testFiltersSuite) TestDistinctScoreFilter(c *C) { func (s *testFiltersSuite) TestLabelConstraintsFilter(c *C) { opt := config.NewTestOptions() - testCluster := mockcluster.NewCluster(opt) + testCluster := mockcluster.NewCluster(s.ctx, opt) store := core.NewStoreInfoWithLabel(1, 1, map[string]string{"id": "1"}) testCases := []struct { @@ -97,7 +109,7 @@ func (s *testFiltersSuite) TestLabelConstraintsFilter(c *C) { func (s *testFiltersSuite) TestRuleFitFilter(c *C) { opt := config.NewTestOptions() opt.SetPlacementRuleEnabled(false) - testCluster := mockcluster.NewCluster(opt) + testCluster := mockcluster.NewCluster(s.ctx, opt) testCluster.SetLocationLabels([]string{"zone"}) testCluster.SetEnablePlacementRules(true) region := core.NewRegionInfo(&metapb.Region{Peers: []*metapb.Peer{ @@ -184,7 +196,7 @@ func (s *testFiltersSuite) TestStoreStateFilter(c *C) { func (s *testFiltersSuite) TestIsolationFilter(c *C) { opt := config.NewTestOptions() - testCluster := mockcluster.NewCluster(opt) + testCluster := mockcluster.NewCluster(s.ctx, opt) testCluster.SetLocationLabels([]string{"zone", "rack", "host"}) allStores := []struct { storeID uint64 @@ -252,7 +264,7 @@ func (s *testFiltersSuite) TestIsolationFilter(c *C) { func (s *testFiltersSuite) TestPlacementGuard(c *C) { opt := config.NewTestOptions() opt.SetPlacementRuleEnabled(false) - testCluster := mockcluster.NewCluster(opt) + testCluster := mockcluster.NewCluster(s.ctx, opt) testCluster.SetLocationLabels([]string{"zone"}) testCluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) testCluster.AddLabelsStore(2, 1, map[string]string{"zone": "z1"}) diff --git a/server/schedule/hbstream/heartbeat_streams_test.go b/server/schedule/hbstream/heartbeat_streams_test.go index c824e18c6f7..bca3e8948d1 100644 --- a/server/schedule/hbstream/heartbeat_streams_test.go +++ b/server/schedule/hbstream/heartbeat_streams_test.go @@ -35,13 +35,23 @@ func TestHeaertbeatStreams(t *testing.T) { var _ = Suite(&testHeartbeatStreamSuite{}) type 
testHeartbeatStreamSuite struct { + ctx context.Context + cancel context.CancelFunc +} + +func (s *testHeartbeatStreamSuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) +} + +func (s *testHeartbeatStreamSuite) TearDownTest(c *C) { + s.cancel() } func (s *testHeartbeatStreamSuite) TestActivity(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster := mockcluster.NewCluster(config.NewTestOptions()) + cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions()) cluster.AddRegionStore(1, 1) cluster.AddRegionStore(2, 0) cluster.AddLeaderRegion(1, 1) diff --git a/server/schedule/operator/builder_test.go b/server/schedule/operator/builder_test.go index 34658cb0c8d..df3ee6d2abf 100644 --- a/server/schedule/operator/builder_test.go +++ b/server/schedule/operator/builder_test.go @@ -14,6 +14,8 @@ package operator import ( + "context" + . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" @@ -27,11 +29,14 @@ var _ = Suite(&testBuilderSuite{}) type testBuilderSuite struct { cluster *mockcluster.Cluster + ctx context.Context + cancel context.CancelFunc } func (s *testBuilderSuite) SetUpTest(c *C) { opts := config.NewTestOptions() - s.cluster = mockcluster.NewCluster(opts) + s.ctx, s.cancel = context.WithCancel(context.Background()) + s.cluster = mockcluster.NewCluster(s.ctx, opts) s.cluster.SetLabelPropertyConfig(config.LabelPropertyConfig{ opt.RejectLeader: {{Key: "noleader", Value: "true"}}, }) @@ -48,6 +53,10 @@ func (s *testBuilderSuite) SetUpTest(c *C) { s.cluster.AddLabelsStore(10, 0, map[string]string{"zone": "z3", "host": "h1", "noleader": "true"}) } +func (s *testBuilderSuite) TearDownTest(c *C) { + s.cancel() +} + func (s *testBuilderSuite) TestNewBuilder(c *C) { peers := []*metapb.Peer{{Id: 11, StoreId: 1}, {Id: 12, StoreId: 2, Role: metapb.PeerRole_Learner}} region := core.NewRegionInfo(&metapb.Region{Id: 42, Peers: peers}, peers[0]) diff --git a/server/schedule/operator/create_operator_test.go b/server/schedule/operator/create_operator_test.go index cd29618d0e0..97d0db648dc 100644 --- a/server/schedule/operator/create_operator_test.go +++ b/server/schedule/operator/create_operator_test.go @@ -14,13 +14,13 @@ package operator import ( + "context" "strings" . 
"github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/tikv/pd/pkg/mock/mockcluster" "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" @@ -33,11 +33,14 @@ var _ = Suite(&testCreateOperatorSuite{}) type testCreateOperatorSuite struct { cluster *mockcluster.Cluster + ctx context.Context + cancel context.CancelFunc } func (s *testCreateOperatorSuite) SetUpTest(c *C) { opts := config.NewTestOptions() - s.cluster = mockcluster.NewCluster(opts) + s.ctx, s.cancel = context.WithCancel(context.Background()) + s.cluster = mockcluster.NewCluster(s.ctx, opts) s.cluster.SetLabelPropertyConfig(config.LabelPropertyConfig{ opt.RejectLeader: {{Key: "noleader", Value: "true"}}, }) @@ -54,6 +57,10 @@ func (s *testCreateOperatorSuite) SetUpTest(c *C) { s.cluster.AddLabelsStore(10, 0, map[string]string{"zone": "z3", "host": "h1", "noleader": "true"}) } +func (s *testCreateOperatorSuite) TearDownTest(c *C) { + s.cancel() +} + func (s *testCreateOperatorSuite) TestCreateSplitRegionOperator(c *C) { type testCase struct { startKey []byte diff --git a/server/schedule/operator/operator_test.go b/server/schedule/operator/operator_test.go index 50c8b9b92f1..f0655708973 100644 --- a/server/schedule/operator/operator_test.go +++ b/server/schedule/operator/operator_test.go @@ -14,6 +14,7 @@ package operator import ( + "context" "encoding/json" "sync/atomic" "testing" @@ -36,11 +37,14 @@ var _ = Suite(&testOperatorSuite{}) type testOperatorSuite struct { cluster *mockcluster.Cluster + ctx context.Context + cancel context.CancelFunc } func (s *testOperatorSuite) SetUpTest(c *C) { cfg := config.NewTestOptions() - s.cluster = mockcluster.NewCluster(cfg) + s.ctx, s.cancel = context.WithCancel(context.Background()) + s.cluster = mockcluster.NewCluster(s.ctx, cfg) s.cluster.SetMaxMergeRegionSize(2) s.cluster.SetMaxMergeRegionKeys(2) s.cluster.SetLabelPropertyConfig(config.LabelPropertyConfig{ @@ -56,6 +60,10 @@ func (s *testOperatorSuite) SetUpTest(c *C) { } } +func (s *testOperatorSuite) TearDownTest(c *C) { + s.cancel() +} + func (s *testOperatorSuite) newTestRegion(regionID uint64, leaderPeer uint64, peers ...[2]uint64) *core.RegionInfo { var ( region metapb.Region diff --git a/server/schedule/operator_controller_test.go b/server/schedule/operator_controller_test.go index 15ce7320d78..0c638f7e8cd 100644 --- a/server/schedule/operator_controller_test.go +++ b/server/schedule/operator_controller_test.go @@ -57,7 +57,7 @@ func (t *testOperatorControllerSuite) TearDownSuite(c *C) { // issue #1338 func (t *testOperatorControllerSuite) TestGetOpInfluence(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(t.ctx, opt) oc := NewOperatorController(t.ctx, tc, nil) tc.AddLeaderStore(2, 1) tc.AddLeaderRegion(1, 1, 2) @@ -98,7 +98,7 @@ func (t *testOperatorControllerSuite) TestGetOpInfluence(c *C) { func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(t.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(t.ctx, tc.ID, tc, false /* no need to run */) oc := NewOperatorController(t.ctx, tc, stream) tc.AddLeaderStore(1, 2) @@ -133,7 +133,7 @@ func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) { func (t *testOperatorControllerSuite) TestFastFailOperator(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := 
mockcluster.NewCluster(t.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(t.ctx, tc.ID, tc, false /* no need to run */) oc := NewOperatorController(t.ctx, tc, stream) tc.AddLeaderStore(1, 2) @@ -167,7 +167,7 @@ func (t *testOperatorControllerSuite) TestFastFailOperator(c *C) { func (t *testOperatorControllerSuite) TestCheckAddUnexpectedStatus(c *C) { c.Assert(failpoint.Disable("github.com/tikv/pd/server/schedule/unexpectedOperator"), IsNil) opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(t.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(t.ctx, tc.ID, tc, false /* no need to run */) oc := NewOperatorController(t.ctx, tc, stream) tc.AddLeaderStore(1, 0) @@ -231,7 +231,7 @@ func (t *testOperatorControllerSuite) TestCheckAddUnexpectedStatus(c *C) { // issue #1716 func (t *testOperatorControllerSuite) TestConcurrentRemoveOperator(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(t.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(t.ctx, tc.ID, tc, false /* no need to run */) oc := NewOperatorController(t.ctx, tc, stream) tc.AddLeaderStore(1, 0) @@ -272,7 +272,7 @@ func (t *testOperatorControllerSuite) TestConcurrentRemoveOperator(c *C) { func (t *testOperatorControllerSuite) TestPollDispatchRegion(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(t.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(t.ctx, tc.ID, tc, false /* no need to run */) oc := NewOperatorController(t.ctx, tc, stream) tc.AddLeaderStore(1, 2) @@ -344,7 +344,7 @@ func (t *testOperatorControllerSuite) TestPollDispatchRegion(c *C) { func (t *testOperatorControllerSuite) TestStoreLimit(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(t.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(t.ctx, tc.ID, tc, false /* no need to run */) oc := NewOperatorController(t.ctx, tc, stream) tc.AddLeaderStore(1, 0) @@ -411,7 +411,7 @@ func (t *testOperatorControllerSuite) TestStoreLimit(c *C) { // #1652 func (t *testOperatorControllerSuite) TestDispatchOutdatedRegion(c *C) { - cluster := mockcluster.NewCluster(config.NewTestOptions()) + cluster := mockcluster.NewCluster(t.ctx, config.NewTestOptions()) stream := hbstream.NewTestHeartbeatStreams(t.ctx, cluster.ID, cluster, false /* no need to run */) controller := NewOperatorController(t.ctx, cluster, stream) @@ -464,7 +464,7 @@ func (t *testOperatorControllerSuite) TestDispatchOutdatedRegion(c *C) { } func (t *testOperatorControllerSuite) TestDispatchUnfinishedStep(c *C) { - cluster := mockcluster.NewCluster(config.NewTestOptions()) + cluster := mockcluster.NewCluster(t.ctx, config.NewTestOptions()) stream := hbstream.NewTestHeartbeatStreams(t.ctx, cluster.ID, cluster, false /* no need to run */) controller := NewOperatorController(t.ctx, cluster, stream) @@ -573,7 +573,7 @@ func (t *testOperatorControllerSuite) TestDispatchUnfinishedStep(c *C) { func (t *testOperatorControllerSuite) TestStoreLimitWithMerge(c *C) { cfg := config.NewTestOptions() - tc := mockcluster.NewCluster(cfg) + tc := mockcluster.NewCluster(t.ctx, cfg) tc.SetMaxMergeRegionSize(2) tc.SetMaxMergeRegionKeys(2) tc.SetSplitMergeInterval(0) @@ -660,7 +660,7 @@ func checkRemoveOperatorSuccess(c *C, oc *OperatorController, op *operator.Opera } func (t *testOperatorControllerSuite) TestAddWaitingOperator(c *C) { - cluster := mockcluster.NewCluster(config.NewTestOptions()) + cluster := 
mockcluster.NewCluster(t.ctx, config.NewTestOptions()) stream := hbstream.NewTestHeartbeatStreams(t.ctx, cluster.ID, cluster, false /* no need to run */) controller := NewOperatorController(t.ctx, cluster, stream) cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) diff --git a/server/schedule/opt/healthy_test.go b/server/schedule/opt/healthy_test.go index d2c110c30a3..b437c00ca73 100644 --- a/server/schedule/opt/healthy_test.go +++ b/server/schedule/opt/healthy_test.go @@ -14,6 +14,7 @@ package opt import ( + "context" "testing" . "github.com/pingcap/check" @@ -30,7 +31,18 @@ func TestOpt(t *testing.T) { var _ = Suite(&testRegionHealthySuite{}) -type testRegionHealthySuite struct{} +type testRegionHealthySuite struct { + ctx context.Context + cancel context.CancelFunc +} + +func (s *testRegionHealthySuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) +} + +func (s *testRegionHealthySuite) TearDownSuite(c *C) { + s.cancel() +} func (s *testRegionHealthySuite) TestIsRegionHealthy(c *C) { peers := func(ids ...uint64) []*metapb.Peer { @@ -71,7 +83,7 @@ func (s *testRegionHealthySuite) TestIsRegionHealthy(c *C) { } opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.AddRegionStore(1, 1) tc.AddRegionStore(2, 1) tc.AddRegionStore(3, 1) diff --git a/server/schedule/region_scatterer_test.go b/server/schedule/region_scatterer_test.go index 49e1d31a7e7..729d1109d3d 100644 --- a/server/schedule/region_scatterer_test.go +++ b/server/schedule/region_scatterer_test.go @@ -86,8 +86,10 @@ func (s *testScatterRegionSuite) checkOperator(op *operator.Operator, c *C) { } func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64, useRules bool) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.DisableFeature(versioninfo.JointConsensus) // Add ordinary stores. @@ -100,9 +102,6 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64, use // region distributed in same stores. tc.AddLeaderRegion(i, 1, 2, 3) } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() scatterer := NewRegionScatterer(ctx, tc) for i := uint64(1); i <= numRegions; i++ { @@ -142,8 +141,10 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64, use } func (s *testScatterRegionSuite) scatterSpecial(c *C, numOrdinaryStores, numSpecialStores, numRegions uint64) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.DisableFeature(versioninfo.JointConsensus) // Add ordinary stores. 
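The test migrations above and below all follow the same shape: create a context, pass it to mockcluster.NewCluster, and cancel it in the teardown hook. A condensed illustration of the per-test variant follows; the suite name is a placeholder, not a suite from this diff.

package checker_test // illustrative file, not part of this diff

import (
	"context"

	. "github.com/pingcap/check"
	"github.com/tikv/pd/pkg/mock/mockcluster"
	"github.com/tikv/pd/server/config"
)

type testExampleSuite struct { // placeholder suite
	ctx     context.Context
	cancel  context.CancelFunc
	cluster *mockcluster.Cluster
}

func (s *testExampleSuite) SetUpTest(c *C) {
	// NewCluster now requires a context; NewHotStat(ctx) presumably uses it
	// to bound background work, so the test cancels it in teardown.
	s.ctx, s.cancel = context.WithCancel(context.Background())
	s.cluster = mockcluster.NewCluster(s.ctx, config.NewTestOptions())
}

func (s *testExampleSuite) TearDownTest(c *C) {
	s.cancel()
}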
@@ -169,9 +170,6 @@ func (s *testScatterRegionSuite) scatterSpecial(c *C, numOrdinaryStores, numSpec []uint64{numOrdinaryStores + 1, numOrdinaryStores + 2, numOrdinaryStores + 3}, ) } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() scatterer := NewRegionScatterer(ctx, tc) for i := uint64(1); i <= numRegions; i++ { @@ -221,7 +219,7 @@ func (s *testScatterRegionSuite) TestStoreLimit(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) oc := NewOperatorController(ctx, tc, stream) @@ -249,8 +247,10 @@ func (s *testScatterRegionSuite) TestStoreLimit(c *C) { } func (s *testScatterRegionSuite) TestScatterCheck(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) // Add 5 stores. for i := uint64(1); i <= 5; i++ { tc.AddRegionStore(i, 0) @@ -278,7 +278,6 @@ func (s *testScatterRegionSuite) TestScatterCheck(c *C) { } for _, testcase := range testcases { c.Logf(testcase.name) - ctx, cancel := context.WithCancel(context.Background()) scatterer := NewRegionScatterer(ctx, tc) _, err := scatterer.Scatter(testcase.checkRegion, "") if testcase.needFix { @@ -289,13 +288,14 @@ func (s *testScatterRegionSuite) TestScatterCheck(c *C) { c.Assert(tc.CheckRegionUnderSuspect(1), Equals, false) } tc.ResetSuspectRegions() - cancel() } } func (s *testScatterRegionSuite) TestScatterGroupInConcurrency(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) // Add 5 stores. for i := uint64(1); i <= 5; i++ { tc.AddRegionStore(i, 0) @@ -322,7 +322,6 @@ func (s *testScatterRegionSuite) TestScatterGroupInConcurrency(c *C) { // We send scatter interweave request for each group to simulate scattering multiple region groups in concurrency. for _, testcase := range testcases { c.Logf(testcase.name) - ctx, cancel := context.WithCancel(context.Background()) scatterer := NewRegionScatterer(ctx, tc) regionID := 1 for i := 0; i < 100; i++ { @@ -356,13 +355,14 @@ func (s *testScatterRegionSuite) TestScatterGroupInConcurrency(c *C) { checker(scatterer.ordinaryEngine.selectedLeader, 20, 5) // For peer, we expect each store have about 50 peers for each group checker(scatterer.ordinaryEngine.selectedPeer, 50, 15) - cancel() } } func (s *testScatterRegionSuite) TestScattersGroup(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) // Add 5 stores. for i := uint64(1); i <= 5; i++ { tc.AddRegionStore(i, 0) @@ -382,7 +382,6 @@ func (s *testScatterRegionSuite) TestScattersGroup(c *C) { } group := "group" for _, testcase := range testcases { - ctx, cancel := context.WithCancel(context.Background()) scatterer := NewRegionScatterer(ctx, tc) regions := map[uint64]*core.RegionInfo{} for i := 1; i <= 100; i++ { @@ -419,7 +418,6 @@ func (s *testScatterRegionSuite) TestScattersGroup(c *C) { } else { c.Assert(len(failures), Equals, 0) } - cancel() } } @@ -445,15 +443,15 @@ func (s *testScatterRegionSuite) TestSelectedStoreGC(c *C) { // TestRegionFromDifferentGroups test the multi regions. each region have its own group. 
// After scatter, the distribution for the whole cluster should be well. func (s *testScatterRegionSuite) TestRegionFromDifferentGroups(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) // Add 6 stores. storeCount := 6 for i := uint64(1); i <= uint64(storeCount); i++ { tc.AddRegionStore(i, 0) } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() scatterer := NewRegionScatterer(ctx, tc) regionCount := 50 for i := 1; i <= regionCount; i++ { diff --git a/server/schedule/region_splitter_test.go b/server/schedule/region_splitter_test.go index 2814b566f02..ca1c0b10355 100644 --- a/server/schedule/region_splitter_test.go +++ b/server/schedule/region_splitter_test.go @@ -61,32 +61,42 @@ func (m *mockSplitRegionsHandler) ScanRegionsByKeyRange(groupKeys *regionGroupKe var _ = Suite(&testRegionSplitterSuite{}) -type testRegionSplitterSuite struct{} +type testRegionSplitterSuite struct { + ctx context.Context + cancel context.CancelFunc +} + +func (s *testRegionSplitterSuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) +} + +func (s *testRegionSplitterSuite) TearDownTest(c *C) { + s.cancel() +} func (s *testRegionSplitterSuite) TestRegionSplitter(c *C) { - ctx := context.Background() opt := config.NewTestOptions() opt.SetPlacementRuleEnabled(false) - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) handler := newMockSplitRegionsHandler() tc.AddLeaderRegionWithRange(1, "eee", "hhh", 2, 3, 4) splitter := NewRegionSplitter(tc, handler) newRegions := map[uint64]struct{}{} // assert success - failureKeys := splitter.splitRegionsByKeys(ctx, [][]byte{[]byte("fff"), []byte("ggg")}, newRegions) + failureKeys := splitter.splitRegionsByKeys(s.ctx, [][]byte{[]byte("fff"), []byte("ggg")}, newRegions) c.Assert(len(failureKeys), Equals, 0) c.Assert(len(newRegions), Equals, 2) - percentage, newRegionsID := splitter.SplitRegions(ctx, [][]byte{[]byte("fff"), []byte("ggg")}, 1) + percentage, newRegionsID := splitter.SplitRegions(s.ctx, [][]byte{[]byte("fff"), []byte("ggg")}, 1) c.Assert(percentage, Equals, 100) c.Assert(len(newRegionsID), Equals, 2) // assert out of range newRegions = map[uint64]struct{}{} - failureKeys = splitter.splitRegionsByKeys(ctx, [][]byte{[]byte("aaa"), []byte("bbb")}, newRegions) + failureKeys = splitter.splitRegionsByKeys(s.ctx, [][]byte{[]byte("aaa"), []byte("bbb")}, newRegions) c.Assert(len(failureKeys), Equals, 2) c.Assert(len(newRegions), Equals, 0) - percentage, newRegionsID = splitter.SplitRegions(ctx, [][]byte{[]byte("aaa"), []byte("bbb")}, 1) + percentage, newRegionsID = splitter.SplitRegions(s.ctx, [][]byte{[]byte("aaa"), []byte("bbb")}, 1) c.Assert(percentage, Equals, 0) c.Assert(len(newRegionsID), Equals, 0) } @@ -94,7 +104,7 @@ func (s *testRegionSplitterSuite) TestRegionSplitter(c *C) { func (s *testRegionSplitterSuite) TestGroupKeysByRegion(c *C) { opt := config.NewTestOptions() opt.SetPlacementRuleEnabled(false) - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) handler := newMockSplitRegionsHandler() tc.AddLeaderRegionWithRange(1, "aaa", "ccc", 2, 3, 4) tc.AddLeaderRegionWithRange(2, "ccc", "eee", 2, 3, 4) diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index 0ca1f849297..8c970fadd6e 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -34,7 +34,18 @@ 
import ( var _ = Suite(&testBalanceSuite{}) -type testBalanceSuite struct{} +type testBalanceSuite struct { + ctx context.Context + cancel context.CancelFunc +} + +func (s *testBalanceSuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) +} + +func (s *testBalanceSuite) TearDownTest(c *C) { + s.cancel() +} type testBalanceSpeedCase struct { sourceCount uint64 @@ -99,7 +110,7 @@ func (s *testBalanceSuite) TestShouldBalance(c *C) { } opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.SetTolerantSizeRatio(2.5) tc.SetRegionScoreFormulaVersion("v1") ctx, cancel := context.WithCancel(context.Background()) @@ -138,7 +149,7 @@ func (s *testBalanceSuite) TestShouldBalance(c *C) { func (s *testBalanceSuite) TestBalanceLimit(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.AddLeaderStore(1, 10) tc.AddLeaderStore(2, 20) tc.AddLeaderStore(3, 30) @@ -153,7 +164,7 @@ func (s *testBalanceSuite) TestBalanceLimit(c *C) { func (s *testBalanceSuite) TestTolerantRatio(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) // create a region to control average region size. c.Assert(tc.AddLeaderRegion(1, 1, 2), NotNil) regionSize := int64(96 * KB) @@ -186,7 +197,7 @@ type testBalanceLeaderSchedulerSuite struct { func (s *testBalanceLeaderSchedulerSuite) SetUpTest(c *C) { s.ctx, s.cancel = context.WithCancel(context.Background()) s.opt = config.NewTestOptions() - s.tc = mockcluster.NewCluster(s.opt) + s.tc = mockcluster.NewCluster(s.ctx, s.opt) s.oc = schedule.NewOperatorController(s.ctx, s.tc, nil) lb, err := schedule.CreateScheduler(BalanceLeaderType, s.oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", ""})) c.Assert(err, IsNil) @@ -483,7 +494,7 @@ type testBalanceLeaderRangeSchedulerSuite struct { func (s *testBalanceLeaderRangeSchedulerSuite) SetUpTest(c *C) { s.ctx, s.cancel = context.WithCancel(context.Background()) opt := config.NewTestOptions() - s.tc = mockcluster.NewCluster(opt) + s.tc = mockcluster.NewCluster(s.ctx, opt) s.oc = schedule.NewOperatorController(s.ctx, nil, nil) } @@ -582,7 +593,7 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) { opt := config.NewTestOptions() // TODO: enable placementrules opt.SetPlacementRuleEnabled(false) - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.DisableFeature(versioninfo.JointConsensus) oc := schedule.NewOperatorController(s.ctx, nil, nil) @@ -618,7 +629,7 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas3(c *C) { opt := config.NewTestOptions() //TODO: enable placementrules opt.SetPlacementRuleEnabled(false) - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.SetMaxReplicas(3) tc.SetLocationLabels([]string{"zone", "rack", "host"}) tc.DisableFeature(versioninfo.JointConsensus) @@ -682,7 +693,7 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas5(c *C) { opt := config.NewTestOptions() //TODO: enable placementrules opt.SetPlacementRuleEnabled(false) - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.SetMaxReplicas(5) tc.SetLocationLabels([]string{"zone", "rack", "host"}) @@ -746,7 +757,7 @@ func (s *testBalanceRegionSchedulerSuite) checkReplica5(c *C, tc *mockcluster.Cl func (s *testBalanceRegionSchedulerSuite) TestBalance1(c *C) { opt := config.NewTestOptions() 
opt.SetPlacementRuleEnabled(false) - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.DisableFeature(versioninfo.JointConsensus) tc.SetTolerantSizeRatio(1) tc.SetRegionScheduleLimit(1) @@ -822,7 +833,7 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance1(c *C) { func (s *testBalanceRegionSchedulerSuite) TestStoreWeight(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) // TODO: enable placementrules tc.SetPlacementRuleEnabled(false) tc.DisableFeature(versioninfo.JointConsensus) @@ -850,7 +861,7 @@ func (s *testBalanceRegionSchedulerSuite) TestStoreWeight(c *C) { func (s *testBalanceRegionSchedulerSuite) TestReplacePendingRegion(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.SetMaxReplicas(3) tc.SetLocationLabels([]string{"zone", "rack", "host"}) tc.DisableFeature(versioninfo.JointConsensus) @@ -866,7 +877,7 @@ func (s *testBalanceRegionSchedulerSuite) TestReplacePendingRegion(c *C) { func (s *testBalanceRegionSchedulerSuite) TestOpInfluence(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) //TODO: enable placementrules tc.SetEnablePlacementRules(false) tc.DisableFeature(versioninfo.JointConsensus) @@ -917,7 +928,7 @@ func (s *testBalanceRegionSchedulerSuite) checkReplacePendingRegion(c *C, tc *mo func (s *testBalanceRegionSchedulerSuite) TestShouldNotBalance(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.DisableFeature(versioninfo.JointConsensus) oc := schedule.NewOperatorController(s.ctx, nil, nil) sb, err := schedule.CreateScheduler(BalanceRegionType, oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) @@ -934,7 +945,7 @@ func (s *testBalanceRegionSchedulerSuite) TestShouldNotBalance(c *C) { func (s *testBalanceRegionSchedulerSuite) TestEmptyRegion(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.DisableFeature(versioninfo.JointConsensus) oc := schedule.NewOperatorController(s.ctx, nil, nil) sb, err := schedule.CreateScheduler(BalanceRegionType, oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) @@ -971,7 +982,18 @@ func (s *testBalanceRegionSchedulerSuite) TestEmptyRegion(c *C) { var _ = Suite(&testRandomMergeSchedulerSuite{}) -type testRandomMergeSchedulerSuite struct{} +type testRandomMergeSchedulerSuite struct { + ctx context.Context + cancel context.CancelFunc +} + +func (s *testRandomMergeSchedulerSuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) +} + +func (s *testRandomMergeSchedulerSuite) TearDownSuite(c *C) { + s.cancel() +} func (s *testRandomMergeSchedulerSuite) TestMerge(c *C) { ctx, cancel := context.WithCancel(context.Background()) @@ -979,7 +1001,7 @@ func (s *testRandomMergeSchedulerSuite) TestMerge(c *C) { opt := config.NewTestOptions() //TODO: enable palcementrules opt.SetPlacementRuleEnabled(false) - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.SetMergeScheduleLimit(1) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, true /* need to run */) oc := schedule.NewOperatorController(ctx, tc, stream) @@ -1026,7 +1048,7 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) { opt := 
config.NewTestOptions() // TODO: enable palcementrules opt.SetPlacementRuleEnabled(false) - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.DisableFeature(versioninfo.JointConsensus) tc.SetTolerantSizeRatio(2.5) // Add stores 1,2,3,4,5. @@ -1090,7 +1112,7 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) { func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) { opt := config.NewTestOptions() opt.SetPlacementRuleEnabled(false) - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) tc.DisableFeature(versioninfo.JointConsensus) tc.SetTolerantSizeRatio(2.5) // Add stores 1,2,3,4,5. @@ -1165,7 +1187,7 @@ func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) { func (s *testScatterRangeLeaderSuite) TestConcurrencyUpdateConfig(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) oc := schedule.NewOperatorController(s.ctx, nil, nil) hb, err := schedule.CreateScheduler(ScatterRangeType, oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"})) sche := hb.(*scatterRangeScheduler) @@ -1191,7 +1213,7 @@ func (s *testScatterRangeLeaderSuite) TestConcurrencyUpdateConfig(c *C) { func (s *testScatterRangeLeaderSuite) TestBalanceWhenRegionNotHeartbeat(c *C) { opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(s.ctx, opt) // Add stores 1,2,3. tc.AddRegionStore(1, 0) tc.AddRegionStore(2, 0) diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go index 855d726965f..175c6895e7f 100644 --- a/server/schedulers/hot_test.go +++ b/server/schedulers/hot_test.go @@ -51,7 +51,7 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.SetMaxReplicas(3) tc.SetLocationLabels([]string{"zone", "host"}) for id := uint64(1); id <= 10; id++ { @@ -139,7 +139,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnly(c *C) { opt := config.NewTestOptions() // TODO: enable palcement rules opt.SetPlacementRuleEnabled(false) - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.SetMaxReplicas(3) tc.SetLocationLabels([]string{"zone", "host"}) tc.DisableFeature(versioninfo.JointConsensus) @@ -324,7 +324,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithKeyRate(c *C) { hb.(*hotScheduler).conf.SetDstToleranceRatio(1) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.SetHotRegionCacheHitsThreshold(0) tc.DisableFeature(versioninfo.JointConsensus) tc.AddRegionStore(1, 20) @@ -378,7 +378,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestUnhealthyStore(c *C) { hb.(*hotScheduler).conf.SetDstToleranceRatio(1) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.SetHotRegionCacheHitsThreshold(0) tc.DisableFeature(versioninfo.JointConsensus) tc.AddRegionStore(1, 20) @@ -423,7 +423,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestLeader(c *C) { hb, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil) c.Assert(err, IsNil) - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) 
tc.SetHotRegionCacheHitsThreshold(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -482,7 +482,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithPendingInfluence(c *C) { for i := 0; i < 2; i++ { // 0: byte rate // 1: key rate - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.SetHotRegionCacheHitsThreshold(0) tc.SetLeaderScheduleLimit(0) tc.DisableFeature(versioninfo.JointConsensus) @@ -561,7 +561,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithRuleEnabled(c *C) { defer cancel() statistics.Denoising = false opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.SetEnablePlacementRules(true) hb, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil) c.Assert(err, IsNil) @@ -634,7 +634,7 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.DisableFeature(versioninfo.JointConsensus) hb, err := schedule.CreateScheduler(HotReadRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil) c.Assert(err, IsNil) @@ -744,7 +744,7 @@ func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) { hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) hb.(*hotScheduler).conf.SetDstToleranceRatio(1) - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.SetHotRegionCacheHitsThreshold(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -801,7 +801,7 @@ func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) { for i := 0; i < 2; i++ { // 0: byte rate // 1: key rate - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.SetHotRegionCacheHitsThreshold(0) tc.DisableFeature(versioninfo.JointConsensus) tc.AddRegionStore(1, 20) @@ -888,8 +888,10 @@ func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) { } func (s *testHotCacheSuite) TestUpdateCache(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.SetHotRegionCacheHitsThreshold(0) /// For read flow @@ -936,9 +938,11 @@ func (s *testHotCacheSuite) TestUpdateCache(c *C) { } func (s *testHotCacheSuite) TestKeyThresholds(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() opt := config.NewTestOptions() { // only a few regions - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.SetHotRegionCacheHitsThreshold(0) addRegionInfo(tc, read, []testRegionInfo{ {1, []uint64{1, 2, 3}, 0, 1}, @@ -956,7 +960,7 @@ func (s *testHotCacheSuite) TestKeyThresholds(c *C) { c.Assert(stats[6], HasLen, 1) } { // many regions - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) regions := []testRegionInfo{} for i := 1; i <= 1000; i += 2 { regions = append(regions, @@ -1007,8 +1011,10 @@ func (s *testHotCacheSuite) TestKeyThresholds(c *C) { } func (s *testHotCacheSuite) TestByteAndKey(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.SetHotRegionCacheHitsThreshold(0) regions := []testRegionInfo{} for i := 1; i <= 500; i++ { 
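The mockcluster hunks earlier keep a synchronous path (CheckReadSync / CheckWriteSync followed by Update) because tests such as the hot-cache ones here need the statistics populated deterministically before asserting on them. A hypothetical helper, not part of this diff, makes the shape of that loop explicit.

package schedulers_test // illustrative placement, not a file in this diff

import (
	"github.com/tikv/pd/pkg/mock/mockcluster"
	"github.com/tikv/pd/server/core"
)

// fillReadStats is hypothetical; it mirrors what
// mockcluster.AddLeaderRegionWithReadInfo does internally after this change.
func fillReadStats(mc *mockcluster.Cluster, region *core.RegionInfo, rounds int) {
	for i := 0; i < rounds; i++ {
		// the *Sync variants return the computed HotPeerStat items directly,
		// so the caller can apply them immediately instead of waiting for the
		// asynchronous CheckRWAsync path used by the real cluster
		for _, item := range mc.HotCache.CheckReadSync(region) {
			mc.HotCache.Update(item)
		}
	}
}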
@@ -1080,7 +1086,7 @@ func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.SetMaxReplicas(3) tc.SetLocationLabels([]string{"zone", "host"}) tc.DisableFeature(versioninfo.JointConsensus) @@ -1144,8 +1150,10 @@ func (s *testHotCacheSuite) checkRegionFlowTest(c *C, tc *mockcluster.Cluster, h } func (s *testHotCacheSuite) TestCheckRegionFlowWithDifferentThreshold(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.SetMaxReplicas(3) tc.SetLocationLabels([]string{"zone", "host"}) tc.DisableFeature(versioninfo.JointConsensus) @@ -1185,7 +1193,7 @@ func (s *testInfluenceSerialSuite) TestInfluenceByRWType(c *C) { c.Assert(err, IsNil) hb.(*hotScheduler).conf.SetDstToleranceRatio(1) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) tc.SetHotRegionCacheHitsThreshold(0) tc.DisableFeature(versioninfo.JointConsensus) tc.AddRegionStore(1, 20) diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go index 6b6f5956ab6..11cf59b18fe 100644 --- a/server/schedulers/scheduler_test.go +++ b/server/schedulers/scheduler_test.go @@ -49,7 +49,7 @@ func (s *testShuffleLeaderSuite) TestShuffle(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() opt := config.NewTestOptions() - tc := mockcluster.NewCluster(opt) + tc := mockcluster.NewCluster(ctx, opt) sl, err := schedule.CreateScheduler(ShuffleLeaderType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder(ShuffleLeaderType, []string{"", ""})) c.Assert(err, IsNil) @@ -84,7 +84,7 @@ func (s *testRejectLeaderSuite) TestRejectLeader(c *C) { opts.SetLabelPropertyConfig(config.LabelPropertyConfig{ opt.RejectLeader: {{Key: "noleader", Value: "true"}}, }) - tc := mockcluster.NewCluster(opts) + tc := mockcluster.NewCluster(ctx, opts) // Add 3 stores 1,2,3. 
 	tc.AddLabelsStore(1, 1, map[string]string{"noleader": "true"})
@@ -142,7 +142,7 @@ func (s *testShuffleHotRegionSchedulerSuite) TestBalance(c *C) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	opt := config.NewTestOptions()
-	tc := mockcluster.NewCluster(opt)
+	tc := mockcluster.NewCluster(ctx, opt)
 	tc.SetMaxReplicas(3)
 	tc.SetLocationLabels([]string{"zone", "host"})
 	tc.DisableFeature(versioninfo.JointConsensus)
@@ -203,7 +203,7 @@ func (s *testHotRegionSchedulerSuite) TestAbnormalReplica(c *C) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	opt := config.NewTestOptions()
-	tc := mockcluster.NewCluster(opt)
+	tc := mockcluster.NewCluster(ctx, opt)
 	tc.SetLeaderScheduleLimit(0)
 	hb, err := schedule.CreateScheduler(HotReadRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
 	c.Assert(err, IsNil)
@@ -233,7 +233,7 @@ func (s *testEvictLeaderSuite) TestEvictLeader(c *C) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	opt := config.NewTestOptions()
-	tc := mockcluster.NewCluster(opt)
+	tc := mockcluster.NewCluster(ctx, opt)

 	// Add stores 1, 2, 3
 	tc.AddLeaderStore(1, 0)
@@ -259,7 +259,7 @@ func (s *testShuffleRegionSuite) TestShuffle(c *C) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	opt := config.NewTestOptions()
-	tc := mockcluster.NewCluster(opt)
+	tc := mockcluster.NewCluster(ctx, opt)
 	sl, err := schedule.CreateScheduler(ShuffleRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder(ShuffleRegionType, []string{"", ""}))
 	c.Assert(err, IsNil)

@@ -288,7 +288,7 @@ func (s *testShuffleRegionSuite) TestRole(c *C) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	opt := config.NewTestOptions()
-	tc := mockcluster.NewCluster(opt)
+	tc := mockcluster.NewCluster(ctx, opt)
 	tc.DisableFeature(versioninfo.JointConsensus)

 	// update rule to 1leader+1follower+1learner
@@ -355,7 +355,7 @@ func (s *testSpecialUseSuite) TestSpecialUseHotRegion(c *C) {
 	c.Assert(err, IsNil)

 	opt := config.NewTestOptions()
-	tc := mockcluster.NewCluster(opt)
+	tc := mockcluster.NewCluster(ctx, opt)
 	tc.SetHotRegionCacheHitsThreshold(0)
 	tc.DisableFeature(versioninfo.JointConsensus)
 	tc.AddRegionStore(1, 10)
@@ -406,7 +406,7 @@ func (s *testSpecialUseSuite) TestSpecialUseReserved(c *C) {
 	c.Assert(err, IsNil)

 	opt := config.NewTestOptions()
-	tc := mockcluster.NewCluster(opt)
+	tc := mockcluster.NewCluster(ctx, opt)
 	tc.SetHotRegionCacheHitsThreshold(0)
 	tc.DisableFeature(versioninfo.JointConsensus)
 	tc.AddRegionStore(1, 10)
@@ -444,7 +444,7 @@ type testBalanceLeaderSchedulerWithRuleEnabledSuite struct {
 func (s *testBalanceLeaderSchedulerWithRuleEnabledSuite) SetUpTest(c *C) {
 	s.ctx, s.cancel = context.WithCancel(context.Background())
 	s.opt = config.NewTestOptions()
-	s.tc = mockcluster.NewCluster(s.opt)
+	s.tc = mockcluster.NewCluster(s.ctx, s.opt)
 	s.tc.SetEnablePlacementRules(true)
 	s.oc = schedule.NewOperatorController(s.ctx, nil, nil)
 	lb, err := schedule.CreateScheduler(BalanceLeaderType, s.oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
diff --git a/server/statistics/hot_cache.go b/server/statistics/hot_cache.go
index c175fe34bfa..557b7e1a35d 100644
--- a/server/statistics/hot_cache.go
+++ b/server/statistics/hot_cache.go
@@ -14,6 +14,7 @@
 package statistics

 import (
+	"context"
 	"math/rand"

 	"github.com/tikv/pd/server/core"
@@ -23,30 +24,43 @@ import (
 // only turned off by the simulator and the test.
 var Denoising = true

+const queueCap = 1000
+
 // HotCache is a cache hold hot regions.
 type HotCache struct {
+	flowQueue chan *core.RegionInfo
 	writeFlow *hotPeerCache
 	readFlow  *hotPeerCache
 }

 // NewHotCache creates a new hot spot cache.
-func NewHotCache() *HotCache {
-	return &HotCache{
+func NewHotCache(ctx context.Context) *HotCache {
+	w := &HotCache{
+		flowQueue: make(chan *core.RegionInfo, queueCap),
 		writeFlow: NewHotStoresStats(WriteFlow),
 		readFlow:  NewHotStoresStats(ReadFlow),
 	}
+	go w.updateItems(ctx)
+	return w
 }

-// CheckWrite checks the write status, returns update items.
-func (w *HotCache) CheckWrite(region *core.RegionInfo) []*HotPeerStat {
+// CheckWriteSync checks the write status, returns update items.
+// This is used for mockcluster.
+func (w *HotCache) CheckWriteSync(region *core.RegionInfo) []*HotPeerStat {
 	return w.writeFlow.CheckRegionFlow(region)
 }

-// CheckRead checks the read status, returns update items.
-func (w *HotCache) CheckRead(region *core.RegionInfo) []*HotPeerStat {
+// CheckReadSync checks the read status, returns update items.
+// This is used for mockcluster.
+func (w *HotCache) CheckReadSync(region *core.RegionInfo) []*HotPeerStat {
 	return w.readFlow.CheckRegionFlow(region)
 }

+// CheckRWAsync puts the region into the queue and checks it asynchronously.
+func (w *HotCache) CheckRWAsync(region *core.RegionInfo) {
+	w.flowQueue <- region
+}
+
 // Update updates the cache.
 func (w *HotCache) Update(item *HotPeerStat) {
 	switch item.Kind {
@@ -121,3 +135,23 @@ func (w *HotCache) GetFilledPeriod(kind FlowKind) int {
 	}
 	return 0
 }
+
+func (w *HotCache) updateItems(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case region, ok := <-w.flowQueue:
+			if ok && region != nil {
+				items := w.readFlow.CheckRegionFlow(region)
+				for _, item := range items {
+					w.Update(item)
+				}
+				items = w.writeFlow.CheckRegionFlow(region)
+				for _, item := range items {
+					w.Update(item)
+				}
+			}
+		}
+	}
+}
diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go
index 1df72721830..70d2be3ff62 100644
--- a/server/statistics/hot_peer.go
+++ b/server/statistics/hot_peer.go
@@ -88,9 +88,10 @@ type HotPeerStat struct {
 	// LastUpdateTime used to calculate average write
 	LastUpdateTime time.Time `json:"last_update_time"`

-	needDelete bool
-	isLeader   bool
-	isNew      bool
+	needDelete bool
+	isLeader   bool
+	isNew      bool
+	// TODO: remove it when we send peer stat by store info
 	justTransferLeader bool
 	interval           uint64
 	thresholds         []float64
@@ -128,7 +129,7 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi

 // IsNeedCoolDownTransferLeader use cooldown time after transfer leader to avoid unnecessary schedule
 func (stat *HotPeerStat) IsNeedCoolDownTransferLeader(minHotDegree int) bool {
-	return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*RegionHeartBeatReportInterval)
+	return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*HotStatReportInterval)
 }

 // IsNeedDelete to delete the item in cache.
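Editor's note: the hot_cache.go hunk above turns flow checking into a producer/consumer pipeline: callers enqueue a region with CheckRWAsync, and a single updateItems goroutine drains the bounded queue until the context is cancelled, while CheckReadSync/CheckWriteSync keep the old synchronous path for mockcluster. Below is a minimal, self-contained Go sketch of that same bounded-queue-plus-context pattern; the cache type, newCache, and checkAsync names are illustrative stand-ins, not pd code.

package main

import (
	"context"
	"fmt"
	"time"
)

const queueCap = 1000 // same capacity as the patch uses for flowQueue

// cache is a stand-in for HotCache: a bounded queue drained by one goroutine.
type cache struct {
	queue chan int
}

func newCache(ctx context.Context) *cache {
	c := &cache{queue: make(chan int, queueCap)}
	go c.updateItems(ctx) // consumer stops when ctx is cancelled
	return c
}

// checkAsync mirrors CheckRWAsync: enqueue and return immediately.
func (c *cache) checkAsync(item int) {
	c.queue <- item
}

// updateItems mirrors HotCache.updateItems: drain the queue until cancellation.
func (c *cache) updateItems(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case item := <-c.queue:
			fmt.Println("processed", item)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	c := newCache(ctx)
	for i := 0; i < 3; i++ {
		c.checkAsync(i)
	}
	time.Sleep(50 * time.Millisecond) // give the consumer time to drain
	cancel()                          // lets the background goroutine exit
}

This is also why the tests above now pass a cancellable context into mockcluster.NewCluster: cancelling it is what allows the hot cache's background goroutine to exit when a test finishes.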
diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go
index aca38dee14c..e08b96e94f2 100644
--- a/server/statistics/hot_peer_cache.go
+++ b/server/statistics/hot_peer_cache.go
@@ -30,7 +30,10 @@ const (
 	TopNN = 60
 	// HotThresholdRatio is used to calculate hot thresholds
 	HotThresholdRatio = 0.8
-	topNTTL           = 3 * RegionHeartBeatReportInterval * time.Second
+	// HotStatReportInterval indicates the interval between each data report
+	// TODO: change into StoreHeartBeatReportInterval when we use store heartbeat to report data
+	HotStatReportInterval = RegionHeartBeatReportInterval
+	topNTTL               = 3 * HotStatReportInterval * time.Second

 	rollingWindowsSize = 5

@@ -52,6 +55,7 @@ type hotPeerCache struct {
 	kind           FlowKind
 	peersOfStore   map[uint64]*TopN               // storeID -> hot peers
 	storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
+	inheritItem    map[uint64]*HotPeerStat        // regionID -> HotPeerStat
 }

 // NewHotStoresStats creates a HotStoresStats
@@ -60,9 +64,11 @@ func NewHotStoresStats(kind FlowKind) *hotPeerCache {
 		kind:           kind,
 		peersOfStore:   make(map[uint64]*TopN),
 		storesOfRegion: make(map[uint64]map[uint64]struct{}),
+		inheritItem:    make(map[uint64]*HotPeerStat),
 	}
 }

+// TODO: rename RegionStats as PeerStats
 // RegionStats returns hot items
 func (f *hotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat {
 	res := make(map[uint64][]*HotPeerStat)
@@ -108,6 +114,7 @@ func (f *hotPeerCache) Update(item *HotPeerStat) {
 	}
 }

+// TODO: remove it in the future
 func (f *hotPeerCache) collectRegionMetrics(loads []float64, interval uint64) {
 	regionHeartbeatIntervalHist.Observe(float64(interval))
 	if interval == 0 {
@@ -128,93 +135,42 @@ func (f *hotPeerCache) collectRegionMetrics(loads []float64, interval uint64) {
 	}
 }

-func (f *hotPeerCache) getRegionDeltaLoads(region *core.RegionInfo) []float64 {
-	ret := make([]float64, RegionStatCount)
-	for k := RegionStatKind(0); k < RegionStatCount; k++ {
-		switch k {
-		case RegionReadBytes:
-			ret[k] = float64(region.GetBytesRead())
-		case RegionReadKeys:
-			ret[k] = float64(region.GetKeysRead())
-		case RegionWriteBytes:
-			ret[k] = float64(region.GetBytesWritten())
-		case RegionWriteKeys:
-			ret[k] = float64(region.GetKeysWritten())
-		}
-	}
-	return ret
-}
-
 // CheckRegionFlow checks the flow information of region.
 func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerStat) {
 	reportInterval := region.GetInterval()
 	interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
-
-	deltaLoads := f.getRegionDeltaLoads(region)
-	loads := make([]float64, len(deltaLoads))
-	for i := range deltaLoads {
-		loads[i] = deltaLoads[i] / float64(interval)
-	}
-	f.collectRegionMetrics(loads, interval)
-
-	// old region is in the front and new region is in the back
-	// which ensures it will hit the cache if moving peer or transfer leader occurs with the same replica number
-
-	var peers []uint64
-	for _, peer := range region.GetPeers() {
-		peers = append(peers, peer.StoreId)
+	{
+		// TODO: collect metrics by peer stat
+		deltaLoads := f.getFlowDeltaLoads(region)
+		loads := make([]float64, len(deltaLoads))
+		for i := range deltaLoads {
+			loads[i] = deltaLoads[i] / float64(interval)
+		}
+		f.collectRegionMetrics(loads, interval)
 	}
-
-	var tmpItem *HotPeerStat
+	regionID := region.GetID()
 	storeIDs := f.getAllStoreIDs(region)
-	justTransferLeader := f.justTransferLeader(region)
 	for _, storeID := range storeIDs {
-		isExpired := f.isRegionExpired(region, storeID) // transfer read leader or remove write peer
-		oldItem := f.getOldHotPeerStat(region.GetID(), storeID)
-		if isExpired && oldItem != nil { // it may has been moved to other store, we save it to tmpItem
-			tmpItem = oldItem
-		}
-
-		// This is used for the simulator and test. Ignore if report too fast.
-		if !isExpired && Denoising && interval < HotRegionReportMinInterval {
-			continue
-		}
-
-		thresholds := f.calcHotThresholds(storeID)
-
-		newItem := &HotPeerStat{
-			StoreID:            storeID,
-			RegionID:           region.GetID(),
-			Kind:               f.kind,
-			Loads:              loads,
-			LastUpdateTime:     time.Now(),
-			needDelete:         isExpired,
-			isLeader:           region.GetLeader().GetStoreId() == storeID,
-			justTransferLeader: justTransferLeader,
-			interval:           interval,
-			peers:              peers,
-			thresholds:         thresholds,
-		}
-
-		if oldItem == nil {
-			if tmpItem != nil { // use the tmpItem cached from the store where this region was in before
-				oldItem = tmpItem
-			} else { // new item is new peer after adding replica
-				for _, storeID := range storeIDs {
-					oldItem = f.getOldHotPeerStat(region.GetID(), storeID)
-					if oldItem != nil {
-						break
-					}
-				}
-			}
+		peer := region.GetStorePeer(storeID)
+		var item *HotPeerStat
+		if peer != nil {
+			peerInfo := core.NewPeerInfo(peer,
+				region.GetBytesWritten(),
+				region.GetKeysWritten(),
+				region.GetBytesRead(),
+				region.GetKeysRead())
+			item = f.CheckPeerFlow(peerInfo, region, interval)
+		} else {
+			item = f.markExpiredItem(regionID, storeID)
 		}
-
-		newItem = f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second)
-		if newItem != nil {
-			ret = append(ret, newItem)
+		if item != nil {
+			ret = append(ret, item)
 		}
 	}
-
+	var peers []uint64
+	for _, peer := range region.GetPeers() {
+		peers = append(peers, peer.StoreId)
+	}
 	log.Debug("region heartbeat info",
 		zap.String("type", f.kind.String()),
 		zap.Uint64("region", region.GetID()),
@@ -224,6 +180,59 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerS
 	return ret
 }

+// CheckPeerFlow checks the flow information of a peer.
+// Notice: CheckPeerFlow must not be called concurrently.
+func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInfo, interval uint64) *HotPeerStat {
+	storeID := peer.GetStoreID()
+	deltaLoads := f.getFlowDeltaLoads(peer)
+	loads := make([]float64, len(deltaLoads))
+	for i := range deltaLoads {
+		loads[i] = deltaLoads[i] / float64(interval)
+	}
+	justTransferLeader := f.justTransferLeader(region)
+	// transfer read leader or remove write peer
+	isExpired := f.isPeerExpired(peer, region)
+	oldItem := f.getOldHotPeerStat(region.GetID(), storeID)
+	if isExpired && oldItem != nil {
+		f.putInheritItem(oldItem)
+	}
+	if !isExpired && Denoising && interval < HotRegionReportMinInterval {
+		return nil
+	}
+	thresholds := f.calcHotThresholds(storeID)
+	var peers []uint64
+	for _, peer := range region.GetPeers() {
+		peers = append(peers, peer.StoreId)
+	}
+	newItem := &HotPeerStat{
+		StoreID:            storeID,
+		RegionID:           region.GetID(),
+		Kind:               f.kind,
+		Loads:              loads,
+		LastUpdateTime:     time.Now(),
+		needDelete:         isExpired,
+		isLeader:           region.GetLeader().GetStoreId() == storeID,
+		justTransferLeader: justTransferLeader,
+		interval:           interval,
+		peers:              peers,
+		thresholds:         thresholds,
+	}
+	if oldItem == nil {
+		inheritItem := f.takeInheritItem(region.GetID())
+		if inheritItem != nil {
+			oldItem = inheritItem
+		} else {
+			for _, storeID := range f.getAllStoreIDs(region) {
+				oldItem = f.getOldHotPeerStat(region.GetID(), storeID)
+				if oldItem != nil {
+					break
+				}
+			}
+		}
+	}
+	return f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second)
+}
+
 func (f *hotPeerCache) IsRegionHot(region *core.RegionInfo, hotDegree int) bool {
 	switch f.kind {
 	case WriteFlow:
@@ -255,10 +264,12 @@ func (f *hotPeerCache) getOldHotPeerStat(regionID, storeID uint64) *HotPeerStat
 	return nil
 }

-func (f *hotPeerCache) isRegionExpired(region *core.RegionInfo, storeID uint64) bool {
+func (f *hotPeerCache) isPeerExpired(peer *core.PeerInfo, region *core.RegionInfo) bool {
+	storeID := peer.GetStoreID()
 	switch f.kind {
 	case WriteFlow:
 		return region.GetStorePeer(storeID) == nil
+	// TODO: make the readFlow isPeerExpired condition the same as the writeFlow
 	case ReadFlow:
 		return region.GetLeader().GetStoreId() != storeID
 	}
@@ -309,6 +320,7 @@ func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo) []uint64 {
 	return ret
 }

+
 func (f *hotPeerCache) isOldColdPeer(oldItem *HotPeerStat, storeID uint64) bool {
 	isOldPeer := func() bool {
 		for _, id := range oldItem.peers {
@@ -371,7 +383,7 @@ func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb
 }

 func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian {
-	return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, RegionHeartBeatReportInterval*time.Second)
+	return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, HotStatReportInterval*time.Second)
 }

 func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat {
@@ -391,7 +403,7 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa
 		if !isHot {
 			return nil
 		}
-		if interval.Seconds() >= RegionHeartBeatReportInterval {
+		if interval.Seconds() >= HotStatReportInterval {
 			newItem.HotDegree = 1
 			newItem.AntiCount = hotRegionAntiCount
 		}
@@ -410,6 +422,7 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa
 	newItem.rollingLoads = oldItem.rollingLoads

+	// TODO: don't inherit hot degree after transfer leader when we report peer by store heartbeat.
 	if newItem.justTransferLeader {
 		// skip the first heartbeat flow statistic after transfer leader, because its statistics are calculated by the last leader in this store and are inaccurate
 		// maintain anticount and hotdegree to avoid store threshold and hot peer are unstable.
@@ -454,3 +467,42 @@
 	}
 	return newItem
 }
+
+func (f *hotPeerCache) markExpiredItem(regionID, storeID uint64) *HotPeerStat {
+	item := f.getOldHotPeerStat(regionID, storeID)
+	item.needDelete = true
+	return item
+}
+
+func (f *hotPeerCache) getFlowDeltaLoads(stat core.FlowStat) []float64 {
+	ret := make([]float64, RegionStatCount)
+	for k := RegionStatKind(0); k < RegionStatCount; k++ {
+		switch k {
+		case RegionReadBytes:
+			ret[k] = float64(stat.GetBytesRead())
+		case RegionReadKeys:
+			ret[k] = float64(stat.GetKeysRead())
+		case RegionWriteBytes:
+			ret[k] = float64(stat.GetBytesWritten())
+		case RegionWriteKeys:
+			ret[k] = float64(stat.GetKeysWritten())
+		}
+	}
+	return ret
+}
+
+func (f *hotPeerCache) putInheritItem(item *HotPeerStat) {
+	f.inheritItem[item.RegionID] = item
+}
+
+func (f *hotPeerCache) takeInheritItem(regionID uint64) *HotPeerStat {
+	item, ok := f.inheritItem[regionID]
+	if !ok {
+		return nil
+	}
+	if item != nil {
+		delete(f.inheritItem, regionID)
+		return item
+	}
+	return nil
+}
diff --git a/server/statistics/hotstat.go b/server/statistics/hotstat.go
index e122e5bf6a9..50a97285b90 100644
--- a/server/statistics/hotstat.go
+++ b/server/statistics/hotstat.go
@@ -13,6 +13,8 @@

 package statistics

+import "context"
+
 // HotStat contains cluster's hotspot statistics.
 type HotStat struct {
 	*HotCache
@@ -20,9 +22,9 @@
 }

 // NewHotStat creates the container to hold cluster's hotspot statistics.
-func NewHotStat() *HotStat {
+func NewHotStat(ctx context.Context) *HotStat {
 	return &HotStat{
-		HotCache:    NewHotCache(),
+		HotCache:    NewHotCache(ctx),
 		StoresStats: NewStoresStats(),
 	}
 }
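Editor's note: the inheritItem map introduced above replaces the old tmpItem local. When a peer's stat expires on one store (for example the peer was moved away), CheckPeerFlow stashes the old stat keyed by region ID via putInheritItem, and the first stat produced for the region's new peer takes it back via takeInheritItem so the hot degree is not reset. Below is a self-contained Go sketch of that hand-off; the stat and inheritMap types are simplified stand-ins, not the pd implementation, where the inheritance actually happens inside updateHotPeerStat through the oldItem argument.

package main

import "fmt"

// stat is a stand-in for HotPeerStat, keeping only what the hand-off needs.
type stat struct {
	regionID  uint64
	storeID   uint64
	hotDegree int
}

// inheritMap mirrors hotPeerCache.inheritItem: regionID -> stashed stat.
type inheritMap map[uint64]*stat

// put mirrors putInheritItem: remember the expired stat for its region.
func (m inheritMap) put(s *stat) { m[s.regionID] = s }

// take mirrors takeInheritItem: hand the stashed stat to the caller exactly once.
func (m inheritMap) take(regionID uint64) *stat {
	s, ok := m[regionID]
	if !ok {
		return nil
	}
	delete(m, regionID)
	return s
}

func main() {
	m := inheritMap{}

	// The peer of region 7 expired on store 1: stash its stat.
	m.put(&stat{regionID: 7, storeID: 1, hotDegree: 5})

	// The region's new peer on store 4 reports for the first time and
	// inherits the previous hot degree instead of starting from zero.
	newStat := &stat{regionID: 7, storeID: 4}
	if old := m.take(7); old != nil {
		newStat.hotDegree = old.hotDegree
	}
	fmt.Printf("store %d inherits hot degree %d\n", newStat.storeID, newStat.hotDegree)
}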