From e81855e8a5d0d863baebb3415bf64809c4d67657 Mon Sep 17 00:00:00 2001 From: Alexander Matyushentsev Date: Tue, 15 Jan 2019 13:35:17 -0800 Subject: [PATCH 1/2] Issue #1113 - Wait for daemon pods completion to handle annotations --- examples/daemon-step.yaml | 4 ++++ pkg/apis/workflow/v1alpha1/types.go | 4 ++-- workflow/controller/operator.go | 19 +++++++++++++++---- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/examples/daemon-step.yaml b/examples/daemon-step.yaml index 14f9e0c6f869..6b65788e1598 100644 --- a/examples/daemon-step.yaml +++ b/examples/daemon-step.yaml @@ -53,6 +53,10 @@ spec: port: 8086 initialDelaySeconds: 5 timeoutSeconds: 1 + outputs: + artifacts: + - name: data + path: /var/lib/influxdb/data - name: influxdb-client inputs: diff --git a/pkg/apis/workflow/v1alpha1/types.go b/pkg/apis/workflow/v1alpha1/types.go index 22700b5b5ff9..02d9ea9067ab 100644 --- a/pkg/apis/workflow/v1alpha1/types.go +++ b/pkg/apis/workflow/v1alpha1/types.go @@ -561,7 +561,7 @@ func (ws *WorkflowStatus) Completed() bool { // Remove returns whether or not the node has completed execution func (n NodeStatus) Completed() bool { - return isCompletedPhase(n.Phase) + return isCompletedPhase(n.Phase) || n.IsDaemoned() && n.Phase != NodePending } // IsDaemoned returns whether or not the node is deamoned @@ -574,7 +574,7 @@ func (n NodeStatus) IsDaemoned() bool { // Successful returns whether or not this node completed successfully func (n NodeStatus) Successful() bool { - return n.Phase == NodeSucceeded || n.Phase == NodeSkipped + return n.Phase == NodeSucceeded || n.Phase == NodeSkipped || n.IsDaemoned() && n.Phase != NodePending } // CanRetry returns whether the node should be retried or not. diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index b6c39e75882b..4978af5af1b8 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -442,7 +442,8 @@ func (woc *wfOperationCtx) podReconciliation() error { woc.addOutputsToScope("workflow", node.Outputs, nil) woc.updated = true } - if woc.wf.Status.Nodes[pod.ObjectMeta.Name].Completed() { + node := woc.wf.Status.Nodes[pod.ObjectMeta.Name] + if node.Completed() && !node.IsDaemoned() { woc.completedPods[pod.ObjectMeta.Name] = true } } @@ -578,8 +579,8 @@ func assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatus) *wfv1.NodeStatus { return nil } } - // proceed to mark node status as succeeded (and daemoned) - newPhase = wfv1.NodeSucceeded + // proceed to mark node status as running (and daemoned) + newPhase = wfv1.NodeRunning t := true newDaemonStatus = &t log.Infof("Processing ready daemon pod: %v", pod.ObjectMeta.SelfLink) @@ -1025,7 +1026,8 @@ func (woc *wfOperationCtx) markWorkflowPhase(phase wfv1.NodePhase, markCompleted switch phase { case wfv1.NodeSucceeded, wfv1.NodeFailed, wfv1.NodeError: - if markCompleted { + // wait for all daemon nodes to get terminated before marking workflow completed + if markCompleted && !woc.hasDaemonNodes() { woc.log.Infof("Marking workflow completed") woc.wf.Status.FinishedAt = metav1.Time{Time: time.Now().UTC()} if woc.wf.ObjectMeta.Labels == nil { @@ -1037,6 +1039,15 @@ func (woc *wfOperationCtx) markWorkflowPhase(phase wfv1.NodePhase, markCompleted } } +func (woc *wfOperationCtx) hasDaemonNodes() bool { + for _, node := range woc.wf.Status.Nodes { + if node.IsDaemoned() { + return true + } + } + return false +} + func (woc *wfOperationCtx) markWorkflowRunning() { woc.markWorkflowPhase(wfv1.NodeRunning, false) } From 07e32a163f1371eea3c1ad00a6700d6360319dc9 Mon Sep 17 00:00:00 2001 From: Alexander Matyushentsev Date: Thu, 17 Jan 2019 13:24:09 -0800 Subject: [PATCH 2/2] Add output artifacts to influxdb-ci example --- examples/daemon-step.yaml | 4 ---- examples/influxdb-ci.yaml | 4 ++++ workflow/controller/operator.go | 7 ++++++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/examples/daemon-step.yaml b/examples/daemon-step.yaml index 6b65788e1598..14f9e0c6f869 100644 --- a/examples/daemon-step.yaml +++ b/examples/daemon-step.yaml @@ -53,10 +53,6 @@ spec: port: 8086 initialDelaySeconds: 5 timeoutSeconds: 1 - outputs: - artifacts: - - name: data - path: /var/lib/influxdb/data - name: influxdb-client inputs: diff --git a/examples/influxdb-ci.yaml b/examples/influxdb-ci.yaml index 121a6fd2d8a0..d26c765fdd78 100644 --- a/examples/influxdb-ci.yaml +++ b/examples/influxdb-ci.yaml @@ -194,6 +194,10 @@ spec: - name: influxd path: /app daemon: true + outputs: + artifacts: + - name: data + path: /var/lib/influxdb/data container: image: debian:9.4 readinessProbe: diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 4978af5af1b8..ba16443e5ed1 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -557,7 +557,12 @@ func assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatus) *wfv1.NodeStatus { newPhase = wfv1.NodeSucceeded newDaemonStatus = &f case apiv1.PodFailed: - newPhase, message = inferFailedReason(pod) + // ignore pod failure for daemoned steps + if node.IsDaemoned() { + newPhase = wfv1.NodeSucceeded + } else { + newPhase, message = inferFailedReason(pod) + } newDaemonStatus = &f case apiv1.PodRunning: newPhase = wfv1.NodeRunning