Skip to content

Commit

Permalink
split the result filed to metadata and inspect
Browse files Browse the repository at this point in the history
  • Loading branch information
chengjoey committed Nov 19, 2021
1 parent d322bbb commit 57a87aa
Show file tree
Hide file tree
Showing 24 changed files with 252 additions and 98 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE `pipeline_tasks` ADD COLUMN `inspect` mediumtext COMMENT 'task的调度信息' AFTER `result`;
12 changes: 10 additions & 2 deletions apistructs/pipeline_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type TaskContainer struct {
ContainerID string `json:"containerID"`
}

// PipelineTaskResult spec.pipeline task only use metadata, task dto has all fields
type PipelineTaskResult struct {
Metadata Metadata `json:"metadata,omitempty"`
Errors []*PipelineTaskErrResponse `json:"errors,omitempty"`
Expand All @@ -70,6 +71,13 @@ type PipelineTaskResult struct {
Events string `json:"events,omitempty"`
}

type PipelineTaskInspect struct {
Errors []*PipelineTaskErrResponse `json:"errors,omitempty"`
MachineStat *PipelineTaskMachineStat `json:"machineStat,omitempty"`
Inspect string `json:"inspect,omitempty"`
Events string `json:"events,omitempty"`
}

type PipelineTaskSnippetDetail struct {
Outputs []PipelineOutputWithValue `json:"outputs"`

Expand Down Expand Up @@ -195,7 +203,7 @@ func (o orderedResponses) Len() int { return len(o) }
func (o orderedResponses) Less(i, j int) bool { return o[i].Ctx.EndTime.Before(o[j].Ctx.EndTime) }
func (o orderedResponses) Swap(i, j int) { o[i], o[j] = o[j], o[i] }

func (t *PipelineTaskResult) AppendError(newResponses ...*PipelineTaskErrResponse) []*PipelineTaskErrResponse {
func (t *PipelineTaskInspect) AppendError(newResponses ...*PipelineTaskErrResponse) []*PipelineTaskErrResponse {
if len(newResponses) == 0 {
return t.Errors
}
Expand Down Expand Up @@ -249,7 +257,7 @@ func (t *PipelineTaskResult) AppendError(newResponses ...*PipelineTaskErrRespons
return orderd
}

func (t *PipelineTaskResult) ConvertErrors() {
func (t *PipelineTaskInspect) ConvertErrors() {
for _, response := range t.Errors {
if response.Ctx.Count > 1 {
response.Msg = fmt.Sprintf("%s\nstartTime: %s\nendTime: %s\ncount: %d",
Expand Down
36 changes: 0 additions & 36 deletions apistructs/pipeline_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,49 +17,13 @@ package apistructs
import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestPipelineTaskLoop_Duplicate(t *testing.T) {
var l *PipelineTaskLoop
fmt.Println(l.Duplicate())
}

func TestPipelineTaskAppendError(t *testing.T) {
task := PipelineTaskDTO{}
task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "a"})
task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "a"})
assert.Equal(t, 1, len(task.Result.Errors))
task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "b"})
assert.Equal(t, 2, len(task.Result.Errors))
startA := time.Date(2021, 8, 19, 10, 10, 0, 0, time.Local)
endA := time.Date(2021, 8, 19, 10, 30, 0, 0, time.Local)
task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "a", Ctx: PipelineTaskErrCtx{StartTime: startA, EndTime: endA}})
assert.Equal(t, 3, len(task.Result.Errors))
start := time.Date(2021, 8, 19, 10, 9, 0, 0, time.Local)
end := time.Date(2021, 8, 19, 10, 29, 0, 0, time.Local)
task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "a", Ctx: PipelineTaskErrCtx{StartTime: start, EndTime: end}})
assert.Equal(t, uint64(2), task.Result.Errors[2].Ctx.Count)
assert.Equal(t, 3, len(task.Result.Errors))
assert.Equal(t, start.Unix(), task.Result.Errors[2].Ctx.StartTime.Unix())
assert.Equal(t, endA.Unix(), task.Result.Errors[2].Ctx.EndTime.Unix())
}

func TestConvertErrors(t *testing.T) {
task := PipelineTaskDTO{}
start := time.Date(2021, 8, 24, 9, 45, 1, 1, time.Local)
end := time.Date(2021, 8, 24, 9, 46, 1, 1, time.Local)
task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "err", Ctx: PipelineTaskErrCtx{
StartTime: start,
EndTime: end,
Count: 2,
}})
task.Result.ConvertErrors()
assert.Equal(t, fmt.Sprintf("err\nstartTime: %s\nendTime: %s\ncount: %d", start.Format("2006-01-02 15:04:05"), end.Format("2006-01-02 15:04:05"), 2), task.Result.Errors[0].Msg)
}

func TestPipelineTaskLoop_IsEmpty(t *testing.T) {
type fields struct {
Break string
Expand Down
4 changes: 2 additions & 2 deletions modules/pipeline/aop/plugins/pipeline/basic/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ func (p *provider) Handle(ctx *aoptypes.TuneContext) error {
TimeEndQueue: getTimeOrNil(task.Extra.TimeEndQueue),
QueueCostTimeSec: task.QueueTimeSec,
RunCostTimeSec: task.CostTimeSec,
MachineStat: task.Result.MachineStat,
MachineStat: task.Inspect.MachineStat,
Meta: func() map[string]string {
result := make(map[string]string)
for _, meta := range task.Result.Metadata {
for _, meta := range task.GetMetadata() {
result[meta.Name] = meta.Value
}
return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func (p *provider) Handle(ctx *aoptypes.TuneContext) error {
}

// task result metafile not have set_cookie return
metadata := ctx.SDK.Task.Result.Metadata
if metadata == nil {
metadata := ctx.SDK.Task.GetMetadata()
if len(metadata) == 0 {
return nil
}
var setCookieJSON string
Expand Down
4 changes: 2 additions & 2 deletions modules/pipeline/aop/plugins/task/unit_test_report/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func (p *provider) Handle(ctx *aoptypes.TuneContext) error {
return nil
}

metadata := ctx.SDK.Task.Result.Metadata
if metadata == nil {
metadata := ctx.SDK.Task.GetMetadata()
if len(metadata) == 0 {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion modules/pipeline/dbclient/op_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ func (client *Client) GetPipelineOutputs(pipelineID uint64) (map[string]map[stri
outputs := make(map[string]map[string]string)

for _, task := range tasks {
for _, metadatum := range task.Result.Metadata {
for _, metadatum := range task.GetMetadata() {
if outputs[task.Name] == nil {
outputs[task.Name] = make(map[string]string)
}
Expand Down
13 changes: 11 additions & 2 deletions modules/pipeline/dbclient/op_pipeline_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (client *Client) ListPipelineTasksByPipelineID(pipelineID uint64, ops ...Se
return tasks, nil
}

func (client *Client) UpdatePipelineTaskResult(id uint64, result apistructs.PipelineTaskResult) error {
func (client *Client) UpdatePipelineTaskMetadata(id uint64, result *apistructs.PipelineTaskResult) error {
_, err := client.ID(id).Cols("result").Update(&spec.PipelineTask{Result: result})
if err != nil {
b, _ := json.Marshal(&result)
Expand All @@ -155,6 +155,15 @@ func (client *Client) UpdatePipelineTaskResult(id uint64, result apistructs.Pipe
return nil
}

func (client *Client) UpdatePipelineTaskInspect(id uint64, inspect apistructs.PipelineTaskInspect) error {
_, err := client.ID(id).Cols("inspect").Update(&spec.PipelineTask{Inspect: inspect})
if err != nil {
b, _ := json.Marshal(&inspect)
return errors.Errorf("failed to update pipeline task inspect, taskID: %d, inspect: %s, err: %v", id, string(b), err)
}
return nil
}

func (client *Client) UpdatePipelineTask(id uint64, task *spec.PipelineTask, ops ...SessionOption) error {
session := client.NewSession(ops...)
defer session.Close()
Expand All @@ -163,7 +172,7 @@ func (client *Client) UpdatePipelineTask(id uint64, task *spec.PipelineTask, ops
retryNum := 0

for {
affectedRows, err := client.ID(id).AllCols().Update(task)
affectedRows, err := client.ID(id).Update(task)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion modules/pipeline/events/event_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ func (e *PipelineTaskEvent) HandleWebSocket() error {
payload.ProjectID = e.Pipeline.Labels[apistructs.LabelProjectID]
payload.OrgID = e.Pipeline.Labels[apistructs.LabelOrgID]
payload.Status = state
payload.Result = e.Task.Result
if e.Task.Result != nil {
payload.Result = *e.Task.Result
}
payload.CostTimeSec = e.Content().(apistructs.PipelineTaskEventData).CostTimeSec

wsEvent := websocket.Event{
Expand Down
2 changes: 1 addition & 1 deletion modules/pipeline/pexpr/pexpr_params/task_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func generateOutputs(tasks []*spec.PipelineTask) map[string]string {
}
outputs := make(map[string]string)
for _, task := range tasks {
for _, meta := range task.Result.Metadata {
for _, meta := range task.GetMetadata() {
outputs[makePhKeyFunc(task.Name, meta.Name)] = meta.Value
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{
return
}

meta := latestTask.Result.Metadata
meta := latestTask.GetMetadata()
for _, metaField := range meta {
if metaField.Name == logic.MetaKeyResult {
if metaField.Value == logic.ResultSuccess {
Expand Down Expand Up @@ -132,12 +132,12 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusAnalyzed}, nil
}

if !started && len(latestTask.Result.Metadata) == 0 {
meta := latestTask.GetMetadata()
if !started && len(meta) == 0 {
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusBorn}, nil
}

// status according to api success or not
meta := latestTask.Result.Metadata
var status = apistructs.PipelineStatusFailed
for _, metaField := range meta {
if metaField.Name == logic.MetaKeyResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (r *Reconciler) createSnippetPipeline(task *spec.PipelineTask, p *spec.Pipe
defer func() {
if failedError != nil {
err = failedError
task.Result.Errors = append(task.Result.Errors, &apistructs.PipelineTaskErrResponse{
task.Inspect.Errors = append(task.Inspect.Errors, &apistructs.PipelineTaskErrResponse{
Msg: err.Error(),
})
task.Status = apistructs.PipelineStatusFailed
Expand Down Expand Up @@ -180,7 +180,7 @@ func (r *Reconciler) reconcileSnippetTask(task *spec.PipelineTask, p *spec.Pipel

if snippetPipeline == nil {
task.Status = apistructs.PipelineStatusAnalyzeFailed
task.Result.Errors = append(task.Result.Errors, &apistructs.PipelineTaskErrResponse{
task.Inspect.Errors = append(task.Inspect.Errors, &apistructs.PipelineTaskErrResponse{
Msg: "not find task bind pipeline",
})
if updateErr := r.dbClient.UpdatePipelineTask(task.ID, task); updateErr != nil {
Expand Down
7 changes: 5 additions & 2 deletions modules/pipeline/pipengine/reconciler/snippet.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,15 @@ func (r *Reconciler) handleParentSnippetTaskOutputs(snippetPipeline *spec.Pipeli

// update result.metadata for value-context reference
for _, outputValue := range snippetPipeline.Snapshot.OutputValues {
if parentTask.Result == nil {
parentTask.Result = &apistructs.PipelineTaskResult{Metadata: apistructs.Metadata{}}
}
parentTask.Result.Metadata = append(parentTask.Result.Metadata, apistructs.MetadataField{
Name: outputValue.Name,
Value: strutil.String(outputValue.Value),
})
}
if err := r.dbClient.UpdatePipelineTaskResult(parentTaskID, parentTask.Result); err != nil {
if err := r.dbClient.UpdatePipelineTaskMetadata(parentTaskID, parentTask.Result); err != nil {
return err
}

Expand All @@ -105,7 +108,7 @@ func (r *Reconciler) calculateAndUpdatePipelineOutputValues(p *spec.Pipeline, ta
// 所有任务的输出
allTaskOutputs := make(map[string]map[string]interface{})
for _, task := range tasks {
for _, meta := range task.Result.Metadata {
for _, meta := range task.GetMetadata() {
if allTaskOutputs[task.Name] == nil {
allTaskOutputs[task.Name] = make(map[string]interface{})
}
Expand Down
53 changes: 53 additions & 0 deletions modules/pipeline/pipengine/reconciler/snippet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
package reconciler

import (
"reflect"
"testing"

"bou.ke/monkey"
"github.com/davecgh/go-spew/spew"
"github.com/stretchr/testify/assert"

"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/modules/pipeline/dbclient"
"github.com/erda-project/erda/modules/pipeline/spec"
)

func TestParsePipelineOutputRef(t *testing.T) {
Expand All @@ -36,3 +42,50 @@ func TestParsePipelineOutputRefV2(t *testing.T) {
assert.Equal(t, "a", reffedTask)
assert.Equal(t, "b", reffedKey)
}

func Test_handleParentSnippetTaskOutputs(t *testing.T) {
db := &dbclient.Client{}

m1 := monkey.PatchInstanceMethod(reflect.TypeOf(db), "GetPipelineTask", func(_ *dbclient.Client, id interface{}) (spec.PipelineTask, error) {
return spec.PipelineTask{Result: &apistructs.PipelineTaskResult{}}, nil
})
defer m1.Unpatch()

m2 := monkey.PatchInstanceMethod(reflect.TypeOf(db), "UpdatePipelineTaskSnippetDetail", func(_ *dbclient.Client, id uint64, snippetDetail apistructs.PipelineTaskSnippetDetail, ops ...dbclient.SessionOption) error {
return nil
})
defer m2.Unpatch()

m3 := monkey.PatchInstanceMethod(reflect.TypeOf(db), "UpdatePipelineTaskMetadata", func(_ *dbclient.Client, id uint64, result *apistructs.PipelineTaskResult) error {
return nil
})
defer m3.Unpatch()

r := &Reconciler{dbClient: db}
parentTaskID := uint64(1)
snippetPipeline := &spec.Pipeline{PipelineBase: spec.PipelineBase{ParentTaskID: &parentTaskID}}
err := r.handleParentSnippetTaskOutputs(snippetPipeline, []apistructs.PipelineOutputWithValue{{
PipelineOutput: apistructs.PipelineOutput{Name: "pipelineID"},
Value: "1",
}})
assert.NoError(t, err)
}

func Test_calculateAndUpdatePipelineOutputValues(t *testing.T) {
db := &dbclient.Client{}

m1 := monkey.PatchInstanceMethod(reflect.TypeOf(db), "UpdatePipelineExtraSnapshot", func(_ *dbclient.Client, pipelineID uint64, snapshot spec.Snapshot, ops ...dbclient.SessionOption) error {
return nil
})
defer m1.Unpatch()

tasks := []*spec.PipelineTask{&spec.PipelineTask{Name: "1", Result: &apistructs.PipelineTaskResult{
Metadata: apistructs.Metadata{{
Name: "pipelineID",
Value: "1",
}},
}}}
r := &Reconciler{dbClient: db}
_, err := r.calculateAndUpdatePipelineOutputValues(&spec.Pipeline{PipelineBase: spec.PipelineBase{ID: 1}}, tasks)
assert.NoError(t, err)
}
2 changes: 1 addition & 1 deletion modules/pipeline/pipengine/reconciler/taskrun/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (tr *TaskRun) waitOp(itr TaskOp, o *Elem) (result error) {
}
resultErrMsg = append(resultErrMsg, errs...)
if len(resultErrMsg) > 0 {
tr.Task.Result.Errors = tr.Task.Result.AppendError(&apistructs.PipelineTaskErrResponse{
tr.Task.Inspect.Errors = tr.Task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{
Msg: strutil.Join(resultErrMsg, "\n", true),
Ctx: apistructs.PipelineTaskErrCtx{
StartTime: startTime,
Expand Down
2 changes: 1 addition & 1 deletion modules/pipeline/pipengine/reconciler/taskrun/task_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (tr *TaskRun) resetTaskForLoop() {
tr.Task.Extra.TimeEndQueue = time.Time{}
tr.Task.TimeEnd = time.Time{}
// reset task result
tr.Task.Result = apistructs.PipelineTaskResult{}
tr.Task.Result = nil
// reset volume
tr.Task.Context = spec.PipelineTaskContext{}
tr.Task.Extra.Volumes = nil
Expand Down
10 changes: 5 additions & 5 deletions modules/pipeline/pipengine/reconciler/taskrun/taskop/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (pre *prepare) WhenDone(data interface{}) error {
// no need retry
if err != nil {
pre.Task.Status = apistructs.PipelineStatusAnalyzeFailed
pre.Task.Result.Errors = pre.Task.Result.AppendError(&apistructs.PipelineTaskErrResponse{Msg: err.Error()})
pre.Task.Inspect.Errors = pre.Task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{Msg: err.Error()})
return nil
}

Expand Down Expand Up @@ -748,13 +748,13 @@ func condition(task *spec.PipelineTask) bool {
if sign.Err != nil {
task.Status = apistructs.PipelineStatusFailed
if sign.Err != nil {
task.Result.Errors = task.Result.AppendError(&apistructs.PipelineTaskErrResponse{
task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{
Msg: sign.Err.Error(),
})
}

if sign.Msg != "" {
task.Result.Errors = task.Result.AppendError(&apistructs.PipelineTaskErrResponse{
task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{
Msg: sign.Msg,
})
}
Expand All @@ -765,13 +765,13 @@ func condition(task *spec.PipelineTask) bool {
task.Status = apistructs.PipelineStatusNoNeedBySystem
task.Extra.AllowFailure = true
if sign.Err != nil {
task.Result.Errors = task.Result.AppendError(&apistructs.PipelineTaskErrResponse{
task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{
Msg: sign.Err.Error(),
})
}

if sign.Msg != "" {
task.Result.Errors = task.Result.AppendError(&apistructs.PipelineTaskErrResponse{
task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{
Msg: sign.Msg,
})
}
Expand Down
Loading

0 comments on commit 57a87aa

Please sign in to comment.