Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add stepGroupName to process.Context #151

Merged
merged 1 commit into from
Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions controllers/testdata/save-process-context.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
apiVersion: core.oam.dev/v1beta1
kind: WorkflowStepDefinition
metadata:
name: save-process-context
namespace: vela-system
spec:
schematic:
cue:
template: |
import "vela/op"
cm: op.#Apply & {
value: {
apiVersion: "v1"
kind: "ConfigMap"
metadata: {
name: parameter.name
labels: {
"process.context.data": "true"
}
}
data: context
}
}
parameter: name: string
97 changes: 96 additions & 1 deletion controllers/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var _ = Describe("Test Workflow", func() {
},
},
}
testDefinitions := []string{"test-apply", "apply-object", "failed-render", "suspend-and-deploy", "multi-suspend"}
testDefinitions := []string{"test-apply", "apply-object", "failed-render", "suspend-and-deploy", "multi-suspend", "save-process-context"}

BeforeEach(func() {
setupNamespace(ctx, namespace)
Expand Down Expand Up @@ -1748,6 +1748,101 @@ var _ = Describe("Test Workflow", func() {
Namespace: wr.Namespace,
}, debugCM)).Should(BeNil())
})

It("test step context data", func() {
wr := wrTemplate.DeepCopy()
wr.Name = "test-step-context-data"
wr.Spec.WorkflowSpec.Steps = []v1alpha1.WorkflowStep{{
WorkflowStepBase: v1alpha1.WorkflowStepBase{
Name: "group1",
Type: "step-group",
},
SubSteps: []v1alpha1.WorkflowStepBase{
{
Name: "step1",
Type: "save-process-context",
Properties: &runtime.RawExtension{Raw: []byte(`{"name":"process-context-step1"}`)},
},
{
Name: "step2",
Type: "save-process-context",
Properties: &runtime.RawExtension{Raw: []byte(`{"name":"process-context-step2"}`)},
},
},
}, {
WorkflowStepBase: v1alpha1.WorkflowStepBase{
Name: "group2",
Type: "step-group",
},
SubSteps: []v1alpha1.WorkflowStepBase{
{
Name: "step3",
Type: "save-process-context",
Properties: &runtime.RawExtension{Raw: []byte(`{"name":"process-context-step3"}`)},
},
{
Name: "step4",
Type: "save-process-context",
Properties: &runtime.RawExtension{Raw: []byte(`{"name":"process-context-step4"}`)},
},
},
}, {
WorkflowStepBase: v1alpha1.WorkflowStepBase{
Name: "step5",
Type: "save-process-context",
Properties: &runtime.RawExtension{Raw: []byte(`{"name":"process-context-step5"}`)},
},
}}
Expect(k8sClient.Create(ctx, wr)).Should(BeNil())

tryReconcile(reconciler, wr.Name, wr.Namespace)
wrObj := &v1alpha1.WorkflowRun{}
Expect(k8sClient.Get(ctx, client.ObjectKey{
Name: wr.Name,
Namespace: wr.Namespace,
}, wrObj)).Should(BeNil())
cmList := new(corev1.ConfigMapList)
labels := &metav1.LabelSelector{
MatchLabels: map[string]string{
"process.context.data": "true",
},
}
selector, err := metav1.LabelSelectorAsSelector(labels)
Expect(err).Should(BeNil())
Expect(k8sClient.List(ctx, cmList, &client.ListOptions{
LabelSelector: selector,
})).Should(BeNil())

processCtxMap := make(map[string]map[string]string)
for _, cm := range cmList.Items {
processCtxMap[cm.Name] = cm.Data
}
step1Ctx := processCtxMap["process-context-step1"]
step2Ctx := processCtxMap["process-context-step2"]
step3Ctx := processCtxMap["process-context-step3"]
step4Ctx := processCtxMap["process-context-step4"]
step5Ctx := processCtxMap["process-context-step5"]

By("check context.stepName")
Expect(step1Ctx["stepName"]).Should(Equal("step1"))
Expect(step2Ctx["stepName"]).Should(Equal("step2"))
Expect(step3Ctx["stepName"]).Should(Equal("step3"))
Expect(step4Ctx["stepName"]).Should(Equal("step4"))
Expect(step5Ctx["stepName"]).Should(Equal("step5"))

By("check context.stepGroupName")
Expect(step1Ctx["stepGroupName"]).Should(Equal("group1"))
Expect(step2Ctx["stepGroupName"]).Should(Equal("group1"))
Expect(step3Ctx["stepGroupName"]).Should(Equal("group2"))
Expect(step4Ctx["stepGroupName"]).Should(Equal("group2"))
Expect(step5Ctx["stepGroupName"]).Should(Equal(""))

By("check context.spanID")
spanID := strings.Split(step1Ctx["spanID"], ".")[0]
for _, pCtx := range processCtxMap {
Expect(pCtx["spanID"]).Should(ContainSubstring(spanID))
}
})
})

func reconcileWithReturn(r *WorkflowRunReconciler, name, ns string) error {
Expand Down
2 changes: 2 additions & 0 deletions pkg/cue/model/keyword.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
ContextStepSessionID = "stepSessionID"
// ContextStepName is the name of the step
ContextStepName = "stepName"
// ContextStepGroupName is the name of the stepGroup
ContextStepGroupName = "stepGroupName"
// ContextSpanID is name for span id.
ContextSpanID = "spanID"
// OutputSecretName is used to store all secret names which are generated by cloud resource components
Expand Down
69 changes: 69 additions & 0 deletions pkg/cue/process/datamanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package process

import "github.com/kubevela/workflow/pkg/cue/model"

// DataManager is in charge of injecting and removing runtime context for ContextData
type DataManager interface {
Fill(ctx Context, kvs []StepMetaKV)
Remove(ctx Context, keys []string)
}

// StepRunTimeMeta manage step runtime metadata
type StepRunTimeMeta struct{}

// StepMetaKV store the key and value of step runtime metadata
type StepMetaKV struct {
Key string
Value interface{}
}

// WithSessionID return stepSessionID of the step
func WithSessionID(id string) StepMetaKV {
return StepMetaKV{
Key: model.ContextStepSessionID,
Value: id,
}
}

// WithName return stepName of the step
func WithName(name string) StepMetaKV {
return StepMetaKV{
Key: model.ContextStepName,
Value: name,
}
}

// WithGroupName return stepGroupName of the step
func WithGroupName(name string) StepMetaKV {
return StepMetaKV{
Key: model.ContextStepGroupName,
Value: name,
}
}

// WithSpanID return spanID of the step
func WithSpanID(id string) StepMetaKV {
return StepMetaKV{
Key: model.ContextSpanID,
Value: id,
}
}

// NewStepRunTimeMeta create step runtime metadata manager
func NewStepRunTimeMeta() DataManager {
return &StepRunTimeMeta{}
}

// Fill will fill step runtime metadata for ContextData
func (s *StepRunTimeMeta) Fill(ctx Context, kvs []StepMetaKV) {
for _, kv := range kvs {
ctx.PushData(kv.Key, kv.Value)
}
}

// Remove remove step runtime metadata of ContextData
func (s *StepRunTimeMeta) Remove(ctx Context, keys []string) {
for _, key := range keys {
ctx.RemoveData(key)
}
}
5 changes: 5 additions & 0 deletions pkg/cue/process/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Context interface {
BaseContextLabels() map[string]string
SetParameters(params map[string]interface{})
PushData(key string, data interface{})
RemoveData(key string)
GetData(key string) interface{}
GetCtx() context.Context
SetCtx(context.Context)
Expand Down Expand Up @@ -218,6 +219,10 @@ func (ctx *templateContext) PushData(key string, data interface{}) {
ctx.data[key] = data
}

func (ctx *templateContext) RemoveData(key string) {
delete(ctx.data, key)
}

// GetData get data from context
func (ctx *templateContext) GetData(key string) interface{} {
return ctx.data[key]
Expand Down
35 changes: 29 additions & 6 deletions pkg/executor/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/kubevela/pkg/util/slices"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"sigs.k8s.io/yaml"

"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -36,12 +35,13 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/pointer"
"sigs.k8s.io/yaml"

monitorContext "github.com/kubevela/pkg/monitor/context"

"github.com/kubevela/workflow/api/v1alpha1"
wfContext "github.com/kubevela/workflow/pkg/context"
"github.com/kubevela/workflow/pkg/cue/model/value"
"github.com/kubevela/workflow/pkg/cue/process"
"github.com/kubevela/workflow/pkg/features"
"github.com/kubevela/workflow/pkg/providers/workspace"
"github.com/kubevela/workflow/pkg/tasks/builtin"
Expand Down Expand Up @@ -2306,7 +2306,7 @@ func makeRunner(step v1alpha1.WorkflowStep, subTaskRunners []types.TaskRunner) t
}, &types.Operation{}, err
}
case "step-group":
group, _ := builtin.StepGroup(step, &types.TaskGeneratorOptions{SubTaskRunners: subTaskRunners})
group, _ := builtin.StepGroup(step, &types.TaskGeneratorOptions{SubTaskRunners: subTaskRunners, ProcessContext: process.NewContext(process.ContextData{})})
run = group.Run
case "running":
run = func(ctx wfContext.Context, options *types.TaskRunOptions) (v1alpha1.StepStatus, *types.Operation, error) {
Expand All @@ -2329,6 +2329,17 @@ func makeRunner(step v1alpha1.WorkflowStep, subTaskRunners []types.TaskRunner) t
return &testTaskRunner{
step: step,
run: run,
fillContext: func(ctx monitorContext.Context, processCtx process.Context) types.ContextDataResetter {
metas := []process.StepMetaKV{process.WithName(step.Name), process.WithSessionID("id"), process.WithSpanID(ctx.GetID())}
manager := process.NewStepRunTimeMeta()
manager.Fill(processCtx, metas)
return func(processCtx process.Context) {
manager.Remove(processCtx, slices.Map(metas,
func(t process.StepMetaKV) string {
return t.Key
}))
}
},
checkPending: func(ctx monitorContext.Context, wfCtx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) {
if step.Type != "pending" {
return false, v1alpha1.StepStatus{}
Expand All @@ -2349,6 +2360,7 @@ type testTaskRunner struct {
step v1alpha1.WorkflowStep
run func(ctx wfContext.Context, options *types.TaskRunOptions) (v1alpha1.StepStatus, *types.Operation, error)
checkPending func(ctx monitorContext.Context, wfCtx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus)
fillContext func(ctx monitorContext.Context, processCtx process.Context) types.ContextDataResetter
}

// Name return step name.
Expand All @@ -2359,7 +2371,13 @@ func (tr *testTaskRunner) Name() string {
// Run execute task.
func (tr *testTaskRunner) Run(ctx wfContext.Context, options *types.TaskRunOptions) (v1alpha1.StepStatus, *types.Operation, error) {
logCtx := monitorContext.NewTraceContext(context.Background(), "test-app")
basicVal, basicTemplate, err := custom.MakeBasicValue(logCtx, ctx, nil, tr.step.Name, "id", "", options.PCtx)
if options.PCtx == nil {
options.PCtx = process.NewContext(process.ContextData{})
}
resetter := tr.fillContext(logCtx, options.PCtx)
defer resetter(options.PCtx)

basicVal, basicTemplate, err := custom.MakeBasicValue(ctx, "", options.PCtx)
if err != nil {
return v1alpha1.StepStatus{}, nil, err
}
Expand Down Expand Up @@ -2404,6 +2422,11 @@ func (tr *testTaskRunner) Pending(ctx monitorContext.Context, wfCtx wfContext.Co
return tr.checkPending(ctx, wfCtx, stepStatus)
}

// FillRunTime fill runtime data to context.
func (tr *testTaskRunner) FillContextData(ctx monitorContext.Context, processCtx process.Context) types.ContextDataResetter {
return tr.fillContext(ctx, processCtx)
}

func cleanStepTimeStamp(wfStatus *v1alpha1.WorkflowRunStatus) {
wfStatus.StartTime = metav1.Time{}
for index, step := range wfStatus.Steps {
Expand Down
27 changes: 25 additions & 2 deletions pkg/tasks/builtin/step_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

monitorContext "github.com/kubevela/pkg/monitor/context"
"github.com/kubevela/pkg/util/slices"

"github.com/kubevela/workflow/api/v1alpha1"
wfContext "github.com/kubevela/workflow/pkg/context"
Expand Down Expand Up @@ -61,7 +62,9 @@ func (tr *stepGroupTaskRunner) Name() string {

// Pending check task should be executed or not.
func (tr *stepGroupTaskRunner) Pending(ctx monitorContext.Context, wfCtx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) {
basicVal, _, _ := custom.MakeBasicValue(ctx, wfCtx, tr.pd, tr.name, tr.id, "", tr.pCtx)
resetter := tr.FillContextData(ctx, tr.pCtx)
defer resetter(tr.pCtx)
basicVal, _, _ := custom.MakeBasicValue(wfCtx, "", tr.pCtx)
return custom.CheckPending(wfCtx, tr.step, tr.id, stepStatus, basicVal)
}

Expand All @@ -81,7 +84,9 @@ func (tr *stepGroupTaskRunner) Run(ctx wfContext.Context, options *types.TaskRun
}
}
tracer := options.GetTracer(tr.id, tr.step).AddTag("step_name", tr.name, "step_type", types.WorkflowStepTypeStepGroup)
basicVal, basicTemplate, err := custom.MakeBasicValue(tracer, ctx, tr.pd, tr.name, tr.id, "", tr.pCtx)
resetter := tr.FillContextData(tracer, tr.pCtx)
defer resetter(tr.pCtx)
basicVal, basicTemplate, err := custom.MakeBasicValue(ctx, "", tr.pCtx)
if err != nil {
return status, nil, err
}
Expand Down Expand Up @@ -137,6 +142,24 @@ func (tr *stepGroupTaskRunner) Run(ctx wfContext.Context, options *types.TaskRun
return status, operations, nil
}

func (tr *stepGroupTaskRunner) FillContextData(ctx monitorContext.Context, processCtx process.Context) types.ContextDataResetter {
metas := []process.StepMetaKV{
process.WithName(tr.name),
process.WithSessionID(tr.id),
process.WithSpanID(ctx.GetID()),
process.WithGroupName(tr.name),
}
manager := process.NewStepRunTimeMeta()
manager.Fill(processCtx, metas)
return func(processCtx process.Context) {
manager.Remove(processCtx, slices.Map(metas,
func(t process.StepMetaKV) string {
return t.Key
}),
)
}
}

func getStepGroupStatus(status v1alpha1.StepStatus, stepStatus v1alpha1.WorkflowStepStatus, operation *types.Operation, subTaskRunners int) (v1alpha1.StepStatus, *types.Operation) {
subStepCounts := make(map[string]int)
for _, subStepsStatus := range stepStatus.SubStepsStatus {
Expand Down
3 changes: 2 additions & 1 deletion pkg/tasks/builtin/step_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"testing"

"github.com/kubevela/workflow/pkg/cue/process"
"github.com/stretchr/testify/require"
"sigs.k8s.io/yaml"

Expand Down Expand Up @@ -53,7 +54,7 @@ func TestStepGroupStep(t *testing.T) {
Name: "test",
DependsOn: []string{"depend"},
},
}, &types.TaskGeneratorOptions{ID: "124", SubTaskRunners: []types.TaskRunner{subRunner}})
}, &types.TaskGeneratorOptions{ID: "124", SubTaskRunners: []types.TaskRunner{subRunner}, ProcessContext: process.NewContext(process.ContextData{})})
r.NoError(err)
r.Equal(runner.Name(), "test")

Expand Down
Loading