Skip to content

Commit

Permalink
[resource manager]: add separated resource manager interface (pingcap#44)
Browse files Browse the repository at this point in the history
* [resource manager]: add separated resource manager interface

* fix merge conflicts

* address comments

* fix data race
  • Loading branch information
amyangfei authored Dec 14, 2021
1 parent 66935ff commit 5355972
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 128 deletions.
31 changes: 18 additions & 13 deletions master/cluster/executor_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

"github.com/hanfei1991/microcosm/master/resource"
"github.com/hanfei1991/microcosm/model"
"github.com/hanfei1991/microcosm/pb"
"github.com/hanfei1991/microcosm/pkg/autoid"
Expand All @@ -15,10 +16,7 @@ import (
"go.uber.org/zap"
)

var (
_ ExecutorClient = &ExecutorManager{}
_ ResourceMgr = &ExecutorManager{}
)
var _ ExecutorClient = &ExecutorManager{}

// ExecutorManager holds all the executors info, including liveness, status, resource usage.
type ExecutorManager struct {
Expand All @@ -34,6 +32,8 @@ type ExecutorManager struct {

// TODO: complete ha store.
haStore ha.HAStore // nolint:structcheck,unused

rescMgr resource.RescMgr
}

func NewExecutorManager(offExec chan model.ExecutorID, initHeartbeatTTL, keepAliveInterval time.Duration, ctx *test.Context) *ExecutorManager {
Expand All @@ -44,6 +44,7 @@ func NewExecutorManager(offExec chan model.ExecutorID, initHeartbeatTTL, keepAli
offExecutor: offExec,
initHeartbeatTTL: initHeartbeatTTL,
keepAliveInterval: keepAliveInterval,
rescMgr: resource.NewCapRescMgr(),
}
}

Expand All @@ -57,6 +58,7 @@ func (e *ExecutorManager) removeExecutorImpl(id model.ExecutorID) error {
return errors.ErrUnknownExecutorID.GenWithStackByArgs(id)
}
delete(e.executors, id)
e.rescMgr.Unregister(id)
//err := e.haStore.Del(exec.EtcdKey())
//if err != nil {
// return err
Expand Down Expand Up @@ -96,8 +98,11 @@ func (e *ExecutorManager) HandleHeartbeat(req *pb.HeartbeatRequest) (*pb.Heartbe
exec.lastUpdateTime = time.Now()
exec.heartbeatTTL = time.Duration(req.Ttl) * time.Millisecond
exec.Status = model.ExecutorStatus(req.Status)
usage := ResourceUsage(req.ResourceUsage)
exec.resource.Used = usage
usage := resource.RescUnit(req.GetResourceUsage())
err := e.rescMgr.Update(exec.ID, usage, exec.Status)
if err != nil {
return nil, err
}
resp := &pb.HeartbeatResponse{}
return resp, nil
}
Expand All @@ -119,11 +124,7 @@ func (e *ExecutorManager) AddExecutor(req *pb.RegisterExecutorRequest) (*model.E

// Following part is to bootstrap the executor.
exec := &Executor{
ExecutorInfo: *info,
resource: ExecutorResource{
ID: info.ID,
Capacity: ResourceUsage(info.Capability),
},
ExecutorInfo: *info,
lastUpdateTime: time.Now(),
heartbeatTTL: e.initHeartbeatTTL,
Status: model.Initing,
Expand All @@ -144,14 +145,18 @@ func (e *ExecutorManager) AddExecutor(req *pb.RegisterExecutorRequest) (*model.E
e.mu.Lock()
e.executors[info.ID] = exec
e.mu.Unlock()
e.rescMgr.Register(exec.ID, resource.RescUnit(exec.Capability))
return info, nil
}

// Allocate hands the given tasks to the manager's rescMgr and returns its
// result unchanged: the success flag and the scheduling response.
func (e *ExecutorManager) Allocate(tasks []*pb.ScheduleTask) (bool, *pb.TaskSchedulerResponse) {
	ok, resp := e.rescMgr.Allocate(tasks)
	return ok, resp
}

// Executor records the status of an executor instance.
type Executor struct {
model.ExecutorInfo
Status model.ExecutorStatus
resource ExecutorResource
Status model.ExecutorStatus

mu sync.Mutex
// Last heartbeat
Expand Down
52 changes: 0 additions & 52 deletions master/cluster/resource_manager.go

This file was deleted.

5 changes: 1 addition & 4 deletions master/jobmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type JobManager struct {
jobMasters map[model.JobID]system.JobMaster

idAllocater *autoid.IDAllocator
resourceMgr cluster.ResourceMgr
executorClient cluster.ExecutorClient

offExecutors chan model.ExecutorID
Expand Down Expand Up @@ -86,7 +85,7 @@ func (j *JobManager) SubmitJob(ctx context.Context, req *pb.SubmitJobRequest) *p
return resp
}
jobMaster, err = benchmark.BuildBenchmarkJobMaster(
info.Config, j.idAllocater, j.resourceMgr, j.executorClient, mClient)
info.Config, j.idAllocater, j.executorClient, mClient)
if err != nil {
resp.Err = errors.ToPBError(err)
return resp
Expand All @@ -112,15 +111,13 @@ func (j *JobManager) SubmitJob(ctx context.Context, req *pb.SubmitJobRequest) *p
}

func NewJobManager(
resource cluster.ResourceMgr,
clt cluster.ExecutorClient,
executorNotifier chan model.ExecutorID,
masterAddrs []string,
) *JobManager {
return &JobManager{
jobMasters: make(map[model.JobID]system.JobMaster),
idAllocater: autoid.NewAllocator(),
resourceMgr: resource,
executorClient: clt,
offExecutors: executorNotifier,
masterAddrs: masterAddrs,
Expand Down
3 changes: 1 addition & 2 deletions master/jobmaster/benchmark/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
func BuildBenchmarkJobMaster(
rawConfig string,
idAllocator *autoid.IDAllocator,
resourceMgr cluster.ResourceMgr,
client cluster.ExecutorClient,
mClient cluster.JobMasterClient,
) (*jobMaster, error) {
Expand Down Expand Up @@ -140,7 +139,7 @@ func BuildBenchmarkJobMaster(
job.Tasks = append(job.Tasks, tableTasks...)
job.Tasks = append(job.Tasks, hashTasks...)
job.Tasks = append(job.Tasks, sinkTasks...)
systemJobMaster := system.New(context.Background(), job, resourceMgr, client, mClient)
systemJobMaster := system.New(context.Background(), job, client, mClient)
master := &jobMaster{
Master: systemJobMaster,
config: config,
Expand Down
17 changes: 7 additions & 10 deletions master/jobmaster/system/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ type Master struct {
ctx context.Context
cancel func()

resourceManager cluster.ResourceMgr
client cluster.ExecutorClient
mClient cluster.JobMasterClient
client cluster.ExecutorClient
mClient cluster.JobMasterClient

offExecutors chan model.ExecutorID

Expand All @@ -41,18 +40,16 @@ type Master struct {
func New(
parentCtx context.Context,
job *model.Job,
resourceMgr cluster.ResourceMgr,
client cluster.ExecutorClient,
mClient cluster.JobMasterClient,
) *Master {
ctx, cancel := context.WithCancel(parentCtx)
return &Master{
ctx: ctx,
cancel: cancel,
job: job,
resourceManager: resourceMgr,
client: client,
mClient: mClient,
ctx: ctx,
cancel: cancel,
job: job,
client: client,
mClient: mClient,

offExecutors: make(chan model.ExecutorID, 100),
scheduleWaitingTasks: make(chan scheduleGroup, 1024),
Expand Down
108 changes: 108 additions & 0 deletions master/resource/capacity_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package resource

import (
"sync"

"github.com/hanfei1991/microcosm/model"
"github.com/hanfei1991/microcosm/pb"
"github.com/hanfei1991/microcosm/pkg/errors"
"github.com/pingcap/log"
"go.uber.org/zap"
)

// CapRescMgr implements the RescMgr interface and uses node capacity as its
// allocation algorithm.
type CapRescMgr struct {
// mu is locked by Register, Unregister, Update and
// allocateTasksWithNaiveStrategy before they access executors.
mu sync.Mutex
// executors holds one resource record per registered executor, keyed by
// executor ID.
executors map[model.ExecutorID]*ExecutorResource
}

// NewCapRescMgr returns a CapRescMgr whose executor table is allocated and
// empty, so executors can be registered right away.
func NewCapRescMgr() *CapRescMgr {
	mgr := new(CapRescMgr)
	mgr.executors = make(map[model.ExecutorID]*ExecutorResource)
	return mgr
}

// Register implements RescMgr.Register.
//
// It stores a fresh resource record for id with the given capacity and zero
// values for every other field, replacing any record already stored under
// the same id, and logs the registration.
func (m *CapRescMgr) Register(id model.ExecutorID, capacity RescUnit) {
	entry := &ExecutorResource{ID: id, Capacity: capacity}

	m.mu.Lock()
	defer m.mu.Unlock()

	m.executors[id] = entry
	log.L().Info("executor resource is registered",
		zap.String("executor-id", string(id)),
		zap.Int("capacity", int(capacity)),
	)
}

// Unregister implements RescMgr.Unregister.
//
// It drops the resource record stored under id, if any, and logs the
// removal whether or not a record existed.
func (m *CapRescMgr) Unregister(id model.ExecutorID) {
	m.mu.Lock()
	defer m.mu.Unlock()

	delete(m.executors, id)

	idField := zap.String("executor-id", string(id))
	log.L().Info("executor resource is unregistered", idField)
}

// Allocate implements RescMgr.Allocate by delegating to the naive
// capacity-based strategy and returning its result unchanged.
func (m *CapRescMgr) Allocate(tasks []*pb.ScheduleTask) (bool, *pb.TaskSchedulerResponse) {
	ok, resp := m.allocateTasksWithNaiveStrategy(tasks)
	return ok, resp
}

// Update implements RescMgr.Update.
//
// It overwrites the Used amount and Status of the record stored under id.
// When no record exists for id it returns ErrUnknownExecutorID and changes
// nothing.
func (m *CapRescMgr) Update(id model.ExecutorID, use RescUnit, status model.ExecutorStatus) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if entry, found := m.executors[id]; found {
		entry.Used, entry.Status = use, status
		return nil
	}
	return errors.ErrUnknownExecutorID.GenWithStackByArgs(id)
}

// getAvailableResource collects the records of executors that are in the
// Running status and whose Capacity is strictly greater than both their
// Reserved and their Used amounts.
//
// It does not lock m.mu itself; the caller visible in this file,
// allocateTasksWithNaiveStrategy, holds the lock while calling it. The
// result follows map iteration order and is always non-nil.
func (m *CapRescMgr) getAvailableResource() []*ExecutorResource {
	available := make([]*ExecutorResource, 0)
	for _, candidate := range m.executors {
		if candidate.Status != model.Running {
			continue
		}
		if candidate.Capacity <= candidate.Reserved || candidate.Capacity <= candidate.Used {
			continue
		}
		available = append(available, candidate)
	}
	return available
}

// allocateTasksWithNaiveStrategy assigns every task to an executor using a
// round-robin scan over the currently available executors.
//
// For each task the scan starts at the executor that received the previous
// task and moves forward until it finds one whose free capacity, computed as
// Capacity - max(Used, Reserved), covers the task cost. The chosen executor's
// Reserved amount is increased by that cost.
//
// It returns (true, response) when every task was placed; the response maps
// each task ID to the chosen executor ID. It returns (false, nil) when at
// least one task fits on no executor, including the case where no executor
// is available at all. On failure, all reservations made during this call
// are reverted so a rejected request does not consume capacity. An empty
// task list always succeeds with an empty schedule.
func (m *CapRescMgr) allocateTasksWithNaiveStrategy(
	tasks []*pb.ScheduleTask,
) (bool, *pb.TaskSchedulerResponse) {
	m.mu.Lock()
	defer m.mu.Unlock()

	result := make(map[int32]*pb.ScheduleResult, len(tasks))
	resources := m.getAvailableResource()
	// Without this guard the scan below would index an empty slice and panic.
	if len(tasks) > 0 && len(resources) == 0 {
		return false, nil
	}

	// granted records how much this call reserved on each executor, so the
	// reservations can be undone if a later task cannot be placed.
	granted := make(map[*ExecutorResource]RescUnit)
	idx := 0
	for _, task := range tasks {
		cost := RescUnit(task.GetCost())
		originalIdx := idx
		for {
			exec := resources[idx]
			used := exec.Used
			if exec.Reserved > used {
				used = exec.Reserved
			}
			if exec.Capacity-used >= cost {
				result[task.GetTask().Id] = &pb.ScheduleResult{
					ExecutorId: string(exec.ID),
				}
				exec.Reserved += cost
				granted[exec] += cost
				break
			}
			idx = (idx + 1) % len(resources)
			if idx == originalIdx {
				// Every executor was tried for this task: give back what
				// earlier tasks of this request had reserved.
				for res, amount := range granted {
					res.Reserved -= amount
				}
				return false, nil
			}
		}
	}
	return true, &pb.TaskSchedulerResponse{Schedule: result}
}
33 changes: 33 additions & 0 deletions master/resource/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package resource

import (
"github.com/hanfei1991/microcosm/model"
"github.com/hanfei1991/microcosm/pb"
)

// RescMgr manages the resources of the executors in the cluster: it tracks
// which executors exist, how much of their capacity is in use, and decides
// where tasks are placed.
type RescMgr interface {
// Register registers a new executor with the given capacity; it is called
// when an executor joins.
Register(id model.ExecutorID, capacity RescUnit)

// Unregister removes an executor; it is called when an executor exits.
Unregister(id model.ExecutorID)

// Allocate allocates executor resources to the given tasks. The boolean
// reports whether the allocation succeeded; the response carries the
// resulting schedule.
Allocate(tasks []*pb.ScheduleTask) (bool, *pb.TaskSchedulerResponse)

// Update updates an executor's resource usage and running status. It
// returns an error when the update cannot be applied.
Update(id model.ExecutorID, use RescUnit, status model.ExecutorStatus) error
}

// RescUnit is the smallest unit in which resources are counted. Executor
// capacity, reserved and used amounts are all expressed in RescUnit.
type RescUnit int

// ExecutorResource is the resource record kept for a single executor.
type ExecutorResource struct {
// ID identifies the executor this record belongs to.
ID model.ExecutorID
// Status is the executor's last reported running status.
Status model.ExecutorStatus

// Capacity is the total amount of resource the executor offers.
Capacity RescUnit
// Reserved is the amount set aside for tasks scheduled onto the executor.
Reserved RescUnit
// Used is the executor's last reported resource usage.
Used RescUnit
}
Loading

0 comments on commit 5355972

Please sign in to comment.