feat: Allow step restart on workflow retry. Closes #2334 #2431

Merged · 4 commits · Apr 12, 2020
Changes from 2 commits
7 changes: 7 additions & 0 deletions api/openapi-spec/swagger.json
@@ -6147,6 +6147,13 @@
},
"namespace": {
"type": "string"
},
"nodeFieldSelector": {
"type": "string"
},
"restartSuccesful": {
"type": "boolean",
"format": "boolean"
}
}
},
22 changes: 20 additions & 2 deletions cmd/argo/commands/retry.go
@@ -1,16 +1,25 @@
package commands

import (
"log"

"github.com/argoproj/pkg/errors"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/fields"

"github.com/argoproj/argo/cmd/argo/commands/client"
workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
)

type retryOps struct {
nodeFieldSelector string // --node-field-selector
restartSuccessful bool // --restart-successful
}

func NewRetryCommand() *cobra.Command {
var (
cliSubmitOpts cliSubmitOpts
retryOps retryOps
)
var command = &cobra.Command{
Use: "retry [WORKFLOW...]",
@@ -20,10 +29,17 @@ func NewRetryCommand() *cobra.Command {
serviceClient := apiClient.NewWorkflowServiceClient()
namespace := client.Namespace()

selector, err := fields.ParseSelector(retryOps.nodeFieldSelector)
if err != nil {
log.Fatalf("Unable to parse node field selector '%s': %s", retryOps.nodeFieldSelector, err)
}

for _, name := range args {
wf, err := serviceClient.RetryWorkflow(ctx, &workflowpkg.WorkflowRetryRequest{
Name: name,
Namespace: namespace,
Name: name,
Namespace: namespace,
RestartSuccesful: retryOps.restartSuccessful,
NodeFieldSelector: selector.String(),
})
if err != nil {
errors.CheckError(err)
@@ -37,5 +53,7 @@ func NewRetryCommand() *cobra.Command {
command.Flags().StringVarP(&cliSubmitOpts.output, "output", "o", "", "Output format. One of: name|json|yaml|wide")
command.Flags().BoolVarP(&cliSubmitOpts.wait, "wait", "w", false, "wait for the workflow to complete")
command.Flags().BoolVar(&cliSubmitOpts.watch, "watch", false, "watch the workflow until it completes")
command.Flags().BoolVar(&retryOps.restartSuccessful, "restart-successful", false, "indicates to restart succesful nodes matching the --node-field-selector")
command.Flags().StringVar(&retryOps.nodeFieldSelector, "node-field-selector", "", "selector of nodes to reset, eg: --node-field-selector inputs.paramaters.myparam.value=abc")
return command
}
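
Not part of the diff, just an illustration: a minimal, self-contained sketch of how a --node-field-selector value is evaluated, using the same k8s.io/apimachinery/pkg/fields package the command imports. The selector string and the node field values below are made-up examples that mirror the selectorMatchesNode helper added to workflow/util/util.go later in this PR.

package main

import (
	"fmt"
	"log"

	"k8s.io/apimachinery/pkg/fields"
)

func main() {
	// Parse the selector exactly as the retry command does.
	selector, err := fields.ParseSelector("templateName=steps-inner")
	if err != nil {
		log.Fatalf("Unable to parse node field selector: %s", err)
	}

	// Field set built for a single node, mirroring selectorMatchesNode;
	// the values here are illustrative only.
	nodeFields := fields.Set{
		"displayName":                     "steps-outer-step2",
		"templateName":                    "steps-inner",
		"inputs.parameters.myparam.value": "abc",
	}

	// Prints "true": the node's templateName matches the selector.
	fmt.Println(selector.Matches(nodeFields))
}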
242 changes: 168 additions & 74 deletions pkg/apiclient/workflow/workflow.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/apiclient/workflow/workflow.proto
@@ -44,6 +44,8 @@ message WorkflowResubmitRequest {
message WorkflowRetryRequest {
string name = 1;
string namespace = 2;
bool restartSuccesful = 3;
string nodeFieldSelector = 4;
}
message WorkflowResumeRequest {
string name = 1;
7 changes: 7 additions & 0 deletions pkg/apiclient/workflow/workflow.swagger.json
@@ -4943,6 +4943,13 @@
},
"namespace": {
"type": "string"
},
"restartSuccesful": {
"type": "boolean",
"format": "boolean"
},
"nodeFieldSelector": {
"type": "string"
}
}
},
2 changes: 1 addition & 1 deletion server/workflow/workflow_server.go
@@ -220,7 +220,7 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor
return nil, err
}

wf, err = util.RetryWorkflow(kubeClient, s.offloadNodeStatusRepo, wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), wf)
wf, err = util.RetryWorkflow(kubeClient, s.offloadNodeStatusRepo, wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), wf, req.RestartSuccesful, req.NodeFieldSelector)
if err != nil {
return nil, err
}
33 changes: 29 additions & 4 deletions test/e2e/cli_test.go
@@ -476,17 +476,42 @@ func (s *CLISuite) TestWorkflowRetryNoPersistence() {
// When this is the case, this behavior is tested in cli_with_server_test.go
s.T().SkipNow()
}

var retryTime corev1.Time

s.Given().
Workflow("@testdata/exit-1.yaml").
Workflow("@testdata/retry-test.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(30*time.Second).
Given().
RunCli([]string{"retry", "exit-1"}, func(t *testing.T, output string, err error) {
WaitForWorkflowToStart(5*time.Second).
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
return wf.Status.AnyActiveSuspendNode()
}, "suspended node", 30*time.Second).
RunCli([]string{"terminate", "retry-test"}, func(t *testing.T, output string, err error) {
if assert.NoError(t, err) {
assert.Contains(t, output, "workflow retry-test terminated")
}
}).
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
retryTime = wf.Status.FinishedAt
return wf.Status.Phase == wfv1.NodeFailed
}, "terminated", 20*time.Second).
RunCli([]string{"retry", "retry-test", "--restart-successful", "--node-field-selector", "templateName==steps-inner"}, func(t *testing.T, output string, err error) {
if assert.NoError(t, err) {
assert.Contains(t, output, "Name:")
assert.Contains(t, output, "Namespace:")
}
}).
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
return wf.Status.AnyActiveSuspendNode()
}, "suspended node", 20*time.Second).
Then().
ExpectWorkflow(func(t *testing.T, _ *corev1.ObjectMeta, status *wfv1.WorkflowStatus) {
outerStepsPodNode := status.Nodes.FindByDisplayName("steps-outer-step1")
innerStepsPodNode := status.Nodes.FindByDisplayName("steps-inner-step1")

assert.True(t, outerStepsPodNode.FinishedAt.Before(&retryTime))
assert.True(t, retryTime.Before(&innerStepsPodNode.FinishedAt))
})
}

31 changes: 31 additions & 0 deletions test/e2e/testdata/retry-test.yaml
@@ -0,0 +1,31 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: retry-test
labels:
argo-e2e: "true"
spec:
entrypoint: steps-outer
templates:
- name: steps-outer
steps:
- - name: steps-outer-step1
template: whalesay
- - name: steps-outer-step2
template: steps-inner

- name: steps-inner
steps:
- - name: steps-inner-step1
template: whalesay
- - name: steps-inner-step2
template: approve

- name: approve
suspend: {}

- name: whalesay
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
25 changes: 21 additions & 4 deletions workflow/controller/operator.go
@@ -218,7 +218,7 @@ func (woc *wfOperationCtx) operate() {
woc.workflowDeadline = woc.getWorkflowDeadline()
err := woc.podReconciliation()
if err == nil {
err = woc.failSuspendedNodesAfterDeadline()
err = woc.failSuspendedNodesAfterDeadlineOrShutdown()
}
if err != nil {
woc.log.Errorf("%s error: %+v", woc.wf.ObjectMeta.Name, err)
@@ -636,6 +636,17 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
return woc.markNodePhase(node.Name, wfv1.NodeSucceeded), true, nil
}

if woc.wf.Spec.Shutdown != "" || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) {
var message string
if woc.wf.Spec.Shutdown != "" {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.wf.Spec.Shutdown)
} else {
message = fmt.Sprintf("retry exceeded workflow deadline %s", *woc.workflowDeadline)
}
woc.log.Infoln(message)
return woc.markNodePhase(node.Name, lastChildNode.Phase, message), true, nil
}

Review comment on lines +639 to +648

Member: Why do we need this here? Isn't this covered by failSuspendedNodesAfterDeadlineOrShutdown()?

Contributor Author: Without this, in the case of a retry parent node it just keeps retrying pods that continually fail because they are being executed after the deadline. The integration test didn't work without it.


if retryStrategy.Backoff != nil {
// Process max duration limit
if retryStrategy.Backoff.MaxDuration != "" && len(node.Children) > 0 {
@@ -808,11 +819,17 @@ func (woc *wfOperationCtx) shouldPrintPodSpec(node wfv1.NodeStatus) bool {
}

//fails any suspended nodes if the workflow deadline has passed
func (woc *wfOperationCtx) failSuspendedNodesAfterDeadline() error {
if woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline) {
func (woc *wfOperationCtx) failSuspendedNodesAfterDeadlineOrShutdown() error {
if woc.wf.Spec.Shutdown != "" || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) {

Review comment on lines +822 to +823

Member: Thanks for this!

for _, node := range woc.wf.Status.Nodes {
if node.IsActiveSuspendNode() {
woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("step exceeded workflow deadline %s", *woc.workflowDeadline))
var message string
if woc.wf.Spec.Shutdown != "" {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.wf.Spec.Shutdown)
} else {
message = fmt.Sprintf("step exceeded workflow deadline %s", *woc.workflowDeadline)
}
woc.markNodePhase(node.Name, wfv1.NodeFailed, message)
}
}
}
64 changes: 51 additions & 13 deletions workflow/util/util.go
@@ -421,6 +421,20 @@ func ResumeWorkflow(wfIf v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatu
}
}

func selectorMatchesNode(selector fields.Selector, node wfv1.NodeStatus) bool {
nodeFields := fields.Set{
"displayName": node.DisplayName,
"templateName": node.TemplateName,
}
if node.Inputs != nil {
for _, inParam := range node.Inputs.Parameters {
nodeFields[fmt.Sprintf("inputs.parameters.%s.value", inParam.Name)] = *inParam.Value
}
}

return selector.Matches(nodeFields)
}

func updateWorkflowNodeByKey(wfIf v1alpha1.WorkflowInterface, workflowName string, nodeFieldSelector string, phase wfv1.NodePhase, message string) error {
selector, err := fields.ParseSelector(nodeFieldSelector)

@@ -441,16 +455,7 @@ func updateWorkflowNodeByKey(wfIf v1alpha1.WorkflowInterface, workflowName strin
nodeUpdated := false
for nodeID, node := range wf.Status.Nodes {
if node.IsActiveSuspendNode() {
nodeFields := fields.Set{
"displayName": node.DisplayName,
}
if node.Inputs != nil {
for _, inParam := range node.Inputs.Parameters {
nodeFields[fmt.Sprintf("inputs.parameters.%s.value", inParam.Name)] = *inParam.Value
}
}

if selector.Matches(nodeFields) {
if selectorMatchesNode(selector, node) {

Member: Nice!

node.Phase = phase
node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
if len(message) > 0 {
@@ -612,7 +617,7 @@ func convertNodeID(newWf *wfv1.Workflow, regex *regexp.Regexp, oldNodeID string,
}

// RetryWorkflow updates a workflow, deleting all failed steps as well as the onExit node (and children)
func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatusRepo, wfClient v1alpha1.WorkflowInterface, wf *wfv1.Workflow) (*wfv1.Workflow, error) {
func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatusRepo, wfClient v1alpha1.WorkflowInterface, wf *wfv1.Workflow, restartSuccessful bool, nodeFieldSelector string) (*wfv1.Workflow, error) {
switch wf.Status.Phase {
case wfv1.NodeFailed, wfv1.NodeError:
default:
Expand All @@ -627,13 +632,41 @@ func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatus
newWF := wf.DeepCopy()
podIf := kubeClient.CoreV1().Pods(wf.ObjectMeta.Namespace)

// Get all children of nodes that match filter
nodeIDsToReset := make(map[string]bool)
if restartSuccessful && len(nodeFieldSelector) > 0 {
selector, err := fields.ParseSelector(nodeFieldSelector)
if err != nil {
return nil, err
} else {
for _, node := range wf.Status.Nodes {

Member: Hey @mark9white, because of #2645 this actually needs to be moved further down the code. Workflows with offloaded nodes are only retrieved starting in line 678, so if a Workflow has offloaded nodes, wf.Status.Nodes will be nil at this point in the code and the node field selector will have no effect.

While you're at this, would you mind extracting this block out to a helper function? RetryWorkflow is already a bit cluttered 🙂

Contributor Author: Done

if selectorMatchesNode(selector, node) {

Member: Seems like this code could be included in the large for loop starting at line 690. What do you think? Adding it there could save us an iteration through all the nodes. If you do decide to add it, please make sure it's added via a helper function.

Contributor Author: We need to determine the list of nodes including child nodes first.

//traverse all children of the node
var queue []string
queue = append(queue, node.ID)

for len(queue) > 0 {
childNode := queue[0]
//if the child isn't already in nodeIDsToReset then we add it and traverse its children
if _, present := nodeIDsToReset[childNode]; !present {
nodeIDsToReset[childNode] = true
queue = append(queue, wf.Status.Nodes[childNode].Children...)
}
queue = queue[1:]
}
}
}
}
}

// Delete/reset fields which indicate workflow completed
delete(newWF.Labels, common.LabelKeyCompleted)
newWF.Status.Conditions.UpsertCondition(wfv1.WorkflowCondition{Status: metav1.ConditionFalse, Type: wfv1.WorkflowConditionCompleted})
newWF.ObjectMeta.Labels[common.LabelKeyPhase] = string(wfv1.NodeRunning)
newWF.Status.Phase = wfv1.NodeRunning
newWF.Status.Message = ""
newWF.Status.FinishedAt = metav1.Time{}
newWF.Spec.Shutdown = ""

Member: Nice catch!

if newWF.Spec.ActiveDeadlineSeconds != nil && *newWF.Spec.ActiveDeadlineSeconds == 0 {
// if it was terminated, unset the deadline
newWF.Spec.ActiveDeadlineSeconds = nil
Expand All @@ -655,14 +688,19 @@ func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatus
}

for _, node := range nodes {
var doForceResetNode = false

Member: Minor:

Suggested change:
- var doForceResetNode = false
+ forceResetNode := false

Contributor Author: Done

if _, present := nodeIDsToReset[node.ID]; present {
// if we are resetting this node then don't carry it across regardless of its phase
doForceResetNode = true
}
switch node.Phase {
case wfv1.NodeSucceeded, wfv1.NodeSkipped:
if !strings.HasPrefix(node.Name, onExitNodeName) {
if !strings.HasPrefix(node.Name, onExitNodeName) && !doForceResetNode {
newNodes[node.ID] = node
continue
}
case wfv1.NodeError, wfv1.NodeFailed:
if !strings.HasPrefix(node.Name, onExitNodeName) && (node.Type == wfv1.NodeTypeDAG || node.Type == wfv1.NodeTypeStepGroup) {
if !strings.HasPrefix(node.Name, onExitNodeName) && (node.Type == wfv1.NodeTypeDAG || node.Type == wfv1.NodeTypeStepGroup) && !doForceResetNode {

Member: Why is this && !doForceResetNode necessary here? What's the difference between "pretend as if this node never existed" or resetting it manually?

Contributor Author: You're right, fixed

newNode := node.DeepCopy()
newNode.Phase = wfv1.NodeRunning
newNode.Message = ""
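
As a companion to the review discussion above about extracting this block into a helper, here is a minimal sketch of what such a helper could look like. The name getNodeIDsToReset and the exact signature are assumptions made for illustration, not necessarily the code that was merged; it reuses selectorMatchesNode from earlier in this file and the same breadth-first walk over node children shown in the diff.

// Sketch only: collects the IDs of all nodes matching the field selector,
// plus all of their descendants, so RetryWorkflow can force-reset them.
// The helper name and signature are assumptions, not the merged code.
func getNodeIDsToReset(restartSuccessful bool, nodeFieldSelector string, nodes wfv1.Nodes) (map[string]bool, error) {
	nodeIDsToReset := make(map[string]bool)
	if !restartSuccessful || len(nodeFieldSelector) == 0 {
		return nodeIDsToReset, nil
	}
	selector, err := fields.ParseSelector(nodeFieldSelector)
	if err != nil {
		return nil, err
	}
	for _, node := range nodes {
		if !selectorMatchesNode(selector, node) {
			continue
		}
		// Breadth-first traversal over the matching node and all of its children.
		queue := []string{node.ID}
		for len(queue) > 0 {
			childNode := queue[0]
			if _, present := nodeIDsToReset[childNode]; !present {
				nodeIDsToReset[childNode] = true
				queue = append(queue, nodes[childNode].Children...)
			}
			queue = queue[1:]
		}
	}
	return nodeIDsToReset, nil
}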
4 changes: 2 additions & 2 deletions workflow/util/util_test.go
@@ -456,7 +456,7 @@ func TestRetryWorkflowCompressed(t *testing.T) {

clearFunc = packer.SetMaxWorkflowSize(1557)
defer clearFunc()
wf, err := RetryWorkflow(kubeCs, sqldb.ExplosiveOffloadNodeStatusRepo, wfIf, origWf)
wf, err := RetryWorkflow(kubeCs, sqldb.ExplosiveOffloadNodeStatusRepo, wfIf, origWf, false, "")
assert.NoError(t, err)
assert.NotEmpty(t, wf.Status.CompressedNodes)
}
@@ -481,7 +481,7 @@ func TestRetryWorkflowOffloaded(t *testing.T) {
offloadNodeStatusRepo.On("Get", "7e74dbb9-d681-4c22-9bed-a581ec28383f", "123").Return(origNodes, nil)
offloadNodeStatusRepo.On("Save", "7e74dbb9-d681-4c22-9bed-a581ec28383f", mock.Anything, mock.Anything).Return("1234", nil)

_, err = RetryWorkflow(kubeCs, offloadNodeStatusRepo, wfIf, origWf)
_, err = RetryWorkflow(kubeCs, offloadNodeStatusRepo, wfIf, origWf, false, "")
assert.NoError(t, err)

wf, err := wfIf.Get("fail-template", metav1.GetOptions{})