Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store label limit #2674

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 51 additions & 5 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,19 @@ func (h *storesHandler) SetAllLimit(w http.ResponseWriter, r *http.Request) {
return
}

label := config.StoreLabel{}
labelKey, keyOk := input["labelKey"]
labelValue, valueOk := input["labelValue"]
if keyOk && valueOk {
label.Key = labelKey.(string)
label.Value = labelValue.(string)
} else if !keyOk && !valueOk {

} else {
h.rd.JSON(w, http.StatusBadRequest, "label key and value must match")
return
}

rateVal, ok := input["rate"]
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "rate unset")
Expand All @@ -452,9 +465,16 @@ func (h *storesHandler) SetAllLimit(w http.ResponseWriter, r *http.Request) {
}

for _, typ := range typeValues {
if err := h.SetAllStoresLimit(ratePerMin, typ); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
if label.Key == "" {
if err := h.SetLabelStoresLimit(label, ratePerMin, typ); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
} else {
if err := h.SetAllStoresLimit(ratePerMin, typ); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
}
}

Expand All @@ -469,8 +489,34 @@ func (h *storesHandler) SetAllLimit(w http.ResponseWriter, r *http.Request) {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /stores/limit [get]
func (h *storesHandler) GetAllLimit(w http.ResponseWriter, r *http.Request) {
limits := h.GetScheduleConfig().StoreLimit
h.rd.JSON(w, http.StatusOK, limits)
var input map[string]interface{}
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
return
}

label := config.StoreLabel{}
labelKey, keyOk := input["labelKey"]
labelValue, valueOk := input["labelValue"]
if keyOk && valueOk {
label.Key = labelKey.(string)
label.Value = labelValue.(string)
} else if !keyOk && !valueOk {

} else {
h.rd.JSON(w, http.StatusBadRequest, "label key and value must match")
return
}

if keyOk {
if limit, err := h.GetLabelStoresLimit(label, 0); err != nil {
return
} else {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)

h.rd.JSON(w, http.StatusOK, limit)
}
} else {
limits := h.GetScheduleConfig().StoreLimit
h.rd.JSON(w, http.StatusOK, limits)
}
}

// @Tags store
Expand Down
35 changes: 31 additions & 4 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1695,6 +1695,21 @@ func (c *RaftCluster) GetStoreLimitByType(storeID uint64, typ storelimit.Type) f
return c.opt.GetStoreLimitByType(storeID, typ)
}

func (c *RaftCluster) GetLabelStores(label config.StoreLabel) []uint64 {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method RaftCluster.GetLabelStores should have comment or be unexported

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method RaftCluster.GetLabelStores should have comment or be unexported

var stores []uint64
for _, s := range c.GetStores() {
if s.GetLabelValue(label.Key) == label.Value {
stores = append(stores, s.GetID())
}
}
return stores
}

// GetLabelStoresLimit returns store limit for a label.
func (c *RaftCluster) GetLabelStoresLimit(label config.StoreLabel) config.StoreLimitConfig {
return c.opt.GetLabelStoresLimit(label)
}

// GetAllStoresLimit returns all store limit
func (c *RaftCluster) GetAllStoresLimit() map[uint64]config.StoreLimitConfig {
return c.opt.GetAllStoresLimit()
Expand All @@ -1707,10 +1722,13 @@ func (c *RaftCluster) AddStoreLimit(store *metapb.Store) {
AddPeer: config.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: config.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
}
if core.IsTiFlashStore(store) {
sc = config.StoreLimitConfig{
AddPeer: config.DefaultTiFlashStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: config.DefaultTiFlashStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
for _, l := range store.GetLabels() {
label := config.StoreLabel{Key: l.GetKey(), Value: l.GetValue()}
if limit, ok := config.StoreLabelLimits[label]; ok {
sc = config.StoreLimitConfig{
AddPeer: limit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: limit.GetDefaultStoreLimit(storelimit.RemovePeer),
}
}
}
storeID := store.GetId()
Expand All @@ -1733,6 +1751,15 @@ func (c *RaftCluster) SetStoreLimit(storeID uint64, typ storelimit.Type, ratePer
c.opt.SetStoreLimit(storeID, typ, ratePerMin)
}

// SetLabelStoresLimit sets store limit for a given label, type and rate.
func (c *RaftCluster) SetLabelStoresLimit(label config.StoreLabel, typ storelimit.Type, ratePerMin float64) {
stores := c.GetLabelStores(label)
for _, s := range stores {
c.SetStoreLimit(s, typ, ratePerMin)
}
c.opt.SetLabelStoresLimit(label, typ, ratePerMin)
}

// SetAllStoresLimit sets all store limit for a given type and rate.
func (c *RaftCluster) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64) {
c.opt.SetAllStoresLimit(typ, ratePerMin)
Expand Down
28 changes: 25 additions & 3 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,11 @@ var (
defaultRuntimeServices = []string{}
defaultLocationLabels = []string{}
// DefaultStoreLimit is the default store limit of add peer and remove peer.
DefaultStoreLimit StoreLimit = StoreLimit{AddPeer: 15, RemovePeer: 15}
// DefaultTiFlashStoreLimit is the default TiFlash store limit of add peer and remove peer.
DefaultTiFlashStoreLimit StoreLimit = StoreLimit{AddPeer: 30, RemovePeer: 30}
DefaultStoreLimit = StoreLimit{AddPeer: 15, RemovePeer: 15}
// StoreLabelLimits are other store limits of add peer and remove peer.
StoreLabelLimits = map[StoreLabel]*StoreLimit{
StoreLabel{Key: "engine", Value: "tiflash"}: {AddPeer: 30, RemovePeer: 30},
}
)

func init() {
Expand Down Expand Up @@ -605,6 +607,8 @@ type ScheduleConfig struct {
StoreBalanceRate float64 `toml:"store-balance-rate" json:"store-balance-rate,omitempty"`
// StoreLimit is the limit of scheduling for stores.
StoreLimit map[uint64]StoreLimitConfig `toml:"store-limit" json:"store-limit"`
// StoreLabelLimit is the limit of scheduling for stores by labels.
StoreLabelLimit map[string]StoreLimitConfig `toml:"store-label-limit" json:"store-label-limit"`
// TolerantSizeRatio is the ratio of buffer size for balance scheduler.
TolerantSizeRatio float64 `toml:"tolerant-size-ratio" json:"tolerant-size-ratio"`
//
Expand Down Expand Up @@ -681,6 +685,10 @@ func (c *ScheduleConfig) Clone() *ScheduleConfig {
for k, v := range c.StoreLimit {
storeLimit[k] = v
}
storeLabelLimit := make(map[string]StoreLimitConfig, len(c.StoreLabelLimit))
for k, v := range c.StoreLabelLimit {
storeLabelLimit[k] = v
}
return &ScheduleConfig{
MaxSnapshotCount: c.MaxSnapshotCount,
MaxPendingPeerCount: c.MaxPendingPeerCount,
Expand All @@ -699,6 +707,7 @@ func (c *ScheduleConfig) Clone() *ScheduleConfig {
HotRegionScheduleLimit: c.HotRegionScheduleLimit,
HotRegionCacheHitsThreshold: c.HotRegionCacheHitsThreshold,
StoreLimit: storeLimit,
StoreLabelLimit: storeLabelLimit,
TolerantSizeRatio: c.TolerantSizeRatio,
LowSpaceRatio: c.LowSpaceRatio,
HighSpaceRatio: c.HighSpaceRatio,
Expand Down Expand Up @@ -1060,6 +1069,19 @@ type StoreLabel struct {
Value string `toml:"value" json:"value"`
}

func (sl StoreLabel) String() string {
return sl.Key + "=" + sl.Value
}

func genStoreLabel(s string) (sl StoreLabel) {
kv := strings.Split(s, "=")
if len(kv) == 2 {
sl.Key = kv[0]
sl.Value = kv[1]
}
return sl
}

// LabelPropertyConfig is the config section to set properties to store labels.
type LabelPropertyConfig map[string][]StoreLabel

Expand Down
27 changes: 27 additions & 0 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,24 @@ func (o *PersistOptions) SetStoreLimit(storeID uint64, typ storelimit.Type, rate
o.SetScheduleConfig(v)
}

// SetLabelStoresLimit sets store limit for a given label, type and rate.
func (o *PersistOptions) SetLabelStoresLimit(label StoreLabel, typ storelimit.Type, ratePerMin float64) {
v := o.GetScheduleConfig().Clone()
if _, ok := StoreLabelLimits[label]; !ok {
sl := DefaultStoreLimit
StoreLabelLimits[label] = &sl
}
switch typ {
case storelimit.AddPeer:
StoreLabelLimits[label].SetDefaultStoreLimit(storelimit.AddPeer, ratePerMin)
case storelimit.RemovePeer:
StoreLabelLimits[label].SetDefaultStoreLimit(storelimit.RemovePeer, ratePerMin)
}
sll, _ := StoreLabelLimits[label]
v.StoreLabelLimit[label.String()] = StoreLimitConfig{AddPeer: sll.AddPeer, RemovePeer: sll.RemovePeer}
o.SetScheduleConfig(v)
}

// SetAllStoresLimit sets all store limit for a given type and rate.
func (o *PersistOptions) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64) {
v := o.GetScheduleConfig().Clone()
Expand Down Expand Up @@ -296,6 +314,15 @@ func (o *PersistOptions) GetStoreLimitByType(storeID uint64, typ storelimit.Type
}
}

// GetLabelStoresLimit returns the limit of stores for a label.
func (o *PersistOptions) GetLabelStoresLimit(label StoreLabel) StoreLimitConfig {
if l, ok := o.GetScheduleConfig().StoreLabelLimit[label.String()]; !ok {
return l
} else {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)

return StoreLimitConfig{AddPeer: DefaultStoreLimit.AddPeer, RemovePeer: DefaultStoreLimit.RemovePeer}
}
}

// GetAllStoresLimit returns the limit of all stores.
func (o *PersistOptions) GetAllStoresLimit() map[uint64]StoreLimitConfig {
return o.GetScheduleConfig().StoreLimit
Expand Down
5 changes: 2 additions & 3 deletions server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,9 +673,8 @@ func (s *StoresInfo) UpdateStoreStatus(storeID uint64, leaderCount int, regionCo
}
}

// IsTiFlashStore used to judge flash store.
// FIXME: remove the hack way
func IsTiFlashStore(store *metapb.Store) bool {
// Judge if it needs placement rule.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on exported function IsPlacementNeeded should be of the form "IsPlacementNeeded ..."

func IsPlacementNeeded(store *metapb.Store) bool {
for _, l := range store.GetLabels() {
if l.GetKey() == "engine" && l.GetValue() == "tiflash" {
return true
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (s *Server) PutStore(ctx context.Context, request *pdpb.PutStoreRequest) (*
}

// NOTE: can be removed when placement rules feature is enabled by default.
if !s.GetConfig().Replication.EnablePlacementRules && core.IsTiFlashStore(store) {
if !s.GetConfig().Replication.EnablePlacementRules && core.IsPlacementNeeded(store) {
return nil, status.Errorf(codes.FailedPrecondition, "placement rules is disabled")
}

Expand Down
19 changes: 19 additions & 0 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,16 @@ func (h *Handler) GetHistory(start time.Time) ([]operator.OpHistory, error) {
return c.GetHistory(start), nil
}

// SetLabelStoresLimit is used to set limit of stores for a label.
func (h *Handler) SetLabelStoresLimit(label config.StoreLabel, ratePerMin float64, limitType storelimit.Type) error {
c, err := h.GetRaftCluster()
if err != nil {
return err
}
c.SetLabelStoresLimit(label, limitType, ratePerMin)
return nil
}

// SetAllStoresLimit is used to set limit of all stores.
func (h *Handler) SetAllStoresLimit(ratePerMin float64, limitType storelimit.Type) error {
c, err := h.GetRaftCluster()
Expand All @@ -419,6 +429,15 @@ func (h *Handler) SetAllStoresLimit(ratePerMin float64, limitType storelimit.Typ
return nil
}

// GetAllStoresLimit is used to get limit of all stores.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on exported method Handler.GetLabelStoresLimit should be of the form "GetLabelStoresLimit ..."

func (h *Handler) GetLabelStoresLimit(label config.StoreLabel, limitType storelimit.Type) (config.StoreLimitConfig, error) {
c, err := h.GetRaftCluster()
if err != nil {
return config.StoreLimitConfig{AddPeer: config.DefaultStoreLimit.AddPeer, RemovePeer: config.DefaultStoreLimit.RemovePeer}, err
}
return c.GetLabelStoresLimit(label), nil
}

// GetAllStoresLimit is used to get limit of all stores.
func (h *Handler) GetAllStoresLimit(limitType storelimit.Type) (map[uint64]config.StoreLimitConfig, error) {
c, err := h.GetRaftCluster()
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error {
} else {
// NOTE: can be removed after placement rules feature is enabled by default.
for _, s := range raftCluster.GetStores() {
if !s.IsTombstone() && core.IsTiFlashStore(s.GetMeta()) {
if !s.IsTombstone() && core.IsPlacementNeeded(s.GetMeta()) {
return errors.New("cannot disable placement rules with TiFlash nodes")
}
}
Expand Down
26 changes: 24 additions & 2 deletions tools/pd-ctl/pdctl/command/store_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ func NewSetStoreWeightCommand() *cobra.Command {
// NewStoreLimitCommand returns a limit subcommand of storeCmd.
func NewStoreLimitCommand() *cobra.Command {
c := &cobra.Command{
Use: "limit [<type>]|[<store_id>|<all> <limit> <type>]",
Use: "limit [<type>]|[<store_id>|<all> <limit> <type>]|[<label> <key> <value> <limit> <type>]",
Short: "show or set a store's rate limit",
Long: "show or set a store's rate limit, <type> can be 'add-peer'(default) or 'remove-peer'",
Long: "show or set a store's rate limit, <type> can be 'add-peer'(default) or 'remove-peer', <key> <value> can match a label",
Run: storeLimitCommandFunc,
}
return c
Expand Down Expand Up @@ -394,6 +394,28 @@ func storeLimitCommandFunc(cmd *cobra.Command, args []string) {
postInput["type"] = args[2]
}
postJSON(cmd, prefix, postInput)
case 4, 5:
if args[0] != "label" {
cmd.Println("No keyword label.")
return
}
rate, err := strconv.ParseFloat(args[3], 64)
if err != nil || rate < 0 {
cmd.Println("rate should be a number that >= 0.")
return
}
labelKey := args[1]
labelValue := args[2]
prefix := path.Join(storesPrefix, "limit")
postInput := map[string]interface{}{
"rate": rate,
"labelKey": labelKey,
"labelValue": labelValue,
}
if argsCount == 5 {
postInput["type"] = args[4]
}
postJSON(cmd, prefix, postInput)
default:
cmd.Usage()
return
Expand Down