Skip to content

Commit

Permalink
cherry pick #3131 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
longfangsong authored and ti-srebot committed Nov 30, 2020
1 parent 25ab7fc commit 5f24ab7
Show file tree
Hide file tree
Showing 11 changed files with 433 additions and 119 deletions.
5 changes: 5 additions & 0 deletions pkg/cache/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ func (c *TTLString) Put(key string, value interface{}) {
c.ttlCache.put(key, value)
}

// PutWithTTL puts an item into cache with specified TTL.
func (c *TTLString) PutWithTTL(key string, value interface{}, ttl time.Duration) {
c.ttlCache.putWithTTL(key, value, ttl)
}

// Pop one key/value that is not expired
func (c *TTLString) Pop() (string, interface{}, bool) {
k, v, success := c.ttlCache.pop()
Expand Down
10 changes: 10 additions & 0 deletions pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,13 @@ func GetProtoMsgWithModRev(c *clientv3.Client, key string, msg proto.Message, op
}
return true, resp.Kvs[0].ModRevision, nil
}

// EtcdKVPutWithTTL put (key, value) into etcd with a ttl of ttlSeconds
func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value string, ttlSeconds int64) (*clientv3.PutResponse, error) {
kv := clientv3.NewKV(c)
grantResp, err := c.Grant(ctx, ttlSeconds)
if err != nil {
return nil, err
}
return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID))
}
39 changes: 39 additions & 0 deletions pkg/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/url"
"os"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/tikv/pd/pkg/tempurl"
Expand Down Expand Up @@ -187,3 +188,41 @@ func (s *testEtcdutilSuite) TestEtcdKVGet(c *C) {
c.Assert(len(resp.Kvs), Equals, 2)
cleanConfig(cfg)
}

func (s *testEtcdutilSuite) TestEtcdKVPutWithTTL(c *C) {
cfg := newTestSingleConfig()
etcd, err := embed.StartEtcd(cfg)
c.Assert(err, IsNil)

ep := cfg.LCUrls[0].String()
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
c.Assert(err, IsNil)

<-etcd.Server.ReadyNotify()

_, err = EtcdKVPutWithTTL(context.TODO(), client, "test/ttl1", "val1", 2)
c.Assert(err, IsNil)
_, err = EtcdKVPutWithTTL(context.TODO(), client, "test/ttl2", "val2", 4)
c.Assert(err, IsNil)

time.Sleep(3 * time.Second)
// test/ttl1 is outdated
resp, err := EtcdKVGet(client, "test/ttl1")
c.Assert(err, IsNil)
c.Assert(resp.Count, Equals, int64(0))
// but test/ttl2 is not
resp, err = EtcdKVGet(client, "test/ttl2")
c.Assert(err, IsNil)
c.Assert(string(resp.Kvs[0].Value), Equals, "val2")

time.Sleep(2 * time.Second)

// test/ttl2 is also outdated
resp, err = EtcdKVGet(client, "test/ttl2")
c.Assert(err, IsNil)
c.Assert(resp.Count, Equals, int64(0))

cleanConfig(cfg)
}
6 changes: 5 additions & 1 deletion server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,11 @@ func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) {

// if ttlSecond defined, we will apply if to temp configuration.
if ttls > 0 {
h.svr.SaveTTLConfig(conf, time.Duration(ttls)*time.Second)
err := h.svr.SaveTTLConfig(conf, time.Duration(ttls)*time.Second)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, "The config is updated.")
return
}
Expand Down
51 changes: 31 additions & 20 deletions server/api/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,28 +270,39 @@ func (s *testConfigSuite) TestConfigDefault(c *C) {
c.Assert(defaultCfg.PDServerCfg.MetricStorage, Equals, "")
}

var ttlConfig = map[string]interface{}{
"schedule.max-snapshot-count": 999,
"schedule.enable-location-replacement": false,
"schedule.max-merge-region-size": 999,
"schedule.max-merge-region-keys": 999,
"schedule.scheduler-max-waiting-operator": 999,
"schedule.leader-schedule-limit": 999,
"schedule.region-schedule-limit": 999,
"schedule.hot-region-schedule-limit": 999,
"schedule.replica-schedule-limit": 999,
"schedule.merge-schedule-limit": 999,
}

func assertTTLConfig(c *C, options *config.PersistOptions, checker Checker) {
c.Assert(options.GetMaxSnapshotCount(), checker, uint64(999))
c.Assert(options.IsLocationReplacementEnabled(), checker, false)
c.Assert(options.GetMaxMergeRegionSize(), checker, uint64(999))
c.Assert(options.GetMaxMergeRegionKeys(), checker, uint64(999))
c.Assert(options.GetSchedulerMaxWaitingOperator(), checker, uint64(999))
c.Assert(options.GetLeaderScheduleLimit(), checker, uint64(999))
c.Assert(options.GetRegionScheduleLimit(), checker, uint64(999))
c.Assert(options.GetHotRegionScheduleLimit(), checker, uint64(999))
c.Assert(options.GetReplicaScheduleLimit(), checker, uint64(999))
c.Assert(options.GetMergeScheduleLimit(), checker, uint64(999))
}

func (s *testConfigSuite) TestConfigTTL(c *C) {
addr := fmt.Sprintf("%s/config?ttlSecond=3", s.urlPrefix)
r := map[string]interface{}{
"schedule.max-snapshot-count": 999,
"schedule.enable-location-replacement": false,
"schedule.max-merge-region-size": 999,
"schedule.max-merge-region-keys": 999,
"schedule.scheduler-max-waiting-operator": 999,
}
postData, err := json.Marshal(r)
addr := fmt.Sprintf("%s/config?ttlSecond=1", s.urlPrefix)
postData, err := json.Marshal(ttlConfig)
c.Assert(err, IsNil)
err = postJSON(testDialClient, addr, postData)
c.Assert(err, IsNil)
c.Assert(s.svr.GetPersistOptions().GetMaxSnapshotCount(), Equals, uint64(999))
c.Assert(s.svr.GetPersistOptions().IsLocationReplacementEnabled(), Equals, false)
c.Assert(s.svr.GetPersistOptions().GetMaxMergeRegionSize(), Equals, uint64(999))
c.Assert(s.svr.GetPersistOptions().GetMaxMergeRegionKeys(), Equals, uint64(999))
c.Assert(s.svr.GetPersistOptions().GetSchedulerMaxWaitingOperator(), Equals, uint64(999))
time.Sleep(5 * time.Second)
c.Assert(s.svr.GetPersistOptions().GetMaxSnapshotCount(), Not(Equals), uint64(999))
c.Assert(s.svr.GetPersistOptions().IsLocationReplacementEnabled(), Equals, true)
c.Assert(s.svr.GetPersistOptions().GetMaxMergeRegionSize(), Not(Equals), uint64(999))
c.Assert(s.svr.GetPersistOptions().GetMaxMergeRegionKeys(), Not(Equals), uint64(999))
c.Assert(s.svr.GetPersistOptions().GetSchedulerMaxWaitingOperator(), Not(Equals), uint64(999))
assertTTLConfig(c, s.svr.GetPersistOptions(), Equals)
time.Sleep(2 * time.Second)
assertTTLConfig(c, s.svr.GetPersistOptions(), Not(Equals))
}
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1797,7 +1797,7 @@ func (c *RaftCluster) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64)

// SetAllStoresLimitTTL sets all store limit for a given type and rate with ttl.
func (c *RaftCluster) SetAllStoresLimitTTL(typ storelimit.Type, ratePerMin float64, ttl time.Duration) {
c.opt.SetAllStoresLimitTTL(c.ctx, typ, ratePerMin, ttl)
c.opt.SetAllStoresLimitTTL(c.ctx, c.etcdClient, typ, ratePerMin, ttl)
}

// GetClusterVersion returns the current cluster version.
Expand Down
Loading

0 comments on commit 5f24ab7

Please sign in to comment.