feat: Allow step restart on workflow retry. Closes #2334 #2431
Conversation
Force-pushed from 85d7364 to 58b0103.
Codecov Report
Attention: Patch coverage is
Additional details and impacted files:

@@           Coverage Diff            @@
##           master    #2431    +/-  ##
========================================
+ Coverage   11.22%   11.62%   +0.39%
========================================
  Files          83       84       +1
  Lines       32696    32871     +175
========================================
+ Hits         3671     3820     +149
  Misses      28525    28525
- Partials      500      526      +26

View full report in Codecov by Sentry.
@sarabala1979 I think this is your area of expertise?
Force-pushed from d52a540 to 3f798c1.
@sarabala1979 hi, any feedback on this?
My initial reaction is that this shouldn't be implemented in the workflow-controller. "Retry" is not a feature that the controller knows about. When a Workflow is retried, the CLI/Server manually edits the Workflow object and sets "Failed" steps to "Pending" so that they are re-run. The controller is unaware that this has happened and treats the Workflow as if it were running for the first time.
For this feature to be implemented without breaking abstraction barriers, it should be implemented fully in the RetryWorkflow function in workflow/util/util.go. This could perhaps be done by specifying in a UI/CLI which steps should be fully restarted, which are then passed to the function. The function can then restart the appropriate nodes from that input.
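For illustration, a minimal sketch of the reset behaviour described above, assuming the wfv1 node-status API; resetFailedNodes is a hypothetical helper name, not the actual RetryWorkflow code:

```go
package retryutil

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

// resetFailedNodes is illustrative only: it flips failed/errored nodes back to
// Pending so the controller re-runs them on its next reconciliation pass. The
// real RetryWorkflow in workflow/util/util.go also resets workflow-level
// status, labels, and conditions.
func resetFailedNodes(wf *wfv1.Workflow) {
	for id, node := range wf.Status.Nodes {
		if node.Phase == wfv1.NodeFailed || node.Phase == wfv1.NodeError {
			node.Phase = wfv1.NodePending // treated as "not yet run" by the controller
			node.Message = ""
			node.FinishedAt = metav1.Time{}
			wf.Status.Nodes[id] = node
		}
	}
}
```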
workflow/controller/operator.go (Outdated)

@@ -629,6 +629,17 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
	return woc.markNodePhase(node.Name, wfv1.NodeSucceeded), true, nil
}

if woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline) {
	var message string
	if woc.workflowDeadline.IsZero() {
JSYK: The way we detect termination has changed since you opened this PR. This info is now found in Workflow.Spec.Shutdown
Thanks, I'm updating this.
test/e2e/testdata/retry-test.yaml (Outdated)

- name: steps-inner
  retryStrategy:
    restartOnWorkflowRetry: true
My initial reaction is that this is not the correct place for this. retryStrategy deals with retrying this node during a single workflow execution. Your proposed flag deals with retrying the node across different executions. Under this implementation retryStrategy is overloaded.
Ah, you are right - I'm moving restartOnWorkflowRetry up to the template level. Does that work?
Force-pushed from 3f798c1 to 072add9.
workflow/controller/operator.go (Outdated)

for _, node := range woc.wf.Status.Nodes {
	if node.IsActiveSuspendNode() {
		woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("step exceeded workflow deadline %s", *woc.workflowDeadline))
		var message = ""
ineffectual assignment to message (from ineffassign)
Thanks for taking a look.
My use case for this is where we have an 'asynchronous' group of steps: the first starts a pod that kicks off a job outside Argo, and then the suspend step is either resumed or failed depending on that job. Just retrying the resume step on failure therefore wouldn't be very useful.
As you saw, I did implement all the actual logic in the RetryWorkflow function, but a user triggering a retry wouldn't know what to pass in to be fully restarted. I do think this information best belongs with the workflow, which means storing at least that much in the workflow template, but I don't think that's very invasive ...
If a user would tag a certain template as
Let me gather some more opinions with the team and get back to you. Could be that you're right and I'm a bit too stringent 🙂
Hi @mark9white. The team agreed that we won't want to support this sort of labeling on the Workflow spec. This feature is still very much desired, but we'll have to find a way to specify which nodes to restart fully on the client side.
Thanks for following up with the team. I could make this work by providing the ability to specify nodes to restart by templateName - would that be ok? Doing this is not ideal for Argo users - has the team had any thoughts on providing first-class support for triggering asynchronous jobs from Argo without using polling? An example would be triggering a Spark job (e.g. directly or using something like Amazon EMR) where you would have one step trigger it and then a suspend step that waits for the job completion.
I have just modified the PR so the retry command takes in a --reset-nodes-field-selector parameter.
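For context, a minimal sketch of how a field selector string of this kind can be parsed and matched with the Kubernetes fields package; the mechanism shown is an assumption for illustration, not the PR's exact code:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/fields"
)

func main() {
	// Parse the user-supplied selector, then match it against per-node fields.
	selector, err := fields.ParseSelector("inputs.parameters.myparam.value=abc")
	if err != nil {
		panic(err)
	}
	nodeFields := fields.Set{"inputs.parameters.myparam.value": "abc"}
	fmt.Println(selector.Matches(nodeFields)) // true
}
```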
Thanks @mark9white! I'll take another look.
Do you mean using the
Looks pretty good, just some minor comments.
cmd/argo/commands/retry.go (Outdated)

@@ -37,5 +51,6 @@ func NewRetryCommand() *cobra.Command {
	command.Flags().StringVarP(&cliSubmitOpts.output, "output", "o", "", "Output format. One of: name|json|yaml|wide")
	command.Flags().BoolVarP(&cliSubmitOpts.wait, "wait", "w", false, "wait for the workflow to complete")
	command.Flags().BoolVar(&cliSubmitOpts.watch, "watch", false, "watch the workflow until it completes")
	command.Flags().StringVar(&retryOps.nodesToResetFieldSelector, "reset-nodes-field-selector", "", "selector of nodes to reset, eg: --node-field-selector inputs.paramaters.myparam.value=abc")
I think the name of this flag should be --node-field-selector, as all this does is provide a selector. This way it would be analogous to #1904. To specify that we want said nodes restarted we can pass a flag in conjunction:

$ argo retry --restart-successful --node-field-selector inputs.parameters.myparam.value=abc

Or something like this. What do you think?
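A hedged sketch of how the two suggested flags could be wired onto the retry command with cobra; retryOpts and the help strings are illustrative names, not the PR's final code:

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

// Hypothetical option struct; the PR's actual struct and field names may differ.
type retryOpts struct {
	nodeFieldSelector string
	restartSuccessful bool
}

func main() {
	var opts retryOpts
	cmd := &cobra.Command{
		Use: "retry WORKFLOW",
		Run: func(cmd *cobra.Command, args []string) {
			// The real command would parse the selector and call RetryWorkflow.
			fmt.Printf("selector=%q restartSuccessful=%v\n", opts.nodeFieldSelector, opts.restartSuccessful)
		},
	}
	cmd.Flags().StringVar(&opts.nodeFieldSelector, "node-field-selector", "", "selector of nodes to reset, e.g. inputs.parameters.myparam.value=abc")
	cmd.Flags().BoolVar(&opts.restartSuccessful, "restart-successful", false, "also restart successful nodes matched by --node-field-selector")
	_ = cmd.Execute()
}
```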
Good idea, am applying.
if woc.wf.Spec.Shutdown != "" || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) {
	var message string
	if woc.wf.Spec.Shutdown != "" {
		message = fmt.Sprintf("Stopped with strategy '%s'", woc.wf.Spec.Shutdown)
	} else {
		message = fmt.Sprintf("retry exceeded workflow deadline %s", *woc.workflowDeadline)
	}
	woc.log.Infoln(message)
	return woc.markNodePhase(node.Name, lastChildNode.Phase, message), true, nil
}
Why do we need this here? Isn't this covered by failSuspendedNodesAfterDeadlineOrShutdown()?
Without this, in the case of a retry parent node it just keeps retrying pods that continually fail because they are being executed after the deadline. The integration test didn't work without it.
func (woc *wfOperationCtx) failSuspendedNodesAfterDeadlineOrShutdown() error {
	if woc.wf.Spec.Shutdown != "" || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) {
Thanks for this!
}

-	if selector.Matches(nodeFields) {
+	if selectorMatchesNode(selector, node) {
Nice!
Because the person running 'retry' needs to know what selector to pass in to effectively retry the given workflow.
Force-pushed from 74ef8a0 to 6fd7755.
@simster7 I've applied the feedback.
LGTM! Thanks for this great PR @mark9white
// Delete/reset fields which indicate workflow completed
delete(newWF.Labels, common.LabelKeyCompleted)
newWF.Status.Conditions.UpsertCondition(wfv1.WorkflowCondition{Status: metav1.ConditionFalse, Type: wfv1.WorkflowConditionCompleted})
newWF.ObjectMeta.Labels[common.LabelKeyPhase] = string(wfv1.NodeRunning)
newWF.Status.Phase = wfv1.NodeRunning
newWF.Status.Message = ""
newWF.Status.FinishedAt = metav1.Time{}
newWF.Spec.Shutdown = ""
Nice catch!
Hey @mark9white. Could you resolve the conflicts here please?
Force-pushed from 6fd7755 to e8f28cc.
Done!
Actually there is an issue in the e2e tests, which I'm looking into.
test/e2e/fixtures/e2e_suite.go (Outdated)

@@ -357,8 +364,10 @@ func (s *E2ESuite) printWorkflowDiagnostics(name string) {
	s.CheckError(err)
	wf.Status.Nodes = offloaded
}
-	logCtx.Debug("Workflow metadata:")
+	logCtx.Debug("Workflow metadata at %s:", time.Now().String())
printf: Debug call has possible formatting directive %s (from govet)
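For reference, a small sketch of the usual fix for this govet warning, assuming logCtx is a logrus entry as in the e2e fixtures: switch to Debugf when a format string is passed.

```go
package main

import (
	"time"

	log "github.com/sirupsen/logrus"
)

func main() {
	log.SetLevel(log.DebugLevel)
	logCtx := log.WithField("test", "retry")
	// Debug treats its arguments as plain values, so govet flags the stray %s;
	// Debugf actually formats them.
	logCtx.Debugf("Workflow metadata at %s:", time.Now().String())
}
```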
Force-pushed from ecb6067 to 5616a66.
Force-pushed from 4f19ff4 to a460ac8.
workflow/util/util.go (Outdated)

@@ -715,7 +753,7 @@ func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatus
	if err != nil {
		return nil, fmt.Errorf("unable to compress workflow: %s", err)
	}
File is not goimports-ed with -local github.com/argoproj/argo (from goimports)
Sorry to renege on the approval, but because of #2645 new changes are needed.
workflow/util/util.go (Outdated)

if err != nil {
	return nil, err
} else {
	for _, node := range wf.Status.Nodes {
Hey @mark9white, because of #2645 this actually needs to be moved further down the code. Workflows with offloaded nodes are only retrieved starting in line 678, so if a Workflow has offloaded nodes, wf.Status.Nodes will be nil at this point in the code and the node field selector will have no effect.
While you're at this, would you mind extracting this block out to a helper function? RetryWorkflow is already a bit cluttered 🙂
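A hedged sketch of the kind of helper being asked for here, run only after offloaded nodes have been loaded into wf.Status.Nodes; getNodeIDsToReset and the selectable fields shown are assumptions, not the PR's code:

```go
package retryutil

import (
	"k8s.io/apimachinery/pkg/fields"

	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

// getNodeIDsToReset collects the IDs of nodes matched by the field selector.
// The set of selectable fields here is illustrative.
func getNodeIDsToReset(selector fields.Selector, wf *wfv1.Workflow) map[string]bool {
	nodeIDs := make(map[string]bool)
	for _, node := range wf.Status.Nodes {
		nodeFields := fields.Set{
			"displayName":  node.DisplayName,
			"templateName": node.TemplateName,
			"phase":        string(node.Phase),
		}
		if selector.Matches(nodeFields) {
			nodeIDs[node.ID] = true
		}
	}
	return nodeIDs
}
```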
Done
workflow/util/util.go (Outdated)

	return nil, err
} else {
	for _, node := range wf.Status.Nodes {
		if selectorMatchesNode(selector, node) {
Seems like this code could be included in the large for loop starting at line 690. What do you think? Adding it there could save us an iteration through all the nodes. If you do decide to add it, please make sure it's added via a helper function.
We need to determine the list of nodes including child nodes first.
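To illustrate why that ordering matters, a hedged sketch of gathering a matched node's descendants via Children before the main reset loop runs; addDescendants is a hypothetical helper, not the PR's code:

```go
package retryutil

import (
	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

// addDescendants marks nodeID and everything reachable through Children so
// the later reset loop can treat the whole subtree as "to be restarted".
func addDescendants(nodeID string, wf *wfv1.Workflow, toReset map[string]bool) {
	if toReset[nodeID] {
		return
	}
	toReset[nodeID] = true
	node, ok := wf.Status.Nodes[nodeID]
	if !ok {
		return
	}
	for _, childID := range node.Children {
		addDescendants(childID, wf, toReset)
	}
}
```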
workflow/util/util.go (Outdated)

		newNodes[node.ID] = node
		continue
	}
case wfv1.NodeError, wfv1.NodeFailed:
-	if !strings.HasPrefix(node.Name, onExitNodeName) && (node.Type == wfv1.NodeTypeDAG || node.Type == wfv1.NodeTypeStepGroup) {
+	if !strings.HasPrefix(node.Name, onExitNodeName) && (node.Type == wfv1.NodeTypeDAG || node.Type == wfv1.NodeTypeStepGroup) && !doForceResetNode {
Why is this && !doForceResetNode necessary here? What's the difference between "pretend as if this node never existed" and resetting it manually?
You're right, fixed
workflow/util/util.go (Outdated)

@@ -655,14 +688,19 @@ func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatus
	}

	for _, node := range nodes {
		var doForceResetNode = false
Minor:

-	var doForceResetNode = false
+	forceResetNode := false
Done
I've retriggered the build as it failed on TestLogProblems (which is unrelated and looks to be flaky).
Kudos, SonarCloud Quality Gate passed! 0 Bugs. No coverage information.
I'm still seeing TestLogProblems failing, but I don't think it's related to this PR.
This allows retryStrategy to contain restartOnWorkflowRetry, in which case the entire node (and therefore all descendant nodes) will be restarted if they haven't all already succeeded and the workflow is retried.
See #2334 for more info.
Checklist:
- My commit messages follow this format: "fix(controller): Updates such and such. Fixes #1234".