Skip to content

Commit

Permalink
scheduler: do not remove the operator when the step does not finish (#…
Browse files Browse the repository at this point in the history
…1715)

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>
  • Loading branch information
shafreeck authored and sre-bot committed Sep 5, 2019
1 parent 9af07b0 commit 144031c
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 44 deletions.
86 changes: 48 additions & 38 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s u64Set) String() string {
// OpStep describes the basic scheduling steps that can not be subdivided.
type OpStep interface {
fmt.Stringer
ExpectConfVerChange() bool
ConfVerChanged(region *core.RegionInfo) bool
IsFinish(region *core.RegionInfo) bool
Influence(opInfluence OpInfluence, region *core.RegionInfo)
}
Expand All @@ -137,10 +137,9 @@ type TransferLeader struct {
FromStore, ToStore uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (tl TransferLeader) ExpectConfVerChange() bool {
return false
// ConfVerChanged returns true if the conf version has been changed by this step
func (tl TransferLeader) ConfVerChanged(region *core.RegionInfo) bool {
return false // transfer leader never change the conf version
}

func (tl TransferLeader) String() string {
Expand Down Expand Up @@ -168,10 +167,12 @@ type AddPeer struct {
ToStore, PeerID uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (ap AddPeer) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (ap AddPeer) ConfVerChanged(region *core.RegionInfo) bool {
if p := region.GetStoreVoter(ap.ToStore); p != nil {
return p.GetId() == ap.PeerID
}
return false
}
func (ap AddPeer) String() string {
return fmt.Sprintf("add peer %v on store %v", ap.PeerID, ap.ToStore)
Expand Down Expand Up @@ -208,10 +209,12 @@ type AddLearner struct {
ToStore, PeerID uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (al AddLearner) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (al AddLearner) ConfVerChanged(region *core.RegionInfo) bool {
if p := region.GetStoreLearner(al.ToStore); p != nil {
return p.GetId() == al.PeerID
}
return false
}

func (al AddLearner) String() string {
Expand Down Expand Up @@ -249,10 +252,12 @@ type PromoteLearner struct {
ToStore, PeerID uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (pl PromoteLearner) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (pl PromoteLearner) ConfVerChanged(region *core.RegionInfo) bool {
if p := region.GetStoreVoter(pl.ToStore); p != nil {
return p.GetId() == pl.PeerID
}
return false
}

func (pl PromoteLearner) String() string {
Expand All @@ -278,10 +283,9 @@ type RemovePeer struct {
FromStore uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (rp RemovePeer) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (rp RemovePeer) ConfVerChanged(region *core.RegionInfo) bool {
return region.GetStorePeer(rp.FromStore) == nil
}

func (rp RemovePeer) String() string {
Expand Down Expand Up @@ -314,9 +318,8 @@ type MergeRegion struct {
IsPassive bool
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (mr MergeRegion) ExpectConfVerChange() bool {
// ConfVerChanged returns true if the conf version has been changed by this step
func (mr MergeRegion) ConfVerChanged(region *core.RegionInfo) bool {
return false
}

Expand Down Expand Up @@ -351,9 +354,8 @@ type SplitRegion struct {
Policy pdpb.CheckPolicy
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (sr SplitRegion) ExpectConfVerChange() bool {
// ConfVerChanged returns true if the conf version has been changed by this step
func (sr SplitRegion) ConfVerChanged(region *core.RegionInfo) bool {
return false
}

Expand Down Expand Up @@ -382,10 +384,12 @@ type AddLightPeer struct {
ToStore, PeerID uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (ap AddLightPeer) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (ap AddLightPeer) ConfVerChanged(region *core.RegionInfo) bool {
if p := region.GetStoreVoter(ap.ToStore); p != nil {
return p.GetId() == ap.PeerID
}
return false
}

func (ap AddLightPeer) String() string {
Expand Down Expand Up @@ -417,10 +421,12 @@ type AddLightLearner struct {
ToStore, PeerID uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (al AddLightLearner) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (al AddLightLearner) ConfVerChanged(region *core.RegionInfo) bool {
if p := region.GetStoreLearner(al.ToStore); p != nil {
return p.GetId() == al.PeerID
}
return false
}

func (al AddLightLearner) String() string {
Expand Down Expand Up @@ -582,11 +588,15 @@ func (o *Operator) Check(region *core.RegionInfo) OpStep {
}

// ConfVerChanged returns the number of conf version changes consumed by the steps
func (o *Operator) ConfVerChanged() int {
func (o *Operator) ConfVerChanged(region *core.RegionInfo) int {
total := 0
current := atomic.LoadInt32(&o.currentStep)
for _, step := range o.steps[0:current] {
if step.ExpectConfVerChange() {
if current == int32(len(o.steps)) {
current--
}
// include the current step, since it may have taken effect in this heartbeat
for _, step := range o.steps[0 : current+1] {
if step.ConfVerChanged(region) {
total++
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
latest := region.GetRegionEpoch()
changes := latest.GetConfVer() - origin.GetConfVer()
if source == DispatchFromHeartBeat &&
changes > uint64(op.ConfVerChanged()) {
changes > uint64(op.ConfVerChanged(region)) {
log.Info("stale operator", zap.Uint64("region-id", region.GetID()),
zap.Reflect("operator", op), zap.Uint64("diff", changes))
operatorCounter.WithLabelValues(op.Desc(), "stale").Inc()
Expand Down
74 changes: 69 additions & 5 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/pd/pkg/mock/mockcluster"
"github.com/pingcap/pd/pkg/mock/mockhbstream"
"github.com/pingcap/pd/pkg/mock/mockoption"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule/operator"
)

Expand Down Expand Up @@ -206,30 +207,93 @@ func (t *testOperatorControllerSuite) TestDispatchOutdatedRegion(c *C) {
&metapb.RegionEpoch{ConfVer: 0, Version: 0})

controller.Dispatch(region, DispatchFromHeartBeat)
c.Assert(op.ConfVerChanged(), Equals, 0)
c.Assert(op.ConfVerChanged(region), Equals, 0)
c.Assert(len(stream.MsgCh()), Equals, 2)

// report the result of removing peer
region = cluster.MockRegionInfo(1, 2, []uint64{2},
&metapb.RegionEpoch{ConfVer: 0, Version: 0})

controller.Dispatch(region, DispatchFromHeartBeat)
c.Assert(op.ConfVerChanged(), Equals, 1)
c.Assert(op.ConfVerChanged(region), Equals, 1)
c.Assert(len(stream.MsgCh()), Equals, 2)

// add and dispatch op again, the op should be stale
op = operator.NewOperator("test", "test", 1,
&metapb.RegionEpoch{ConfVer: 0, Version: 0},
operator.OpRegion, steps...)
c.Assert(controller.AddOperator(op), Equals, true)
c.Assert(op.ConfVerChanged(), Equals, 0)
c.Assert(op.ConfVerChanged(region), Equals, 0)
c.Assert(len(stream.MsgCh()), Equals, 3)

// report region with an abnormal confver
region = cluster.MockRegionInfo(1, 1, []uint64{1, 2},
&metapb.RegionEpoch{ConfVer: 1, Version: 0})
controller.Dispatch(region, DispatchFromHeartBeat)
c.Assert(op.ConfVerChanged(), Equals, 0)
// no new step sended
c.Assert(op.ConfVerChanged(region), Equals, 0)
// no new step
c.Assert(len(stream.MsgCh()), Equals, 3)
}

// TestDispatchUnfinishedStep checks that an operator is kept by the
// controller when a heartbeat reports a conf version bump caused by the
// operator's own step while that step has not finished yet (the added peer
// is still pending), and that the step is not dispatched a second time.
func (t *testOperatorControllerSuite) TestDispatchUnfinishedStep(c *C) {
	cluster := mockcluster.NewCluster(mockoption.NewScheduleOptions())
	stream := mockhbstream.NewHeartbeatStreams(cluster.ID)
	controller := NewOperatorController(cluster, stream)

	// Create a new region with epoch(0, 0).
	// The region has two peers with its peer id allocated incrementally,
	// so the two peers are {peerid: 1, storeid: 1}, {peerid: 2, storeid: 2}.
	// The peer on store 1 is the leader.
	epoch := &metapb.RegionEpoch{ConfVer: 0, Version: 0}
	region := cluster.MockRegionInfo(1, 1, []uint64{2}, epoch)
	// Put region into cluster, otherwise AddOperator will fail because of
	// the missing region.
	cluster.PutRegion(region)

	// The next allocated peer should have peerid 3, so we add this peer
	// to store 3. Keyed fields keep the literal correct even if the field
	// order of AddPeer changes, and satisfy `go vet` (composites).
	steps := []operator.OpStep{
		operator.AddPeer{ToStore: 3, PeerID: 3},
	}

	// Create an operator.
	op := operator.NewOperator("test", "test", 1, epoch,
		operator.OpRegion, steps...)
	c.Assert(controller.AddOperator(op), Equals, true)
	c.Assert(len(stream.MsgCh()), Equals, 1)

	// Create region2, which is cloned from the original region.
	// region2 has peer 3 in pending state, so the AddPeer step
	// is left unfinished.
	region2 := region.Clone(
		core.WithAddPeer(&metapb.Peer{Id: 3, StoreId: 3}),
		core.WithPendingPeers([]*metapb.Peer{
			{Id: 3, StoreId: 3, IsLearner: false},
		}),
		core.WithIncConfVer(),
	)
	c.Assert(region2.GetPendingPeers(), NotNil)
	c.Assert(steps[0].IsFinish(region2), Equals, false)
	controller.Dispatch(region2, DispatchFromHeartBeat)

	// In this case, the conf version has been changed, but the
	// added peer is in pending state, so the operator should not be
	// removed by the stale checker.
	c.Assert(op.ConfVerChanged(region2), Equals, 1)
	c.Assert(controller.GetOperator(1), NotNil)
	// The operator is still valid, but the step should not be sent
	// again because it is in pending state, so the message channel
	// length should not increase.
	c.Assert(len(stream.MsgCh()), Equals, 1)

	// Finish the step by clearing the pending state.
	region3 := region.Clone(
		core.WithAddPeer(&metapb.Peer{Id: 3, StoreId: 3}),
		core.WithIncConfVer(),
	)
	c.Assert(steps[0].IsFinish(region3), Equals, true)
	controller.Dispatch(region3, DispatchFromHeartBeat)
	c.Assert(op.ConfVerChanged(region3), Equals, 1)
	// The operator has finished, so no message should be sent.
	c.Assert(len(stream.MsgCh()), Equals, 1)
}

0 comments on commit 144031c

Please sign in to comment.