Commit

schedule: transfer leader in batch when do eviction (#2081)
Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>
shafreeck authored Feb 20, 2020
1 parent 0d5bce8 commit 0f32cc4
Showing 7 changed files with 161 additions and 55 deletions.
server/cluster/coordinator.go (2 additions, 1 deletion)
@@ -547,7 +547,8 @@ func (c *coordinator) runScheduler(s *scheduleController) {
continue
}
if op := s.Schedule(); op != nil {
c.opController.AddWaitingOperator(op...)
added := c.opController.AddWaitingOperator(op...)
log.Debug("add operator", zap.Int("added", added), zap.Int("total", len(op)), zap.String("scheduler", s.GetName()))
}

case <-s.Ctx().Done():
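
With AddWaitingOperator now returning a count instead of a bool, the coordinator can see how much of a scheduler's batch was actually queued. A small illustrative continuation of the hunk above, not part of this commit; it reuses only identifiers visible in the hunk, and the extra log line is hypothetical:

	added := c.opController.AddWaitingOperator(op...)
	if added < len(op) {
		// Hypothetical follow-up: part of the batch was rejected, for example
		// because the per-scheduler waiting-operator limit was hit partway through.
		log.Debug("operator batch partially accepted",
			zap.Int("added", added), zap.Int("proposed", len(op)), zap.String("scheduler", s.GetName()))
	}
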
server/cluster/coordinator_test.go (20 additions, 20 deletions)
@@ -298,11 +298,11 @@ func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run
}
}

func (s *testCoordinatorSuite) checkRegion(c *C, tc *testCluster, co *coordinator, regionID uint64, expectCheckerIsBusy, expectAddOperator bool) {
func (s *testCoordinatorSuite) checkRegion(c *C, tc *testCluster, co *coordinator, regionID uint64, expectCheckerIsBusy bool, expectAddOperator int) {
checkerIsBusy, ops := co.checkers.CheckRegion(tc.GetRegion(regionID))
c.Assert(checkerIsBusy, Equals, expectCheckerIsBusy)
if ops == nil {
c.Assert(expectAddOperator, IsFalse)
c.Assert(expectAddOperator, Equals, 0)
} else {
c.Assert(co.opController.AddWaitingOperator(ops...), Equals, expectAddOperator)
}
@@ -318,10 +318,10 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) {
c.Assert(tc.addRegionStore(2, 2), IsNil)
c.Assert(tc.addRegionStore(1, 1), IsNil)
c.Assert(tc.addLeaderRegion(1, 2, 3), IsNil)
s.checkRegion(c, tc, co, 1, false, true)
s.checkRegion(c, tc, co, 1, false, 1)
waitOperator(c, co, 1)
testutil.CheckAddPeer(c, co.opController.GetOperator(1), operator.OpReplica, 1)
s.checkRegion(c, tc, co, 1, false, false)
s.checkRegion(c, tc, co, 1, false, 0)

r := tc.GetRegion(1)
p := &metapb.Peer{Id: 1, StoreId: 1, IsLearner: true}
@@ -330,7 +330,7 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) {
core.WithPendingPeers(append(r.GetPendingPeers(), p)),
)
c.Assert(tc.putRegion(r), IsNil)
s.checkRegion(c, tc, co, 1, false, false)
s.checkRegion(c, tc, co, 1, false, 0)
co.stop()
co.wg.Wait()

@@ -343,15 +343,15 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) {
c.Assert(tc.addRegionStore(2, 2), IsNil)
c.Assert(tc.addRegionStore(1, 1), IsNil)
c.Assert(tc.putRegion(r), IsNil)
s.checkRegion(c, tc, co, 1, false, false)
s.checkRegion(c, tc, co, 1, false, 0)
r = r.Clone(core.WithPendingPeers(nil))
c.Assert(tc.putRegion(r), IsNil)
s.checkRegion(c, tc, co, 1, false, true)
s.checkRegion(c, tc, co, 1, false, 1)
waitOperator(c, co, 1)
op := co.opController.GetOperator(1)
c.Assert(op.Len(), Equals, 1)
c.Assert(op.Step(0).(operator.PromoteLearner).ToStore, Equals, uint64(1))
s.checkRegion(c, tc, co, 1, false, false)
s.checkRegion(c, tc, co, 1, false, 0)
}

func (s *testCoordinatorSuite) TestCheckerIsBusy(c *C) {
@@ -373,18 +373,18 @@ func (s *testCoordinatorSuite) TestCheckerIsBusy(c *C) {
switch operatorKind {
case operator.OpReplica:
op := newTestOperator(regionID, tc.GetRegion(regionID).GetRegionEpoch(), operatorKind)
c.Assert(co.opController.AddWaitingOperator(op), IsTrue)
c.Assert(co.opController.AddWaitingOperator(op), Equals, 1)
case operator.OpRegion | operator.OpMerge:
if regionID%2 == 1 {
ops, err := operator.CreateMergeRegionOperator("merge-region", co.cluster, tc.GetRegion(regionID), tc.GetRegion(regionID-1), operator.OpMerge)
c.Assert(err, IsNil)
c.Assert(co.opController.AddWaitingOperator(ops...), IsTrue)
c.Assert(co.opController.AddWaitingOperator(ops...), Equals, len(ops))
}
}

}
}
s.checkRegion(c, tc, co, num, true, false)
s.checkRegion(c, tc, co, num, true, 0)
}

func (s *testCoordinatorSuite) TestReplica(c *C) {
@@ -1034,11 +1034,11 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
{
c.Assert(sc.AllowSchedule(), IsTrue)
op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader)
c.Assert(oc.AddWaitingOperator(op1), IsTrue)
c.Assert(oc.AddWaitingOperator(op1), Equals, 1)
// count = 1
c.Assert(sc.AllowSchedule(), IsTrue)
op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader)
c.Assert(oc.AddWaitingOperator(op2), IsTrue)
c.Assert(oc.AddWaitingOperator(op2), Equals, 1)
// count = 2
c.Assert(sc.AllowSchedule(), IsFalse)
c.Assert(oc.RemoveOperator(op1), IsTrue)
@@ -1051,29 +1051,29 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
{
op3 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpHotRegion)
op3.SetPriorityLevel(core.HighPriority)
c.Assert(oc.AddWaitingOperator(op11), IsTrue)
c.Assert(oc.AddWaitingOperator(op11), Equals, 1)
c.Assert(sc.AllowSchedule(), IsFalse)
c.Assert(oc.AddWaitingOperator(op3), IsTrue)
c.Assert(oc.AddWaitingOperator(op3), Equals, 1)
c.Assert(sc.AllowSchedule(), IsTrue)
c.Assert(oc.RemoveOperator(op3), IsTrue)
}

// add a admin operator will remove old operator
{
op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader)
c.Assert(oc.AddWaitingOperator(op2), IsTrue)
c.Assert(oc.AddWaitingOperator(op2), Equals, 1)
c.Assert(sc.AllowSchedule(), IsFalse)
op4 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpAdmin)
op4.SetPriorityLevel(core.HighPriority)
c.Assert(oc.AddWaitingOperator(op4), IsTrue)
c.Assert(oc.AddWaitingOperator(op4), Equals, 1)
c.Assert(sc.AllowSchedule(), IsTrue)
c.Assert(oc.RemoveOperator(op4), IsTrue)
}

// test wrong region id.
{
op5 := newTestOperator(3, &metapb.RegionEpoch{}, operator.OpHotRegion)
c.Assert(oc.AddWaitingOperator(op5), IsFalse)
c.Assert(oc.AddWaitingOperator(op5), Equals, 0)
}

// test wrong region epoch.
@@ -1084,12 +1084,12 @@ }
}
{
op6 := newTestOperator(1, epoch, operator.OpLeader)
c.Assert(oc.AddWaitingOperator(op6), IsFalse)
c.Assert(oc.AddWaitingOperator(op6), Equals, 0)
}
epoch.Version--
{
op6 := newTestOperator(1, epoch, operator.OpLeader)
c.Assert(oc.AddWaitingOperator(op6), IsTrue)
c.Assert(oc.AddWaitingOperator(op6), Equals, 1)
c.Assert(oc.RemoveOperator(op6), IsTrue)
}
}
server/config/config.go (1 addition, 1 deletion)
@@ -652,7 +652,7 @@ const (
// defaultHotRegionCacheHitsThreshold is the low hit number threshold of the
// hot region.
defaultHotRegionCacheHitsThreshold = 3
defaultSchedulerMaxWaitingOperator = 3
defaultSchedulerMaxWaitingOperator = 5
defaultLeaderSchedulePolicy = "count"
defaultStoreLimitMode = "manual"
)
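
Each non-merge operator in a batch consumes one slot of this per-scheduler waiting limit (see the checkAddOperator change below), so the default caps how much of a batch can wait at once. A rough model only, not code from this commit, assuming nothing is already waiting for the scheduler:

	const maxWaitingPerScheduler = 5 // mirrors the new defaultSchedulerMaxWaitingOperator

	// acceptedPlain models how many plain (non-merge) operators of one scheduler
	// desc fit in a single batch under the limit above; e.g. a batch of 8 against
	// the default of 5 is cut off after 5, so AddWaitingOperator reports 5.
	// Merge pairs are different: each pair takes one slot but adds two operators.
	func acceptedPlain(batch, limit int) int {
		if batch > limit {
			return limit
		}
		return batch
	}
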
server/schedule/operator_controller.go (55 additions, 26 deletions; file mode 100644 → 100755)
@@ -242,40 +242,57 @@ func (oc *OperatorController) PushOperators() {
}

// AddWaitingOperator adds operators to waiting operators.
func (oc *OperatorController) AddWaitingOperator(ops ...*operator.Operator) bool {
func (oc *OperatorController) AddWaitingOperator(ops ...*operator.Operator) int {
oc.Lock()

if !oc.checkAddOperator(ops...) {
for _, op := range ops {
operatorWaitCounter.WithLabelValues(op.Desc(), "add_canceled").Inc()
_ = op.Cancel()
oc.buryOperator(op)
added := 0

for i := 0; i < len(ops); i++ {
op := ops[i]
desc := op.Desc()
isMerge := false
if op.Kind()&operator.OpMerge != 0 {
if i+1 >= len(ops) {
// should not be here forever
log.Error("orphan merge operators found", zap.String("desc", desc))
oc.Unlock()
return added
}
if ops[i+1].Kind()&operator.OpMerge == 0 {
log.Error("merge operator should be paired", zap.String("desc",
ops[i+1].Desc()))
oc.Unlock()
return added
}
isMerge = true
}
oc.Unlock()
return false
}

op := ops[0]
desc := op.Desc()
if oc.wopStatus.ops[desc] >= oc.cluster.GetSchedulerMaxWaitingOperator() {
operatorWaitCounter.WithLabelValues(op.Desc(), "exceed_max").Inc()
for _, op := range ops {
if !oc.checkAddOperator(op) {
_ = op.Cancel()
oc.buryOperator(op)
if isMerge {
// Merge operation have two operators, cancel them all
next := ops[i+1]
_ = next.Cancel()
oc.buryOperator(next)
}
oc.Unlock()
return added
}
oc.Unlock()
return false
}
oc.wop.PutOperator(op)
operatorWaitCounter.WithLabelValues(op.Desc(), "put").Inc()
// This step is especially for the merge operation.
if len(ops) > 1 {
oc.wop.PutOperator(ops[1])
oc.wop.PutOperator(op)
if isMerge {
// count two merge operators as one, so wopStatus.ops[desc] should
// not be updated here
i++
added++
oc.wop.PutOperator(ops[i])
}
operatorWaitCounter.WithLabelValues(desc, "put").Inc()
oc.wopStatus.ops[desc]++
added++
}
oc.wopStatus.ops[desc]++

oc.Unlock()
oc.PromoteWaitingOperator()
return true
return added
}

// AddOperator adds operators to the running operators.
@@ -305,6 +322,7 @@ func (oc *OperatorController) PromoteWaitingOperator() {
defer oc.Unlock()
var ops []*operator.Operator
for {
// GetOperator returns one operator or two merge operators
ops = oc.wop.GetOperator()
if ops == nil {
return
@@ -336,13 +354,15 @@ func (oc *OperatorController) PromoteWaitingOperator() {
// - There is no such region in the cluster
// - The epoch of the operator and the epoch of the corresponding region are no longer consistent.
// - The region already has a higher priority or same priority operator.
// - Exceed the max number of waiting operators
// - At least one operator is expired.
func (oc *OperatorController) checkAddOperator(ops ...*operator.Operator) bool {
for _, op := range ops {
region := oc.cluster.GetRegion(op.RegionID())
if region == nil {
log.Debug("region not found, cancel add operator",
zap.Uint64("region-id", op.RegionID()))
operatorWaitCounter.WithLabelValues(op.Desc(), "add_canceled").Inc()
return false
}
if region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() ||
@@ -351,12 +371,14 @@ func (oc *OperatorController) checkAddOperator(ops ...*operator.Operator) bool {
zap.Uint64("region-id", op.RegionID()),
zap.Reflect("old", region.GetRegionEpoch()),
zap.Reflect("new", op.RegionEpoch()))
operatorWaitCounter.WithLabelValues(op.Desc(), "add_canceled").Inc()
return false
}
if old := oc.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) {
log.Debug("already have operator, cancel add operator",
zap.Uint64("region-id", op.RegionID()),
zap.Reflect("old", old))
operatorWaitCounter.WithLabelValues(op.Desc(), "add_canceled").Inc()
return false
}
if op.Status() != operator.CREATED {
@@ -367,13 +389,20 @@ func (oc *OperatorController) checkAddOperator(ops ...*operator.Operator) bool {
failpoint.Inject("unexpectedOperator", func() {
panic(op)
})
operatorWaitCounter.WithLabelValues(op.Desc(), "add_canceled").Inc()
return false
}
if oc.wopStatus.ops[op.Desc()] >= oc.cluster.GetSchedulerMaxWaitingOperator() {
log.Debug("exceed_max return false", zap.Uint64("waiting", oc.wopStatus.ops[op.Desc()]), zap.String("desc", op.Desc()), zap.Uint64("max", oc.cluster.GetSchedulerMaxWaitingOperator()))
operatorWaitCounter.WithLabelValues(op.Desc(), "exceed_max").Inc()
return false
}
}
expired := false
for _, op := range ops {
if op.CheckExpired() {
expired = true
operatorWaitCounter.WithLabelValues(op.Desc(), "add_canceled").Inc()
}
}
return !expired
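
To summarize the admission rules the hunks above implement: operators are accepted in order, a merge operation contributes two operators but is counted only once against the per-scheduler waiting limit, and the first rejection (an orphan merge half, a failed check, or the limit being reached) stops the rest of the batch. A self-contained sketch with toy types, not PD's API:

	package sketch

	type toyOp struct {
		desc    string // scheduler description, e.g. "evict-leader-scheduler"
		isMerge bool   // true for the first half of a merge pair
	}

	type toyQueue struct {
		maxWaitingPerDesc int
		waiting           map[string]int // waiting counter keyed by desc
		queued            []toyOp
	}

	// addWaiting mimics the batch semantics of AddWaitingOperator and returns
	// how many operators were actually queued.
	func (q *toyQueue) addWaiting(ops ...toyOp) int {
		if q.waiting == nil {
			q.waiting = make(map[string]int)
		}
		added := 0
		for i := 0; i < len(ops); i++ {
			op := ops[i]
			if op.isMerge && i+1 >= len(ops) {
				return added // orphan half of a merge pair: stop here
			}
			if q.waiting[op.desc] >= q.maxWaitingPerDesc {
				return added // per-scheduler limit reached: drop the rest of the batch
			}
			q.queued = append(q.queued, op)
			if op.isMerge {
				// the paired operator is queued too, but the pair counts
				// only once against the waiting limit
				i++
				q.queued = append(q.queued, ops[i])
				added++
			}
			q.waiting[op.desc]++
			added++
		}
		return added
	}
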
server/schedule/operator_controller_test.go (40 additions, 0 deletions)
@@ -637,3 +637,43 @@ func checkRemoveOperatorSuccess(c *C, oc *OperatorController, op *operator.Opera
c.Assert(op.IsEnd(), IsTrue)
c.Assert(oc.GetOperatorStatus(op.RegionID()).Op, DeepEquals, op)
}

func (t *testOperatorControllerSuite) TestAddWaitingOperator(c *C) {
cluster := mockcluster.NewCluster(mockoption.NewScheduleOptions())
stream := mockhbstream.NewHeartbeatStreams(cluster.ID, true /* no need to run */)
controller := NewOperatorController(t.ctx, cluster, stream)

addPeerOp := func(i uint64) *operator.Operator {
start := fmt.Sprintf("%da", i)
end := fmt.Sprintf("%db", i)
region := newRegionInfo(i, start, end, 1, 1, []uint64{101, 1}, []uint64{101, 1})
cluster.PutRegion(region)
peer := &metapb.Peer{
StoreId: 2,
}
op, err := operator.CreateAddPeerOperator("add-peer", cluster, region, peer, operator.OpBalance)
c.Assert(err, IsNil)
c.Assert(op, NotNil)
return op
}

// a batch of operators should be added atomiclly
var batch []*operator.Operator
for i := uint64(0); i < cluster.GetSchedulerMaxWaitingOperator()-1; i++ {
batch = append(batch, addPeerOp(i))
}
added := controller.AddWaitingOperator(batch...)
c.Assert(added, Equals, int(cluster.GetSchedulerMaxWaitingOperator()-1))

source := newRegionInfo(1, "1a", "1b", 1, 1, []uint64{101, 1}, []uint64{101, 1})
target := newRegionInfo(0, "0a", "0b", 1, 1, []uint64{101, 1}, []uint64{101, 1})
// now there is one operator being allowed to add, if it is a merge operator
// both of the pair are allowed
ops, err := operator.CreateMergeRegionOperator("merge-region", cluster, source, target, operator.OpMerge)
c.Assert(err, IsNil)
c.Assert(len(ops), Equals, 2)
c.Assert(controller.AddWaitingOperator(ops...), Equals, 2)

// no space left, new operator can not be added.
c.Assert(controller.AddWaitingOperator(addPeerOp(0)), Equals, 0)
}
(2 more changed files not shown)
