Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: do not remove the operator when the step does not finish #1715

Merged
merged 2 commits into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 48 additions & 38 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s u64Set) String() string {
// OpStep describes the basic scheduling steps that can not be subdivided.
type OpStep interface {
fmt.Stringer
ExpectConfVerChange() bool
ConfVerChanged(region *core.RegionInfo) bool
IsFinish(region *core.RegionInfo) bool
Influence(opInfluence OpInfluence, region *core.RegionInfo)
}
Expand All @@ -137,10 +137,9 @@ type TransferLeader struct {
FromStore, ToStore uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (tl TransferLeader) ExpectConfVerChange() bool {
return false
// ConfVerChanged returns true if the conf version has been changed by this step
func (tl TransferLeader) ConfVerChanged(region *core.RegionInfo) bool {
return false // transfer leader never change the conf version
}

func (tl TransferLeader) String() string {
Expand Down Expand Up @@ -168,10 +167,12 @@ type AddPeer struct {
ToStore, PeerID uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (ap AddPeer) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (ap AddPeer) ConfVerChanged(region *core.RegionInfo) bool {
if p := region.GetStoreVoter(ap.ToStore); p != nil {
return p.GetId() == ap.PeerID
}
return false
}
func (ap AddPeer) String() string {
return fmt.Sprintf("add peer %v on store %v", ap.PeerID, ap.ToStore)
Expand Down Expand Up @@ -208,10 +209,12 @@ type AddLearner struct {
ToStore, PeerID uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (al AddLearner) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (al AddLearner) ConfVerChanged(region *core.RegionInfo) bool {
if p := region.GetStoreLearner(al.ToStore); p != nil {
return p.GetId() == al.PeerID
}
return false
}

func (al AddLearner) String() string {
Expand Down Expand Up @@ -249,10 +252,12 @@ type PromoteLearner struct {
ToStore, PeerID uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (pl PromoteLearner) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (pl PromoteLearner) ConfVerChanged(region *core.RegionInfo) bool {
if p := region.GetStoreVoter(pl.ToStore); p != nil {
return p.GetId() == pl.PeerID
}
return false
}

func (pl PromoteLearner) String() string {
Expand All @@ -278,10 +283,9 @@ type RemovePeer struct {
FromStore uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (rp RemovePeer) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (rp RemovePeer) ConfVerChanged(region *core.RegionInfo) bool {
return region.GetStorePeer(rp.FromStore) == nil
}

func (rp RemovePeer) String() string {
Expand Down Expand Up @@ -314,9 +318,8 @@ type MergeRegion struct {
IsPassive bool
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (mr MergeRegion) ExpectConfVerChange() bool {
// ConfVerChanged returns true if the conf version has been changed by this step
func (mr MergeRegion) ConfVerChanged(region *core.RegionInfo) bool {
return false
}

Expand Down Expand Up @@ -351,9 +354,8 @@ type SplitRegion struct {
Policy pdpb.CheckPolicy
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (sr SplitRegion) ExpectConfVerChange() bool {
// ConfVerChanged returns true if the conf version has been changed by this step
func (sr SplitRegion) ConfVerChanged(region *core.RegionInfo) bool {
return false
}

Expand Down Expand Up @@ -382,10 +384,12 @@ type AddLightPeer struct {
ToStore, PeerID uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (ap AddLightPeer) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (ap AddLightPeer) ConfVerChanged(region *core.RegionInfo) bool {
if p := region.GetStoreVoter(ap.ToStore); p != nil {
return p.GetId() == ap.PeerID
}
return false
}

func (ap AddLightPeer) String() string {
Expand Down Expand Up @@ -417,10 +421,12 @@ type AddLightLearner struct {
ToStore, PeerID uint64
}

// ExpectConfVerChange returns if the confver of a region should be increased
// after this step
func (al AddLightLearner) ExpectConfVerChange() bool {
return true
// ConfVerChanged returns true if the conf version has been changed by this step
func (al AddLightLearner) ConfVerChanged(region *core.RegionInfo) bool {
if p := region.GetStoreLearner(al.ToStore); p != nil {
return p.GetId() == al.PeerID
}
return false
}

func (al AddLightLearner) String() string {
Expand Down Expand Up @@ -582,11 +588,15 @@ func (o *Operator) Check(region *core.RegionInfo) OpStep {
}

// ConfVerChanged returns the number of confver has consumed by steps
func (o *Operator) ConfVerChanged() int {
func (o *Operator) ConfVerChanged(region *core.RegionInfo) int {
total := 0
current := atomic.LoadInt32(&o.currentStep)
for _, step := range o.steps[0:current] {
if step.ExpectConfVerChange() {
if current == int32(len(o.steps)) {
current--
}
// including current step, it may has taken effects in this heartbeat
for _, step := range o.steps[0 : current+1] {
if step.ConfVerChanged(region) {
total++
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
latest := region.GetRegionEpoch()
changes := latest.GetConfVer() - origin.GetConfVer()
if source == DispatchFromHeartBeat &&
changes > uint64(op.ConfVerChanged()) {
changes > uint64(op.ConfVerChanged(region)) {
log.Info("stale operator", zap.Uint64("region-id", region.GetID()),
zap.Reflect("operator", op), zap.Uint64("diff", changes))
operatorCounter.WithLabelValues(op.Desc(), "stale").Inc()
Expand Down
74 changes: 69 additions & 5 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/pd/pkg/mock/mockcluster"
"github.com/pingcap/pd/pkg/mock/mockhbstream"
"github.com/pingcap/pd/pkg/mock/mockoption"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule/operator"
)

Expand Down Expand Up @@ -206,30 +207,93 @@ func (t *testOperatorControllerSuite) TestDispatchOutdatedRegion(c *C) {
&metapb.RegionEpoch{ConfVer: 0, Version: 0})

controller.Dispatch(region, DispatchFromHeartBeat)
c.Assert(op.ConfVerChanged(), Equals, 0)
c.Assert(op.ConfVerChanged(region), Equals, 0)
c.Assert(len(stream.MsgCh()), Equals, 2)

// report the result of removing peer
region = cluster.MockRegionInfo(1, 2, []uint64{2},
&metapb.RegionEpoch{ConfVer: 0, Version: 0})

controller.Dispatch(region, DispatchFromHeartBeat)
c.Assert(op.ConfVerChanged(), Equals, 1)
c.Assert(op.ConfVerChanged(region), Equals, 1)
c.Assert(len(stream.MsgCh()), Equals, 2)

// add and disaptch op again, the op should be stale
op = operator.NewOperator("test", "test", 1,
&metapb.RegionEpoch{ConfVer: 0, Version: 0},
operator.OpRegion, steps...)
c.Assert(controller.AddOperator(op), Equals, true)
c.Assert(op.ConfVerChanged(), Equals, 0)
c.Assert(op.ConfVerChanged(region), Equals, 0)
c.Assert(len(stream.MsgCh()), Equals, 3)

// report region with an abnormal confver
region = cluster.MockRegionInfo(1, 1, []uint64{1, 2},
&metapb.RegionEpoch{ConfVer: 1, Version: 0})
controller.Dispatch(region, DispatchFromHeartBeat)
c.Assert(op.ConfVerChanged(), Equals, 0)
// no new step sended
c.Assert(op.ConfVerChanged(region), Equals, 0)
// no new step
c.Assert(len(stream.MsgCh()), Equals, 3)
}

func (t *testOperatorControllerSuite) TestDispatchUnfinishedStep(c *C) {
cluster := mockcluster.NewCluster(mockoption.NewScheduleOptions())
stream := mockhbstream.NewHeartbeatStreams(cluster.ID)
controller := NewOperatorController(cluster, stream)

// Create a new region with epoch(0, 0)
// the region has two peers with its peer id allocated incrementally.
// so the two peers are {peerid: 1, storeid: 1}, {peerid: 2, storeid: 2}
// The peer on store 1 is the leader
epoch := &metapb.RegionEpoch{ConfVer: 0, Version: 0}
region := cluster.MockRegionInfo(1, 1, []uint64{2}, epoch)
// Put region into cluster, otherwise, AddOperator will fail because of
// missing region
cluster.PutRegion(region)

// The next allocated peer should have peerid 3, so we add this peer
// to store 3
steps := []operator.OpStep{
operator.AddPeer{3, 3},
}

// Create an operator
op := operator.NewOperator("test", "test", 1, epoch,
operator.OpRegion, steps...)
c.Assert(controller.AddOperator(op), Equals, true)
c.Assert(len(stream.MsgCh()), Equals, 1)

// Create region2 witch is cloned from the original region.
// region2 has peer 3 in pending state, so the AddPeer step
// is left unfinished
region2 := region.Clone(
core.WithAddPeer(&metapb.Peer{Id: 3, StoreId: 3}),
core.WithPendingPeers([]*metapb.Peer{
{Id: 3, StoreId: 3, IsLearner: false},
}),
core.WithIncConfVer(),
)
c.Assert(region2.GetPendingPeers(), NotNil)
c.Assert(steps[0].IsFinish(region2), Equals, false)
controller.Dispatch(region2, DispatchFromHeartBeat)

// In this case, the conf version has been changed, but the
// peer added is in peeding state, the operator should not be
// removed by the stale checker
c.Assert(op.ConfVerChanged(region2), Equals, 1)
c.Assert(controller.GetOperator(1), NotNil)
// The operator is valid yet, but the step should not be sent
// again, because it is in pending state, so the message channel
// should not be increased
c.Assert(len(stream.MsgCh()), Equals, 1)

// Finish the step by clearing the pending state
region3 := region.Clone(
core.WithAddPeer(&metapb.Peer{Id: 3, StoreId: 3}),
core.WithIncConfVer(),
)
c.Assert(steps[0].IsFinish(region3), Equals, true)
controller.Dispatch(region3, DispatchFromHeartBeat)
c.Assert(op.ConfVerChanged(region3), Equals, 1)
// The Operator has finished, so no message should be sent
c.Assert(len(stream.MsgCh()), Equals, 1)
}