From 3e321a9001f512184e1d26a257a81b49c2c53a16 Mon Sep 17 00:00:00 2001 From: joey Date: Fri, 12 Nov 2021 18:54:08 +0800 Subject: [PATCH] split the result filed to metadata and inspect --- ...0211112-pipeline-task-metadata-inspect.sql | 1 + apistructs/pipeline_task.go | 12 ++++- apistructs/pipeline_task_test.go | 36 ------------- .../aop/plugins/pipeline/basic/plugin.go | 8 +-- .../task/autotest_cookie_keep_after/plugin.go | 5 +- .../plugins/task/unit_test_report/plugin.go | 5 +- modules/pipeline/dbclient/op_pipeline.go | 10 ++-- modules/pipeline/dbclient/op_pipeline_task.go | 13 ++++- modules/pipeline/events/event_task.go | 4 +- .../pexpr/pexpr_params/task_params.go | 6 ++- .../actionexecutor/plugins/apitest/apitest.go | 12 +++-- .../reconciler/reconcile_data_process.go | 4 +- .../pipeline/pipengine/reconciler/snippet.go | 15 ++++-- .../pipengine/reconciler/snippet_test.go | 52 +++++++++++++++++++ .../pipengine/reconciler/taskrun/framework.go | 2 +- .../pipengine/reconciler/taskrun/task_loop.go | 2 +- .../reconciler/taskrun/taskop/prepare.go | 10 ++-- .../reconciler/taskrun/taskop/prepare_test.go | 9 ++++ .../pipengine/reconciler/taskrun/update.go | 10 ++-- .../pipeline/services/pipelinesvc/callback.go | 37 +++++++++---- .../services/pipelinesvc/callback_test.go | 6 +-- .../pipeline/services/pipelinesvc/detail.go | 9 ++-- modules/pipeline/spec/pipeline_task.go | 33 ++++++++---- modules/pipeline/spec/pipeline_task_test.go | 38 +++++++++++++- 24 files changed, 237 insertions(+), 102 deletions(-) create mode 100644 .erda/migrations/pipeline/20211112-pipeline-task-metadata-inspect.sql diff --git a/.erda/migrations/pipeline/20211112-pipeline-task-metadata-inspect.sql b/.erda/migrations/pipeline/20211112-pipeline-task-metadata-inspect.sql new file mode 100644 index 00000000000..3bad493736b --- /dev/null +++ b/.erda/migrations/pipeline/20211112-pipeline-task-metadata-inspect.sql @@ -0,0 +1 @@ +ALTER TABLE `pipeline_tasks` ADD COLUMN `inspect` text COMMENT 'task的调度信息' AFTER `result`; \ No newline at end of file diff --git a/apistructs/pipeline_task.go b/apistructs/pipeline_task.go index 8a71fbd6742..149c66d1f02 100644 --- a/apistructs/pipeline_task.go +++ b/apistructs/pipeline_task.go @@ -62,6 +62,7 @@ type TaskContainer struct { ContainerID string `json:"containerID"` } +// PipelineTaskResult spec.pipeline task only use metadata, task dto has all fields type PipelineTaskResult struct { Metadata Metadata `json:"metadata,omitempty"` Errors []*PipelineTaskErrResponse `json:"errors,omitempty"` @@ -70,6 +71,13 @@ type PipelineTaskResult struct { Events string `json:"events,omitempty"` } +type PipelineTaskInspect struct { + Errors []*PipelineTaskErrResponse `json:"errors,omitempty"` + MachineStat *PipelineTaskMachineStat `json:"machineStat,omitempty"` + Inspect string `json:"inspect,omitempty"` + Events string `json:"events,omitempty"` +} + type PipelineTaskSnippetDetail struct { Outputs []PipelineOutputWithValue `json:"outputs"` @@ -195,7 +203,7 @@ func (o orderedResponses) Len() int { return len(o) } func (o orderedResponses) Less(i, j int) bool { return o[i].Ctx.EndTime.Before(o[j].Ctx.EndTime) } func (o orderedResponses) Swap(i, j int) { o[i], o[j] = o[j], o[i] } -func (t *PipelineTaskResult) AppendError(newResponses ...*PipelineTaskErrResponse) []*PipelineTaskErrResponse { +func (t *PipelineTaskInspect) AppendError(newResponses ...*PipelineTaskErrResponse) []*PipelineTaskErrResponse { if len(newResponses) == 0 { return t.Errors } @@ -249,7 +257,7 @@ func (t *PipelineTaskResult) AppendError(newResponses ...*PipelineTaskErrRespons return orderd } -func (t *PipelineTaskResult) ConvertErrors() { +func (t *PipelineTaskInspect) ConvertErrors() { for _, response := range t.Errors { if response.Ctx.Count > 1 { response.Msg = fmt.Sprintf("%s\nstartTime: %s\nendTime: %s\ncount: %d", diff --git a/apistructs/pipeline_task_test.go b/apistructs/pipeline_task_test.go index 6d482dfc460..ae8337ef0c4 100644 --- a/apistructs/pipeline_task_test.go +++ b/apistructs/pipeline_task_test.go @@ -17,9 +17,6 @@ package apistructs import ( "fmt" "testing" - "time" - - "github.com/stretchr/testify/assert" ) func TestPipelineTaskLoop_Duplicate(t *testing.T) { @@ -27,39 +24,6 @@ func TestPipelineTaskLoop_Duplicate(t *testing.T) { fmt.Println(l.Duplicate()) } -func TestPipelineTaskAppendError(t *testing.T) { - task := PipelineTaskDTO{} - task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "a"}) - task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "a"}) - assert.Equal(t, 1, len(task.Result.Errors)) - task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "b"}) - assert.Equal(t, 2, len(task.Result.Errors)) - startA := time.Date(2021, 8, 19, 10, 10, 0, 0, time.Local) - endA := time.Date(2021, 8, 19, 10, 30, 0, 0, time.Local) - task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "a", Ctx: PipelineTaskErrCtx{StartTime: startA, EndTime: endA}}) - assert.Equal(t, 3, len(task.Result.Errors)) - start := time.Date(2021, 8, 19, 10, 9, 0, 0, time.Local) - end := time.Date(2021, 8, 19, 10, 29, 0, 0, time.Local) - task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "a", Ctx: PipelineTaskErrCtx{StartTime: start, EndTime: end}}) - assert.Equal(t, uint64(2), task.Result.Errors[2].Ctx.Count) - assert.Equal(t, 3, len(task.Result.Errors)) - assert.Equal(t, start.Unix(), task.Result.Errors[2].Ctx.StartTime.Unix()) - assert.Equal(t, endA.Unix(), task.Result.Errors[2].Ctx.EndTime.Unix()) -} - -func TestConvertErrors(t *testing.T) { - task := PipelineTaskDTO{} - start := time.Date(2021, 8, 24, 9, 45, 1, 1, time.Local) - end := time.Date(2021, 8, 24, 9, 46, 1, 1, time.Local) - task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "err", Ctx: PipelineTaskErrCtx{ - StartTime: start, - EndTime: end, - Count: 2, - }}) - task.Result.ConvertErrors() - assert.Equal(t, fmt.Sprintf("err\nstartTime: %s\nendTime: %s\ncount: %d", start.Format("2006-01-02 15:04:05"), end.Format("2006-01-02 15:04:05"), 2), task.Result.Errors[0].Msg) -} - func TestPipelineTaskLoop_IsEmpty(t *testing.T) { type fields struct { Break string diff --git a/modules/pipeline/aop/plugins/pipeline/basic/plugin.go b/modules/pipeline/aop/plugins/pipeline/basic/plugin.go index 4b5ba3ee17c..8890af549a9 100644 --- a/modules/pipeline/aop/plugins/pipeline/basic/plugin.go +++ b/modules/pipeline/aop/plugins/pipeline/basic/plugin.go @@ -64,11 +64,13 @@ func (p *provider) Handle(ctx *aoptypes.TuneContext) error { TimeEndQueue: getTimeOrNil(task.Extra.TimeEndQueue), QueueCostTimeSec: task.QueueTimeSec, RunCostTimeSec: task.CostTimeSec, - MachineStat: task.Result.MachineStat, + MachineStat: task.Inspect.MachineStat, Meta: func() map[string]string { result := make(map[string]string) - for _, meta := range task.Result.Metadata { - result[meta.Name] = meta.Value + if task.Result != nil { + for _, meta := range task.Result.Metadata { + result[meta.Name] = meta.Value + } } return result }(), diff --git a/modules/pipeline/aop/plugins/task/autotest_cookie_keep_after/plugin.go b/modules/pipeline/aop/plugins/task/autotest_cookie_keep_after/plugin.go index bfba707f4ba..dd9e0dd6b2e 100644 --- a/modules/pipeline/aop/plugins/task/autotest_cookie_keep_after/plugin.go +++ b/modules/pipeline/aop/plugins/task/autotest_cookie_keep_after/plugin.go @@ -41,7 +41,10 @@ func (p *provider) Handle(ctx *aoptypes.TuneContext) error { } // task result metafile not have set_cookie return - metadata := ctx.SDK.Task.Result.Metadata + var metadata apistructs.Metadata + if ctx.SDK.Task.Result != nil { + metadata = ctx.SDK.Task.Result.Metadata + } if metadata == nil { return nil } diff --git a/modules/pipeline/aop/plugins/task/unit_test_report/plugin.go b/modules/pipeline/aop/plugins/task/unit_test_report/plugin.go index d071bd7c4ae..6f83118e754 100644 --- a/modules/pipeline/aop/plugins/task/unit_test_report/plugin.go +++ b/modules/pipeline/aop/plugins/task/unit_test_report/plugin.go @@ -40,7 +40,10 @@ func (p *provider) Handle(ctx *aoptypes.TuneContext) error { return nil } - metadata := ctx.SDK.Task.Result.Metadata + var metadata apistructs.Metadata + if ctx.SDK.Task.Result != nil { + metadata = ctx.SDK.Task.Result.Metadata + } if metadata == nil { return nil } diff --git a/modules/pipeline/dbclient/op_pipeline.go b/modules/pipeline/dbclient/op_pipeline.go index 0ac7f410cb4..1872c51cace 100644 --- a/modules/pipeline/dbclient/op_pipeline.go +++ b/modules/pipeline/dbclient/op_pipeline.go @@ -579,11 +579,13 @@ func (client *Client) GetPipelineOutputs(pipelineID uint64) (map[string]map[stri outputs := make(map[string]map[string]string) for _, task := range tasks { - for _, metadatum := range task.Result.Metadata { - if outputs[task.Name] == nil { - outputs[task.Name] = make(map[string]string) + if task.Result != nil { + for _, metadatum := range task.Result.Metadata { + if outputs[task.Name] == nil { + outputs[task.Name] = make(map[string]string) + } + outputs[task.Name][metadatum.Name] = metadatum.Value } - outputs[task.Name][metadatum.Name] = metadatum.Value } } diff --git a/modules/pipeline/dbclient/op_pipeline_task.go b/modules/pipeline/dbclient/op_pipeline_task.go index ad958cdbaa3..5149e5cb6f3 100644 --- a/modules/pipeline/dbclient/op_pipeline_task.go +++ b/modules/pipeline/dbclient/op_pipeline_task.go @@ -146,7 +146,7 @@ func (client *Client) ListPipelineTasksByPipelineID(pipelineID uint64, ops ...Se return tasks, nil } -func (client *Client) UpdatePipelineTaskResult(id uint64, result apistructs.PipelineTaskResult) error { +func (client *Client) UpdatePipelineTaskMetadata(id uint64, result *apistructs.PipelineTaskResult) error { _, err := client.ID(id).Cols("result").Update(&spec.PipelineTask{Result: result}) if err != nil { b, _ := json.Marshal(&result) @@ -155,6 +155,15 @@ func (client *Client) UpdatePipelineTaskResult(id uint64, result apistructs.Pipe return nil } +func (client *Client) UpdatePipelineTaskInspect(id uint64, inspect apistructs.PipelineTaskInspect) error { + _, err := client.ID(id).Cols("inspect").Update(&spec.PipelineTask{Inspect: inspect}) + if err != nil { + b, _ := json.Marshal(&inspect) + return errors.Errorf("failed to update pipeline task inspect, taskID: %d, inspect: %s, err: %v", id, string(b), err) + } + return nil +} + func (client *Client) UpdatePipelineTask(id uint64, task *spec.PipelineTask, ops ...SessionOption) error { session := client.NewSession(ops...) defer session.Close() @@ -163,7 +172,7 @@ func (client *Client) UpdatePipelineTask(id uint64, task *spec.PipelineTask, ops retryNum := 0 for { - affectedRows, err := client.ID(id).AllCols().Update(task) + affectedRows, err := client.ID(id).Update(task) if err != nil { return err } diff --git a/modules/pipeline/events/event_task.go b/modules/pipeline/events/event_task.go index c880730356f..b601ce6ced7 100644 --- a/modules/pipeline/events/event_task.go +++ b/modules/pipeline/events/event_task.go @@ -105,7 +105,9 @@ func (e *PipelineTaskEvent) HandleWebSocket() error { payload.ProjectID = e.Pipeline.Labels[apistructs.LabelProjectID] payload.OrgID = e.Pipeline.Labels[apistructs.LabelOrgID] payload.Status = state - payload.Result = e.Task.Result + if e.Task.Result != nil { + payload.Result = *e.Task.Result + } payload.CostTimeSec = e.Content().(apistructs.PipelineTaskEventData).CostTimeSec wsEvent := websocket.Event{ diff --git a/modules/pipeline/pexpr/pexpr_params/task_params.go b/modules/pipeline/pexpr/pexpr_params/task_params.go index e66e2aea1ea..7b55b56606c 100644 --- a/modules/pipeline/pexpr/pexpr_params/task_params.go +++ b/modules/pipeline/pexpr/pexpr_params/task_params.go @@ -100,8 +100,10 @@ func generateOutputs(tasks []*spec.PipelineTask) map[string]string { } outputs := make(map[string]string) for _, task := range tasks { - for _, meta := range task.Result.Metadata { - outputs[makePhKeyFunc(task.Name, meta.Name)] = meta.Value + if task.Result != nil { + for _, meta := range task.Result.Metadata { + outputs[makePhKeyFunc(task.Name, meta.Name)] = meta.Value + } } } return outputs diff --git a/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go b/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go index e3b14d7983f..2c7e7a0b0f6 100644 --- a/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go +++ b/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go @@ -96,7 +96,10 @@ func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{ return } - meta := latestTask.Result.Metadata + var meta apistructs.Metadata + if latestTask.Result != nil { + meta = (*latestTask.Result).Metadata + } for _, metaField := range meta { if metaField.Name == logic.MetaKeyResult { if metaField.Value == logic.ResultSuccess { @@ -132,12 +135,15 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusAnalyzed}, nil } - if !started && len(latestTask.Result.Metadata) == 0 { + var meta apistructs.Metadata + if latestTask.Result != nil { + meta = (*latestTask.Result).Metadata + } + if !started && len(meta) == 0 { return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusBorn}, nil } // status according to api success or not - meta := latestTask.Result.Metadata var status = apistructs.PipelineStatusFailed for _, metaField := range meta { if metaField.Name == logic.MetaKeyResult { diff --git a/modules/pipeline/pipengine/reconciler/reconcile_data_process.go b/modules/pipeline/pipengine/reconciler/reconcile_data_process.go index 6dda5f8f1f2..99de3a5c6a2 100644 --- a/modules/pipeline/pipengine/reconciler/reconcile_data_process.go +++ b/modules/pipeline/pipengine/reconciler/reconcile_data_process.go @@ -109,7 +109,7 @@ func (r *Reconciler) createSnippetPipeline(task *spec.PipelineTask, p *spec.Pipe defer func() { if failedError != nil { err = failedError - task.Result.Errors = append(task.Result.Errors, &apistructs.PipelineTaskErrResponse{ + task.Inspect.Errors = append(task.Inspect.Errors, &apistructs.PipelineTaskErrResponse{ Msg: err.Error(), }) task.Status = apistructs.PipelineStatusFailed @@ -176,7 +176,7 @@ func (r *Reconciler) reconcileSnippetTask(task *spec.PipelineTask, p *spec.Pipel if snippetPipeline == nil { task.Status = apistructs.PipelineStatusAnalyzeFailed - task.Result.Errors = append(task.Result.Errors, &apistructs.PipelineTaskErrResponse{ + task.Inspect.Errors = append(task.Inspect.Errors, &apistructs.PipelineTaskErrResponse{ Msg: "not find task bind pipeline", }) if updateErr := r.dbClient.UpdatePipelineTask(task.ID, task); updateErr != nil { diff --git a/modules/pipeline/pipengine/reconciler/snippet.go b/modules/pipeline/pipengine/reconciler/snippet.go index fc318df54ee..7a1373beb39 100644 --- a/modules/pipeline/pipengine/reconciler/snippet.go +++ b/modules/pipeline/pipengine/reconciler/snippet.go @@ -88,12 +88,15 @@ func (r *Reconciler) handleParentSnippetTaskOutputs(snippetPipeline *spec.Pipeli // update result.metadata for value-context reference for _, outputValue := range snippetPipeline.Snapshot.OutputValues { + if parentTask.Result == nil { + parentTask.Result = &apistructs.PipelineTaskResult{Metadata: apistructs.Metadata{}} + } parentTask.Result.Metadata = append(parentTask.Result.Metadata, apistructs.MetadataField{ Name: outputValue.Name, Value: strutil.String(outputValue.Value), }) } - if err := r.dbClient.UpdatePipelineTaskResult(parentTaskID, parentTask.Result); err != nil { + if err := r.dbClient.UpdatePipelineTaskMetadata(parentTaskID, parentTask.Result); err != nil { return err } @@ -105,11 +108,13 @@ func (r *Reconciler) calculateAndUpdatePipelineOutputValues(p *spec.Pipeline, ta // 所有任务的输出 allTaskOutputs := make(map[string]map[string]interface{}) for _, task := range tasks { - for _, meta := range task.Result.Metadata { - if allTaskOutputs[task.Name] == nil { - allTaskOutputs[task.Name] = make(map[string]interface{}) + if task.Result != nil { + for _, meta := range task.Result.Metadata { + if allTaskOutputs[task.Name] == nil { + allTaskOutputs[task.Name] = make(map[string]interface{}) + } + allTaskOutputs[task.Name][meta.Name] = meta.Value } - allTaskOutputs[task.Name][meta.Name] = meta.Value } } diff --git a/modules/pipeline/pipengine/reconciler/snippet_test.go b/modules/pipeline/pipengine/reconciler/snippet_test.go index e3cc0094c73..ca6d59e8633 100644 --- a/modules/pipeline/pipengine/reconciler/snippet_test.go +++ b/modules/pipeline/pipengine/reconciler/snippet_test.go @@ -15,10 +15,16 @@ package reconciler import ( + "reflect" "testing" + "bou.ke/monkey" "github.com/davecgh/go-spew/spew" "github.com/stretchr/testify/assert" + + "github.com/erda-project/erda/apistructs" + "github.com/erda-project/erda/modules/pipeline/dbclient" + "github.com/erda-project/erda/modules/pipeline/spec" ) func TestParsePipelineOutputRef(t *testing.T) { @@ -36,3 +42,49 @@ func TestParsePipelineOutputRefV2(t *testing.T) { assert.Equal(t, "a", reffedTask) assert.Equal(t, "b", reffedKey) } + +func Test_handleParentSnippetTaskOutputs(t *testing.T) { + db := &dbclient.Client{} + + m1 := monkey.PatchInstanceMethod(reflect.TypeOf(db), "GetPipelineTask", func(_ *dbclient.Client, id interface{}) (spec.PipelineTask, error) { + return spec.PipelineTask{Result: &apistructs.PipelineTaskResult{}}, nil + }) + defer m1.Unpatch() + + m2 := monkey.PatchInstanceMethod(reflect.TypeOf(db), "UpdatePipelineTaskSnippetDetail", func(_ *dbclient.Client, id uint64, snippetDetail apistructs.PipelineTaskSnippetDetail, ops ...dbclient.SessionOption) error { + return nil + }) + defer m2.Unpatch() + + m3 := monkey.PatchInstanceMethod(reflect.TypeOf(db), "UpdatePipelineTaskMetadata", func(_ *dbclient.Client, id uint64, result *apistructs.PipelineTaskResult) error { + return nil + }) + defer m3.Unpatch() + + r := &Reconciler{dbClient: db} + snippetPipeline := &spec.Pipeline{} + err := r.handleParentSnippetTaskOutputs(snippetPipeline, []apistructs.PipelineOutputWithValue{{ + PipelineOutput: apistructs.PipelineOutput{Name: "pipelineID"}, + Value: "1", + }}) + assert.NoError(t, err) +} + +func Test_calculateAndUpdatePipelineOutputValues(t *testing.T) { + db := &dbclient.Client{} + + m1 := monkey.PatchInstanceMethod(reflect.TypeOf(db), "UpdatePipelineExtraSnapshot", func(_ *dbclient.Client, pipelineID uint64, snapshot spec.Snapshot, ops ...dbclient.SessionOption) error { + return nil + }) + defer m1.Unpatch() + + tasks := []*spec.PipelineTask{&spec.PipelineTask{Name: "1", Result: &apistructs.PipelineTaskResult{ + Metadata: apistructs.Metadata{{ + Name: "pipelineID", + Value: "1", + }}, + }}} + r := &Reconciler{dbClient: db} + _, err := r.calculateAndUpdatePipelineOutputValues(&spec.Pipeline{PipelineBase: spec.PipelineBase{ID: 1}}, tasks) + assert.NoError(t, err) +} diff --git a/modules/pipeline/pipengine/reconciler/taskrun/framework.go b/modules/pipeline/pipengine/reconciler/taskrun/framework.go index 12b0dcfd4f3..ec432be8da0 100644 --- a/modules/pipeline/pipengine/reconciler/taskrun/framework.go +++ b/modules/pipeline/pipengine/reconciler/taskrun/framework.go @@ -108,7 +108,7 @@ func (tr *TaskRun) waitOp(itr TaskOp, o *Elem) (result error) { } resultErrMsg = append(resultErrMsg, errs...) if len(resultErrMsg) > 0 { - tr.Task.Result.Errors = tr.Task.Result.AppendError(&apistructs.PipelineTaskErrResponse{ + tr.Task.Inspect.Errors = tr.Task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{ Msg: strutil.Join(resultErrMsg, "\n", true), Ctx: apistructs.PipelineTaskErrCtx{ StartTime: startTime, diff --git a/modules/pipeline/pipengine/reconciler/taskrun/task_loop.go b/modules/pipeline/pipengine/reconciler/taskrun/task_loop.go index f7dd0a6eec7..7f19f7b98f8 100644 --- a/modules/pipeline/pipengine/reconciler/taskrun/task_loop.go +++ b/modules/pipeline/pipengine/reconciler/taskrun/task_loop.go @@ -122,7 +122,7 @@ func (tr *TaskRun) resetTaskForLoop() { tr.Task.Extra.TimeEndQueue = time.Time{} tr.Task.TimeEnd = time.Time{} // reset task result - tr.Task.Result = apistructs.PipelineTaskResult{} + tr.Task.Result = nil // reset volume tr.Task.Context = spec.PipelineTaskContext{} tr.Task.Extra.Volumes = nil diff --git a/modules/pipeline/pipengine/reconciler/taskrun/taskop/prepare.go b/modules/pipeline/pipengine/reconciler/taskrun/taskop/prepare.go index 198bcffa175..1d89b331d8b 100644 --- a/modules/pipeline/pipengine/reconciler/taskrun/taskop/prepare.go +++ b/modules/pipeline/pipengine/reconciler/taskrun/taskop/prepare.go @@ -77,7 +77,7 @@ func (pre *prepare) WhenDone(data interface{}) error { // no need retry if err != nil { pre.Task.Status = apistructs.PipelineStatusAnalyzeFailed - pre.Task.Result.Errors = pre.Task.Result.AppendError(&apistructs.PipelineTaskErrResponse{Msg: err.Error()}) + pre.Task.Inspect.Errors = pre.Task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{Msg: err.Error()}) return nil } @@ -748,13 +748,13 @@ func condition(task *spec.PipelineTask) bool { if sign.Err != nil { task.Status = apistructs.PipelineStatusFailed if sign.Err != nil { - task.Result.Errors = task.Result.AppendError(&apistructs.PipelineTaskErrResponse{ + task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{ Msg: sign.Err.Error(), }) } if sign.Msg != "" { - task.Result.Errors = task.Result.AppendError(&apistructs.PipelineTaskErrResponse{ + task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{ Msg: sign.Msg, }) } @@ -765,13 +765,13 @@ func condition(task *spec.PipelineTask) bool { task.Status = apistructs.PipelineStatusNoNeedBySystem task.Extra.AllowFailure = true if sign.Err != nil { - task.Result.Errors = task.Result.AppendError(&apistructs.PipelineTaskErrResponse{ + task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{ Msg: sign.Err.Error(), }) } if sign.Msg != "" { - task.Result.Errors = task.Result.AppendError(&apistructs.PipelineTaskErrResponse{ + task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{ Msg: sign.Msg, }) } diff --git a/modules/pipeline/pipengine/reconciler/taskrun/taskop/prepare_test.go b/modules/pipeline/pipengine/reconciler/taskrun/taskop/prepare_test.go index ea75dd150f4..7c35d075fc9 100644 --- a/modules/pipeline/pipengine/reconciler/taskrun/taskop/prepare_test.go +++ b/modules/pipeline/pipengine/reconciler/taskrun/taskop/prepare_test.go @@ -17,8 +17,11 @@ package taskop import ( "testing" + "github.com/stretchr/testify/assert" + "github.com/erda-project/erda/apistructs" "github.com/erda-project/erda/modules/pipeline/spec" + "github.com/erda-project/erda/pkg/parser/pipelineyml" ) func Test_prepare_generateOpenapiTokenForPullBootstrapInfo(t *testing.T) { @@ -70,3 +73,9 @@ func Test_prepare_generateOpenapiTokenForPullBootstrapInfo(t *testing.T) { }) } } + +func Test_condition(t *testing.T) { + task := &spec.PipelineTask{Extra: spec.PipelineTaskExtra{Action: pipelineyml.Action{If: "${{ 1 == 1 }}"}}} + b := condition(task) + assert.Equal(t, false, b) +} diff --git a/modules/pipeline/pipengine/reconciler/taskrun/update.go b/modules/pipeline/pipengine/reconciler/taskrun/update.go index a79a941dcab..b339a139b20 100644 --- a/modules/pipeline/pipengine/reconciler/taskrun/update.go +++ b/modules/pipeline/pipengine/reconciler/taskrun/update.go @@ -70,8 +70,8 @@ func (tr *TaskRun) AppendLastMsg(msg string) error { if err := tr.fetchLatestTask(); err != nil { return err } - tr.Task.Result.Errors = tr.Task.Result.AppendError(&apistructs.PipelineTaskErrResponse{Msg: msg}) - if err := tr.DBClient.UpdatePipelineTaskResult(tr.Task.ID, tr.Task.Result); err != nil { + tr.Task.Inspect.Errors = tr.Task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{Msg: msg}) + if err := tr.DBClient.UpdatePipelineTaskInspect(tr.Task.ID, tr.Task.Inspect); err != nil { logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q append last message failed, err: %v", tr.P.ID, tr.Task.Name, err) return err @@ -88,9 +88,9 @@ func (tr *TaskRun) UpdateTaskInspect(inspect string) error { return err } events := getEventsFromInspect(inspect) - tr.Task.Result.Inspect = inspect - tr.Task.Result.Events = events - if err := tr.DBClient.UpdatePipelineTaskResult(tr.Task.ID, tr.Task.Result); err != nil { + tr.Task.Inspect.Inspect = inspect + tr.Task.Inspect.Events = events + if err := tr.DBClient.UpdatePipelineTaskInspect(tr.Task.ID, tr.Task.Inspect); err != nil { logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q update inspect failed, err: %v", tr.P.ID, tr.Task.Name, err) return err diff --git a/modules/pipeline/services/pipelinesvc/callback.go b/modules/pipeline/services/pipelinesvc/callback.go index 6d04cadc56a..1eb8eda81b1 100644 --- a/modules/pipeline/services/pipelinesvc/callback.go +++ b/modules/pipeline/services/pipelinesvc/callback.go @@ -56,8 +56,13 @@ func (s *PipelineSvc) DealPipelineCallbackOfAction(data []byte) (err error) { fmt.Sprintf("task not belong to pipeline, taskID: %d, pipelineID: %d", task.ID, p.ID)) } - // 更新 task.result - if err = s.appendPipelineTaskResult(&p, &task, cb); err != nil { + // update task.metadata + if err = s.appendPipelineTaskMetadata(&p, &task, cb); err != nil { + return err + } + + // update task.inspect + if err = s.appendPipelineTaskInspect(&p, &task, cb); err != nil { return err } @@ -74,13 +79,10 @@ func (s *PipelineSvc) DealPipelineCallbackOfAction(data []byte) (err error) { return nil } -// appendPipelineTaskResult 追加 result -func (s *PipelineSvc) appendPipelineTaskResult(p *spec.Pipeline, task *spec.PipelineTask, cb apistructs.ActionCallback) error { - if len(cb.Metadata) == 0 && len(cb.Errors) == 0 && cb.MachineStat == nil { +func (s *PipelineSvc) appendPipelineTaskInspect(p *spec.Pipeline, task *spec.PipelineTask, cb apistructs.ActionCallback) error { + if len(cb.Errors) == 0 && cb.MachineStat == nil { return nil } - // metadata - task.Result.Metadata = append(task.Result.Metadata, cb.Metadata...) // TODO action agent should add err start time and end time newTaskErrors := make([]*apistructs.PipelineTaskErrResponse, 0) for _, e := range cb.Errors { @@ -88,13 +90,28 @@ func (s *PipelineSvc) appendPipelineTaskResult(p *spec.Pipeline, task *spec.Pipe Msg: e.Msg, }) } - task.Result.Errors = task.Result.AppendError(newTaskErrors...) + task.Inspect.Errors = task.Inspect.AppendError(newTaskErrors...) // machine stat if cb.MachineStat != nil { - task.Result.MachineStat = cb.MachineStat + task.Inspect.MachineStat = cb.MachineStat + } + + if err := s.dbClient.UpdatePipelineTaskInspect(task.ID, task.Inspect); err != nil { + return err } + return nil +} - if err := s.dbClient.UpdatePipelineTaskResult(task.ID, task.Result); err != nil { +func (s *PipelineSvc) appendPipelineTaskMetadata(p *spec.Pipeline, task *spec.PipelineTask, cb apistructs.ActionCallback) error { + if len(cb.Metadata) == 0 { + return nil + } + if task.Result == nil { + task.Result = &apistructs.PipelineTaskResult{Metadata: apistructs.Metadata{}} + } + + task.Result.Metadata = append(task.Result.Metadata, cb.Metadata...) + if err := s.dbClient.UpdatePipelineTaskMetadata(task.ID, task.Result); err != nil { return err } diff --git a/modules/pipeline/services/pipelinesvc/callback_test.go b/modules/pipeline/services/pipelinesvc/callback_test.go index cfa97ccd3e5..f14b98c0ab9 100644 --- a/modules/pipeline/services/pipelinesvc/callback_test.go +++ b/modules/pipeline/services/pipelinesvc/callback_test.go @@ -34,7 +34,7 @@ func TestAppendPipelineTaskResult(t *testing.T) { } task := &spec.PipelineTask{ - Result: apistructs.PipelineTaskResult{ + Inspect: apistructs.PipelineTaskInspect{ Errors: []*apistructs.PipelineTaskErrResponse{ &apistructs.PipelineTaskErrResponse{Msg: "a"}, }, @@ -47,7 +47,7 @@ func TestAppendPipelineTaskResult(t *testing.T) { Msg: e.Msg, }) } - task.Result.Errors = task.Result.AppendError(newTaskErrors...) + task.Inspect.Errors = task.Inspect.AppendError(newTaskErrors...) - assert.Equal(t, 3, len(task.Result.Errors)) + assert.Equal(t, 3, len(task.Inspect.Errors)) } diff --git a/modules/pipeline/services/pipelinesvc/detail.go b/modules/pipeline/services/pipelinesvc/detail.go index 03205e1ac3a..97d8c555924 100644 --- a/modules/pipeline/services/pipelinesvc/detail.go +++ b/modules/pipeline/services/pipelinesvc/detail.go @@ -108,14 +108,15 @@ func (s *PipelineSvc) Detail(pipelineID uint64) (*apistructs.PipelineDetailDTO, needApproval = true } task.CostTimeSec = costtimeutil.CalculateTaskCostTimeSec(&task) - if task.Result.Metadata == nil { + if task.Result == nil { + task.Result = &apistructs.PipelineTaskResult{} task.Result.Metadata = make([]apistructs.MetadataField, 0) } // add task events to result metadata if task status isn`t success and events it`s failed - if !task.Status.IsSuccessStatus() && task.Result.Events != "" && !isEventsLatestNormal(task.Result.Events) { + if !task.Status.IsSuccessStatus() && task.Inspect.Events != "" && !isEventsLatestNormal(task.Inspect.Events) { task.Result.Metadata = append(task.Result.Metadata, apistructs.MetadataField{ Name: "task-events", - Value: task.Result.Events, + Value: task.Inspect.Events, }) } taskDTOs = append(taskDTOs, *task.Convert2DTO()) @@ -455,7 +456,7 @@ func polishTask(p *spec.Pipeline, task *spec.PipelineTask, runningStageID uint64 return } // 判断 task 状态 - if len(task.Result.Errors) > 0 { + if len(task.Inspect.Errors) > 0 { task.Status = apistructs.PipelineStatusFailed changed = true return diff --git a/modules/pipeline/spec/pipeline_task.go b/modules/pipeline/spec/pipeline_task.go index 4420aba4a47..721f6b88bc3 100644 --- a/modules/pipeline/spec/pipeline_task.go +++ b/modules/pipeline/spec/pipeline_task.go @@ -32,14 +32,15 @@ type PipelineTask struct { PipelineID uint64 `json:"pipelineID"` StageID uint64 `json:"stageID"` - Name string `json:"name"` - OpType PipelineTaskOpType `json:"opType"` // Deprecated: get, put, task - Type string `json:"type,omitempty"` // git, buildpack, release, dice ... 当 OpType 为自定义任务时为空 - ExecutorKind PipelineTaskExecutorKind `json:"executorKind"` // scheduler, memory - Status apistructs.PipelineStatus `json:"status"` - Extra PipelineTaskExtra `json:"extra" xorm:"json"` - Context PipelineTaskContext `json:"context" xorm:"json"` - Result apistructs.PipelineTaskResult `json:"result" xorm:"json"` + Name string `json:"name"` + OpType PipelineTaskOpType `json:"opType"` // Deprecated: get, put, task + Type string `json:"type,omitempty"` // git, buildpack, release, dice ... 当 OpType 为自定义任务时为空 + ExecutorKind PipelineTaskExecutorKind `json:"executorKind"` // scheduler, memory + Status apistructs.PipelineStatus `json:"status"` + Extra PipelineTaskExtra `json:"extra" xorm:"json"` + Context PipelineTaskContext `json:"context" xorm:"json"` + Result *apistructs.PipelineTaskResult `json:"result" xorm:"json"` + Inspect apistructs.PipelineTaskInspect `json:"inspect" xorm:"json"` IsSnippet bool `json:"isSnippet"` // 该节点是否是嵌套流水线节点 SnippetPipelineID *uint64 `json:"snippetPipelineID"` // 嵌套的流水线 id @@ -199,7 +200,6 @@ func (pt *PipelineTask) Convert2DTO() *apistructs.PipelineTaskDTO { if pt == nil { return nil } - pt.Result.ConvertErrors() task := apistructs.PipelineTaskDTO{ ID: pt.ID, PipelineID: pt.PipelineID, @@ -214,7 +214,6 @@ func (pt *PipelineTask) Convert2DTO() *apistructs.PipelineTaskDTO { TaskContainers: pt.Extra.TaskContainers, }, Labels: pt.Extra.Action.Labels, - Result: pt.Result, CostTimeSec: pt.CostTimeSec, QueueTimeSec: pt.QueueTimeSec, TimeBegin: pt.TimeBegin, @@ -226,6 +225,14 @@ func (pt *PipelineTask) Convert2DTO() *apistructs.PipelineTaskDTO { SnippetPipelineID: pt.SnippetPipelineID, SnippetPipelineDetail: pt.SnippetPipelineDetail, } + if pt.Result != nil { + task.Result.Metadata = pt.Result.Metadata + } + pt.Inspect.ConvertErrors() + task.Result.MachineStat = pt.Inspect.MachineStat + task.Result.Inspect = pt.Inspect.Inspect + task.Result.Events = pt.Inspect.Events + task.Result.Errors = pt.Inspect.Errors // handle metadata for _, field := range task.Result.Metadata { field.Level = field.GetLevel() @@ -245,6 +252,9 @@ func (pt *PipelineTask) Convert2DTO() *apistructs.PipelineTaskDTO { } func (pt *PipelineTask) RuntimeID() string { + if pt.Result == nil { + return "" + } for _, meta := range pt.Result.Metadata { if meta.Type == apistructs.ActionCallbackTypeLink && meta.Name == apistructs.ActionCallbackRuntimeID { @@ -255,6 +265,9 @@ func (pt *PipelineTask) RuntimeID() string { } func (pt *PipelineTask) ReleaseID() string { + if pt.Result == nil { + return "" + } for _, meta := range pt.Result.Metadata { if meta.Type == apistructs.ActionCallbackTypeLink && meta.Name == apistructs.ActionCallbackReleaseID { diff --git a/modules/pipeline/spec/pipeline_task_test.go b/modules/pipeline/spec/pipeline_task_test.go index b094f4923e7..c1d8961208e 100644 --- a/modules/pipeline/spec/pipeline_task_test.go +++ b/modules/pipeline/spec/pipeline_task_test.go @@ -16,7 +16,9 @@ package spec import ( "encoding/json" + "fmt" "testing" + "time" "github.com/magiconair/properties/assert" "github.com/sirupsen/logrus" @@ -30,7 +32,7 @@ func TestRuntimeID(t *testing.T) { if err := json.Unmarshal([]byte(s), &r); err != nil { logrus.Fatal(err) } - pt := PipelineTask{Result: r} + pt := PipelineTask{Result: &r} assert.Equal(t, pt.RuntimeID(), "9") } @@ -132,3 +134,37 @@ func TestMakeTaskExecutorCtxKey(t *testing.T) { ctxKey := MakeTaskExecutorCtxKey(task) assert.Equal(t, ctxKey, "executor-done-chan-1") } + +func TestPipelineTaskAppendError(t *testing.T) { + task := PipelineTask{} + task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{Msg: "a"}) + task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{Msg: "a"}) + assert.Equal(t, 1, len(task.Inspect.Errors)) + task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{Msg: "b"}) + assert.Equal(t, 2, len(task.Inspect.Errors)) + startA := time.Date(2021, 8, 19, 10, 10, 0, 0, time.Local) + endA := time.Date(2021, 8, 19, 10, 30, 0, 0, time.Local) + task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{Msg: "a", Ctx: apistructs.PipelineTaskErrCtx{StartTime: startA, EndTime: endA}}) + assert.Equal(t, 3, len(task.Inspect.Errors)) + start := time.Date(2021, 8, 19, 10, 9, 0, 0, time.Local) + end := time.Date(2021, 8, 19, 10, 29, 0, 0, time.Local) + task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{Msg: "a", Ctx: apistructs.PipelineTaskErrCtx{StartTime: start, EndTime: end}}) + taskDto := task.Convert2DTO() + assert.Equal(t, uint64(2), taskDto.Result.Errors[2].Ctx.Count) + assert.Equal(t, 3, len(taskDto.Result.Errors)) + assert.Equal(t, start.Unix(), taskDto.Result.Errors[2].Ctx.StartTime.Unix()) + assert.Equal(t, endA.Unix(), taskDto.Result.Errors[2].Ctx.EndTime.Unix()) +} + +func TestConvertErrors(t *testing.T) { + task := PipelineTask{} + start := time.Date(2021, 8, 24, 9, 45, 1, 1, time.Local) + end := time.Date(2021, 8, 24, 9, 46, 1, 1, time.Local) + task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{Msg: "err", Ctx: apistructs.PipelineTaskErrCtx{ + StartTime: start, + EndTime: end, + Count: 2, + }}) + taskDto := task.Convert2DTO() + assert.Equal(t, fmt.Sprintf("err\nstartTime: %s\nendTime: %s\ncount: %d", start.Format("2006-01-02 15:04:05"), end.Format("2006-01-02 15:04:05"), 2), taskDto.Result.Errors[0].Msg) +}