Skip to content

Commit

Permalink
add pd-ctl limit command
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Apr 30, 2019
1 parent 602dbe0 commit aa79ff9
Show file tree
Hide file tree
Showing 12 changed files with 238 additions and 91 deletions.
9 changes: 6 additions & 3 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,17 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
router.HandleFunc("/api/v1/config/cluster-version", confHandler.GetClusterVersion).Methods("GET")
router.HandleFunc("/api/v1/config/cluster-version", confHandler.SetClusterVersion).Methods("POST")

storeHandler := newStoreHandler(svr, rd)
storeHandler := newStoreHandler(handler, rd)
router.HandleFunc("/api/v1/store/{id}", storeHandler.Get).Methods("GET")
router.HandleFunc("/api/v1/store/{id}", storeHandler.Delete).Methods("DELETE")
router.HandleFunc("/api/v1/store/{id}/state", storeHandler.SetState).Methods("POST")
router.HandleFunc("/api/v1/store/{id}/label", storeHandler.SetLabels).Methods("POST")
router.HandleFunc("/api/v1/store/{id}/weight", storeHandler.SetWeight).Methods("POST")
router.Handle("/api/v1/stores", newStoresHandler(svr, rd)).Methods("GET")
router.HandleFunc("/api/v1/stores/remove-tombstone", newStoresHandler(svr, rd).RemoveTombStone).Methods("DELETE")
router.HandleFunc("/api/v1/store/{id}/limit", storeHandler.SetLimit).Methods("POST")
storesHandler := newStoresHandler(handler, rd)
router.Handle("/api/v1/stores", storesHandler).Methods("GET")
router.HandleFunc("/api/v1/stores/remove-tombstone", storesHandler.RemoveTombStone).Methods("DELETE")
router.HandleFunc("/api/v1/stores/limit", storesHandler.SetAllLimit).Methods("POST")

labelsHandler := newLabelsHandler(svr, rd)
router.HandleFunc("/api/v1/labels", labelsHandler.Get).Methods("GET")
Expand Down
115 changes: 96 additions & 19 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,19 +121,19 @@ type StoresInfo struct {
}

type storeHandler struct {
svr *server.Server
rd *render.Render
*server.Handler
rd *render.Render
}

func newStoreHandler(svr *server.Server, rd *render.Render) *storeHandler {
func newStoreHandler(handler *server.Handler, rd *render.Render) *storeHandler {
return &storeHandler{
svr: svr,
rd: rd,
Handler: handler,
rd: rd,
}
}

func (h *storeHandler) Get(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
cluster := h.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
Expand All @@ -152,12 +152,12 @@ func (h *storeHandler) Get(w http.ResponseWriter, r *http.Request) {
return
}

storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store)
storeInfo := newStoreInfo(h.GetScheduleConfig(), store)
h.rd.JSON(w, http.StatusOK, storeInfo)
}

func (h *storeHandler) Delete(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
cluster := h.GetRaftCluster()
if cluster == nil {
errorResp(h.rd, w, errcode.NewInternalErr(server.ErrNotBootstrapped))
return
Expand Down Expand Up @@ -187,7 +187,7 @@ func (h *storeHandler) Delete(w http.ResponseWriter, r *http.Request) {
}

func (h *storeHandler) SetState(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
cluster := h.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
Expand Down Expand Up @@ -217,7 +217,7 @@ func (h *storeHandler) SetState(w http.ResponseWriter, r *http.Request) {
}

func (h *storeHandler) SetLabels(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
cluster := h.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
Expand Down Expand Up @@ -257,7 +257,7 @@ func (h *storeHandler) SetLabels(w http.ResponseWriter, r *http.Request) {
}

func (h *storeHandler) SetWeight(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
cluster := h.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
Expand Down Expand Up @@ -304,20 +304,62 @@ func (h *storeHandler) SetWeight(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, nil)
}

func (h *storeHandler) SetLimit(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
storeID, errParse := apiutil.ParseUint64VarsField(vars, "id")
if errParse != nil {
errorResp(h.rd, w, errcode.NewInvalidInputErr(errParse))
return
}

var input map[string]interface{}
if err := readJSONRespondError(h.rd, w, r.Body, &input); err != nil {
return
}

rateVal, ok := input["rate"]
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "rate unset")
return
}
capacityVal, ok := input["capacity"]
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "capacity unset")
return
}
rate, ok := rateVal.(float64)
if !ok || rate < 0 {
h.rd.JSON(w, http.StatusBadRequest, "badformat rate")
return
}
capacity, ok := capacityVal.(int64)
if !ok || capacity < 0 {
h.rd.JSON(w, http.StatusBadRequest, "badformat capacity, should not less than rate")
return
}

if err := h.SetStoreLimit(storeID, rate, capacity); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

h.rd.JSON(w, http.StatusOK, nil)
}

type storesHandler struct {
svr *server.Server
rd *render.Render
*server.Handler
rd *render.Render
}

func newStoresHandler(svr *server.Server, rd *render.Render) *storesHandler {
func newStoresHandler(handler *server.Handler, rd *render.Render) *storesHandler {
return &storesHandler{
svr: svr,
rd: rd,
Handler: handler,
rd: rd,
}
}

func (h *storesHandler) RemoveTombStone(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
cluster := h.GetRaftCluster()
if cluster == nil {
errorResp(h.rd, w, errcode.NewInternalErr(server.ErrNotBootstrapped))
return
Expand All @@ -332,8 +374,43 @@ func (h *storesHandler) RemoveTombStone(w http.ResponseWriter, r *http.Request)
h.rd.JSON(w, http.StatusOK, nil)
}

func (h *storesHandler) SetAllLimit(w http.ResponseWriter, r *http.Request) {
var input map[string]interface{}
if err := readJSONRespondError(h.rd, w, r.Body, &input); err != nil {
return
}

rateVal, ok := input["rate"]
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "rate unset")
return
}
capacityVal, ok := input["capacity"]
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "capacity unset")
return
}
rate, ok := rateVal.(float64)
if !ok || rate < 0 {
h.rd.JSON(w, http.StatusBadRequest, "badformat rate")
return
}
capacity, ok := capacityVal.(int64)
if !ok || capacity < 0 {
h.rd.JSON(w, http.StatusBadRequest, "badformat capacity, should not less than rate")
return
}

if err := h.SetAllStoresLimit(rate, capacity); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

h.rd.JSON(w, http.StatusOK, nil)
}

func (h *storesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
cluster := h.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
Expand All @@ -358,7 +435,7 @@ func (h *storesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store)
storeInfo := newStoreInfo(h.GetScheduleConfig(), store)
StoresInfo.Stores = append(StoresInfo.Stores, storeInfo)
}
StoresInfo.Count = len(StoresInfo.Stores)
Expand Down
2 changes: 0 additions & 2 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,6 @@ func (c *RaftCluster) RemoveStore(storeID uint64) error {
}

newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline))
opController := c.coordinator.opController
opController.SetOfflineStoreLimit(newStore.GetID())

log.Warn("store has been offline",
zap.Uint64("store-id", newStore.GetID()),
Expand Down
8 changes: 0 additions & 8 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,14 +713,6 @@ func (c *clusterInfo) GetStoreBucketRate() float64 {
return c.opt.GetStoreBucketRate()
}

func (c *clusterInfo) GetOfflineStoreMaxScheduleCost() int64 {
return c.opt.GetOfflineStoreMaxScheduleCost()
}

func (c *clusterInfo) GetOfflineStoreBucketRate() float64 {
return c.opt.GetOfflineStoreBucketRate()
}

func (c *clusterInfo) GetTolerantSizeRatio() float64 {
return c.opt.GetTolerantSizeRatio()
}
Expand Down
48 changes: 19 additions & 29 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,10 +480,6 @@ type ScheduleConfig struct {
StoreMaxScheduleCost int64 `toml:"store-max-schedule-cost,omitempty" json:"store-max-schedule-cost"`
// StoreBucketRate is the maxinum of bucket rate for each store.
StoreBucketRate float64 `toml:"store-bucket-rate,omitempty" json:"store-bucket-rate"`
// OfflineStoreMaxScheduleCost is the maxinum of scheduling cost for a offline store.
OfflineStoreMaxScheduleCost int64 `toml:"offline-store-max-schedule-cost,omitempty" json:"offline-store-max-schedule-cost"`
// OfflineStoreBucketRate is the maxinum of bucket rate for a offline store.
OfflineStoreBucketRate float64 `toml:"offline-store-bucket-rate,omitempty" json:"offline-store-bucket-rate"`
// TolerantSizeRatio is the ratio of buffer size for balance scheduler.
TolerantSizeRatio float64 `toml:"tolerant-size-ratio,omitempty" json:"tolerant-size-ratio"`
//
Expand Down Expand Up @@ -544,8 +540,6 @@ func (c *ScheduleConfig) clone() *ScheduleConfig {
MaxScheduleCost: c.MaxScheduleCost,
StoreMaxScheduleCost: c.StoreMaxScheduleCost,
StoreBucketRate: c.StoreBucketRate,
OfflineStoreMaxScheduleCost: c.OfflineStoreMaxScheduleCost,
OfflineStoreBucketRate: c.OfflineStoreBucketRate,
TolerantSizeRatio: c.TolerantSizeRatio,
LowSpaceRatio: c.LowSpaceRatio,
HighSpaceRatio: c.HighSpaceRatio,
Expand All @@ -561,27 +555,25 @@ func (c *ScheduleConfig) clone() *ScheduleConfig {
}

const (
defaultMaxReplicas = 3
defaultMaxSnapshotCount = 3
defaultMaxPendingPeerCount = 16
defaultMaxMergeRegionSize = 20
defaultMaxMergeRegionKeys = 200000
defaultSplitMergeInterval = 1 * time.Hour
defaultPatrolRegionInterval = 100 * time.Millisecond
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 8
defaultRegionScheduleLimit = 1024
defaultReplicaScheduleLimit = 1024
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 2
defaultMaxScheduleCost = 0
defaultStoreMaxScheduleCost = 200
defaultStoreBucketRate = 100
defaultOfflineStoreMaxScheduleCost = 600
defaultOfflineStoreBucketRate = 300
defaultTolerantSizeRatio = 25
defaultLowSpaceRatio = 0.8
defaultHighSpaceRatio = 0.6
defaultMaxReplicas = 3
defaultMaxSnapshotCount = 3
defaultMaxPendingPeerCount = 16
defaultMaxMergeRegionSize = 20
defaultMaxMergeRegionKeys = 200000
defaultSplitMergeInterval = 1 * time.Hour
defaultPatrolRegionInterval = 100 * time.Millisecond
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 8
defaultRegionScheduleLimit = 1024
defaultReplicaScheduleLimit = 1024
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 2
defaultMaxScheduleCost = 0
defaultStoreMaxScheduleCost = 200
defaultStoreBucketRate = 100
defaultTolerantSizeRatio = 25
defaultLowSpaceRatio = 0.8
defaultHighSpaceRatio = 0.6
// defaultHotRegionCacheHitsThreshold is the low hit number threshold of the
// hot region.
defaultHotRegionCacheHitsThreshold = 3
Expand Down Expand Up @@ -631,8 +623,6 @@ func (c *ScheduleConfig) adjust(meta *configMetaData) error {
adjustFloat64(&c.TolerantSizeRatio, defaultTolerantSizeRatio)
}
adjustFloat64(&c.StoreBucketRate, defaultStoreBucketRate)
adjustInt64(&c.OfflineStoreMaxScheduleCost, defaultOfflineStoreMaxScheduleCost)
adjustFloat64(&c.OfflineStoreBucketRate, defaultOfflineStoreBucketRate)
adjustFloat64(&c.LowSpaceRatio, defaultLowSpaceRatio)
adjustFloat64(&c.HighSpaceRatio, defaultHighSpaceRatio)
adjustSchedulers(&c.Schedulers, defaultSchedulers)
Expand Down
30 changes: 30 additions & 0 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ func newHandler(s *Server) *Handler {
return &Handler{s: s, opt: s.scheduleOpt}
}

// GetRaftCluster returns RaftCluster.
func (h *Handler) GetRaftCluster() *RaftCluster {
return h.s.GetRaftCluster()
}

// GetScheduleConfig returns ScheduleConfig.
func (h *Handler) GetScheduleConfig() *ScheduleConfig {
return h.s.GetScheduleConfig()
}

func (h *Handler) getCoordinator() (*coordinator, error) {
cluster := h.s.GetRaftCluster()
if cluster == nil {
Expand Down Expand Up @@ -348,6 +358,26 @@ func (h *Handler) GetHistory(start time.Time) ([]schedule.OperatorHistory, error
return c.opController.GetHistory(start), nil
}

// SetAllStoresLimit is used to set limit of all stores.
func (h *Handler) SetAllStoresLimit(rate float64, capacity int64) error {
c, err := h.getCoordinator()
if err != nil {
return err
}
c.opController.SetAllStoresLimit(rate, capacity)
return nil
}

// SetStoreLimit is used to set the limit of a store.
func (h *Handler) SetStoreLimit(storeID uint64, rate float64, capacity int64) error {
c, err := h.getCoordinator()
if err != nil {
return err
}
c.opController.SetStoreLimit(storeID, rate, capacity)
return nil
}

// AddTransferLeaderOperator adds an operator to transfer leader to the store.
func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) error {
c, err := h.getCoordinator()
Expand Down
8 changes: 0 additions & 8 deletions server/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,6 @@ func (o *scheduleOption) GetStoreBucketRate() float64 {
return o.load().StoreBucketRate
}

func (o *scheduleOption) GetOfflineStoreMaxScheduleCost() int64 {
return o.load().OfflineStoreMaxScheduleCost
}

func (o *scheduleOption) GetOfflineStoreBucketRate() float64 {
return o.load().OfflineStoreBucketRate
}

func (o *scheduleOption) GetTolerantSizeRatio() float64 {
return o.load().TolerantSizeRatio
}
Expand Down
12 changes: 0 additions & 12 deletions server/schedule/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,6 @@ type MockSchedulerOptions struct {
MaxScheduleCost int64
StoreMaxScheduleCost int64
StoreBucketRate float64
OfflineStoreMaxScheduleCost int64
OfflineStoreBucketRate float64
MaxSnapshotCount uint64
MaxPendingPeerCount uint64
MaxMergeRegionSize uint64
Expand Down Expand Up @@ -610,16 +608,6 @@ func (mso *MockSchedulerOptions) GetStoreBucketRate() float64 {
return mso.StoreBucketRate
}

// GetOfflineStoreMaxScheduleCost mock method
func (mso *MockSchedulerOptions) GetOfflineStoreMaxScheduleCost() int64 {
return mso.OfflineStoreMaxScheduleCost
}

// GetOfflineStoreBucketRate mock method
func (mso *MockSchedulerOptions) GetOfflineStoreBucketRate() float64 {
return mso.OfflineStoreBucketRate
}

// GetMaxSnapshotCount mock method
func (mso *MockSchedulerOptions) GetMaxSnapshotCount() uint64 {
return mso.MaxSnapshotCount
Expand Down
Loading

0 comments on commit aa79ff9

Please sign in to comment.