Skip to content

Commit

Permalink
reduce the rebanlance
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro committed Jun 19, 2020
1 parent 173f966 commit e20e099
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 19 deletions.
5 changes: 3 additions & 2 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,10 @@ func (c *changeFeed) rebanlanceTables(ctx context.Context, captures map[model.Ca
return nil
}
}
timeToRebanlance := time.Since(c.lastRebanlanceTime) > time.Duration(c.info.Config.Scheduler.PollingTime)*time.Minute
timeToRebanlance = timeToRebanlance && c.info.Config.Scheduler.PollingTime > 0

if !c.rebanlanceNextTick &&
time.Since(c.lastRebanlanceTime) < time.Duration(c.info.Config.Scheduler.PollingTime)*time.Minute {
if !c.rebanlanceNextTick && !timeToRebanlance {
return nil
}
c.lastRebanlanceTime = time.Now()
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (p *processor) ddlPullWorker(ctx context.Context) error {
}

func (p *processor) workloadWorker(ctx context.Context) error {
t := time.NewTicker(1 * time.Minute)
t := time.NewTicker(10 * time.Second)
err := p.etcdCli.PutTaskWorkload(ctx, p.changefeedID, p.captureID, nil)
if err != nil {
return errors.Trace(err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ var defaultReplicaConfig = &ReplicaConfig{
Enable: false,
},
Scheduler: &SchedulerConfig{
Tp: "manual",
PollingTime: 5,
Tp: "table-number",
PollingTime: -1,
},
}

Expand Down
6 changes: 2 additions & 4 deletions pkg/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,9 @@ type Scheduler interface {
func NewScheduler(tp string) Scheduler {
switch tp {
case "table-number":
return newTableNumberScheduler(false)
case "manual":
return newTableNumberScheduler(true)
return newTableNumberScheduler()
default:
log.Info("invalid scheduler type, using default scheduler")
return newTableNumberScheduler(true)
return newTableNumberScheduler()
}
}
11 changes: 3 additions & 8 deletions pkg/scheduler/table_number.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ import "github.com/pingcap/ticdc/cdc/model"

// TableNumberScheduler provides a feature that scheduling by the table number
type TableNumberScheduler struct {
workloads workloads
distributeOnly bool
workloads workloads
}

// newTableNumberScheduler creates a new table number scheduler
func newTableNumberScheduler(distributeOnly bool) *TableNumberScheduler {
func newTableNumberScheduler() *TableNumberScheduler {
return &TableNumberScheduler{
workloads: make(workloads),
distributeOnly: distributeOnly,
workloads: make(workloads),
}
}

Expand All @@ -47,9 +45,6 @@ func (t *TableNumberScheduler) Skewness() float64 {
// CalRebalanceOperates implements the Scheduler interface
func (t *TableNumberScheduler) CalRebalanceOperates(targetSkewness float64) (
skewness float64, moveTableJobs map[model.TableID]*model.MoveTableJob) {
if t.distributeOnly {
return 0, nil
}
var totalTableNumber uint64
for _, captureWorkloads := range t.workloads {
totalTableNumber += uint64(len(captureWorkloads))
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/table_number_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type tableNumberSuite struct{}
var _ = check.Suite(&tableNumberSuite{})

func (s *tableNumberSuite) TestDistributeTables(c *check.C) {
scheduler := newTableNumberScheduler(false)
scheduler := newTableNumberScheduler()
scheduler.ResetWorkloads("capture1", model.TaskWorkload{
1: model.WorkloadInfo{Workload: 1},
2: model.WorkloadInfo{Workload: 1}})
Expand Down Expand Up @@ -58,7 +58,7 @@ func (s *tableNumberSuite) TestDistributeTables(c *check.C) {
}

func (s *tableNumberSuite) TestCalRebalanceOperates(c *check.C) {
scheduler := newTableNumberScheduler(false)
scheduler := newTableNumberScheduler()
scheduler.ResetWorkloads("capture1", model.TaskWorkload{
1: model.WorkloadInfo{Workload: 1},
2: model.WorkloadInfo{Workload: 1}})
Expand Down

0 comments on commit e20e099

Please sign in to comment.