Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Use fnv64 to create a hash for child workflow executions #476

Merged
merged 15 commits into from
Sep 9, 2022
Merged
2 changes: 1 addition & 1 deletion cmd/kubectl-flyte/cmd/printers/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (p NodeStatusPrinter) BaseNodeInfo(node v1alpha1.BaseNode, nodeStatus v1alp
}

func (p NodeStatusPrinter) NodeInfo(wName string, node v1alpha1.BaseNode, nodeStatus v1alpha1.ExecutableNodeStatus) []string {
resourceName, err := encoding.FixedLengthUniqueIDForParts(task.IDMaxLength, wName, node.GetID(), strconv.Itoa(int(nodeStatus.GetAttempts())))
resourceName, err := encoding.FixedLengthUniqueIDForParts(task.IDMaxLength, []string{wName, node.GetID(), strconv.Itoa(int(nodeStatus.GetAttempts()))})
if err != nil {
resourceName = "na"
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/kubectl-flyte/cmd/string_map_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func newStringMapValue() *stringMapValue {

var entryRegex = regexp.MustCompile("(?P<EntryKey>[^,]+)=(?P<EntryValue>[^,]+)")

// Parses val into a map. Accepted format: a=1,b=2
// Set parses val into a map. Accepted format: a=1,b=2
func (s *stringMapValue) Set(val string) error {
matches := entryRegex.FindAllStringSubmatch(val, -1)
out := make(map[string]string, len(matches))
Expand Down
2 changes: 1 addition & 1 deletion cmd/kubectl-flyte/cmd/string_map_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func formatArg(values map[string]string) string {

func randSpaces() string {
res := ""
for cnt := rand.Int() % 10; cnt > 0; cnt-- { // nolint: gas
for cnt := rand.Int()%10 + 1; cnt > 0; cnt-- { // nolint: gas
res += " "
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.13.0
github.com/flyteorg/flyteidl v1.1.10
github.com/flyteorg/flyteplugins v1.0.10
github.com/flyteorg/flyteplugins v1.0.13
github.com/flyteorg/flytestdlib v1.0.5
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,8 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.1.10 h1:Bus/JUto0oBTjAS4EBN7EITeuZNS4naq+uFpj+ydaW4=
github.com/flyteorg/flyteidl v1.1.10/go.mod h1:f1tvw5CDjqmrzNxKpRYr6BdAhHL8f7Wp1Duxl0ZOV4g=
github.com/flyteorg/flyteplugins v1.0.10 h1:XBycM4aOSE/WlI8iP9vqogKGXy4FMfVCUUfzxJus/p4=
github.com/flyteorg/flyteplugins v1.0.10/go.mod h1:GfbmRByI/rSatm/Epoj3bNyrXwIQ9NOXTVwLS6Z0p84=
github.com/flyteorg/flyteplugins v1.0.13 h1:mNGImGSdGsYUjmB9vUzZAWqh/h7FCH+MyMRPS78z6Z0=
github.com/flyteorg/flyteplugins v1.0.13/go.mod h1:GfbmRByI/rSatm/Epoj3bNyrXwIQ9NOXTVwLS6Z0p84=
github.com/flyteorg/flytestdlib v1.0.0/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c=
github.com/flyteorg/flytestdlib v1.0.5 h1:80A/vfpAJl+pgU6vxccbsYApZPrvyGhOIsCAFngsjnk=
github.com/flyteorg/flytestdlib v1.0.5/go.mod h1:WTe0k3DmmrKFjj3hwiIbjjdCK89X63MBzBbXhQ4Yxf0=
Expand Down
5 changes: 3 additions & 2 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ type MutableDynamicNodeStatus interface {
SetExecutionError(executionError *core.ExecutionError)
}

// Interface for Branch node. All the methods are purely read only except for the GetExecutionStatus.
// p returns ExecutableBranchNodeStatus, which permits some mutations
// ExecutableBranchNode is an interface for Branch node. All the methods are purely read only except for the
// GetExecutionStatus. p returns ExecutableBranchNodeStatus, which permits some mutations
type ExecutableBranchNode interface {
GetIf() ExecutableIfBlock
GetElse() *NodeID
Expand Down Expand Up @@ -443,6 +443,7 @@ type Meta interface {
GetSecurityContext() core.SecurityContext
IsInterruptible() bool
GetEventVersion() EventVersion
GetDefinitionVersion() WorkflowDefinitionVersion
GetRawOutputDataConfig() RawOutputDataConfig
}

Expand Down
32 changes: 32 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/MetaExtended.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ const ShardKeyspaceSize = 32
const StartNodeID = "start-node"
const EndNodeID = "end-node"

type WorkflowDefinitionVersion uint32

var LatestWorkflowDefinitionVersion = WorkflowDefinitionVersion1

const (
WorkflowDefinitionVersion0 WorkflowDefinitionVersion = iota
WorkflowDefinitionVersion1
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

Expand Down Expand Up @@ -82,6 +91,14 @@ func (in *FlyteWorkflow) GetEventVersion() EventVersion {
return EventVersion0
}

func (in *FlyteWorkflow) GetDefinitionVersion() WorkflowDefinitionVersion {
if in.Status.DefinitionVersion != nil {
return *in.Status.DefinitionVersion
}

return WorkflowDefinitionVersion0
}

func (in *FlyteWorkflow) GetExecutionConfig() ExecutionConfig {
return in.ExecutionConfig
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/workflow_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ type WorkflowStatus struct {
// Stores the Error during the Execution of the Workflow. It is optional and usually associated with Failing/Failed state only
Error *ExecutionError `json:"error,omitempty"`

// DefinitionVersion allows propeller code that populates the CRD to evolve (in backward incompatible ways) without
// affecting in-flight executions. Once an execution starts, propeller will populate this field with the current or
// latest version. If a newer propeller version is deployed midway that comes with a newer version, code that relies
// on the latest version should be gated behind this.
DefinitionVersion *WorkflowDefinitionVersion `json:"defVersion,omitempty"`

// non-Serialized fields
DataReferenceConstructor storage.ReferenceConstructor `json:"-"`
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/controller/executors/mocks/execution_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions pkg/controller/executors/mocks/immutable_execution_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,17 @@ type Propeller struct {
cfg *config.Config
}

// Initializes all downstream executors
// Initialize initializes all downstream executors
func (p *Propeller) Initialize(ctx context.Context) error {
return p.workflowExecutor.Initialize(ctx)
}

func SetDefinitionVersionIfEmpty(wf *v1alpha1.FlyteWorkflow, version v1alpha1.WorkflowDefinitionVersion) {
if wf.Status.DefinitionVersion == nil {
wf.Status.DefinitionVersion = &version
}
}

// TryMutateWorkflow will try to mutate the workflow by traversing it and reconciling the desired and actual state.
// The desired state here is the entire workflow is completed, actual state is each nodes current execution state.
func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) {
Expand Down Expand Up @@ -120,6 +126,7 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F
if !mutableW.GetExecutionStatus().IsTerminated() {
var err error
SetFinalizerIfEmpty(mutableW, FinalizerKey)
SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion)

func() {
t := p.metrics.RawWorkflowTraversalTime.Start(ctx)
Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/nodes/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

const maxUniqueIDLength = 20

// The UniqueId of a node is unique within a given workflow execution.
// GenerateUniqueID is the UniqueId of a node is unique within a given workflow execution.
// In order to achieve that we track the lineage of the node.
// To compute the uniqueID of a node, we use the uniqueID and retry attempt of the parent node
// For nodes in level 0, there is no parent, and parentInfo is nil
Expand All @@ -24,10 +24,11 @@ func GenerateUniqueID(parentInfo executors.ImmutableParentInfo, nodeID string) (
parentRetryAttempt = strconv.Itoa(int(parentInfo.CurrentAttempt()))
}

return encoding.FixedLengthUniqueIDForParts(maxUniqueIDLength, parentUniqueID, parentRetryAttempt, nodeID)
return encoding.FixedLengthUniqueIDForParts(maxUniqueIDLength, []string{parentUniqueID, parentRetryAttempt, nodeID})
}

// When creating parentInfo, the unique id of parent is dependent on the unique id and the current attempt of the grand parent to track the lineage.
// CreateParentInfo creates a unique parent id, the unique id of parent is dependent on the unique id and the current
// attempt of the grandparent to track the lineage.
func CreateParentInfo(grandParentInfo executors.ImmutableParentInfo, nodeID string, parentAttempt uint32) (executors.ImmutableParentInfo, error) {
uniqueID, err := GenerateUniqueID(grandParentInfo, nodeID)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/nodes/dynamic/dynamic_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t
assert.NotNil(t, dCtx.subWorkflowClosure)
assert.NotNil(t, dCtx.execContext)
assert.NotNil(t, dCtx.execContext.GetParentInfo())
expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, "c1", "2", "n1")
expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, []string{"c1", "2", "n1"})
assert.Nil(t, err)
assert.Equal(t, expectedParentUniqueID, dCtx.execContext.GetParentInfo().GetUniqueID())
assert.Equal(t, uint32(1), dCtx.execContext.GetParentInfo().CurrentAttempt())
Expand Down Expand Up @@ -276,7 +276,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t
assert.NotNil(t, dCtx.subWorkflowClosure)
assert.NotNil(t, dCtx.execContext)
assert.NotNil(t, dCtx.execContext.GetParentInfo())
expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, "", "", "n1")
expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, []string{"", "", "n1"})
assert.Nil(t, err)
assert.Equal(t, expectedParentUniqueID, dCtx.execContext.GetParentInfo().GetUniqueID())
assert.Equal(t, uint32(1), dCtx.execContext.GetParentInfo().CurrentAttempt())
Expand Down Expand Up @@ -430,7 +430,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t
assert.NotNil(t, dCtx.subWorkflow)
assert.NotNil(t, dCtx.execContext)
assert.NotNil(t, dCtx.execContext.GetParentInfo())
expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, "c1", "2", "n1")
expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, []string{"c1", "2", "n1"})
assert.Nil(t, err)
assert.Equal(t, expectedParentUniqueID, dCtx.execContext.GetParentInfo().GetUniqueID())
assert.Equal(t, uint32(1), dCtx.execContext.GetParentInfo().CurrentAttempt())
Expand Down Expand Up @@ -575,7 +575,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t
assert.NotNil(t, dCtx.subWorkflow)
assert.NotNil(t, dCtx.execContext)
assert.NotNil(t, dCtx.execContext.GetParentInfo())
expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, "c1", "2", "n1")
expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, []string{"c1", "2", "n1"})
assert.Nil(t, err)
assert.Equal(t, expectedParentUniqueID, dCtx.execContext.GetParentInfo().GetUniqueID())
assert.Equal(t, uint32(1), dCtx.execContext.GetParentInfo().CurrentAttempt())
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/nodes/dynamic/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func underlyingInterface(ctx context.Context, taskReader handler.TaskReader) (*c
}

func hierarchicalNodeID(parentNodeID, retryAttempt, nodeID string) (string, error) {
return encoding.FixedLengthUniqueIDForParts(20, parentNodeID, retryAttempt, nodeID)
return encoding.FixedLengthUniqueIDForParts(20, []string{parentNodeID, retryAttempt, nodeID})
}

func updateBindingNodeIDsWithLineage(parentNodeID, retryAttempt string, binding *core.BindingData) (err error) {
Expand Down
Loading