diff --git a/server/schedule/operator/operator.go b/server/schedule/operator/operator.go index ad25c445554..36943aba466 100644 --- a/server/schedule/operator/operator.go +++ b/server/schedule/operator/operator.go @@ -127,7 +127,7 @@ func (s u64Set) String() string { // OpStep describes the basic scheduling steps that can not be subdivided. type OpStep interface { fmt.Stringer - ExpectConfVerChange() bool + ConfVerChanged(region *core.RegionInfo) bool IsFinish(region *core.RegionInfo) bool Influence(opInfluence OpInfluence, region *core.RegionInfo) } @@ -137,10 +137,9 @@ type TransferLeader struct { FromStore, ToStore uint64 } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (tl TransferLeader) ExpectConfVerChange() bool { - return false +// ConfVerChanged returns true if the conf version has been changed by this step +func (tl TransferLeader) ConfVerChanged(region *core.RegionInfo) bool { + return false // transfer leader never change the conf version } func (tl TransferLeader) String() string { @@ -168,10 +167,12 @@ type AddPeer struct { ToStore, PeerID uint64 } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (ap AddPeer) ExpectConfVerChange() bool { - return true +// ConfVerChanged returns true if the conf version has been changed by this step +func (ap AddPeer) ConfVerChanged(region *core.RegionInfo) bool { + if p := region.GetStoreVoter(ap.ToStore); p != nil { + return p.GetId() == ap.PeerID + } + return false } func (ap AddPeer) String() string { return fmt.Sprintf("add peer %v on store %v", ap.PeerID, ap.ToStore) @@ -208,10 +209,12 @@ type AddLearner struct { ToStore, PeerID uint64 } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (al AddLearner) ExpectConfVerChange() bool { - return true +// ConfVerChanged returns true if the conf version has been changed by this step +func (al AddLearner) ConfVerChanged(region *core.RegionInfo) bool { + if p := region.GetStoreLearner(al.ToStore); p != nil { + return p.GetId() == al.PeerID + } + return false } func (al AddLearner) String() string { @@ -249,10 +252,12 @@ type PromoteLearner struct { ToStore, PeerID uint64 } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (pl PromoteLearner) ExpectConfVerChange() bool { - return true +// ConfVerChanged returns true if the conf version has been changed by this step +func (pl PromoteLearner) ConfVerChanged(region *core.RegionInfo) bool { + if p := region.GetStoreVoter(pl.ToStore); p != nil { + return p.GetId() == pl.PeerID + } + return false } func (pl PromoteLearner) String() string { @@ -278,10 +283,9 @@ type RemovePeer struct { FromStore uint64 } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (rp RemovePeer) ExpectConfVerChange() bool { - return true +// ConfVerChanged returns true if the conf version has been changed by this step +func (rp RemovePeer) ConfVerChanged(region *core.RegionInfo) bool { + return region.GetStorePeer(rp.FromStore) == nil } func (rp RemovePeer) String() string { @@ -314,9 +318,8 @@ type MergeRegion struct { IsPassive bool } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (mr MergeRegion) ExpectConfVerChange() bool { +// ConfVerChanged returns true if the conf version has been changed by this step +func (mr MergeRegion) ConfVerChanged(region *core.RegionInfo) bool { return false } @@ -351,9 +354,8 @@ type SplitRegion struct { Policy pdpb.CheckPolicy } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (sr SplitRegion) ExpectConfVerChange() bool { +// ConfVerChanged returns true if the conf version has been changed by this step +func (sr SplitRegion) ConfVerChanged(region *core.RegionInfo) bool { return false } @@ -382,10 +384,12 @@ type AddLightPeer struct { ToStore, PeerID uint64 } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (ap AddLightPeer) ExpectConfVerChange() bool { - return true +// ConfVerChanged returns true if the conf version has been changed by this step +func (ap AddLightPeer) ConfVerChanged(region *core.RegionInfo) bool { + if p := region.GetStoreVoter(ap.ToStore); p != nil { + return p.GetId() == ap.PeerID + } + return false } func (ap AddLightPeer) String() string { @@ -417,10 +421,12 @@ type AddLightLearner struct { ToStore, PeerID uint64 } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (al AddLightLearner) ExpectConfVerChange() bool { - return true +// ConfVerChanged returns true if the conf version has been changed by this step +func (al AddLightLearner) ConfVerChanged(region *core.RegionInfo) bool { + if p := region.GetStoreLearner(al.ToStore); p != nil { + return p.GetId() == al.PeerID + } + return false } func (al AddLightLearner) String() string { @@ -582,11 +588,15 @@ func (o *Operator) Check(region *core.RegionInfo) OpStep { } // ConfVerChanged returns the number of confver has consumed by steps -func (o *Operator) ConfVerChanged() int { +func (o *Operator) ConfVerChanged(region *core.RegionInfo) int { total := 0 current := atomic.LoadInt32(&o.currentStep) - for _, step := range o.steps[0:current] { - if step.ExpectConfVerChange() { + if current == int32(len(o.steps)) { + current-- + } + // including current step, it may has taken effects in this heartbeat + for _, step := range o.steps[0 : current+1] { + if step.ConfVerChanged(region) { total++ } } diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go index 8bae5405ac7..72a6f230c33 100644 --- a/server/schedule/operator_controller.go +++ b/server/schedule/operator_controller.go @@ -102,7 +102,7 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) { latest := region.GetRegionEpoch() changes := latest.GetConfVer() - origin.GetConfVer() if source == DispatchFromHeartBeat && - changes > uint64(op.ConfVerChanged()) { + changes > uint64(op.ConfVerChanged(region)) { log.Info("stale operator", zap.Uint64("region-id", region.GetID()), zap.Reflect("operator", op), zap.Uint64("diff", changes)) operatorCounter.WithLabelValues(op.Desc(), "stale").Inc() diff --git a/server/schedule/operator_controller_test.go b/server/schedule/operator_controller_test.go index 3b57dbe9e58..944ef1312f3 100644 --- a/server/schedule/operator_controller_test.go +++ b/server/schedule/operator_controller_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/pd/pkg/mock/mockcluster" "github.com/pingcap/pd/pkg/mock/mockhbstream" "github.com/pingcap/pd/pkg/mock/mockoption" + "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/schedule/operator" ) @@ -206,7 +207,7 @@ func (t *testOperatorControllerSuite) TestDispatchOutdatedRegion(c *C) { &metapb.RegionEpoch{ConfVer: 0, Version: 0}) controller.Dispatch(region, DispatchFromHeartBeat) - c.Assert(op.ConfVerChanged(), Equals, 0) + c.Assert(op.ConfVerChanged(region), Equals, 0) c.Assert(len(stream.MsgCh()), Equals, 2) // report the result of removing peer @@ -214,7 +215,7 @@ func (t *testOperatorControllerSuite) TestDispatchOutdatedRegion(c *C) { &metapb.RegionEpoch{ConfVer: 0, Version: 0}) controller.Dispatch(region, DispatchFromHeartBeat) - c.Assert(op.ConfVerChanged(), Equals, 1) + c.Assert(op.ConfVerChanged(region), Equals, 1) c.Assert(len(stream.MsgCh()), Equals, 2) // add and disaptch op again, the op should be stale @@ -222,14 +223,77 @@ func (t *testOperatorControllerSuite) TestDispatchOutdatedRegion(c *C) { &metapb.RegionEpoch{ConfVer: 0, Version: 0}, operator.OpRegion, steps...) c.Assert(controller.AddOperator(op), Equals, true) - c.Assert(op.ConfVerChanged(), Equals, 0) + c.Assert(op.ConfVerChanged(region), Equals, 0) c.Assert(len(stream.MsgCh()), Equals, 3) // report region with an abnormal confver region = cluster.MockRegionInfo(1, 1, []uint64{1, 2}, &metapb.RegionEpoch{ConfVer: 1, Version: 0}) controller.Dispatch(region, DispatchFromHeartBeat) - c.Assert(op.ConfVerChanged(), Equals, 0) - // no new step sended + c.Assert(op.ConfVerChanged(region), Equals, 0) + // no new step c.Assert(len(stream.MsgCh()), Equals, 3) } + +func (t *testOperatorControllerSuite) TestDispatchUnfinishedStep(c *C) { + cluster := mockcluster.NewCluster(mockoption.NewScheduleOptions()) + stream := mockhbstream.NewHeartbeatStreams(cluster.ID) + controller := NewOperatorController(cluster, stream) + + // Create a new region with epoch(0, 0) + // the region has two peers with its peer id allocated incrementally. + // so the two peers are {peerid: 1, storeid: 1}, {peerid: 2, storeid: 2} + // The peer on store 1 is the leader + epoch := &metapb.RegionEpoch{ConfVer: 0, Version: 0} + region := cluster.MockRegionInfo(1, 1, []uint64{2}, epoch) + // Put region into cluster, otherwise, AddOperator will fail because of + // missing region + cluster.PutRegion(region) + + // The next allocated peer should have peerid 3, so we add this peer + // to store 3 + steps := []operator.OpStep{ + operator.AddPeer{3, 3}, + } + + // Create an operator + op := operator.NewOperator("test", "test", 1, epoch, + operator.OpRegion, steps...) + c.Assert(controller.AddOperator(op), Equals, true) + c.Assert(len(stream.MsgCh()), Equals, 1) + + // Create region2 witch is cloned from the original region. + // region2 has peer 3 in pending state, so the AddPeer step + // is left unfinished + region2 := region.Clone( + core.WithAddPeer(&metapb.Peer{Id: 3, StoreId: 3}), + core.WithPendingPeers([]*metapb.Peer{ + {Id: 3, StoreId: 3, IsLearner: false}, + }), + core.WithIncConfVer(), + ) + c.Assert(region2.GetPendingPeers(), NotNil) + c.Assert(steps[0].IsFinish(region2), Equals, false) + controller.Dispatch(region2, DispatchFromHeartBeat) + + // In this case, the conf version has been changed, but the + // peer added is in peeding state, the operator should not be + // removed by the stale checker + c.Assert(op.ConfVerChanged(region2), Equals, 1) + c.Assert(controller.GetOperator(1), NotNil) + // The operator is valid yet, but the step should not be sent + // again, because it is in pending state, so the message channel + // should not be increased + c.Assert(len(stream.MsgCh()), Equals, 1) + + // Finish the step by clearing the pending state + region3 := region.Clone( + core.WithAddPeer(&metapb.Peer{Id: 3, StoreId: 3}), + core.WithIncConfVer(), + ) + c.Assert(steps[0].IsFinish(region3), Equals, true) + controller.Dispatch(region3, DispatchFromHeartBeat) + c.Assert(op.ConfVerChanged(region3), Equals, 1) + // The Operator has finished, so no message should be sent + c.Assert(len(stream.MsgCh()), Equals, 1) +}