From 9b1baad56c2c5c4bd947c179f625b59ffa3826a9 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Tue, 1 Dec 2020 14:50:54 +0800 Subject: [PATCH] Persist temporary setting to etcd (#3131) (#3228) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * cherry pick #3131 to release-4.0 Signed-off-by: ti-srebot * fix rebasing error Signed-off-by: longfangsong Co-authored-by: 龙方淞 --- pkg/cache/ttl.go | 5 + pkg/etcdutil/etcdutil.go | 10 ++ pkg/etcdutil/etcdutil_test.go | 39 ++++++ server/api/config.go | 6 +- server/api/config_test.go | 51 ++++--- server/cluster/cluster.go | 2 +- server/config/persist_options.go | 230 +++++++++++++++++++------------ server/handler.go | 4 +- server/server.go | 13 +- tests/client/client_test.go | 66 +++++++++ 10 files changed, 307 insertions(+), 119 deletions(-) diff --git a/pkg/cache/ttl.go b/pkg/cache/ttl.go index cd589abcb94..bcd79d5492f 100644 --- a/pkg/cache/ttl.go +++ b/pkg/cache/ttl.go @@ -234,6 +234,11 @@ func (c *TTLString) Put(key string, value interface{}) { c.ttlCache.put(key, value) } +// PutWithTTL puts an item into cache with specified TTL. +func (c *TTLString) PutWithTTL(key string, value interface{}, ttl time.Duration) { + c.ttlCache.putWithTTL(key, value, ttl) +} + // Pop one key/value that is not expired func (c *TTLString) Pop() (string, interface{}, bool) { k, v, success := c.ttlCache.pop() diff --git a/pkg/etcdutil/etcdutil.go b/pkg/etcdutil/etcdutil.go index c1dd36766ed..6491d9e999d 100644 --- a/pkg/etcdutil/etcdutil.go +++ b/pkg/etcdutil/etcdutil.go @@ -161,3 +161,13 @@ func GetProtoMsgWithModRev(c *clientv3.Client, key string, msg proto.Message, op } return true, resp.Kvs[0].ModRevision, nil } + +// EtcdKVPutWithTTL put (key, value) into etcd with a ttl of ttlSeconds +func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value string, ttlSeconds int64) (*clientv3.PutResponse, error) { + kv := clientv3.NewKV(c) + grantResp, err := c.Grant(ctx, ttlSeconds) + if err != nil { + return nil, err + } + return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID)) +} diff --git a/pkg/etcdutil/etcdutil_test.go b/pkg/etcdutil/etcdutil_test.go index f5b942d2267..0fd91e3996e 100644 --- a/pkg/etcdutil/etcdutil_test.go +++ b/pkg/etcdutil/etcdutil_test.go @@ -21,6 +21,7 @@ import ( "net/url" "os" "testing" + "time" . "github.com/pingcap/check" "github.com/tikv/pd/pkg/tempurl" @@ -187,3 +188,41 @@ func (s *testEtcdutilSuite) TestEtcdKVGet(c *C) { c.Assert(len(resp.Kvs), Equals, 2) cleanConfig(cfg) } + +func (s *testEtcdutilSuite) TestEtcdKVPutWithTTL(c *C) { + cfg := newTestSingleConfig() + etcd, err := embed.StartEtcd(cfg) + c.Assert(err, IsNil) + + ep := cfg.LCUrls[0].String() + client, err := clientv3.New(clientv3.Config{ + Endpoints: []string{ep}, + }) + c.Assert(err, IsNil) + + <-etcd.Server.ReadyNotify() + + _, err = EtcdKVPutWithTTL(context.TODO(), client, "test/ttl1", "val1", 2) + c.Assert(err, IsNil) + _, err = EtcdKVPutWithTTL(context.TODO(), client, "test/ttl2", "val2", 4) + c.Assert(err, IsNil) + + time.Sleep(3 * time.Second) + // test/ttl1 is outdated + resp, err := EtcdKVGet(client, "test/ttl1") + c.Assert(err, IsNil) + c.Assert(resp.Count, Equals, int64(0)) + // but test/ttl2 is not + resp, err = EtcdKVGet(client, "test/ttl2") + c.Assert(err, IsNil) + c.Assert(string(resp.Kvs[0].Value), Equals, "val2") + + time.Sleep(2 * time.Second) + + // test/ttl2 is also outdated + resp, err = EtcdKVGet(client, "test/ttl2") + c.Assert(err, IsNil) + c.Assert(resp.Count, Equals, int64(0)) + + cleanConfig(cfg) +} diff --git a/server/api/config.go b/server/api/config.go index 1b752d87e7a..4e01b692e38 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -109,7 +109,11 @@ func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) { // if ttlSecond defined, we will apply if to temp configuration. if ttls > 0 { - h.svr.SaveTTLConfig(conf, time.Duration(ttls)*time.Second) + err := h.svr.SaveTTLConfig(conf, time.Duration(ttls)*time.Second) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } h.rd.JSON(w, http.StatusOK, "The config is updated.") return } diff --git a/server/api/config_test.go b/server/api/config_test.go index 162dc7f0130..a893ee0c072 100644 --- a/server/api/config_test.go +++ b/server/api/config_test.go @@ -270,28 +270,39 @@ func (s *testConfigSuite) TestConfigDefault(c *C) { c.Assert(defaultCfg.PDServerCfg.MetricStorage, Equals, "") } +var ttlConfig = map[string]interface{}{ + "schedule.max-snapshot-count": 999, + "schedule.enable-location-replacement": false, + "schedule.max-merge-region-size": 999, + "schedule.max-merge-region-keys": 999, + "schedule.scheduler-max-waiting-operator": 999, + "schedule.leader-schedule-limit": 999, + "schedule.region-schedule-limit": 999, + "schedule.hot-region-schedule-limit": 999, + "schedule.replica-schedule-limit": 999, + "schedule.merge-schedule-limit": 999, +} + +func assertTTLConfig(c *C, options *config.PersistOptions, checker Checker) { + c.Assert(options.GetMaxSnapshotCount(), checker, uint64(999)) + c.Assert(options.IsLocationReplacementEnabled(), checker, false) + c.Assert(options.GetMaxMergeRegionSize(), checker, uint64(999)) + c.Assert(options.GetMaxMergeRegionKeys(), checker, uint64(999)) + c.Assert(options.GetSchedulerMaxWaitingOperator(), checker, uint64(999)) + c.Assert(options.GetLeaderScheduleLimit(), checker, uint64(999)) + c.Assert(options.GetRegionScheduleLimit(), checker, uint64(999)) + c.Assert(options.GetHotRegionScheduleLimit(), checker, uint64(999)) + c.Assert(options.GetReplicaScheduleLimit(), checker, uint64(999)) + c.Assert(options.GetMergeScheduleLimit(), checker, uint64(999)) +} + func (s *testConfigSuite) TestConfigTTL(c *C) { - addr := fmt.Sprintf("%s/config?ttlSecond=3", s.urlPrefix) - r := map[string]interface{}{ - "schedule.max-snapshot-count": 999, - "schedule.enable-location-replacement": false, - "schedule.max-merge-region-size": 999, - "schedule.max-merge-region-keys": 999, - "schedule.scheduler-max-waiting-operator": 999, - } - postData, err := json.Marshal(r) + addr := fmt.Sprintf("%s/config?ttlSecond=1", s.urlPrefix) + postData, err := json.Marshal(ttlConfig) c.Assert(err, IsNil) err = postJSON(testDialClient, addr, postData) c.Assert(err, IsNil) - c.Assert(s.svr.GetPersistOptions().GetMaxSnapshotCount(), Equals, uint64(999)) - c.Assert(s.svr.GetPersistOptions().IsLocationReplacementEnabled(), Equals, false) - c.Assert(s.svr.GetPersistOptions().GetMaxMergeRegionSize(), Equals, uint64(999)) - c.Assert(s.svr.GetPersistOptions().GetMaxMergeRegionKeys(), Equals, uint64(999)) - c.Assert(s.svr.GetPersistOptions().GetSchedulerMaxWaitingOperator(), Equals, uint64(999)) - time.Sleep(5 * time.Second) - c.Assert(s.svr.GetPersistOptions().GetMaxSnapshotCount(), Not(Equals), uint64(999)) - c.Assert(s.svr.GetPersistOptions().IsLocationReplacementEnabled(), Equals, true) - c.Assert(s.svr.GetPersistOptions().GetMaxMergeRegionSize(), Not(Equals), uint64(999)) - c.Assert(s.svr.GetPersistOptions().GetMaxMergeRegionKeys(), Not(Equals), uint64(999)) - c.Assert(s.svr.GetPersistOptions().GetSchedulerMaxWaitingOperator(), Not(Equals), uint64(999)) + assertTTLConfig(c, s.svr.GetPersistOptions(), Equals) + time.Sleep(2 * time.Second) + assertTTLConfig(c, s.svr.GetPersistOptions(), Not(Equals)) } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 1c5196f825e..ec9ff2f4f67 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1797,7 +1797,7 @@ func (c *RaftCluster) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64) // SetAllStoresLimitTTL sets all store limit for a given type and rate with ttl. func (c *RaftCluster) SetAllStoresLimitTTL(typ storelimit.Type, ratePerMin float64, ttl time.Duration) { - c.opt.SetAllStoresLimitTTL(c.ctx, typ, ratePerMin, ttl) + c.opt.SetAllStoresLimitTTL(c.ctx, c.etcdClient, typ, ratePerMin, ttl) } // GetClusterVersion returns the current cluster version. diff --git a/server/config/persist_options.go b/server/config/persist_options.go index bcd33306fa1..251f99b8f1f 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -17,27 +17,30 @@ import ( "context" "fmt" "reflect" + "strconv" "sync/atomic" "time" "unsafe" "github.com/coreos/go-semver/semver" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/tikv/pd/pkg/cache" + "github.com/tikv/pd/pkg/etcdutil" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/typeutil" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/kv" "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/storelimit" + "go.etcd.io/etcd/clientv3" ) // PersistOptions wraps all configurations that need to persist to storage and // allows to access them safely. type PersistOptions struct { // configuration -> ttl value - ttl map[string]*cache.TTLString - ttlCancel map[string]context.CancelFunc + ttl *cache.TTLString schedule atomic.Value replication atomic.Value pdServerConfig atomic.Value @@ -55,8 +58,7 @@ func NewPersistOptions(cfg *Config) *PersistOptions { o.replicationMode.Store(&cfg.ReplicationMode) o.labelProperty.Store(cfg.LabelProperty) o.SetClusterVersion(&cfg.ClusterVersion) - o.ttl = make(map[string]*cache.TTLString, 6) - o.ttlCancel = make(map[string]context.CancelFunc, 6) + o.ttl = nil return o } @@ -159,42 +161,37 @@ func (o *PersistOptions) SetMaxReplicas(replicas int) { o.SetReplicationConfig(v) } +const ( + maxSnapshotCountKey = "schedule.max-snapshot-count" + maxMergeRegionSizeKey = "schedule.max-merge-region-size" + maxPendingPeerCountKey = "schedule.max-pending-peer-count" + maxMergeRegionKeysKey = "schedule.max-merge-region-keys" + leaderScheduleLimitKey = "schedule.leader-schedule-limit" + regionScheduleLimitKey = "schedule.region-schedule-limit" + replicaRescheduleLimitKey = "schedule.replica-schedule-limit" + mergeScheduleLimitKey = "schedule.merge-schedule-limit" + hotRegionScheduleLimitKey = "schedule.hot-region-schedule-limit" + schedulerMaxWaitingOperatorKey = "schedule.scheduler-max-waiting-operator" +) + // GetMaxSnapshotCount returns the number of the max snapshot which is allowed to send. func (o *PersistOptions) GetMaxSnapshotCount() uint64 { - if v, ok := o.getTTLData("schedule.max-snapshot-count"); ok { - r, ok := v.(float64) - if ok { - return uint64(r) - } - } - return o.GetScheduleConfig().MaxSnapshotCount + return o.getTTLUintOr(maxSnapshotCountKey, o.GetScheduleConfig().MaxSnapshotCount) } // GetMaxPendingPeerCount returns the number of the max pending peers. func (o *PersistOptions) GetMaxPendingPeerCount() uint64 { - return o.GetScheduleConfig().MaxPendingPeerCount + return o.getTTLUintOr(maxPendingPeerCountKey, o.GetScheduleConfig().MaxPendingPeerCount) } // GetMaxMergeRegionSize returns the max region size. func (o *PersistOptions) GetMaxMergeRegionSize() uint64 { - if v, ok := o.getTTLData("schedule.max-merge-region-size"); ok { - r, ok := v.(float64) - if ok { - return uint64(r) - } - } - return o.GetScheduleConfig().MaxMergeRegionSize + return o.getTTLUintOr(maxMergeRegionSizeKey, o.GetScheduleConfig().MaxMergeRegionSize) } // GetMaxMergeRegionKeys returns the max number of keys. func (o *PersistOptions) GetMaxMergeRegionKeys() uint64 { - if v, ok := o.getTTLData("schedule.max-merge-region-keys"); ok { - r, ok := v.(float64) - if ok { - return uint64(r) - } - } - return o.GetScheduleConfig().MaxMergeRegionKeys + return o.getTTLUintOr(maxMergeRegionKeysKey, o.GetScheduleConfig().MaxMergeRegionKeys) } // GetSplitMergeInterval returns the interval between finishing split and starting to merge. @@ -277,64 +274,62 @@ func (o *PersistOptions) GetMaxStoreDownTime() time.Duration { // GetLeaderScheduleLimit returns the limit for leader schedule. func (o *PersistOptions) GetLeaderScheduleLimit() uint64 { - return o.GetScheduleConfig().LeaderScheduleLimit + return o.getTTLUintOr(leaderScheduleLimitKey, o.GetScheduleConfig().LeaderScheduleLimit) } // GetRegionScheduleLimit returns the limit for region schedule. func (o *PersistOptions) GetRegionScheduleLimit() uint64 { - return o.GetScheduleConfig().RegionScheduleLimit + return o.getTTLUintOr(regionScheduleLimitKey, o.GetScheduleConfig().RegionScheduleLimit) } // GetReplicaScheduleLimit returns the limit for replica schedule. func (o *PersistOptions) GetReplicaScheduleLimit() uint64 { - return o.GetScheduleConfig().ReplicaScheduleLimit + return o.getTTLUintOr(replicaRescheduleLimitKey, o.GetScheduleConfig().ReplicaScheduleLimit) } // GetMergeScheduleLimit returns the limit for merge schedule. func (o *PersistOptions) GetMergeScheduleLimit() uint64 { - return o.GetScheduleConfig().MergeScheduleLimit + return o.getTTLUintOr(mergeScheduleLimitKey, o.GetScheduleConfig().MergeScheduleLimit) } // GetHotRegionScheduleLimit returns the limit for hot region schedule. func (o *PersistOptions) GetHotRegionScheduleLimit() uint64 { - return o.GetScheduleConfig().HotRegionScheduleLimit + return o.getTTLUintOr(hotRegionScheduleLimitKey, o.GetScheduleConfig().HotRegionScheduleLimit) } // GetStoreLimit returns the limit of a store. func (o *PersistOptions) GetStoreLimit(storeID uint64) (returnSC StoreLimitConfig) { defer func() { - if v, ok := o.getTTLData(fmt.Sprintf("remove-peer-%v", storeID)); ok { - r, ok := v.(float64) - if ok { - returnSC.RemovePeer = r - } - } - if v, ok := o.getTTLData(fmt.Sprintf("add-peer-%v", storeID)); ok { - r, ok := v.(float64) - if ok { - returnSC.AddPeer = r - } - } + returnSC.RemovePeer = o.getTTLFloatOr(fmt.Sprintf("remove-peer-%v", storeID), returnSC.RemovePeer) + returnSC.AddPeer = o.getTTLFloatOr(fmt.Sprintf("add-peer-%v", storeID), returnSC.AddPeer) }() if limit, ok := o.GetScheduleConfig().StoreLimit[storeID]; ok { return limit } - v1, ok1 := o.getTTLData("default-add-peer") - v2, ok2 := o.getTTLData("default-remove-peer") cfg := o.GetScheduleConfig().Clone() sc := StoreLimitConfig{ AddPeer: DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer), RemovePeer: DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer), } - if ok1 || ok2 { - r, ok := v1.(float64) - if ok { - returnSC.AddPeer = r - } - r, ok = v2.(float64) - if ok { - returnSC.RemovePeer = r - } + v, ok1, err := o.getTTLFloat("default-add-peer") + if err != nil { + log.Warn("failed to parse default-add-peer from PersistOptions's ttl storage") + } + canSetAddPeer := ok1 && err == nil + if canSetAddPeer { + returnSC.AddPeer = v + } + + v, ok2, err := o.getTTLFloat("default-remove-peer") + if err != nil { + log.Warn("failed to parse default-remove-peer from PersistOptions's ttl storage") + } + canSetRemovePeer := ok2 && err == nil + if canSetRemovePeer { + returnSC.RemovePeer = v + } + + if canSetAddPeer || canSetRemovePeer { return returnSC } cfg.StoreLimit[storeID] = sc @@ -346,19 +341,9 @@ func (o *PersistOptions) GetStoreLimit(storeID uint64) (returnSC StoreLimitConfi func (o *PersistOptions) GetStoreLimitByType(storeID uint64, typ storelimit.Type) (returned float64) { defer func() { if typ == storelimit.RemovePeer { - if v, ok := o.getTTLData(fmt.Sprintf("remove-peer-%v", storeID)); ok { - r, ok := v.(float64) - if ok { - returned = r - } - } + returned = o.getTTLFloatOr(fmt.Sprintf("remove-peer-%v", storeID), returned) } else if typ == storelimit.AddPeer { - if v, ok := o.getTTLData(fmt.Sprintf("add-peer-%v", storeID)); ok { - r, ok := v.(float64) - if ok { - returned = r - } - } + returned = o.getTTLFloatOr(fmt.Sprintf("add-peer-%v", storeID), returned) } }() limit := o.GetStoreLimit(storeID) @@ -399,13 +384,7 @@ func (o *PersistOptions) GetHighSpaceRatio() float64 { // GetSchedulerMaxWaitingOperator returns the number of the max waiting operators. func (o *PersistOptions) GetSchedulerMaxWaitingOperator() uint64 { - if v, ok := o.getTTLData("schedule.scheduler-max-waiting-operator"); ok { - r, ok := v.(float64) - if ok { - return uint64(r) - } - } - return o.GetScheduleConfig().SchedulerMaxWaitingOperator + return o.getTTLUintOr(schedulerMaxWaitingOperatorKey, o.GetScheduleConfig().SchedulerMaxWaitingOperator) } // GetLeaderSchedulePolicy is to get leader schedule policy. @@ -456,10 +435,11 @@ func (o *PersistOptions) IsRemoveExtraReplicaEnabled() bool { // IsLocationReplacementEnabled returns if location replace is enabled. func (o *PersistOptions) IsLocationReplacementEnabled() bool { if v, ok := o.getTTLData("schedule.enable-location-replacement"); ok { - r, ok := v.(bool) - if ok { - return r + result, err := strconv.ParseBool(v) + if err == nil { + return result } + log.Warn("failed to parse schedule.enable-location-replacement from PersistOptions's ttl storage") } return o.GetScheduleConfig().EnableLocationReplacement } @@ -614,31 +594,99 @@ func (o *PersistOptions) CheckLabelProperty(typ string, labels []*metapb.StoreLa return false } +const ttlConfigPrefix = "/config/ttl" + // SetTTLData set temporary configuration -func (o *PersistOptions) SetTTLData(parCtx context.Context, key string, value interface{}, ttl time.Duration) { - if data, ok := o.ttl[key]; ok { - data.Clear() - o.ttlCancel[key]() +func (o *PersistOptions) SetTTLData(parCtx context.Context, client *clientv3.Client, key string, value string, ttl time.Duration) error { + if o.ttl == nil { + o.ttl = cache.NewStringTTL(parCtx, time.Second*5, time.Minute*5) } - ctx, cancel := context.WithCancel(parCtx) - o.ttl[key] = cache.NewStringTTL(ctx, 5*time.Second, ttl) - o.ttl[key].Put(key, value) - o.ttlCancel[key] = cancel + _, err := etcdutil.EtcdKVPutWithTTL(parCtx, client, ttlConfigPrefix+"/"+key, value, int64(ttl.Seconds())) + if err != nil { + return err + } + o.ttl.PutWithTTL(key, value, ttl) + return nil +} + +func (o *PersistOptions) getTTLUint(key string) (uint64, bool, error) { + stringForm, ok := o.getTTLData(key) + if !ok { + return 0, false, nil + } + r, err := strconv.ParseUint(stringForm, 10, 64) + return r, true, err +} + +func (o *PersistOptions) getTTLUintOr(key string, defaultValue uint64) uint64 { + if v, ok, err := o.getTTLUint(key); ok { + if err == nil { + return v + } + log.Warn("failed to parse " + key + " from PersistOptions's ttl storage") + } + return defaultValue +} + +func (o *PersistOptions) getTTLFloat(key string) (float64, bool, error) { + stringForm, ok := o.getTTLData(key) + if !ok { + return 0, false, nil + } + r, err := strconv.ParseFloat(stringForm, 64) + return r, true, err +} + +func (o *PersistOptions) getTTLFloatOr(key string, defaultValue float64) float64 { + if v, ok, err := o.getTTLFloat(key); ok { + if err == nil { + return v + } + log.Warn("failed to parse " + key + " from PersistOptions's ttl storage") + } + return defaultValue } -func (o *PersistOptions) getTTLData(key string) (interface{}, bool) { - if data, ok := o.ttl[key]; ok { - return data.Get(key) +func (o *PersistOptions) getTTLData(key string) (string, bool) { + if o.ttl == nil { + return "", false + } + if result, ok := o.ttl.Get(key); ok { + return result.(string), ok + } + return "", false +} + +// LoadTTLFromEtcd loads temporary configuration which was persisted into etcd +func (o *PersistOptions) LoadTTLFromEtcd(ctx context.Context, client *clientv3.Client) error { + resps, err := etcdutil.EtcdKVGet(client, ttlConfigPrefix, clientv3.WithPrefix()) + if err != nil { + return err + } + if o.ttl == nil { + o.ttl = cache.NewStringTTL(ctx, time.Second*5, time.Minute*5) } - return nil, false + for _, resp := range resps.Kvs { + key := string(resp.Key)[len(ttlConfigPrefix)+1:] + value := string(resp.Value) + leaseID := resp.Lease + resp, err := client.TimeToLive(ctx, clientv3.LeaseID(leaseID)) + if err != nil { + return err + } + o.ttl.PutWithTTL(key, value, time.Duration(resp.TTL)*time.Second) + } + return nil } // SetAllStoresLimitTTL sets all store limit for a given type and rate with ttl. -func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, typ storelimit.Type, ratePerMin float64, ttl time.Duration) { +func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, client *clientv3.Client, typ storelimit.Type, ratePerMin float64, ttl time.Duration) error { + var err error switch typ { case storelimit.AddPeer: - o.SetTTLData(ctx, "default-add-peer", ratePerMin, ttl) + err = o.SetTTLData(ctx, client, "default-add-peer", fmt.Sprint(ratePerMin), ttl) case storelimit.RemovePeer: - o.SetTTLData(ctx, "default-remove-peer", ratePerMin, ttl) + err = o.SetTTLData(ctx, client, "default-remove-peer", fmt.Sprint(ratePerMin), ttl) } + return err } diff --git a/server/handler.go b/server/handler.go index ed17f2deae8..3346ca81ab0 100644 --- a/server/handler.go +++ b/server/handler.go @@ -904,8 +904,8 @@ func (h *Handler) GetAddr() string { } // SetStoreLimitTTL set storeLimit with ttl -func (h *Handler) SetStoreLimitTTL(data string, value float64, ttl time.Duration) { - h.s.SaveTTLConfig(map[string]interface{}{ +func (h *Handler) SetStoreLimitTTL(data string, value float64, ttl time.Duration) error { + return h.s.SaveTTLConfig(map[string]interface{}{ data: value, }, ttl) } diff --git a/server/server.go b/server/server.go index 5f5284e2a08..61e89c82af7 100644 --- a/server/server.go +++ b/server/server.go @@ -454,7 +454,6 @@ func (s *Server) Run() error { if err := s.startEtcd(s.ctx); err != nil { return err } - if err := s.startServer(s.ctx); err != nil { return err } @@ -1149,7 +1148,10 @@ func (s *Server) campaignLeader() { return } defer s.stopRaftCluster() - + if err := s.persistOptions.LoadTTLFromEtcd(s.ctx, s.client); err != nil { + log.Error("failed to load persistOptions from etcd", errs.ZapError(err)) + return + } s.member.EnableLeader() defer s.member.DisableLeader() @@ -1253,8 +1255,11 @@ func (s *Server) PersistFile(name string, data []byte) error { } // SaveTTLConfig save ttl config -func (s *Server) SaveTTLConfig(data map[string]interface{}, ttl time.Duration) { +func (s *Server) SaveTTLConfig(data map[string]interface{}, ttl time.Duration) error { for k, v := range data { - s.persistOptions.SetTTLData(s.ctx, k, v, ttl) + if err := s.persistOptions.SetTTLData(s.ctx, s.client, k, fmt.Sprint(v), ttl); err != nil { + return err + } } + return nil } diff --git a/tests/client/client_test.go b/tests/client/client_test.go index 65eb785156c..0e760e581bc 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -14,7 +14,10 @@ package client_test import ( + "bytes" "context" + "encoding/json" + "fmt" "math" "path/filepath" "sort" @@ -32,6 +35,7 @@ import ( "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/testutil" "github.com/tikv/pd/server" + "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" "github.com/tikv/pd/tests" "go.etcd.io/etcd/clientv3" @@ -790,3 +794,65 @@ func (s *testClientSuite) TestScatterRegionWithOption(c *C) { }) c.Succeed() } + +type testConfigTTLSuite struct { + ctx context.Context + cancel context.CancelFunc +} + +func (s *testConfigTTLSuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) + server.EnableZap = true +} + +func (s *testConfigTTLSuite) TearDownSuite(c *C) { + s.cancel() +} + +var _ = SerialSuites(&testConfigTTLSuite{}) + +var ttlConfig = map[string]interface{}{ + "schedule.max-snapshot-count": 999, + "schedule.enable-location-replacement": false, + "schedule.max-merge-region-size": 999, + "schedule.max-merge-region-keys": 999, + "schedule.scheduler-max-waiting-operator": 999, + "schedule.leader-schedule-limit": 999, + "schedule.region-schedule-limit": 999, + "schedule.hot-region-schedule-limit": 999, + "schedule.replica-schedule-limit": 999, + "schedule.merge-schedule-limit": 999, +} + +func assertTTLConfig(c *C, options *config.PersistOptions, checker Checker) { + c.Assert(options.GetMaxSnapshotCount(), checker, uint64(999)) + c.Assert(options.IsLocationReplacementEnabled(), checker, false) + c.Assert(options.GetMaxMergeRegionSize(), checker, uint64(999)) + c.Assert(options.GetMaxMergeRegionKeys(), checker, uint64(999)) + c.Assert(options.GetSchedulerMaxWaitingOperator(), checker, uint64(999)) + c.Assert(options.GetLeaderScheduleLimit(), checker, uint64(999)) + c.Assert(options.GetRegionScheduleLimit(), checker, uint64(999)) + c.Assert(options.GetHotRegionScheduleLimit(), checker, uint64(999)) + c.Assert(options.GetReplicaScheduleLimit(), checker, uint64(999)) + c.Assert(options.GetMergeScheduleLimit(), checker, uint64(999)) +} + +func (s *testConfigTTLSuite) TestConfigTTLAfterTransferLeader(c *C) { + cluster, err := tests.NewTestCluster(s.ctx, 3) + c.Assert(err, IsNil) + defer cluster.Destroy() + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + leader := cluster.GetServer(cluster.WaitLeader()) + c.Assert(leader.BootstrapCluster(), IsNil) + addr := fmt.Sprintf("%s/pd/api/v1/config?ttlSecond=5", leader.GetAddr()) + postData, err := json.Marshal(ttlConfig) + c.Assert(err, IsNil) + _, err = leader.GetHTTPClient().Post(addr, "application/json", bytes.NewBuffer(postData)) + c.Assert(err, IsNil) + time.Sleep(2 * time.Second) + _ = leader.Destroy() + time.Sleep(2 * time.Second) + leader = cluster.GetServer(cluster.WaitLeader()) + assertTTLConfig(c, leader.GetPersistOptions(), Equals) +}