Skip to content

Commit

Permalink
fix: 优化大规模调度时的卡顿问题 TencentBlueKing#559
Browse files Browse the repository at this point in the history
  • Loading branch information
zmberg committed Sep 2, 2020
1 parent 76b3a67 commit 972397e
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 34 deletions.
8 changes: 4 additions & 4 deletions bcs-mesos/bcs-scheduler/src/manager/sched/offer/offerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,21 +457,21 @@ func (p *offerPool) addOffers(offers []*mesos.Offer) bool {
//print offer info
p.printOffer(o)
//add agent deltaXXX 20180530
agentSchedInfo, err := p.scheduler.FetchAgentSchedInfo(o.GetHostname())
/*agentSchedInfo, err := p.scheduler.FetchAgentSchedInfo(o.GetHostname())
if err != nil {
blog.Errorf("get agent(%s) err(%s), offer can not added", o.GetHostname(), err.Error())
continue
}
}*/
agentDeltaCPU := 0.0
agentDeltaMem := 0.0
agentDeltaDisk := 0.0
if agentSchedInfo != nil {
/*if agentSchedInfo != nil {
agentDeltaCPU = agentSchedInfo.DeltaCPU
agentDeltaMem = agentSchedInfo.DeltaMem
agentDeltaDisk = agentSchedInfo.DeltaDisk
blog.V(3).Infof("get agent(%s) delta(cpu: %f | mem: %f | disk: %f)",
o.GetHostname(), agentDeltaCPU, agentDeltaMem, agentDeltaDisk)
}
}*/
off := &innerOffer{
id: p.autoIncrementId,
offerId: o.GetId().GetValue(),
Expand Down
29 changes: 29 additions & 0 deletions bcs-mesos/bcs-scheduler/src/manager/sched/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,18 @@
package sched

import (
"net/http"
"net/http/pprof"

"github.com/Tencent/bk-bcs/bcs-common/common/blog"
"github.com/Tencent/bk-bcs/bcs-common/common/http/httpserver"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/manager/sched/api"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/manager/sched/backend"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/manager/sched/scheduler"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/manager/schedcontext"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/util"

restful "github.com/emicklei/go-restful"
)

type Sched struct {
Expand All @@ -38,6 +45,10 @@ func New(config util.Scheduler, scontext *schedcontext.SchedContext) *Sched {
r := api.NewRouter(backend)
apiActions := r.GetActions()
s.scontext.ApiServer2.RegisterWebServer("/v1", nil, apiActions)
//use pprof
if s.config.DebugMode {
s.initDebug()
}

return s
}
Expand All @@ -49,3 +60,21 @@ func (s *Sched) Start() error {

return nil
}
func (s *Sched) initDebug() {
blog.Infof("init debug pprof")
action := []*httpserver.Action{
httpserver.NewAction("GET", "/debug/pprof/", nil, getRouteFunc(pprof.Index)),
httpserver.NewAction("GET", "/debug/pprof/{uri:*}", nil, getRouteFunc(pprof.Index)),
httpserver.NewAction("GET", "/debug/pprof/cmdline", nil, getRouteFunc(pprof.Cmdline)),
httpserver.NewAction("GET", "/debug/pprof/profile", nil, getRouteFunc(pprof.Profile)),
httpserver.NewAction("GET", "/debug/pprof/symbol", nil, getRouteFunc(pprof.Symbol)),
httpserver.NewAction("GET", "/debug/pprof/trace", nil, getRouteFunc(pprof.Trace)),
}
s.scontext.ApiServer2.RegisterWebServer("", nil, action)
}

func getRouteFunc(f http.HandlerFunc) restful.RouteFunction {
return restful.RouteFunction(func(req *restful.Request, resp *restful.Response) {
f(resp, req.Request)
})
}
15 changes: 11 additions & 4 deletions bcs-mesos/bcs-scheduler/src/manager/sched/scheduler/bcs_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

const (
MaxEventQueueLength = 1024
MaxEventQueueLength = 10240
)

type bcsEventManager struct {
Expand Down Expand Up @@ -93,8 +93,10 @@ func (e *bcsEventManager) initCli() {

// Send Event
func (e *bcsEventManager) syncEvent(event *commtypes.BcsStorageEventIf) error {
blog.V(3).Infof("bcsEventManager syncEvent %v", event)

queue := len(e.eventQueue)
if queue>1024 {
blog.Infof("bcsEventManager syncEvent %v queue(%d)", event, len(e.eventQueue))
}
e.eventQueue <- event
return nil
}
Expand Down Expand Up @@ -211,8 +213,13 @@ func (e *bcsEventManager) handleEvent(event *commtypes.BcsStorageEventIf) error
by, _ := json.Marshal(event)

uri := "events"

begin := time.Now().UnixNano() / 1e6
_, err := e.requestStorageV1("PUT", uri, by)
end := time.Now().UnixNano() / 1e6
useTime := end - begin
if useTime > 100 {
blog.Warnf("request storage event, %dms slow query", useTime)
}
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,8 @@ func (s *Scheduler) produceEvent(object interface{}) error {
return fmt.Errorf("object type %s is invalid", btype.Name())
}

return s.eventManager.syncEvent(event)
go s.eventManager.syncEvent(event)
return nil
}

func (s *Scheduler) newTaskEvent(task *types.Task) *commtype.BcsStorageEventIf {
Expand Down
53 changes: 28 additions & 25 deletions bcs-mesos/bcs-scheduler/src/manager/sched/scheduler/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,31 @@ import (
// When scheduler receive a task status report messege, it will create a goroutine for process this message,
// #lizard forgives StatusReport
func (s *Scheduler) StatusReport(status *mesos.TaskStatus) {
blog.Infof("receive status(uuid:%s) report: task(%s) status(%s)",
status.GetUuid(), status.GetTaskId().GetValue(), status.GetState().String())
//ack mesos master the task status report
if status.GetUuid() != nil {
call := &sched.Call{
FrameworkId: s.framework.GetId(),
Type: sched.Call_ACKNOWLEDGE.Enum(),
Acknowledge: &sched.Call_Acknowledge{
AgentId: status.GetAgentId(),
TaskId: status.GetTaskId(),
Uuid: status.GetUuid(),
},
}
// send call
resp, err := s.send(call)
if err != nil {
blog.Error("status report: Unable to send Acknowledge Call: %s ", err)
return
}
if resp.StatusCode != http.StatusAccepted {
blog.Error("status report: Acknowledge call returned unexpected status: %d", resp.StatusCode)
return
}
blog.Infof("send status(uuid:%s) task(%s) report acknowledge success", status.GetUuid(), status.GetTaskId().GetValue())
}

taskId := status.TaskId.GetValue()
taskGroupID := types.GetTaskGroupID(taskId)
Expand All @@ -50,6 +75,7 @@ func (s *Scheduler) StatusReport(status *mesos.TaskStatus) {
s.store.LockApplication(runAs + "." + appId)
defer s.store.UnLockApplication(runAs + "." + appId)
}
blog.Infof("status(uuid:%s) task(%s) Lock Appliation(%s.%s)", status.GetUuid(), taskId, runAs, appId)

// ack and check
if s.preCheckTaskStatusReport(status) == false {
Expand Down Expand Up @@ -93,7 +119,7 @@ func (s *Scheduler) StatusReport(status *mesos.TaskStatus) {
reportStatus = types.TASK_STATUS_FAIL
taskGroup, _ := s.store.FetchTaskGroup(taskGroupID)
if taskGroup != nil {
s.SendHealthMsg(alarm.WarnKind, taskGroup.RunAs, task.ID+"("+taskGroup.HostName+")"+" fail, message:"+status.GetMessage(), taskGroup.RunAs+"."+taskGroup.Name+"-task", &alarmTimeval)
go s.SendHealthMsg(alarm.WarnKind, taskGroup.RunAs, task.ID+"("+taskGroup.HostName+")"+" fail, message:"+status.GetMessage(), taskGroup.RunAs+"."+taskGroup.Name+"-task", &alarmTimeval)
}
case mesos.TaskState_TASK_KILLING:
blog.Info("status report: Task(%s) Killing, message: %s", taskId, status.GetMessage())
Expand All @@ -110,7 +136,7 @@ func (s *Scheduler) StatusReport(status *mesos.TaskStatus) {
//s.addLostSlave(taskGroup.HostName)
s.offerPool.AddLostSlave(taskGroup.HostName)
}
s.SendHealthMsg(alarm.WarnKind, taskGroup.RunAs, task.ID+"("+taskGroup.HostName+")"+" lost, message:"+status.GetMessage(), taskGroup.RunAs+"."+taskGroup.Name+"-task", &alarmTimeval)
go s.SendHealthMsg(alarm.WarnKind, taskGroup.RunAs, task.ID+"("+taskGroup.HostName+")"+" lost, message:"+status.GetMessage(), taskGroup.RunAs+"."+taskGroup.Name+"-task", &alarmTimeval)
}
case mesos.TaskState_TASK_ERROR:
blog.Info("status report: Task(%s) Error, message: %s", taskId, status.GetMessage())
Expand Down Expand Up @@ -325,29 +351,6 @@ func (s *Scheduler) checkApplicationChange(runAs, appId, taskGroupStatus string,
}

func (s *Scheduler) preCheckTaskStatusReport(status *mesos.TaskStatus) bool {
//ack mesos master the task status report
if status.GetUuid() != nil {
call := &sched.Call{
FrameworkId: s.framework.GetId(),
Type: sched.Call_ACKNOWLEDGE.Enum(),
Acknowledge: &sched.Call_Acknowledge{
AgentId: status.GetAgentId(),
TaskId: status.GetTaskId(),
Uuid: status.GetUuid(),
},
}
// send call
resp, err := s.send(call)
if err != nil {
blog.Error("status report: Unable to send Acknowledge Call: %s ", err)
return false
}
if resp.StatusCode != http.StatusAccepted {
blog.Error("status report: Acknowledge call returned unexpected status: %d", resp.StatusCode)
return false
}
}

taskId := status.TaskId.GetValue()
state := status.GetState()
executorID := status.GetExecutorId()
Expand Down
5 changes: 5 additions & 0 deletions bcs-mesos/bcs-scheduler/src/util/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type SchedulerOptions struct {
NetImage string `json:"net_image" value:"" usage:"the network image"`
Kubeconfig string `json:"kubeconfig" value:"" usage:"kubeconfig, when store_driver is etcd"`
StoreDriver string `json:"store_driver" value:"zookeeper" usage:"the store driver, enum: zookeeper, etcd"`
DebugMode bool `json:"debug_mode" value:"false" usage:"Debug mode, use pprof."`
}

type SchedConfig struct {
Expand Down Expand Up @@ -84,6 +85,9 @@ type Scheduler struct {

Kubeconfig string
StoreDriver string

//whether use pprof
DebugMode bool
}

type HttpListener struct {
Expand Down Expand Up @@ -170,6 +174,7 @@ func SetSchedulerCfg(config *SchedConfig, op *SchedulerOptions) {

config.Scheduler.Kubeconfig = op.Kubeconfig
config.Scheduler.StoreDriver = op.StoreDriver
config.Scheduler.DebugMode = op.DebugMode
}

func hostname() string {
Expand Down

0 comments on commit 972397e

Please sign in to comment.