Merge branch 'release-5.0' into cherry-pick-4175-to-release-5.0
disksing authored Nov 30, 2021
2 parents 0cd6117 + 581ee38 commit b1ff18b
Showing 15 changed files with 189 additions and 44 deletions.
8 changes: 4 additions & 4 deletions metrics/alertmanager/pd.rules.yml
@@ -122,12 +122,12 @@ groups:
summary: PD_pending_peer_region_count

- alert: PD_leader_change
expr: count( changes(pd_server_tso{type="save"}[10m]) > 0 ) >= 2
expr: count( changes(pd_tso_events{type="save"}[10m]) > 0 ) >= 2
for: 1m
labels:
env: ENV_LABELS_ENV
level: warning
expr: count( changes(pd_server_tso{type="save"}[10m]) > 0 ) >= 2
expr: count( changes(pd_tso_events{type="save"}[10m]) > 0 ) >= 2
annotations:
description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values:{{ $value }}'
value: '{{ $value }}'
@@ -146,12 +146,12 @@ groups:
summary: TiKV_space_used_more_than_80%

- alert: PD_system_time_slow
expr: changes(pd_server_tso{type="system_time_slow"}[10m]) >= 1
expr: changes(pd_tso_events{type="system_time_slow"}[10m]) >= 1
for: 1m
labels:
env: ENV_LABELS_ENV
level: warning
expr: changes(pd_server_tso{type="system_time_slow"}[10m]) >= 1
expr: changes(pd_tso_events{type="system_time_slow"}[10m]) >= 1
annotations:
description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}'
value: '{{ $value }}'
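The only change in these two alerts is the metric name in the expressions: pd_server_tso becomes pd_tso_events, with the type labels (save, system_time_slow) unchanged. In Prometheus client_golang the exposed name is assembled as namespace_subsystem_name, so a counter that surfaces as pd_tso_events would be registered roughly as in the sketch below; the names and help text here are illustrative assumptions, not the actual PD source.

package metrics

import "github.com/prometheus/client_golang/prometheus"

// Illustrative only: a counter whose fully-qualified name is
// "pd_tso_events" (namespace "pd", subsystem "tso", name "events"),
// partitioned by a "type" label such as "save" or "system_time_slow".
var tsoEventCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "pd",
		Subsystem: "tso",
		Name:      "events",
		Help:      "Counter of TSO events.",
	}, []string{"type"})

func init() {
	prometheus.MustRegister(tsoEventCounter)
}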
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
@@ -359,8 +359,8 @@ func (c *RaftCluster) Stop() {
}

c.running = false
close(c.quit)
c.coordinator.stop()
close(c.quit)
c.Unlock()
c.wg.Wait()
log.Info("raftcluster is stopped")
6 changes: 6 additions & 0 deletions server/cluster/coordinator.go
@@ -774,6 +774,12 @@ func (s *scheduleController) Stop() {

func (s *scheduleController) Schedule() []*operator.Operator {
for i := 0; i < maxScheduleRetries; i++ {
// no need to retry if schedule should stop to speed exit
select {
case <-s.ctx.Done():
return nil
default:
}
// If we have schedule, reset interval to the minimal interval.
if op := s.Scheduler.Schedule(s.cluster); op != nil {
s.nextInterval = s.Scheduler.GetMinInterval()
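The select added at the top of the retry loop is a non-blocking cancellation check: when the schedule controller's context is already done, Schedule returns immediately instead of spending the remaining retries. A minimal sketch of the same pattern, with illustrative names rather than the real scheduleController types:

package sketch

import "context"

// scheduleWithRetry tries tryOnce up to maxRetries times, but gives up as
// soon as ctx is cancelled so that a stopping controller exits quickly.
func scheduleWithRetry(ctx context.Context, maxRetries int, tryOnce func() bool) bool {
	for i := 0; i < maxRetries; i++ {
		select {
		case <-ctx.Done():
			return false // controller is stopping; no point in retrying
		default:
		}
		if tryOnce() {
			return true
		}
	}
	return false
}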
41 changes: 41 additions & 0 deletions server/cluster/coordinator_test.go
@@ -1033,6 +1033,47 @@ func (s *testOperatorControllerSuite) TestStoreOverloadedWithReplace(c *C) {
c.Assert(lb.Schedule(tc), NotNil)
}

func (s *testOperatorControllerSuite) TestDownStoreLimit(c *C) {
tc, co, cleanup := prepare(nil, nil, nil, c)
defer cleanup()
oc := co.opController
rc := co.checkers.GetRuleChecker()

tc.addRegionStore(1, 100)
tc.addRegionStore(2, 100)
tc.addRegionStore(3, 100)
tc.addLeaderRegion(1, 1, 2, 3)

region := tc.GetRegion(1)
tc.setStoreDown(1)
tc.SetStoreLimit(1, storelimit.RemovePeer, 1)

region = region.Clone(core.WithDownPeers([]*pdpb.PeerStats{
{
Peer: region.GetStorePeer(1),
DownSeconds: 24 * 60 * 60,
},
}), core.SetApproximateSize(1))
tc.putRegion(region)
for i := uint64(1); i < 20; i++ {
tc.addRegionStore(i+3, 100)
op := rc.Check(region)
c.Assert(op, NotNil)
c.Assert(oc.AddOperator(op), IsTrue)
oc.RemoveOperator(op)
}

region = region.Clone(core.SetApproximateSize(100))
tc.putRegion(region)
for i := uint64(20); i < 25; i++ {
tc.addRegionStore(i+3, 100)
op := rc.Check(region)
c.Assert(op, NotNil)
c.Assert(oc.AddOperator(op), IsTrue)
oc.RemoveOperator(op)
}
}

var _ = Suite(&testScheduleControllerSuite{})

type testScheduleControllerSuite struct {
79 changes: 48 additions & 31 deletions server/encryptionkm/key_manager.go
@@ -206,8 +206,6 @@ func (m *KeyManager) keysRevision() int64 {
func (m *KeyManager) StartBackgroundLoop(ctx context.Context) {
// Setup key dictionary watcher
watcher := clientv3.NewWatcher(m.etcdClient)
watchChan := watcher.Watch(ctx, EncryptionKeysPath, clientv3.WithRev(m.keysRevision()))
watcherEnabled := true
defer watcher.Close()
// Check data key rotation every min(dataKeyRotationPeriod, keyRotationCheckPeriod).
checkPeriod := m.dataKeyRotationPeriod
@@ -216,54 +214,73 @@ func (m *KeyManager) StartBackgroundLoop(ctx context.Context) {
}
ticker := time.NewTicker(checkPeriod)
defer ticker.Stop()
// Loop

for {
select {
// Reload encryption keys updated by PD leader (could be ourselves).
case resp := <-watchChan:
if resp.Canceled {
// If the watcher failed, we fallback to reload every 10 minutes.
log.Warn("encryption key watcher canceled")
watcherEnabled = false
continue
}
for _, event := range resp.Events {
if event.Type != mvccpb.PUT {
log.Warn("encryption keys is deleted unexpectedly")
continue
var (
resp clientv3.WatchResponse
ok bool
)
rch := watcher.Watch(ctx, EncryptionKeysPath, clientv3.WithRev(m.keysRevision()))

keyWatchLoop:
for {
select {
case resp, ok = <-rch:
if !ok || resp.Err() != nil {
// If chan is closed or canceled, exit watch loop
// Ref https://etcd.io/docs/v3.4/learning/api/#watch-streams
break keyWatchLoop
}
_, err := m.loadKeysFromKV(event.Kv)
if err != nil {
log.Warn("fail to get encryption keys from watcher result", zap.Error(err))
for _, event := range resp.Events {
if event.Type != mvccpb.PUT {
log.Warn("encryption keys is deleted unexpectedly")
continue
}
_, err := m.loadKeysFromKV(event.Kv)
if err != nil {
log.Warn("fail to get encryption keys from watcher result", errs.ZapError(err))
}
}
m.helper.eventAfterReloadByWatcher()
case <-m.helper.tick(ticker):
m.checkOnTick()
m.helper.eventAfterTicker()
}
m.helper.eventAfterReloadByWatcher()
case <-m.helper.tick(ticker):
m.checkOnTick(watcherEnabled)
m.helper.eventAfterTicker()
}

select {
case <-ctx.Done():
// Server shutdown.
return
default:
}

if resp.CompactRevision != 0 {
// meet compacted error
log.Warn("revision has been compacted, the watcher will watch again",
zap.Int64("revision", m.keysRevision()),
zap.Int64("compact-revision", resp.CompactRevision))
} else {
// other error
log.Error("encryption key watcher canceled, the watcher will watch again",
errs.ZapError(errs.ErrEncryptionKeysWatcher, resp.Err()))
}

if _, err := m.loadKeys(); err != nil {
log.Error("encryption key reload failed", errs.ZapError(err))
}
}
}

// checkOnTick perform key rotation and key reload on timer tick, if necessary.
func (m *KeyManager) checkOnTick(watcherEnabled bool) {
func (m *KeyManager) checkOnTick() {
m.mu.Lock()
defer m.mu.Unlock()
// Check data key rotation in case we are the PD leader.
err := m.rotateKeyIfNeeded(false /*forceUpdate*/)
if err != nil {
log.Warn("fail to rotate data encryption key", zap.Error(err))
}
// Fallback mechanism to reload keys if watcher failed.
if !watcherEnabled {
_, err = m.loadKeysImpl()
if err != nil {
log.Warn("fail to reload keys after watcher failed", zap.Error(err))
}
}
}

// loadKeysFromKVImpl reload keys from etcd result.
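The background loop is restructured from a single watch (which merely flipped watcherEnabled and fell back to periodic reloads) into a re-watch loop: the labelled inner loop drains the watch channel, breaks out once the channel closes or a response carries an error, and the outer loop then checks for shutdown, distinguishes a compacted revision from other errors, reloads the keys, and opens a fresh watch. The checkOnTick fallback parameter becomes unnecessary because the watcher now recovers on its own. A condensed sketch of that shape, assuming an onUpdate callback and illustrative names (the real loop also multiplexes the rotation ticker):

package sketch

import (
	"context"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/mvcc/mvccpb"
)

// watchForever keeps a watch on key alive, re-creating it whenever the
// watch stream ends (cancellation, compaction, leader change, ...).
func watchForever(ctx context.Context, client *clientv3.Client, key string, rev func() int64, onUpdate func(*mvccpb.KeyValue)) {
	watcher := clientv3.NewWatcher(client)
	defer watcher.Close()
	for {
		rch := watcher.Watch(ctx, key, clientv3.WithRev(rev()))
		for resp := range rch {
			if resp.Err() != nil {
				break // canceled or compacted; fall through and re-watch
			}
			for _, ev := range resp.Events {
				if ev.Type == mvccpb.PUT {
					onUpdate(ev.Kv)
				}
			}
		}
		select {
		case <-ctx.Done():
			return // server shutdown
		default:
		}
		// Reload state here if needed, then loop to watch again from the
		// latest known revision.
	}
}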
2 changes: 1 addition & 1 deletion server/encryptionkm/key_manager_test.go
@@ -43,7 +43,7 @@ func TestKeyManager(t *testing.T) {

type testKeyManagerSuite struct{}

var _ = Suite(&testKeyManagerSuite{})
var _ = SerialSuites(&testKeyManagerSuite{})

const (
testMasterKey = "8fd7e3e917c170d92f3e51a981dd7bc8fba11f3df7d8df994842f6e86f69b530"
6 changes: 5 additions & 1 deletion server/schedule/checker/replica_strategy.go
@@ -93,8 +93,12 @@ func (s *ReplicaStrategy) SelectStoreToFix(coLocationStores []*core.StoreInfo, o
func (s *ReplicaStrategy) SelectStoreToImprove(coLocationStores []*core.StoreInfo, old uint64) uint64 {
// trick to avoid creating a slice with `old` removed.
s.swapStoreToFirst(coLocationStores, old)
oldStore := s.cluster.GetStore(old)
if oldStore == nil {
return 0
}
filters := []filter.Filter{
filter.NewLocationImprover(s.checkerName, s.locationLabels, coLocationStores, s.cluster.GetStore(old)),
filter.NewLocationImprover(s.checkerName, s.locationLabels, coLocationStores, oldStore),
}
if len(s.locationLabels) > 0 && s.isolationLevel != "" {
filters = append(filters, filter.NewIsolationFilter(s.checkerName, s.isolationLevel, s.locationLabels, coLocationStores[1:]))
5 changes: 5 additions & 0 deletions server/schedule/checker_controller.go
@@ -108,6 +108,11 @@ func (c *CheckerController) GetMergeChecker() *checker.MergeChecker {
return c.mergeChecker
}

// GetRuleChecker returns the rule checker.
func (c *CheckerController) GetRuleChecker() *checker.RuleChecker {
return c.ruleChecker
}

// GetWaitingRegions returns the regions in the waiting list.
func (c *CheckerController) GetWaitingRegions() []*cache.Item {
return c.regionWaitingList.Elems()
12 changes: 9 additions & 3 deletions server/schedule/operator/builder.go
@@ -663,9 +663,15 @@ func (b *Builder) execAddPeer(peer *metapb.Peer) {
}

func (b *Builder) execRemovePeer(peer *metapb.Peer) {
b.steps = append(b.steps, RemovePeer{FromStore: peer.GetStoreId(), PeerID: peer.GetId()})
delete(b.currentPeers, peer.GetStoreId())
delete(b.toRemove, peer.GetStoreId())
removeStoreID := peer.GetStoreId()
var isDownStore bool
store := b.cluster.GetStore(removeStoreID)
if store != nil {
isDownStore = store.DownTime() > b.cluster.GetOpts().GetMaxStoreDownTime()
}
b.steps = append(b.steps, RemovePeer{FromStore: removeStoreID, PeerID: peer.GetId(), IsDownStore: isDownStore})
delete(b.currentPeers, removeStoreID)
delete(b.toRemove, removeStoreID)
}

func (b *Builder) execChangePeerV2(needEnter bool, needTransferLeader bool) {
2 changes: 1 addition & 1 deletion server/schedule/operator/influence.go
@@ -80,7 +80,7 @@ func (s *StoreInfluence) addStepCost(limitType storelimit.Type, cost int64) {
func (s *StoreInfluence) AdjustStepCost(limitType storelimit.Type, regionSize int64) {
if regionSize > storelimit.SmallRegionThreshold {
s.addStepCost(limitType, storelimit.RegionInfluence[limitType])
} else if regionSize <= storelimit.SmallRegionThreshold && regionSize > core.EmptyRegionApproximateSize {
} else if regionSize > core.EmptyRegionApproximateSize {
s.addStepCost(limitType, storelimit.SmallRegionInfluence[limitType])
}
}
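The dropped regionSize <= storelimit.SmallRegionThreshold clause was redundant: the else-if is only reached when regionSize > SmallRegionThreshold has already failed, so the bound holds implicitly. The resulting three-way classification, written out as a sketch with illustrative constant values:

package sketch

const (
	smallRegionThreshold       int64 = 20 // illustrative values only
	emptyRegionApproximateSize int64 = 1
)

// stepCostClass mirrors the simplified AdjustStepCost: large regions pay the
// full influence, small but non-empty regions pay the small-region influence,
// and (near-)empty regions are free.
func stepCostClass(regionSize int64) string {
	switch {
	case regionSize > smallRegionThreshold:
		return "RegionInfluence"
	case regionSize > emptyRegionApproximateSize:
		return "SmallRegionInfluence"
	default:
		return "no cost"
	}
}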
5 changes: 5 additions & 0 deletions server/schedule/operator/step.go
@@ -219,6 +219,7 @@ func (pl PromoteLearner) Influence(opInfluence OpInfluence, region *core.RegionI
// RemovePeer is an OpStep that removes a region peer.
type RemovePeer struct {
FromStore, PeerID uint64
IsDownStore bool
}

// ConfVerChanged returns the delta value for version increased by this step.
@@ -259,6 +260,10 @@ func (rp RemovePeer) Influence(opInfluence OpInfluence, region *core.RegionInfo)
regionSize := region.GetApproximateSize()
from.RegionSize -= regionSize
from.RegionCount--

if rp.IsDownStore && regionSize > storelimit.SmallRegionThreshold {
regionSize = storelimit.SmallRegionThreshold
}
from.AdjustStepCost(storelimit.RemovePeer, regionSize)
}
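Together with the builder change above, a RemovePeer step now records whether its store has been down for longer than max-store-down-time (IsDownStore), and for such stores the size charged against the remove-peer store limit is capped at SmallRegionThreshold. The effect is that evicting replicas from a long-down store is billed like removing small regions instead of being throttled by very large ones. A condensed sketch of the cost-size calculation under that reading:

package sketch

const smallRegionThreshold int64 = 20 // illustrative value only

// removePeerCostSize returns the region size charged against the
// remove-peer store limit; long-down stores are capped so that cleaning
// them up is not starved by very large regions.
func removePeerCostSize(isDownStore bool, regionSize int64) int64 {
	if isDownStore && regionSize > smallRegionThreshold {
		return smallRegionThreshold
	}
	return regionSize
}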

9 changes: 9 additions & 0 deletions server/schedule/region_scatterer.go
@@ -279,6 +279,9 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
// Group peers by the engine of their stores
for _, peer := range region.GetPeers() {
store := r.cluster.GetStore(peer.GetStoreId())
if store == nil {
return nil
}
if ordinaryFilter.Target(r.cluster.GetOpts(), store) {
ordinaryPeers[peer.GetId()] = peer
} else {
@@ -407,6 +410,9 @@ func (r *RegionScatterer) selectAvailableLeaderStores(group string, peers map[ui
leaderCandidateStores := make([]uint64, 0)
for storeID := range peers {
store := r.cluster.GetStore(storeID)
if store == nil {
return 0
}
engine := store.GetLabelValue(filter.EngineKey)
if len(engine) < 1 {
leaderCandidateStores = append(leaderCandidateStores, storeID)
@@ -431,6 +437,9 @@ func (r *RegionScatterer) Put(peers map[uint64]*metapb.Peer, leaderStoreID uint6
for _, peer := range peers {
storeID := peer.GetStoreId()
store := r.cluster.GetStore(storeID)
if store == nil {
continue
}
if ordinaryFilter.Target(r.cluster.GetOpts(), store) {
r.ordinaryEngine.selectedPeer.Put(storeID, group)
scatterDistributionCounter.WithLabelValues(
6 changes: 5 additions & 1 deletion server/schedulers/shuffle_region.go
@@ -153,7 +153,11 @@ func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster opt.Cluster) (*core.
}

func (s *shuffleRegionScheduler) scheduleAddPeer(cluster opt.Cluster, region *core.RegionInfo, oldPeer *metapb.Peer) *metapb.Peer {
scoreGuard := filter.NewPlacementSafeguard(s.GetName(), cluster, region, cluster.GetStore(oldPeer.GetStoreId()))
store := cluster.GetStore(oldPeer.GetStoreId())
if store == nil {
return nil
}
scoreGuard := filter.NewPlacementSafeguard(s.GetName(), cluster, region, store)
excludedFilter := filter.NewExcludedFilter(s.GetName(), nil, region.GetStoreIds())

target := filter.NewCandidates(cluster.GetStores()).
2 changes: 1 addition & 1 deletion server/statistics/store.go
@@ -113,7 +113,7 @@ func (s *StoresStats) GetStoresLoads() map[uint64][]float64 {

func (s *StoresStats) storeIsUnhealthy(cluster core.StoreSetInformer, storeID uint64) bool {
store := cluster.GetStore(storeID)
return store.IsTombstone() || store.IsUnhealthy() || store.IsPhysicallyDestroyed()
return store == nil || store.IsTombstone() || store.IsUnhealthy() || store.IsPhysicallyDestroyed()
}

// FilterUnhealthyStore filter unhealthy store
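The added store == nil term relies on Go's left-to-right, short-circuit evaluation of ||: when the store has been deleted (as the new TestFilterUnhealtyStore below exercises with DeleteStore), the method calls to the right are never evaluated, so a missing store is simply treated as unhealthy instead of causing a nil-pointer panic. A minimal sketch of the idiom with an illustrative type:

package sketch

type store struct{ tombstone bool }

func (s *store) IsTombstone() bool { return s.tombstone }

// unhealthy treats a missing store the same as an unhealthy one; because ||
// short-circuits, s.IsTombstone() is never called when s is nil.
func unhealthy(s *store) bool {
	return s == nil || s.IsTombstone()
}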
48 changes: 48 additions & 0 deletions server/statistics/store_test.go
@@ -0,0 +1,48 @@
// Copyright 2021 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package statistics

import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/server/core"
)

var _ = Suite(&testStoreSuite{})

type testStoreSuite struct{}

func (s *testStoreSuite) TestFilterUnhealtyStore(c *C) {
stats := NewStoresStats()
cluster := core.NewBasicCluster()
for i := uint64(1); i <= 5; i++ {
cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: i}, core.SetLastHeartbeatTS(time.Now())))
stats.Observe(i, &pdpb.StoreStats{})
}
c.Assert(stats.GetStoresLoads(), HasLen, 5)

cluster.PutStore(cluster.GetStore(1).Clone(core.SetLastHeartbeatTS(time.Now().Add(-24 * time.Hour))))
cluster.PutStore(cluster.GetStore(2).Clone(core.TombstoneStore()))
cluster.DeleteStore(cluster.GetStore(3))

stats.FilterUnhealthyStore(cluster)
loads := stats.GetStoresLoads()
c.Assert(loads, HasLen, 2)
c.Assert(loads[4], NotNil)
c.Assert(loads[5], NotNil)
}
