From 80f41f27d18a57efb761d3dab0d0d6614a841264 Mon Sep 17 00:00:00 2001 From: Jeremy Lewi Date: Wed, 17 Jun 2020 06:18:40 -0700 Subject: [PATCH] Support multi-package namespaces for python. (#354) Fix code to work with IAP using workload identity. * The existing code to get an ID token didn't seem to work with workload identity and didn't match the latest sample code in the IAP documentation. The latest code appears to use a helper function to get the ID token. Related to: kubeflow/gcp-blueprints#51 Create a Tekton test for blueprints to verify the endpoint is ready. --- py/kubeflow/__init__.py | 1 + .../testing/pytests/endpoint_ready_test.py | 6 - py/kubeflow/kfctl/testing/util/gcp_util.py | 156 ++++++++---------- 3 files changed, 67 insertions(+), 96 deletions(-) diff --git a/py/kubeflow/__init__.py b/py/kubeflow/__init__.py index e69de29bb..69e3be50d 100644 --- a/py/kubeflow/__init__.py +++ b/py/kubeflow/__init__.py @@ -0,0 +1 @@ +__path__ = __import__('pkgutil').extend_path(__path__, __name__) diff --git a/py/kubeflow/kfctl/testing/pytests/endpoint_ready_test.py b/py/kubeflow/kfctl/testing/pytests/endpoint_ready_test.py index a62018aff..6ae72ec67 100644 --- a/py/kubeflow/kfctl/testing/pytests/endpoint_ready_test.py +++ b/py/kubeflow/kfctl/testing/pytests/endpoint_ready_test.py @@ -1,16 +1,10 @@ -import datetime import json import logging import os -import subprocess -import tempfile -import uuid -from retrying import retry import pytest from kubeflow.testing import util -from kubeflow.kfctl.testing.util import deploy_utils from kubeflow.kfctl.testing.util import gcp_util # There's really no good reason to run test_endpoint during presubmits. 
diff --git a/py/kubeflow/kfctl/testing/util/gcp_util.py b/py/kubeflow/kfctl/testing/util/gcp_util.py index a31b60417..f44359d03 100644 --- a/py/kubeflow/kfctl/testing/util/gcp_util.py +++ b/py/kubeflow/kfctl/testing/util/gcp_util.py @@ -1,82 +1,66 @@ -import argparse -import base64 import datetime import logging import os -import errno -import shutil -import subprocess -import tempfile -import threading -from functools import partial -from multiprocessing import Process from time import sleep from google.auth.transport.requests import Request -from googleapiclient import discovery -from oauth2client.client import GoogleCredentials +from google.oauth2 import id_token import requests -import yaml -import google.auth -import google.auth.compute_engine.credentials -import google.auth.iam -import google.oauth2.credentials -import google.oauth2.service_account from retrying import retry from requests.exceptions import SSLError from requests.exceptions import ConnectionError as ReqConnectionError -IAM_SCOPE = "https://www.googleapis.com/auth/iam" -OAUTH_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token" COOKIE_NAME = "KUBEFLOW-AUTH-KEY" -def get_service_account_credentials(client_id_key): - # Figure out what environment we're running in and get some preliminary - # information about the service account. - credentials, _ = google.auth.default(scopes=[IAM_SCOPE]) - if isinstance(credentials, google.oauth2.credentials.Credentials): - raise Exception("make_iap_request is only supported for service " - "accounts.") - - # For service account's using the Compute Engine metadata service, - # service_account_email isn't available until refresh is called. - credentials.refresh(Request()) - - signer_email = credentials.service_account_email - if isinstance(credentials, - google.auth.compute_engine.credentials.Credentials): - signer = google.auth.iam.Signer(Request(), credentials, signer_email) - else: - # A Signer object can sign a JWT using the service account's key. 
- signer = credentials.signer - - # Construct OAuth 2.0 service account credentials using the signer - # and email acquired from the bootstrap credentials. - return google.oauth2.service_account.Credentials( - signer, - signer_email, - token_uri=OAUTH_TOKEN_URI, - additional_claims={"target_audience": may_get_env_var(client_id_key)}) - -def get_google_open_id_connect_token(service_account_credentials): - service_account_jwt = ( - service_account_credentials._make_authorization_grant_assertion()) - request = google.auth.transport.requests.Request() - body = { - "assertion": service_account_jwt, - "grant_type": google.oauth2._client._JWT_GRANT_TYPE, - } - token_response = google.oauth2._client._token_endpoint_request( - request, OAUTH_TOKEN_URI, body) - return token_response["id_token"] - def may_get_env_var(name): env_val = os.getenv(name) if env_val: - logging.info("%s is set" % name) + logging.info("%s is set", name) return env_val - else: - raise Exception("%s not set" % name) + + raise Exception("%s not set" % name) + +# Code copied from: +# https://cloud.google.com/iap/docs/authentication-howto#iap_make_request-python +def make_iap_request(url, client_id, method='GET', **kwargs): + """Makes a request to an application protected by Identity-Aware Proxy. + + Args: + url: The Identity-Aware Proxy-protected URL to fetch. + client_id: The client ID used by Identity-Aware Proxy. + method: The request method to use + ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE') + **kwargs: Any of the parameters defined for the request function: + https://github.com/requests/requests/blob/master/requests/api.py + If no timeout is provided, it is set to 90 by default. + + Returns: + The page body, or raises an exception if the page couldn't be retrieved. + """ + # Set the default timeout, if missing + if 'timeout' not in kwargs: + kwargs['timeout'] = 90 + + # Obtain an OpenID Connect (OIDC) token from metadata server or using service + # account. 
+ google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id) + + # Fetch the Identity-Aware Proxy-protected URL, including an + # Authorization header containing "Bearer " followed by a + # Google-issued OpenID Connect token for the service account. + resp = requests.request( + method, url, + headers={'Authorization': 'Bearer {}'.format( + google_open_id_connect_token)}, **kwargs) + if resp.status_code == 403: # pylint: disable=no-else-raise + raise Exception('Service account does not have permission to ' + 'access the IAP-protected application.') + elif resp.status_code != 200: # pylint: disable=no-else-raise + raise Exception( + 'Bad response from application: {!r} / {!r} / {!r}'.format( + resp.status_code, resp.headers, resp.text)) + else: + return resp.text def iap_is_ready(url, wait_min=15): """ @@ -87,11 +71,7 @@ def iap_is_ready(url, wait_min=15): Returns: True if the url is ready """ - google_open_id_connect_token = None - - service_account_credentials = get_service_account_credentials("CLIENT_ID") - google_open_id_connect_token = get_google_open_id_connect_token( - service_account_credentials) + client_id = may_get_env_var("CLIENT_ID") # Wait up to 30 minutes for IAP access test. 
num_req = 0 end_time = datetime.datetime.now() + datetime.timedelta( @@ -101,24 +81,17 @@ def iap_is_ready(url, wait_min=15): logging.info("Trying url: %s", url) try: resp = None - resp = requests.request( - "GET", - url, - headers={ - "Authorization": - "Bearer {}".format(google_open_id_connect_token) - }, - verify=False) + resp = make_iap_request(url, client_id, method="GET", verify=False) logging.info(resp.text) - if resp.status_code == 200: + if resp.status_code == 200: # pylint: disable=no-else-return logging.info("Endpoint is ready for %s!", url) return True else: logging.info( - "%s: Endpoint not ready, request number: %s" % (url, num_req)) - except Exception as e: - logging.info("%s: Endpoint not ready, exception caught %s, request number: %s" % - (url, str(e), num_req)) + "%s: Endpoint not ready, request number: %s", url, num_req) + except Exception as e: # pylint: disable=broad-except + logging.info("%s: Endpoint not ready, exception caught %s, request " + "number: %s", url, str(e), num_req) sleep(10) return False @@ -145,24 +118,26 @@ def retry_on_error(e): def retry_on_result_func(code): if code is None: return lambda _: False - else: - return lambda resp: not resp or resp.status_code != code + + return lambda resp: not resp or resp.status_code != code @retry(stop_max_delay=wait_sec * 1000, wait_fixed=10 * 1000, retry_on_exception=retry_on_error, retry_on_result=retry_on_result_func(retry_result_code)) def _send(url, req_gen): resp = None - logging.info("sending request to %s" % url) + logging.info("sending request to %s", url) try: resp = req_gen() except Exception as e: - logging.warning("%s: request with error: %s" % (url, e)) + logging.warning("%s: request with error: %s", url, e) raise e return resp return _send(url, req_gen) +# TODO(jlewi): basic_auth is no longer supported so we could probably +# delete this code path. 
def basic_auth_is_ready(url, username, password, wait_min=15): get_url = url + "/kflogin" post_url = url + "/apikflogin" @@ -176,8 +151,8 @@ def basic_auth_is_ready(url, username, password, wait_min=15): get_url, verify=False), retry_result_code=200) - logging.info("%s: endpoint is ready; response: %s" % (get_url, resp.text)) - logging.info("%s: testing login API" % post_url) + logging.info("%s: endpoint is ready; response: %s", get_url, resp.text) + logging.info("%s: testing login API", post_url) wait_time = datetime.datetime.now() - end_time resp = _send_req(wait_time.seconds, post_url, lambda: requests.post( @@ -187,19 +162,20 @@ def basic_auth_is_ready(url, username, password, wait_min=15): "x-from-login": "true", }, verify=False)) - logging.info("%s: %s" % (post_url, resp.text)) + logging.info("%s: %s", post_url, resp.text) if resp.status_code != 205: logging.error("%s: login is failed", post_url) return False - logging.info("%s: testing cookies credentials" % url) + logging.info("%s: testing cookies credentials", url) cookie = None for c in resp.cookies: if c.name == COOKIE_NAME: cookie = c break if cookie is None: - logging.error("%s: auth cookie cannot be found; name: %s" % (post_url, COOKIE_NAME)) + logging.error("%s: auth cookie cannot be found; name: %s", + post_url, COOKIE_NAME) return False wait_time = datetime.datetime.now() - end_time @@ -209,6 +185,6 @@ def basic_auth_is_ready(url, username, password, wait_min=15): cookie.name: cookie.value, }, verify=False)) - logging.info("%s: %s" % (url, resp.status_code)) + logging.info("%s: %s", url, resp.status_code) logging.info(resp.content) return resp.status_code == 200