diff --git a/master/cluster/executor_manager.go b/master/cluster/executor_manager.go
index d930feb3583..9c568e0e1d5 100644
--- a/master/cluster/executor_manager.go
+++ b/master/cluster/executor_manager.go
@@ -5,6 +5,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/hanfei1991/microcosm/master/resource"
 	"github.com/hanfei1991/microcosm/model"
 	"github.com/hanfei1991/microcosm/pb"
 	"github.com/hanfei1991/microcosm/pkg/autoid"
@@ -15,10 +16,7 @@ import (
 	"go.uber.org/zap"
 )
 
-var (
-	_ ExecutorClient = &ExecutorManager{}
-	_ ResourceMgr    = &ExecutorManager{}
-)
+var _ ExecutorClient = &ExecutorManager{}
 
 // ExecutorManager holds all the executors info, including liveness, status, resource usage.
 type ExecutorManager struct {
@@ -34,6 +32,8 @@ type ExecutorManager struct {
 
 	// TODO: complete ha store.
 	haStore ha.HAStore // nolint:structcheck,unused
+
+	rescMgr resource.RescMgr
 }
 
 func NewExecutorManager(offExec chan model.ExecutorID, initHeartbeatTTL, keepAliveInterval time.Duration, ctx *test.Context) *ExecutorManager {
@@ -44,6 +44,7 @@ func NewExecutorManager(offExec chan model.ExecutorID, initHeartbeatTTL, keepAli
 		offExecutor:       offExec,
 		initHeartbeatTTL:  initHeartbeatTTL,
 		keepAliveInterval: keepAliveInterval,
+		rescMgr:           resource.NewCapRescMgr(),
 	}
 }
 
@@ -57,6 +58,7 @@ func (e *ExecutorManager) removeExecutorImpl(id model.ExecutorID) error {
 		return errors.ErrUnknownExecutorID.GenWithStackByArgs(id)
 	}
 	delete(e.executors, id)
+	e.rescMgr.Unregister(id)
 	//err := e.haStore.Del(exec.EtcdKey())
 	//if err != nil {
 	//	return err
@@ -96,8 +98,11 @@ func (e *ExecutorManager) HandleHeartbeat(req *pb.HeartbeatRequest) (*pb.Heartbe
 	exec.lastUpdateTime = time.Now()
 	exec.heartbeatTTL = time.Duration(req.Ttl) * time.Millisecond
 	exec.Status = model.ExecutorStatus(req.Status)
-	usage := ResourceUsage(req.ResourceUsage)
-	exec.resource.Used = usage
+	usage := resource.RescUnit(req.GetResourceUsage())
+	err := e.rescMgr.Update(exec.ID, usage, exec.Status)
+	if err != nil {
+		return nil, err
+	}
 	resp := &pb.HeartbeatResponse{}
 	return resp, nil
 }
@@ -119,11 +124,7 @@ func (e *ExecutorManager) AddExecutor(req *pb.RegisterExecutorRequest) (*model.E
 
 	// Following part is to bootstrap the executor.
 	exec := &Executor{
-		ExecutorInfo: *info,
-		resource: ExecutorResource{
-			ID:       info.ID,
-			Capacity: ResourceUsage(info.Capability),
-		},
+		ExecutorInfo:   *info,
 		lastUpdateTime: time.Now(),
 		heartbeatTTL:   e.initHeartbeatTTL,
 		Status:         model.Initing,
@@ -144,14 +145,18 @@ func (e *ExecutorManager) AddExecutor(req *pb.RegisterExecutorRequest) (*model.E
 	e.mu.Lock()
 	e.executors[info.ID] = exec
 	e.mu.Unlock()
+	e.rescMgr.Register(exec.ID, resource.RescUnit(exec.Capability))
 	return info, nil
 }
 
+func (e *ExecutorManager) Allocate(tasks []*pb.ScheduleTask) (bool, *pb.TaskSchedulerResponse) {
+	return e.rescMgr.Allocate(tasks)
+}
+
 // Executor records the status of an executor instance.
 type Executor struct {
 	model.ExecutorInfo
-	Status   model.ExecutorStatus
-	resource ExecutorResource
+	Status model.ExecutorStatus
 	mu     sync.Mutex
 
 	// Last heartbeat
diff --git a/master/cluster/resource_manager.go b/master/cluster/resource_manager.go
deleted file mode 100644
index 3bc238d21f8..00000000000
--- a/master/cluster/resource_manager.go
+++ /dev/null
@@ -1,52 +0,0 @@
-package cluster
-
-import (
-	"github.com/hanfei1991/microcosm/model"
-	"github.com/pingcap/ticdc/dm/pkg/log"
-	"go.uber.org/zap"
-)
-
-// ResouceManager manages the resources of the clusters.
-type ResourceMgr interface {
-	GetResourceSnapshot() *ResourceSnapshot
-}
-
-// Resource is the min unit of resource that we count.
-type ResourceUsage int
-
-type ExecutorResource struct {
-	ID model.ExecutorID
-
-	Capacity ResourceUsage
-	Reserved ResourceUsage
-	Used     ResourceUsage
-}
-
-func (e *ExecutorResource) getSnapShot() *ExecutorResource {
-	r := &ExecutorResource{
-		ID:       e.ID,
-		Capacity: e.Capacity,
-		Reserved: e.Reserved,
-		Used:     e.Used,
-	}
-	return r
-}
-
-// ResourceSnapshot shows the resource usage of every executors.
-type ResourceSnapshot struct {
-	Executors []*ExecutorResource
-}
-
-// GetResourceSnapshot provides the snapshot of current resource usage.
-func (r *ExecutorManager) GetResourceSnapshot() *ResourceSnapshot {
-	snapshot := &ResourceSnapshot{}
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	for _, exec := range r.executors {
-		log.L().Logger.Info("executor status", zap.Int32("cap", int32(exec.resource.Capacity)))
-		if exec.Status == model.Running && exec.resource.Capacity > exec.resource.Reserved && exec.resource.Capacity > exec.resource.Used {
-			snapshot.Executors = append(snapshot.Executors, exec.resource.getSnapShot())
-		}
-	}
-	return snapshot
-}
diff --git a/master/jobmanager.go b/master/jobmanager.go
index 816f979ed66..7797318af79 100644
--- a/master/jobmanager.go
+++ b/master/jobmanager.go
@@ -21,7 +21,6 @@ type JobManager struct {
 	jobMasters  map[model.JobID]system.JobMaster
 	idAllocater *autoid.IDAllocator
 
-	resourceMgr    cluster.ResourceMgr
 	executorClient cluster.ExecutorClient
 
 	offExecutors chan model.ExecutorID
@@ -86,7 +85,7 @@ func (j *JobManager) SubmitJob(ctx context.Context, req *pb.SubmitJobRequest) *p
 		return resp
 	}
 	jobMaster, err = benchmark.BuildBenchmarkJobMaster(
-		info.Config, j.idAllocater, j.resourceMgr, j.executorClient, mClient)
+		info.Config, j.idAllocater, j.executorClient, mClient)
 	if err != nil {
 		resp.Err = errors.ToPBError(err)
 		return resp
@@ -112,7 +111,6 @@ func (j *JobManager) SubmitJob(ctx context.Context, req *pb.SubmitJobRequest) *p
 }
 
 func NewJobManager(
-	resource cluster.ResourceMgr,
 	clt cluster.ExecutorClient,
 	executorNotifier chan model.ExecutorID,
 	masterAddrs []string,
@@ -120,7 +118,6 @@ func NewJobManager(
 	return &JobManager{
 		jobMasters:     make(map[model.JobID]system.JobMaster),
 		idAllocater:    autoid.NewAllocator(),
-		resourceMgr:    resource,
 		executorClient: clt,
 		offExecutors:   executorNotifier,
 		masterAddrs:    masterAddrs,
diff --git a/master/jobmaster/benchmark/build.go b/master/jobmaster/benchmark/build.go
index 36c26c25d2a..b9ab0c92f12 100644
--- a/master/jobmaster/benchmark/build.go
+++ b/master/jobmaster/benchmark/build.go
@@ -16,7 +16,6 @@ import (
 func BuildBenchmarkJobMaster(
 	rawConfig string,
 	idAllocator *autoid.IDAllocator,
-	resourceMgr cluster.ResourceMgr,
 	client cluster.ExecutorClient,
 	mClient cluster.JobMasterClient,
 ) (*jobMaster, error) {
@@ -140,7 +139,7 @@ func BuildBenchmarkJobMaster(
 	job.Tasks = append(job.Tasks, tableTasks...)
 	job.Tasks = append(job.Tasks, hashTasks...)
 	job.Tasks = append(job.Tasks, sinkTasks...)
-	systemJobMaster := system.New(context.Background(), job, resourceMgr, client, mClient)
+	systemJobMaster := system.New(context.Background(), job, client, mClient)
 	master := &jobMaster{
 		Master: systemJobMaster,
 		config: config,
diff --git a/master/jobmaster/system/impl.go b/master/jobmaster/system/impl.go
index 73b19326cfc..e58daa40598 100644
--- a/master/jobmaster/system/impl.go
+++ b/master/jobmaster/system/impl.go
@@ -22,9 +22,8 @@ type Master struct {
 	ctx    context.Context
 	cancel func()
 
-	resourceManager cluster.ResourceMgr
-	client          cluster.ExecutorClient
-	mClient         cluster.JobMasterClient
+	client  cluster.ExecutorClient
+	mClient cluster.JobMasterClient
 
 	offExecutors chan model.ExecutorID
 
@@ -41,18 +40,16 @@ type Master struct {
 func New(
 	parentCtx context.Context,
 	job *model.Job,
-	resourceMgr cluster.ResourceMgr,
 	client cluster.ExecutorClient,
 	mClient cluster.JobMasterClient,
 ) *Master {
 	ctx, cancel := context.WithCancel(parentCtx)
 	return &Master{
-		ctx:             ctx,
-		cancel:          cancel,
-		job:             job,
-		resourceManager: resourceMgr,
-		client:          client,
-		mClient:         mClient,
+		ctx:     ctx,
+		cancel:  cancel,
+		job:     job,
+		client:  client,
+		mClient: mClient,
 
 		offExecutors:         make(chan model.ExecutorID, 100),
 		scheduleWaitingTasks: make(chan scheduleGroup, 1024),
diff --git a/master/resource/capacity_impl.go b/master/resource/capacity_impl.go
new file mode 100644
index 00000000000..a5d20c626c2
--- /dev/null
+++ b/master/resource/capacity_impl.go
@@ -0,0 +1,112 @@
+package resource
+
+import (
+	"sync"
+
+	"github.com/hanfei1991/microcosm/model"
+	"github.com/hanfei1991/microcosm/pb"
+	"github.com/hanfei1991/microcosm/pkg/errors"
+	"github.com/pingcap/log"
+	"go.uber.org/zap"
+)
+
+// CapRescMgr implements the RescMgr interface, using each node's capacity as
+// the basis of its allocation algorithm.
+type CapRescMgr struct {
+	mu        sync.Mutex
+	executors map[model.ExecutorID]*ExecutorResource
+}
+
+func NewCapRescMgr() *CapRescMgr {
+	return &CapRescMgr{
+		executors: make(map[model.ExecutorID]*ExecutorResource),
+	}
+}
+
+// Register implements RescMgr.Register
+func (m *CapRescMgr) Register(id model.ExecutorID, capacity RescUnit) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.executors[id] = &ExecutorResource{
+		ID:       id,
+		Capacity: capacity,
+	}
+	log.L().Info("executor resource is registered",
+		zap.String("executor-id", string(id)), zap.Int("capacity", int(capacity)))
+}
+
+// Unregister implements RescMgr.Unregister
+func (m *CapRescMgr) Unregister(id model.ExecutorID) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	delete(m.executors, id)
+	log.L().Info("executor resource is unregistered",
+		zap.String("executor-id", string(id)))
+}
+
+// Allocate implements RescMgr.Allocate
+func (m *CapRescMgr) Allocate(tasks []*pb.ScheduleTask) (bool, *pb.TaskSchedulerResponse) {
+	return m.allocateTasksWithNaiveStrategy(tasks)
+}
+
+// Update implements RescMgr.Update
+func (m *CapRescMgr) Update(id model.ExecutorID, use RescUnit, status model.ExecutorStatus) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	exec, ok := m.executors[id]
+	if !ok {
+		return errors.ErrUnknownExecutorID.GenWithStackByArgs(id)
+	}
+	exec.Used = use
+	exec.Status = status
+	return nil
+}
+
+// getAvailableResource returns resources that are available
+func (m *CapRescMgr) getAvailableResource() []*ExecutorResource {
+	res := make([]*ExecutorResource, 0)
+	for _, exec := range m.executors {
+		if exec.Status == model.Running &&
+			exec.Capacity > exec.Reserved && exec.Capacity > exec.Used {
+			res = append(res, exec)
+		}
+	}
+	return res
+}
+
+func (m *CapRescMgr) allocateTasksWithNaiveStrategy(
+	tasks []*pb.ScheduleTask,
+) (bool, *pb.TaskSchedulerResponse) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	result := make(map[int32]*pb.ScheduleResult)
+	resources := m.getAvailableResource()
+	// fail fast when no executor is available, otherwise indexing into the empty slice below would panic
+	if len(resources) == 0 {
+		return false, nil
+	}
+	var idx int = 0
+	for _, task := range tasks {
+		originalIdx := idx
+		for {
+			exec := resources[idx]
+			used := exec.Used
+			if exec.Reserved > used {
+				used = exec.Reserved
+			}
+			rest := exec.Capacity - used
+			if rest >= RescUnit(task.Cost) {
+				result[task.GetTask().Id] = &pb.ScheduleResult{
+					ExecutorId: string(exec.ID),
+				}
+				exec.Reserved = exec.Reserved + RescUnit(task.GetCost())
+				break
+			}
+			idx = (idx + 1) % len(resources)
+			if idx == originalIdx {
+				return false, nil
+			}
+		}
+	}
+	return true, &pb.TaskSchedulerResponse{Schedule: result}
+}
diff --git a/master/resource/manager.go b/master/resource/manager.go
new file mode 100644
index 00000000000..c45df5d74cd
--- /dev/null
+++ b/master/resource/manager.go
@@ -0,0 +1,33 @@
+package resource
+
+import (
+	"github.com/hanfei1991/microcosm/model"
+	"github.com/hanfei1991/microcosm/pb"
+)
+
+// RescMgr manages the resources of the cluster.
+type RescMgr interface {
+	// Register registers a new executor; it is called when an executor joins
+	Register(id model.ExecutorID, capacity RescUnit)
+
+	// Unregister is called when an executor exits
+	Unregister(id model.ExecutorID)
+
+	// Allocate allocates executor resources to the given tasks
+	Allocate(tasks []*pb.ScheduleTask) (bool, *pb.TaskSchedulerResponse)
+
+	// Update updates executor resource usage and running status
+	Update(id model.ExecutorID, use RescUnit, status model.ExecutorStatus) error
+}
+
+// RescUnit is the min unit of resource that we count.
+type RescUnit int
+
+type ExecutorResource struct {
+	ID     model.ExecutorID
+	Status model.ExecutorStatus
+
+	Capacity RescUnit
+	Reserved RescUnit
+	Used     RescUnit
+}
diff --git a/master/scheduler.go b/master/scheduler.go
deleted file mode 100644
index b6bf3e7ec73..00000000000
--- a/master/scheduler.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package master
-
-import (
-	"github.com/hanfei1991/microcosm/master/cluster"
-	"github.com/hanfei1991/microcosm/pb"
-)
-
-// TODO: Implement different allocate task logic.
-// TODO: Add abstraction for resource allocator.
-func (s *Server) allocateTasksWithNaiveStrategy(
-	snapshot *cluster.ResourceSnapshot,
-	tasks []*pb.ScheduleTask,
-) (bool, *pb.TaskSchedulerResponse) {
-	var idx int = 0
-	result := make(map[int32]*pb.ScheduleResult)
-	for _, task := range tasks {
-		originalIdx := idx
-		for {
-			exec := snapshot.Executors[idx]
-			used := exec.Used
-			if exec.Reserved > used {
-				used = exec.Reserved
-			}
-			rest := exec.Capacity - used
-			if rest >= cluster.ResourceUsage(task.Cost) {
-				result[task.GetTask().Id] = &pb.ScheduleResult{
-					ExecutorId: string(exec.ID),
-				}
-				exec.Reserved = exec.Reserved + cluster.ResourceUsage(task.GetCost())
-				break
-			}
-			idx = (idx + 1) % len(snapshot.Executors)
-			if idx == originalIdx {
-				return false, nil
-			}
-		}
-	}
-	return true, &pb.TaskSchedulerResponse{Schedule: result}
-}
diff --git a/master/server.go b/master/server.go
index 1b8c3ae9926..74e1c401d75 100644
--- a/master/server.go
+++ b/master/server.go
@@ -50,7 +50,7 @@ func NewServer(cfg *Config, ctx *test.Context) (*Server, error) {
 	for _, u := range urls {
 		masterAddrs = append(masterAddrs, u.Host)
 	}
-	jobManager := NewJobManager(executorManager, executorManager, executorNotifier, masterAddrs)
+	jobManager := NewJobManager(executorManager, executorNotifier, masterAddrs)
 	server := &Server{
 		cfg:             cfg,
 		executorManager: executorManager,
@@ -94,17 +94,12 @@ func (s *Server) RegisterExecutor(ctx context.Context, req *pb.RegisterExecutorR
 // - queries resource manager to allocate resource and maps tasks to executors
 // - returns scheduler response to job master
 func (s *Server) ScheduleTask(ctx context.Context, req *pb.TaskSchedulerRequest) (*pb.TaskSchedulerResponse, error) {
-	// TODO: support running resource manager independently, and get resource snapshot via rpc.
-	snapshot := s.executorManager.GetResourceSnapshot()
-	if len(snapshot.Executors) == 0 {
-		return nil, errors.ErrClusterResourceNotEnough.GenWithStackByArgs()
-	}
 	tasks := req.GetTasks()
-	success, scheduleResp := s.allocateTasksWithNaiveStrategy(snapshot, tasks)
+	success, resp := s.executorManager.Allocate(tasks)
 	if !success {
 		return nil, errors.ErrClusterResourceNotEnough.GenWithStackByArgs()
 	}
-	return scheduleResp, nil
+	return resp, nil
 }
 
 // DeleteExecutor deletes an executor, but have yet implemented.