cherry pick #2946 to release-4.0
Signed-off-by: Ryan Leung <rleungx@gmail.com>
disksing authored and rleungx committed Oct 26, 2020
1 parent 775b6a5 commit 93ad898
Showing 18 changed files with 187 additions and 85 deletions.
1 change: 1 addition & 0 deletions go.sum
@@ -642,6 +642,7 @@ github.com/pingcap/log v0.0.0-20200511115504-543df19646ad h1:SveG82rmu/GFxYanffx
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/parser v0.0.0-20200422082501-7329d80eaf2c/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4=
+github.com/pingcap/pd v2.1.5+incompatible h1:vOLV2tSQdRjjmxaTXtJULoC94dYQOd+6fzn2yChODHc=
github.com/pingcap/pd v2.1.5+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E=
github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2/go.mod h1:s+utZtXDznOiL24VK0qGmtoHjjXNsscJx3m1n8cC56s=
github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI=
10 changes: 8 additions & 2 deletions pkg/mock/mockoption/mockoption.go
@@ -337,7 +337,13 @@ func (mso *ScheduleOptions) GetKeyType() core.KeyType {
 	return core.StringToKeyType(mso.KeyType)
 }
 
-// CheckLabelProperty mocks method
+// CheckLabelProperty mocks method. It checks if there is any label
+// has the same key as typ.
 func (mso *ScheduleOptions) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool {
-	return true
+	for _, l := range labels {
+		if l.Key == typ {
+			return true
+		}
+	}
+	return false
 }
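Note on the mock change above: the old mock approved every label-property check unconditionally, so the reject-leader check in the refactored filter further down could never fail under mockoption. It now scans the given labels for a key equal to typ. A small illustrative snippet (not part of this commit; package paths taken from imports that appear elsewhere in this diff):

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/pkg/mock/mockoption"
)

func main() {
	opt := mockoption.NewScheduleOptions()
	labels := []*metapb.StoreLabel{{Key: "zone", Value: "z1"}}
	fmt.Println(opt.CheckLabelProperty("zone", labels)) // true: a label with key "zone" exists
	fmt.Println(opt.CheckLabelProperty("host", labels)) // false now (the old mock always returned true)
}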
2 changes: 1 addition & 1 deletion plugin/scheduler_example/evict_leader.go
@@ -161,7 +161,7 @@ type evictLeaderScheduler struct {
// out of a store.
func newEvictLeaderScheduler(opController *schedule.OperatorController, conf *evictLeaderSchedulerConfig) schedule.Scheduler {
filters := []filter.Filter{
-filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true},
+&filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true},
}

base := schedulers.NewBaseScheduler(opController)
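Every call site in this commit switches from a StoreStateFilter value literal to &filter.StoreStateFilter{...}. The reason is in the filters.go diff below: the filter's methods move to pointer receivers so that each condition check can record f.Reason, and in Go a method with a pointer receiver belongs only to the method set of the pointer type, so a bare value no longer satisfies the filter.Filter interface. A minimal, self-contained illustration of that language rule (toy types, not PD code):

package main

// Filter stands in for PD's filter.Filter interface.
type Filter interface{ Type() string }

// StoreStateFilter stands in for PD's filter.StoreStateFilter.
type StoreStateFilter struct{ Reason string }

// Pointer receiver: Type is in the method set of *StoreStateFilter, not of StoreStateFilter.
func (f *StoreStateFilter) Type() string { return "store-state-" + f.Reason + "-filter" }

func main() {
	var _ Filter = &StoreStateFilter{} // compiles
	// var _ Filter = StoreStateFilter{} // does not compile: Type has a pointer receiver
}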
6 changes: 3 additions & 3 deletions server/schedule/checker/rule_checker.go
@@ -158,7 +158,7 @@ func (c *RuleChecker) allowLeader(fit *placement.RegionFit, peer *metapb.Peer) b
if s == nil {
return false
}
-stateFilter := filter.StoreStateFilter{ActionScope: "rule-checker", TransferLeader: true}
+stateFilter := &filter.StoreStateFilter{ActionScope: "rule-checker", TransferLeader: true}
if !stateFilter.Target(c.cluster, s) {
return false
}
@@ -176,7 +176,7 @@ func (c *RuleChecker) fixBetterLocation(region *core.RegionInfo, fit *placement.
return nil, nil
}
stores := getRuleFitStores(c.cluster, rf)
-s := selector.NewReplicaSelector(stores, rf.Rule.LocationLabels, filter.StoreStateFilter{ActionScope: "rule-checker", MoveRegion: true})
+s := selector.NewReplicaSelector(stores, rf.Rule.LocationLabels, &filter.StoreStateFilter{ActionScope: "rule-checker", MoveRegion: true})
oldPeerStore := s.SelectSource(c.cluster, stores)
if oldPeerStore == nil {
return nil, nil
@@ -249,7 +249,7 @@ func (c *RuleChecker) isOfflinePeer(region *core.RegionInfo, peer *metapb.Peer)
// SelectStoreToAddPeerByRule selects a store to add peer in order to fit the placement rule.
func SelectStoreToAddPeerByRule(scope string, cluster opt.Cluster, region *core.RegionInfo, rf *placement.RuleFit, filters ...filter.Filter) *core.StoreInfo {
fs := []filter.Filter{
-filter.StoreStateFilter{ActionScope: scope, MoveRegion: true},
+&filter.StoreStateFilter{ActionScope: scope, MoveRegion: true},
filter.NewStorageThresholdFilter(scope),
filter.NewLabelConstaintFilter(scope, rf.Rule.LabelConstraints),
filter.NewExcludedFilter(scope, nil, region.GetStoreIds()),
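The rule-checker call sites above either test a store against a single StoreStateFilter directly or hand a filter chain to a selector. Applying a chain to a candidate target store is conceptually just the loop below (a hypothetical helper written for illustration, with import paths assumed from the repository layout; it is not a function from the codebase):

package example

import (
	"github.com/tikv/pd/server/core"
	"github.com/tikv/pd/server/schedule/filter"
	"github.com/tikv/pd/server/schedule/opt"
)

// passesAsTarget reports whether a store survives every filter as a schedule target.
func passesAsTarget(opts opt.Options, store *core.StoreInfo, filters []filter.Filter) bool {
	for _, f := range filters {
		if !f.Target(opts, store) {
			return false // the first rejecting filter decides
		}
	}
	return true
}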
161 changes: 114 additions & 47 deletions server/schedule/filter/filters.go
@@ -361,76 +361,143 @@ type StoreStateFilter struct {
 	TransferLeader bool
 	// Set true if the schedule involves any move region operation.
 	MoveRegion bool
+	// Reason is used to distinguish the reason of store state filter
+	Reason string
 }
 
 // Scope returns the scheduler or the checker which the filter acts on.
-func (f StoreStateFilter) Scope() string {
+func (f *StoreStateFilter) Scope() string {
 	return f.ActionScope
 }
 
 // Type returns the type of the Filter.
-func (f StoreStateFilter) Type() string {
-	return "store-state-filter"
+func (f *StoreStateFilter) Type() string {
+	return fmt.Sprintf("store-state-%s-filter", f.Reason)
 }
 
-// Source returns true when the store can be selected as the schedule
-// source.
-func (f StoreStateFilter) Source(opt opt.Options, store *core.StoreInfo) bool {
-	if store.IsTombstone() ||
-		store.DownTime() > opt.GetMaxStoreDownTime() {
-		return false
-	}
-	if f.TransferLeader && (store.IsDisconnected() || store.IsBlocked()) {
-		return false
-	}
-
-	if f.MoveRegion && !f.filterMoveRegion(opt, true, store) {
-		return false
-	}
-	return true
-}
-
-// Target returns true when the store can be selected as the schedule
-// target.
-func (f StoreStateFilter) Target(opts opt.Options, store *core.StoreInfo) bool {
-	if store.IsTombstone() ||
-		store.IsOffline() ||
-		store.DownTime() > opts.GetMaxStoreDownTime() {
-		return false
-	}
-	if f.TransferLeader &&
-		(store.IsDisconnected() ||
-			store.IsBlocked() ||
-			store.IsBusy() ||
-			opts.CheckLabelProperty(opt.RejectLeader, store.GetLabels())) {
-		return false
-	}
-
-	if f.MoveRegion {
-		// only target consider the pending peers because pending more means the disk is slower.
-		if opts.GetMaxPendingPeerCount() > 0 && store.GetPendingPeerCount() > int(opts.GetMaxPendingPeerCount()) {
-			return false
-		}
-
-		if !f.filterMoveRegion(opts, false, store) {
-			return false
-		}
-	}
-	return true
-}
-
-func (f StoreStateFilter) filterMoveRegion(opt opt.Options, isSource bool, store *core.StoreInfo) bool {
-	if store.IsBusy() {
-		return false
-	}
-
-	if (isSource && !store.IsAvailable(storelimit.RemovePeer)) || (!isSource && !store.IsAvailable(storelimit.AddPeer)) {
-		return false
-	}
-
-	if uint64(store.GetSendingSnapCount()) > opt.GetMaxSnapshotCount() ||
-		uint64(store.GetReceivingSnapCount()) > opt.GetMaxSnapshotCount() ||
-		uint64(store.GetApplyingSnapCount()) > opt.GetMaxSnapshotCount() {
-		return false
-	}
-	return true
-}
+// conditionFunc defines condition to determine a store should be selected.
+// It should consider if the filter allows temporary states.
+type conditionFunc func(opt.Options, *core.StoreInfo) bool
+
+func (f *StoreStateFilter) isTombstone(opt opt.Options, store *core.StoreInfo) bool {
+	f.Reason = "tombstone"
+	return store.IsTombstone()
+}
+
+func (f *StoreStateFilter) isDown(opt opt.Options, store *core.StoreInfo) bool {
+	f.Reason = "down"
+	return store.DownTime() > opt.GetMaxStoreDownTime()
+}
+
+func (f *StoreStateFilter) isOffline(opt opt.Options, store *core.StoreInfo) bool {
+	f.Reason = "offline"
+	return store.IsOffline()
+}
+
+func (f *StoreStateFilter) isBlocked(opt opt.Options, store *core.StoreInfo) bool {
+	f.Reason = "blocked"
+	return store.IsBlocked()
+}
+
+func (f *StoreStateFilter) isDisconnected(opt opt.Options, store *core.StoreInfo) bool {
+	f.Reason = "disconnected"
+	return store.IsDisconnected()
+}
+
+func (f *StoreStateFilter) isBusy(opt opt.Options, store *core.StoreInfo) bool {
+	f.Reason = "busy"
+	return store.IsBusy()
+}
+
+func (f *StoreStateFilter) exceedRemoveLimit(opt opt.Options, store *core.StoreInfo) bool {
+	f.Reason = "exceed-remove-limit"
+	return !store.IsAvailable(storelimit.RemovePeer)
+}
+
+func (f *StoreStateFilter) exceedAddLimit(opt opt.Options, store *core.StoreInfo) bool {
+	f.Reason = "exceed-add-limit"
+	return !store.IsAvailable(storelimit.AddPeer)
+}
+
+func (f *StoreStateFilter) tooManySnapshots(opt opt.Options, store *core.StoreInfo) bool {
+	f.Reason = "too-many-snapshot"
+	return (uint64(store.GetSendingSnapCount()) > opt.GetMaxSnapshotCount() ||
+		uint64(store.GetReceivingSnapCount()) > opt.GetMaxSnapshotCount() ||
+		uint64(store.GetApplyingSnapCount()) > opt.GetMaxSnapshotCount())
+}
+
+func (f *StoreStateFilter) tooManyPendingPeers(opt opt.Options, store *core.StoreInfo) bool {
+	f.Reason = "too-many-pending-peer"
+	return opt.GetMaxPendingPeerCount() > 0 &&
+		store.GetPendingPeerCount() > int(opt.GetMaxPendingPeerCount())
+}
+
+func (f *StoreStateFilter) hasRejectLeaderProperty(opts opt.Options, store *core.StoreInfo) bool {
+	f.Reason = "reject-leader"
+	return opts.CheckLabelProperty(opt.RejectLeader, store.GetLabels())
+}
+
+// The condition table.
+// Y: the condition is temporary (expected to become false soon).
+// N: the condition is expected to be true for a long time.
+// X means when the condition is true, the store CANNOT be selected.
+//
+// Condition    Down Offline Tomb Block Disconn Busy RmLimit AddLimit Snap Pending Reject
+// IsTemporary  N    N       N    N     Y       Y    Y       Y        Y    Y       N
+//
+// LeaderSource X            X    X     X
+// RegionSource                                 X    X                X
+// LeaderTarget X    X       X    X     X       X                                  X
+// RegionTarget X    X       X          X       X            X        X    X
+
+const (
+	leaderSource = iota
+	regionSource
+	leaderTarget
+	regionTarget
+)
+
+func (f *StoreStateFilter) anyConditionMatch(typ int, opt opt.Options, store *core.StoreInfo) bool {
+	var funcs []conditionFunc
+	switch typ {
+	case leaderSource:
+		funcs = []conditionFunc{f.isTombstone, f.isDown, f.isBlocked, f.isDisconnected}
+	case regionSource:
+		funcs = []conditionFunc{f.isBusy, f.exceedRemoveLimit, f.tooManySnapshots}
+	case leaderTarget:
+		funcs = []conditionFunc{f.isTombstone, f.isOffline, f.isDown, f.isBlocked,
+			f.isDisconnected, f.isBusy, f.hasRejectLeaderProperty}
+	case regionTarget:
+		funcs = []conditionFunc{f.isTombstone, f.isOffline, f.isDown, f.isDisconnected, f.isBusy,
+			f.exceedAddLimit, f.tooManySnapshots, f.tooManyPendingPeers}
+	}
+	for _, cf := range funcs {
+		if cf(opt, store) {
+			return true
+		}
+	}
+	return false
+}
+
+// Source returns true when the store can be selected as the schedule
+// source.
+func (f *StoreStateFilter) Source(opts opt.Options, store *core.StoreInfo) bool {
+	if f.TransferLeader && f.anyConditionMatch(leaderSource, opts, store) {
+		return false
+	}
+	if f.MoveRegion && f.anyConditionMatch(regionSource, opts, store) {
+		return false
+	}
+	return true
+}
+
+// Target returns true when the store can be selected as the schedule
+// target.
+func (f *StoreStateFilter) Target(opts opt.Options, store *core.StoreInfo) bool {
+	if f.TransferLeader && f.anyConditionMatch(leaderTarget, opts, store) {
+		return false
+	}
+	if f.MoveRegion && f.anyConditionMatch(regionTarget, opts, store) {
+		return false
+	}
+	return true
+}
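The net effect of the refactor above: Source and Target shrink to a per-role condition lookup, each condition check records why a store was rejected in f.Reason, and Type() surfaces that reason, presumably so logs and metrics keyed off the filter type can show it. A rough usage sketch built from the same mock helpers this commit's test uses (illustrative only, not part of the commit):

package main

import (
	"fmt"
	"time"

	"github.com/tikv/pd/pkg/mock/mockoption"
	"github.com/tikv/pd/server/core"
	"github.com/tikv/pd/server/schedule/filter"
)

func main() {
	opt := mockoption.NewScheduleOptions()
	// Note the &: with pointer receivers, a value literal no longer implements filter.Filter.
	f := &filter.StoreStateFilter{ActionScope: "example", TransferLeader: true, MoveRegion: true}

	// A store with a fresh heartbeat passes in both directions.
	store := core.NewStoreInfoWithLabel(1, 0, map[string]string{}).
		Clone(core.SetLastHeartbeatTS(time.Now()))
	fmt.Println(f.Source(opt, store), f.Target(opt, store)) // expected: true true

	// A store whose last heartbeat is 5 minutes old is rejected (compare the test below).
	stale := store.Clone(core.SetLastHeartbeatTS(time.Now().Add(-5 * time.Minute)))
	fmt.Println(f.Source(opt, stale), f.Target(opt, stale)) // expected: false false
	fmt.Println(f.Type())                                   // e.g. "store-state-disconnected-filter"
}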
64 changes: 46 additions & 18 deletions server/schedule/filter/filters_test.go
@@ -14,9 +14,11 @@ package filter

 import (
 	"testing"
+	"time"
 
 	. "github.com/pingcap/check"
 	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/tikv/pd/pkg/mock/mockcluster"
 	"github.com/tikv/pd/pkg/mock/mockoption"
 	"github.com/tikv/pd/server/core"
@@ -59,26 +61,52 @@ func (s *testFiltersSuite) TestLabelConstraintsFilter(c *C) {
 	c.Assert(filter2.Target(tc, store2), IsTrue)
 }
 
-func (s *testFiltersSuite) TestRuleFitFilter(c *C) {
+func (s *testFiltersSuite) TestStoreStateFilter(c *C) {
+	filters := []Filter{
+		&StoreStateFilter{TransferLeader: true},
+		&StoreStateFilter{MoveRegion: true},
+		&StoreStateFilter{TransferLeader: true, MoveRegion: true},
+	}
 	opt := mockoption.NewScheduleOptions()
-	opt.EnablePlacementRules = true
-	opt.LocationLabels = []string{"zone"}
-	tc := mockcluster.NewCluster(opt)
-	tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
-	tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
-	tc.AddLabelsStore(3, 1, map[string]string{"zone": "z2"})
-	tc.AddLabelsStore(4, 1, map[string]string{"zone": "z2"})
-	tc.AddLabelsStore(5, 1, map[string]string{"zone": "z3"})
-	region := core.NewRegionInfo(&metapb.Region{Peers: []*metapb.Peer{
-		{StoreId: 1, Id: 1},
-		{StoreId: 3, Id: 3},
-		{StoreId: 5, Id: 5},
-	}}, &metapb.Peer{StoreId: 1, Id: 1})
+	store := core.NewStoreInfoWithLabel(1, 0, map[string]string{})
+
+	type testCase struct {
+		filterIdx int
+		sourceRes bool
+		targetRes bool
+	}
+
+	check := func(store *core.StoreInfo, testCases []testCase) {
+		for _, tc := range testCases {
+			c.Assert(filters[tc.filterIdx].Source(opt, store), Equals, tc.sourceRes)
+			c.Assert(filters[tc.filterIdx].Target(opt, store), Equals, tc.targetRes)
+		}
+	}
 
-	filter := NewRuleFitFilter("", tc, region, 1)
-	c.Assert(filter.Target(tc, tc.GetStore(2)), IsTrue)
-	c.Assert(filter.Target(tc, tc.GetStore(4)), IsFalse)
-	c.Assert(filter.Source(tc, tc.GetStore(4)), IsTrue)
+	store = store.Clone(core.SetLastHeartbeatTS(time.Now()))
+	testCases := []testCase{
+		{2, true, true},
+	}
+	check(store, testCases)
+
+	// Disconn
+	store = store.Clone(core.SetLastHeartbeatTS(time.Now().Add(-5 * time.Minute)))
+	testCases = []testCase{
+		{0, false, false},
+		{1, true, false},
+		{2, false, false},
+	}
+	check(store, testCases)
+
+	// Busy
+	store = store.Clone(core.SetLastHeartbeatTS(time.Now())).
+		Clone(core.SetStoreStats(&pdpb.StoreStats{IsBusy: true}))
+	testCases = []testCase{
+		{0, true, false},
+		{1, false, false},
+		{2, false, false},
+	}
+	check(store, testCases)
 }
 
 func BenchmarkCloneRegionTest(b *testing.B) {
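The new TestStoreStateFilter exercises the three filter variants (leader-only, region-only, both) against fresh, disconnected, and busy stores. PD's tests use the gocheck framework, so assuming the usual gocheck flags are available, the suite entry can be run on its own with something like:

go test ./server/schedule/filter -check.f TestStoreStateFilter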
2 changes: 1 addition & 1 deletion server/schedule/operator/builder.go
@@ -351,7 +351,7 @@ func (b *Builder) allowLeader(peer *metapb.Peer) bool {
if store == nil {
return false
}
-stateFilter := filter.StoreStateFilter{ActionScope: "operator-builder", TransferLeader: true}
+stateFilter := &filter.StoreStateFilter{ActionScope: "operator-builder", TransferLeader: true}
if !stateFilter.Target(b.cluster, store) {
return false
}
4 changes: 2 additions & 2 deletions server/schedule/region_scatterer.go
@@ -137,7 +137,7 @@ type engineContext struct {
}

func newEngineContext(filters ...filter.Filter) engineContext {
-filters = append(filters, filter.StoreStateFilter{ActionScope: regionScatterName})
+filters = append(filters, &filter.StoreStateFilter{ActionScope: regionScatterName})
return engineContext{
filters: filters,
selectedPeer: newSelectedStores(true),
@@ -278,7 +278,7 @@ func (r *RegionScatterer) selectPeerToReplace(group string, stores map[uint64]*c
func (r *RegionScatterer) collectAvailableStores(group string, region *core.RegionInfo, context engineContext) map[uint64]*core.StoreInfo {
filters := []filter.Filter{
filter.NewExcludedFilter(r.name, nil, region.GetStoreIds()),
-filter.StoreStateFilter{ActionScope: r.name, MoveRegion: true},
+&filter.StoreStateFilter{ActionScope: r.name, MoveRegion: true},
}
filters = append(filters, context.filters...)
filters = append(filters, context.selectedPeer.newFilters(r.name, group)...)
2 changes: 1 addition & 1 deletion server/schedulers/adjacent_region.go
@@ -121,7 +121,7 @@ func (a *adjacentState) len() int {
// on each store.
func newBalanceAdjacentRegionScheduler(opController *schedule.OperatorController, conf *balanceAdjacentRegionConfig) schedule.Scheduler {
filters := []filter.Filter{
-filter.StoreStateFilter{ActionScope: AdjacentRegionName, TransferLeader: true, MoveRegion: true},
+&filter.StoreStateFilter{ActionScope: AdjacentRegionName, TransferLeader: true, MoveRegion: true},
filter.NewSpecialUseFilter(AdjacentRegionName),
}
base := NewBaseScheduler(opController)
2 changes: 1 addition & 1 deletion server/schedulers/balance_leader.go
@@ -91,7 +91,7 @@ func newBalanceLeaderScheduler(opController *schedule.OperatorController, conf *
option(s)
}
s.filters = []filter.Filter{
-filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true},
+&filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true},
filter.NewSpecialUseFilter(s.GetName()),
}
return s
2 changes: 1 addition & 1 deletion server/schedulers/balance_region.go
@@ -91,7 +91,7 @@ func newBalanceRegionScheduler(opController *schedule.OperatorController, conf *
setOption(scheduler)
}
scheduler.filters = []filter.Filter{
-filter.StoreStateFilter{ActionScope: scheduler.GetName(), MoveRegion: true},
+&filter.StoreStateFilter{ActionScope: scheduler.GetName(), MoveRegion: true},
filter.NewSpecialUseFilter(scheduler.GetName()),
}
return scheduler
2 changes: 1 addition & 1 deletion server/schedulers/evict_leader.go
@@ -164,7 +164,7 @@ type evictLeaderScheduler struct {
// out of a store.
func newEvictLeaderScheduler(opController *schedule.OperatorController, conf *evictLeaderSchedulerConfig) schedule.Scheduler {
filters := []filter.Filter{
-filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true},
+&filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true},
}

base := NewBaseScheduler(opController)