
schedule: support patrol region concurrency #8094

Merged — 48 commits merged into master on Oct 30, 2024.
The changes shown below are from 8 of the 48 commits.

Commits
d1f4b8a
checker: add patrol region concurrency
lhy1024 Apr 22, 2024
6d0fbd4
speedup drain
lhy1024 Apr 22, 2024
9b43d61
fix config
lhy1024 Apr 22, 2024
b2b4f39
make config
lhy1024 Apr 22, 2024
cb285f6
update
lhy1024 May 16, 2024
2e14405
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency-2
lhy1024 May 16, 2024
351ef5c
fix race
lhy1024 May 23, 2024
c198b08
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency-2
lhy1024 May 23, 2024
a0ec33d
fix test
lhy1024 May 24, 2024
ab9ef1e
remove batch limit config
lhy1024 May 24, 2024
97a40a9
address comments
lhy1024 May 27, 2024
438efce
address comments
lhy1024 May 27, 2024
c59b47c
address comments
lhy1024 May 28, 2024
9f57397
Merge branch 'master' into patrol-concurrency
lhy1024 Jun 3, 2024
bbc1362
refactor and add patrol region context
lhy1024 Jun 3, 2024
b0eab80
address comments
lhy1024 Jun 3, 2024
5c442a3
add config test
lhy1024 Jun 4, 2024
0d02d8b
add more tests
lhy1024 Jun 4, 2024
9896228
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency
lhy1024 Jun 12, 2024
a638cce
address comments
lhy1024 Jun 13, 2024
6147373
add some test to cover branches
lhy1024 Jun 18, 2024
2a86197
Merge branch 'master' into patrol-concurrency
lhy1024 Jun 25, 2024
82785c2
address comments
lhy1024 Jul 1, 2024
a21ef83
address comments
lhy1024 Jul 1, 2024
cd1cd8b
address comments
lhy1024 Jul 4, 2024
5668d98
address comments
lhy1024 Jul 4, 2024
78e3ba5
Merge branch 'master' into patrol-concurrency
lhy1024 Jul 4, 2024
cf01076
refactor
lhy1024 Jul 4, 2024
cc51a2e
fix test and add metrics
lhy1024 Jul 4, 2024
9f7406a
fix failpoint
lhy1024 Jul 4, 2024
bd4ca79
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency-4
lhy1024 Jul 16, 2024
ae0778f
fix conflict
lhy1024 Jul 16, 2024
ecb8d8b
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency
lhy1024 Jul 22, 2024
8259ad0
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency
lhy1024 Aug 1, 2024
18db300
fix
lhy1024 Aug 1, 2024
5ead31e
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency
lhy1024 Aug 8, 2024
6bdb436
fix lint
lhy1024 Aug 8, 2024
40a2e02
Merge branch 'master' into patrol-concurrency
lhy1024 Aug 14, 2024
8cd8825
address comments
lhy1024 Aug 29, 2024
bcd5018
address comments
lhy1024 Sep 24, 2024
acb4244
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency
lhy1024 Sep 24, 2024
457da3d
avoid potential data race
lhy1024 Sep 24, 2024
64abc3c
address comments: remove sleep in failpoint
lhy1024 Sep 25, 2024
e570e3c
fix lint
lhy1024 Sep 29, 2024
74d4fdc
Merge branch 'master' into patrol-concurrency
ti-chi-bot[bot] Oct 29, 2024
aeabc52
Merge branch 'master' into patrol-concurrency
ti-chi-bot[bot] Oct 29, 2024
7e5813d
fix lint and make test stable
lhy1024 Oct 30, 2024
240f902
Merge branch 'master' into patrol-concurrency
ti-chi-bot[bot] Oct 30, 2024
10 changes: 5 additions & 5 deletions pkg/cache/cache_test.go
@@ -371,23 +371,23 @@ func TestPriorityQueue(t *testing.T) {
pq.Remove(uint64(1))
re.Nil(pq.Get(1))
re.Equal(2, pq.Len())
entry := pq.Peek()
entry := pq.peek()
re.Equal(2, entry.Priority)
re.Equal(testData[2], entry.Value)

// case3 update 3's priority to highest
pq.Put(-1, testData[3])
entry = pq.Peek()
entry = pq.peek()
re.Equal(-1, entry.Priority)
re.Equal(testData[3], entry.Value)
pq.Remove(entry.Value.ID())
re.Equal(testData[2], pq.Peek().Value)
re.Equal(testData[2], pq.peek().Value)
re.Equal(1, pq.Len())

// case4 remove all element
pq.Remove(uint64(2))
re.Equal(0, pq.Len())
re.Empty(pq.items)
re.Nil(pq.Peek())
re.Nil(pq.Tail())
re.Nil(pq.peek())
re.Nil(pq.tail())
}
21 changes: 17 additions & 4 deletions pkg/cache/priority_queue.go
@@ -16,6 +16,7 @@ package cache

import (
"github.com/tikv/pd/pkg/btree"
"github.com/tikv/pd/pkg/utils/syncutil"
)

// defaultDegree default btree degree, the depth is h<log(degree)(capacity+1)/2
@@ -26,6 +27,7 @@ type PriorityQueue struct {
items map[uint64]*Entry
btree *btree.BTreeG[*Entry]
capacity int
mutex syncutil.RWMutex
}

// NewPriorityQueue construct of priority queue
@@ -44,6 +46,8 @@ type PriorityQueueItem interface {

// Put put value with priority into queue
func (pq *PriorityQueue) Put(priority int, value PriorityQueueItem) bool {
pq.mutex.Lock()
defer pq.mutex.Unlock()
id := value.ID()
entry, ok := pq.items[id]
if !ok {
@@ -54,7 +58,9 @@ func (pq *PriorityQueue) Put(priority int, value PriorityQueueItem) bool {
if !found || !min.Less(entry) {
return false
}
pq.mutex.Unlock()
pq.Remove(min.Value.ID())
pq.mutex.Lock()
}
} else if entry.Priority != priority { // delete before update
pq.btree.Delete(entry)
@@ -68,19 +74,22 @@ func (pq *PriorityQueue) Put(priority int, value PriorityQueueItem) bool {

// Get find entry by id from queue
func (pq *PriorityQueue) Get(id uint64) *Entry {
pq.mutex.RLock()
defer pq.mutex.RUnlock()
return pq.items[id]
}

// Peek return the highest priority entry
func (pq *PriorityQueue) Peek() *Entry {
// peek return the highest priority entry
// It is used for tests only
func (pq *PriorityQueue) peek() *Entry {
if max, ok := pq.btree.Max(); ok {
return max
}
return nil
}

// Tail return the lowest priority entry
func (pq *PriorityQueue) Tail() *Entry {
// tail return the lowest priority entry
func (pq *PriorityQueue) tail() *Entry {
if min, ok := pq.btree.Min(); ok {
return min
}
@@ -89,6 +98,8 @@ func (pq *PriorityQueue) Tail() *Entry {

// Elems return all elements in queue
func (pq *PriorityQueue) Elems() []*Entry {
pq.mutex.RLock()
defer pq.mutex.RUnlock()
rs := make([]*Entry, pq.Len())
count := 0
pq.btree.Descend(func(i *Entry) bool {
@@ -101,6 +112,8 @@ func (pq *PriorityQueue) Elems() []*Entry {

// Remove remove value from queue
func (pq *PriorityQueue) Remove(id uint64) {
pq.mutex.Lock()
defer pq.mutex.Unlock()
if v, ok := pq.items[id]; ok {
pq.btree.Delete(v)
delete(pq.items, id)
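The new mutex makes the queue safe to call from multiple goroutines, which the concurrent patrol workers rely on. Below is a minimal usage sketch, not part of this PR: it assumes NewPriorityQueue takes a capacity argument and that PriorityQueueItem only requires ID() uint64, as the surrounding diff suggests; regionItem is a hypothetical item type used only for illustration.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/tikv/pd/pkg/cache"
)

// regionItem is a hypothetical PriorityQueueItem used only for this sketch.
type regionItem struct{ id uint64 }

func (r regionItem) ID() uint64 { return r.id }

func main() {
	// Capacity of 1024 is an arbitrary choice for the sketch.
	pq := cache.NewPriorityQueue(1024)

	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func(base uint64) {
			defer wg.Done()
			// Put now takes the internal mutex, so concurrent calls from
			// several goroutines no longer race on the map and btree.
			for i := uint64(0); i < 100; i++ {
				pq.Put(int(base+i), regionItem{id: base + i})
			}
		}(uint64(w) * 100)
	}
	wg.Wait()
	fmt.Println("entries queued:", pq.Len())
}
```

Before this change, the same access pattern would race on the shared items map and btree, since none of the methods took a lock.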
10 changes: 10 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
@@ -396,6 +396,16 @@ func (o *PersistConfig) GetHotRegionCacheHitsThreshold() int {
return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold)
}

// GetPatrolRegionConcurrency returns the worker count of the patrol.
func (o *PersistConfig) GetPatrolRegionConcurrency() int {
return int(o.GetScheduleConfig().PatrolRegionConcurrency)
}

// GetPatrolRegionBatchLimit returns the number of regions to patrol in one batch.
func (o *PersistConfig) GetPatrolRegionBatchLimit() int {
return int(o.GetScheduleConfig().PatrolRegionBatchLimit)
}

// GetMaxMovableHotPeerSize returns the max movable hot peer size.
func (o *PersistConfig) GetMaxMovableHotPeerSize() int64 {
return o.GetScheduleConfig().MaxMovableHotPeerSize
15 changes: 15 additions & 0 deletions pkg/schedule/config/config.go
@@ -63,6 +63,8 @@ const (
defaultRegionScoreFormulaVersion = "v2"
defaultLeaderSchedulePolicy = "count"
defaultStoreLimitVersion = "v1"
defaultPatrolRegionConcurrency = 1
defaultPatrolRegionBatchLimit = 128
lhy1024 (Contributor, Author): Maybe we can use max(128, region_count/1024).

(A sketch of this idea appears after this file's diff.)

// DefaultSplitMergeInterval is the default value of config split merge interval.
DefaultSplitMergeInterval = time.Hour
defaultSwitchWitnessInterval = time.Hour
@@ -305,6 +307,12 @@ type ScheduleConfig struct {
// HaltScheduling is the option to halt the scheduling. Once it's on, PD will halt the scheduling,
// and any other scheduling configs will be ignored.
HaltScheduling bool `toml:"halt-scheduling" json:"halt-scheduling,string,omitempty"`

// PatrolRegionConcurrency is the number of workers to patrol region.
PatrolRegionConcurrency uint64 `toml:"patrol-worker-count" json:"patrol-worker-count"`

// PatrolRegionBatchLimit is the number of regions to patrol in one batch.
PatrolRegionBatchLimit uint64 `toml:"patrol-region-batch-limit" json:"patrol-region-batch-limit"`
}

// Clone returns a cloned scheduling configuration.
@@ -374,6 +382,13 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool)
if !meta.IsDefined("store-limit-version") {
configutil.AdjustString(&c.StoreLimitVersion, defaultStoreLimitVersion)
}
if !meta.IsDefined("patrol-worker-count") {
configutil.AdjustUint64(&c.PatrolRegionConcurrency, defaultPatrolRegionConcurrency)
}

if !meta.IsDefined("patrol-region-batch-limit") {
configutil.AdjustUint64(&c.PatrolRegionBatchLimit, defaultPatrolRegionBatchLimit)
}

if !meta.IsDefined("enable-joint-consensus") {
c.EnableJointConsensus = defaultEnableJointConsensus
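The author's comment on defaultPatrolRegionBatchLimit above floats scaling the batch with the cluster's region count. The following is a hedged sketch of that max(128, region_count/1024) idea; it is hypothetical and not what this PR implements — the PR keeps a fixed default of 128.

```go
package config

// adaptivePatrolBatchLimit is a hypothetical helper sketching the reviewer's
// max(128, region_count/1024) suggestion: keep the current default of 128 as
// a floor and grow the batch limit with the total number of regions.
func adaptivePatrolBatchLimit(regionCount uint64) uint64 {
	const defaultPatrolRegionBatchLimit = 128
	if limit := regionCount / 1024; limit > defaultPatrolRegionBatchLimit {
		return limit
	}
	return defaultPatrolRegionBatchLimit
}
```

With 1,000,000 regions this would yield a batch of about 976 instead of 128.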
4 changes: 4 additions & 0 deletions pkg/schedule/config/config_provider.go
@@ -62,6 +62,8 @@ type SchedulerConfigProvider interface {
GetHotRegionCacheHitsThreshold() int
GetMaxMovableHotPeerSize() int64
IsTraceRegionFlow() bool
GetPatrolRegionConcurrency() int
GetPatrolRegionBatchLimit() int

GetTolerantSizeRatio() float64
GetLeaderSchedulePolicy() constant.SchedulePolicy
@@ -117,6 +119,8 @@ type SharedConfigProvider interface {
IsPlacementRulesCacheEnabled() bool
SetHaltScheduling(bool, string)
GetHotRegionCacheHitsThreshold() int
GetPatrolRegionConcurrency() int
GetPatrolRegionBatchLimit() int

// for test purpose
SetPlacementRuleEnabled(bool)
130 changes: 98 additions & 32 deletions pkg/schedule/coordinator.go
@@ -51,9 +51,8 @@ const (
maxLoadConfigRetries = 10
// pushOperatorTickInterval is the interval try to push the operator.
pushOperatorTickInterval = 500 * time.Millisecond
patrolRegionChanLen = 1024

// It takes about 1.3 minutes(1000000/128*10/60/1000) to iterate 1 million regions(with DefaultPatrolRegionInterval=10ms).
patrolScanRegionLimit = 128
// PluginLoad means action for load plugin
PluginLoad = "PluginLoad"
// PluginUnload means action for unload plugin
@@ -157,11 +156,16 @@ func (c *Coordinator) IsPendingRegion(region uint64) bool {
// The function is exposed for test purpose.
func (c *Coordinator) PatrolRegions() {
defer logutil.LogPanic()

defer c.wg.Done()
ticker := time.NewTicker(c.cluster.GetCheckerConfig().GetPatrolRegionInterval())
defer ticker.Stop()

workersCount := c.cluster.GetCheckerConfig().GetPatrolRegionConcurrency()
regionChan := make(chan *core.RegionInfo, patrolRegionChanLen)
quit := make(chan bool)
var wg sync.WaitGroup
c.startPatrolRegionWorkers(workersCount, regionChan, quit, &wg)

log.Info("coordinator starts patrol regions")
start := time.Now()
var (
@@ -173,42 +177,104 @@
case <-ticker.C:
// Note: we reset the ticker here to support updating configuration dynamically.
ticker.Reset(c.cluster.GetCheckerConfig().GetPatrolRegionInterval())
newWorkersCount := c.cluster.GetCheckerConfig().GetPatrolRegionConcurrency()
if newWorkersCount != workersCount {
log.Info("coordinator starts patrol regions with new workers count",
zap.Int("old-workers-count", workersCount),
zap.Int("new-workers-count", newWorkersCount))
workersCount = newWorkersCount
close(quit)
wg.Wait()
quit = make(chan bool)
c.startPatrolRegionWorkers(workersCount, regionChan, quit, &wg)
}
if c.cluster.IsSchedulingHalted() {
rleungx (Member), Jun 13, 2024: Shall we check it first?

lhy1024 (Contributor, Author): No, we need to update the config. If we check IsSchedulingHalted first, it will skip updating the config.

rleungx (Member), Jun 18, 2024: If IsSchedulingHalted is true, do we need to update the config immediately?

lhy1024 (Contributor, Author): If IsSchedulingHalted is true while checking first, we won't be able to update both configurations, because L300 will skip the later code. However, if IsSchedulingHalted is true, it seems we don't need to update the configurations either?

for len(regionChan) > 0 {
<-regionChan
}
continue
}

// Check priority regions first.
c.waitDrainRegionChan(regionChan)
c.checkPriorityRegions()
// Check suspect regions first.
c.waitDrainRegionChan(regionChan)
c.checkSuspectRegions(regionChan)
// Check regions in the waiting list
c.waitDrainRegionChan(regionChan)
c.checkWaitingRegions(regionChan)

c.waitDrainRegionChan(regionChan)
key, regions = c.checkRegions(key, c.cluster.GetCheckerConfig().GetPatrolRegionBatchLimit(), regionChan)
if len(regions) == 0 {
continue
}
// Updates the label level isolation statistics.
c.cluster.UpdateRegionsLabelLevelStats(regions)
if len(key) == 0 {
dur := time.Since(start)
patrolCheckRegionsGauge.Set(dur.Seconds())
Member: Does it still mean a round for all regions?

lhy1024 (Contributor, Author): Yes.

c.setPatrolRegionsDuration(dur)
start = time.Now()
}
failpoint.Inject("break-patrol", func() {
failpoint.Break()
})
case <-c.ctx.Done():
patrolCheckRegionsGauge.Set(0)
c.setPatrolRegionsDuration(0)
log.Info("patrol regions has been stopped")
close(regionChan)
close(quit)
wg.Wait()
return
}
if c.cluster.IsSchedulingHalted() {
continue
}
}
}

// Check priority regions first.
c.checkPriorityRegions()
// Check suspect regions first.
c.checkSuspectRegions()
// Check regions in the waiting list
c.checkWaitingRegions()
func (c *Coordinator) startPatrolRegionWorkers(workers int, regionChan <-chan *core.RegionInfo, quit <-chan bool, wg *sync.WaitGroup) {
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer logutil.LogPanic()
defer wg.Done()
for {
patrolCheckRegionsChanLenGauge.Set(float64(len(regionChan)))
select {
case region, ok := <-regionChan:
if ok {
c.tryAddOperators(region)
}
case <-quit:
return
}
}
}()
}
}

key, regions = c.checkRegions(key)
if len(regions) == 0 {
continue
}
// Updates the label level isolation statistics.
c.cluster.UpdateRegionsLabelLevelStats(regions)
if len(key) == 0 {
dur := time.Since(start)
patrolCheckRegionsGauge.Set(dur.Seconds())
c.setPatrolRegionsDuration(dur)
start = time.Now()
// waitDrainRegionChan is used to drain the regionChan.
// It is used to avoid duplicated regions in the regionChan from different sources.
func (c *Coordinator) waitDrainRegionChan(regionChan chan *core.RegionInfo) {
if len(regionChan) == 0 {
return
}
ticker := time.NewTicker(c.cluster.GetCheckerConfig().GetPatrolRegionInterval())
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
if len(regionChan) == 0 {
return
}
}
failpoint.Inject("break-patrol", func() {
failpoint.Break()
})
}
}

func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) {
func (c *Coordinator) checkRegions(startKey []byte, patrolScanRegionLimit int, regionChan chan *core.RegionInfo) (key []byte, regions []*core.RegionInfo) {
regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit)
if len(regions) == 0 {
// Resets the scan key.
@@ -217,25 +283,25 @@ func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core
}

for _, region := range regions {
c.tryAddOperators(region)
regionChan <- region
Member: What if the channel is full?

lhy1024 (Contributor, Author): In the current code, it will block here until the patrol region workers receive regions.

key = region.GetEndKey()
}
return
}

func (c *Coordinator) checkSuspectRegions() {
func (c *Coordinator) checkSuspectRegions(regionChan chan *core.RegionInfo) {
for _, id := range c.checkers.GetSuspectRegions() {
region := c.cluster.GetRegion(id)
c.tryAddOperators(region)
regionChan <- region
}
}

func (c *Coordinator) checkWaitingRegions() {
func (c *Coordinator) checkWaitingRegions(regionChan chan *core.RegionInfo) {
items := c.checkers.GetWaitingRegions()
waitingListGauge.Set(float64(len(items)))
for _, item := range items {
region := c.cluster.GetRegion(item.Key)
c.tryAddOperators(region)
regionChan <- region
}
}

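To make the coordinator changes easier to follow, here is a minimal, self-contained sketch of the worker-pool pattern that startPatrolRegionWorkers introduces, under simplifying assumptions: Region stands in for *core.RegionInfo, checkRegion stands in for c.tryAddOperators, and the quit channel is a chan struct{} rather than the PR's chan bool.

```go
package main

import (
	"fmt"
	"sync"
)

// Region stands in for *core.RegionInfo in this sketch.
type Region struct{ ID uint64 }

// checkRegion stands in for c.tryAddOperators.
func checkRegion(r Region) { fmt.Println("checked region", r.ID) }

// startWorkers mirrors startPatrolRegionWorkers: each worker drains the
// shared channel until the quit channel is closed.
func startWorkers(workers int, regionCh <-chan Region, quit <-chan struct{}, wg *sync.WaitGroup) {
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case r, ok := <-regionCh:
					if ok {
						checkRegion(r)
					}
				case <-quit:
					return
				}
			}
		}()
	}
}

func main() {
	regionCh := make(chan Region, 1024) // mirrors patrolRegionChanLen
	quit := make(chan struct{})
	var wg sync.WaitGroup

	startWorkers(4, regionCh, quit, &wg)
	for id := uint64(1); id <= 8; id++ {
		// Sends block when the channel is full, which is the behavior
		// discussed in the review thread above.
		regionCh <- Region{ID: id}
	}
	close(quit) // like the reconfiguration path, stop the current workers
	wg.Wait()   // regions still buffered in regionCh may remain unprocessed here
}
```

In the PR itself, when patrol-worker-count changes, PatrolRegions closes quit, waits on the WaitGroup, recreates quit, and starts a new set of workers on the same regionChan, so regions still buffered in the channel are handled by the new workers.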
9 changes: 9 additions & 0 deletions pkg/schedule/metrics.go
@@ -40,10 +40,19 @@ var (
Name: "patrol_regions_time",
Help: "Time spent of patrol checks region.",
})

patrolCheckRegionsChanLenGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "checker",
Name: "patrol_regions_chan_len",
Help: "Time channel length of patrol checks region.",
})
)

func init() {
prometheus.MustRegister(hotSpotStatusGauge)
prometheus.MustRegister(regionListGauge)
prometheus.MustRegister(patrolCheckRegionsGauge)
prometheus.MustRegister(patrolCheckRegionsChanLenGauge)
}
2 changes: 1 addition & 1 deletion pkg/schedule/operator/operator_controller.go
@@ -461,7 +461,7 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) (bool
return false, NotInCreateStatus
}
if !isPromoting && oc.wopStatus.getCount(op.Desc()) >= oc.config.GetSchedulerMaxWaitingOperator() {
log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.ops[op.Desc()]), zap.String("desc", op.Desc()), zap.Uint64("max", oc.config.GetSchedulerMaxWaitingOperator()))
log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.getCount(op.Desc())), zap.String("desc", op.Desc()), zap.Uint64("max", oc.config.GetSchedulerMaxWaitingOperator()))
operatorCounter.WithLabelValues(op.Desc(), "exceed-max-waiting").Inc()
return false, ExceedWaitLimit
}