Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: support temporary configuration (#3082) #3088

Merged
merged 2 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/cache/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,8 @@ func (c *TTLString) Pop() (string, interface{}, bool) {
}
return key, v, true
}

// Get return the value by key id
func (c *TTLString) Get(id string) (interface{}, bool) {
return c.ttlCache.get(id)
}
20 changes: 20 additions & 0 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"io/ioutil"
"net/http"
"reflect"
"strconv"
"strings"
"time"

"github.com/pingcap/errcode"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -73,6 +75,7 @@ func (h *confHandler) GetDefault(w http.ResponseWriter, r *http.Request) {
// @Tags config
// @Summary Update a config item.
// @Accept json
// @Param ttlSecond query integer false "ttl". ttl param is only for BR and lightning now. Don't use it.
// @Param body body object false "json params"
// @Produce json
// @Success 200 {string} string "The config is updated."
Expand All @@ -94,6 +97,23 @@ func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) {
return
}

var ttls int
if ttlSec := r.URL.Query().Get("ttlSecond"); ttlSec != "" {
var err error
ttls, err = strconv.Atoi(ttlSec)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
}

// if ttlSecond defined, we will apply if to temp configuration.
if ttls > 0 {
h.svr.SaveTTLConfig(conf, time.Duration(ttls)*time.Second)
h.rd.JSON(w, http.StatusOK, "The config is updated.")
return
}

for k, v := range conf {
if s := strings.Split(k, "."); len(s) > 1 {
if err := h.updateConfig(cfg, k, v); err != nil {
Expand Down
26 changes: 26 additions & 0 deletions server/api/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,29 @@ func (s *testConfigSuite) TestConfigDefault(c *C) {
c.Assert(defaultCfg.Schedule.RegionScheduleLimit, Equals, uint64(2048))
c.Assert(defaultCfg.PDServerCfg.MetricStorage, Equals, "")
}

func (s *testConfigSuite) TestConfigTTL(c *C) {
addr := fmt.Sprintf("%s/config?ttlSecond=3", s.urlPrefix)
r := map[string]interface{}{
"schedule.max-snapshot-count": 999,
"schedule.enable-location-replacement": false,
"schedule.max-merge-region-size": 999,
"schedule.max-merge-region-keys": 999,
"schedule.scheduler-max-waiting-operator": 999,
}
postData, err := json.Marshal(r)
c.Assert(err, IsNil)
err = postJSON(testDialClient, addr, postData)
c.Assert(err, IsNil)
c.Assert(s.svr.GetPersistOptions().GetMaxSnapshotCount(), Equals, uint64(999))
c.Assert(s.svr.GetPersistOptions().IsLocationReplacementEnabled(), Equals, false)
c.Assert(s.svr.GetPersistOptions().GetMaxMergeRegionSize(), Equals, uint64(999))
c.Assert(s.svr.GetPersistOptions().GetMaxMergeRegionKeys(), Equals, uint64(999))
c.Assert(s.svr.GetPersistOptions().GetSchedulerMaxWaitingOperator(), Equals, uint64(999))
time.Sleep(5 * time.Second)
c.Assert(s.svr.GetPersistOptions().GetMaxSnapshotCount(), Not(Equals), uint64(999))
c.Assert(s.svr.GetPersistOptions().IsLocationReplacementEnabled(), Equals, true)
c.Assert(s.svr.GetPersistOptions().GetMaxMergeRegionSize(), Not(Equals), uint64(999))
c.Assert(s.svr.GetPersistOptions().GetMaxMergeRegionKeys(), Not(Equals), uint64(999))
c.Assert(s.svr.GetPersistOptions().GetSchedulerMaxWaitingOperator(), Not(Equals), uint64(999))
}
61 changes: 56 additions & 5 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package api

import (
"fmt"
"net/http"
"net/url"
"strconv"
Expand Down Expand Up @@ -341,6 +342,7 @@ func (h *storeHandler) SetWeight(w http.ResponseWriter, r *http.Request) {
// FIXME: details of input json body params
// @Tags store
// @Summary Set the store's limit.
// @Param ttlSecond query integer false "ttl". ttl param is only for BR and lightning now. Don't use it.
// @Param id path integer true "Store Id"
// @Param body body object true "json params"
// @Produce json
Expand Down Expand Up @@ -384,14 +386,29 @@ func (h *storeHandler) SetLimit(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}

var ttl int
if ttlSec := r.URL.Query().Get("ttlSecond"); ttlSec != "" {
var err error
ttl, err = strconv.Atoi(ttlSec)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
}
for _, typ := range typeValues {
if ttl > 0 {
key := fmt.Sprintf("add-peer-%v", storeID)
if typ == storelimit.RemovePeer {
key = fmt.Sprintf("remove-peer-%v", storeID)
}
h.Handler.SetStoreLimitTTL(key, ratePerMin, time.Duration(ttl)*time.Second)
continue
}
if err := h.SetStoreLimit(storeID, ratePerMin, typ); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
}

h.rd.JSON(w, http.StatusOK, nil)
}

Expand Down Expand Up @@ -428,6 +445,7 @@ func (h *storesHandler) RemoveTombStone(w http.ResponseWriter, r *http.Request)
// @Tags store
// @Summary Set limit of all stores in the cluster.
// @Accept json
// @Param ttlSecond query integer false "ttl". ttl param is only for BR and lightning now. Don't use it.
// @Param body body object true "json params"
// @Produce json
// @Success 200 {string} string "Set store limit success."
Expand Down Expand Up @@ -457,9 +475,42 @@ func (h *storesHandler) SetAllLimit(w http.ResponseWriter, r *http.Request) {
return
}

for _, typ := range typeValues {
if err := h.SetAllStoresLimit(ratePerMin, typ); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
var ttl int
if ttlSec := r.URL.Query().Get("ttlSecond"); ttlSec != "" {
var err error
ttl, err = strconv.Atoi(ttlSec)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
}

if _, ok := input["labels"]; !ok {
for _, typ := range typeValues {
if ttl > 0 {
if err := h.SetAllStoresLimitTTL(ratePerMin, typ, time.Duration(ttl)*time.Second); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
} else {
if err := h.SetAllStoresLimit(ratePerMin, typ); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
}
}
} else {
labelMap := input["labels"].(map[string]interface{})
labels := make([]*metapb.StoreLabel, 0, len(input))
for k, v := range labelMap {
labels = append(labels, &metapb.StoreLabel{
Key: k,
Value: v.(string),
})
}

if err := config.ValidateLabels(labels); err != nil {
apiutil.ErrorResp(h.rd, w, errcode.NewInvalidInputErr(err))
return
}
}
Expand Down
51 changes: 51 additions & 0 deletions server/api/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,3 +395,54 @@ func (s *testStoreSuite) TestGetAllLimit(c *C) {
}
}
}

func (s *testStoreSuite) TestStoreLimitTTL(c *C) {
// add peer
url := fmt.Sprintf("%s/store/1/limit?ttlSecond=%v", s.urlPrefix, 5)
data := map[string]interface{}{
"type": "add-peer",
"rate": 999,
}
postData, err := json.Marshal(data)
c.Assert(err, IsNil)
err = postJSON(testDialClient, url, postData)
c.Assert(err, IsNil)
// remove peer
data = map[string]interface{}{
"type": "remove-peer",
"rate": 998,
}
postData, err = json.Marshal(data)
c.Assert(err, IsNil)
err = postJSON(testDialClient, url, postData)
c.Assert(err, IsNil)
// all store limit add peer
url = fmt.Sprintf("%s/stores/limit?ttlSecond=%v", s.urlPrefix, 3)
data = map[string]interface{}{
"type": "add-peer",
"rate": 997,
}
postData, err = json.Marshal(data)
c.Assert(err, IsNil)
err = postJSON(testDialClient, url, postData)
c.Assert(err, IsNil)
// all store limit remove peer
data = map[string]interface{}{
"type": "remove-peer",
"rate": 996,
}
postData, err = json.Marshal(data)
c.Assert(err, IsNil)
err = postJSON(testDialClient, url, postData)
c.Assert(err, IsNil)

c.Assert(s.svr.GetPersistOptions().GetStoreLimit(uint64(1)).AddPeer, Equals, float64(999))
c.Assert(s.svr.GetPersistOptions().GetStoreLimit(uint64(1)).RemovePeer, Equals, float64(998))
c.Assert(s.svr.GetPersistOptions().GetStoreLimit(uint64(2)).AddPeer, Equals, float64(997))
c.Assert(s.svr.GetPersistOptions().GetStoreLimit(uint64(2)).RemovePeer, Equals, float64(996))
time.Sleep(5 * time.Second)
c.Assert(s.svr.GetPersistOptions().GetStoreLimit(uint64(1)).AddPeer, Not(Equals), float64(999))
c.Assert(s.svr.GetPersistOptions().GetStoreLimit(uint64(1)).RemovePeer, Not(Equals), float64(998))
c.Assert(s.svr.GetPersistOptions().GetStoreLimit(uint64(2)).AddPeer, Not(Equals), float64(997))
c.Assert(s.svr.GetPersistOptions().GetStoreLimit(uint64(2)).RemovePeer, Not(Equals), float64(996))
}
15 changes: 15 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1767,6 +1767,21 @@ func (c *RaftCluster) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64)
c.opt.SetAllStoresLimit(typ, ratePerMin)
}

// SetAllStoresLimitTTL sets all store limit for a given type and rate with ttl.
func (c *RaftCluster) SetAllStoresLimitTTL(typ storelimit.Type, ratePerMin float64, ttl time.Duration) {
c.opt.SetAllStoresLimitTTL(c.ctx, typ, ratePerMin, ttl)
}

// GetClusterVersion returns the current cluster version.
func (c *RaftCluster) GetClusterVersion() string {
return c.opt.GetClusterVersion().String()
}

// GetEtcdClient returns the current etcd client
func (c *RaftCluster) GetEtcdClient() *clientv3.Client {
return c.etcdClient
}

var healthURL = "/pd/api/v1/ping"

// CheckHealth checks if members are healthy.
Expand Down
Loading