From 5f24ab7d7fec93ce315a775e18eb2b4513734cf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BE=99=E6=96=B9=E6=B7=9E?= Date: Fri, 27 Nov 2020 14:02:58 +0800 Subject: [PATCH] cherry pick #3131 to release-4.0 Signed-off-by: ti-srebot --- pkg/cache/ttl.go | 5 + pkg/etcdutil/etcdutil.go | 10 + pkg/etcdutil/etcdutil_test.go | 39 ++++ server/api/config.go | 6 +- server/api/config_test.go | 51 ++++-- server/cluster/cluster.go | 2 +- server/config/persist_options.go | 234 +++++++++++++++--------- server/handler.go | 4 +- server/schedule/region_splitter_test.go | 123 +++++++++++++ server/server.go | 13 +- tests/client/client_test.go | 65 +++++++ 11 files changed, 433 insertions(+), 119 deletions(-) create mode 100644 server/schedule/region_splitter_test.go diff --git a/pkg/cache/ttl.go b/pkg/cache/ttl.go index cd589abcb94..bcd79d5492f 100644 --- a/pkg/cache/ttl.go +++ b/pkg/cache/ttl.go @@ -234,6 +234,11 @@ func (c *TTLString) Put(key string, value interface{}) { c.ttlCache.put(key, value) } +// PutWithTTL puts an item into cache with specified TTL. 
+func (c *TTLString) PutWithTTL(key string, value interface{}, ttl time.Duration) { + c.ttlCache.putWithTTL(key, value, ttl) +} + // Pop one key/value that is not expired func (c *TTLString) Pop() (string, interface{}, bool) { k, v, success := c.ttlCache.pop() diff --git a/pkg/etcdutil/etcdutil.go b/pkg/etcdutil/etcdutil.go index c1dd36766ed..6491d9e999d 100644 --- a/pkg/etcdutil/etcdutil.go +++ b/pkg/etcdutil/etcdutil.go @@ -161,3 +161,13 @@ func GetProtoMsgWithModRev(c *clientv3.Client, key string, msg proto.Message, op } return true, resp.Kvs[0].ModRevision, nil } + +// EtcdKVPutWithTTL put (key, value) into etcd with a ttl of ttlSeconds +func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value string, ttlSeconds int64) (*clientv3.PutResponse, error) { + kv := clientv3.NewKV(c) + grantResp, err := c.Grant(ctx, ttlSeconds) + if err != nil { + return nil, err + } + return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID)) +} diff --git a/pkg/etcdutil/etcdutil_test.go b/pkg/etcdutil/etcdutil_test.go index f5b942d2267..0fd91e3996e 100644 --- a/pkg/etcdutil/etcdutil_test.go +++ b/pkg/etcdutil/etcdutil_test.go @@ -21,6 +21,7 @@ import ( "net/url" "os" "testing" + "time" . 
"github.com/pingcap/check" "github.com/tikv/pd/pkg/tempurl" @@ -187,3 +188,41 @@ func (s *testEtcdutilSuite) TestEtcdKVGet(c *C) { c.Assert(len(resp.Kvs), Equals, 2) cleanConfig(cfg) } + +func (s *testEtcdutilSuite) TestEtcdKVPutWithTTL(c *C) { + cfg := newTestSingleConfig() + etcd, err := embed.StartEtcd(cfg) + c.Assert(err, IsNil) + + ep := cfg.LCUrls[0].String() + client, err := clientv3.New(clientv3.Config{ + Endpoints: []string{ep}, + }) + c.Assert(err, IsNil) + + <-etcd.Server.ReadyNotify() + + _, err = EtcdKVPutWithTTL(context.TODO(), client, "test/ttl1", "val1", 2) + c.Assert(err, IsNil) + _, err = EtcdKVPutWithTTL(context.TODO(), client, "test/ttl2", "val2", 4) + c.Assert(err, IsNil) + + time.Sleep(3 * time.Second) + // test/ttl1 is outdated + resp, err := EtcdKVGet(client, "test/ttl1") + c.Assert(err, IsNil) + c.Assert(resp.Count, Equals, int64(0)) + // but test/ttl2 is not + resp, err = EtcdKVGet(client, "test/ttl2") + c.Assert(err, IsNil) + c.Assert(string(resp.Kvs[0].Value), Equals, "val2") + + time.Sleep(2 * time.Second) + + // test/ttl2 is also outdated + resp, err = EtcdKVGet(client, "test/ttl2") + c.Assert(err, IsNil) + c.Assert(resp.Count, Equals, int64(0)) + + cleanConfig(cfg) +} diff --git a/server/api/config.go b/server/api/config.go index 1b752d87e7a..4e01b692e38 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -109,7 +109,11 @@ func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) { // if ttlSecond defined, we will apply if to temp configuration. 
if ttls > 0 { - h.svr.SaveTTLConfig(conf, time.Duration(ttls)*time.Second) + err := h.svr.SaveTTLConfig(conf, time.Duration(ttls)*time.Second) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } h.rd.JSON(w, http.StatusOK, "The config is updated.") return } diff --git a/server/api/config_test.go b/server/api/config_test.go index 162dc7f0130..a893ee0c072 100644 --- a/server/api/config_test.go +++ b/server/api/config_test.go @@ -270,28 +270,39 @@ func (s *testConfigSuite) TestConfigDefault(c *C) { c.Assert(defaultCfg.PDServerCfg.MetricStorage, Equals, "") } +var ttlConfig = map[string]interface{}{ + "schedule.max-snapshot-count": 999, + "schedule.enable-location-replacement": false, + "schedule.max-merge-region-size": 999, + "schedule.max-merge-region-keys": 999, + "schedule.scheduler-max-waiting-operator": 999, + "schedule.leader-schedule-limit": 999, + "schedule.region-schedule-limit": 999, + "schedule.hot-region-schedule-limit": 999, + "schedule.replica-schedule-limit": 999, + "schedule.merge-schedule-limit": 999, +} + +func assertTTLConfig(c *C, options *config.PersistOptions, checker Checker) { + c.Assert(options.GetMaxSnapshotCount(), checker, uint64(999)) + c.Assert(options.IsLocationReplacementEnabled(), checker, false) + c.Assert(options.GetMaxMergeRegionSize(), checker, uint64(999)) + c.Assert(options.GetMaxMergeRegionKeys(), checker, uint64(999)) + c.Assert(options.GetSchedulerMaxWaitingOperator(), checker, uint64(999)) + c.Assert(options.GetLeaderScheduleLimit(), checker, uint64(999)) + c.Assert(options.GetRegionScheduleLimit(), checker, uint64(999)) + c.Assert(options.GetHotRegionScheduleLimit(), checker, uint64(999)) + c.Assert(options.GetReplicaScheduleLimit(), checker, uint64(999)) + c.Assert(options.GetMergeScheduleLimit(), checker, uint64(999)) +} + func (s *testConfigSuite) TestConfigTTL(c *C) { - addr := fmt.Sprintf("%s/config?ttlSecond=3", s.urlPrefix) - r := map[string]interface{}{ - "schedule.max-snapshot-count": 
999, - "schedule.enable-location-replacement": false, - "schedule.max-merge-region-size": 999, - "schedule.max-merge-region-keys": 999, - "schedule.scheduler-max-waiting-operator": 999, - } - postData, err := json.Marshal(r) + addr := fmt.Sprintf("%s/config?ttlSecond=1", s.urlPrefix) + postData, err := json.Marshal(ttlConfig) c.Assert(err, IsNil) err = postJSON(testDialClient, addr, postData) c.Assert(err, IsNil) - c.Assert(s.svr.GetPersistOptions().GetMaxSnapshotCount(), Equals, uint64(999)) - c.Assert(s.svr.GetPersistOptions().IsLocationReplacementEnabled(), Equals, false) - c.Assert(s.svr.GetPersistOptions().GetMaxMergeRegionSize(), Equals, uint64(999)) - c.Assert(s.svr.GetPersistOptions().GetMaxMergeRegionKeys(), Equals, uint64(999)) - c.Assert(s.svr.GetPersistOptions().GetSchedulerMaxWaitingOperator(), Equals, uint64(999)) - time.Sleep(5 * time.Second) - c.Assert(s.svr.GetPersistOptions().GetMaxSnapshotCount(), Not(Equals), uint64(999)) - c.Assert(s.svr.GetPersistOptions().IsLocationReplacementEnabled(), Equals, true) - c.Assert(s.svr.GetPersistOptions().GetMaxMergeRegionSize(), Not(Equals), uint64(999)) - c.Assert(s.svr.GetPersistOptions().GetMaxMergeRegionKeys(), Not(Equals), uint64(999)) - c.Assert(s.svr.GetPersistOptions().GetSchedulerMaxWaitingOperator(), Not(Equals), uint64(999)) + assertTTLConfig(c, s.svr.GetPersistOptions(), Equals) + time.Sleep(2 * time.Second) + assertTTLConfig(c, s.svr.GetPersistOptions(), Not(Equals)) } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 1c5196f825e..ec9ff2f4f67 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1797,7 +1797,7 @@ func (c *RaftCluster) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64) // SetAllStoresLimitTTL sets all store limit for a given type and rate with ttl. 
func (c *RaftCluster) SetAllStoresLimitTTL(typ storelimit.Type, ratePerMin float64, ttl time.Duration) { - c.opt.SetAllStoresLimitTTL(c.ctx, typ, ratePerMin, ttl) + c.opt.SetAllStoresLimitTTL(c.ctx, c.etcdClient, typ, ratePerMin, ttl) } // GetClusterVersion returns the current cluster version. diff --git a/server/config/persist_options.go b/server/config/persist_options.go index bcd33306fa1..e5c7429ba2d 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -17,27 +17,30 @@ import ( "context" "fmt" "reflect" + "strconv" "sync/atomic" "time" "unsafe" "github.com/coreos/go-semver/semver" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/tikv/pd/pkg/cache" + "github.com/tikv/pd/pkg/etcdutil" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/typeutil" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/kv" "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/storelimit" + "go.etcd.io/etcd/clientv3" ) // PersistOptions wraps all configurations that need to persist to storage and // allows to access them safely. 
type PersistOptions struct { // configuration -> ttl value - ttl map[string]*cache.TTLString - ttlCancel map[string]context.CancelFunc + ttl *cache.TTLString schedule atomic.Value replication atomic.Value pdServerConfig atomic.Value @@ -55,8 +62,7 @@ func NewPersistOptions(cfg *Config) *PersistOptions { o.replicationMode.Store(&cfg.ReplicationMode) o.labelProperty.Store(cfg.LabelProperty) o.SetClusterVersion(&cfg.ClusterVersion) - o.ttl = make(map[string]*cache.TTLString, 6) - o.ttlCancel = make(map[string]context.CancelFunc, 6) + o.ttl = nil return o } @@ -159,42 +165,37 @@ func (o *PersistOptions) SetMaxReplicas(replicas int) { o.SetReplicationConfig(v) } +const ( + maxSnapshotCountKey = "schedule.max-snapshot-count" + maxMergeRegionSizeKey = "schedule.max-merge-region-size" + maxPendingPeerCountKey = "schedule.max-pending-peer-count" + maxMergeRegionKeysKey = "schedule.max-merge-region-keys" + leaderScheduleLimitKey = "schedule.leader-schedule-limit" + regionScheduleLimitKey = "schedule.region-schedule-limit" + replicaRescheduleLimitKey = "schedule.replica-schedule-limit" + mergeScheduleLimitKey = "schedule.merge-schedule-limit" + hotRegionScheduleLimitKey = "schedule.hot-region-schedule-limit" + schedulerMaxWaitingOperatorKey = "schedule.scheduler-max-waiting-operator" +) + // GetMaxSnapshotCount returns the number of the max snapshot which is allowed to send. func (o *PersistOptions) GetMaxSnapshotCount() uint64 { - if v, ok := o.getTTLData("schedule.max-snapshot-count"); ok { - r, ok := v.(float64) - if ok { - return uint64(r) - } - } - return o.GetScheduleConfig().MaxSnapshotCount + return o.getTTLUintOr(maxSnapshotCountKey, o.GetScheduleConfig().MaxSnapshotCount) } // GetMaxPendingPeerCount returns the number of the max pending peers. 
func (o *PersistOptions) GetMaxPendingPeerCount() uint64 { - return o.GetScheduleConfig().MaxPendingPeerCount + return o.getTTLUintOr(maxPendingPeerCountKey, o.GetScheduleConfig().MaxPendingPeerCount) } // GetMaxMergeRegionSize returns the max region size. func (o *PersistOptions) GetMaxMergeRegionSize() uint64 { - if v, ok := o.getTTLData("schedule.max-merge-region-size"); ok { - r, ok := v.(float64) - if ok { - return uint64(r) - } - } - return o.GetScheduleConfig().MaxMergeRegionSize + return o.getTTLUintOr(maxMergeRegionSizeKey, o.GetScheduleConfig().MaxMergeRegionSize) } // GetMaxMergeRegionKeys returns the max number of keys. func (o *PersistOptions) GetMaxMergeRegionKeys() uint64 { - if v, ok := o.getTTLData("schedule.max-merge-region-keys"); ok { - r, ok := v.(float64) - if ok { - return uint64(r) - } - } - return o.GetScheduleConfig().MaxMergeRegionKeys + return o.getTTLUintOr(maxMergeRegionKeysKey, o.GetScheduleConfig().MaxMergeRegionKeys) } // GetSplitMergeInterval returns the interval between finishing split and starting to merge. @@ -277,64 +278,62 @@ func (o *PersistOptions) GetMaxStoreDownTime() time.Duration { // GetLeaderScheduleLimit returns the limit for leader schedule. func (o *PersistOptions) GetLeaderScheduleLimit() uint64 { - return o.GetScheduleConfig().LeaderScheduleLimit + return o.getTTLUintOr(leaderScheduleLimitKey, o.GetScheduleConfig().LeaderScheduleLimit) } // GetRegionScheduleLimit returns the limit for region schedule. func (o *PersistOptions) GetRegionScheduleLimit() uint64 { - return o.GetScheduleConfig().RegionScheduleLimit + return o.getTTLUintOr(regionScheduleLimitKey, o.GetScheduleConfig().RegionScheduleLimit) } // GetReplicaScheduleLimit returns the limit for replica schedule. 
func (o *PersistOptions) GetReplicaScheduleLimit() uint64 { - return o.GetScheduleConfig().ReplicaScheduleLimit + return o.getTTLUintOr(replicaRescheduleLimitKey, o.GetScheduleConfig().ReplicaScheduleLimit) } // GetMergeScheduleLimit returns the limit for merge schedule. func (o *PersistOptions) GetMergeScheduleLimit() uint64 { - return o.GetScheduleConfig().MergeScheduleLimit + return o.getTTLUintOr(mergeScheduleLimitKey, o.GetScheduleConfig().MergeScheduleLimit) } // GetHotRegionScheduleLimit returns the limit for hot region schedule. func (o *PersistOptions) GetHotRegionScheduleLimit() uint64 { - return o.GetScheduleConfig().HotRegionScheduleLimit + return o.getTTLUintOr(hotRegionScheduleLimitKey, o.GetScheduleConfig().HotRegionScheduleLimit) } // GetStoreLimit returns the limit of a store. func (o *PersistOptions) GetStoreLimit(storeID uint64) (returnSC StoreLimitConfig) { defer func() { - if v, ok := o.getTTLData(fmt.Sprintf("remove-peer-%v", storeID)); ok { - r, ok := v.(float64) - if ok { - returnSC.RemovePeer = r - } - } - if v, ok := o.getTTLData(fmt.Sprintf("add-peer-%v", storeID)); ok { - r, ok := v.(float64) - if ok { - returnSC.AddPeer = r - } - } + returnSC.RemovePeer = o.getTTLFloatOr(fmt.Sprintf("remove-peer-%v", storeID), returnSC.RemovePeer) + returnSC.AddPeer = o.getTTLFloatOr(fmt.Sprintf("add-peer-%v", storeID), returnSC.AddPeer) }() if limit, ok := o.GetScheduleConfig().StoreLimit[storeID]; ok { return limit } - v1, ok1 := o.getTTLData("default-add-peer") - v2, ok2 := o.getTTLData("default-remove-peer") cfg := o.GetScheduleConfig().Clone() sc := StoreLimitConfig{ AddPeer: DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer), RemovePeer: DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer), } - if ok1 || ok2 { - r, ok := v1.(float64) - if ok { - returnSC.AddPeer = r - } - r, ok = v2.(float64) - if ok { - returnSC.RemovePeer = r - } + v, ok1, err := o.getTTLFloat("default-add-peer") + if err != nil { + log.Warn("failed to parse 
default-add-peer from PersistOptions's ttl storage") + } + canSetAddPeer := ok1 && err == nil + if canSetAddPeer { + returnSC.AddPeer = v + } + + v, ok2, err := o.getTTLFloat("default-remove-peer") + if err != nil { + log.Warn("failed to parse default-remove-peer from PersistOptions's ttl storage") + } + canSetRemovePeer := ok2 && err == nil + if canSetRemovePeer { + returnSC.RemovePeer = v + } + + if canSetAddPeer || canSetRemovePeer { return returnSC } cfg.StoreLimit[storeID] = sc @@ -346,19 +345,9 @@ func (o *PersistOptions) GetStoreLimit(storeID uint64) (returnSC StoreLimitConfi func (o *PersistOptions) GetStoreLimitByType(storeID uint64, typ storelimit.Type) (returned float64) { defer func() { if typ == storelimit.RemovePeer { - if v, ok := o.getTTLData(fmt.Sprintf("remove-peer-%v", storeID)); ok { - r, ok := v.(float64) - if ok { - returned = r - } - } + returned = o.getTTLFloatOr(fmt.Sprintf("remove-peer-%v", storeID), returned) } else if typ == storelimit.AddPeer { - if v, ok := o.getTTLData(fmt.Sprintf("add-peer-%v", storeID)); ok { - r, ok := v.(float64) - if ok { - returned = r - } - } + returned = o.getTTLFloatOr(fmt.Sprintf("add-peer-%v", storeID), returned) } }() limit := o.GetStoreLimit(storeID) @@ -399,13 +388,7 @@ func (o *PersistOptions) GetHighSpaceRatio() float64 { // GetSchedulerMaxWaitingOperator returns the number of the max waiting operators. func (o *PersistOptions) GetSchedulerMaxWaitingOperator() uint64 { - if v, ok := o.getTTLData("schedule.scheduler-max-waiting-operator"); ok { - r, ok := v.(float64) - if ok { - return uint64(r) - } - } - return o.GetScheduleConfig().SchedulerMaxWaitingOperator + return o.getTTLUintOr(schedulerMaxWaitingOperatorKey, o.GetScheduleConfig().SchedulerMaxWaitingOperator) } // GetLeaderSchedulePolicy is to get leader schedule policy. @@ -456,10 +439,11 @@ func (o *PersistOptions) IsRemoveExtraReplicaEnabled() bool { // IsLocationReplacementEnabled returns if location replace is enabled. 
func (o *PersistOptions) IsLocationReplacementEnabled() bool { if v, ok := o.getTTLData("schedule.enable-location-replacement"); ok { - r, ok := v.(bool) - if ok { - return r + result, err := strconv.ParseBool(v) + if err == nil { + return result } + log.Warn("failed to parse schedule.enable-location-replacement from PersistOptions's ttl storage") } return o.GetScheduleConfig().EnableLocationReplacement } @@ -614,31 +598,99 @@ func (o *PersistOptions) CheckLabelProperty(typ string, labels []*metapb.StoreLa return false } +const ttlConfigPrefix = "/config/ttl" + // SetTTLData set temporary configuration -func (o *PersistOptions) SetTTLData(parCtx context.Context, key string, value interface{}, ttl time.Duration) { - if data, ok := o.ttl[key]; ok { - data.Clear() - o.ttlCancel[key]() +func (o *PersistOptions) SetTTLData(parCtx context.Context, client *clientv3.Client, key string, value string, ttl time.Duration) error { + if o.ttl == nil { + o.ttl = cache.NewStringTTL(parCtx, time.Second*5, time.Minute*5) } - ctx, cancel := context.WithCancel(parCtx) - o.ttl[key] = cache.NewStringTTL(ctx, 5*time.Second, ttl) - o.ttl[key].Put(key, value) - o.ttlCancel[key] = cancel + _, err := etcdutil.EtcdKVPutWithTTL(parCtx, client, ttlConfigPrefix+"/"+key, value, int64(ttl.Seconds())) + if err != nil { + return err + } + o.ttl.PutWithTTL(key, value, ttl) + return nil +} + +func (o *PersistOptions) getTTLUint(key string) (uint64, bool, error) { + stringForm, ok := o.getTTLData(key) + if !ok { + return 0, false, nil + } + r, err := strconv.ParseUint(stringForm, 10, 64) + return r, true, err +} + +func (o *PersistOptions) getTTLUintOr(key string, defaultValue uint64) uint64 { + if v, ok, err := o.getTTLUint(key); ok { + if err == nil { + return v + } + log.Warn("failed to parse " + key + " from PersistOptions's ttl storage") + } + return defaultValue +} + +func (o *PersistOptions) getTTLFloat(key string) (float64, bool, error) { + stringForm, ok := o.getTTLData(key) + if !ok { + 
return 0, false, nil + } + r, err := strconv.ParseFloat(stringForm, 64) + return r, true, err +} + +func (o *PersistOptions) getTTLFloatOr(key string, defaultValue float64) float64 { + if v, ok, err := o.getTTLFloat(key); ok { + if err == nil { + return v + } + log.Warn("failed to parse " + key + " from PersistOptions's ttl storage") + } + return defaultValue } -func (o *PersistOptions) getTTLData(key string) (interface{}, bool) { - if data, ok := o.ttl[key]; ok { - return data.Get(key) +func (o *PersistOptions) getTTLData(key string) (string, bool) { + if o.ttl == nil { + return "", false + } + if result, ok := o.ttl.Get(key); ok { + return result.(string), ok + } + return "", false +} + +// LoadTTLFromEtcd loads temporary configuration which was persisted into etcd +func (o *PersistOptions) LoadTTLFromEtcd(ctx context.Context, client *clientv3.Client) error { + resps, err := etcdutil.EtcdKVGet(client, ttlConfigPrefix, clientv3.WithPrefix()) + if err != nil { + return err + } + if o.ttl == nil { + o.ttl = cache.NewStringTTL(ctx, time.Second*5, time.Minute*5) } - return nil, false + for _, resp := range resps.Kvs { + key := string(resp.Key)[len(ttlConfigPrefix)+1:] + value := string(resp.Value) + leaseID := resp.Lease + resp, err := client.TimeToLive(ctx, clientv3.LeaseID(leaseID)) + if err != nil { + return err + } + o.ttl.PutWithTTL(key, value, time.Duration(resp.TTL)*time.Second) + } + return nil } // SetAllStoresLimitTTL sets all store limit for a given type and rate with ttl. 
-func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, typ storelimit.Type, ratePerMin float64, ttl time.Duration) { +func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, client *clientv3.Client, typ storelimit.Type, ratePerMin float64, ttl time.Duration) error { + var err error switch typ { case storelimit.AddPeer: - o.SetTTLData(ctx, "default-add-peer", ratePerMin, ttl) + err = o.SetTTLData(ctx, client, "default-add-peer", fmt.Sprint(ratePerMin), ttl) case storelimit.RemovePeer: - o.SetTTLData(ctx, "default-remove-peer", ratePerMin, ttl) + err = o.SetTTLData(ctx, client, "default-remove-peer", fmt.Sprint(ratePerMin), ttl) } + return err } diff --git a/server/handler.go b/server/handler.go index ed17f2deae8..3346ca81ab0 100644 --- a/server/handler.go +++ b/server/handler.go @@ -904,8 +904,8 @@ func (h *Handler) GetAddr() string { } // SetStoreLimitTTL set storeLimit with ttl -func (h *Handler) SetStoreLimitTTL(data string, value float64, ttl time.Duration) { - h.s.SaveTTLConfig(map[string]interface{}{ +func (h *Handler) SetStoreLimitTTL(data string, value float64, ttl time.Duration) error { + return h.s.SaveTTLConfig(map[string]interface{}{ data: value, }, ttl) } diff --git a/server/schedule/region_splitter_test.go b/server/schedule/region_splitter_test.go new file mode 100644 index 00000000000..2814b566f02 --- /dev/null +++ b/server/schedule/region_splitter_test.go @@ -0,0 +1,123 @@ +// Copyright 2020 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package schedule + +import ( + "bytes" + "context" + + . "github.com/pingcap/check" + "github.com/tikv/pd/pkg/mock/mockcluster" + "github.com/tikv/pd/server/config" + "github.com/tikv/pd/server/core" +) + +type mockSplitRegionsHandler struct { + // regionID -> startKey, endKey + regions map[uint64][2][]byte +} + +func newMockSplitRegionsHandler() *mockSplitRegionsHandler { + return &mockSplitRegionsHandler{ + regions: map[uint64][2][]byte{}, + } +} + +// SplitRegionByKeys mock SplitRegionsHandler +func (m *mockSplitRegionsHandler) SplitRegionByKeys(region *core.RegionInfo, splitKeys [][]byte) error { + m.regions[region.GetID()] = [2][]byte{ + region.GetStartKey(), + region.GetEndKey(), + } + return nil +} + +// ScanRegionsByKeyRange mock SplitRegionsHandler +func (m *mockSplitRegionsHandler) ScanRegionsByKeyRange(groupKeys *regionGroupKeys, results *splitKeyResults) { + splitKeys := groupKeys.keys + startKey, endKey := groupKeys.region.GetStartKey(), groupKeys.region.GetEndKey() + for regionID, keyRange := range m.regions { + if bytes.Equal(startKey, keyRange[0]) && bytes.Equal(endKey, keyRange[1]) { + regions := make(map[uint64][]byte) + for i := 0; i < len(splitKeys); i++ { + regions[regionID+uint64(i)+1000] = splitKeys[i] + } + results.addRegionsID(regions) + } + } + groupKeys.finished = true +} + +var _ = Suite(&testRegionSplitterSuite{}) + +type testRegionSplitterSuite struct{} + +func (s *testRegionSplitterSuite) TestRegionSplitter(c *C) { + ctx := context.Background() + opt := config.NewTestOptions() + opt.SetPlacementRuleEnabled(false) + tc := mockcluster.NewCluster(opt) + handler := newMockSplitRegionsHandler() + tc.AddLeaderRegionWithRange(1, "eee", "hhh", 2, 3, 4) + splitter := NewRegionSplitter(tc, handler) + newRegions := map[uint64]struct{}{} + // assert success + failureKeys := splitter.splitRegionsByKeys(ctx, [][]byte{[]byte("fff"), []byte("ggg")}, newRegions) + c.Assert(len(failureKeys), Equals, 0) + c.Assert(len(newRegions), Equals, 2) + + 
percentage, newRegionsID := splitter.SplitRegions(ctx, [][]byte{[]byte("fff"), []byte("ggg")}, 1) + c.Assert(percentage, Equals, 100) + c.Assert(len(newRegionsID), Equals, 2) + // assert out of range + newRegions = map[uint64]struct{}{} + failureKeys = splitter.splitRegionsByKeys(ctx, [][]byte{[]byte("aaa"), []byte("bbb")}, newRegions) + c.Assert(len(failureKeys), Equals, 2) + c.Assert(len(newRegions), Equals, 0) + + percentage, newRegionsID = splitter.SplitRegions(ctx, [][]byte{[]byte("aaa"), []byte("bbb")}, 1) + c.Assert(percentage, Equals, 0) + c.Assert(len(newRegionsID), Equals, 0) +} + +func (s *testRegionSplitterSuite) TestGroupKeysByRegion(c *C) { + opt := config.NewTestOptions() + opt.SetPlacementRuleEnabled(false) + tc := mockcluster.NewCluster(opt) + handler := newMockSplitRegionsHandler() + tc.AddLeaderRegionWithRange(1, "aaa", "ccc", 2, 3, 4) + tc.AddLeaderRegionWithRange(2, "ccc", "eee", 2, 3, 4) + tc.AddLeaderRegionWithRange(3, "fff", "ggg", 2, 3, 4) + splitter := NewRegionSplitter(tc, handler) + groupKeys := splitter.groupKeysByRegion([][]byte{ + []byte("bbb"), + []byte("ddd"), + []byte("fff"), + []byte("zzz"), + }) + c.Assert(len(groupKeys), Equals, 3) + for k, v := range groupKeys { + switch k { + case uint64(1): + c.Assert(len(v.keys), Equals, 1) + c.Assert(v.keys[0], DeepEquals, []byte("bbb")) + case uint64(2): + c.Assert(len(v.keys), Equals, 1) + c.Assert(v.keys[0], DeepEquals, []byte("ddd")) + case uint64(3): + c.Assert(len(v.keys), Equals, 1) + c.Assert(v.keys[0], DeepEquals, []byte("fff")) + } + } +} diff --git a/server/server.go b/server/server.go index 5f5284e2a08..61e89c82af7 100644 --- a/server/server.go +++ b/server/server.go @@ -454,7 +454,6 @@ func (s *Server) Run() error { if err := s.startEtcd(s.ctx); err != nil { return err } - if err := s.startServer(s.ctx); err != nil { return err } @@ -1149,7 +1148,10 @@ func (s *Server) campaignLeader() { return } defer s.stopRaftCluster() - + if err := s.persistOptions.LoadTTLFromEtcd(s.ctx, 
s.client); err != nil { + log.Error("failed to load persistOptions from etcd", errs.ZapError(err)) + return + } s.member.EnableLeader() defer s.member.DisableLeader() @@ -1253,8 +1255,11 @@ func (s *Server) PersistFile(name string, data []byte) error { } // SaveTTLConfig save ttl config -func (s *Server) SaveTTLConfig(data map[string]interface{}, ttl time.Duration) { +func (s *Server) SaveTTLConfig(data map[string]interface{}, ttl time.Duration) error { for k, v := range data { - s.persistOptions.SetTTLData(s.ctx, k, v, ttl) + if err := s.persistOptions.SetTTLData(s.ctx, s.client, k, fmt.Sprint(v), ttl); err != nil { + return err + } } + return nil } diff --git a/tests/client/client_test.go b/tests/client/client_test.go index 65eb785156c..b1f1cb0942e 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -14,7 +14,10 @@ package client_test import ( + "bytes" "context" + "encoding/json" + "fmt" "math" "path/filepath" "sort" @@ -790,3 +793,65 @@ func (s *testClientSuite) TestScatterRegionWithOption(c *C) { }) c.Succeed() } + +type testConfigTTLSuite struct { + ctx context.Context + cancel context.CancelFunc +} + +func (s *testConfigTTLSuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) + server.EnableZap = true +} + +func (s *testConfigTTLSuite) TearDownSuite(c *C) { + s.cancel() +} + +var _ = SerialSuites(&testConfigTTLSuite{}) + +var ttlConfig = map[string]interface{}{ + "schedule.max-snapshot-count": 999, + "schedule.enable-location-replacement": false, + "schedule.max-merge-region-size": 999, + "schedule.max-merge-region-keys": 999, + "schedule.scheduler-max-waiting-operator": 999, + "schedule.leader-schedule-limit": 999, + "schedule.region-schedule-limit": 999, + "schedule.hot-region-schedule-limit": 999, + "schedule.replica-schedule-limit": 999, + "schedule.merge-schedule-limit": 999, +} + +func assertTTLConfig(c *C, options *config.PersistOptions, checker Checker) { + 
c.Assert(options.GetMaxSnapshotCount(), checker, uint64(999)) + c.Assert(options.IsLocationReplacementEnabled(), checker, false) + c.Assert(options.GetMaxMergeRegionSize(), checker, uint64(999)) + c.Assert(options.GetMaxMergeRegionKeys(), checker, uint64(999)) + c.Assert(options.GetSchedulerMaxWaitingOperator(), checker, uint64(999)) + c.Assert(options.GetLeaderScheduleLimit(), checker, uint64(999)) + c.Assert(options.GetRegionScheduleLimit(), checker, uint64(999)) + c.Assert(options.GetHotRegionScheduleLimit(), checker, uint64(999)) + c.Assert(options.GetReplicaScheduleLimit(), checker, uint64(999)) + c.Assert(options.GetMergeScheduleLimit(), checker, uint64(999)) +} + +func (s *testConfigTTLSuite) TestConfigTTLAfterTransferLeader(c *C) { + cluster, err := tests.NewTestCluster(s.ctx, 3) + c.Assert(err, IsNil) + defer cluster.Destroy() + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + leader := cluster.GetServer(cluster.WaitLeader()) + c.Assert(leader.BootstrapCluster(), IsNil) + addr := fmt.Sprintf("%s/pd/api/v1/config?ttlSecond=5", leader.GetAddr()) + postData, err := json.Marshal(ttlConfig) + c.Assert(err, IsNil) + _, err = leader.GetHTTPClient().Post(addr, "application/json", bytes.NewBuffer(postData)) + c.Assert(err, IsNil) + time.Sleep(2 * time.Second) + _ = leader.Destroy() + time.Sleep(2 * time.Second) + leader = cluster.GetServer(cluster.WaitLeader()) + assertTTLConfig(c, leader.GetPersistOptions(), Equals) +}