Skip to content

Commit

Permalink
statistics: support hotPeerCache maintained in Peer level (#3627)
Browse files Browse the repository at this point in the history
* support peer

Signed-off-by: yisaer <disxiaofei@163.com>

* fix lint

Signed-off-by: yisaer <disxiaofei@163.com>

* revise

Signed-off-by: yisaer <disxiaofei@163.com>

* revise

Signed-off-by: yisaer <disxiaofei@163.com>

* revise

Signed-off-by: yisaer <disxiaofei@163.com>

* revise

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* asynchronous check

Signed-off-by: yisaer <disxiaofei@163.com>

* fix ctx

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>
  • Loading branch information
Yisaer authored May 8, 2021
1 parent a052077 commit df1614b
Show file tree
Hide file tree
Showing 27 changed files with 545 additions and 245 deletions.
22 changes: 17 additions & 5 deletions pkg/autoscaling/calculation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package autoscaling

import (
"context"
"encoding/json"
"fmt"
"math"
Expand All @@ -32,11 +33,22 @@ func Test(t *testing.T) {

var _ = Suite(&calculationTestSuite{})

type calculationTestSuite struct{}
type calculationTestSuite struct {
ctx context.Context
cancel context.CancelFunc
}

func (s *calculationTestSuite) SetUpSuite(c *C) {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

func (s *calculationTestSuite) TearDownTest(c *C) {
s.cancel()
}

func (s *calculationTestSuite) TestGetScaledTiKVGroups(c *C) {
// case1 indicates the tikv cluster with not any group existed
case1 := mockcluster.NewCluster(config.NewTestOptions())
case1 := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
case1.AddLabelsStore(1, 1, map[string]string{})
case1.AddLabelsStore(2, 1, map[string]string{
"foo": "bar",
Expand All @@ -46,7 +58,7 @@ func (s *calculationTestSuite) TestGetScaledTiKVGroups(c *C) {
})

// case2 indicates the tikv cluster with 1 auto-scaling group existed
case2 := mockcluster.NewCluster(config.NewTestOptions())
case2 := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
case2.AddLabelsStore(1, 1, map[string]string{})
case2.AddLabelsStore(2, 1, map[string]string{
groupLabelKey: fmt.Sprintf("%s-%s-0", autoScalingGroupLabelKeyPrefix, TiKV.String()),
Expand All @@ -58,7 +70,7 @@ func (s *calculationTestSuite) TestGetScaledTiKVGroups(c *C) {
})

// case3 indicates the tikv cluster with other group existed
case3 := mockcluster.NewCluster(config.NewTestOptions())
case3 := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
case3.AddLabelsStore(1, 1, map[string]string{})
case3.AddLabelsStore(2, 1, map[string]string{
groupLabelKey: "foo",
Expand Down Expand Up @@ -323,7 +335,7 @@ func (s *calculationTestSuite) TestStrategyChangeCount(c *C) {
}

// tikv cluster with 1 auto-scaling group existed
cluster := mockcluster.NewCluster(config.NewTestOptions())
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
cluster.AddLabelsStore(1, 1, map[string]string{})
cluster.AddLabelsStore(2, 1, map[string]string{
groupLabelKey: fmt.Sprintf("%s-%s-0", autoScalingGroupLabelKeyPrefix, TiKV.String()),
Expand Down
9 changes: 5 additions & 4 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package mockcluster

import (
"context"
"fmt"
"strconv"
"time"
Expand Down Expand Up @@ -52,11 +53,11 @@ type Cluster struct {
}

// NewCluster creates a new Cluster
func NewCluster(opts *config.PersistOptions) *Cluster {
func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
clus := &Cluster{
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
HotStat: statistics.NewHotStat(),
HotStat: statistics.NewHotStat(ctx),
PersistOptions: opts,
suspectRegions: map[uint64]struct{}{},
disabledFeatures: make(map[versioninfo.Feature]struct{}),
Expand Down Expand Up @@ -339,7 +340,7 @@ func (mc *Cluster) AddLeaderRegionWithReadInfo(

var items []*statistics.HotPeerStat
for i := 0; i < filledNum; i++ {
items = mc.HotCache.CheckRead(r)
items = mc.HotCache.CheckReadSync(r)
for _, item := range items {
mc.HotCache.Update(item)
}
Expand All @@ -366,7 +367,7 @@ func (mc *Cluster) AddLeaderRegionWithWriteInfo(

var items []*statistics.HotPeerStat
for i := 0; i < filledNum; i++ {
items = mc.HotCache.CheckWrite(r)
items = mc.HotCache.CheckWriteSync(r)
for _, item := range items {
mc.HotCache.Update(item)
}
Expand Down
25 changes: 6 additions & 19 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.PersistOptions, s
c.storage = storage
c.id = id
c.labelLevelStats = statistics.NewLabelStatistics()
c.hotStat = statistics.NewHotStat()
c.hotStat = statistics.NewHotStat(c.ctx)
c.prepareChecker = newPrepareChecker()
c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
c.suspectRegions = cache.NewIDTTL(c.ctx, time.Minute, 3*time.Minute)
Expand Down Expand Up @@ -546,8 +546,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
c.RUnlock()
return err
}
writeItems := c.CheckWriteStatus(region)
readItems := c.CheckReadStatus(region)
c.CheckRWStatus(region)
c.RUnlock()

// Save to storage if meta is updated.
Expand Down Expand Up @@ -623,7 +622,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
}

if len(writeItems) == 0 && len(readItems) == 0 && !saveKV && !saveCache && !isNew {
if !saveKV && !saveCache && !isNew {
return nil
}

Expand Down Expand Up @@ -682,13 +681,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
if c.regionStats != nil {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}

for _, writeItem := range writeItems {
c.hotStat.Update(writeItem)
}
for _, readItem := range readItems {
c.hotStat.Update(readItem)
}
c.Unlock()

// If there are concurrent heartbeats from the same region, the last write will win even if
Expand Down Expand Up @@ -1475,14 +1467,9 @@ func (c *RaftCluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
return c.hotStat.RegionStats(statistics.WriteFlow, c.GetOpts().GetHotRegionCacheHitsThreshold())
}

// CheckWriteStatus checks the write status, returns whether need update statistics and item.
func (c *RaftCluster) CheckWriteStatus(region *core.RegionInfo) []*statistics.HotPeerStat {
return c.hotStat.CheckWrite(region)
}

// CheckReadStatus checks the read status, returns whether need update statistics and item.
func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.HotPeerStat {
return c.hotStat.CheckRead(region)
// CheckRWStatus checks the read/write status.
func (c *RaftCluster) CheckRWStatus(region *core.RegionInfo) {
c.hotStat.CheckRWAsync(region)
}

// TODO: remove me.
Expand Down
8 changes: 8 additions & 0 deletions server/core/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,11 @@ func StringToKeyType(input string) KeyType {
panic("invalid key type: " + input)
}
}

// FlowStat indicates the stats of the flow
type FlowStat interface {
GetKeysWritten() uint64
GetBytesWritten() uint64
GetBytesRead() uint64
GetKeysRead() uint64
}
50 changes: 50 additions & 0 deletions server/core/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,53 @@ func CountInJointState(peers ...*metapb.Peer) int {
}
return count
}

// PeerInfo provides peer information
type PeerInfo struct {
*metapb.Peer
writtenBytes uint64
writtenKeys uint64
readBytes uint64
readKeys uint64
}

// NewPeerInfo creates PeerInfo
func NewPeerInfo(meta *metapb.Peer, writtenBytes, writtenKeys, readBytes, readKeys uint64) *PeerInfo {
return &PeerInfo{
Peer: meta,
writtenBytes: writtenBytes,
writtenKeys: writtenKeys,
readBytes: readBytes,
readKeys: readKeys,
}
}

// GetKeysWritten provides peer written keys
func (p *PeerInfo) GetKeysWritten() uint64 {
return p.writtenKeys
}

// GetBytesWritten provides peer written bytes
func (p *PeerInfo) GetBytesWritten() uint64 {
return p.writtenBytes
}

// GetBytesRead provides peer read bytes
func (p *PeerInfo) GetBytesRead() uint64 {
return p.readBytes
}

// GetKeysRead provides read keys
func (p *PeerInfo) GetKeysRead() uint64 {
return p.readKeys
}

// GetStoreID provides located storeID
func (p *PeerInfo) GetStoreID() uint64 {
return p.GetStoreId()
}

// GetPeerID provides peer id
func (p *PeerInfo) GetPeerID() uint64 {
return p.GetId()
}
23 changes: 17 additions & 6 deletions server/replication/replication_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,23 @@ func TestReplicationMode(t *testing.T) {

var _ = Suite(&testReplicationMode{})

type testReplicationMode struct{}
type testReplicationMode struct {
ctx context.Context
cancel context.CancelFunc
}

func (s *testReplicationMode) SetUpSuite(c *C) {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

func (s *testReplicationMode) TearDownTest(c *C) {
s.cancel()
}

func (s *testReplicationMode) TestInitial(c *C) {
store := core.NewStorage(kv.NewMemoryKV())
conf := config.ReplicationModeConfig{ReplicationMode: modeMajority}
cluster := mockcluster.NewCluster(config.NewTestOptions())
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
rep, err := NewReplicationModeManager(conf, store, cluster, nil)
c.Assert(err, IsNil)
c.Assert(rep.GetReplicationStatus(), DeepEquals, &pb.ReplicationStatus{Mode: pb.ReplicationMode_MAJORITY})
Expand Down Expand Up @@ -72,7 +83,7 @@ func (s *testReplicationMode) TestStatus(c *C) {
LabelKey: "dr-label",
WaitSyncTimeout: typeutil.Duration{Duration: time.Minute},
}}
cluster := mockcluster.NewCluster(config.NewTestOptions())
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
rep, err := NewReplicationModeManager(conf, store, cluster, nil)
c.Assert(err, IsNil)
c.Assert(rep.GetReplicationStatus(), DeepEquals, &pb.ReplicationStatus{
Expand Down Expand Up @@ -147,7 +158,7 @@ func (s *testReplicationMode) TestStateSwitch(c *C) {
WaitStoreTimeout: typeutil.Duration{Duration: time.Minute},
WaitSyncTimeout: typeutil.Duration{Duration: time.Minute},
}}
cluster := mockcluster.NewCluster(config.NewTestOptions())
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
var replicator mockFileReplicator
rep, err := NewReplicationModeManager(conf, store, cluster, &replicator)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -251,7 +262,7 @@ func (s *testReplicationMode) TestAsynctimeout(c *C) {
WaitSyncTimeout: typeutil.Duration{Duration: time.Minute},
WaitAsyncTimeout: typeutil.Duration{Duration: 2 * time.Minute},
}}
cluster := mockcluster.NewCluster(config.NewTestOptions())
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
var replicator mockFileReplicator
rep, err := NewReplicationModeManager(conf, store, cluster, &replicator)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -302,7 +313,7 @@ func (s *testReplicationMode) TestRecoverProgress(c *C) {
WaitStoreTimeout: typeutil.Duration{Duration: time.Minute},
WaitSyncTimeout: typeutil.Duration{Duration: time.Minute},
}}
cluster := mockcluster.NewCluster(config.NewTestOptions())
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
cluster.AddLabelsStore(1, 1, map[string]string{})
rep, err := NewReplicationModeManager(conf, store, cluster, nil)
c.Assert(err, IsNil)
Expand Down
14 changes: 13 additions & 1 deletion server/schedule/checker/joint_state_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package checker

import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/mock/mockcluster"
Expand All @@ -27,10 +29,20 @@ var _ = Suite(&testJointStateCheckerSuite{})
type testJointStateCheckerSuite struct {
cluster *mockcluster.Cluster
jsc *JointStateChecker
ctx context.Context
cancel context.CancelFunc
}

func (s *testJointStateCheckerSuite) SetUpSuite(c *C) {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

func (s *testJointStateCheckerSuite) TearDownTest(c *C) {
s.cancel()
}

func (s *testJointStateCheckerSuite) SetUpTest(c *C) {
s.cluster = mockcluster.NewCluster(config.NewTestOptions())
s.cluster = mockcluster.NewCluster(s.ctx, config.NewTestOptions())
s.jsc = NewJointStateChecker(s.cluster)
for id := uint64(1); id <= 10; id++ {
s.cluster.PutStoreWithLabels(id)
Expand Down
11 changes: 10 additions & 1 deletion server/schedule/checker/learner_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package checker

import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/mock/mockcluster"
Expand All @@ -28,17 +30,24 @@ var _ = Suite(&testLearnerCheckerSuite{})
type testLearnerCheckerSuite struct {
cluster *mockcluster.Cluster
lc *LearnerChecker
ctx context.Context
cancel context.CancelFunc
}

func (s *testLearnerCheckerSuite) SetUpTest(c *C) {
s.cluster = mockcluster.NewCluster(config.NewTestOptions())
s.ctx, s.cancel = context.WithCancel(context.Background())
s.cluster = mockcluster.NewCluster(s.ctx, config.NewTestOptions())
s.cluster.DisableFeature(versioninfo.JointConsensus)
s.lc = NewLearnerChecker(s.cluster)
for id := uint64(1); id <= 10; id++ {
s.cluster.PutStoreWithLabels(id)
}
}

func (s *testLearnerCheckerSuite) TearDownTest(c *C) {
s.cancel()
}

func (s *testLearnerCheckerSuite) TestPromoteLearner(c *C) {
lc := s.lc

Expand Down
17 changes: 10 additions & 7 deletions server/schedule/checker/merge_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,17 @@ type testMergeCheckerSuite struct {
regions []*core.RegionInfo
}

func (s *testMergeCheckerSuite) SetUpSuite(c *C) {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

func (s *testMergeCheckerSuite) TearDownTest(c *C) {
s.cancel()
}

func (s *testMergeCheckerSuite) SetUpTest(c *C) {
cfg := config.NewTestOptions()
s.cluster = mockcluster.NewCluster(cfg)
s.cluster = mockcluster.NewCluster(s.ctx, cfg)
s.cluster.SetMaxMergeRegionSize(2)
s.cluster.SetMaxMergeRegionKeys(2)
s.cluster.SetLabelPropertyConfig(config.LabelPropertyConfig{
Expand Down Expand Up @@ -130,14 +138,9 @@ func (s *testMergeCheckerSuite) SetUpTest(c *C) {
for _, region := range s.regions {
s.cluster.PutRegion(region)
}
s.ctx, s.cancel = context.WithCancel(context.Background())
s.mc = NewMergeChecker(s.ctx, s.cluster)
}

func (s *testMergeCheckerSuite) TearDownTest(c *C) {
s.cancel()
}

func (s *testMergeCheckerSuite) TestBasic(c *C) {
s.cluster.SetSplitMergeInterval(0)

Expand Down Expand Up @@ -434,7 +437,7 @@ type testSplitMergeSuite struct{}

func (s *testMergeCheckerSuite) TestCache(c *C) {
cfg := config.NewTestOptions()
s.cluster = mockcluster.NewCluster(cfg)
s.cluster = mockcluster.NewCluster(s.ctx, cfg)
s.cluster.SetMaxMergeRegionSize(2)
s.cluster.SetMaxMergeRegionKeys(2)
s.cluster.SetSplitMergeInterval(time.Hour)
Expand Down
Loading

0 comments on commit df1614b

Please sign in to comment.