Skip to content

Commit

Permalink
cluster: change return value for getSchedulers (#2615) (#2638)
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
ti-srebot authored Jul 14, 2020
1 parent 77098f8 commit dae1f61
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 16 deletions.
16 changes: 15 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1609,12 +1609,19 @@ func (c *RaftCluster) GetHotReadRegions() *statistics.StoreHotPeersInfos {
}

// GetSchedulers gets all schedulers.
func (c *RaftCluster) GetSchedulers() map[string]*scheduleController {
func (c *RaftCluster) GetSchedulers() []string {
c.RLock()
defer c.RUnlock()
return c.coordinator.getSchedulers()
}

// GetSchedulerHandlers gets all scheduler handlers.
func (c *RaftCluster) GetSchedulerHandlers() map[string]http.Handler {
c.RLock()
defer c.RUnlock()
return c.coordinator.getSchedulerHandlers()
}

// AddScheduler adds a scheduler.
func (c *RaftCluster) AddScheduler(scheduler schedule.Scheduler, args ...string) error {
c.Lock()
Expand All @@ -1636,6 +1643,13 @@ func (c *RaftCluster) PauseOrResumeScheduler(name string, t int64) error {
return c.coordinator.pauseOrResumeScheduler(name, t)
}

// IsSchedulerPaused checks if a scheduler is paused.
func (c *RaftCluster) IsSchedulerPaused(name string) (bool, error) {
c.RLock()
defer c.RUnlock()
return c.coordinator.isSchedulerPaused(name)
}

// GetStoreLimiter returns the dynamic adjusting limiter
func (c *RaftCluster) GetStoreLimiter() *StoreLimiter {
return c.limiter
Expand Down
32 changes: 30 additions & 2 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package cluster
import (
"context"
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -354,10 +355,24 @@ func (c *coordinator) getHotReadRegions() *statistics.StoreHotPeersInfos {
return nil
}

func (c *coordinator) getSchedulers() map[string]*scheduleController {
func (c *coordinator) getSchedulers() []string {
c.RLock()
defer c.RUnlock()
return c.schedulers
names := make([]string, 0, len(c.schedulers))
for name := range c.schedulers {
names = append(names, name)
}
return names
}

func (c *coordinator) getSchedulerHandlers() map[string]http.Handler {
c.RLock()
defer c.RUnlock()
handlers := make(map[string]http.Handler, len(c.schedulers))
for name, scheduler := range c.schedulers {
handlers[name] = scheduler.Scheduler
}
return handlers
}

func (c *coordinator) collectSchedulerMetrics() {
Expand Down Expand Up @@ -535,6 +550,19 @@ func (c *coordinator) pauseOrResumeScheduler(name string, t int64) error {
return err
}

func (c *coordinator) isSchedulerPaused(name string) (bool, error) {
c.RLock()
defer c.RUnlock()
if c.cluster == nil {
return false, ErrNotBootstrapped
}
s, ok := c.schedulers[name]
if !ok {
return false, schedulers.ErrSchedulerNotFound
}
return s.IsPaused(), nil
}

func (c *coordinator) runScheduler(s *scheduleController) {
defer logutil.LogPanic()
defer c.wg.Done()
Expand Down
18 changes: 5 additions & 13 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,11 @@ func (h *Handler) GetOperatorController() (*schedule.OperatorController, error)

// IsSchedulerPaused returns whether scheduler is paused.
func (h *Handler) IsSchedulerPaused(name string) (bool, error) {
c, err := h.GetRaftCluster()
rc, err := h.GetRaftCluster()
if err != nil {
return true, err
}
sc, ok := c.GetSchedulers()[name]
if !ok {
return true, errors.Errorf("scheduler %v not found", name)
return false, err
}
return sc.IsPaused(), nil
return rc.IsSchedulerPaused(name)
}

// GetScheduleConfig returns ScheduleConfig.
Expand All @@ -124,11 +120,7 @@ func (h *Handler) GetSchedulers() ([]string, error) {
if err != nil {
return nil, err
}
names := make([]string, 0, len(c.GetSchedulers()))
for name := range c.GetSchedulers() {
names = append(names, name)
}
return names, nil
return c.GetSchedulers(), nil
}

// GetStores returns all stores in the cluster.
Expand Down Expand Up @@ -805,7 +797,7 @@ func (h *Handler) GetSchedulerConfigHandler() http.Handler {
return nil
}
mux := http.NewServeMux()
for name, handler := range c.GetSchedulers() {
for name, handler := range c.GetSchedulerHandlers() {
prefix := path.Join(pdRootPath, SchedulerConfigHandlerPath, name)
urlPath := prefix + "/"
mux.Handle(urlPath, http.StripPrefix(prefix, handler))
Expand Down

0 comments on commit dae1f61

Please sign in to comment.