Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Simplify E2E test tear-down #3749

Merged
merged 14 commits into from
Aug 17, 2020
10 changes: 9 additions & 1 deletion hack/test_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/xml"
"fmt"
"io/ioutil"
"os"
"strconv"
"strings"
)
Expand Down Expand Up @@ -40,6 +41,10 @@ func testReport() {
for _, c := range s.TestCases {
if c.Failure.Text != "" {
x := newFailureText(s.Name, c.Failure.Text)
if x.file == "" {
_, _ = fmt.Fprintf(os.Stderr, "could not parse "+c.Failure.Text)
continue
}
// https://docs.github.com/en/actions/reference/workflow-commands-for-github-actions#setting-an-error-message
// Replace ‘/n’ with ‘%0A’ for multiple strings output.
_, _ = fmt.Printf("::error file=%s,line=%v,col=0::%s\n", x.file, x.line, x.message)
Expand All @@ -61,12 +66,15 @@ func trimStdoutLines(text string) string {
return strings.Join(split[i:], "\n")
}
}
panic(text)
return text
}

func newFailureText(suite, text string) failureText {
text = trimStdoutLines(text)
parts := strings.SplitN(text, ":", 3)
if len(parts) != 3 {
return failureText{}
}
file := strings.TrimPrefix(suite, "github.com/argoproj/argo/") + "/" + parts[0]
line, _ := strconv.Atoi(parts[1])
message := strings.ReplaceAll(strings.TrimSpace(parts[2]), "\n", "%0A")
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/workflow/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
WorkflowTemplatePlural string = "workflowtemplates"
WorkflowTemplateShortName string = "wftmpl"
WorkflowTemplateFullName string = WorkflowTemplatePlural + "." + Group
WorkflowEventBindingPlural string = "workfloweventbindings"
CronWorkflowKind string = "CronWorkflow"
CronWorkflowSingular string = "cronworkflow"
CronWorkflowPlural string = "cronworkflows"
Expand Down
36 changes: 21 additions & 15 deletions test/e2e/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ func (s *CLISuite) BeforeTest(suiteName, testName string) {
}

func (s *CLISuite) testNeedsOffloading() {
skip := s.Persistence.IsEnabled() && os.Getenv("ARGO_SERVER") == ""
if skip {
s.T().Skip("test needs offloading, but not Argo Server available")
serverUnavailable := os.Getenv("ARGO_SERVER") == ""
if s.Persistence.IsEnabled() && serverUnavailable {
if !serverUnavailable {
s.T().Skip("test needs offloading, but the Argo Server is unavailable - if `testNeedsOffloading()` is the first line of your test test, you should move your test to `CliWithServerSuite`?")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

additional help for people writing tests

}
s.T().Skip("test needs offloading, but offloading not enabled")
}
}

Expand Down Expand Up @@ -278,7 +281,7 @@ func (s *CLISuite) TestRoot() {
})
}

func (s *CLISuite) TestWorkflowSuspendResume() {
func (s *CLIWithServerSuite) TestWorkflowSuspendResume() {
s.testNeedsOffloading()
s.Given().
Workflow("@testdata/sleep-3s.yaml").
Expand All @@ -302,7 +305,7 @@ func (s *CLISuite) TestWorkflowSuspendResume() {
})
}

func (s *CLISuite) TestNodeSuspendResume() {
func (s *CLIWithServerSuite) TestNodeSuspendResume() {
s.testNeedsOffloading()
s.Given().
Workflow("@testdata/node-suspend.yaml").
Expand Down Expand Up @@ -582,7 +585,7 @@ func (s *CLISuite) TestWorkflowLint() {
})
}

func (s *CLISuite) TestWorkflowRetry() {
func (s *CLIWithServerSuite) TestWorkflowRetry() {
s.testNeedsOffloading()
var retryTime corev1.Time

Expand Down Expand Up @@ -635,7 +638,7 @@ func (s *CLISuite) TestWorkflowTerminate() {
})
}

func (s *CLISuite) TestWorkflowWait() {
func (s *CLIWithServerSuite) TestWorkflowWait() {
s.testNeedsOffloading()
s.Given().
Workflow("@smoke/basic.yaml").
Expand All @@ -649,7 +652,7 @@ func (s *CLISuite) TestWorkflowWait() {
})
}

func (s *CLISuite) TestWorkflowWatch() {
func (s *CLIWithServerSuite) TestWorkflowWatch() {
s.testNeedsOffloading()
s.Given().
Workflow("@smoke/basic.yaml").
Expand Down Expand Up @@ -948,7 +951,7 @@ func (s *CLISuite) TestWorkflowTemplateRefSubmit() {
})
}

func (s *CLISuite) TestWorkflowLevelSemaphore() {
func (s *CLIWithServerSuite) TestWorkflowLevelSemaphore() {
semaphoreData := map[string]string{
"workflow": "1",
}
Expand All @@ -970,14 +973,14 @@ func (s *CLISuite) TestWorkflowLevelSemaphore() {
return wf.Status.Phase == ""
}, "Workflow is waiting for lock", 20*time.Second).
WaitForWorkflow(30 * time.Second).
DeleteConfigMap().
DeleteConfigMap("my-config").
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
})
}

func (s *CLISuite) TestTemplateLevelSemaphore() {
func (s *CLIWithServerSuite) TestTemplateLevelSemaphore() {
semaphoreData := map[string]string{
"template": "1",
}
Expand All @@ -994,8 +997,7 @@ func (s *CLISuite) TestTemplateLevelSemaphore() {
RunCli([]string{"get", "semaphore-tmpl-level"}, func(t *testing.T, output string, err error) {
assert.Contains(t, output, "Waiting for")
}).
WaitForWorkflow(30 * time.Second).
DeleteConfigMap()
WaitForWorkflow(30 * time.Second)
}

func (s *CLISuite) TestRetryOmit() {
Expand All @@ -1004,7 +1006,11 @@ func (s *CLISuite) TestRetryOmit() {
Workflow("@testdata/retry-omit.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(20*time.Second).
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
return wf.Status.Nodes.Any(func(node wfv1.NodeStatus) bool {
return node.Phase == wfv1.NodeOmitted
})
}, "any node omitted", 20*time.Second).
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
node := status.Nodes.FindByDisplayName("should-not-execute")
Expand All @@ -1019,7 +1025,7 @@ func (s *CLISuite) TestRetryOmit() {
WaitForWorkflow(20 * time.Second)
}

func (s *CLISuite) TestResourceTemplateStopAndTerminate() {
func (s *CLIWithServerSuite) TestResourceTemplateStopAndTerminate() {
s.testNeedsOffloading()
s.Run("ResourceTemplateStop", func() {
s.Given().
Expand Down
171 changes: 39 additions & 132 deletions test/e2e/fixtures/e2e_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"strings"
"time"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"

// load the azure plugin (required to authenticate against AKS clusters).
_ "k8s.io/client-go/plugin/pkg/client/auth/azure"
// load the gcp plugin (required to authenticate against GKE clusters).
Expand All @@ -17,13 +21,12 @@ import (
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/suite"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sigs.k8s.io/yaml"

"github.com/argoproj/argo/pkg/apis/workflow"
"github.com/argoproj/argo/pkg/client/clientset/versioned"
"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo/util/kubeconfig"
Expand All @@ -47,9 +50,6 @@ type E2ESuite struct {
cronClient v1alpha1.CronWorkflowInterface
KubeClient kubernetes.Interface
hydrator hydrator.Interface
// Guard-rail.
// The number of archived workflows. If it changes between two tests, we have a problem.
numWorkflows int
}

func (s *E2ESuite) SetupSuite() {
Expand Down Expand Up @@ -82,153 +82,60 @@ func (s *E2ESuite) BeforeTest(suiteName, testName string) {
s.CheckError(err)
log.Infof("logging debug diagnostics to file://%s", name)
s.DeleteResources(Label)
numWorkflows := s.countWorkflows()
if s.numWorkflows > 0 && s.numWorkflows != numWorkflows {
s.T().Fatal("there should almost never be a change to the number of workflows between tests, this means the last test (not the current test) is bad and needs fixing - note this guard-rail does not work across test suites")
}
s.numWorkflows = numWorkflows
}

// countWorkflows returns how many workflows currently exist in the test
// namespace; any error from the list call fails the suite via CheckError.
func (s *E2ESuite) countWorkflows() int {
	list, err := s.wfClient.List(metav1.ListOptions{})
	s.CheckError(err)
	return len(list.Items)
}
var foreground = metav1.DeletePropagationForeground
var foregroundDelete = &metav1.DeleteOptions{PropagationPolicy: &foreground}

func (s *E2ESuite) DeleteResources(label string) {

// delete all cron workflows
cronList, err := s.cronClient.List(metav1.ListOptions{LabelSelector: label})
s.CheckError(err)
for _, cronWf := range cronList.Items {
log.WithFields(log.Fields{"cronWorkflow": cronWf.Name}).Debug("Deleting cron workflow")
err = s.cronClient.Delete(cronWf.Name, nil)
// delete archived workflows from the archive
if s.Persistence.IsEnabled() {
archive := s.Persistence.workflowArchive
parse, err := labels.ParseToRequirements(label)
s.CheckError(err)
}

// It is possible for a pod to become orphaned. This means that it's parent workflow
// (as set in the "workflows.argoproj.io/workflow" label) does not exist.
// We need to delete orphans as well as test pods.
// Get a list of all workflows.
// if absent from this list it has been deleted - so any associated pods are orphaned
// if in the list it is either a test wf or not
isTestWf := make(map[string]bool)
{
list, err := s.wfClient.List(metav1.ListOptions{LabelSelector: label})
workflows, err := archive.ListWorkflows(Namespace, time.Time{}, time.Time{}, parse, 0, 0)
s.CheckError(err)
for _, wf := range list.Items {
isTestWf[wf.Name] = false
if s.Persistence.IsEnabled() && wf.Status.IsOffloadNodeStatus() {
err := s.Persistence.offloadNodeStatusRepo.Delete(string(wf.UID), wf.Status.OffloadNodeStatusVersion)
s.CheckError(err)
}
for _, w := range workflows {
err := archive.DeleteWorkflow(string(w.UID))
s.CheckError(err)
}
}

// delete from the archive
{
if s.Persistence.IsEnabled() {
archive := s.Persistence.workflowArchive
parse, err := labels.ParseToRequirements(Label)
s.CheckError(err)
workflows, err := archive.ListWorkflows(Namespace, time.Time{}, time.Time{}, parse, 0, 0)
s.CheckError(err)
for _, workflow := range workflows {
err := archive.DeleteWorkflow(string(workflow.UID))
s.CheckError(err)
}
}
hasTestLabel := metav1.ListOptions{LabelSelector: label}
resources := []schema.GroupVersionResource{
{Group: workflow.Group, Version: workflow.Version, Resource: workflow.CronWorkflowPlural},
{Group: workflow.Group, Version: workflow.Version, Resource: workflow.WorkflowEventBindingPlural},
{Group: workflow.Group, Version: workflow.Version, Resource: workflow.WorkflowPlural},
{Group: workflow.Group, Version: workflow.Version, Resource: workflow.WorkflowTemplatePlural},
{Group: workflow.Group, Version: workflow.Version, Resource: workflow.ClusterWorkflowTemplatePlural},
{Version: "v1", Resource: "resourcequotas"},
{Version: "v1", Resource: "configmaps"},
}

// delete all workflows
{
list, err := s.wfClient.List(metav1.ListOptions{LabelSelector: Label})
for _, r := range resources {
err := s.dynamicFor(r).DeleteCollection(foregroundDelete, hasTestLabel)
s.CheckError(err)
for _, wf := range list.Items {
logCtx := log.WithFields(log.Fields{"workflow": wf.Name})
logCtx.Debug("Deleting workflow")
err = s.wfClient.Delete(wf.Name, &metav1.DeleteOptions{})
if errors.IsNotFound(err) {
continue
}
s.CheckError(err)
isTestWf[wf.Name] = true
for {
_, err := s.wfClient.Get(wf.Name, metav1.GetOptions{})
if errors.IsNotFound(err) {
break
}
logCtx.Debug("Waiting for workflow to be deleted")
time.Sleep(1 * time.Second)
}
}
}

// delete workflow pods
{
podInterface := s.KubeClient.CoreV1().Pods(Namespace)
// it seems "argo delete" can leave pods behind
pods, err := podInterface.List(metav1.ListOptions{LabelSelector: "workflows.argoproj.io/workflow"})
s.CheckError(err)
for _, pod := range pods.Items {
workflow := pod.GetLabels()["workflows.argoproj.io/workflow"]
testPod, owned := isTestWf[workflow]
if testPod || !owned {
logCtx := log.WithFields(log.Fields{"workflow": workflow, "podName": pod.Name, "testPod": testPod, "owned": owned})
logCtx.Debug("Deleting pod")
err := podInterface.Delete(pod.Name, nil)
if !errors.IsNotFound(err) {
s.CheckError(err)
}
for {
_, err := podInterface.Get(pod.Name, metav1.GetOptions{})
if errors.IsNotFound(err) {
break
}
logCtx.Debug("Waiting for pod to be deleted")
time.Sleep(1 * time.Second)
}
for _, r := range resources {
for {
list, err := s.dynamicFor(r).List(hasTestLabel)
s.CheckError(err)
if len(list.Items) == 0 {
break
}
time.Sleep(time.Second)
}
}
}

// delete all workflow events
events, err := s.wfebClient.List(metav1.ListOptions{LabelSelector: label})
s.CheckError(err)

for _, item := range events.Items {
log.WithField("template", item.Name).Debug("Deleting workflow event")
err = s.wfebClient.Delete(item.Name, nil)
s.CheckError(err)
}

// delete all workflow templates
wfTmpl, err := s.wfTemplateClient.List(metav1.ListOptions{LabelSelector: label})
s.CheckError(err)

for _, wfTmpl := range wfTmpl.Items {
log.WithField("template", wfTmpl.Name).Debug("Deleting workflow template")
err = s.wfTemplateClient.Delete(wfTmpl.Name, nil)
s.CheckError(err)
}

// delete all cluster workflow templates
cwfTmpl, err := s.cwfTemplateClient.List(metav1.ListOptions{LabelSelector: label})
s.CheckError(err)
for _, cwfTmpl := range cwfTmpl.Items {
log.WithField("template", cwfTmpl.Name).Debug("Deleting cluster workflow template")
err = s.cwfTemplateClient.Delete(cwfTmpl.Name, nil)
s.CheckError(err)
}

// Delete all resourcequotas
rqList, err := s.KubeClient.CoreV1().ResourceQuotas(Namespace).List(metav1.ListOptions{LabelSelector: label})
s.CheckError(err)
for _, rq := range rqList.Items {
log.WithField("resourcequota", rq.Name).Debug("Deleting resource quota")
err = s.KubeClient.CoreV1().ResourceQuotas(Namespace).Delete(rq.Name, nil)
s.CheckError(err)
// dynamicFor builds a dynamic client for the given resource, scoped to the
// test Namespace for namespaced resources. Cluster workflow templates are
// cluster-scoped, so they get the un-namespaced interface.
func (s *E2ESuite) dynamicFor(r schema.GroupVersionResource) dynamic.ResourceInterface {
	iface := dynamic.NewForConfigOrDie(s.RestConfig).Resource(r)
	if r.Resource != workflow.ClusterWorkflowTemplatePlural {
		return iface.Namespace(Namespace)
	}
	return iface
}

func (s *E2ESuite) CheckError(err error) {
Expand Down
Loading