Skip to content

Commit

Permalink
Feat: add stepGroupName to process.Context (#151)
Browse files Browse the repository at this point in the history
Signed-off-by: yangsoon <songyang.song@alibaba-inc.com>
Co-authored-by: yangsoon <songyang.song@alibaba-inc.com>
  • Loading branch information
yangsoon and yangsoon authored Apr 7, 2023
1 parent a4f3ec8 commit cd812ee
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 26 deletions.
26 changes: 26 additions & 0 deletions controllers/testdata/save-process-context.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
apiVersion: core.oam.dev/v1beta1
kind: WorkflowStepDefinition
metadata:
name: save-process-context
namespace: vela-system
spec:
schematic:
cue:
template: |
import "vela/op"
cm: op.#Apply & {
value: {
apiVersion: "v1"
kind: "ConfigMap"
metadata: {
name: parameter.name
labels: {
"process.context.data": "true"
}
}
data: context
}
}
parameter: name: string
97 changes: 96 additions & 1 deletion controllers/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var _ = Describe("Test Workflow", func() {
},
},
}
testDefinitions := []string{"test-apply", "apply-object", "failed-render", "suspend-and-deploy", "multi-suspend"}
testDefinitions := []string{"test-apply", "apply-object", "failed-render", "suspend-and-deploy", "multi-suspend", "save-process-context"}

BeforeEach(func() {
setupNamespace(ctx, namespace)
Expand Down Expand Up @@ -1822,6 +1822,101 @@ var _ = Describe("Test Workflow", func() {
Namespace: wr.Namespace,
}, debugCM)).Should(BeNil())
})

It("test step context data", func() {
wr := wrTemplate.DeepCopy()
wr.Name = "test-step-context-data"
wr.Spec.WorkflowSpec.Steps = []v1alpha1.WorkflowStep{{
WorkflowStepBase: v1alpha1.WorkflowStepBase{
Name: "group1",
Type: "step-group",
},
SubSteps: []v1alpha1.WorkflowStepBase{
{
Name: "step1",
Type: "save-process-context",
Properties: &runtime.RawExtension{Raw: []byte(`{"name":"process-context-step1"}`)},
},
{
Name: "step2",
Type: "save-process-context",
Properties: &runtime.RawExtension{Raw: []byte(`{"name":"process-context-step2"}`)},
},
},
}, {
WorkflowStepBase: v1alpha1.WorkflowStepBase{
Name: "group2",
Type: "step-group",
},
SubSteps: []v1alpha1.WorkflowStepBase{
{
Name: "step3",
Type: "save-process-context",
Properties: &runtime.RawExtension{Raw: []byte(`{"name":"process-context-step3"}`)},
},
{
Name: "step4",
Type: "save-process-context",
Properties: &runtime.RawExtension{Raw: []byte(`{"name":"process-context-step4"}`)},
},
},
}, {
WorkflowStepBase: v1alpha1.WorkflowStepBase{
Name: "step5",
Type: "save-process-context",
Properties: &runtime.RawExtension{Raw: []byte(`{"name":"process-context-step5"}`)},
},
}}
Expect(k8sClient.Create(ctx, wr)).Should(BeNil())

tryReconcile(reconciler, wr.Name, wr.Namespace)
wrObj := &v1alpha1.WorkflowRun{}
Expect(k8sClient.Get(ctx, client.ObjectKey{
Name: wr.Name,
Namespace: wr.Namespace,
}, wrObj)).Should(BeNil())
cmList := new(corev1.ConfigMapList)
labels := &metav1.LabelSelector{
MatchLabels: map[string]string{
"process.context.data": "true",
},
}
selector, err := metav1.LabelSelectorAsSelector(labels)
Expect(err).Should(BeNil())
Expect(k8sClient.List(ctx, cmList, &client.ListOptions{
LabelSelector: selector,
})).Should(BeNil())

processCtxMap := make(map[string]map[string]string)
for _, cm := range cmList.Items {
processCtxMap[cm.Name] = cm.Data
}
step1Ctx := processCtxMap["process-context-step1"]
step2Ctx := processCtxMap["process-context-step2"]
step3Ctx := processCtxMap["process-context-step3"]
step4Ctx := processCtxMap["process-context-step4"]
step5Ctx := processCtxMap["process-context-step5"]

By("check context.stepName")
Expect(step1Ctx["stepName"]).Should(Equal("step1"))
Expect(step2Ctx["stepName"]).Should(Equal("step2"))
Expect(step3Ctx["stepName"]).Should(Equal("step3"))
Expect(step4Ctx["stepName"]).Should(Equal("step4"))
Expect(step5Ctx["stepName"]).Should(Equal("step5"))

By("check context.stepGroupName")
Expect(step1Ctx["stepGroupName"]).Should(Equal("group1"))
Expect(step2Ctx["stepGroupName"]).Should(Equal("group1"))
Expect(step3Ctx["stepGroupName"]).Should(Equal("group2"))
Expect(step4Ctx["stepGroupName"]).Should(Equal("group2"))
Expect(step5Ctx["stepGroupName"]).Should(Equal(""))

By("check context.spanID")
spanID := strings.Split(step1Ctx["spanID"], ".")[0]
for _, pCtx := range processCtxMap {
Expect(pCtx["spanID"]).Should(ContainSubstring(spanID))
}
})
})

func reconcileWithReturn(r *WorkflowRunReconciler, name, ns string) error {
Expand Down
2 changes: 2 additions & 0 deletions pkg/cue/model/keyword.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
ContextStepSessionID = "stepSessionID"
// ContextStepName is the name of the step
ContextStepName = "stepName"
// ContextStepGroupName is the name of the stepGroup
ContextStepGroupName = "stepGroupName"
// ContextSpanID is name for span id.
ContextSpanID = "spanID"
// OutputSecretName is used to store all secret names which are generated by cloud resource components
Expand Down
69 changes: 69 additions & 0 deletions pkg/cue/process/datamanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package process

import "github.com/kubevela/workflow/pkg/cue/model"

// DataManager is in charge of injecting and removing runtime context for ContextData
type DataManager interface {
Fill(ctx Context, kvs []StepMetaKV)
Remove(ctx Context, keys []string)
}

// StepRunTimeMeta manage step runtime metadata
type StepRunTimeMeta struct{}

// StepMetaKV store the key and value of step runtime metadata
type StepMetaKV struct {
Key string
Value interface{}
}

// WithSessionID return stepSessionID of the step
func WithSessionID(id string) StepMetaKV {
return StepMetaKV{
Key: model.ContextStepSessionID,
Value: id,
}
}

// WithName return stepName of the step
func WithName(name string) StepMetaKV {
return StepMetaKV{
Key: model.ContextStepName,
Value: name,
}
}

// WithGroupName return stepGroupName of the step
func WithGroupName(name string) StepMetaKV {
return StepMetaKV{
Key: model.ContextStepGroupName,
Value: name,
}
}

// WithSpanID return spanID of the step
func WithSpanID(id string) StepMetaKV {
return StepMetaKV{
Key: model.ContextSpanID,
Value: id,
}
}

// NewStepRunTimeMeta create step runtime metadata manager
func NewStepRunTimeMeta() DataManager {
return &StepRunTimeMeta{}
}

// Fill will fill step runtime metadata for ContextData
func (s *StepRunTimeMeta) Fill(ctx Context, kvs []StepMetaKV) {
for _, kv := range kvs {
ctx.PushData(kv.Key, kv.Value)
}
}

// Remove remove step runtime metadata of ContextData
func (s *StepRunTimeMeta) Remove(ctx Context, keys []string) {
for _, key := range keys {
ctx.RemoveData(key)
}
}
5 changes: 5 additions & 0 deletions pkg/cue/process/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Context interface {
BaseContextLabels() map[string]string
SetParameters(params map[string]interface{})
PushData(key string, data interface{})
RemoveData(key string)
GetData(key string) interface{}
GetCtx() context.Context
SetCtx(context.Context)
Expand Down Expand Up @@ -218,6 +219,10 @@ func (ctx *templateContext) PushData(key string, data interface{}) {
ctx.data[key] = data
}

func (ctx *templateContext) RemoveData(key string) {
delete(ctx.data, key)
}

// GetData get data from context
func (ctx *templateContext) GetData(key string) interface{} {
return ctx.data[key]
Expand Down
35 changes: 29 additions & 6 deletions pkg/executor/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/kubevela/pkg/util/slices"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"sigs.k8s.io/yaml"

"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -36,12 +35,13 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/pointer"
"sigs.k8s.io/yaml"

monitorContext "github.com/kubevela/pkg/monitor/context"

"github.com/kubevela/workflow/api/v1alpha1"
wfContext "github.com/kubevela/workflow/pkg/context"
"github.com/kubevela/workflow/pkg/cue/model/value"
"github.com/kubevela/workflow/pkg/cue/process"
"github.com/kubevela/workflow/pkg/features"
"github.com/kubevela/workflow/pkg/providers/workspace"
"github.com/kubevela/workflow/pkg/tasks/builtin"
Expand Down Expand Up @@ -2306,7 +2306,7 @@ func makeRunner(step v1alpha1.WorkflowStep, subTaskRunners []types.TaskRunner) t
}, &types.Operation{}, err
}
case "step-group":
group, _ := builtin.StepGroup(step, &types.TaskGeneratorOptions{SubTaskRunners: subTaskRunners})
group, _ := builtin.StepGroup(step, &types.TaskGeneratorOptions{SubTaskRunners: subTaskRunners, ProcessContext: process.NewContext(process.ContextData{})})
run = group.Run
case "running":
run = func(ctx wfContext.Context, options *types.TaskRunOptions) (v1alpha1.StepStatus, *types.Operation, error) {
Expand All @@ -2329,6 +2329,17 @@ func makeRunner(step v1alpha1.WorkflowStep, subTaskRunners []types.TaskRunner) t
return &testTaskRunner{
step: step,
run: run,
fillContext: func(ctx monitorContext.Context, processCtx process.Context) types.ContextDataResetter {
metas := []process.StepMetaKV{process.WithName(step.Name), process.WithSessionID("id"), process.WithSpanID(ctx.GetID())}
manager := process.NewStepRunTimeMeta()
manager.Fill(processCtx, metas)
return func(processCtx process.Context) {
manager.Remove(processCtx, slices.Map(metas,
func(t process.StepMetaKV) string {
return t.Key
}))
}
},
checkPending: func(ctx monitorContext.Context, wfCtx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) {
if step.Type != "pending" {
return false, v1alpha1.StepStatus{}
Expand All @@ -2349,6 +2360,7 @@ type testTaskRunner struct {
step v1alpha1.WorkflowStep
run func(ctx wfContext.Context, options *types.TaskRunOptions) (v1alpha1.StepStatus, *types.Operation, error)
checkPending func(ctx monitorContext.Context, wfCtx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus)
fillContext func(ctx monitorContext.Context, processCtx process.Context) types.ContextDataResetter
}

// Name return step name.
Expand All @@ -2359,7 +2371,13 @@ func (tr *testTaskRunner) Name() string {
// Run execute task.
func (tr *testTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptions) (v1alpha1.StepStatus, *types.Operation, error) {
logCtx := monitorContext.NewTraceContext(context.Background(), "test-app")
basicVal, basicTemplate, err := custom.MakeBasicValue(logCtx, ctx, nil, tr.step.Name, "id", "", options.PCtx)
if options.PCtx == nil {
options.PCtx = process.NewContext(process.ContextData{})
}
resetter := tr.fillContext(logCtx, options.PCtx)
defer resetter(options.PCtx)

basicVal, basicTemplate, err := custom.MakeBasicValue(ctx, "", options.PCtx)
if err != nil {
return v1alpha1.StepStatus{}, nil, err
}
Expand Down Expand Up @@ -2404,6 +2422,11 @@ func (tr *testTaskRunner) Pending(ctx monitorContext.Context, wfCtx wfContext.Co
return tr.checkPending(ctx, wfCtx, stepStatus)
}

// FillRunTime fill runtime data to context.
func (tr *testTaskRunner) FillContextData(ctx monitorContext.Context, processCtx process.Context) types.ContextDataResetter {
return tr.fillContext(ctx, processCtx)
}

func cleanStepTimeStamp(wfStatus *v1alpha1.WorkflowRunStatus) {
wfStatus.StartTime = metav1.Time{}
for index, step := range wfStatus.Steps {
Expand Down
27 changes: 25 additions & 2 deletions pkg/tasks/builtin/step_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

monitorContext "github.com/kubevela/pkg/monitor/context"
"github.com/kubevela/pkg/util/slices"

"github.com/kubevela/workflow/api/v1alpha1"
wfContext "github.com/kubevela/workflow/pkg/context"
Expand Down Expand Up @@ -61,7 +62,9 @@ func (tr *stepGroupTaskRunner) Name() string {

// Pending check task should be executed or not.
func (tr *stepGroupTaskRunner) Pending(ctx monitorContext.Context, wfCtx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) {
basicVal, _, _ := custom.MakeBasicValue(ctx, wfCtx, tr.pd, tr.name, tr.id, "", tr.pCtx)
resetter := tr.FillContextData(ctx, tr.pCtx)
defer resetter(tr.pCtx)
basicVal, _, _ := custom.MakeBasicValue(wfCtx, "", tr.pCtx)
return custom.CheckPending(wfCtx, tr.step, tr.id, stepStatus, basicVal)
}

Expand All @@ -81,7 +84,9 @@ func (tr *stepGroupTaskRunner) Run(ctx wfContext.Context, options *types.TaskRun
}
}
tracer := options.GetTracer(tr.id, tr.step).AddTag("step_name", tr.name, "step_type", types.WorkflowStepTypeStepGroup)
basicVal, basicTemplate, err := custom.MakeBasicValue(tracer, ctx, tr.pd, tr.name, tr.id, "", tr.pCtx)
resetter := tr.FillContextData(tracer, tr.pCtx)
defer resetter(tr.pCtx)
basicVal, basicTemplate, err := custom.MakeBasicValue(ctx, "", tr.pCtx)
if err != nil {
return status, nil, err
}
Expand Down Expand Up @@ -137,6 +142,24 @@ func (tr *stepGroupTaskRunner) Run(ctx wfContext.Context, options *types.TaskRun
return status, operations, nil
}

func (tr *stepGroupTaskRunner) FillContextData(ctx monitorContext.Context, processCtx process.Context) types.ContextDataResetter {
metas := []process.StepMetaKV{
process.WithName(tr.name),
process.WithSessionID(tr.id),
process.WithSpanID(ctx.GetID()),
process.WithGroupName(tr.name),
}
manager := process.NewStepRunTimeMeta()
manager.Fill(processCtx, metas)
return func(processCtx process.Context) {
manager.Remove(processCtx, slices.Map(metas,
func(t process.StepMetaKV) string {
return t.Key
}),
)
}
}

func getStepGroupStatus(status v1alpha1.StepStatus, stepStatus v1alpha1.WorkflowStepStatus, operation *types.Operation, subTaskRunners int) (v1alpha1.StepStatus, *types.Operation) {
subStepCounts := make(map[string]int)
for _, subStepsStatus := range stepStatus.SubStepsStatus {
Expand Down
3 changes: 2 additions & 1 deletion pkg/tasks/builtin/step_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"testing"

"github.com/kubevela/workflow/pkg/cue/process"
"github.com/stretchr/testify/require"
"sigs.k8s.io/yaml"

Expand Down Expand Up @@ -53,7 +54,7 @@ func TestStepGroupStep(t *testing.T) {
Name: "test",
DependsOn: []string{"depend"},
},
}, &types.TaskGeneratorOptions{ID: "124", SubTaskRunners: []types.TaskRunner{subRunner}})
}, &types.TaskGeneratorOptions{ID: "124", SubTaskRunners: []types.TaskRunner{subRunner}, ProcessContext: process.NewContext(process.ContextData{})})
r.NoError(err)
r.Equal(runner.Name(), "test")

Expand Down
Loading

0 comments on commit cd812ee

Please sign in to comment.