Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server.api add api for history_hot_region info;server.cluster add storage for hot region;sever.config add 'hot-regions-reserved-days' and 'hot-region-write-interval' #3988

Closed
wants to merge 11 commits into from
Closed
Binary file added cluster.test
Binary file not shown.
4 changes: 4 additions & 0 deletions conf/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@
## When PD fails to receive the heartbeat from a store after the specified period of time,
## it adds replicas at other nodes.
# max-store-down-time = "30m"
## Controls the time interval between write hot regions info into leveldb
# hot-regions-write-interval= "20m"
## the day of hot regions data to be reserved
# hot-regions-reserved-days= "30"
## The number of Leader scheduling tasks performed at the same time.
# leader-schedule-limit = 4
## The number of Region scheduling tasks performed at the same time.
Expand Down
4 changes: 4 additions & 0 deletions conf/simconfig.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ leader-schedule-limit = 32
region-schedule-limit = 128
replica-schedule-limit = 32
merge-schedule-limit = 32
##TODO
##Find a better place to put this config
hot-regions-reserved-days= "30"
hot-regions-write-interval= "20m"
12 changes: 10 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@ require (
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
github.com/coreos/go-semver v0.3.0
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f
github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect
github.com/docker/go-units v0.4.0
github.com/go-echarts/go-echarts v1.0.0
github.com/go-openapi/jsonreference v0.19.6 // indirect
github.com/go-openapi/spec v0.20.3 // indirect
github.com/go-openapi/swag v0.19.15 // indirect
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.4
github.com/google/btree v1.0.0
github.com/gorilla/mux v1.7.4
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/juju/ratelimit v1.0.1
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-shellwords v1.0.3
github.com/mgechev/revive v1.0.2
github.com/montanaflynn/stats v0.5.0
Expand All @@ -41,16 +46,19 @@ require (
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
github.com/swaggo/http-swagger v0.0.0-20200308142732-58ac5e232fba
github.com/swaggo/swag v1.6.6-0.20200529100950-7c765ddd0476
github.com/swaggo/swag v1.6.7
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
github.com/unrolled/render v1.0.1
github.com/urfave/cli/v2 v2.3.0 // indirect
github.com/urfave/negroni v0.3.0
// Fix panic in unit test with go >= 1.14, ref: etcd-io/bbolt#201 https://github.com/etcd-io/bbolt/pull/201
go.etcd.io/bbolt v1.3.5 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/goleak v1.1.10
go.uber.org/zap v1.16.0
golang.org/x/tools v0.0.0-20210112230658-8b4aab62c064
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d // indirect
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e // indirect
golang.org/x/tools v0.1.5
google.golang.org/grpc v1.26.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)
71 changes: 51 additions & 20 deletions go.sum

Large diffs are not rendered by default.

29 changes: 29 additions & 0 deletions server/api/hot_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package api

import (
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"

Expand Down Expand Up @@ -141,3 +143,30 @@ func (h *hotStatusHandler) GetHotStores(w http.ResponseWriter, r *http.Request)
}
h.rd.JSON(w, http.StatusOK, stats)
}

// @Tags hotspot
// @Summary List the history hot regions.
// @Accept json
// @Produce json
// @Success 200 {object} statistics.HistoryHotRegions
// @Router /hotspot/regions/history [post]
func (h *hotStatusHandler) GetHistoryHotRegions(w http.ResponseWriter, r *http.Request) {
data, err := io.ReadAll(r.Body)
r.Body.Close()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
historyHotRegionsRequest := &statistics.HistoryHotRegionsRequest{}
err = json.Unmarshal(data, historyHotRegionsRequest)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
results, err := h.Handler.GetAllRequestHistroyHotRegion(historyHotRegionsRequest)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, results)
}
235 changes: 235 additions & 0 deletions server/api/hot_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,18 @@
package api

import (
"encoding/json"
"fmt"
"reflect"
"time"

. "github.com/pingcap/check"
"github.com/syndtr/goleveldb/leveldb"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/kv"
_ "github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/statistics"
)

var _ = Suite(&testHotStatusSuite{})
Expand Down Expand Up @@ -48,3 +55,231 @@ func (s testHotStatusSuite) TestGetHotStore(c *C) {
err := readJSON(testDialClient, s.urlPrefix+"/stores", &stat)
c.Assert(err, IsNil)
}
func (s testHotStatusSuite) TestGetHistoryHotRegionsBasic(c *C) {
request := statistics.HistoryHotRegionsRequest{
StartTime: 0,
EndTime: time.Now().AddDate(0, 2, 0).Unix(),
}
data, err := json.Marshal(request)
c.Assert(err, IsNil)
err = postJSON(testDialClient, s.urlPrefix+"/regions/history", data)
c.Assert(err, IsNil)
}

func (s testHotStatusSuite) TestGetHistoryHotRegionsTimeRange(c *C) {
storage := s.svr.GetHistoryHotRegionStorage()
now := time.Now()
hotRegions := []*statistics.HistoryHotRegion{
{
RegionID: 1,
UpdateTime: now.Unix(),
},
{
RegionID: 1,
UpdateTime: now.Add(10 * time.Minute).Unix(),
},
}
request := statistics.HistoryHotRegionsRequest{
StartTime: now.Unix(),
EndTime: now.Add(10 * time.Second).Unix(),
}
check := func(res []byte, statusCode int) {
c.Assert(statusCode, Equals, 200)
historyHotRegions := &statistics.HistoryHotRegions{}
json.Unmarshal(res, historyHotRegions)
for _, region := range historyHotRegions.HistoryHotRegion {
c.Assert(region.UpdateTime, GreaterEqual, request.StartTime)
c.Assert(region.UpdateTime, LessEqual, request.EndTime)
}
}
writeToDB(c, storage.LeveldbKV, hotRegions)
data, err := json.Marshal(request)
c.Assert(err, IsNil)
err = postJSON(testDialClient, s.urlPrefix+"/regions/history", data, check)
c.Assert(err, IsNil)
}

func (s testHotStatusSuite) TestGetHistoryHotRegionsIDAndTypes(c *C) {
storage := s.svr.GetHistoryHotRegionStorage()
now := time.Now()
hotRegions := []*statistics.HistoryHotRegion{
{
RegionID: 1,
StoreID: 1,
PeerID: 1,
HotRegionType: "read",
UpdateTime: now.Unix(),
},
{
RegionID: 1,
StoreID: 2,
PeerID: 1,
HotRegionType: "read",
UpdateTime: now.Add(10 * time.Second).Unix(),
},
{
RegionID: 1,
StoreID: 1,
PeerID: 2,
HotRegionType: "read",
UpdateTime: now.Add(20 * time.Second).Unix(),
},
{
RegionID: 1,
StoreID: 1,
PeerID: 1,
HotRegionType: "write",
UpdateTime: now.Add(30 * time.Second).Unix(),
},
}
request := statistics.HistoryHotRegionsRequest{
RegionIDs: []uint64{1},
StoreIDs: []uint64{1},
PeerIDs: []uint64{1},
HotRegionTypes: []string{"read"},
EndTime: now.Add(10 * time.Minute).Unix(),
}
check := func(res []byte, statusCode int) {
c.Assert(statusCode, Equals, 200)
historyHotRegions := &statistics.HistoryHotRegions{}
json.Unmarshal(res, historyHotRegions)
c.Assert(len(historyHotRegions.HistoryHotRegion), Equals, 1)
c.Assert(reflect.DeepEqual(historyHotRegions.HistoryHotRegion[0], hotRegions[0]), IsTrue)
}
writeToDB(c, storage.LeveldbKV, hotRegions)
data, err := json.Marshal(request)
c.Assert(err, IsNil)
err = postJSON(testDialClient, s.urlPrefix+"/regions/history", data, check)
c.Assert(err, IsNil)
}

func (s testHotStatusSuite) TestGetHistoryHotRegionsBetween(c *C) {
storage := s.svr.GetHistoryHotRegionStorage()
now := time.Now()
hotRegions := []*statistics.HistoryHotRegion{
{
RegionID: 1,
HotDegree: 10,
FlowBytes: 10.0,
KeyRate: 10.0,
QueryRate: 10.0,
StartKey: []byte("3"),
EndKey: []byte("5"),
UpdateTime: now.Unix(),
},
{
RegionID: 2,
HotDegree: 20,
FlowBytes: 10.0,
KeyRate: 10.0,
QueryRate: 10.0,
StartKey: []byte("3"),
EndKey: []byte("5"),
UpdateTime: now.Unix(),
},
{
RegionID: 3,
HotDegree: 1,
FlowBytes: 10.0,
KeyRate: 10.0,
QueryRate: 10.0,
StartKey: []byte("3"),
EndKey: []byte("5"),
UpdateTime: now.Unix(),
},
{
RegionID: 4,
HotDegree: 10,
FlowBytes: 20.0,
KeyRate: 10.0,
QueryRate: 10.0,
StartKey: []byte("3"),
EndKey: []byte("5"),
UpdateTime: now.Unix(),
},
{
RegionID: 5,
HotDegree: 10,
FlowBytes: 1.0,
KeyRate: 10.0,
QueryRate: 10.0,
StartKey: []byte("3"),
EndKey: []byte("5"),
UpdateTime: now.Unix(),
},
{
RegionID: 6,
HotDegree: 10,
FlowBytes: 10.0,
KeyRate: 20.0,
QueryRate: 10.0,
StartKey: []byte("3"),
EndKey: []byte("5"),
UpdateTime: now.Unix(),
},
{
RegionID: 7,
HotDegree: 10,
FlowBytes: 10.0,
KeyRate: 1.0,
QueryRate: 10.0,
StartKey: []byte("3"),
EndKey: []byte("5"),
UpdateTime: now.Unix(),
},
{
RegionID: 8,
HotDegree: 10,
FlowBytes: 10.0,
KeyRate: 10.0,
QueryRate: 20.0,
StartKey: []byte("3"),
EndKey: []byte("5"),
UpdateTime: now.Unix(),
},
{
RegionID: 9,
HotDegree: 10,
FlowBytes: 10.0,
KeyRate: 10.0,
QueryRate: 1.0,
StartKey: []byte("3"),
EndKey: []byte("5"),
UpdateTime: now.Unix(),
},
}
request := statistics.HistoryHotRegionsRequest{
HighHotDegree: 11,
LowHotDegree: 10,
HighFlowBytes: 11.0,
LowFlowBytes: 10.0,
HighKeyRate: 11.0,
LowKeyRate: 10.0,
HighQueryRate: 11.0,
LowQueryRate: 10.0,
EndTime: now.Unix(),
}
check := func(res []byte, statusCode int) {
c.Assert(statusCode, Equals, 200)
historyHotRegions := &statistics.HistoryHotRegions{}
json.Unmarshal(res, historyHotRegions)
c.Assert(len(historyHotRegions.HistoryHotRegion), Equals, 1)
c.Assert(reflect.DeepEqual(historyHotRegions.HistoryHotRegion[0], hotRegions[0]), IsTrue)
}
writeToDB(c, storage.LeveldbKV, hotRegions)
data, err := json.Marshal(request)
c.Assert(err, IsNil)
err = postJSON(testDialClient, s.urlPrefix+"/regions/history", data, check)
c.Assert(err, IsNil)
}

func writeToDB(c *C, kv *kv.LeveldbKV, hotRegions []*statistics.HistoryHotRegion) {
batch := new(leveldb.Batch)
for _, region := range hotRegions {
key := cluster.HotRegionStorePath("read", region.UpdateTime, region.RegionID)
value, err := json.Marshal(region)
c.Assert(err, IsNil)
batch.Put([]byte(key), value)
}
kv.Write(batch, nil)
}
2 changes: 2 additions & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,12 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
clusterRouter.HandleFunc("/labels", labelsHandler.Get).Methods("GET")
clusterRouter.HandleFunc("/labels/stores", labelsHandler.GetStores).Methods("GET")


hotStatusHandler := newHotStatusHandler(handler, rd)
apiRouter.HandleFunc("/hotspot/regions/write", hotStatusHandler.GetHotWriteRegions).Methods("GET")
apiRouter.HandleFunc("/hotspot/regions/read", hotStatusHandler.GetHotReadRegions).Methods("GET")
apiRouter.HandleFunc("/hotspot/stores", hotStatusHandler.GetHotStores).Methods("GET")
apiRouter.HandleFunc("/hotspot/regions/history", hotStatusHandler.GetHistoryHotRegions).Methods("POST")

regionHandler := newRegionHandler(svr, rd)
clusterRouter.HandleFunc("/region/id/{id}", regionHandler.GetRegionByID).Methods("GET")
Expand Down
Loading