Skip to content

Commit

Permalink
split the result filed to metadata and inspect
Browse files Browse the repository at this point in the history
  • Loading branch information
chengjoey committed Nov 15, 2021
1 parent 801d71b commit f52119f
Show file tree
Hide file tree
Showing 22 changed files with 181 additions and 107 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE `pipeline_tasks` CHANGE COLUMN `result` `metadata` mediumtext COMMENT 'task的结果元数据';
ALTER TABLE `pipeline_tasks` ADD COLUMN `inspect` text AFTER `metadata` COMMENT 'task的调度信息';
11 changes: 9 additions & 2 deletions apistructs/pipeline_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ type PipelineTaskResult struct {
Events string `json:"events,omitempty"`
}

type PipelineTaskInspect struct {
Errors []*PipelineTaskErrResponse `json:"errors,omitempty"`
MachineStat *PipelineTaskMachineStat `json:"machineStat,omitempty"`
Inspect string `json:"inspect,omitempty"`
Events string `json:"events,omitempty"`
}

type PipelineTaskSnippetDetail struct {
Outputs []PipelineOutputWithValue `json:"outputs"`

Expand Down Expand Up @@ -195,7 +202,7 @@ func (o orderedResponses) Len() int { return len(o) }
func (o orderedResponses) Less(i, j int) bool { return o[i].Ctx.EndTime.Before(o[j].Ctx.EndTime) }
func (o orderedResponses) Swap(i, j int) { o[i], o[j] = o[j], o[i] }

func (t *PipelineTaskResult) AppendError(newResponses ...*PipelineTaskErrResponse) []*PipelineTaskErrResponse {
func (t *PipelineTaskInspect) AppendError(newResponses ...*PipelineTaskErrResponse) []*PipelineTaskErrResponse {
if len(newResponses) == 0 {
return t.Errors
}
Expand Down Expand Up @@ -249,7 +256,7 @@ func (t *PipelineTaskResult) AppendError(newResponses ...*PipelineTaskErrRespons
return orderd
}

func (t *PipelineTaskResult) ConvertErrors() {
func (t *PipelineTaskInspect) ConvertErrors() {
for _, response := range t.Errors {
if response.Ctx.Count > 1 {
response.Msg = fmt.Sprintf("%s\nstartTime: %s\nendTime: %s\ncount: %d",
Expand Down
36 changes: 0 additions & 36 deletions apistructs/pipeline_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,49 +17,13 @@ package apistructs
import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestPipelineTaskLoop_Duplicate(t *testing.T) {
var l *PipelineTaskLoop
fmt.Println(l.Duplicate())
}

func TestPipelineTaskAppendError(t *testing.T) {
task := PipelineTaskDTO{}
task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "a"})
task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "a"})
assert.Equal(t, 1, len(task.Result.Errors))
task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "b"})
assert.Equal(t, 2, len(task.Result.Errors))
startA := time.Date(2021, 8, 19, 10, 10, 0, 0, time.Local)
endA := time.Date(2021, 8, 19, 10, 30, 0, 0, time.Local)
task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "a", Ctx: PipelineTaskErrCtx{StartTime: startA, EndTime: endA}})
assert.Equal(t, 3, len(task.Result.Errors))
start := time.Date(2021, 8, 19, 10, 9, 0, 0, time.Local)
end := time.Date(2021, 8, 19, 10, 29, 0, 0, time.Local)
task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "a", Ctx: PipelineTaskErrCtx{StartTime: start, EndTime: end}})
assert.Equal(t, uint64(2), task.Result.Errors[2].Ctx.Count)
assert.Equal(t, 3, len(task.Result.Errors))
assert.Equal(t, start.Unix(), task.Result.Errors[2].Ctx.StartTime.Unix())
assert.Equal(t, endA.Unix(), task.Result.Errors[2].Ctx.EndTime.Unix())
}

func TestConvertErrors(t *testing.T) {
task := PipelineTaskDTO{}
start := time.Date(2021, 8, 24, 9, 45, 1, 1, time.Local)
end := time.Date(2021, 8, 24, 9, 46, 1, 1, time.Local)
task.Result.Errors = task.Result.AppendError(&PipelineTaskErrResponse{Msg: "err", Ctx: PipelineTaskErrCtx{
StartTime: start,
EndTime: end,
Count: 2,
}})
task.Result.ConvertErrors()
assert.Equal(t, fmt.Sprintf("err\nstartTime: %s\nendTime: %s\ncount: %d", start.Format("2006-01-02 15:04:05"), end.Format("2006-01-02 15:04:05"), 2), task.Result.Errors[0].Msg)
}

func TestPipelineTaskLoop_IsEmpty(t *testing.T) {
type fields struct {
Break string
Expand Down
8 changes: 5 additions & 3 deletions modules/pipeline/aop/plugins/pipeline/basic/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ func (p *provider) Handle(ctx *aoptypes.TuneContext) error {
TimeEndQueue: getTimeOrNil(task.Extra.TimeEndQueue),
QueueCostTimeSec: task.QueueTimeSec,
RunCostTimeSec: task.CostTimeSec,
MachineStat: task.Result.MachineStat,
MachineStat: task.Inspect.MachineStat,
Meta: func() map[string]string {
result := make(map[string]string)
for _, meta := range task.Result.Metadata {
result[meta.Name] = meta.Value
if task.Metadata != nil {
for _, meta := range task.Metadata.Metadata {
result[meta.Name] = meta.Value
}
}
return result
}(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ func (p *provider) Handle(ctx *aoptypes.TuneContext) error {
}

// task result metafile not have set_cookie return
metadata := ctx.SDK.Task.Result.Metadata
var metadata apistructs.Metadata
if ctx.SDK.Task.Metadata != nil {
metadata = ctx.SDK.Task.Metadata.Metadata
}
if metadata == nil {
return nil
}
Expand Down
5 changes: 4 additions & 1 deletion modules/pipeline/aop/plugins/task/unit_test_report/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ func (p *provider) Handle(ctx *aoptypes.TuneContext) error {
return nil
}

metadata := ctx.SDK.Task.Result.Metadata
var metadata apistructs.Metadata
if ctx.SDK.Task.Metadata != nil {
metadata = ctx.SDK.Task.Metadata.Metadata
}
if metadata == nil {
return nil
}
Expand Down
10 changes: 6 additions & 4 deletions modules/pipeline/dbclient/op_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,11 +579,13 @@ func (client *Client) GetPipelineOutputs(pipelineID uint64) (map[string]map[stri
outputs := make(map[string]map[string]string)

for _, task := range tasks {
for _, metadatum := range task.Result.Metadata {
if outputs[task.Name] == nil {
outputs[task.Name] = make(map[string]string)
if task.Metadata != nil {
for _, metadatum := range task.Metadata.Metadata {
if outputs[task.Name] == nil {
outputs[task.Name] = make(map[string]string)
}
outputs[task.Name][metadatum.Name] = metadatum.Value
}
outputs[task.Name][metadatum.Name] = metadatum.Value
}
}

Expand Down
13 changes: 11 additions & 2 deletions modules/pipeline/dbclient/op_pipeline_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,24 @@ func (client *Client) ListPipelineTasksByPipelineID(pipelineID uint64, ops ...Se
return tasks, nil
}

func (client *Client) UpdatePipelineTaskResult(id uint64, result apistructs.PipelineTaskResult) error {
_, err := client.ID(id).Cols("result").Update(&spec.PipelineTask{Result: result})
func (client *Client) UpdatePipelineTaskMetadata(id uint64, result *apistructs.PipelineTaskResult) error {
_, err := client.ID(id).Cols("metadata").Update(&spec.PipelineTask{Metadata: result})
if err != nil {
b, _ := json.Marshal(&result)
return errors.Errorf("failed to update pipeline task result, taskID: %d, result: %s, err: %v", id, string(b), err)
}
return nil
}

func (client *Client) UpdatePipelineTaskInspect(id uint64, inspect apistructs.PipelineTaskInspect) error {
_, err := client.ID(id).Cols("inspect").Update(&spec.PipelineTask{Inspect: inspect})
if err != nil {
b, _ := json.Marshal(&inspect)
return errors.Errorf("failed to update pipeline task inspect, taskID: %d, inspect: %s, err: %v", id, string(b), err)
}
return nil
}

func (client *Client) UpdatePipelineTask(id uint64, task *spec.PipelineTask, ops ...SessionOption) error {
session := client.NewSession(ops...)
defer session.Close()
Expand Down
4 changes: 3 additions & 1 deletion modules/pipeline/events/event_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ func (e *PipelineTaskEvent) HandleWebSocket() error {
payload.ProjectID = e.Pipeline.Labels[apistructs.LabelProjectID]
payload.OrgID = e.Pipeline.Labels[apistructs.LabelOrgID]
payload.Status = state
payload.Result = e.Task.Result
if e.Task.Metadata != nil {
payload.Result = *e.Task.Metadata
}
payload.CostTimeSec = e.Content().(apistructs.PipelineTaskEventData).CostTimeSec

wsEvent := websocket.Event{
Expand Down
6 changes: 4 additions & 2 deletions modules/pipeline/pexpr/pexpr_params/task_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,10 @@ func generateOutputs(tasks []*spec.PipelineTask) map[string]string {
}
outputs := make(map[string]string)
for _, task := range tasks {
for _, meta := range task.Result.Metadata {
outputs[makePhKeyFunc(task.Name, meta.Name)] = meta.Value
if task.Metadata != nil {
for _, meta := range task.Metadata.Metadata {
outputs[makePhKeyFunc(task.Name, meta.Name)] = meta.Value
}
}
}
return outputs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{
return
}

meta := latestTask.Result.Metadata
var meta apistructs.Metadata
if latestTask.Metadata != nil {
meta = (*latestTask.Metadata).Metadata
}
for _, metaField := range meta {
if metaField.Name == logic.MetaKeyResult {
if metaField.Value == logic.ResultSuccess {
Expand Down Expand Up @@ -132,12 +135,15 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusAnalyzed}, nil
}

if !started && len(latestTask.Result.Metadata) == 0 {
var meta apistructs.Metadata
if latestTask.Metadata != nil {
meta = (*latestTask.Metadata).Metadata
}
if !started && len(meta) == 0 {
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusBorn}, nil
}

// status according to api success or not
meta := latestTask.Result.Metadata
var status = apistructs.PipelineStatusFailed
for _, metaField := range meta {
if metaField.Name == logic.MetaKeyResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (r *Reconciler) createSnippetPipeline(task *spec.PipelineTask, p *spec.Pipe
defer func() {
if failedError != nil {
err = failedError
task.Result.Errors = append(task.Result.Errors, &apistructs.PipelineTaskErrResponse{
task.Inspect.Errors = append(task.Inspect.Errors, &apistructs.PipelineTaskErrResponse{
Msg: err.Error(),
})
task.Status = apistructs.PipelineStatusFailed
Expand Down Expand Up @@ -176,7 +176,7 @@ func (r *Reconciler) reconcileSnippetTask(task *spec.PipelineTask, p *spec.Pipel

if snippetPipeline == nil {
task.Status = apistructs.PipelineStatusAnalyzeFailed
task.Result.Errors = append(task.Result.Errors, &apistructs.PipelineTaskErrResponse{
task.Inspect.Errors = append(task.Inspect.Errors, &apistructs.PipelineTaskErrResponse{
Msg: "not find task bind pipeline",
})
if updateErr := r.dbClient.UpdatePipelineTask(task.ID, task); updateErr != nil {
Expand Down
17 changes: 11 additions & 6 deletions modules/pipeline/pipengine/reconciler/snippet.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,15 @@ func (r *Reconciler) handleParentSnippetTaskOutputs(snippetPipeline *spec.Pipeli

// update result.metadata for value-context reference
for _, outputValue := range snippetPipeline.Snapshot.OutputValues {
parentTask.Result.Metadata = append(parentTask.Result.Metadata, apistructs.MetadataField{
if parentTask.Metadata == nil {
parentTask.Metadata = &apistructs.PipelineTaskResult{Metadata: apistructs.Metadata{}}
}
parentTask.Metadata.Metadata = append(parentTask.Metadata.Metadata, apistructs.MetadataField{
Name: outputValue.Name,
Value: strutil.String(outputValue.Value),
})
}
if err := r.dbClient.UpdatePipelineTaskResult(parentTaskID, parentTask.Result); err != nil {
if err := r.dbClient.UpdatePipelineTaskMetadata(parentTaskID, parentTask.Metadata); err != nil {
return err
}

Expand All @@ -105,11 +108,13 @@ func (r *Reconciler) calculateAndUpdatePipelineOutputValues(p *spec.Pipeline, ta
// 所有任务的输出
allTaskOutputs := make(map[string]map[string]interface{})
for _, task := range tasks {
for _, meta := range task.Result.Metadata {
if allTaskOutputs[task.Name] == nil {
allTaskOutputs[task.Name] = make(map[string]interface{})
if task.Metadata != nil {
for _, meta := range task.Metadata.Metadata {
if allTaskOutputs[task.Name] == nil {
allTaskOutputs[task.Name] = make(map[string]interface{})
}
allTaskOutputs[task.Name][meta.Name] = meta.Value
}
allTaskOutputs[task.Name][meta.Name] = meta.Value
}
}

Expand Down
2 changes: 1 addition & 1 deletion modules/pipeline/pipengine/reconciler/taskrun/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (tr *TaskRun) waitOp(itr TaskOp, o *Elem) (result error) {
}
resultErrMsg = append(resultErrMsg, errs...)
if len(resultErrMsg) > 0 {
tr.Task.Result.Errors = tr.Task.Result.AppendError(&apistructs.PipelineTaskErrResponse{
tr.Task.Inspect.Errors = tr.Task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{
Msg: strutil.Join(resultErrMsg, "\n", true),
Ctx: apistructs.PipelineTaskErrCtx{
StartTime: startTime,
Expand Down
2 changes: 1 addition & 1 deletion modules/pipeline/pipengine/reconciler/taskrun/task_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (tr *TaskRun) resetTaskForLoop() {
tr.Task.Extra.TimeEndQueue = time.Time{}
tr.Task.TimeEnd = time.Time{}
// reset task result
tr.Task.Result = apistructs.PipelineTaskResult{}
tr.Task.Metadata = nil
// reset volume
tr.Task.Context = spec.PipelineTaskContext{}
tr.Task.Extra.Volumes = nil
Expand Down
10 changes: 5 additions & 5 deletions modules/pipeline/pipengine/reconciler/taskrun/taskop/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (pre *prepare) WhenDone(data interface{}) error {
// no need retry
if err != nil {
pre.Task.Status = apistructs.PipelineStatusAnalyzeFailed
pre.Task.Result.Errors = pre.Task.Result.AppendError(&apistructs.PipelineTaskErrResponse{Msg: err.Error()})
pre.Task.Inspect.Errors = pre.Task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{Msg: err.Error()})
return nil
}

Expand Down Expand Up @@ -748,13 +748,13 @@ func condition(task *spec.PipelineTask) bool {
if sign.Err != nil {
task.Status = apistructs.PipelineStatusFailed
if sign.Err != nil {
task.Result.Errors = task.Result.AppendError(&apistructs.PipelineTaskErrResponse{
task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{
Msg: sign.Err.Error(),
})
}

if sign.Msg != "" {
task.Result.Errors = task.Result.AppendError(&apistructs.PipelineTaskErrResponse{
task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{
Msg: sign.Msg,
})
}
Expand All @@ -765,13 +765,13 @@ func condition(task *spec.PipelineTask) bool {
task.Status = apistructs.PipelineStatusNoNeedBySystem
task.Extra.AllowFailure = true
if sign.Err != nil {
task.Result.Errors = task.Result.AppendError(&apistructs.PipelineTaskErrResponse{
task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{
Msg: sign.Err.Error(),
})
}

if sign.Msg != "" {
task.Result.Errors = task.Result.AppendError(&apistructs.PipelineTaskErrResponse{
task.Inspect.Errors = task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{
Msg: sign.Msg,
})
}
Expand Down
10 changes: 5 additions & 5 deletions modules/pipeline/pipengine/reconciler/taskrun/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ func (tr *TaskRun) AppendLastMsg(msg string) error {
if err := tr.fetchLatestTask(); err != nil {
return err
}
tr.Task.Result.Errors = tr.Task.Result.AppendError(&apistructs.PipelineTaskErrResponse{Msg: msg})
if err := tr.DBClient.UpdatePipelineTaskResult(tr.Task.ID, tr.Task.Result); err != nil {
tr.Task.Inspect.Errors = tr.Task.Inspect.AppendError(&apistructs.PipelineTaskErrResponse{Msg: msg})
if err := tr.DBClient.UpdatePipelineTaskInspect(tr.Task.ID, tr.Task.Inspect); err != nil {
logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q append last message failed, err: %v",
tr.P.ID, tr.Task.Name, err)
return err
Expand All @@ -88,9 +88,9 @@ func (tr *TaskRun) UpdateTaskInspect(inspect string) error {
return err
}
events := getEventsFromInspect(inspect)
tr.Task.Result.Inspect = inspect
tr.Task.Result.Events = events
if err := tr.DBClient.UpdatePipelineTaskResult(tr.Task.ID, tr.Task.Result); err != nil {
tr.Task.Inspect.Inspect = inspect
tr.Task.Inspect.Events = events
if err := tr.DBClient.UpdatePipelineTaskInspect(tr.Task.ID, tr.Task.Inspect); err != nil {
logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q update inspect failed, err: %v",
tr.P.ID, tr.Task.Name, err)
return err
Expand Down
Loading

0 comments on commit f52119f

Please sign in to comment.