Commit
use uuid for executor id (pingcap#45)
hanfei1991 authored Dec 13, 2021
1 parent 87a6e61 commit 66935ff
Showing 12 changed files with 163 additions and 101 deletions.
2 changes: 1 addition & 1 deletion executor/server.go
@@ -303,7 +303,7 @@ func (s *Server) keepHeartbeat(ctx context.Context) error {
return errors.ErrHeartbeat.GenWithStack("heartbeat timeout")
}
req := &pb.HeartbeatRequest{
-ExecutorId: int32(s.info.ID),
+ExecutorId: string(s.info.ID),
Status: int32(model.Running),
Timestamp: uint64(t.Unix()),
// We set longer ttl for master, which is "ttl + rpc timeout", to avoid that
1 change: 1 addition & 0 deletions go.mod
@@ -19,6 +19,7 @@ require (
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/grpc v1.40.0
+github.com/google/uuid v1.1.2
)

// cloud.google.com/go/storage will upgrade grpc to v1.40.0
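The new go.mod entry pulls in github.com/google/uuid, which is presumably what backs the new string executor IDs. A minimal sketch of the library call involved (not part of this commit's loaded hunks):

package main

import (
    "fmt"

    "github.com/google/uuid"
)

func main() {
    // uuid.New() returns a random (version 4) UUID; String() renders the
    // canonical 36-character form. Using it as the executor ID is an
    // assumption here, since the allocator code is not shown in this page.
    id := uuid.New().String()
    fmt.Println(id) // value differs on every run
}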
10 changes: 5 additions & 5 deletions master/cluster/executor_manager.go
@@ -28,7 +28,7 @@ type ExecutorManager struct {
executors map[model.ExecutorID]*Executor
offExecutor chan model.ExecutorID

-idAllocator *autoid.Allocator
+idAllocator *autoid.UUIDAllocator
initHeartbeatTTL time.Duration
keepAliveInterval time.Duration

@@ -40,7 +40,7 @@ func NewExecutorManager(offExec chan model.ExecutorID, initHeartbeatTTL, keepAli
return &ExecutorManager{
testContext: ctx,
executors: make(map[model.ExecutorID]*Executor),
-idAllocator: autoid.NewAllocator(),
+idAllocator: autoid.NewUUIDAllocator(),
offExecutor: offExec,
initHeartbeatTTL: initHeartbeatTTL,
keepAliveInterval: keepAliveInterval,
@@ -50,7 +50,7 @@ func NewExecutorManager(offExec chan model.ExecutorID, initHeartbeatTTL, keepAli
func (e *ExecutorManager) removeExecutorImpl(id model.ExecutorID) error {
e.mu.Lock()
defer e.mu.Unlock()
log.L().Logger.Info("begin to remove executor", zap.Int32("id", int32(id)))
log.L().Logger.Info("begin to remove executor", zap.String("id", string(id)))
exec, ok := e.executors[id]
if !ok {
// This executor has been removed
@@ -74,7 +74,7 @@ func (e *ExecutorManager) removeExecutorImpl(id model.ExecutorID) error {

// HandleHeartbeat implements pb interface,
func (e *ExecutorManager) HandleHeartbeat(req *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) {
log.L().Logger.Info("handle heart beat", zap.Int32("id", req.ExecutorId))
log.L().Logger.Info("handle heart beat", zap.String("id", req.ExecutorId))
e.mu.Lock()
exec, ok := e.executors[model.ExecutorID(req.ExecutorId)]

@@ -166,7 +166,7 @@ func (e *Executor) close() error {
}

func (e *Executor) checkAlive() bool {
log.L().Logger.Info("check alive", zap.Int32("exec", int32(e.ExecutorInfo.ID)))
log.L().Logger.Info("check alive", zap.String("exec", string(e.ExecutorInfo.ID)))

e.mu.Lock()
defer e.mu.Unlock()
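The hunks above call autoid.NewUUIDAllocator(), but the allocator type itself is not among the loaded files. A minimal sketch of what a UUID-backed allocator could look like; the package location and the AllocID method name are assumptions:

package autoid

import "github.com/google/uuid"

// UUIDAllocator hands out globally unique string IDs for executors.
// Sketch only: the real type is referenced but not shown in this commit's
// loaded hunks, so everything beyond the constructor name is a guess.
type UUIDAllocator struct{}

func NewUUIDAllocator() *UUIDAllocator {
    return &UUIDAllocator{}
}

// AllocID returns a fresh random UUID string; unlike a counter-based
// allocator it needs no shared state and therefore no locking.
func (a *UUIDAllocator) AllocID() string {
    return uuid.New().String()
}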
2 changes: 1 addition & 1 deletion master/jobmanager.go
@@ -20,7 +20,7 @@ type JobManager struct {
mu sync.Mutex
jobMasters map[model.JobID]system.JobMaster

-idAllocater *autoid.Allocator
+idAllocater *autoid.IDAllocator
resourceMgr cluster.ResourceMgr
executorClient cluster.ExecutorClient

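Note that JobManager keeps a counter-style allocator, now named autoid.IDAllocator, while only executor IDs move to UUID strings. A sketch of what such an allocator conventionally looks like; the field layout, locking, and AllocID name are assumptions, since the renamed type's body is not shown:

package autoid

import "sync"

// IDAllocator is the integer allocator still used for job/task IDs.
// Sketch only, based on how it is referenced in this commit.
type IDAllocator struct {
    mu   sync.Mutex
    next int32
}

func NewIDAllocator() *IDAllocator {
    return &IDAllocator{}
}

// AllocID returns the next monotonically increasing ID.
func (a *IDAllocator) AllocID() int32 {
    a.mu.Lock()
    defer a.mu.Unlock()
    a.next++
    return a.next
}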
2 changes: 1 addition & 1 deletion master/jobmaster/benchmark/build.go
@@ -15,7 +15,7 @@ import (
// BuildBenchmarkJobMaster for benchmark workload.
func BuildBenchmarkJobMaster(
rawConfig string,
-idAllocator *autoid.Allocator,
+idAllocator *autoid.IDAllocator,
resourceMgr cluster.ResourceMgr,
client cluster.ExecutorClient,
mClient cluster.JobMasterClient,
10 changes: 5 additions & 5 deletions master/jobmaster/system/impl.go
@@ -115,7 +115,7 @@ func (m *Master) dispatch(ctx context.Context, tasks []*Task) error {
Tasks: taskList,
}
reqPb := job.ToPB()
log.L().Logger.Info("submit sub job", zap.Int32("exec id", int32(execID)), zap.String("req pb", reqPb.String()))
log.L().Logger.Info("submit sub job", zap.String("exec id", string(execID)), zap.String("req pb", reqPb.String()))
request := &cluster.ExecutorRequest{
Cmd: cluster.CmdSubmitBatchTasks,
Req: reqPb,
@@ -232,7 +232,7 @@ func (m *Master) StopTasks(ctx context.Context, tasks []*model.Task) error {
req := &pb.CancelBatchTasksRequest{
TaskIdList: taskList,
}
log.L().Info("begin to cancel tasks", zap.Int32("exec", int32(exec)), zap.Any("task", taskList))
log.L().Info("begin to cancel tasks", zap.String("exec", string(exec)), zap.Any("task", taskList))
resp, err := m.client.Send(ctx, exec, &cluster.ExecutorRequest{
Cmd: cluster.CmdCancelBatchTasks,
Req: req,
@@ -305,19 +305,19 @@ func (m *Master) monitorSchedulingTasks() {
// OfflineExecutor implements JobMaster interface.
func (m *Master) OfflineExecutor(id model.ExecutorID) {
m.offExecutors <- id
log.L().Logger.Info("executor is offlined", zap.Int32("eid", int32(id)))
log.L().Logger.Info("executor is offlined", zap.String("eid", string(id)))
}

func (m *Master) monitorExecutorOffline() {
for {
select {
case execID := <-m.offExecutors:
log.L().Logger.Info("executor is offlined", zap.Int32("eid", int32(execID)))
log.L().Logger.Info("executor is offlined", zap.String("eid", string(execID)))
m.mu.Lock()
taskList, ok := m.execTasks[execID]
if !ok {
m.mu.Unlock()
log.L().Logger.Info("executor has been removed, nothing todo", zap.Int32("id", int32(execID)))
log.L().Logger.Info("executor has been removed, nothing todo", zap.String("id", string(execID)))
continue
}
delete(m.execTasks, execID)
2 changes: 1 addition & 1 deletion master/scheduler.go
@@ -24,7 +24,7 @@ func (s *Server) allocateTasksWithNaiveStrategy(
rest := exec.Capacity - used
if rest >= cluster.ResourceUsage(task.Cost) {
result[task.GetTask().Id] = &pb.ScheduleResult{
-ExecutorId: int32(exec.ID),
+ExecutorId: string(exec.ID),
}
exec.Reserved = exec.Reserved + cluster.ResourceUsage(task.GetCost())
break
2 changes: 1 addition & 1 deletion master/server.go
@@ -85,7 +85,7 @@ func (s *Server) RegisterExecutor(ctx context.Context, req *pb.RegisterExecutorR
}, nil
}
return &pb.RegisterExecutorResponse{
-ExecutorId: int32(execInfo.ID),
+ExecutorId: string(execInfo.ID),
}, nil
}

5 changes: 2 additions & 3 deletions model/cluster.go
@@ -2,12 +2,11 @@ package model

import (
"encoding/json"
"fmt"

"github.com/hanfei1991/microcosm/pkg/adapter"
)

-type ExecutorID int32
+type ExecutorID string

// ExecutorInfo describes an Executor.
type ExecutorInfo struct {
@@ -24,7 +23,7 @@ type ExecutorInfo struct {
}

func (e *ExecutorInfo) EtcdKey() string {
-return adapter.ExecutorKeyAdapter.Encode(fmt.Sprintf("%d", e.ID))
+return adapter.ExecutorKeyAdapter.Encode(string(e.ID))
}

type JobType int
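With ExecutorID now a string, the etcd key is built from the ID directly rather than via fmt.Sprintf. A small usage sketch, assuming IDs are UUID strings (the key prefix comes from pkg/adapter and is not shown here):

package main

import (
    "fmt"

    "github.com/google/uuid"
    "github.com/hanfei1991/microcosm/model"
)

func main() {
    // Hypothetical executor whose ID was allocated as a UUID string.
    info := &model.ExecutorInfo{ID: model.ExecutorID(uuid.New().String())}
    // EtcdKey now encodes the UUID string directly; before this commit it
    // formatted the int32 ID with fmt.Sprintf("%d", ...).
    fmt.Println(info.EtcdKey())
}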
(3 more changed files not shown)
