Tekton pipeline client should use generate name #683

Merged 3 commits on Jun 16, 2020
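For context, the change replaces explicitly computed PipelineRun names with Kubernetes metadata.generateName, so the API server appends a unique suffix when the object is created. A minimal sketch of the two manifest shapes (standard Kubernetes metadata fields; the values are illustrative):

# Explicit name: the client must guarantee uniqueness itself.
explicit = {"metadata": {"name": "kubeflow-testing-presubmit-go-test"}}

# generateName: the server appends a random suffix at creation time, e.g. "go-test-7f2kq".
generated = {"metadata": {"generateName": "go-test-"}}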
8 changes: 7 additions & 1 deletion py/kubeflow/testing/prow_artifacts.py
@@ -7,6 +7,7 @@
import logging
import json
import os
import six
import time
from google.cloud import storage # pylint: disable=no-name-in-module
from kubeflow.testing import test_util
@@ -53,7 +54,12 @@ def create_started(ui_urls):
if PULL_REFS:
started["pull"] = PULL_REFS

for n, v in ui_urls.iteritems():
if six.PY3:
items = ui_urls.items()
else:
items = ui_urls.iteritems()

for n, v in items:
started["metadata"][n + "-ui"] = v
return json.dumps(started)

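As a side note, six also provides a helper that avoids branching on the interpreter version; a minimal equivalent of the loop above (assuming the same ui_urls and started variables):

import six

for n, v in six.iteritems(ui_urls):
    started["metadata"][n + "-ui"] = v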
18 changes: 9 additions & 9 deletions py/kubeflow/testing/run_e2e_workflow.py
@@ -99,7 +99,7 @@ def py_func_import(py_func, kwargs):
met = getattr(mod, create_function)
return met(**kwargs)

class WorkflowComponent(object): # pylint: disable=too-many-instance-attributes
class WorkflowComponent(object): # pylint: disable=too-many-instance-attributes,disable=useless-object-inheritance
"""Datastructure to represent a component to submit a workflow."""
def __init__(self, root_dir, data):
self.name = data.get("name")
@@ -249,7 +249,11 @@ def run(args, file_handler): # pylint: disable=too-many-statements,too-many-bran
# We truncate sha numbers to prevent the workflow name from being too large.
# Workflow name should not be more than 63 characters because it's used
# as a label on the pods.
workflow_name = os.getenv("JOB_NAME") + "-" + w.name
#
# TODO(jlewi): This should no longer be used with Tekton. For Tekton the
# name should be based on generateName; we should use labels to
# provide additional metadata like the PR number.
workflow_name = os.getenv("JOB_NAME", "") + "-" + w.name

# Skip this workflow if it is scoped to a different job type.
if w.job_types and not job_type in w.job_types:
@@ -367,8 +371,8 @@ def run(args, file_handler): # pylint: disable=too-many-statements,too-many-bran
pull_revision = os.getenv("PULL_BASE_SHA")
else:
pull_revision = "master"
logging.info("Adding Tekton pipeline %s", w.name)
pipeline_runner = tekton_client.PipelineRunner(
workflow_name,
w.tekton_params,
w.kwargs.get(TEST_TARGET_ARG_NAME, w.name),
w.tekton_run,
@@ -377,13 +381,9 @@ def run(args, file_handler): # pylint: disable=too-many-statements,too-many-bran
repo_name,
pull_revision)
if w.tekton_teardown:
teardown_w_name = "{name}-teardown-{salt}".format(
name=w.name,
salt=uuid.uuid4().hex[0:9])
logging.info("Appending teardown process %s for %s", teardown_w_name,
workflow_name)
logging.info("Appending teardown process for Tekton pipeline %s",
w.name)
pipeline_runner.append_teardown(tekton_client.PipelineRunner(
teardown_w_name,
w.tekton_teardown_params,
w.kwargs.get(TEST_TARGET_ARG_NAME, w.name),
w.tekton_teardown,
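The uuid salt on teardown names is dropped above because uniqueness now comes from generateName in the Tekton client. For context on the 63-character comment in this file, that is the Kubernetes label-value limit; a small illustrative check (names here are hypothetical, not part of the PR):

import os

# Label values in Kubernetes may be at most 63 characters, which is why the
# comment above insists on truncated SHAs and short component names.
job_name = os.getenv("JOB_NAME", "")            # e.g. "kubeflow-testing-presubmit"
workflow_name = job_name + "-" + "unittests"    # component name appended
assert len(workflow_name) <= 63, "too long to be used as a pod label"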
145 changes: 79 additions & 66 deletions py/kubeflow/testing/tekton_client.py
@@ -1,28 +1,27 @@

import argparse
# TODO(jlewi): I think this code has to support a mix of python2 and python3
# because run_e2e_workflow.py might still be using python2.
import logging
import json
import six
import datetime
import fire
import os
from multiprocessing import Pool
import re
import time
import uuid
import traceback
from xml.etree import ElementTree as ET
import yaml

if six.PY3:
import http
else:
import httplib

from multiprocessing import Pool
from xml.etree import ElementTree as ET

from kubernetes import client as k8s_client # pylint: disable=wrong-import-position
from kubernetes.client import rest # pylint: disable=wrong-import-position
from retrying import retry # pylint: disable=wrong-import-position

from kubeflow.testing import prow_artifacts
from kubeflow.testing import prow_artifacts # pylint: disable=wrong-import-position
from kubeflow.testing import util # pylint: disable=wrong-import-position

GROUP = "tekton.dev"
@@ -84,7 +83,8 @@ def handle_retriable_exception(exception):
util.load_kube_config()
return True

logging.info("Retry on exception: %s", exception)
logging.info("Retry on exception: %s; stack trace:\n%s", exception,
traceback.format_exc())
return not isinstance(exception, util.TimeoutError)


@@ -117,11 +117,16 @@ def get_namespaced_custom_object_with_retries(namespace, name):
log_status(result)
condition = result["status"]["conditions"][0]["reason"]
if not condition in ["Failed", "Succeeded"]:
raise ValueError("Waiting for %s/%s to finish", namespace, name)
raise ValueError("Waiting for {0}/{1} to finish".format(namespace, name))

return result

def load_tekton_run(workflow_name, params, test_target_name, tekton_run,
# TODO(jlewi): Why are we setting test_target_name and unit_path?
# I think that was some attempt to ensure uniqueness of the test class names
# and GCS path when running multiple workflows from the same prow test.
# Can this be done in the Pipeline and Task resources by appending
# unique subdirectories to the paths?
def load_tekton_run(params, test_target_name, tekton_run,
bucket, repo_owner, repo_under_test, pull_revision):
"""Load Tekton configs and override information from Prow.
Args:
@@ -135,10 +140,13 @@ def load_tekton_run(workflow_name, params, test_target_name, tekton_run,
if config.get("kind", "") != "PipelineRun":
raise ValueError("Invalid config (not PipelineRun): " + config)

name = config["metadata"].get("name", "unknown-pipelinerun")
logging.info("Reading Tekton PipelineRun config: %s", name)
config["metadata"]["name"] = workflow_name
if not "generateName" in config["metadata"]:
raise ValueError("TektonPipeline is missing generateName")

logging.info("Reading Tekton PipelineRun config: %s",
config["metadata"]["generateName"])

workflow_name = config["metadata"]["generateName"]
artifacts_gcs = prow_artifacts.get_gcs_dir(bucket)
junit_path = "artifacts/junit_{run_name}".format(run_name=workflow_name)

@@ -183,17 +191,23 @@ def load_tekton_run(workflow_name, params, test_target_name, tekton_run,
resource["resourceSpec"]["params"] = replacing_param
break
if not foundRepo:
raise ValueError("couldn't find repo under test in resources: %s", repo_url)
raise ValueError(("The TektonPipelineRun is missing a pipeline git "
"resource that matches the repo being tested by "
"prow. The pipeline parameters must include "
"a git resource whose URL is {0}".format(repo_url)))

return config

class PipelineRunner(object):
class PipelineRunner(object): # pylint: disable=useless-object-inheritance
"""Runs and wait for the Tekton pipeline to finish.

The name for the pipeline will be generated using generateName
"""
def __init__(self, name, params, test_target_name, config_path, bucket,
repo_owner, repo_under_test, pull_revision):
self.name = name
self.config = load_tekton_run(name, params, test_target_name, config_path,
def __init__(self, params, test_target_name, config_path, bucket, # pylint: disable=too-many-arguments
repo_owner, repo_under_test, pull_revision): # pylint: disable=too-many-arguments
# Name will be dynamically generated by generateName
self.name = None
self.config = load_tekton_run(params, test_target_name, config_path,
bucket, repo_owner, repo_under_test,
pull_revision)
self.namespace = self.config["metadata"].get("namespace",
@@ -207,13 +221,34 @@ def run(self):
crd_api = k8s_client.CustomObjectsApi(client)

group, version = self.config["apiVersion"].split("/")
result = crd_api.create_namespaced_custom_object(
group=group,
version=version,
namespace=self.namespace,
plural=PLURAL,
body=self.config)
logging.info("Created workflow:\n%s", yaml.safe_dump(result))
try:
result = crd_api.create_namespaced_custom_object(
group=group,
version=version,
namespace=self.namespace,
plural=PLURAL,
body=self.config)
logging.info("Created workflow:\n%s", yaml.safe_dump(result))
except rest.ApiException as e:
logging.error("Could not create workflow: %s")
if e.body:
body = None
if isinstance(e.body, six.string_types):
body = {}
try:
logging.info("Parsing ApiException body: %s", e.body)
body = json.loads(e.body)
except json.JSONDecodeError as json_e:
logging.error("Error parsing body: %s", json_e)
else:
body = e.body
logging.error("Could not create workflow; %s", body)
else:
logging.error("Could not create workflow: %s", e)
raise

self.name = result.get("metadata", {}).get("name")
logging.info("Submitted Tekton Pipeline %s.%s", self.namespace, self.name)
return result

def append_teardown(self, runner):
@@ -242,15 +277,15 @@ def wait(self):
def wait_(runner):
return runner.wait()

class ClusterInfo(object):
class ClusterInfo(object): # pylint: disable=useless-object-inheritance
"""Simple data carrier to provide access to the cluster running test.
"""
def __init__(self, project, zone, cluster_name):
self.project = project
self.zone = zone
self.cluster_name = cluster_name

class TektonRunner(object):
class TektonRunner(object): # pylint: disable=useless-object-inheritance
"""Runs Tekton pipelines and wait for all the workflows to finish.
"""
def __init__(self):
@@ -284,8 +319,9 @@ def run(self, tekton_cluster_info, current_cluster_info):
if w.teardown_runner:
urls[w.teardown_runner.name] = w.teardown_runner.ui_url
logging.info("URL for workflow: %s", w.ui_url)
except Exception as e:
logging.error("Error when starting Tekton workflow: %s", e)
except Exception as e: # pylint: disable=broad-except
logging.error("Error when starting Tekton workflow: %s;\nstacktrace:\n%s",
e, traceback.format_exc())
finally:
# Restore kubectl
util.configure_kubectl(current_cluster_info.project,
@@ -314,7 +350,7 @@ def junit_parse_and_upload(artifacts_dir, output_gcs):
output_gcs: GCS location to upload artifacts to.
"""
logging.info("Walking through directory: %s", artifacts_dir)
junit_pattern = re.compile("junit.*\.xml")
junit_pattern = re.compile(r"junit.*\.xml")
failed_num = 0
found_xml = False
for root, _, files in os.walk(artifacts_dir):
@@ -349,39 +385,16 @@ def junit_parse_and_upload(artifacts_dir, output_gcs):
raise ValueError(
"This task is failed with {0} errors/failures.".format(failed_num))

def main(unparsed_args=None): # pylint: disable=too-many-locals
logging.getLogger().setLevel(logging.INFO) # pylint: disable=too-many-locals
# create the top-level parser
parser = argparse.ArgumentParser(description="Tekton helper.")
subparsers = parser.add_subparsers()

#############################################################################
# Copy artifacts and parse the status.
parser_copy = subparsers.add_parser(
"junit_parse_and_upload", help="Parse and upload the artifacts.")

parser_copy.add_argument(
"--artifacts_dir",
default="",
type=str,
help="Directory having artifacts to be parsed and uploaded.")

parser_copy.add_argument(
"--output_gcs",
default="",
type=str,
help=("GCS blob to upload artifacts. "
"If not given, artifacts will not be uploaded."))

parser_copy.set_defaults(func=lambda args: junit_parse_and_upload(
args.artifacts_dir,
args.output_gcs))

#############################################################################
# Process the command line arguments.
# Parse the args
args = parser.parse_args(args=unparsed_args)
args.func(args)
class CLI(object): # pylint: disable=useless-object-inheritance
@staticmethod
def junit_parse_and_upload(artifacts_dir, output_gcs):
"""Parse the junit file and upload it to GCS.

Args:
artifacts_dir: Directory containing artifacts
output_gcs: GCS path to upload to. If empty, no artifacts will
be uploaded.
"""

if __name__ == "__main__":
logging.basicConfig(level=logging.INFO,
@@ -390,4 +403,4 @@ def main(unparsed_args=None): # pylint: disable=too-many-locals
datefmt='%Y-%m-%dT%H:%M:%S',
)
logging.getLogger().setLevel(logging.INFO)
main()
fire.Fire(CLI)
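To illustrate the behavior the reworked PipelineRunner relies on, here is a standalone sketch (assumptions: a kubeconfig pointing at a cluster with Tekton's v1alpha1 API, the kf-ci namespace, and a go-tests Pipeline). The object is submitted with metadata.generateName, and the server-assigned metadata.name is only known from the create response, which is why self.name is populated after create_namespaced_custom_object returns:

from kubernetes import client as k8s_client
from kubernetes import config as k8s_config

k8s_config.load_kube_config()
crd_api = k8s_client.CustomObjectsApi()

pipeline_run = {
    "apiVersion": "tekton.dev/v1alpha1",
    "kind": "PipelineRun",
    "metadata": {"generateName": "go-test-", "namespace": "kf-ci"},
    "spec": {"pipelineRef": {"name": "go-tests"}},
}

result = crd_api.create_namespaced_custom_object(
    group="tekton.dev", version="v1alpha1", namespace="kf-ci",
    plural="pipelineruns", body=pipeline_run)

# The API server fills in metadata.name, e.g. "go-test-7f2kq".
print(result["metadata"]["name"])

Separately, with the argparse entrypoint replaced by fire.Fire(CLI), the junit upload would be invoked along the lines of python -m kubeflow.testing.tekton_client junit_parse_and_upload --artifacts_dir=/tmp/artifacts --output_gcs=gs://bucket/path (standard python-fire behavior; the paths are illustrative).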
3 changes: 2 additions & 1 deletion py/kubeflow/testing/util.py
@@ -775,7 +775,7 @@ def load_kube_config(config_file=None,
client_configuration=None,
persist_config=True,
get_google_credentials=_refresh_credentials,
print_config=True,
print_config=False,
**kwargs):
"""Loads authentication and cluster information from kube-config file
and stores them in kubernetes.client.configuration.
@@ -817,6 +817,7 @@ def _save_kube_config(config_map):
loader.load_and_set(client_configuration) # pylint: disable=too-many-function-args
# Dump the loaded config.

# Warning this will print out any access tokens stored in your kubeconfig
if print_config:
run(["kubectl", "config", "view"])

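With the default flipped to print_config=False, dumping the merged kubeconfig now requires opting in; a hypothetical call (parameter name taken from the signature above), keeping in mind the warning that the output can contain access tokens:

from kubeflow.testing import util

# Only enable this in environments where credentials may safely appear in logs.
util.load_kube_config(print_config=True)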
11 changes: 10 additions & 1 deletion tekton/runs/go-test-run.yaml
@@ -7,8 +7,12 @@ metadata:
namespace: kf-ci
labels:
pipeline: go-test
repo: testing
spec:
params:
# When running under prow this value will automatically be replaced
# by the appropriate GCS location for the test.
# So the value here only matters for manual runs.
- name: artifacts-gcs
value: "gs://kubeflow-ci-deployment/gabrielwen-testing-2"
serviceAccountName: kf-ci
@@ -17,9 +21,14 @@
resourceSpec:
type: git
params:
# The URL must match the repo under test in prow; otherwise you will get the
# error "Couldn't find repo under test"
- name: url
value: https://github.com/kubeflow/testing.git
# When running under prow the revision will automatically be replaced
# by the commit being tested.
# So the value here only matters for manual runs.
- name: revision
value: gcp_blueprints
value: master
pipelineRef:
name: go-tests
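The comments above describe values that load_tekton_run overrides when running under prow; a simplified sketch of that matching logic (structure taken from the resources block above and the client diff; variable names are illustrative):

import yaml

with open("tekton/runs/go-test-run.yaml") as f:
    config = yaml.safe_load(f)

repo_url = "https://github.com/kubeflow/testing.git"  # repo under test, derived from prow env vars
pull_revision = "abc1234"                              # PULL_PULL_SHA or PULL_BASE_SHA

for resource in config["spec"]["resources"]:
    spec = resource.get("resourceSpec", {})
    if spec.get("type") != "git":
        continue
    if any(p["name"] == "url" and p["value"] == repo_url for p in spec.get("params", [])):
        # Point the git resource at the exact revision prow is testing.
        spec["params"] = [
            {"name": "url", "value": repo_url},
            {"name": "revision", "value": pull_revision},
        ]
        break
else:
    raise ValueError("No git resource matches the repo under test: " + repo_url)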