Refactor checkandEstimate to optimize podReconciliation (#1311)
* Refactor checkandEstimate to optimize podReconciliation

* Move compress function to persistUpdates
xianlubird authored and sarabala1979 committed Apr 9, 2019
1 parent bbdf2e2 commit 0d400f2
Showing 1 changed file with 10 additions and 61 deletions.
71 changes: 10 additions & 61 deletions workflow/controller/operator.go
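In outline, the refactor moves workflow-size enforcement out of the per-pod reconciliation loop: podReconciliation no longer estimates sizes as it assesses each pod, and persistUpdates simply fails the workflow when checkAndCompress cannot bring it under the limit. A minimal sketch of the resulting flow, using simplified stand-in types rather than the controller's real wfOperationCtx and Workflow objects:

package main

import "fmt"

// Stand-ins for the real controller state; the actual code compresses
// wf.Status.Nodes with file.CompressEncodeString and compares against
// the maxWorkflowSize limit on the stored object.
const maxWorkflowSize = 1 << 20

type wfOperationCtx struct {
	rawNodes        string // stands in for wf.Status.Nodes
	compressedNodes string // stands in for wf.Status.CompressedNodes
	failed          bool
}

func (woc *wfOperationCtx) getSize() int {
	if woc.compressedNodes != "" {
		return len(woc.compressedNodes)
	}
	return len(woc.rawNodes)
}

// compress is a placeholder for file.CompressEncodeString.
func compress(s string) string { return s[:len(s)/2] }

// checkAndCompress mirrors the post-refactor logic: compress whenever nodes are
// already compressed or the raw size exceeds the limit; error only if the
// compressed form is still too large.
func (woc *wfOperationCtx) checkAndCompress() error {
	if woc.compressedNodes != "" || woc.getSize() >= maxWorkflowSize {
		woc.compressedNodes = compress(woc.rawNodes)
	}
	if woc.compressedNodes != "" && woc.getSize() >= maxWorkflowSize {
		return fmt.Errorf("workflow is longer than maximum allowed size. Size=%d", woc.getSize())
	}
	return nil
}

// persistUpdates now fails the whole workflow on a compression error instead
// of relying on per-pod size estimation during podReconciliation.
func (woc *wfOperationCtx) persistUpdates() {
	if err := woc.checkAndCompress(); err != nil {
		woc.failed = true // woc.markWorkflowFailed(err.Error()) in the real controller
	}
	if woc.compressedNodes != "" {
		woc.rawNodes = "" // Nodes are cleared once a compressed copy exists
	}
}

func main() {
	woc := &wfOperationCtx{rawNodes: "node status blob"}
	woc.persistUpdates()
	fmt.Println("failed:", woc.failed, "size:", woc.getSize())
}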
@@ -61,15 +61,6 @@ type wfOperationCtx struct {
// workflowDeadline is the deadline which the workflow is expected to complete before we
// terminate the workflow.
workflowDeadline *time.Time

- // currentWFSize is current Workflow size
- currentWFSize int
-
- // unSavedNodeStatusSize is unsaved workflow size
- unSavedNodeStatusSize int
-
- // isWFCompressionFailed is workflow compression failed status
- isWFCompressionFailed bool
}

var (
@@ -300,6 +291,7 @@ func (woc *wfOperationCtx) persistUpdates() {
err := woc.checkAndCompress()
if err != nil {
woc.log.Warnf("Error compressing workflow: %v", err)
+ woc.markWorkflowFailed(err.Error())
}
if woc.wf.Status.CompressedNodes != "" {
woc.wf.Status.Nodes = nil
@@ -464,9 +456,9 @@ func (woc *wfOperationCtx) podReconciliation() error {
seenPodLock := &sync.Mutex{}
wfNodesLock := &sync.RWMutex{}

- performAssessment := func(pod *apiv1.Pod) string {
+ performAssessment := func(pod *apiv1.Pod) {
if pod == nil {
- return ""
+ return
}
nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName]
nodeID := woc.wf.NodeID(nodeNameForPod)
@@ -486,51 +478,34 @@ func (woc *wfOperationCtx) podReconciliation() error {
if node.Completed() && !node.IsDaemoned() {
if tmpVal, tmpOk := pod.Labels[common.LabelKeyCompleted]; tmpOk {
if tmpVal == "true" {
- return nodeID
+ return
}
}
woc.completedPods[pod.ObjectMeta.Name] = true
}
}
- return nodeID
+ return
}

parallelPodNum := make(chan string, 500)
var wg sync.WaitGroup

- woc.currentWFSize = woc.getSize()

for _, pod := range podList.Items {
parallelPodNum <- pod.Name
wg.Add(1)
go func(tmpPod apiv1.Pod) {
defer wg.Done()
- wfNodesLock.Lock()
- origNodeStatus := *woc.wf.Status.DeepCopy()
- wfNodesLock.Unlock()
- nodeID := performAssessment(&tmpPod)
+ performAssessment(&tmpPod)
err = woc.applyExecutionControl(&tmpPod, wfNodesLock)
if err != nil {
woc.log.Warnf("Failed to apply execution control to pod %s", tmpPod.Name)
}
- wfNodesLock.Lock()
- defer wfNodesLock.Unlock()
- err = woc.checkAndEstimate(nodeID)
- if err != nil {
- woc.wf.Status = origNodeStatus
- nodeNameForPod := tmpPod.Annotations[common.AnnotationKeyNodeName]
- woc.log.Warnf("%v", err)
- woc.markNodeErrorClearOuput(nodeNameForPod, err)
- err = woc.checkAndCompress()
- if err != nil {
- woc.isWFCompressionFailed = true
- }
- }
<-parallelPodNum
}(pod)
}

wg.Wait()

// Now check for deleted pods. Iterate our nodes. If any one of our nodes does not show up in
// the seen list it implies that the pod was deleted without the controller seeing the event.
// It is now impossible to infer pod status. The only thing we can do at this point is to mark
@@ -1682,7 +1657,7 @@ func (woc *wfOperationCtx) getSize() int {
// The compressed content will be assign to compressedNodes element and clear the nodestatus map.
func (woc *wfOperationCtx) checkAndCompress() error {

- if !woc.isWFCompressionFailed && (woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize)) {
+ if woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize) {
nodeContent, err := json.Marshal(woc.wf.Status.Nodes)
if err != nil {
return errors.InternalWrapError(err)
@@ -1691,10 +1666,10 @@ func (woc *wfOperationCtx) checkAndCompress() error {
woc.wf.Status.CompressedNodes = file.CompressEncodeString(buff)
}

- if woc.isWFCompressionFailed || (woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize) {
- woc.isWFCompressionFailed = true
+ if woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize {
return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.getSize()))
}

return nil
}

@@ -1716,29 +1691,3 @@ func (woc *wfOperationCtx) checkAndDecompress() error {
}
return nil
}

- // checkAndEstimate will check and estimate the workflow size with current nodestatus
- func (woc *wfOperationCtx) checkAndEstimate(nodeID string) error {
- if nodeID == "" {
- return nil
- }
-
- if woc.isWFCompressionFailed {
- return errors.InternalErrorf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+woc.unSavedNodeStatusSize)
- }
-
- if woc.wf.Status.CompressedNodes != "" {
- if node, ok := woc.wf.Status.Nodes[nodeID]; ok {
- content, err := json.Marshal(node)
- if err != nil {
- return errors.InternalWrapError(err)
- }
- nodeSize := len(file.CompressEncodeString(string(content)))
- if (nodeSize + woc.unSavedNodeStatusSize + woc.currentWFSize) >= maxWorkflowSize {
- return errors.InternalErrorf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+nodeSize+woc.unSavedNodeStatusSize)
- }
- woc.unSavedNodeStatusSize += nodeSize
- }
- }
- return nil
- }

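With the estimation path gone, each reconciliation goroutine is reduced to assessing its pod and applying execution control; the deep copy of the node status, the extra lock acquisitions, and the per-pod checkAndEstimate/checkAndCompress calls disappear from the hot loop. A rough sketch of the post-change loop shape, with hypothetical stub types standing in for apiv1.Pod and the workflow's node status:

package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-ins for apiv1.Pod and the workflow's node map.
type pod struct{ name, nodeName string }

type wfOperationCtx struct {
	wfNodesLock   sync.RWMutex
	completedPods map[string]bool
}

// performAssessment no longer returns a node ID; that value was only ever
// consumed by the deleted checkAndEstimate call.
func (woc *wfOperationCtx) performAssessment(p *pod) {
	if p == nil {
		return
	}
	woc.wfNodesLock.Lock()
	defer woc.wfNodesLock.Unlock()
	woc.completedPods[p.name] = true // simplified; the real version updates node status
}

func main() {
	woc := &wfOperationCtx{completedPods: map[string]bool{}}
	pods := []pod{{"pod-a", "node-a"}, {"pod-b", "node-b"}}

	parallelPodNum := make(chan string, 500) // same bounded-concurrency pattern as the controller
	var wg sync.WaitGroup
	for _, p := range pods {
		parallelPodNum <- p.name
		wg.Add(1)
		go func(tmpPod pod) {
			defer wg.Done()
			woc.performAssessment(&tmpPod) // assess only; no per-pod size estimation or compression
			<-parallelPodNum
		}(p)
	}
	wg.Wait()
	fmt.Println(len(woc.completedPods), "pods assessed")
}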