diff --git a/server/schedule/operator/step.go b/server/schedule/operator/step.go index 80af39b019d..7f2ff956bea 100644 --- a/server/schedule/operator/step.go +++ b/server/schedule/operator/step.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/pd/v4/server/core" "github.com/pingcap/pd/v4/server/schedule/storelimit" + "github.com/pkg/errors" "go.uber.org/zap" ) @@ -30,6 +31,7 @@ type OpStep interface { fmt.Stringer ConfVerChanged(region *core.RegionInfo) bool IsFinish(region *core.RegionInfo) bool + CheckSafety(region *core.RegionInfo) error Influence(opInfluence OpInfluence, region *core.RegionInfo) } @@ -52,6 +54,18 @@ func (tl TransferLeader) IsFinish(region *core.RegionInfo) bool { return region.GetLeader().GetStoreId() == tl.ToStore } +// CheckSafety checks if the step meets the safety properties. +func (tl TransferLeader) CheckSafety(region *core.RegionInfo) error { + peer := region.GetStorePeer(tl.ToStore) + if peer == nil { + return errors.New("peer does not existed") + } + if peer.IsLearner { + return errors.New("peer already is a learner") + } + return nil +} + // Influence calculates the store difference that current step makes. func (tl TransferLeader) Influence(opInfluence OpInfluence, region *core.RegionInfo) { from := opInfluence.GetStoreInfluence(tl.FromStore) @@ -101,6 +115,15 @@ func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) { to.AdjustStepCost(storelimit.RegionAdd, regionSize) } +// CheckSafety checks if the step meets the safety properties. +func (ap AddPeer) CheckSafety(region *core.RegionInfo) error { + peer := region.GetStorePeer(ap.ToStore) + if peer != nil && peer.GetId() != ap.PeerID { + return errors.Errorf("peer %d has already existed in store %d, the operator is trying to add peer %d on the same store", peer.GetId(), ap.ToStore, ap.PeerID) + } + return nil +} + // AddLearner is an OpStep that adds a region learner peer. type AddLearner struct { ToStore, PeerID uint64 @@ -130,6 +153,21 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool { return false } +// CheckSafety checks if the step meets the safety properties. +func (al AddLearner) CheckSafety(region *core.RegionInfo) error { + peer := region.GetStorePeer(al.ToStore) + if peer == nil { + return nil + } + if peer.GetId() != al.PeerID { + return errors.Errorf("peer %d has already existed in store %d, the operator is trying to add peer %d on the same store", peer.GetId(), al.ToStore, al.PeerID) + } + if !peer.IsLearner { + return errors.New("peer already is a voter") + } + return nil +} + // Influence calculates the store difference that current step makes. func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) { to := opInfluence.GetStoreInfluence(al.ToStore) @@ -168,6 +206,15 @@ func (pl PromoteLearner) IsFinish(region *core.RegionInfo) bool { return false } +// CheckSafety checks if the step meets the safety properties. +func (pl PromoteLearner) CheckSafety(region *core.RegionInfo) error { + peer := region.GetStorePeer(pl.ToStore) + if peer == nil { + return errors.New("peer does not exist") + } + return nil +} + // Influence calculates the store difference that current step makes. func (pl PromoteLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) {} @@ -190,6 +237,14 @@ func (rp RemovePeer) IsFinish(region *core.RegionInfo) bool { return region.GetStorePeer(rp.FromStore) == nil } +// CheckSafety checks if the step meets the safety properties. +func (rp RemovePeer) CheckSafety(region *core.RegionInfo) error { + if rp.FromStore == region.GetLeader().GetStoreId() { + return errors.New("cannot remove leader peer") + } + return nil +} + // Influence calculates the store difference that current step makes. func (rp RemovePeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) { from := opInfluence.GetStoreInfluence(rp.FromStore) @@ -230,6 +285,11 @@ func (mr MergeRegion) IsFinish(region *core.RegionInfo) bool { return false } +// CheckSafety checks if the step meets the safety properties. +func (mr MergeRegion) CheckSafety(region *core.RegionInfo) error { + return nil +} + // Influence calculates the store difference that current step makes. func (mr MergeRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo) { if mr.IsPassive { @@ -275,6 +335,11 @@ func (sr SplitRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo } } +// CheckSafety checks if the step meets the safety properties. +func (sr SplitRegion) CheckSafety(region *core.RegionInfo) error { + return nil +} + // AddLightPeer is an OpStep that adds a region peer without considering the influence. type AddLightPeer struct { ToStore, PeerID uint64 @@ -304,6 +369,15 @@ func (ap AddLightPeer) IsFinish(region *core.RegionInfo) bool { return false } +// CheckSafety checks if the step meets the safety properties. +func (ap AddLightPeer) CheckSafety(region *core.RegionInfo) error { + peer := region.GetStorePeer(ap.ToStore) + if peer != nil && peer.GetId() != ap.PeerID { + return errors.Errorf("peer %d has already existed in store %d, the operator is trying to add peer %d on the same store", peer.GetId(), ap.ToStore, ap.PeerID) + } + return nil +} + // Influence calculates the store difference that current step makes. func (ap AddLightPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) { to := opInfluence.GetStoreInfluence(ap.ToStore) @@ -341,6 +415,21 @@ func (al AddLightLearner) IsFinish(region *core.RegionInfo) bool { return false } +// CheckSafety checks if the step meets the safety properties. +func (al AddLightLearner) CheckSafety(region *core.RegionInfo) error { + peer := region.GetStorePeer(al.ToStore) + if peer == nil { + return nil + } + if peer.GetId() != al.PeerID { + return errors.Errorf("peer %d has already existed in store %d, the operator is trying to add peer %d on the same store", peer.GetId(), al.ToStore, al.PeerID) + } + if !peer.IsLearner { + return errors.New("peer already is a voter") + } + return nil +} + // Influence calculates the store difference that current step makes. func (al AddLightLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) { to := opInfluence.GetStoreInfluence(al.ToStore) diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go old mode 100755 new mode 100644 index cf4d581fb67..1932edec847 --- a/server/schedule/operator_controller.go +++ b/server/schedule/operator_controller.go @@ -114,7 +114,7 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) { switch op.Status() { case operator.STARTED: operatorCounter.WithLabelValues(op.Desc(), "check").Inc() - if source == DispatchFromHeartBeat && oc.checkStaleOperator(op, region) { + if source == DispatchFromHeartBeat && oc.checkStaleOperator(op, step, region) { return } oc.SendScheduleCommand(region, step, source) @@ -147,7 +147,15 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) { } } -func (oc *OperatorController) checkStaleOperator(op *operator.Operator, region *core.RegionInfo) bool { +func (oc *OperatorController) checkStaleOperator(op *operator.Operator, step operator.OpStep, region *core.RegionInfo) bool { + err := step.CheckSafety(region) + if err != nil { + if oc.RemoveOperator(op, zap.String("reason", err.Error())) { + operatorCounter.WithLabelValues(op.Desc(), "stale").Inc() + oc.PromoteWaitingOperator() + return true + } + } // When the "source" is heartbeat, the region may have a newer // confver than the region that the operator holds. In this case, // the operator is stale, and will not be executed even we would @@ -156,22 +164,18 @@ func (oc *OperatorController) checkStaleOperator(op *operator.Operator, region * latest := region.GetRegionEpoch() changes := latest.GetConfVer() - origin.GetConfVer() if changes > uint64(op.ConfVerChanged(region)) { - - if oc.removeOperatorWithoutBury(op) { - if op.Cancel() { - log.Info("stale operator", - zap.Uint64("region-id", op.RegionID()), - zap.Duration("takes", op.RunningTime()), - zap.Reflect("operator", op), - zap.Reflect("latest-epoch", region.GetRegionEpoch()), - zap.Uint64("diff", changes), - ) - operatorCounter.WithLabelValues(op.Desc(), "stale").Inc() - } + if oc.RemoveOperator( + op, + zap.String("reason", "stale operator, confver does not meet expectations"), + zap.Reflect("latest-epoch", region.GetRegionEpoch()), + zap.Uint64("diff", changes), + ) { + operatorCounter.WithLabelValues(op.Desc(), "stale").Inc() oc.PromoteWaitingOperator() + return true } - return true } + return false } @@ -477,7 +481,7 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool { } // RemoveOperator removes a operator from the running operators. -func (oc *OperatorController) RemoveOperator(op *operator.Operator) bool { +func (oc *OperatorController) RemoveOperator(op *operator.Operator, extraFileds ...zap.Field) bool { oc.Lock() removed := oc.removeOperatorLocked(op) oc.Unlock() @@ -488,7 +492,7 @@ func (oc *OperatorController) RemoveOperator(op *operator.Operator) bool { zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op)) } - oc.buryOperator(op) + oc.buryOperator(op, extraFileds...) } return removed } @@ -510,7 +514,7 @@ func (oc *OperatorController) removeOperatorLocked(op *operator.Operator) bool { return false } -func (oc *OperatorController) buryOperator(op *operator.Operator) { +func (oc *OperatorController) buryOperator(op *operator.Operator, extraFileds ...zap.Field) { st := op.Status() if !operator.IsEndStatus(st) { @@ -551,6 +555,17 @@ func (oc *OperatorController) buryOperator(op *operator.Operator) { zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op)) operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc() + case operator.CANCELED: + fileds := []zap.Field{ + zap.Uint64("region-id", op.RegionID()), + zap.Duration("takes", op.RunningTime()), + zap.Reflect("operator", op), + } + fileds = append(fileds, extraFileds...) + log.Info("operator canceled", + fileds..., + ) + operatorCounter.WithLabelValues(op.Desc(), "cancel").Inc() } oc.opRecords.Put(op) diff --git a/server/schedule/operator_controller_test.go b/server/schedule/operator_controller_test.go index 3e2d3e633c3..c8e7649765a 100644 --- a/server/schedule/operator_controller_test.go +++ b/server/schedule/operator_controller_test.go @@ -130,6 +130,38 @@ func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) { c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_SUCCESS) } +func (t *testOperatorControllerSuite) TestFastFailOperator(c *C) { + opt := mockoption.NewScheduleOptions() + tc := mockcluster.NewCluster(opt) + oc := NewOperatorController(t.ctx, tc, mockhbstream.NewHeartbeatStream()) + tc.AddLeaderStore(1, 2) + tc.AddLeaderStore(2, 0) + tc.AddLeaderStore(3, 0) + tc.AddLeaderRegion(1, 1, 2) + steps := []operator.OpStep{ + operator.RemovePeer{FromStore: 2}, + operator.AddPeer{ToStore: 3, PeerID: 4}, + } + op := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion, steps...) + region := tc.GetRegion(1) + c.Assert(op.Start(), IsTrue) + oc.SetOperator(op) + oc.Dispatch(region, "test") + c.Assert(oc.GetOperatorStatus(1).Status, Equals, pdpb.OperatorStatus_RUNNING) + // change the leader + region = region.Clone(core.WithLeader(region.GetPeer(2))) + oc.Dispatch(region, DispatchFromHeartBeat) + c.Assert(op.Status(), Equals, operator.CANCELED) + c.Assert(oc.GetOperator(region.GetID()), IsNil) + + // transfer leader to an illegal store. + op = operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 5}) + oc.SetOperator(op) + oc.Dispatch(region, DispatchFromHeartBeat) + c.Assert(op.Status(), Equals, operator.CANCELED) + c.Assert(oc.GetOperator(region.GetID()), IsNil) +} + func (t *testOperatorControllerSuite) TestCheckAddUnexpectedStatus(c *C) { c.Assert(failpoint.Disable("github.com/pingcap/pd/v4/server/schedule/unexpectedOperator"), IsNil) opt := mockoption.NewScheduleOptions()