From 01e5d8e2b5a8713a14f8fd5394496f73c481c884 Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Thu, 29 Aug 2019 23:46:03 +0800 Subject: [PATCH] scheduler: do not remove the operator when the step does not finish Signed-off-by: Shafreeck Sea There is a bug introduced by #1652, in some case, like adding peers or adding learners, the step is left unfinished if the peer is in pending state, although the conf version has changed, in these cases, the operator will be removed because the controller thought someone has changed the conf version(in fact, it self did). We fix that by checking if the conf version has actually changed by current step, if it is, the operator is not regarded as stale. Signed-off-by: Shafreeck Sea --- server/schedule/operator/operator.go | 86 ++++++++++++--------- server/schedule/operator_controller.go | 2 +- server/schedule/operator_controller_test.go | 74 ++++++++++++++++-- 3 files changed, 118 insertions(+), 44 deletions(-) diff --git a/server/schedule/operator/operator.go b/server/schedule/operator/operator.go index ad25c445554..36943aba466 100644 --- a/server/schedule/operator/operator.go +++ b/server/schedule/operator/operator.go @@ -127,7 +127,7 @@ func (s u64Set) String() string { // OpStep describes the basic scheduling steps that can not be subdivided. type OpStep interface { fmt.Stringer - ExpectConfVerChange() bool + ConfVerChanged(region *core.RegionInfo) bool IsFinish(region *core.RegionInfo) bool Influence(opInfluence OpInfluence, region *core.RegionInfo) } @@ -137,10 +137,9 @@ type TransferLeader struct { FromStore, ToStore uint64 } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (tl TransferLeader) ExpectConfVerChange() bool { - return false +// ConfVerChanged returns true if the conf version has been changed by this step +func (tl TransferLeader) ConfVerChanged(region *core.RegionInfo) bool { + return false // transfer leader never change the conf version } func (tl TransferLeader) String() string { @@ -168,10 +167,12 @@ type AddPeer struct { ToStore, PeerID uint64 } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (ap AddPeer) ExpectConfVerChange() bool { - return true +// ConfVerChanged returns true if the conf version has been changed by this step +func (ap AddPeer) ConfVerChanged(region *core.RegionInfo) bool { + if p := region.GetStoreVoter(ap.ToStore); p != nil { + return p.GetId() == ap.PeerID + } + return false } func (ap AddPeer) String() string { return fmt.Sprintf("add peer %v on store %v", ap.PeerID, ap.ToStore) @@ -208,10 +209,12 @@ type AddLearner struct { ToStore, PeerID uint64 } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (al AddLearner) ExpectConfVerChange() bool { - return true +// ConfVerChanged returns true if the conf version has been changed by this step +func (al AddLearner) ConfVerChanged(region *core.RegionInfo) bool { + if p := region.GetStoreLearner(al.ToStore); p != nil { + return p.GetId() == al.PeerID + } + return false } func (al AddLearner) String() string { @@ -249,10 +252,12 @@ type PromoteLearner struct { ToStore, PeerID uint64 } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (pl PromoteLearner) ExpectConfVerChange() bool { - return true +// ConfVerChanged returns true if the conf version has been changed by this step +func (pl PromoteLearner) ConfVerChanged(region *core.RegionInfo) bool { + if p := region.GetStoreVoter(pl.ToStore); p != nil { + return p.GetId() == pl.PeerID + } + return false } func (pl PromoteLearner) String() string { @@ -278,10 +283,9 @@ type RemovePeer struct { FromStore uint64 } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (rp RemovePeer) ExpectConfVerChange() bool { - return true +// ConfVerChanged returns true if the conf version has been changed by this step +func (rp RemovePeer) ConfVerChanged(region *core.RegionInfo) bool { + return region.GetStorePeer(rp.FromStore) == nil } func (rp RemovePeer) String() string { @@ -314,9 +318,8 @@ type MergeRegion struct { IsPassive bool } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (mr MergeRegion) ExpectConfVerChange() bool { +// ConfVerChanged returns true if the conf version has been changed by this step +func (mr MergeRegion) ConfVerChanged(region *core.RegionInfo) bool { return false } @@ -351,9 +354,8 @@ type SplitRegion struct { Policy pdpb.CheckPolicy } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (sr SplitRegion) ExpectConfVerChange() bool { +// ConfVerChanged returns true if the conf version has been changed by this step +func (sr SplitRegion) ConfVerChanged(region *core.RegionInfo) bool { return false } @@ -382,10 +384,12 @@ type AddLightPeer struct { ToStore, PeerID uint64 } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (ap AddLightPeer) ExpectConfVerChange() bool { - return true +// ConfVerChanged returns true if the conf version has been changed by this step +func (ap AddLightPeer) ConfVerChanged(region *core.RegionInfo) bool { + if p := region.GetStoreVoter(ap.ToStore); p != nil { + return p.GetId() == ap.PeerID + } + return false } func (ap AddLightPeer) String() string { @@ -417,10 +421,12 @@ type AddLightLearner struct { ToStore, PeerID uint64 } -// ExpectConfVerChange returns if the confver of a region should be increased -// after this step -func (al AddLightLearner) ExpectConfVerChange() bool { - return true +// ConfVerChanged returns true if the conf version has been changed by this step +func (al AddLightLearner) ConfVerChanged(region *core.RegionInfo) bool { + if p := region.GetStoreLearner(al.ToStore); p != nil { + return p.GetId() == al.PeerID + } + return false } func (al AddLightLearner) String() string { @@ -582,11 +588,15 @@ func (o *Operator) Check(region *core.RegionInfo) OpStep { } // ConfVerChanged returns the number of confver has consumed by steps -func (o *Operator) ConfVerChanged() int { +func (o *Operator) ConfVerChanged(region *core.RegionInfo) int { total := 0 current := atomic.LoadInt32(&o.currentStep) - for _, step := range o.steps[0:current] { - if step.ExpectConfVerChange() { + if current == int32(len(o.steps)) { + current-- + } + // including current step, it may has taken effects in this heartbeat + for _, step := range o.steps[0 : current+1] { + if step.ConfVerChanged(region) { total++ } } diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go index 8bae5405ac7..72a6f230c33 100644 --- a/server/schedule/operator_controller.go +++ b/server/schedule/operator_controller.go @@ -102,7 +102,7 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) { latest := region.GetRegionEpoch() changes := latest.GetConfVer() - origin.GetConfVer() if source == DispatchFromHeartBeat && - changes > uint64(op.ConfVerChanged()) { + changes > uint64(op.ConfVerChanged(region)) { log.Info("stale operator", zap.Uint64("region-id", region.GetID()), zap.Reflect("operator", op), zap.Uint64("diff", changes)) operatorCounter.WithLabelValues(op.Desc(), "stale").Inc() diff --git a/server/schedule/operator_controller_test.go b/server/schedule/operator_controller_test.go index 3b57dbe9e58..944ef1312f3 100644 --- a/server/schedule/operator_controller_test.go +++ b/server/schedule/operator_controller_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/pd/pkg/mock/mockcluster" "github.com/pingcap/pd/pkg/mock/mockhbstream" "github.com/pingcap/pd/pkg/mock/mockoption" + "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/schedule/operator" ) @@ -206,7 +207,7 @@ func (t *testOperatorControllerSuite) TestDispatchOutdatedRegion(c *C) { &metapb.RegionEpoch{ConfVer: 0, Version: 0}) controller.Dispatch(region, DispatchFromHeartBeat) - c.Assert(op.ConfVerChanged(), Equals, 0) + c.Assert(op.ConfVerChanged(region), Equals, 0) c.Assert(len(stream.MsgCh()), Equals, 2) // report the result of removing peer @@ -214,7 +215,7 @@ func (t *testOperatorControllerSuite) TestDispatchOutdatedRegion(c *C) { &metapb.RegionEpoch{ConfVer: 0, Version: 0}) controller.Dispatch(region, DispatchFromHeartBeat) - c.Assert(op.ConfVerChanged(), Equals, 1) + c.Assert(op.ConfVerChanged(region), Equals, 1) c.Assert(len(stream.MsgCh()), Equals, 2) // add and disaptch op again, the op should be stale @@ -222,14 +223,77 @@ func (t *testOperatorControllerSuite) TestDispatchOutdatedRegion(c *C) { &metapb.RegionEpoch{ConfVer: 0, Version: 0}, operator.OpRegion, steps...) c.Assert(controller.AddOperator(op), Equals, true) - c.Assert(op.ConfVerChanged(), Equals, 0) + c.Assert(op.ConfVerChanged(region), Equals, 0) c.Assert(len(stream.MsgCh()), Equals, 3) // report region with an abnormal confver region = cluster.MockRegionInfo(1, 1, []uint64{1, 2}, &metapb.RegionEpoch{ConfVer: 1, Version: 0}) controller.Dispatch(region, DispatchFromHeartBeat) - c.Assert(op.ConfVerChanged(), Equals, 0) - // no new step sended + c.Assert(op.ConfVerChanged(region), Equals, 0) + // no new step c.Assert(len(stream.MsgCh()), Equals, 3) } + +func (t *testOperatorControllerSuite) TestDispatchUnfinishedStep(c *C) { + cluster := mockcluster.NewCluster(mockoption.NewScheduleOptions()) + stream := mockhbstream.NewHeartbeatStreams(cluster.ID) + controller := NewOperatorController(cluster, stream) + + // Create a new region with epoch(0, 0) + // the region has two peers with its peer id allocated incrementally. + // so the two peers are {peerid: 1, storeid: 1}, {peerid: 2, storeid: 2} + // The peer on store 1 is the leader + epoch := &metapb.RegionEpoch{ConfVer: 0, Version: 0} + region := cluster.MockRegionInfo(1, 1, []uint64{2}, epoch) + // Put region into cluster, otherwise, AddOperator will fail because of + // missing region + cluster.PutRegion(region) + + // The next allocated peer should have peerid 3, so we add this peer + // to store 3 + steps := []operator.OpStep{ + operator.AddPeer{3, 3}, + } + + // Create an operator + op := operator.NewOperator("test", "test", 1, epoch, + operator.OpRegion, steps...) + c.Assert(controller.AddOperator(op), Equals, true) + c.Assert(len(stream.MsgCh()), Equals, 1) + + // Create region2 witch is cloned from the original region. + // region2 has peer 3 in pending state, so the AddPeer step + // is left unfinished + region2 := region.Clone( + core.WithAddPeer(&metapb.Peer{Id: 3, StoreId: 3}), + core.WithPendingPeers([]*metapb.Peer{ + {Id: 3, StoreId: 3, IsLearner: false}, + }), + core.WithIncConfVer(), + ) + c.Assert(region2.GetPendingPeers(), NotNil) + c.Assert(steps[0].IsFinish(region2), Equals, false) + controller.Dispatch(region2, DispatchFromHeartBeat) + + // In this case, the conf version has been changed, but the + // peer added is in peeding state, the operator should not be + // removed by the stale checker + c.Assert(op.ConfVerChanged(region2), Equals, 1) + c.Assert(controller.GetOperator(1), NotNil) + // The operator is valid yet, but the step should not be sent + // again, because it is in pending state, so the message channel + // should not be increased + c.Assert(len(stream.MsgCh()), Equals, 1) + + // Finish the step by clearing the pending state + region3 := region.Clone( + core.WithAddPeer(&metapb.Peer{Id: 3, StoreId: 3}), + core.WithIncConfVer(), + ) + c.Assert(steps[0].IsFinish(region3), Equals, true) + controller.Dispatch(region3, DispatchFromHeartBeat) + c.Assert(op.ConfVerChanged(region3), Equals, 1) + // The Operator has finished, so no message should be sent + c.Assert(len(stream.MsgCh()), Equals, 1) +}