Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Retry pending nodes #2385

Merged
merged 21 commits into from
Mar 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2431,6 +2431,11 @@
"title": "Resource template subtype which can run k8s resources",
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.ResourceTemplate"
},
"resubmitPendingPods": {
"type": "boolean",
"format": "boolean",
"title": "ResubmitPendingPods is a flag to enable resubmitting pods that remain Pending after initial submission"
},
"retryStrategy": {
"title": "RetryStrategy describes how to retry a template when it fails",
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.RetryStrategy"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
apiVersion: v1
data:
config: |
executor:
imagePullPolicy: IfNotPresent
resources:
requests:
cpu: 0.1
memory: 64Mi
limits:
cpu: 0.5
memory: 128Mi
artifactRepository:
archiveLogs: true
s3:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
apiVersion: v1
data:
config: |
executor:
imagePullPolicy: IfNotPresent
resources:
requests:
cpu: 0.1
memory: 64Mi
limits:
cpu: 0.5
memory: 128Mi
artifactRepository:
archiveLogs: true
s3:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
apiVersion: v1
data:
config: |
executor:
imagePullPolicy: IfNotPresent
resources:
requests:
cpu: 0.1
memory: 64Mi
limits:
cpu: 0.5
memory: 128Mi
artifactRepository:
archiveLogs: true
s3:
Expand Down
5 changes: 5 additions & 0 deletions pkg/apiclient/cronworkflow/cron-workflow.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,11 @@
"podSpecPatch": {
"type": "string",
"description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of\ncontainer fields which are not strings (e.g. resource limits)."
},
"resubmitPendingPods": {
"type": "boolean",
"format": "boolean",
"title": "ResubmitPendingPods is a flag to enable resubmitting pods that remain Pending after initial submission"
}
},
"title": "Template is a reusable and composable unit of execution in a workflow"
Expand Down
5 changes: 5 additions & 0 deletions pkg/apiclient/workflow/workflow.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1694,6 +1694,11 @@
"podSpecPatch": {
"type": "string",
"description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of\ncontainer fields which are not strings (e.g. resource limits)."
},
"resubmitPendingPods": {
"type": "boolean",
"format": "boolean",
"title": "ResubmitPendingPods is a flag to enable resubmitting pods that remain Pending after initial submission"
}
},
"title": "Template is a reusable and composable unit of execution in a workflow"
Expand Down
5 changes: 5 additions & 0 deletions pkg/apiclient/workflowarchive/workflow-archive.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,11 @@
"podSpecPatch": {
"type": "string",
"description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of\ncontainer fields which are not strings (e.g. resource limits)."
},
"resubmitPendingPods": {
"type": "boolean",
"format": "boolean",
"title": "ResubmitPendingPods is a flag to enable resubmitting pods that remain Pending after initial submission"
}
},
"title": "Template is a reusable and composable unit of execution in a workflow"
Expand Down
5 changes: 5 additions & 0 deletions pkg/apiclient/workflowtemplate/workflow-template.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,11 @@
"podSpecPatch": {
"type": "string",
"description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of\ncontainer fields which are not strings (e.g. resource limits)."
},
"resubmitPendingPods": {
"type": "boolean",
"format": "boolean",
"title": "ResubmitPendingPods is a flag to enable resubmitting pods that remain Pending after initial submission"
}
},
"title": "Template is a reusable and composable unit of execution in a workflow"
Expand Down
703 changes: 371 additions & 332 deletions pkg/apis/workflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/apis/workflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,9 @@ type Template struct {
// PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of
// container fields which are not strings (e.g. resource limits).
PodSpecPatch string `json:"podSpecPatch,omitempty" protobuf:"bytes,31,opt,name=podSpecPatch"`

// ResubmitPendingPods is a flag to enable resubmitting pods that remain Pending after initial submission
ResubmitPendingPods *bool `json:"resubmitPendingPods,omitempty" protobuf:"varint,34,opt,name=resubmitPendingPods"`
}

var _ TemplateHolder = &Template{}
Expand Down Expand Up @@ -1019,11 +1022,16 @@ func (in *WorkflowStatus) AnyActiveSuspendNode() bool {
return in.Nodes.Any(func(node NodeStatus) bool { return node.IsActiveSuspendNode() })
}

// Remove returns whether or not the node has completed execution
// Completed returns whether or not the node has completed execution
func (n NodeStatus) Completed() bool {
return isCompletedPhase(n.Phase) || n.IsDaemoned() && n.Phase != NodePending
}

// Pending returns whether or not the node is in pending state
func (n NodeStatus) Pending() bool {
return n.Phase == NodePending
}

// IsDaemoned returns whether or not the node is deamoned
func (n NodeStatus) IsDaemoned() bool {
if n.Daemoned == nil || !*n.Daemoned {
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions test/e2e/fixtures/e2e_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,19 @@ func (s *E2ESuite) DeleteResources(label string) {
panic(err)
}
}

// Delete all resourcequotas
rqList, err := s.KubeClient.CoreV1().ResourceQuotas(Namespace).List(metav1.ListOptions{LabelSelector: label})
if err != nil {
panic(err)
}
for _, rq := range rqList.Items {
log.WithField("resourcequota", rq.Name).Info("Deleting resource quota")
err = s.KubeClient.CoreV1().ResourceQuotas(Namespace).Delete(rq.Name, nil)
if err != nil {
panic(err)
}
}
}

func (s *E2ESuite) GetBasicAuthToken() string {
Expand Down
21 changes: 21 additions & 0 deletions test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"k8s.io/client-go/kubernetes"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"

"github.com/argoproj/argo/persist/sqldb"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo/test/util"
"github.com/argoproj/argo/workflow/packer"
)

Expand All @@ -32,6 +34,7 @@ type When struct {
wfTemplateNames []string
cronWorkflowName string
kubeClient kubernetes.Interface
resourceQuota *corev1.ResourceQuota
}

func (w *When) SubmitWorkflow() *When {
Expand Down Expand Up @@ -173,6 +176,24 @@ func (w *When) RunCli(args []string, block func(t *testing.T, output string, err
return w
}

func (w *When) MemoryQuota(quota string) *When {
obj, err := util.CreateHardMemoryQuota(w.kubeClient, "argo", "memory-quota", quota)
if err != nil {
w.t.Fatal(err)
}
w.resourceQuota = obj
return w
}

func (w *When) DeleteQuota() *When {
err := util.DeleteQuota(w.kubeClient, w.resourceQuota)
if err != nil {
w.t.Fatal(err)
}
w.resourceQuota = nil
return w
}

func (w *When) Then() *Then {
return &Then{
t: w.t,
Expand Down
15 changes: 9 additions & 6 deletions test/e2e/functional/param-aggregation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ spec:
parameters:
- name: message
value: "{{item}}"
withParam: "{{steps.divide-by-2.outputs.result}}"
withParam: "{{steps.divide-by-2.outputs.parameters}}"

# odd-or-even accepts a number and returns whether or not that number is odd or even
- name: odd-or-even
Expand Down Expand Up @@ -63,12 +63,15 @@ spec:
inputs:
parameters:
- name: num
script:
container:
image: alpine:latest
command: [sh, -x]
source: |
#!/bin/sh
echo $(({{inputs.parameters.num}}/2))
command: [sh, -c]
args: ["echo $(({{inputs.parameters.num}}/2)) > /tmp/res"]
outputs:
parameters:
- name: res
valueFrom:
path: /tmp/res

# whalesay prints a number using whalesay
- name: whalesay
Expand Down
113 changes: 110 additions & 3 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package e2e

import (
"regexp"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -178,7 +179,111 @@ func (s *FunctionalSuite) TestLoopEmptyParam() {
})
}

func (s *FunctionalSuite) TestparameterAggregation() {
// 128M is for argo executor
func (s *FunctionalSuite) TestPendingRetryWorkflow() {
s.Given().
Workflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: dag-limited-1
labels:
argo-e2e: true
spec:
entrypoint: dag
templates:
- name: cowsay
resubmitPendingPods: true
container:
image: cowsay:v1
command: [sh, -c]
args: ["cowsay a"]
resources:
limits:
memory: 128M
- name: dag
dag:
tasks:
- name: a
template: cowsay
- name: b
template: cowsay
`).
When().
MemoryQuota("130M").
SubmitWorkflow().
WaitForWorkflowToStart(5*time.Second).
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
a := wf.Status.Nodes.FindByDisplayName("a")
b := wf.Status.Nodes.FindByDisplayName("b")
return wfv1.NodePending == a.Phase &&
regexp.MustCompile(`^Pending \d+\.\d+s$`).MatchString(a.Message) &&
wfv1.NodePending == b.Phase &&
regexp.MustCompile(`^Pending \d+\.\d+s$`).MatchString(b.Message)
}, "pods pending", 20*time.Second).
DeleteQuota().
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
a := wf.Status.Nodes.FindByDisplayName("a")
b := wf.Status.Nodes.FindByDisplayName("b")
return wfv1.NodeSucceeded == a.Phase && wfv1.NodeSucceeded == b.Phase
}, "pods succeeded", 20*time.Second)
s.TearDownSuite()
}

// 128M is for argo executor
func (s *FunctionalSuite) TestPendingRetryWorkflowWithRetryStrategy() {
s.Given().
Workflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: dag-limited-2
labels:
argo-e2e: true
spec:
entrypoint: dag
templates:
- name: cowsay
resubmitPendingPods: true
retryStrategy:
limit: 1
container:
image: cowsay:v1
command: [sh, -c]
args: ["cowsay a"]
resources:
limits:
memory: 128M
- name: dag
dag:
tasks:
- name: a
template: cowsay
- name: b
template: cowsay
`).
When().
MemoryQuota("130M").
SubmitWorkflow().
WaitForWorkflowToStart(5*time.Second).
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
a := wf.Status.Nodes.FindByDisplayName("a(0)")
b := wf.Status.Nodes.FindByDisplayName("b(0)")
return wfv1.NodePending == a.Phase &&
regexp.MustCompile(`^Pending \d+\.\d+s$`).MatchString(a.Message) &&
wfv1.NodePending == b.Phase &&
regexp.MustCompile(`^Pending \d+\.\d+s$`).MatchString(b.Message)
}, "pods pending", 20*time.Second).
DeleteQuota().
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
a := wf.Status.Nodes.FindByDisplayName("a(0)")
b := wf.Status.Nodes.FindByDisplayName("b(0)")
return wfv1.NodeSucceeded == a.Phase && wfv1.NodeSucceeded == b.Phase
}, "pods succeeded", 20*time.Second)
s.TearDownSuite()
}

func (s *FunctionalSuite) TestParameterAggregation() {
s.Given().
Workflow("@functional/param-aggregation.yaml").
When().
Expand All @@ -187,8 +292,10 @@ func (s *FunctionalSuite) TestparameterAggregation() {
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
nodeStatus := status.Nodes.FindByDisplayName("print(0:1)")
assert.Equal(t, wfv1.NodeSucceeded, nodeStatus.Phase)
nodeStatus := status.Nodes.FindByDisplayName("print(0:res:1)")
if assert.NotNil(t, nodeStatus) {
assert.Equal(t, wfv1.NodeSucceeded, nodeStatus.Phase)
}
})
}

Expand Down
Loading