Skip to content

Commit

Permalink
modify scheduler log info
Browse files Browse the repository at this point in the history
  • Loading branch information
chengjoey committed Sep 23, 2021
1 parent fe4f001 commit 715fb1d
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s *Sched) Create(ctx context.Context, action *spec.PipelineTask) (data int
return nil, err
}
if !shouldDispatch {
logrus.Infof("task executor %s execute create", taskExecutor.Name())
logrus.Infof("task executor %s execute create, actionInfo: %s", taskExecutor.Name(), printActionInfo(action))
return nil, nil
}

Expand Down Expand Up @@ -268,7 +268,7 @@ func (s *Sched) Start(ctx context.Context, action *spec.PipelineTask) (data inte
return nil, err
}
if !shouldDispatch {
logrus.Infof("task executor %s execute start", taskExecutor.Name())
logrus.Infof("task executor %s execute start, actionInfo: %s", taskExecutor.Name(), printActionInfo(action))
return taskExecutor.Create(ctx, action)
}

Expand Down Expand Up @@ -322,7 +322,7 @@ func (s *Sched) Status(ctx context.Context, action *spec.PipelineTask) (desc api
return apistructs.PipelineStatusDesc{}, err
}
if !shouldDispatch {
logrus.Infof("task executor %s execute status", taskExecutor.Name())
logrus.Infof("task executor %s execute status, actionInfo: %s", taskExecutor.Name(), printActionInfo(action))
result, err = taskExecutor.Status(ctx, action)
if err != nil {
return apistructs.PipelineStatusDesc{}, err
Expand Down Expand Up @@ -366,7 +366,7 @@ func (s *Sched) Inspect(ctx context.Context, action *spec.PipelineTask) (apistru
return apistructs.TaskInspect{}, err
}
if !shouldDispatch {
logrus.Infof("task executor %s execute inspect", taskExecutor.Name())
logrus.Infof("task executor %s execute inspect, actionInfo: %s", taskExecutor.Name(), printActionInfo(action))
return taskExecutor.Inspect(ctx, action)
}
return apistructs.TaskInspect{}, errors.New("scheduler(job) not support inspect operation")
Expand All @@ -386,7 +386,7 @@ func (s *Sched) Cancel(ctx context.Context, action *spec.PipelineTask) (data int
return nil, err
}
if !shouldDispatch {
logrus.Infof("task executor %s execute cancel", taskExecutor.Name())
logrus.Infof("task executor %s execute cancel, actionInfo: %s", taskExecutor.Name(), printActionInfo(action))
// TODO move all makeJobID to framework
// now move makeJobID to framework may change task uuid in database
// Restore the task uuid after remove, because gc will make the job id, but cancel don't make the job id
Expand Down Expand Up @@ -434,7 +434,7 @@ func (s *Sched) Remove(ctx context.Context, action *spec.PipelineTask) (data int
// TODO move all makeJobID to framework
// now move makeJobID to framework may change task uuid in database
action.Extra.UUID = task_uuid.MakeJobID(action)
logrus.Infof("task executor %s execute remove", taskExecutor.Name())
logrus.Infof("task executor %s execute remove, actionInfo: %s", taskExecutor.Name(), printActionInfo(action))
return taskExecutor.Remove(ctx, action)
}

Expand Down Expand Up @@ -481,7 +481,7 @@ func (s *Sched) BatchDelete(ctx context.Context, actions []*spec.PipelineTask) (
return nil, err
}
if !shouldDispatch {
logrus.Infof("task executor %s execute batch delete", taskExecutor.Name())
logrus.Infof("task executor %s execute batch delete, actionInfo: %s", taskExecutor.Name(), printActionInfo(action))
return taskExecutor.BatchDelete(ctx, actions)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,49 @@ func TestCancel(t *testing.T) {
assert.Equal(t, uuid, task.Extra.UUID)
}

func TestInspect(t *testing.T) {
s = &Sched{
taskManager: taskManager,
}
m := monkey.PatchInstanceMethod(reflect.TypeOf(s), "GetTaskExecutor", func(_ *Sched, executorType string, clusterName string, task *spec.PipelineTask) (bool, types.TaskExecutor, error) {
return false, &k8sjob.K8sJob{}, nil
})
defer m.Unpatch()

p := monkey.PatchInstanceMethod(reflect.TypeOf(&k8sjob.K8sJob{}), "Inspect", func(_ *k8sjob.K8sJob, ctx context.Context, task *spec.PipelineTask) (data interface{}, err error) {
return nil, nil
})
defer p.Unpatch()

ctx := context.Background()
task := &spec.PipelineTask{
Extra: spec.PipelineTaskExtra{
Namespace: "pipeline-1",
UUID: "pipeline-123456",
LoopOptions: &apistructs.PipelineTaskLoopOptions{
LoopedTimes: 3,
},
},
}
_, err := s.Inspect(ctx, task)
assert.Equal(t, nil, err)
}

func TestPrintActionInfo(t *testing.T) {
task := &spec.PipelineTask{
PipelineID: 1,
ID: 1,
Name: "custom-script",
Extra: spec.PipelineTaskExtra{
Namespace: "pipeline-1",
UUID: "task-1",
ClusterName: "terminus-dev",
},
}
msg := printActionInfo(task)
assert.Equal(t, "pipelineID: 1, id: 1, name: custom-script, namespace: pipeline-1, schedulerJobID: task-1, clusterName: terminus-dev", msg)
}

//import (
// "context"
// "encoding/json"
Expand Down

0 comments on commit 715fb1d

Please sign in to comment.