CheckandEstimate implementation to optimize podReconciliation #1308

Merged: 6 commits, Apr 5, 2019
Changes from 4 commits
65 changes: 56 additions & 9 deletions workflow/controller/operator.go
@@ -61,6 +61,15 @@ type wfOperationCtx struct {
// workflowDeadline is the deadline which the workflow is expected to complete before we
// terminate the workflow.
workflowDeadline *time.Time

// currentWFSize is the current size of the Workflow
currentWFSize int

// unSavedNodeStatusSize is the accumulated size of node statuses that have not yet been saved
unSavedNodeStatusSize int

// isWFFailed records the workflow failed status
isWFFailed bool
Member:
Do we really need to track this state here? We already have workflow.status.phase which indicates a failed workflow, and this seems redundant to it. I'm concerned with the additional state management.
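For reference, a minimal sketch of the alternative the reviewer is hinting at: derive the failed state from the phase already stored on the workflow status rather than tracking a separate flag. This is not part of the PR, and the wfv1.NodeFailed and wfv1.NodeError constants are assumed from the Argo API of this version.

```go
// Sketch only, not part of the diff: reuse workflow.status.phase instead of
// keeping a separate isWFFailed bool on the operation context.
func (woc *wfOperationCtx) workflowFailed() bool {
	return woc.wf.Status.Phase == wfv1.NodeFailed || woc.wf.Status.Phase == wfv1.NodeError
}
```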

}

var (
@@ -124,7 +133,12 @@ func (woc *wfOperationCtx) operate() {
woc.log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
}
}()

woc.log.Infof("Processing workflow")

// Initialize Workflow failed status
woc.isWFFailed = false
Member:
No need to initialize this since bools are initialized to false
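As a side note, here is a minimal runnable illustration of the zero-value behavior the reviewer refers to; the type and field names are made up for the example.

```go
package main

import "fmt"

// In Go, variables and struct fields of type bool start out as false,
// so an explicit "isWFFailed = false" on a freshly created context is a no-op.
type operationCtx struct {
	isWFFailed bool
}

func main() {
	var ctx operationCtx
	fmt.Println(ctx.isWFFailed) // prints "false" without any initialization
}
```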


// Perform one-time workflow validation
if woc.wf.Status.Phase == "" {
woc.markWorkflowRunning()
@@ -453,9 +467,9 @@ func (woc *wfOperationCtx) podReconciliation() error {
seenPodLock := &sync.Mutex{}
wfNodesLock := &sync.RWMutex{}

performAssessment := func(pod *apiv1.Pod) {
performAssessment := func(pod *apiv1.Pod) string {
if pod == nil {
return
return ""
}
nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName]
nodeID := woc.wf.NodeID(nodeNameForPod)
@@ -475,16 +489,20 @@ func (woc *wfOperationCtx) podReconciliation() error {
if node.Completed() && !node.IsDaemoned() {
if tmpVal, tmpOk := pod.Labels[common.LabelKeyCompleted]; tmpOk {
if tmpVal == "true" {
return
return nodeID
}
}
woc.completedPods[pod.ObjectMeta.Name] = true
}
}
return nodeID
}

parallelPodNum := make(chan string, 500)
var wg sync.WaitGroup

woc.currentWFSize = woc.getSize()

for _, pod := range podList.Items {
parallelPodNum <- pod.Name
wg.Add(1)
@@ -493,20 +511,23 @@ func (woc *wfOperationCtx) podReconciliation() error {
wfNodesLock.Lock()
origNodeStatus := *woc.wf.Status.DeepCopy()
wfNodesLock.Unlock()
performAssessment(&tmpPod)
nodeID := performAssessment(&tmpPod)
err = woc.applyExecutionControl(&tmpPod, wfNodesLock)
if err != nil {
woc.log.Warnf("Failed to apply execution control to pod %s", tmpPod.Name)
}
wfNodesLock.Lock()
defer wfNodesLock.Unlock()
err = woc.checkAndCompress()
err = woc.checkAndEstimate(nodeID)
if err != nil {
woc.wf.Status = origNodeStatus
nodeNameForPod := tmpPod.Annotations[common.AnnotationKeyNodeName]
woc.log.Warnf("%v", err)
woc.markNodeErrorClearOuput(nodeNameForPod, err)
err = woc.checkAndCompress()
if err != nil {
woc.isWFFailed = true
}
}
<-parallelPodNum
}(pod)
@@ -1664,17 +1685,17 @@ func (woc *wfOperationCtx) getSize() int {
// The compressed content will be assigned to the compressedNodes element and the node status map will be cleared.
func (woc *wfOperationCtx) checkAndCompress() error {

if woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize) {

if woc.isWFFailed == false && (woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize)) {
Member:
Style nit: if !woc.isWFFailed instead of == false

nodeContent, err := json.Marshal(woc.wf.Status.Nodes)
if err != nil {
return errors.InternalWrapError(err)
}
buff := string(nodeContent)
woc.wf.Status.CompressedNodes = file.CompressEncodeString(buff)

}
if woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize {

if woc.isWFFailed || (woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize) {
woc.isWFFailed = true
return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.getSize()))
}
return nil
@@ -1698,3 +1719,29 @@ func (woc *wfOperationCtx) checkAndDecompress() error {
}
return nil
}

// checkAndEstimate estimates the workflow size with the current node status and checks it against the maximum allowed size
func (woc *wfOperationCtx) checkAndEstimate(nodeID string) error {
if nodeID == "" {
return nil
}

if woc.isWFFailed {
return errors.InternalErrorf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+woc.unSavedNodeStatusSize)
}

if woc.wf.Status.CompressedNodes != "" {
if node, ok := woc.wf.Status.Nodes[nodeID]; ok {
content, err := json.Marshal(node)
if err != nil {
return errors.InternalWrapError(err)
}
nodeSize := len(file.CompressEncodeString(string(content)))
if (nodeSize + woc.unSavedNodeStatusSize + woc.currentWFSize) >= maxWorkflowSize {
return errors.InternalErrorf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+nodeSize+woc.unSavedNodeStatusSize)
}
woc.unSavedNodeStatusSize += nodeSize
}
}
return nil
}
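To summarize the optimization, here is a self-contained sketch (not from the PR) of the estimation idea: measure the persisted workflow size once per reconciliation, then for each newly assessed node compress only that node's status and add it to a running total, failing fast when the estimate would exceed the limit instead of recompressing the entire node-status map for every pod. The gzip-plus-base64 encoding is an assumption mirroring file.CompressEncodeString, and the size limit and sample payloads are made up.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"fmt"
)

// maxWorkflowSize is an assumed limit for the sketch, standing in for the
// controller's real constant.
const maxWorkflowSize = 1024 * 1024

// compressEncode mimics what file.CompressEncodeString is assumed to do:
// gzip the string and base64-encode the result.
func compressEncode(s string) string {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	_, _ = zw.Write([]byte(s))
	_ = zw.Close()
	return base64.StdEncoding.EncodeToString(buf.Bytes())
}

func main() {
	currentWFSize := 900000    // size of the workflow as last measured
	unSavedNodeStatusSize := 0 // running total of compressed, not-yet-saved node statuses

	// Each entry stands in for the JSON of one freshly assessed node status.
	updatedNodes := []string{
		`{"id":"node-a","phase":"Succeeded"}`,
		`{"id":"node-b","phase":"Running"}`,
	}

	for _, nodeJSON := range updatedNodes {
		nodeSize := len(compressEncode(nodeJSON))
		if currentWFSize+unSavedNodeStatusSize+nodeSize >= maxWorkflowSize {
			fmt.Println("estimated size exceeds the maximum; fail fast without recompressing everything")
			return
		}
		unSavedNodeStatusSize += nodeSize
	}
	fmt.Println("estimated total size within limit:", currentWFSize+unSavedNodeStatusSize)
}
```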