Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: picks some hot-region improvements to release 3.1 #2342

Merged
merged 9 commits into from
Apr 10, 2020
18 changes: 3 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,20 @@ require (
github.com/coreos/go-semver v0.2.0
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf
github.com/docker/go-units v0.4.0
github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e // indirect
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 // indirect
github.com/elazarl/go-bindata-assetfs v1.0.0
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/gogo/protobuf v1.3.1
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff // indirect
github.com/golang/protobuf v1.3.2
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/google/btree v1.0.0
github.com/gorilla/mux v1.7.3
github.com/gorilla/websocket v1.2.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/json-iterator/go v1.1.9 // indirect
github.com/juju/ratelimit v1.0.1
github.com/leodido/go-urn v1.2.0 // indirect
github.com/mattn/go-isatty v0.0.11 // indirect
github.com/mattn/go-shellwords v1.0.3
github.com/montanaflynn/stats v0.0.0-20151014174947-eeaced052adb
github.com/onsi/gomega v1.4.2 // indirect
github.com/onsi/ginkgo v1.12.0 // indirect
github.com/onsi/gomega v1.9.0 // indirect
github.com/opentracing/opentracing-go v1.0.2
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d
github.com/pingcap-incubator/tidb-dashboard v0.0.0-20200218115603-7ab5f06db73d
Expand All @@ -44,17 +38,11 @@ require (
github.com/spf13/cobra v0.0.3
github.com/spf13/pflag v1.0.1
github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
github.com/unrolled/render v0.0.0-20171102162132-65450fb6b2d3
github.com/urfave/negroni v0.3.0
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/goleak v0.10.0
go.uber.org/zap v1.13.0
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 // indirect
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1 // indirect
golang.org/x/tools v0.0.0-20200216192241-b320d3a0f5a2 // indirect
google.golang.org/grpc v1.25.1
gopkg.in/go-playground/validator.v9 v9.31.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v2 v2.2.8 // indirect
)
65 changes: 26 additions & 39 deletions go.sum

Large diffs are not rendered by default.

72 changes: 70 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,14 @@ func (mc *Cluster) AddLeaderRegionWithRange(regionID uint64, startKey string, en
}

// AddLeaderRegionWithReadInfo adds region with specified leader, followers and read info.
func (mc *Cluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64, readBytes uint64, reportInterval uint64, followerIds ...uint64) {
func (mc *Cluster) AddLeaderRegionWithReadInfo(
regionID uint64, leaderID uint64,
readBytes, readKeys uint64,
reportInterval uint64,
followerIds []uint64) {
r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
r = r.Clone(core.SetReadBytes(readBytes))
r = r.Clone(core.SetReadKeys(readKeys))
r = r.Clone(core.SetReportInterval(reportInterval))
items := mc.HotCache.CheckRead(r, mc.StoresStats)
for _, item := range items {
Expand All @@ -287,9 +292,14 @@ func (mc *Cluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64,
}

// AddLeaderRegionWithWriteInfo adds region with specified leader, followers and write info.
func (mc *Cluster) AddLeaderRegionWithWriteInfo(regionID uint64, leaderID uint64, writtenBytes uint64, reportInterval uint64, followerIds ...uint64) {
func (mc *Cluster) AddLeaderRegionWithWriteInfo(
regionID uint64, leaderID uint64,
writtenBytes, writtenKeys uint64,
reportInterval uint64,
followerIds []uint64) {
r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
r = r.Clone(core.SetWrittenBytes(writtenBytes))
r = r.Clone(core.SetWrittenKeys(writtenKeys))
r = r.Clone(core.SetReportInterval(reportInterval))
items := mc.HotCache.CheckWrite(r, mc.StoresStats)
for _, item := range items {
Expand Down Expand Up @@ -383,11 +393,40 @@ func (mc *Cluster) UpdateStorageRatio(storeID uint64, usedRatio, availableRatio
mc.PutStore(newStore)
}

// UpdateStorageWrittenStats updates the store's written-bytes and written-keys
// statistics, stamping the stats with a fresh heartbeat report interval.
func (mc *Cluster) UpdateStorageWrittenStats(storeID, bytesWritten, keysWritten uint64) {
	store := mc.GetStore(storeID)
	// Clone the stats so the store's current stats object is not mutated in place.
	stats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
	stats.BytesWritten = bytesWritten
	stats.KeysWritten = keysWritten
	// Fabricate a report interval ending "now" and spanning one heartbeat period.
	end := time.Now().Second()
	stats.Interval = &pdpb.TimeInterval{
		StartTimestamp: uint64(end - statistics.StoreHeartBeatReportInterval),
		EndTimestamp:   uint64(end),
	}
	newStore := store.Clone(core.SetStoreStats(stats))
	mc.Set(storeID, stats)
	mc.PutStore(newStore)
}

// UpdateStorageReadStats updates the store's read-bytes and read-keys
// statistics, stamping the stats with a fresh heartbeat report interval.
//
// NOTE: parameters were renamed from bytesWritten/keysWritten — they are
// assigned to BytesRead/KeysRead, so the old names were misleading. Renaming
// Go parameters does not affect callers.
func (mc *Cluster) UpdateStorageReadStats(storeID, bytesRead, keysRead uint64) {
	store := mc.GetStore(storeID)
	// Clone the stats so the store's current stats object is not mutated in place.
	newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
	newStats.BytesRead = bytesRead
	newStats.KeysRead = keysRead
	// Fabricate a report interval ending "now" and spanning one heartbeat period.
	now := time.Now().Second()
	interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
	newStats.Interval = interval
	newStore := store.Clone(core.SetStoreStats(newStats))
	mc.Set(storeID, newStats)
	mc.PutStore(newStore)
}

// UpdateStorageWrittenBytes updates store written bytes.
func (mc *Cluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64) {
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.BytesWritten = bytesWritten
newStats.KeysWritten = bytesWritten / 100
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
Expand All @@ -401,6 +440,35 @@ func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64) {
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.BytesRead = bytesRead
newStats.KeysRead = bytesRead / 100
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
newStore := store.Clone(core.SetStoreStats(newStats))
mc.Set(storeID, newStats)
mc.PutStore(newStore)
}

// UpdateStorageWrittenKeys updates the store's written-keys statistic, deriving
// written bytes at a fixed 100 bytes-per-key ratio, and stamps the stats with a
// fresh heartbeat report interval.
func (mc *Cluster) UpdateStorageWrittenKeys(storeID uint64, keysWritten uint64) {
	store := mc.GetStore(storeID)
	// Clone the stats so the store's current stats object is not mutated in place.
	stats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
	stats.KeysWritten = keysWritten
	// Synthesize a byte count proportional to the key count (100 bytes per key).
	stats.BytesWritten = keysWritten * 100
	// Fabricate a report interval ending "now" and spanning one heartbeat period.
	end := time.Now().Second()
	stats.Interval = &pdpb.TimeInterval{
		StartTimestamp: uint64(end - statistics.StoreHeartBeatReportInterval),
		EndTimestamp:   uint64(end),
	}
	newStore := store.Clone(core.SetStoreStats(stats))
	mc.Set(storeID, stats)
	mc.PutStore(newStore)
}

// UpdateStorageReadKeys updates store read bytes.
func (mc *Cluster) UpdateStorageReadKeys(storeID uint64, keysRead uint64) {
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.KeysRead = keysRead
newStats.BytesRead = keysRead * 100
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
Expand Down
41 changes: 40 additions & 1 deletion server/api/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,46 @@ func (s *testScheduleSuite) TestAPI(c *C) {
extraTestFunc func(name string, c *C)
}{
{name: "balance-leader-scheduler"},
{name: "balance-hot-region-scheduler"},
{
name: "balance-hot-region-scheduler",
extraTestFunc: func(name string, c *C) {
resp := make(map[string]interface{})
listURL := fmt.Sprintf("%s%s%s/%s/list", s.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, name)
c.Assert(readJSON(listURL, &resp), IsNil)
expectMap := map[string]float64{
"min-hot-byte-rate": 100,
"min-hot-key-rate": 10,
"max-zombie-rounds": 3,
"max-peer-number": 1000,
"byte-rate-rank-step-ratio": 0.05,
"key-rate-rank-step-ratio": 0.05,
"count-rank-step-ratio": 0.01,
"great-dec-ratio": 0.95,
"minor-dec-ratio": 0.99,
}
for key := range expectMap {
c.Assert(resp[key], DeepEquals, expectMap[key])
}
dataMap := make(map[string]interface{})
dataMap["max-zombie-rounds"] = 5.0
expectMap["max-zombie-rounds"] = 5.0
updateURL := fmt.Sprintf("%s%s%s/%s/config", s.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, name)
body, err := json.Marshal(dataMap)
c.Assert(err, IsNil)
c.Assert(postJSON(updateURL, body), IsNil)
resp = make(map[string]interface{})
c.Assert(readJSON(listURL, &resp), IsNil)
for key := range expectMap {
c.Assert(resp[key], DeepEquals, expectMap[key])
}
// update again
err = postJSON(updateURL, body, func(res []byte, code int) {
c.Assert(string(res), Equals, "no changed")
c.Assert(code, Equals, 200)
})
c.Assert(err, IsNil)
},
},
{name: "balance-region-scheduler"},
{name: "shuffle-leader-scheduler"},
{name: "shuffle-region-scheduler"},
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
"go.uber.org/zap"
)

var backgroundJobInterval = time.Minute
var backgroundJobInterval = 10 * time.Second

const (
clientTimeout = 3 * time.Second
Expand Down
10 changes: 10 additions & 0 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,24 +402,30 @@ func (c *coordinator) collectHotSpotMetrics() {
stat, ok := status.AsPeer[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(stat.TotalBytesRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_peer").Set(stat.TotalBytesRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_peer").Set(0)
}

stat, ok = status.AsLeader[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(stat.TotalBytesRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(stat.TotalKeysRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(0)
}

infl := pendings[storeID]
// TODO: add to tidb-ansible after merging pending influence into operator influence.
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_byte_rate").Set(infl.ByteRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_key_rate").Set(infl.KeyRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_count").Set(infl.Count)
}

// Collects hot read region metrics.
Expand All @@ -432,14 +438,18 @@ func (c *coordinator) collectHotSpotMetrics() {
stat, ok := status.AsLeader[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(stat.TotalBytesRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(stat.TotalKeysRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(0)
}

infl := pendings[storeID]
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_byte_rate").Set(infl.ByteRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_key_rate").Set(infl.KeyRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_count").Set(infl.Count)
}
}

Expand Down
Loading