diff --git a/examples/influxdb-ci.yaml b/examples/influxdb-ci.yaml index 121a6fd2d8a0..d26c765fdd78 100644 --- a/examples/influxdb-ci.yaml +++ b/examples/influxdb-ci.yaml @@ -194,6 +194,10 @@ spec: - name: influxd path: /app daemon: true + outputs: + artifacts: + - name: data + path: /var/lib/influxdb/data container: image: debian:9.4 readinessProbe: diff --git a/pkg/apis/workflow/v1alpha1/types.go b/pkg/apis/workflow/v1alpha1/types.go index 22700b5b5ff9..02d9ea9067ab 100644 --- a/pkg/apis/workflow/v1alpha1/types.go +++ b/pkg/apis/workflow/v1alpha1/types.go @@ -561,7 +561,7 @@ func (ws *WorkflowStatus) Completed() bool { // Remove returns whether or not the node has completed execution func (n NodeStatus) Completed() bool { - return isCompletedPhase(n.Phase) + return isCompletedPhase(n.Phase) || n.IsDaemoned() && n.Phase != NodePending } // IsDaemoned returns whether or not the node is deamoned @@ -574,7 +574,7 @@ func (n NodeStatus) IsDaemoned() bool { // Successful returns whether or not this node completed successfully func (n NodeStatus) Successful() bool { - return n.Phase == NodeSucceeded || n.Phase == NodeSkipped + return n.Phase == NodeSucceeded || n.Phase == NodeSkipped || n.IsDaemoned() && n.Phase != NodePending } // CanRetry returns whether the node should be retried or not. diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index b6c39e75882b..ba16443e5ed1 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -442,7 +442,8 @@ func (woc *wfOperationCtx) podReconciliation() error { woc.addOutputsToScope("workflow", node.Outputs, nil) woc.updated = true } - if woc.wf.Status.Nodes[pod.ObjectMeta.Name].Completed() { + node := woc.wf.Status.Nodes[pod.ObjectMeta.Name] + if node.Completed() && !node.IsDaemoned() { woc.completedPods[pod.ObjectMeta.Name] = true } } @@ -556,7 +557,12 @@ func assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatus) *wfv1.NodeStatus { newPhase = wfv1.NodeSucceeded newDaemonStatus = &f case apiv1.PodFailed: - newPhase, message = inferFailedReason(pod) + // ignore pod failure for daemoned steps + if node.IsDaemoned() { + newPhase = wfv1.NodeSucceeded + } else { + newPhase, message = inferFailedReason(pod) + } newDaemonStatus = &f case apiv1.PodRunning: newPhase = wfv1.NodeRunning @@ -578,8 +584,8 @@ func assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatus) *wfv1.NodeStatus { return nil } } - // proceed to mark node status as succeeded (and daemoned) - newPhase = wfv1.NodeSucceeded + // proceed to mark node status as running (and daemoned) + newPhase = wfv1.NodeRunning t := true newDaemonStatus = &t log.Infof("Processing ready daemon pod: %v", pod.ObjectMeta.SelfLink) @@ -1025,7 +1031,8 @@ func (woc *wfOperationCtx) markWorkflowPhase(phase wfv1.NodePhase, markCompleted switch phase { case wfv1.NodeSucceeded, wfv1.NodeFailed, wfv1.NodeError: - if markCompleted { + // wait for all daemon nodes to get terminated before marking workflow completed + if markCompleted && !woc.hasDaemonNodes() { woc.log.Infof("Marking workflow completed") woc.wf.Status.FinishedAt = metav1.Time{Time: time.Now().UTC()} if woc.wf.ObjectMeta.Labels == nil { @@ -1037,6 +1044,15 @@ func (woc *wfOperationCtx) markWorkflowPhase(phase wfv1.NodePhase, markCompleted } } +func (woc *wfOperationCtx) hasDaemonNodes() bool { + for _, node := range woc.wf.Status.Nodes { + if node.IsDaemoned() { + return true + } + } + return false +} + func (woc *wfOperationCtx) markWorkflowRunning() { woc.markWorkflowPhase(wfv1.NodeRunning, false) }