Skip to content

Commit

Permalink
operator: fast-fail if the leader changed to the RemovePeer (#2530) (#…
Browse files Browse the repository at this point in the history
…2551)

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch authored Jun 18, 2020
1 parent b81f851 commit c459c25
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 18 deletions.
89 changes: 89 additions & 0 deletions server/schedule/operator/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/schedule/storelimit"
"github.com/pkg/errors"
"go.uber.org/zap"
)

Expand All @@ -30,6 +31,7 @@ type OpStep interface {
fmt.Stringer
ConfVerChanged(region *core.RegionInfo) bool
IsFinish(region *core.RegionInfo) bool
CheckSafety(region *core.RegionInfo) error
Influence(opInfluence OpInfluence, region *core.RegionInfo)
}

Expand All @@ -52,6 +54,18 @@ func (tl TransferLeader) IsFinish(region *core.RegionInfo) bool {
return region.GetLeader().GetStoreId() == tl.ToStore
}

// CheckSafety checks if the step meets the safety properties.
//
// A leader transfer is only safe when the region still has a peer on
// tl.ToStore and that peer is not a learner. It returns a descriptive error
// when either condition is violated and nil otherwise.
func (tl TransferLeader) CheckSafety(region *core.RegionInfo) error {
	target := region.GetStorePeer(tl.ToStore)
	switch {
	case target == nil:
		// Wording matches the message PromoteLearner.CheckSafety uses for the
		// same "no peer on the target store" condition.
		return errors.New("peer does not exist")
	case target.IsLearner:
		return errors.New("peer already is a learner")
	}
	return nil
}

// Influence calculates the store difference that current step makes.
func (tl TransferLeader) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
from := opInfluence.GetStoreInfluence(tl.FromStore)
Expand Down Expand Up @@ -101,6 +115,15 @@ func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to.AdjustStepCost(storelimit.RegionAdd, regionSize)
}

// CheckSafety checks if the step meets the safety properties.
//
// The step is safe when ap.ToStore holds no peer of the region yet, or when
// the peer it holds already carries ap.PeerID. A peer with any other ID on
// that store is reported as an error.
func (ap AddPeer) CheckSafety(region *core.RegionInfo) error {
	existing := region.GetStorePeer(ap.ToStore)
	if existing == nil || existing.GetId() == ap.PeerID {
		return nil
	}
	return errors.Errorf("peer %d has already existed in store %d, the operator is trying to add peer %d on the same store", existing.GetId(), ap.ToStore, ap.PeerID)
}

// AddLearner is an OpStep that adds a region learner peer.
type AddLearner struct {
ToStore, PeerID uint64
Expand Down Expand Up @@ -130,6 +153,21 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool {
return false
}

// CheckSafety checks if the step meets the safety properties.
//
// The step is safe when al.ToStore holds no peer of the region, or holds a
// learner whose ID equals al.PeerID. It returns an error when the store holds
// a peer with a different ID, or a peer with the expected ID that is not a
// learner.
func (al AddLearner) CheckSafety(region *core.RegionInfo) error {
	existing := region.GetStorePeer(al.ToStore)
	switch {
	case existing == nil:
		return nil
	case existing.GetId() != al.PeerID:
		return errors.Errorf("peer %d has already existed in store %d, the operator is trying to add peer %d on the same store", existing.GetId(), al.ToStore, al.PeerID)
	case !existing.IsLearner:
		return errors.New("peer already is a voter")
	default:
		return nil
	}
}

// Influence calculates the store difference that current step makes.
func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to := opInfluence.GetStoreInfluence(al.ToStore)
Expand Down Expand Up @@ -168,6 +206,15 @@ func (pl PromoteLearner) IsFinish(region *core.RegionInfo) bool {
return false
}

// CheckSafety checks if the step meets the safety properties.
//
// Promotion requires the region to still have a peer on pl.ToStore; an error
// is returned when no such peer is found, nil otherwise.
func (pl PromoteLearner) CheckSafety(region *core.RegionInfo) error {
	if region.GetStorePeer(pl.ToStore) == nil {
		return errors.New("peer does not exist")
	}
	return nil
}

// Influence calculates the store difference that current step makes.
// For PromoteLearner the body is intentionally empty: neither opInfluence nor
// region is read or modified by this step.
func (pl PromoteLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) {}

Expand All @@ -190,6 +237,14 @@ func (rp RemovePeer) IsFinish(region *core.RegionInfo) bool {
return region.GetStorePeer(rp.FromStore) == nil
}

// CheckSafety checks if the step meets the safety properties.
//
// Removing a peer is rejected when rp.FromStore is the store that currently
// hosts the region leader; in every other case nil is returned.
func (rp RemovePeer) CheckSafety(region *core.RegionInfo) error {
	leaderStore := region.GetLeader().GetStoreId()
	if leaderStore != rp.FromStore {
		return nil
	}
	return errors.New("cannot remove leader peer")
}

// Influence calculates the store difference that current step makes.
func (rp RemovePeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
from := opInfluence.GetStoreInfluence(rp.FromStore)
Expand Down Expand Up @@ -230,6 +285,11 @@ func (mr MergeRegion) IsFinish(region *core.RegionInfo) bool {
return false
}

// CheckSafety checks if the step meets the safety properties.
// No condition is verified for a merge step: region is not inspected and the
// result is always nil.
func (mr MergeRegion) CheckSafety(region *core.RegionInfo) error {
return nil
}

// Influence calculates the store difference that current step makes.
func (mr MergeRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
if mr.IsPassive {
Expand Down Expand Up @@ -275,6 +335,11 @@ func (sr SplitRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo
}
}

// CheckSafety checks if the step meets the safety properties.
// No condition is verified for a split step: region is not inspected and the
// result is always nil.
func (sr SplitRegion) CheckSafety(region *core.RegionInfo) error {
return nil
}

// AddLightPeer is an OpStep that adds a region peer without considering the influence.
type AddLightPeer struct {
ToStore, PeerID uint64
Expand Down Expand Up @@ -304,6 +369,15 @@ func (ap AddLightPeer) IsFinish(region *core.RegionInfo) bool {
return false
}

// CheckSafety checks if the step meets the safety properties.
//
// The step is unsafe only when ap.ToStore already holds a peer of the region
// whose ID is not ap.PeerID; an empty store or a matching peer yields nil.
func (ap AddLightPeer) CheckSafety(region *core.RegionInfo) error {
	occupant := region.GetStorePeer(ap.ToStore)
	if occupant == nil {
		return nil
	}
	if occupant.GetId() == ap.PeerID {
		return nil
	}
	return errors.Errorf("peer %d has already existed in store %d, the operator is trying to add peer %d on the same store", occupant.GetId(), ap.ToStore, ap.PeerID)
}

// Influence calculates the store difference that current step makes.
func (ap AddLightPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to := opInfluence.GetStoreInfluence(ap.ToStore)
Expand Down Expand Up @@ -341,6 +415,21 @@ func (al AddLightLearner) IsFinish(region *core.RegionInfo) bool {
return false
}

// CheckSafety checks if the step meets the safety properties.
//
// It returns nil when al.ToStore holds no peer of the region, or holds a
// learner with ID al.PeerID. A peer with a different ID, or a non-learner peer
// with the expected ID, is reported as an error.
func (al AddLightLearner) CheckSafety(region *core.RegionInfo) error {
	occupant := region.GetStorePeer(al.ToStore)
	if occupant == nil {
		return nil
	}
	if id := occupant.GetId(); id != al.PeerID {
		return errors.Errorf("peer %d has already existed in store %d, the operator is trying to add peer %d on the same store", id, al.ToStore, al.PeerID)
	}
	if occupant.IsLearner {
		return nil
	}
	return errors.New("peer already is a voter")
}

// Influence calculates the store difference that current step makes.
func (al AddLightLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to := opInfluence.GetStoreInfluence(al.ToStore)
Expand Down
51 changes: 33 additions & 18 deletions server/schedule/operator_controller.go
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
switch op.Status() {
case operator.STARTED:
operatorCounter.WithLabelValues(op.Desc(), "check").Inc()
if source == DispatchFromHeartBeat && oc.checkStaleOperator(op, region) {
if source == DispatchFromHeartBeat && oc.checkStaleOperator(op, step, region) {
return
}
oc.SendScheduleCommand(region, step, source)
Expand Down Expand Up @@ -147,7 +147,15 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
}
}

func (oc *OperatorController) checkStaleOperator(op *operator.Operator, region *core.RegionInfo) bool {
func (oc *OperatorController) checkStaleOperator(op *operator.Operator, step operator.OpStep, region *core.RegionInfo) bool {
err := step.CheckSafety(region)
if err != nil {
if oc.RemoveOperator(op, zap.String("reason", err.Error())) {
operatorCounter.WithLabelValues(op.Desc(), "stale").Inc()
oc.PromoteWaitingOperator()
return true
}
}
// When the "source" is heartbeat, the region may have a newer
// confver than the region that the operator holds. In this case,
// the operator is stale, and will not be executed even we would
Expand All @@ -156,22 +164,18 @@ func (oc *OperatorController) checkStaleOperator(op *operator.Operator, region *
latest := region.GetRegionEpoch()
changes := latest.GetConfVer() - origin.GetConfVer()
if changes > uint64(op.ConfVerChanged(region)) {

if oc.removeOperatorWithoutBury(op) {
if op.Cancel() {
log.Info("stale operator",
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op),
zap.Reflect("latest-epoch", region.GetRegionEpoch()),
zap.Uint64("diff", changes),
)
operatorCounter.WithLabelValues(op.Desc(), "stale").Inc()
}
if oc.RemoveOperator(
op,
zap.String("reason", "stale operator, confver does not meet expectations"),
zap.Reflect("latest-epoch", region.GetRegionEpoch()),
zap.Uint64("diff", changes),
) {
operatorCounter.WithLabelValues(op.Desc(), "stale").Inc()
oc.PromoteWaitingOperator()
return true
}
return true
}

return false
}

Expand Down Expand Up @@ -477,7 +481,7 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool {
}

// RemoveOperator removes a operator from the running operators.
func (oc *OperatorController) RemoveOperator(op *operator.Operator) bool {
func (oc *OperatorController) RemoveOperator(op *operator.Operator, extraFileds ...zap.Field) bool {
oc.Lock()
removed := oc.removeOperatorLocked(op)
oc.Unlock()
Expand All @@ -488,7 +492,7 @@ func (oc *OperatorController) RemoveOperator(op *operator.Operator) bool {
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op))
}
oc.buryOperator(op)
oc.buryOperator(op, extraFileds...)
}
return removed
}
Expand All @@ -510,7 +514,7 @@ func (oc *OperatorController) removeOperatorLocked(op *operator.Operator) bool {
return false
}

func (oc *OperatorController) buryOperator(op *operator.Operator) {
func (oc *OperatorController) buryOperator(op *operator.Operator, extraFileds ...zap.Field) {
st := op.Status()

if !operator.IsEndStatus(st) {
Expand Down Expand Up @@ -551,6 +555,17 @@ func (oc *OperatorController) buryOperator(op *operator.Operator) {
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op))
operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc()
case operator.CANCELED:
fileds := []zap.Field{
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op),
}
fileds = append(fileds, extraFileds...)
log.Info("operator canceled",
fileds...,
)
operatorCounter.WithLabelValues(op.Desc(), "cancel").Inc()
}

oc.opRecords.Put(op)
Expand Down
32 changes: 32 additions & 0 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,38 @@ func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) {
c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_SUCCESS)
}

// TestFastFailOperator verifies that an operator is canceled and dropped from
// the controller when a heartbeat dispatch finds its current step unsafe.
func (t *testOperatorControllerSuite) TestFastFailOperator(c *C) {
	cluster := mockcluster.NewCluster(mockoption.NewScheduleOptions())
	controller := NewOperatorController(t.ctx, cluster, mockhbstream.NewHeartbeatStream())
	cluster.AddLeaderStore(1, 2)
	cluster.AddLeaderStore(2, 0)
	cluster.AddLeaderStore(3, 0)
	// Presumably region 1 with its leader on store 1 and a follower on store 2.
	cluster.AddLeaderRegion(1, 1, 2)

	movePeer := operator.NewOperator(
		"test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion,
		operator.RemovePeer{FromStore: 2},
		operator.AddPeer{ToStore: 3, PeerID: 4},
	)
	region := cluster.GetRegion(1)
	c.Assert(movePeer.Start(), IsTrue)
	controller.SetOperator(movePeer)
	controller.Dispatch(region, "test")
	c.Assert(controller.GetOperatorStatus(1).Status, Equals, pdpb.OperatorStatus_RUNNING)

	// Change the leader to peer 2, so the pending RemovePeer step is expected
	// to be rejected on the next heartbeat.
	withNewLeader := region.Clone(core.WithLeader(region.GetPeer(2)))
	controller.Dispatch(withNewLeader, DispatchFromHeartBeat)
	c.Assert(movePeer.Status(), Equals, operator.CANCELED)
	c.Assert(controller.GetOperator(withNewLeader.GetID()), IsNil)

	// Transfer leader to an illegal store: no store 5 was added above.
	badTransfer := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 5})
	controller.SetOperator(badTransfer)
	controller.Dispatch(withNewLeader, DispatchFromHeartBeat)
	c.Assert(badTransfer.Status(), Equals, operator.CANCELED)
	c.Assert(controller.GetOperator(withNewLeader.GetID()), IsNil)
}

func (t *testOperatorControllerSuite) TestCheckAddUnexpectedStatus(c *C) {
c.Assert(failpoint.Disable("github.com/pingcap/pd/v4/server/schedule/unexpectedOperator"), IsNil)
opt := mockoption.NewScheduleOptions()
Expand Down

0 comments on commit c459c25

Please sign in to comment.