Skip to content

Commit

Permalink
Feat: add stepGroupName to process.Context
Browse files Browse the repository at this point in the history
Signed-off-by: yangsoon <songyang.song@alibaba-inc.com>
  • Loading branch information
yangsoon committed Apr 6, 2023
1 parent c730c05 commit ec56b2a
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 22 deletions.
2 changes: 2 additions & 0 deletions pkg/cue/model/keyword.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
ContextStepSessionID = "stepSessionID"
// ContextStepName is the name of the step
ContextStepName = "stepName"
// ContextStepGroupName is the name of the stepGroup
ContextStepGroupName = "stepGroupName"
// ContextSpanID is name for span id.
ContextSpanID = "spanID"
// OutputSecretName is used to store all secret names which are generated by cloud resource components
Expand Down
64 changes: 64 additions & 0 deletions pkg/cue/process/datamanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package process

import "github.com/kubevela/workflow/pkg/cue/model"

type DataManager interface {
Fill(ctx Context)
Remove(ctx Context, opts ...StepMetaBuilder)
}

type Tracing struct {
SpanID string
}

type StepRunTimeMeta struct {
Data map[string]interface{}
}

type StepMetaKV struct {
Key string
Value interface{}
}

type StepMetaBuilder func() StepMetaKV

func WithSessionID(id string) StepMetaBuilder {
return func() StepMetaKV { return StepMetaKV{Key: model.ContextStepSessionID, Value: id} }
}

func WithName(name string) StepMetaBuilder {
return func() StepMetaKV { return StepMetaKV{Key: model.ContextStepName, Value: name} }
}

func WithGroupName(name string) StepMetaBuilder {
return func() StepMetaKV { return StepMetaKV{Key: model.ContextStepGroupName, Value: name} }
}

func WithSpanID(id string) StepMetaBuilder {
return func() StepMetaKV { return StepMetaKV{Key: model.ContextSpanID, Value: id} }
}

func NewStepRunTimeMeta(builders ...StepMetaBuilder) DataManager {
meta := &StepRunTimeMeta{
Data: make(map[string]interface{}),
}
for _, b := range builders {
metaKV := b()
meta.Data[metaKV.Key] = metaKV.Value
}
return meta
}

func (s *StepRunTimeMeta) Fill(ctx Context) {
for k := range s.Data {
ctx.PushData(k, s.Data[k])
}
}

func (s *StepRunTimeMeta) Remove(ctx Context, builders ...StepMetaBuilder) {
for _, b := range builders {
metaKV := b()
delete(s.Data, metaKV.Key)
ctx.RemoveData(metaKV.Key)
}
}
5 changes: 5 additions & 0 deletions pkg/cue/process/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Context interface {
BaseContextLabels() map[string]string
SetParameters(params map[string]interface{})
PushData(key string, data interface{})
RemoveData(key string)
GetData(key string) interface{}
GetCtx() context.Context
SetCtx(context.Context)
Expand Down Expand Up @@ -218,6 +219,10 @@ func (ctx *templateContext) PushData(key string, data interface{}) {
ctx.data[key] = data
}

func (ctx *templateContext) RemoveData(key string) {
delete(ctx.data, key)
}

// GetData get data from context
func (ctx *templateContext) GetData(key string) interface{} {
return ctx.data[key]
Expand Down
34 changes: 28 additions & 6 deletions pkg/executor/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"sigs.k8s.io/yaml"

"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -36,12 +34,13 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/pointer"
"sigs.k8s.io/yaml"

monitorContext "github.com/kubevela/pkg/monitor/context"

"github.com/kubevela/workflow/api/v1alpha1"
wfContext "github.com/kubevela/workflow/pkg/context"
"github.com/kubevela/workflow/pkg/cue/model/value"
"github.com/kubevela/workflow/pkg/cue/process"
"github.com/kubevela/workflow/pkg/features"
"github.com/kubevela/workflow/pkg/providers/workspace"
"github.com/kubevela/workflow/pkg/tasks/builtin"
Expand Down Expand Up @@ -2306,7 +2305,7 @@ func makeRunner(step v1alpha1.WorkflowStep, subTaskRunners []types.TaskRunner) t
}, &types.Operation{}, err
}
case "step-group":
group, _ := builtin.StepGroup(step, &types.TaskGeneratorOptions{SubTaskRunners: subTaskRunners})
group, _ := builtin.StepGroup(step, &types.TaskGeneratorOptions{SubTaskRunners: subTaskRunners, ProcessContext: process.NewContext(process.ContextData{})})
run = group.Run
case "running":
run = func(ctx wfContext.Context, options *types.TaskRunOptions) (v1alpha1.StepStatus, *types.Operation, error) {
Expand All @@ -2329,6 +2328,17 @@ func makeRunner(step v1alpha1.WorkflowStep, subTaskRunners []types.TaskRunner) t
return &testTaskRunner{
step: step,
run: run,
fillContext: func(ctx monitorContext.Context, processCtx process.Context) types.ContextDataResetter {
manager := process.NewStepRunTimeMeta(
process.WithName(step.Name),
process.WithSessionID("id"),
process.WithSpanID(ctx.GetID()),
)
manager.Fill(processCtx)
return func(processCtx process.Context) {
manager.Remove(processCtx)
}
},
checkPending: func(ctx monitorContext.Context, wfCtx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) {
if step.Type != "pending" {
return false, v1alpha1.StepStatus{}
Expand All @@ -2349,6 +2359,7 @@ type testTaskRunner struct {
step v1alpha1.WorkflowStep
run func(ctx wfContext.Context, options *types.TaskRunOptions) (v1alpha1.StepStatus, *types.Operation, error)
checkPending func(ctx monitorContext.Context, wfCtx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus)
fillContext func(ctx monitorContext.Context, processCtx process.Context) types.ContextDataResetter
}

// Name return step name.
Expand All @@ -2359,7 +2370,13 @@ func (tr *testTaskRunner) Name() string {
// Run execute task.
func (tr *testTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptions) (v1alpha1.StepStatus, *types.Operation, error) {
logCtx := monitorContext.NewTraceContext(context.Background(), "test-app")
basicVal, basicTemplate, err := custom.MakeBasicValue(logCtx, ctx, nil, tr.step.Name, "id", "", options.PCtx)
if options.PCtx == nil {
options.PCtx = process.NewContext(process.ContextData{})
}
resetter := tr.fillContext(logCtx, options.PCtx)
defer resetter(options.PCtx)

basicVal, basicTemplate, err := custom.MakeBasicValue(ctx, "", options.PCtx)
if err != nil {
return v1alpha1.StepStatus{}, nil, err
}
Expand Down Expand Up @@ -2404,6 +2421,11 @@ func (tr *testTaskRunner) Pending(ctx monitorContext.Context, wfCtx wfContext.Co
return tr.checkPending(ctx, wfCtx, stepStatus)
}

// FillRunTime fill runtime data to context.
func (tr *testTaskRunner) FillContextData(ctx monitorContext.Context, processCtx process.Context) types.ContextDataResetter {
return tr.fillContext(ctx, processCtx)
}

func cleanStepTimeStamp(wfStatus *v1alpha1.WorkflowRunStatus) {
wfStatus.StartTime = metav1.Time{}
for index, step := range wfStatus.Steps {
Expand Down
22 changes: 20 additions & 2 deletions pkg/tasks/builtin/step_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ func (tr *stepGroupTaskRunner) Name() string {

// Pending check task should be executed or not.
func (tr *stepGroupTaskRunner) Pending(ctx monitorContext.Context, wfCtx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) {
basicVal, _, _ := custom.MakeBasicValue(ctx, wfCtx, tr.pd, tr.name, tr.id, "", tr.pCtx)
resetter := tr.FillContextData(ctx, tr.pCtx)
defer resetter(tr.pCtx)
basicVal, _, _ := custom.MakeBasicValue(wfCtx, "", tr.pCtx)
return custom.CheckPending(wfCtx, tr.step, tr.id, stepStatus, basicVal)
}

Expand All @@ -81,7 +83,9 @@ func (tr *stepGroupTaskRunner) Run(ctx wfContext.Context, options *types.TaskRun
}
}
tracer := options.GetTracer(tr.id, tr.step).AddTag("step_name", tr.name, "step_type", types.WorkflowStepTypeStepGroup)
basicVal, basicTemplate, err := custom.MakeBasicValue(tracer, ctx, tr.pd, tr.name, tr.id, "", tr.pCtx)
resetter := tr.FillContextData(tracer, tr.pCtx)
defer resetter(tr.pCtx)
basicVal, basicTemplate, err := custom.MakeBasicValue(ctx, "", tr.pCtx)
if err != nil {
return status, nil, err
}
Expand Down Expand Up @@ -137,6 +141,20 @@ func (tr *stepGroupTaskRunner) Run(ctx wfContext.Context, options *types.TaskRun
return status, operations, nil
}

func (tr *stepGroupTaskRunner) FillContextData(ctx monitorContext.Context, processCtx process.Context) types.ContextDataResetter {
builders := []process.StepMetaBuilder{
process.WithName(tr.name),
process.WithSessionID(tr.id),
process.WithSpanID(ctx.GetID()),
process.WithGroupName(tr.name),
}
manager := process.NewStepRunTimeMeta(builders...)
manager.Fill(processCtx)
return func(processCtx process.Context) {
manager.Remove(processCtx, builders...)
}
}

func getStepGroupStatus(status v1alpha1.StepStatus, stepStatus v1alpha1.WorkflowStepStatus, operation *types.Operation, subTaskRunners int) (v1alpha1.StepStatus, *types.Operation) {
subStepCounts := make(map[string]int)
for _, subStepsStatus := range stepStatus.SubStepsStatus {
Expand Down
38 changes: 28 additions & 10 deletions pkg/tasks/custom/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type taskRunner struct {
name string
run func(ctx wfContext.Context, options *types.TaskRunOptions) (v1alpha1.StepStatus, *types.Operation, error)
checkPending func(ctx monitorContext.Context, wfCtx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus)
fillContext func(ctx monitorContext.Context, processCtx process.Context) types.ContextDataResetter
}

// Name return step name.
Expand All @@ -80,6 +81,10 @@ func (tr *taskRunner) Pending(ctx monitorContext.Context, wfCtx wfContext.Contex
return tr.checkPending(ctx, wfCtx, stepStatus)
}

func (tr *taskRunner) FillContextData(ctx monitorContext.Context, processCtx process.Context) types.ContextDataResetter {
return tr.fillContext(ctx, processCtx)
}

// nolint:gocyclo
func (t *TaskLoader) makeTaskGenerator(templ string) (types.TaskGenerator, error) {
return func(wfStep v1alpha1.WorkflowStep, genOpt *types.TaskGeneratorOptions) (types.TaskRunner, error) {
Expand Down Expand Up @@ -119,9 +124,23 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (types.TaskGenerator, error
if t.runOptionsProcess != nil {
t.runOptionsProcess(options)
}
basicVal, _, _ := MakeBasicValue(ctx, wfCtx, t.pd, wfStep.Name, exec.wfStatus.ID, paramsStr, options.PCtx)
resetter := tRunner.fillContext(ctx, options.PCtx)
defer resetter(options.PCtx)
basicVal, _, _ := MakeBasicValue(wfCtx, paramsStr, options.PCtx)
return CheckPending(wfCtx, wfStep, exec.wfStatus.ID, stepStatus, basicVal)
}
tRunner.fillContext = func(ctx monitorContext.Context, processCtx process.Context) types.ContextDataResetter {
builders := []process.StepMetaBuilder{
process.WithName(wfStep.Name),
process.WithSessionID(exec.wfStatus.ID),
process.WithSpanID(ctx.GetID()),
}
manager := process.NewStepRunTimeMeta(builders...)
manager.Fill(processCtx)
return func(processCtx process.Context) {
manager.Remove(processCtx, builders...)
}
}
tRunner.run = func(ctx wfContext.Context, options *types.TaskRunOptions) (stepStatus v1alpha1.StepStatus, operations *types.Operation, rErr error) {
if options.GetTracer == nil {
options.GetTracer = func(id string, step v1alpha1.WorkflowStep) monitorContext.Context {
Expand All @@ -138,7 +157,9 @@ func (t *TaskLoader) makeTaskGenerator(templ string) (types.TaskGenerator, error
t.runOptionsProcess(options)
}

basicVal, basicTemplate, err := MakeBasicValue(tracer, ctx, t.pd, wfStep.Name, exec.wfStatus.ID, paramsStr, options.PCtx)
resetter := tRunner.fillContext(tracer, options.PCtx)
defer resetter(options.PCtx)
basicVal, basicTemplate, err := MakeBasicValue(ctx, paramsStr, options.PCtx)
if err != nil {
tracer.Error(err, "make context parameter")
return v1alpha1.StepStatus{}, nil, errors.WithMessage(err, "make context parameter")
Expand Down Expand Up @@ -292,12 +313,12 @@ func buildValueForStatus(ctx wfContext.Context, step v1alpha1.WorkflowStep, temp
}

// MakeBasicValue makes basic value
func MakeBasicValue(ctx monitorContext.Context, wfCtx wfContext.Context, pd *packages.PackageDiscover, step, id, parameterTemplate string, pCtx process.Context) (*value.Value, string, error) {
func MakeBasicValue(wfCtx wfContext.Context, parameterTemplate string, pCtx process.Context) (*value.Value, string, error) {
paramStr := model.ParameterFieldName + ": {}\n"
if parameterTemplate != "" {
paramStr = fmt.Sprintf(model.ParameterFieldName+": {%s}\n", parameterTemplate)
}
template := strings.Join([]string{getContextTemplate(ctx, wfCtx, step, id, pCtx), paramStr}, "\n")
template := strings.Join([]string{getContextTemplate(pCtx), paramStr}, "\n")
v, err := wfCtx.MakeParameter(template)
if err != nil {
return nil, "", err
Expand All @@ -308,14 +329,11 @@ func MakeBasicValue(ctx monitorContext.Context, wfCtx wfContext.Context, pd *pac
return v, template, nil
}

func getContextTemplate(ctx monitorContext.Context, wfCtx wfContext.Context, step, id string, pCtx process.Context) string {
contextTempl := fmt.Sprintf("\ncontext: stepSessionID: \"%s\"", id)
func getContextTemplate(pCtx process.Context) string {
var contextTempl string
if pCtx == nil {
return ""
return contextTempl
}
pCtx.PushData(model.ContextStepSessionID, id)
pCtx.PushData(model.ContextStepName, step)
pCtx.PushData(model.ContextSpanID, ctx.GetID())
c, err := pCtx.BaseContextFile()
if err != nil {
return ""
Expand Down
7 changes: 3 additions & 4 deletions pkg/tasks/custom/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@ import (
"testing"

"github.com/crossplane/crossplane-runtime/pkg/test"
monitorContext "github.com/kubevela/pkg/monitor/context"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"

monitorContext "github.com/kubevela/pkg/monitor/context"

"github.com/kubevela/workflow/api/v1alpha1"
wfContext "github.com/kubevela/workflow/pkg/context"
"github.com/kubevela/workflow/pkg/cue/model/value"
Expand Down Expand Up @@ -598,8 +597,8 @@ func TestValidateIfValue(t *testing.T) {
Namespace: "default",
Data: map[string]interface{}{"arr": []string{"a", "b"}},
})
logCtx := monitorContext.NewTraceContext(context.Background(), "test-app")
basicVal, basicTemplate, err := MakeBasicValue(logCtx, ctx, nil, "test-step", "id", `key: "value"`, pCtx)
//logCtx := monitorContext.NewTraceContext(context.Background(), "test-app")
basicVal, basicTemplate, err := MakeBasicValue(ctx, `key: "value"`, pCtx)
r := require.New(t)
r.NoError(err)

Expand Down
3 changes: 3 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,14 @@ type WorkflowMeta struct {
ChildOwnerReferences []metav1.OwnerReference
}

type ContextDataResetter func(processCtx process.Context)

// TaskRunner is a task runner
type TaskRunner interface {
Name() string
Pending(ctx monitorContext.Context, wfCtx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus)
Run(ctx wfContext.Context, options *TaskRunOptions) (v1alpha1.StepStatus, *Operation, error)
FillContextData(ctx monitorContext.Context, processCtx process.Context) ContextDataResetter
}

// TaskDiscover is the interface to obtain the TaskGenerator
Expand Down

0 comments on commit ec56b2a

Please sign in to comment.