From b6755bbdc4d73eee18f8025c0bc68b5d90c8747b Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Mon, 5 Dec 2022 14:50:23 +0000 Subject: [PATCH 1/2] [SDK] Get Trial Metrics from Katib DB --- sdk/python/v1beta1/.gitignore | 3 + .../kubeflow/katib/api/katib_client.py | 91 ++++++++++++++----- .../kubeflow/katib/constants/constants.py | 4 +- sdk/python/v1beta1/setup.py | 12 +++ 4 files changed, 85 insertions(+), 25 deletions(-) diff --git a/sdk/python/v1beta1/.gitignore b/sdk/python/v1beta1/.gitignore index 97c5614a935..81f90cfca9b 100644 --- a/sdk/python/v1beta1/.gitignore +++ b/sdk/python/v1beta1/.gitignore @@ -1,2 +1,5 @@ # Distribution / packaging dist/ + +# Katib gRPC APIs +kubeflow/katib/katib_api_pb2.py diff --git a/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py b/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py index 938a1bbb0e3..d6289b82ef5 100644 --- a/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py +++ b/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py @@ -16,12 +16,14 @@ from typing import Callable, List, Dict, Any import inspect import textwrap +import grpc +from kubernetes import client, config from kubeflow.katib import models from kubeflow.katib.api_client import ApiClient from kubeflow.katib.constants import constants from kubeflow.katib.utils import utils -from kubernetes import client, config +import kubeflow.katib.katib_api_pb2 as katib_api_pb2 class KatibClient(object): @@ -300,7 +302,7 @@ def tune( # TODO (andreyvelich): Get Experiment should always return one Experiment. # Use list_experiments to return Experiment list. # That function should return Experiment object. - def get_experiment(self, name=None, namespace=None): + def get_experiment(self, name=None, namespace=utils.get_default_target_namespace()): """Get the Katib Experiment. :param name: Experiment name. @@ -312,8 +314,6 @@ def get_experiment(self, name=None, namespace=None): :return: Experiment object. :rtype: dict """ - if namespace is None: - namespace = utils.get_default_target_namespace() if name: thread = self.api_instance.get_namespaced_custom_object( @@ -374,7 +374,7 @@ def get_experiment(self, name=None, namespace=None): return katibexp - def get_suggestion(self, name=None, namespace=None): + def get_suggestion(self, name=None, namespace=utils.get_default_target_namespace()): """Get the Katib Suggestion. :param name: Suggestion name. @@ -386,9 +386,6 @@ def get_suggestion(self, name=None, namespace=None): :rtype: dict """ - if namespace is None: - namespace = utils.get_default_target_namespace() - if name: thread = self.api_instance.get_namespaced_custom_object( constants.KUBEFLOW_GROUP, @@ -475,7 +472,7 @@ def delete_experiment(self, name, namespace=utils.get_default_target_namespace() # TODO (andreyvelich): Use proper logger. print("Experiment {} has been deleted".format(name)) - def list_experiments(self, namespace=None): + def list_experiments(self, namespace=utils.get_default_target_namespace()): """List all Katib Experiments. :param namespace: Experiments namespace. @@ -484,8 +481,6 @@ def list_experiments(self, namespace=None): :return: List of Experiment objects. :rtype: list[V1beta1Experiment] """ - if namespace is None: - namespace = utils.get_default_target_namespace() thread = self.api_instance.list_namespaced_custom_object( constants.KUBEFLOW_GROUP, @@ -523,7 +518,9 @@ def list_experiments(self, namespace=None): ) return result - def get_experiment_status(self, name, namespace=None): + def get_experiment_status( + self, name, namespace=utils.get_default_target_namespace() + ): """Get the Experiment current status. :param name: Experiment name. @@ -533,14 +530,14 @@ def get_experiment_status(self, name, namespace=None): :return: Current Experiment status. :rtype: str """ - if namespace is None: - namespace = utils.get_default_target_namespace() katibexp = self.get_experiment(name, namespace=namespace) last_condition = katibexp.get("status", {}).get("conditions", [])[-1] return last_condition.get("type", "") - def is_experiment_succeeded(self, name, namespace=None): + def is_experiment_succeeded( + self, name, namespace=utils.get_default_target_namespace() + ): """Check if Experiment has succeeded. :param name: Experiment name. @@ -553,7 +550,7 @@ def is_experiment_succeeded(self, name, namespace=None): experiment_status = self.get_experiment_status(name, namespace=namespace) return experiment_status.lower() == "succeeded" - def list_trials(self, name=None, namespace=None): + def list_trials(self, name=None, namespace=utils.get_default_target_namespace()): """List all Experiment's Trials. :param name: Experiment name. @@ -563,8 +560,6 @@ def list_trials(self, name=None, namespace=None): :return: List of Trial objects :rtype: list[V1beta1Trial] """ - if namespace is None: - namespace = utils.get_default_target_namespace() thread = self.api_instance.list_namespaced_custom_object( constants.KUBEFLOW_GROUP, @@ -609,7 +604,9 @@ def list_trials(self, name=None, namespace=None): ) return result - def get_success_trial_details(self, name=None, namespace=None): + def get_success_trial_details( + self, name=None, namespace=utils.get_default_target_namespace() + ): """Get the Trial details that have succeeded for an Experiment. :param name: Experiment name. @@ -619,8 +616,6 @@ def get_success_trial_details(self, name=None, namespace=None): :return: Trial names with the hyperparameters and metrics. :type: list[dict] """ - if namespace is None: - namespace = utils.get_default_target_namespace() thread = self.api_instance.list_namespaced_custom_object( constants.KUBEFLOW_GROUP, @@ -676,7 +671,9 @@ def get_success_trial_details(self, name=None, namespace=None): return result - def get_optimal_hyperparameters(self, name=None, namespace=None): + def get_optimal_hyperparameters( + self, name=None, namespace=utils.get_default_target_namespace() + ): """Get the current optimal Trial from the Experiment. :param name: Experiment name. @@ -685,8 +682,6 @@ def get_optimal_hyperparameters(self, name=None, namespace=None): :return: Current optimal Trial for the Experiment. :rtype: dict """ - if namespace is None: - namespace = utils.get_default_target_namespace() katibexp = self.get_experiment(name, namespace=namespace) result = {} @@ -695,3 +690,51 @@ def get_optimal_hyperparameters(self, name=None, namespace=None): ) return result + + def get_trial_metrics( + self, + name: str, + namespace: str = utils.get_default_target_namespace(), + db_manager_address=constants.DEFAULT_DB_MANAGER_ADDRESS, + ): + """Get the Trial Metric Results from the Katib DB. + Katib DB Manager service should be accessible while calling this API. + + If you run this API in-cluster (e.g. from the Kubeflow Notebook) you can + use the default Katib DB Manager address: `katib-db-manager.kubeflow:6789`. + + If you run this API outside the cluster, you have to port-forward the + Katib DB Manager before getting the Trial metrics: `kubectl port-forward svc/katib-db-manager -n kubeflow 6789`. + In that case, you can use this Katib DB Manager address: `localhost:6789`. + + You can use `curl` to verify that Katib DB Manager is reachable: `curl `. + + Args: + name: Name for the Trial. + namespace: Namespace for the Trial. + db-manager-address: Address for the Katib DB Manager in this format: `ip-address:port`. + + Returns: List of MetricLog objects (https://github.com/kubeflow/katib/blob/4a2db414d85f29f17bc8ec6ff3462beef29585da/pkg/apis/manager/v1beta1/gen-doc/api.md#api-v1-beta1-MetricLog). + For example, to get the first metric value run the following: + `get_trial_metrics(...)[0].metric.value + """ + + db_manager_address = db_manager_address.split(":") + channel = grpc.beta.implementations.insecure_channel( + db_manager_address[0], int(db_manager_address[1]) + ) + + with katib_api_pb2.beta_create_DBManager_stub(channel) as client: + + try: + # When metric name is empty, we select all logs from the Katib DB. + observation_logs = client.GetObservationLog( + katib_api_pb2.GetObservationLogRequest(trial_name=name), + timeout=constants.APISERVER_TIMEOUT, + ) + except Exception as e: + raise RuntimeError( + f"Unable to get metrics for Trial {name} in namespace {namespace}. Exception: {e}" + ) + + return observation_logs.observation_log.metric_logs diff --git a/sdk/python/v1beta1/kubeflow/katib/constants/constants.py b/sdk/python/v1beta1/kubeflow/katib/constants/constants.py index d054e88282f..dfedeafb051 100644 --- a/sdk/python/v1beta1/kubeflow/katib/constants/constants.py +++ b/sdk/python/v1beta1/kubeflow/katib/constants/constants.py @@ -14,7 +14,7 @@ import os -# How long to wait in seconds for requests to the ApiServer +# How long to wait in seconds for requests to the Kubernetes or gRPC API Server. APISERVER_TIMEOUT = 120 # Global CRD version @@ -36,3 +36,5 @@ BASE_IMAGE_TENSORFLOW_GPU = "docker.io/tensorflow/tensorflow:2.9.1-gpu" BASE_IMAGE_PYTORCH = "docker.io/pytorch/pytorch:1.12.1-cuda11.3-cudnn8-runtime" BASE_IMAGE_MXNET = "docker.io/mxnet/python:1.9.1_native_py3" + +DEFAULT_DB_MANAGER_ADDRESS = "katib-db-manager.kubeflow:6789" diff --git a/sdk/python/v1beta1/setup.py b/sdk/python/v1beta1/setup.py index c61dea35ad6..b184e1ee4fb 100644 --- a/sdk/python/v1beta1/setup.py +++ b/sdk/python/v1beta1/setup.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +import shutil import setuptools REQUIRES = [ @@ -20,8 +22,18 @@ "setuptools>=21.0.0", "urllib3>=1.15.1", "kubernetes>=23.6.0", + "grpcio==1.41.1", ] +katib_grpc_api_file = "kubeflow/katib/katib_api_pb2.py" + +# Copy Katib gRPC Python APIs to use it in the Katib SDK Client. +# We need to copy this file only on the SDK building stage, not on installation stage. +if not os.path.exists(katib_grpc_api_file): + shutil.copy( + "../../../pkg/apis/manager/v1beta1/python/api_pb2.py", katib_grpc_api_file, + ) + setuptools.setup( name="kubeflow-katib", version="0.14.0", From 806b438f42d72942ad899d98cca421b3bc31141e Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Thu, 8 Dec 2022 15:17:34 +0000 Subject: [PATCH 2/2] Always Copy gRPC API File --- sdk/python/v1beta1/setup.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/python/v1beta1/setup.py b/sdk/python/v1beta1/setup.py index b184e1ee4fb..31ddbb28aaf 100644 --- a/sdk/python/v1beta1/setup.py +++ b/sdk/python/v1beta1/setup.py @@ -25,13 +25,13 @@ "grpcio==1.41.1", ] -katib_grpc_api_file = "kubeflow/katib/katib_api_pb2.py" +katib_grpc_api_file = "../../../pkg/apis/manager/v1beta1/python/api_pb2.py" # Copy Katib gRPC Python APIs to use it in the Katib SDK Client. -# We need to copy this file only on the SDK building stage, not on installation stage. -if not os.path.exists(katib_grpc_api_file): +# We need to always copy this file only on the SDK building stage, not on SDK installation stage. +if os.path.exists(katib_grpc_api_file): shutil.copy( - "../../../pkg/apis/manager/v1beta1/python/api_pb2.py", katib_grpc_api_file, + katib_grpc_api_file, "kubeflow/katib/katib_api_pb2.py", ) setuptools.setup(