diff --git a/cmd/kubectl-flyte/cmd/printers/node.go b/cmd/kubectl-flyte/cmd/printers/node.go index 237df5bea..ae00fb411 100644 --- a/cmd/kubectl-flyte/cmd/printers/node.go +++ b/cmd/kubectl-flyte/cmd/printers/node.go @@ -58,7 +58,7 @@ func (p NodeStatusPrinter) BaseNodeInfo(node v1alpha1.BaseNode, nodeStatus v1alp } func (p NodeStatusPrinter) NodeInfo(wName string, node v1alpha1.BaseNode, nodeStatus v1alpha1.ExecutableNodeStatus) []string { - resourceName, err := encoding.FixedLengthUniqueIDForParts(task.IDMaxLength, wName, node.GetID(), strconv.Itoa(int(nodeStatus.GetAttempts()))) + resourceName, err := encoding.FixedLengthUniqueIDForParts(task.IDMaxLength, []string{wName, node.GetID(), strconv.Itoa(int(nodeStatus.GetAttempts()))}) if err != nil { resourceName = "na" } diff --git a/cmd/kubectl-flyte/cmd/string_map_value.go b/cmd/kubectl-flyte/cmd/string_map_value.go index 6f067ab20..b0e7e1380 100644 --- a/cmd/kubectl-flyte/cmd/string_map_value.go +++ b/cmd/kubectl-flyte/cmd/string_map_value.go @@ -22,7 +22,7 @@ func newStringMapValue() *stringMapValue { var entryRegex = regexp.MustCompile("(?P[^,]+)=(?P[^,]+)") -// Parses val into a map. Accepted format: a=1,b=2 +// Set parses val into a map. Accepted format: a=1,b=2 func (s *stringMapValue) Set(val string) error { matches := entryRegex.FindAllStringSubmatch(val, -1) out := make(map[string]string, len(matches)) diff --git a/cmd/kubectl-flyte/cmd/string_map_value_test.go b/cmd/kubectl-flyte/cmd/string_map_value_test.go index ff3ae8d17..1c2ed2873 100644 --- a/cmd/kubectl-flyte/cmd/string_map_value_test.go +++ b/cmd/kubectl-flyte/cmd/string_map_value_test.go @@ -23,7 +23,7 @@ func formatArg(values map[string]string) string { func randSpaces() string { res := "" - for cnt := rand.Int() % 10; cnt > 0; cnt-- { // nolint: gas + for cnt := rand.Int()%10 + 1; cnt > 0; cnt-- { // nolint: gas res += " " } diff --git a/go.mod b/go.mod index 2bb15e190..6c15363b5 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.13.0 github.com/flyteorg/flyteidl v1.1.10 - github.com/flyteorg/flyteplugins v1.0.10 + github.com/flyteorg/flyteplugins v1.0.13 github.com/flyteorg/flytestdlib v1.0.5 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible diff --git a/go.sum b/go.sum index 4621e7bdf..8be4efbec 100644 --- a/go.sum +++ b/go.sum @@ -294,8 +294,8 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4 github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v1.1.10 h1:Bus/JUto0oBTjAS4EBN7EITeuZNS4naq+uFpj+ydaW4= github.com/flyteorg/flyteidl v1.1.10/go.mod h1:f1tvw5CDjqmrzNxKpRYr6BdAhHL8f7Wp1Duxl0ZOV4g= -github.com/flyteorg/flyteplugins v1.0.10 h1:XBycM4aOSE/WlI8iP9vqogKGXy4FMfVCUUfzxJus/p4= -github.com/flyteorg/flyteplugins v1.0.10/go.mod h1:GfbmRByI/rSatm/Epoj3bNyrXwIQ9NOXTVwLS6Z0p84= +github.com/flyteorg/flyteplugins v1.0.13 h1:mNGImGSdGsYUjmB9vUzZAWqh/h7FCH+MyMRPS78z6Z0= +github.com/flyteorg/flyteplugins v1.0.13/go.mod h1:GfbmRByI/rSatm/Epoj3bNyrXwIQ9NOXTVwLS6Z0p84= github.com/flyteorg/flytestdlib v1.0.0/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c= github.com/flyteorg/flytestdlib v1.0.5 h1:80A/vfpAJl+pgU6vxccbsYApZPrvyGhOIsCAFngsjnk= github.com/flyteorg/flytestdlib v1.0.5/go.mod h1:WTe0k3DmmrKFjj3hwiIbjjdCK89X63MBzBbXhQ4Yxf0= diff --git a/pkg/apis/flyteworkflow/v1alpha1/iface.go b/pkg/apis/flyteworkflow/v1alpha1/iface.go index 26cfd2321..1ceddd36f 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -236,8 +236,8 @@ type MutableDynamicNodeStatus interface { SetExecutionError(executionError *core.ExecutionError) } -// Interface for Branch node. All the methods are purely read only except for the GetExecutionStatus. -// p returns ExecutableBranchNodeStatus, which permits some mutations +// ExecutableBranchNode is an interface for Branch node. All the methods are purely read only except for the +// GetExecutionStatus. p returns ExecutableBranchNodeStatus, which permits some mutations type ExecutableBranchNode interface { GetIf() ExecutableIfBlock GetElse() *NodeID @@ -443,6 +443,7 @@ type Meta interface { GetSecurityContext() core.SecurityContext IsInterruptible() bool GetEventVersion() EventVersion + GetDefinitionVersion() WorkflowDefinitionVersion GetRawOutputDataConfig() RawOutputDataConfig } diff --git a/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go b/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go index 8a582f63e..42c8e694b 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go +++ b/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go @@ -195,6 +195,38 @@ func (_m *ExecutableWorkflow) GetCreationTimestamp() v1.Time { return r0 } +type ExecutableWorkflow_GetDefinitionVersion struct { + *mock.Call +} + +func (_m ExecutableWorkflow_GetDefinitionVersion) Return(_a0 v1alpha1.WorkflowDefinitionVersion) *ExecutableWorkflow_GetDefinitionVersion { + return &ExecutableWorkflow_GetDefinitionVersion{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutableWorkflow) OnGetDefinitionVersion() *ExecutableWorkflow_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion") + return &ExecutableWorkflow_GetDefinitionVersion{Call: c_call} +} + +func (_m *ExecutableWorkflow) OnGetDefinitionVersionMatch(matchers ...interface{}) *ExecutableWorkflow_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion", matchers...) + return &ExecutableWorkflow_GetDefinitionVersion{Call: c_call} +} + +// GetDefinitionVersion provides a mock function with given fields: +func (_m *ExecutableWorkflow) GetDefinitionVersion() v1alpha1.WorkflowDefinitionVersion { + ret := _m.Called() + + var r0 v1alpha1.WorkflowDefinitionVersion + if rf, ok := ret.Get(0).(func() v1alpha1.WorkflowDefinitionVersion); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(v1alpha1.WorkflowDefinitionVersion) + } + + return r0 +} + type ExecutableWorkflow_GetEventVersion struct { *mock.Call } diff --git a/pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go b/pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go index 4afbcae33..4d38f70c9 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go +++ b/pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go @@ -84,6 +84,38 @@ func (_m *Meta) GetCreationTimestamp() v1.Time { return r0 } +type Meta_GetDefinitionVersion struct { + *mock.Call +} + +func (_m Meta_GetDefinitionVersion) Return(_a0 v1alpha1.WorkflowDefinitionVersion) *Meta_GetDefinitionVersion { + return &Meta_GetDefinitionVersion{Call: _m.Call.Return(_a0)} +} + +func (_m *Meta) OnGetDefinitionVersion() *Meta_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion") + return &Meta_GetDefinitionVersion{Call: c_call} +} + +func (_m *Meta) OnGetDefinitionVersionMatch(matchers ...interface{}) *Meta_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion", matchers...) + return &Meta_GetDefinitionVersion{Call: c_call} +} + +// GetDefinitionVersion provides a mock function with given fields: +func (_m *Meta) GetDefinitionVersion() v1alpha1.WorkflowDefinitionVersion { + ret := _m.Called() + + var r0 v1alpha1.WorkflowDefinitionVersion + if rf, ok := ret.Get(0).(func() v1alpha1.WorkflowDefinitionVersion); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(v1alpha1.WorkflowDefinitionVersion) + } + + return r0 +} + type Meta_GetEventVersion struct { *mock.Call } diff --git a/pkg/apis/flyteworkflow/v1alpha1/mocks/MetaExtended.go b/pkg/apis/flyteworkflow/v1alpha1/mocks/MetaExtended.go index e3f395bc3..5e0a048ad 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/mocks/MetaExtended.go +++ b/pkg/apis/flyteworkflow/v1alpha1/mocks/MetaExtended.go @@ -118,6 +118,38 @@ func (_m *MetaExtended) GetCreationTimestamp() v1.Time { return r0 } +type MetaExtended_GetDefinitionVersion struct { + *mock.Call +} + +func (_m MetaExtended_GetDefinitionVersion) Return(_a0 v1alpha1.WorkflowDefinitionVersion) *MetaExtended_GetDefinitionVersion { + return &MetaExtended_GetDefinitionVersion{Call: _m.Call.Return(_a0)} +} + +func (_m *MetaExtended) OnGetDefinitionVersion() *MetaExtended_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion") + return &MetaExtended_GetDefinitionVersion{Call: c_call} +} + +func (_m *MetaExtended) OnGetDefinitionVersionMatch(matchers ...interface{}) *MetaExtended_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion", matchers...) + return &MetaExtended_GetDefinitionVersion{Call: c_call} +} + +// GetDefinitionVersion provides a mock function with given fields: +func (_m *MetaExtended) GetDefinitionVersion() v1alpha1.WorkflowDefinitionVersion { + ret := _m.Called() + + var r0 v1alpha1.WorkflowDefinitionVersion + if rf, ok := ret.Get(0).(func() v1alpha1.WorkflowDefinitionVersion); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(v1alpha1.WorkflowDefinitionVersion) + } + + return r0 +} + type MetaExtended_GetEventVersion struct { *mock.Call } diff --git a/pkg/apis/flyteworkflow/v1alpha1/workflow.go b/pkg/apis/flyteworkflow/v1alpha1/workflow.go index d96a87d14..4aac6ef0f 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/workflow.go +++ b/pkg/apis/flyteworkflow/v1alpha1/workflow.go @@ -23,6 +23,15 @@ const ShardKeyspaceSize = 32 const StartNodeID = "start-node" const EndNodeID = "end-node" +type WorkflowDefinitionVersion uint32 + +var LatestWorkflowDefinitionVersion = WorkflowDefinitionVersion1 + +const ( + WorkflowDefinitionVersion0 WorkflowDefinitionVersion = iota + WorkflowDefinitionVersion1 +) + // +genclient // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -82,6 +91,14 @@ func (in *FlyteWorkflow) GetEventVersion() EventVersion { return EventVersion0 } +func (in *FlyteWorkflow) GetDefinitionVersion() WorkflowDefinitionVersion { + if in.Status.DefinitionVersion != nil { + return *in.Status.DefinitionVersion + } + + return WorkflowDefinitionVersion0 +} + func (in *FlyteWorkflow) GetExecutionConfig() ExecutionConfig { return in.ExecutionConfig } diff --git a/pkg/apis/flyteworkflow/v1alpha1/workflow_status.go b/pkg/apis/flyteworkflow/v1alpha1/workflow_status.go index 7abc6efca..e4cae0306 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/workflow_status.go +++ b/pkg/apis/flyteworkflow/v1alpha1/workflow_status.go @@ -35,6 +35,12 @@ type WorkflowStatus struct { // Stores the Error during the Execution of the Workflow. It is optional and usually associated with Failing/Failed state only Error *ExecutionError `json:"error,omitempty"` + // DefinitionVersion allows propeller code that populates the CRD to evolve (in backward incompatible ways) without + // affecting in-flight executions. Once an execution starts, propeller will populate this field with the current or + // latest version. If a newer propeller version is deployed midway that comes with a newer version, code that relies + // on the latest version should be gated behind this. + DefinitionVersion *WorkflowDefinitionVersion `json:"defVersion,omitempty"` + // non-Serialized fields DataReferenceConstructor storage.ReferenceConstructor `json:"-"` } diff --git a/pkg/controller/executors/mocks/execution_context.go b/pkg/controller/executors/mocks/execution_context.go index a949ed4dc..cc919f641 100644 --- a/pkg/controller/executors/mocks/execution_context.go +++ b/pkg/controller/executors/mocks/execution_context.go @@ -152,6 +152,38 @@ func (_m *ExecutionContext) GetCreationTimestamp() v1.Time { return r0 } +type ExecutionContext_GetDefinitionVersion struct { + *mock.Call +} + +func (_m ExecutionContext_GetDefinitionVersion) Return(_a0 v1alpha1.WorkflowDefinitionVersion) *ExecutionContext_GetDefinitionVersion { + return &ExecutionContext_GetDefinitionVersion{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutionContext) OnGetDefinitionVersion() *ExecutionContext_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion") + return &ExecutionContext_GetDefinitionVersion{Call: c_call} +} + +func (_m *ExecutionContext) OnGetDefinitionVersionMatch(matchers ...interface{}) *ExecutionContext_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion", matchers...) + return &ExecutionContext_GetDefinitionVersion{Call: c_call} +} + +// GetDefinitionVersion provides a mock function with given fields: +func (_m *ExecutionContext) GetDefinitionVersion() v1alpha1.WorkflowDefinitionVersion { + ret := _m.Called() + + var r0 v1alpha1.WorkflowDefinitionVersion + if rf, ok := ret.Get(0).(func() v1alpha1.WorkflowDefinitionVersion); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(v1alpha1.WorkflowDefinitionVersion) + } + + return r0 +} + type ExecutionContext_GetEventVersion struct { *mock.Call } diff --git a/pkg/controller/executors/mocks/immutable_execution_context.go b/pkg/controller/executors/mocks/immutable_execution_context.go index 6c899c811..1b4dfb6d0 100644 --- a/pkg/controller/executors/mocks/immutable_execution_context.go +++ b/pkg/controller/executors/mocks/immutable_execution_context.go @@ -85,6 +85,38 @@ func (_m *ImmutableExecutionContext) GetCreationTimestamp() v1.Time { return r0 } +type ImmutableExecutionContext_GetDefinitionVersion struct { + *mock.Call +} + +func (_m ImmutableExecutionContext_GetDefinitionVersion) Return(_a0 v1alpha1.WorkflowDefinitionVersion) *ImmutableExecutionContext_GetDefinitionVersion { + return &ImmutableExecutionContext_GetDefinitionVersion{Call: _m.Call.Return(_a0)} +} + +func (_m *ImmutableExecutionContext) OnGetDefinitionVersion() *ImmutableExecutionContext_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion") + return &ImmutableExecutionContext_GetDefinitionVersion{Call: c_call} +} + +func (_m *ImmutableExecutionContext) OnGetDefinitionVersionMatch(matchers ...interface{}) *ImmutableExecutionContext_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion", matchers...) + return &ImmutableExecutionContext_GetDefinitionVersion{Call: c_call} +} + +// GetDefinitionVersion provides a mock function with given fields: +func (_m *ImmutableExecutionContext) GetDefinitionVersion() v1alpha1.WorkflowDefinitionVersion { + ret := _m.Called() + + var r0 v1alpha1.WorkflowDefinitionVersion + if rf, ok := ret.Get(0).(func() v1alpha1.WorkflowDefinitionVersion); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(v1alpha1.WorkflowDefinitionVersion) + } + + return r0 +} + type ImmutableExecutionContext_GetEventVersion struct { *mock.Call } diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index c0bf7bb11..fed7ba622 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -78,11 +78,17 @@ type Propeller struct { cfg *config.Config } -// Initializes all downstream executors +// Initialize initializes all downstream executors func (p *Propeller) Initialize(ctx context.Context) error { return p.workflowExecutor.Initialize(ctx) } +func SetDefinitionVersionIfEmpty(wf *v1alpha1.FlyteWorkflow, version v1alpha1.WorkflowDefinitionVersion) { + if wf.Status.DefinitionVersion == nil { + wf.Status.DefinitionVersion = &version + } +} + // TryMutateWorkflow will try to mutate the workflow by traversing it and reconciling the desired and actual state. // The desired state here is the entire workflow is completed, actual state is each nodes current execution state. func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) { @@ -120,6 +126,7 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F if !mutableW.GetExecutionStatus().IsTerminated() { var err error SetFinalizerIfEmpty(mutableW, FinalizerKey) + SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion) func() { t := p.metrics.RawWorkflowTraversalTime.Start(ctx) diff --git a/pkg/controller/nodes/common/utils.go b/pkg/controller/nodes/common/utils.go index af5a25091..e6a5fb6fd 100644 --- a/pkg/controller/nodes/common/utils.go +++ b/pkg/controller/nodes/common/utils.go @@ -11,7 +11,7 @@ import ( const maxUniqueIDLength = 20 -// The UniqueId of a node is unique within a given workflow execution. +// GenerateUniqueID is the UniqueId of a node is unique within a given workflow execution. // In order to achieve that we track the lineage of the node. // To compute the uniqueID of a node, we use the uniqueID and retry attempt of the parent node // For nodes in level 0, there is no parent, and parentInfo is nil @@ -24,10 +24,11 @@ func GenerateUniqueID(parentInfo executors.ImmutableParentInfo, nodeID string) ( parentRetryAttempt = strconv.Itoa(int(parentInfo.CurrentAttempt())) } - return encoding.FixedLengthUniqueIDForParts(maxUniqueIDLength, parentUniqueID, parentRetryAttempt, nodeID) + return encoding.FixedLengthUniqueIDForParts(maxUniqueIDLength, []string{parentUniqueID, parentRetryAttempt, nodeID}) } -// When creating parentInfo, the unique id of parent is dependent on the unique id and the current attempt of the grand parent to track the lineage. +// CreateParentInfo creates a unique parent id, the unique id of parent is dependent on the unique id and the current +// attempt of the grandparent to track the lineage. func CreateParentInfo(grandParentInfo executors.ImmutableParentInfo, nodeID string, parentAttempt uint32) (executors.ImmutableParentInfo, error) { uniqueID, err := GenerateUniqueID(grandParentInfo, nodeID) if err != nil { diff --git a/pkg/controller/nodes/dynamic/dynamic_workflow_test.go b/pkg/controller/nodes/dynamic/dynamic_workflow_test.go index 4fbf42f6e..184b0ee6c 100644 --- a/pkg/controller/nodes/dynamic/dynamic_workflow_test.go +++ b/pkg/controller/nodes/dynamic/dynamic_workflow_test.go @@ -207,7 +207,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t assert.NotNil(t, dCtx.subWorkflowClosure) assert.NotNil(t, dCtx.execContext) assert.NotNil(t, dCtx.execContext.GetParentInfo()) - expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, "c1", "2", "n1") + expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, []string{"c1", "2", "n1"}) assert.Nil(t, err) assert.Equal(t, expectedParentUniqueID, dCtx.execContext.GetParentInfo().GetUniqueID()) assert.Equal(t, uint32(1), dCtx.execContext.GetParentInfo().CurrentAttempt()) @@ -276,7 +276,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t assert.NotNil(t, dCtx.subWorkflowClosure) assert.NotNil(t, dCtx.execContext) assert.NotNil(t, dCtx.execContext.GetParentInfo()) - expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, "", "", "n1") + expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, []string{"", "", "n1"}) assert.Nil(t, err) assert.Equal(t, expectedParentUniqueID, dCtx.execContext.GetParentInfo().GetUniqueID()) assert.Equal(t, uint32(1), dCtx.execContext.GetParentInfo().CurrentAttempt()) @@ -430,7 +430,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t assert.NotNil(t, dCtx.subWorkflow) assert.NotNil(t, dCtx.execContext) assert.NotNil(t, dCtx.execContext.GetParentInfo()) - expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, "c1", "2", "n1") + expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, []string{"c1", "2", "n1"}) assert.Nil(t, err) assert.Equal(t, expectedParentUniqueID, dCtx.execContext.GetParentInfo().GetUniqueID()) assert.Equal(t, uint32(1), dCtx.execContext.GetParentInfo().CurrentAttempt()) @@ -575,7 +575,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t assert.NotNil(t, dCtx.subWorkflow) assert.NotNil(t, dCtx.execContext) assert.NotNil(t, dCtx.execContext.GetParentInfo()) - expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, "c1", "2", "n1") + expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, []string{"c1", "2", "n1"}) assert.Nil(t, err) assert.Equal(t, expectedParentUniqueID, dCtx.execContext.GetParentInfo().GetUniqueID()) assert.Equal(t, uint32(1), dCtx.execContext.GetParentInfo().CurrentAttempt()) diff --git a/pkg/controller/nodes/dynamic/utils.go b/pkg/controller/nodes/dynamic/utils.go index dea537178..d08845856 100644 --- a/pkg/controller/nodes/dynamic/utils.go +++ b/pkg/controller/nodes/dynamic/utils.go @@ -28,7 +28,7 @@ func underlyingInterface(ctx context.Context, taskReader handler.TaskReader) (*c } func hierarchicalNodeID(parentNodeID, retryAttempt, nodeID string) (string, error) { - return encoding.FixedLengthUniqueIDForParts(20, parentNodeID, retryAttempt, nodeID) + return encoding.FixedLengthUniqueIDForParts(20, []string{parentNodeID, retryAttempt, nodeID}) } func updateBindingNodeIDsWithLineage(parentNodeID, retryAttempt string, binding *core.BindingData) (err error) { diff --git a/pkg/controller/nodes/resolve_test.go b/pkg/controller/nodes/resolve_test.go index 9069a1035..533e0ebfd 100644 --- a/pkg/controller/nodes/resolve_test.go +++ b/pkg/controller/nodes/resolve_test.go @@ -34,6 +34,10 @@ type dummyBaseWorkflow struct { Interruptible bool } +func (d *dummyBaseWorkflow) GetDefinitionVersion() v1alpha1.WorkflowDefinitionVersion { + return v1alpha1.WorkflowDefinitionVersion1 +} + func (d *dummyBaseWorkflow) GetParentInfo() executors.ImmutableParentInfo { return nil } diff --git a/pkg/controller/nodes/subworkflow/handler_test.go b/pkg/controller/nodes/subworkflow/handler_test.go index 13a997a7c..17d9ee2d7 100644 --- a/pkg/controller/nodes/subworkflow/handler_test.go +++ b/pkg/controller/nodes/subworkflow/handler_test.go @@ -113,6 +113,7 @@ func createNodeContextWithVersion(phase v1alpha1.WorkflowNodePhase, n v1alpha1.E ex.OnGetAnnotations().Return(nil) ex.OnGetLabels().Return(nil) ex.OnGetRawOutputDataConfig().Return(v1alpha1.RawOutputDataConfig{}) + ex.OnGetDefinitionVersion().Return(v1alpha1.WorkflowDefinitionVersion1) nCtx.OnExecutionContext().Return(ex) @@ -156,7 +157,6 @@ func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) { recoveryClient := &mocks5.Client{} t.Run("happy v0", func(t *testing.T) { - mockLPExec := &mocks.Executor{} h := New(nil, mockLPExec, recoveryClient, eventConfig, promutils.NewTestScope()) mockLPExec.OnLaunchMatch( @@ -321,6 +321,7 @@ func TestWorkflowNodeHandler_AbortNode(t *testing.T) { ).Return(nil) eCtx := &execMocks.ExecutionContext{} + eCtx.OnGetDefinitionVersion().Return(v1alpha1.WorkflowDefinitionVersion1) nCtx.OnExecutionContext().Return(eCtx) eCtx.OnGetName().Return("test") err := h.Abort(ctx, nCtx, "test") @@ -342,6 +343,7 @@ func TestWorkflowNodeHandler_AbortNode(t *testing.T) { ).Return(nil) eCtx := &execMocks.ExecutionContext{} + eCtx.OnGetDefinitionVersion().Return(v1alpha1.WorkflowDefinitionVersion1) nCtx.OnExecutionContext().Return(eCtx) eCtx.OnGetName().Return("test") err := h.Abort(ctx, nCtx, "test") @@ -362,6 +364,7 @@ func TestWorkflowNodeHandler_AbortNode(t *testing.T) { nCtx := createNodeContext(v1alpha1.WorkflowNodePhaseExecuting, mockNode, mockNodeStatus) eCtx := &execMocks.ExecutionContext{} + eCtx.OnGetDefinitionVersion().Return(v1alpha1.WorkflowDefinitionVersion1) nCtx.OnExecutionContext().Return(eCtx) eCtx.OnGetName().Return("test") diff --git a/pkg/controller/nodes/subworkflow/launchplan.go b/pkg/controller/nodes/subworkflow/launchplan.go index ffcc5b9e1..ec972d9fd 100644 --- a/pkg/controller/nodes/subworkflow/launchplan.go +++ b/pkg/controller/nodes/subworkflow/launchplan.go @@ -56,10 +56,12 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No if err != nil { return handler.UnknownTransition, err } - childID, err := GetChildWorkflowExecutionID( + + childID, err := GetChildWorkflowExecutionIDForExecution( parentNodeExecutionID, - nCtx.CurrentAttempt(), + nCtx, ) + if err != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.RuntimeExecutionError, "failed to create unique ID", nil)), nil } @@ -120,15 +122,31 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No })), nil } +func GetChildWorkflowExecutionIDForExecution(parentNodeExecID *core.NodeExecutionIdentifier, nCtx handler.NodeExecutionContext) (*core.WorkflowExecutionIdentifier, error) { + // Handle launch plan + if nCtx.ExecutionContext().GetDefinitionVersion() == v1alpha1.WorkflowDefinitionVersion0 { + return GetChildWorkflowExecutionID( + parentNodeExecID, + nCtx.CurrentAttempt(), + ) + } + + return GetChildWorkflowExecutionIDV2( + parentNodeExecID, + nCtx.CurrentAttempt(), + ) +} + func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) { parentNodeExecutionID, err := getParentNodeExecutionID(nCtx) if err != nil { return handler.UnknownTransition, err } + // Handle launch plan - childID, err := GetChildWorkflowExecutionID( + childID, err := GetChildWorkflowExecutionIDForExecution( parentNodeExecutionID, - nCtx.CurrentAttempt(), + nCtx, ) if err != nil { @@ -213,9 +231,9 @@ func (l *launchPlanHandler) HandleAbort(ctx context.Context, nCtx handler.NodeEx if err != nil { return err } - childID, err := GetChildWorkflowExecutionID( + childID, err := GetChildWorkflowExecutionIDForExecution( parentNodeExecutionID, - nCtx.CurrentAttempt(), + nCtx, ) if err != nil { // THIS SHOULD NEVER HAPPEN diff --git a/pkg/controller/nodes/subworkflow/launchplan_test.go b/pkg/controller/nodes/subworkflow/launchplan_test.go index 6022ccf2e..4c8f6806e 100644 --- a/pkg/controller/nodes/subworkflow/launchplan_test.go +++ b/pkg/controller/nodes/subworkflow/launchplan_test.go @@ -67,7 +67,6 @@ func TestSubWorkflowHandler_StartLaunchPlan(t *testing.T) { mockNodeStatus.On("GetAttempts").Return(attempts) t.Run("happy", func(t *testing.T) { - mockLPExec := &mocks.Executor{} h := launchPlanHandler{ @@ -248,6 +247,7 @@ func TestSubWorkflowHandler_StartLaunchPlan(t *testing.T) { nCtx.OnNodeExecutionMetadata().Return(nm) ectx := &execMocks.ExecutionContext{} + ectx.OnGetDefinitionVersion().Return(v1alpha1.WorkflowDefinitionVersion1) ectx.OnGetEventVersion().Return(1) ectx.OnGetParentInfo().Return(nil) ectx.OnGetExecutionConfig().Return(v1alpha1.ExecutionConfig{ diff --git a/pkg/controller/nodes/subworkflow/util.go b/pkg/controller/nodes/subworkflow/util.go index 0b5bf715b..9eed0c8a5 100644 --- a/pkg/controller/nodes/subworkflow/util.go +++ b/pkg/controller/nodes/subworkflow/util.go @@ -11,10 +11,26 @@ import ( const maxLengthForSubWorkflow = 20 func GetChildWorkflowExecutionID(nodeExecID *core.NodeExecutionIdentifier, attempt uint32) (*core.WorkflowExecutionIdentifier, error) { - name, err := encoding.FixedLengthUniqueIDForParts(maxLengthForSubWorkflow, nodeExecID.ExecutionId.Name, nodeExecID.NodeId, strconv.Itoa(int(attempt))) + name, err := encoding.FixedLengthUniqueIDForParts(maxLengthForSubWorkflow, []string{nodeExecID.ExecutionId.Name, nodeExecID.NodeId, strconv.Itoa(int(attempt))}) if err != nil { return nil, err } + + // Restriction on name is 20 chars + return &core.WorkflowExecutionIdentifier{ + Project: nodeExecID.ExecutionId.Project, + Domain: nodeExecID.ExecutionId.Domain, + Name: name, + }, nil +} + +func GetChildWorkflowExecutionIDV2(nodeExecID *core.NodeExecutionIdentifier, attempt uint32) (*core.WorkflowExecutionIdentifier, error) { + name, err := encoding.FixedLengthUniqueIDForParts(maxLengthForSubWorkflow, []string{nodeExecID.ExecutionId.Name, nodeExecID.NodeId, strconv.Itoa(int(attempt))}, + encoding.NewAlgorithmOption(encoding.Algorithm64)) + if err != nil { + return nil, err + } + // Restriction on name is 20 chars return &core.WorkflowExecutionIdentifier{ Project: nodeExecID.ExecutionId.Project, diff --git a/pkg/controller/nodes/task/taskexec_context.go b/pkg/controller/nodes/task/taskexec_context.go index ceb5849d6..8a81fdf60 100644 --- a/pkg/controller/nodes/task/taskexec_context.go +++ b/pkg/controller/nodes/task/taskexec_context.go @@ -204,7 +204,7 @@ func convertTaskResourcesToRequirements(taskResources v1alpha1.TaskResources) *v // access to this location and can be passed in per execution. // the function also returns the uniqueID generated func ComputeRawOutputPrefix(ctx context.Context, length int, nCtx handler.NodeExecutionContext, currentNodeUniqueID v1alpha1.NodeID, currentAttempt uint32) (io.RawOutputPaths, string, error) { - uniqueID, err := encoding.FixedLengthUniqueIDForParts(length, nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(currentAttempt))) + uniqueID, err := encoding.FixedLengthUniqueIDForParts(length, []string{nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(currentAttempt))}) if err != nil { // SHOULD never really happen return nil, uniqueID, err