Skip to content

Commit

Permalink
Error running 1000s of tasks: "etcdserver: request is too large" #1186 (
Browse files Browse the repository at this point in the history
#1264)

* Error running 1000s of tasks: "etcdserver: request is too large" #1186

This PR is addressing the feature request #1186.
Issue:
The NodeStatus element keeps growing for big workflows. A workflow will fail once its total size reaches 1 MB (the max size limit in etcd).
Solution:
Compress the NodeStatus once the workflow size reaches 1 MB, which allows 60% to 80% more steps to execute in compressed mode.

Latest: Argo CLI and Argo UI will be able to decode and print the node status from compressedNodes.

Limitation:
Kubectl will not decode the compressedNodes element

* added Operator.go

* revert the testing yaml

* Fixed the lint issue

* fixed

* fixed lint

* Fixed Testcase

* incorporated the review comments

* Reverted the change

* incorporated review comments

* fixing gometalinter checks

* incorporated review comments

* Update pod-limits.yaml

* updated few comments

* updated error message format

* reverted unwanted files
  • Loading branch information
sarabala1979 authored Mar 18, 2019
1 parent b2743f3 commit 4bfbb20
Show file tree
Hide file tree
Showing 11 changed files with 264 additions and 58 deletions.
24 changes: 22 additions & 2 deletions cmd/argo/commands/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
"strings"
"text/tabwriter"

"github.com/argoproj/argo/errors"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/util/file"
"github.com/argoproj/pkg/humanize"
"github.com/ghodss/yaml"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

const onExitSuffix = "onExit"
Expand All @@ -36,6 +37,10 @@ func NewGetCommand() *cobra.Command {
if err != nil {
log.Fatal(err)
}
err = CheckAndDecompress(wf)
if err != nil {
log.Fatal(err)
}
printWorkflow(wf, output)
},
}
Expand All @@ -45,6 +50,21 @@ func NewGetCommand() *cobra.Command {
return command
}

// CheckAndDecompress restores wf.Status.Nodes from wf.Status.CompressedNodes
// when the latter is set, and then clears CompressedNodes. A nil workflow or a
// workflow without compressed nodes is left untouched.
//
// It returns an internal error when the compressed payload cannot be
// base64-decoded/decompressed or does not unmarshal into the nodes map.
func CheckAndDecompress(wf *wfv1.Workflow) error {
	// Callers such as the watch loop pass the result of a type assertion that
	// may have failed, so tolerate nil instead of dereferencing it.
	if wf == nil || wf.Status.CompressedNodes == "" {
		return nil
	}
	nodeContent, err := file.DecodeDecompressString(wf.Status.CompressedNodes)
	if err != nil {
		return errors.InternalWrapError(err)
	}
	// Hand the failure back to the caller rather than exiting the process:
	// some callers only want to warn and carry on.
	if err = json.Unmarshal([]byte(nodeContent), &wf.Status.Nodes); err != nil {
		return errors.InternalWrapError(err)
	}
	wf.Status.CompressedNodes = ""
	return nil
}

func printWorkflow(wf *wfv1.Workflow, outFmt string) {
switch outFmt {
case "name":
Expand Down
4 changes: 4 additions & 0 deletions cmd/argo/commands/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func countPendingRunningCompleted(wf *wfv1.Workflow) (int, int, int) {
pending := 0
running := 0
completed := 0
err := CheckAndDecompress(wf)
if err != nil {
log.Fatal(err)
}
for _, node := range wf.Status.Nodes {
tmpl := wf.GetTemplate(node.TemplateName)
if tmpl == nil || !tmpl.IsPodType() {
Expand Down
12 changes: 11 additions & 1 deletion cmd/argo/commands/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)

type logEntry struct {
Expand Down Expand Up @@ -136,6 +136,11 @@ func (p *logPrinter) PrintPodLogs(podName string) error {
// Prints logs for workflow pod steps and return most recent log timestamp per pod name
func (p *logPrinter) printRecentWorkflowLogs(wf *v1alpha1.Workflow) map[string]*time.Time {
var podNodes []v1alpha1.NodeStatus
err := CheckAndDecompress(wf)
if err != nil {
log.Warn(err)
return nil
}
for _, node := range wf.Status.Nodes {
if node.Type == v1alpha1.NodeTypePod && node.Phase != v1alpha1.NodeError {
podNodes = append(podNodes, node)
Expand Down Expand Up @@ -193,6 +198,11 @@ func (p *logPrinter) printLiveWorkflowLogs(workflowName string, wfClient workflo
defer cancel()

processPods := func(wf *v1alpha1.Workflow) {
err := CheckAndDecompress(wf)
if err != nil {
log.Warn(err)
return
}
for id := range wf.Status.Nodes {
node := wf.Status.Nodes[id]
if node.Type == v1alpha1.NodeTypePod && node.Phase != v1alpha1.NodeError && streamedPods[node.ID] == false {
Expand Down
2 changes: 2 additions & 0 deletions cmd/argo/commands/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func watchWorkflow(name string) {
select {
case next := <-watchIf.ResultChan():
wf, _ = next.Object.(*wfv1.Workflow)
err := CheckAndDecompress(wf)
errors.CheckError(err)
case <-ticker.C:
}
if wf == nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/workflow/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,9 @@ type WorkflowStatus struct {
// A human readable message indicating details about why the workflow is in this condition.
Message string `json:"message,omitempty"`

// Compressed and base64 encoded Nodes map
CompressedNodes string `json:"compressedNodes,omitempty"`

// Nodes is a mapping between a node ID and the node's status.
Nodes map[string]NodeStatus `json:"nodes,omitempty"`

Expand Down
97 changes: 97 additions & 0 deletions util/file/fileutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package file

import (
"archive/tar"
"bytes"
"compress/gzip"
"encoding/base64"
"io"
"io/ioutil"
"os"
"strings"

log "github.com/sirupsen/logrus"
)

// IsFileOrDirExistInGZip return true if file or directory exists in GZip file
func IsFileOrDirExistInGZip(sourcePath string, gzipFilePath string) bool {

fi, err := os.Open(gzipFilePath)

if os.IsNotExist(err) {
return false
}
defer close(fi)

fz, err := gzip.NewReader(fi)
if err != nil {
return false
}
tr := tar.NewReader(fz)
for {
hdr, err := tr.Next()
if err == io.EOF {
break
}
if err != nil {

return false
}
if hdr.FileInfo().IsDir() && strings.Contains(strings.Trim(hdr.Name, "/"), strings.Trim(sourcePath, "/")) {
return true
}
if strings.Contains(sourcePath, hdr.Name) && hdr.Size > 0 {
return true
}
}
return false
}

// close closes f and logs a warning if the close fails. It exists so that a
// Close error is not silently dropped at defer sites.
func close(f io.Closer) {
	if err := f.Close(); err != nil {
		log.Warnf("Failed to close the file/writer/reader. %v", err)
	}
}

// CompressEncodeString compresses content with CompressContent and returns the
// result encoded as a standard base64 string.
func CompressEncodeString(content string) string {
	compressed := CompressContent([]byte(content))
	return base64.StdEncoding.EncodeToString(compressed)
}

// DecodeDecompressString is the inverse of CompressEncodeString: it decodes
// content from standard base64, decompresses the bytes with DecompressContent
// and returns the result as a string. On any failure it returns an empty
// string together with the underlying error.
func DecodeDecompressString(content string) (string, error) {
	var plain []byte

	compressed, err := base64.StdEncoding.DecodeString(content)
	if err == nil {
		plain, err = DecompressContent(compressed)
	}
	if err != nil {
		return "", err
	}
	return string(plain), nil
}

// CompressContent gzip-compresses content and returns the compressed bytes.
// A write failure is only logged as a warning; whatever was produced is still
// returned.
func CompressContent(content []byte) []byte {
	compressed := new(bytes.Buffer)
	gz := gzip.NewWriter(compressed)

	if _, err := gz.Write(content); err != nil {
		log.Warnf("Error in compressing: %v", err)
	}
	// Close before reading the buffer so the gzip stream is flushed and
	// terminated.
	close(gz)
	return compressed.Bytes()
}

// DecompressContent will return the uncompressed content
func DecompressContent(content []byte) ([]byte, error) {

buf := bytes.NewReader(content)
gZipReader, _ := gzip.NewReader(buf)
defer close(gZipReader)
return ioutil.ReadAll(gZipReader)
}
21 changes: 21 additions & 0 deletions util/file/fileutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package file

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestCompressContentString ensures that a string passed through
// CompressEncodeString and then DecodeDecompressString comes back unchanged.
func TestCompressContentString(t *testing.T) {
	content := "{\"pod-limits-rrdm8-591645159\":{\"id\":\"pod-limits-rrdm8-591645159\",\"name\":\"pod-limits-rrdm8[0]." +
		"run-pod(0:0)\",\"displayName\":\"run-pod(0:0)\",\"type\":\"Pod\",\"templateName\":\"run-pod\",\"phase\":" +
		"\"Succeeded\",\"boundaryID\":\"pod-limits-rrdm8\",\"startedAt\":\"2019-03-07T19:14:50Z\",\"finishedAt\":" +
		"\"2019-03-07T19:14:55Z\"}}"

	compString := CompressEncodeString(content)

	// Check the decode error explicitly so a failure is reported as such
	// rather than as a confusing content mismatch.
	resultString, err := DecodeDecompressString(compString)
	assert.NoError(t, err)

	assert.Equal(t, content, resultString)
}
10 changes: 10 additions & 0 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,16 @@ func (wfc *WorkflowController) processNextItem() bool {
}

woc := newWorkflowOperationCtx(wf, wfc)
//Decompress the node if it is compressed

err = woc.checkAndDecompress()
if err != nil {
log.Warnf("Failed to decompress '%s' to workflow object: %v", key, err)
woc.markWorkflowFailed(fmt.Sprintf("invalid spec: %s", err.Error()))
woc.persistUpdates()
wfc.throttler.Remove(key)
return true
}
woc.operate()
if woc.wf.Status.Completed() {
wfc.throttler.Remove(key)
Expand Down
95 changes: 93 additions & 2 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/argoproj/argo/errors"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo/util/file"
"github.com/argoproj/argo/util/retry"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/util"
Expand Down Expand Up @@ -72,6 +73,9 @@ var (
// for before requeuing the workflow onto the workqueue.
const maxOperationTime time.Duration = 10 * time.Second

// maxWorkflowSize is the maximum size for workflow.yaml, in bytes
// (1024 * 1024). checkAndCompress compares the value returned by getSize
// against it to decide when to compress the node status and when to report
// the workflow as too large.
const maxWorkflowSize int = 1024 * 1024

// newWorkflowOperationCtx creates and initializes a new wfOperationCtx object.
func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOperationCtx {
// NEVER modify objects from the store. It's a read-only, local cache.
Expand Down Expand Up @@ -275,9 +279,17 @@ func (woc *wfOperationCtx) persistUpdates() {
return
}
wfClient := woc.controller.wfclientset.ArgoprojV1alpha1().Workflows(woc.wf.ObjectMeta.Namespace)
_, err := wfClient.Update(woc.wf)
err := woc.checkAndCompress()
if err != nil {
woc.log.Warnf("Error updating workflow: %v", err)
woc.log.Warnf("Error compressing workflow: %v", err)
}
if woc.wf.Status.CompressedNodes != "" {
woc.wf.Status.Nodes = nil
}

_, err = wfClient.Update(woc.wf)
if err != nil {
woc.log.Warnf("Error updating workflow: %v %s", err, apierr.ReasonForError(err))
if argokubeerr.IsRequestEntityTooLargeErr(err) {
woc.persistWorkflowSizeLimitErr(wfClient, err)
return
Expand Down Expand Up @@ -450,11 +462,24 @@ func (woc *wfOperationCtx) podReconciliation() error {
}

for _, pod := range podList.Items {
origNodeStatus := *woc.wf.Status.DeepCopy()
performAssessment(&pod)
err = woc.applyExecutionControl(&pod)
if err != nil {
woc.log.Warnf("Failed to apply execution control to pod %s", pod.Name)
}
err = woc.checkAndCompress()
if err != nil {
woc.wf.Status = origNodeStatus
nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName]
woc.log.Warnf("%v", err)
woc.markNodeErrorClearOuput(nodeNameForPod, err)
err = woc.checkAndCompress()
if err != nil {
woc.markWorkflowError(err, true)
}
}

}

// Now check for deleted pods. Iterate our nodes. If any one of our nodes does not show up in
Expand Down Expand Up @@ -1138,6 +1163,14 @@ func (woc *wfOperationCtx) markNodePhase(nodeName string, phase wfv1.NodePhase,
return node
}

// markNodeErrorClearOuput marks the named node as errored via markNodeError,
// drops its outputs, and writes the updated status back into the workflow's
// node map. The updated node status is returned.
func (woc *wfOperationCtx) markNodeErrorClearOuput(nodeName string, err error) *wfv1.NodeStatus {
	node := woc.markNodeError(nodeName, err)
	node.Outputs = nil
	// Store the modified copy so the cleared outputs are reflected in the map.
	woc.wf.Status.Nodes[node.ID] = *node
	return node
}

// markNodeError is a convenience method to mark a node with an error and set the message from the error
func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeStatus {
return woc.markNodePhase(nodeName, wfv1.NodeError, err.Error())
Expand Down Expand Up @@ -1576,3 +1609,61 @@ func expandSequence(seq *wfv1.Sequence) ([]wfv1.Item, error) {
}
return items, nil
}

// getSize returns the length of the workflow's JSON encoding. When
// CompressedNodes is set, the length of the JSON-encoded Nodes map is
// subtracted from that total. It returns -1 if either encoding fails.
func (woc *wfOperationCtx) getSize() int {
	wfJSON, err := json.Marshal(woc.wf)
	if err != nil {
		return -1
	}
	total := len(wfJSON)

	if len(woc.wf.Status.CompressedNodes) == 0 {
		return total
	}

	// Compressed form is present: do not count the uncompressed Nodes map.
	nodesJSON, err := json.Marshal(woc.wf.Status.Nodes)
	if err != nil {
		return -1
	}
	return total - len(nodesJSON)
}

// checkAndCompress (re)builds Status.CompressedNodes from Status.Nodes when the
// workflow is already in compressed mode or when getSize reports a size of at
// least maxWorkflowSize. The Nodes map itself is not modified here.
//
// It returns an internal error if the nodes cannot be marshalled, or if the
// size reported by getSize is still at least maxWorkflowSize after compression.
func (woc *wfOperationCtx) checkAndCompress() error {
	if woc.wf.Status.CompressedNodes != "" || woc.getSize() >= maxWorkflowSize {
		nodeContent, err := json.Marshal(woc.wf.Status.Nodes)
		if err != nil {
			return errors.InternalWrapError(err)
		}
		woc.wf.Status.CompressedNodes = file.CompressEncodeString(string(nodeContent))
	}

	// Nothing was compressed, so the workflow is under the limit.
	if woc.wf.Status.CompressedNodes == "" {
		return nil
	}
	if woc.getSize() >= maxWorkflowSize {
		return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.getSize()))
	}
	return nil
}

// checkAndDecompress populates Status.Nodes from Status.CompressedNodes when
// the latter is set; CompressedNodes itself is left as is. It is a no-op for a
// workflow without compressed nodes.
//
// A decode/decompress failure is returned wrapped as an internal error; a JSON
// failure is logged as a warning and returned unchanged.
func (woc *wfOperationCtx) checkAndDecompress() error {
	compressed := woc.wf.Status.CompressedNodes
	if compressed == "" {
		return nil
	}

	nodeContent, err := file.DecodeDecompressString(compressed)
	if err != nil {
		return errors.InternalWrapError(err)
	}

	// Unmarshal into a scratch map so Status.Nodes is only replaced on success.
	var nodes map[string]wfv1.NodeStatus
	if err = json.Unmarshal([]byte(nodeContent), &nodes); err != nil {
		woc.log.Warn(err)
		return err
	}
	woc.wf.Status.Nodes = nodes
	return nil
}
2 changes: 1 addition & 1 deletion workflow/executor/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strings"
"time"

"github.com/argoproj/argo/workflow/util/file"
"github.com/argoproj/argo/util/file"

"github.com/argoproj/argo/util"

Expand Down
Loading

0 comments on commit 4bfbb20

Please sign in to comment.