Fix run_e2e_workflow (#33)
run_e2e_workflow.py is exiting with an exception, causing all Prow jobs to be marked as failures.

Add an E2E test to kubeflow/testing. This provides a way to test run_e2e_workflow.py as part of the presubmits.

Fix #30
jlewi authored Feb 15, 2018
1 parent 8ac5729 commit d8f3ed5
Showing 23 changed files with 76,903 additions and 69 deletions.
404 changes: 404 additions & 0 deletions .pylintrc

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions prow_config.yaml
@@ -0,0 +1,6 @@
# This file configures the workflows to trigger in our Prow jobs.
# see kubeflow/testing/py/run_e2e_workflow.py
workflows:
  - app_dir: kubeflow/testing/workflows
    component: workflows
    name: unittests
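run_e2e_workflow.py reads this file to decide which test workflows to trigger. As a rough sketch of how such a config might be consumed (hedged: the field names come from the YAML above, but the real parsing code lives in run_e2e_workflow.py and is not part of this diff):

# Hypothetical sketch only, not the actual run_e2e_workflow.py code.
import yaml

with open("prow_config.yaml") as hf:
  config = yaml.safe_load(hf)

for workflow in config.get("workflows", []):
  # Each entry names a workflow, the app directory containing it,
  # and the component to deploy.
  print(workflow["name"], workflow["app_dir"], workflow["component"])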
8 changes: 4 additions & 4 deletions py/kubeflow/testing/argo_client.py
@@ -1,12 +1,10 @@
 """Some utility functions for working with TfJobs."""

 import datetime
-import json
 import logging
 import time

 from kubernetes import client as k8s_client
-from kubernetes.client.rest import ApiException

 from kubeflow.testing import util

@@ -65,11 +63,13 @@ def wait_for_workflows(client, namespace, names,
       return all_results
     if datetime.datetime.now() + polling_interval > end_time:
       raise util.TimeoutError(
-        "Timeout waiting for workflow {0} in namespace {1} to finish.".format(
-          name, namespace))
+        "Timeout waiting for workflows {0} in namespace {1} to finish.".format(
+          ",".join(names), namespace))

     time.sleep(polling_interval.seconds)

+  return []
+
 def wait_for_workflow(client, namespace, name,
                       timeout=datetime.timedelta(minutes=30),
                       polling_interval=datetime.timedelta(seconds=30),
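The timeout message now covers every outstanding workflow instead of referencing an undefined name. As a minimal, self-contained sketch of the same polling pattern (is_finished is a hypothetical stand-in for the Argo status check that argo_client.py performs; this is not the module's actual code):

import datetime
import time

def wait_for(names, is_finished,
             timeout=datetime.timedelta(minutes=30),
             polling_interval=datetime.timedelta(seconds=30)):
  """Poll until every named workflow finishes or the timeout expires."""
  end_time = datetime.datetime.now() + timeout
  while True:
    results = [is_finished(n) for n in names]
    if all(results):
      return results
    if datetime.datetime.now() + polling_interval > end_time:
      raise RuntimeError(
          "Timeout waiting for workflows {0} to finish.".format(",".join(names)))
    time.sleep(polling_interval.seconds)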
5 changes: 2 additions & 3 deletions py/kubeflow/testing/argo_client_test.py
@@ -2,12 +2,11 @@

 import unittest

-from testing import argo_client
+from kubeflow.testing import argo_client
 from kubernetes import client as k8s_client
 import mock
 import os
 import yaml
-from py import util

 class ArgoClientTest(unittest.TestCase):
   def setUp(self):
@@ -24,4 +23,4 @@ def test_wait_for_workflow(self):
     self.assertIsNotNone(result)

 if __name__ == "__main__":
-  unittest.main()
\ No newline at end of file
+  unittest.main()
27 changes: 2 additions & 25 deletions py/kubeflow/testing/prow_artifacts.py
@@ -113,24 +113,12 @@ def get_gcs_dir(bucket):

 def copy_artifacts(args):
   """Sync artifacts to GCS."""
-  job_name = os.getenv("JOB_NAME")

   # GCS layout is defined here:
   # https://github.com/kubernetes/test-infra/tree/master/gubernator#job-artifact-gcs-layout
-  pull_number = os.getenv("PULL_NUMBER")
-
-  repo_owner = os.getenv("REPO_OWNER")
-  repo_name = os.getenv("REPO_NAME")
-
   output = get_gcs_dir(args.bucket)

-  if os.getenv("GOOGLE_APPLICATION_CREDENTIALS"):
-    logging.info("GOOGLE_APPLICATION_CREDENTIALS is set; configuring gcloud "
-                 "to use service account.")
-    # Since a service account is set tell gcloud to use it.
-    util.run(["gcloud", "auth", "activate-service-account", "--key-file=" +
-              os.getenv("GOOGLE_APPLICATION_CREDENTIALS")])
-
+  util.maybe_activate_service_account()
   util.run(["gsutil", "-m", "rsync", "-r", args.artifacts_dir, output])

 def create_pr_symlink(args):
@@ -145,24 +133,13 @@ def create_pr_symlink(args):
   pull_number = os.getenv("PULL_NUMBER")
   if not pull_number:
     # Symlinks are only created for pull requests.
-    return ""
+    return

   path = "pr-logs/directory/{job}/{build}.txt".format(
     job=os.getenv("JOB_NAME"), build=os.getenv("BUILD_NUMBER"))

-  pull_number = os.getenv("PULL_NUMBER")
-
-  repo_owner = os.getenv("REPO_OWNER")
-  repo_name = os.getenv("REPO_NAME")
-
-
-  build_dir = ("gs://{bucket}/pr-logs/pull/{owner}_{repo}/"
-               "{pull_number}/{job}/{build}").format(
-                 bucket=args.bucket,
-                 owner=repo_owner, repo=repo_name,
-                 pull_number=pull_number,
-                 job=os.getenv("JOB_NAME"),
-                 build=os.getenv("BUILD_NUMBER"))
   source = util.to_gcs_uri(args.bucket, path)
   target = get_gcs_dir(args.bucket)
   logging.info("Creating symlink %s pointing to %s", source, target)
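copy_artifacts now delegates credential setup to util.maybe_activate_service_account(). That helper's body is not shown in this diff; judging purely from the inline block it replaces, it plausibly looks something like the following hedged reconstruction (subprocess stands in for the repo's util.run helper):

import logging
import os
import subprocess

def maybe_activate_service_account():
  # Inferred from the removed inline code above; not the actual util.py source.
  creds = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
  if creds:
    logging.info("GOOGLE_APPLICATION_CREDENTIALS is set; configuring gcloud "
                 "to use service account.")
    subprocess.check_call(["gcloud", "auth", "activate-service-account",
                           "--key-file=" + creds])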
17 changes: 8 additions & 9 deletions py/kubeflow/testing/prow_artifacts_test.py
@@ -2,13 +2,11 @@
 import os
 import unittest
 import mock
-from testing import prow_artifacts
-import tempfile
-
+from kubeflow.testing import prow_artifacts
 from google.cloud import storage  # pylint: disable=no-name-in-module

 class TestProw(unittest.TestCase):
-  @mock.patch("testing.prow_artifacts.time.time")
+  @mock.patch("kubeflow.testing.prow_artifacts.time.time")
   def testCreateStartedPresubmit(self, mock_time):  # pylint: disable=no-self-use
     """Test create started for presubmit job."""
     mock_time.return_value = 1000
@@ -27,7 +25,7 @@ def testCreateStartedPresubmit(self, mock_time):  # pylint: disable=no-self-use

     self.assertEquals(expected, json.loads(actual))

-  @mock.patch("testing.prow_artifacts.time.time")
+  @mock.patch("kubeflow.testing.prow_artifacts.time.time")
   def testCreateFinished(self, mock_time):  # pylint: disable=no-self-use
     """Test create finished job."""
     mock_time.return_value = 1000
@@ -44,18 +42,18 @@ def testCreateFinished(self, mock_time):  # pylint: disable=no-self-use

     self.assertEquals(expected, json.loads(actual))

-  @mock.patch("testing.prow_artifacts.util.run")
+  @mock.patch("kubeflow.testing.prow_artifacts.util.run")
   def testCopyArtifactsPresubmit(self, mock_run):  # pylint: disable=no-self-use
     """Test copy artifacts to GCS."""

+    os.environ = {}
     os.environ["REPO_OWNER"] = "fake_org"
     os.environ["REPO_NAME"] = "fake_name"
     os.environ["PULL_NUMBER"] = "72"
     os.environ["BUILD_NUMBER"] = "100"
     os.environ["PULL_PULL_SHA"] = "123abc"
     os.environ["JOB_NAME"] = "kubeflow-presubmit"

-    temp_dir = tempfile.mkdtemp(prefix="tmpTestProwTestCreateFinished.")
     args = ["--artifacts_dir=/tmp/some/dir", "copy_artifacts",
             "--bucket=some_bucket"]
     prow_artifacts.main(args)
@@ -66,7 +64,7 @@ def testCopyArtifactsPresubmit(self, mock_run):  # pylint: disable=no-self-use
           "/100"],
     )

-  def testCreateSymlink(self):
+  def testCreateSymlink(self):  # pylint: disable=no-self-use
     gcs_client = mock.MagicMock(spec=storage.Client)
     mock_bucket = mock.MagicMock(spec=storage.Bucket)
     gcs_client.get_bucket.return_value = mock_bucket
@@ -75,7 +73,8 @@ def testCreateSymlink(self):
     # We can't add the decorator the instance method because that would
     # interfere with creating gcs_client since storage.Client would then
     # point to the mock and not the actual class.
-    with mock.patch("testing.prow_artifacts.storage.Client") as mock_client:
+    with mock.patch("kubeflow.testing.prow_artifacts.storage"
+                    ".Client") as mock_client:
       mock_client.return_value = gcs_client

       os.environ["REPO_OWNER"] = "fake_org"
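Taken together with the fake values this test sets (JOB_NAME=kubeflow-presubmit, PULL_NUMBER=72, BUILD_NUMBER=100, bucket some_bucket), the Gubernator layout that create_pr_symlink maintains looks roughly like this (illustrative, derived from the code above):

symlink file: gs://some_bucket/pr-logs/directory/kubeflow-presubmit/100.txt
its contents: the run's artifact directory, i.e. the value of get_gcs_dir("some_bucket")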
195 changes: 195 additions & 0 deletions py/kubeflow/testing/py_checks.py
@@ -0,0 +1,195 @@
"""Run checks on python source files.
This binary invokes checks (e.g. lint and unittests) on our Python source files.
"""
import argparse
import fnmatch
import logging
import os
import subprocess
import time

from google.cloud import storage # pylint: disable=no-name-in-module

from kubeflow.testing import util
from kubeflow.testing import test_util

def should_exclude(root, full_dir_excludes):
for e in full_dir_excludes:
if root.startswith(e):
return True
return False

def run_lint(args):
start_time = time.time()
# Print out the pylint version because different versions can produce
# different results.
util.run(["pylint", "--version"])

# kubeflow_testing is imported as a submodule so we should exclude it
# TODO(jlewi): We should make this an argument.
dir_excludes = ["dashboard/frontend/node_modules", "kubeflow_testing",
"vendor",]
full_dir_excludes = [os.path.join(os.path.abspath(args.src_dir), f) for f in
dir_excludes]

# TODO(jlewi): Use pathlib once we switch to python3.
includes = ["*.py"]
failed_files = []
rc_file = os.path.join(args.src_dir, ".pylintrc")
for root, dirs, files in os.walk(os.path.abspath(args.src_dir),
topdown=True):
# excludes can be done with fnmatch.filter and complementary set,
# but it's more annoying to read.
if should_exclude(root, full_dir_excludes):
continue

dirs[:] = [d for d in dirs]
for pat in includes:
for f in fnmatch.filter(files, pat):
full_path = os.path.join(root, f)
try:
util.run(["pylint", "--rcfile=" + rc_file, full_path],
cwd=args.src_dir)
except subprocess.CalledProcessError:
failed_files.append(full_path[len(args.src_dir):])

if failed_files:
failed_files.sort()
logging.error("%s files had lint errors:\n%s", len(failed_files),
"\n".join(failed_files))
else:
logging.info("No lint issues.")


if not args.junit_path:
logging.info("No --junit_path.")
return

test_case = test_util.TestCase()
test_case.class_name = "pylint"
test_case.name = "pylint"
test_case.time = time.time() - start_time
if failed_files:
test_case.failure = "Files with lint issues: {0}".format(", ".join(failed_files))

gcs_client = None
if args.junit_path.startswith("gs://"):
gcs_client = storage.Client(project=args.project)

test_util.create_junit_xml_file([test_case], args.junit_path, gcs_client)


def run_tests(args):
# Print out the pylint version because different versions can produce
# different results.
util.run(["pylint", "--version"])

# kubeflow_testing is imported as a submodule so we should exclude it
# TODO(jlewi): Perhaps we should get a list of submodules and exclude
# them automatically?
dir_excludes = ["vendor"]
includes = ["*_test.py"]
test_cases = []

num_failed = 0
for root, dirs, files in os.walk(args.src_dir, topdown=True):
# excludes can be done with fnmatch.filter and complementary set,
# but it's more annoying to read.
dirs[:] = [d for d in dirs if d not in dir_excludes]
for pat in includes:
for f in fnmatch.filter(files, pat):
full_path = os.path.join(root, f)

test_case = test_util.TestCase()
test_case.class_name = "pytest"
test_case.name = full_path[len(args.src_dir):]
start_time = time.time()
test_cases.append(test_case)
try:
util.run(["python", full_path], cwd=args.src_dir)
except subprocess.CalledProcessError:
test_case.failure = "{0} failed.".format(test_case.name)
num_failed += 1
finally:
test_case.time = time.time() - start_time

  if num_failed:
    logging.error("%s tests failed.", num_failed)
  else:
    logging.info("All tests passed.")


  if not args.junit_path:
    logging.info("No --junit_path.")
    return

  gcs_client = None
  if args.junit_path.startswith("gs://"):
    gcs_client = storage.Client(project=args.project)

  test_util.create_junit_xml_file(test_cases, args.junit_path, gcs_client)


def add_common_args(parser):
  """Add a set of common parser arguments."""

  parser.add_argument(
    "--src_dir",
    default=os.getcwd(),
    type=str,
    help=("The root directory of the source tree. Defaults to current "
          "directory."))

  parser.add_argument(
    "--project",
    default=None,
    type=str,
    help=("(Optional). The project to use with the GCS client."))

  parser.add_argument(
    "--junit_path",
    default=None,
    type=str,
    help=("(Optional). The GCS location to write the junit file with the "
          "results."))

def main():  # pylint: disable=too-many-locals
  logging.getLogger().setLevel(logging.INFO)
  logging.basicConfig(level=logging.INFO,
                      format=('%(levelname)s|%(asctime)s'
                              '|%(pathname)s|%(lineno)d| %(message)s'),
                      datefmt='%Y-%m-%dT%H:%M:%S',
                      )
  # create the top-level parser
  parser = argparse.ArgumentParser(
    description="Run python code checks.")
  subparsers = parser.add_subparsers()

  #############################################################################
  # lint
  #
  # Create the parser for running lint.

  parser_lint = subparsers.add_parser("lint", help="Run lint.")

  add_common_args(parser_lint)
  parser_lint.set_defaults(func=run_lint)

  #############################################################################
  # tests
  #
  # Create the parser for running the tests.

  parser_test = subparsers.add_parser("test", help="Run tests.")

  add_common_args(parser_test)
  parser_test.set_defaults(func=run_tests)

  # parse the args and call whatever function was selected
  args = parser.parse_args()
  args.func(args)
  logging.info("Finished")

if __name__ == "__main__":
  main()
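As a usage sketch (the paths, bucket, and project below are illustrative assumptions, not values taken from this commit), a presubmit could drive these checks like so:

import subprocess

# Assumes py/ is on PYTHONPATH so the kubeflow.testing imports resolve.
# Lint the tree and upload JUnit results where Gubernator can find them;
# the flags correspond to add_common_args above.
subprocess.check_call(
    ["python", "py/kubeflow/testing/py_checks.py", "lint",
     "--src_dir=.",
     "--junit_path=gs://some_bucket/junit_pylint.xml",
     "--project=some-project"])

# Run every *_test.py file the same way.
subprocess.check_call(
    ["python", "py/kubeflow/testing/py_checks.py", "test", "--src_dir=."])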
