Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK] Get Trial Metrics from Katib DB #2050

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdk/python/v1beta1/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# Distribution / packaging
dist/

# Katib gRPC APIs
kubeflow/katib/katib_api_pb2.py
91 changes: 67 additions & 24 deletions sdk/python/v1beta1/kubeflow/katib/api/katib_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
from typing import Callable, List, Dict, Any
import inspect
import textwrap
import grpc

from kubernetes import client, config
from kubeflow.katib import models
from kubeflow.katib.api_client import ApiClient
from kubeflow.katib.constants import constants
from kubeflow.katib.utils import utils
from kubernetes import client, config
import kubeflow.katib.katib_api_pb2 as katib_api_pb2


class KatibClient(object):
Expand Down Expand Up @@ -300,7 +302,7 @@ def tune(
# TODO (andreyvelich): Get Experiment should always return one Experiment.
# Use list_experiments to return Experiment list.
# That function should return Experiment object.
def get_experiment(self, name=None, namespace=None):
def get_experiment(self, name=None, namespace=utils.get_default_target_namespace()):
"""Get the Katib Experiment.

:param name: Experiment name.
Expand All @@ -312,8 +314,6 @@ def get_experiment(self, name=None, namespace=None):
:return: Experiment object.
:rtype: dict
"""
if namespace is None:
namespace = utils.get_default_target_namespace()

if name:
thread = self.api_instance.get_namespaced_custom_object(
Expand Down Expand Up @@ -374,7 +374,7 @@ def get_experiment(self, name=None, namespace=None):

return katibexp

def get_suggestion(self, name=None, namespace=None):
def get_suggestion(self, name=None, namespace=utils.get_default_target_namespace()):
"""Get the Katib Suggestion.

:param name: Suggestion name.
Expand All @@ -386,9 +386,6 @@ def get_suggestion(self, name=None, namespace=None):
:rtype: dict
"""

if namespace is None:
namespace = utils.get_default_target_namespace()

if name:
thread = self.api_instance.get_namespaced_custom_object(
constants.KUBEFLOW_GROUP,
Expand Down Expand Up @@ -475,7 +472,7 @@ def delete_experiment(self, name, namespace=utils.get_default_target_namespace()
# TODO (andreyvelich): Use proper logger.
print("Experiment {} has been deleted".format(name))

def list_experiments(self, namespace=None):
def list_experiments(self, namespace=utils.get_default_target_namespace()):
"""List all Katib Experiments.

:param namespace: Experiments namespace.
Expand All @@ -484,8 +481,6 @@ def list_experiments(self, namespace=None):
:return: List of Experiment objects.
:rtype: list[V1beta1Experiment]
"""
if namespace is None:
namespace = utils.get_default_target_namespace()

thread = self.api_instance.list_namespaced_custom_object(
constants.KUBEFLOW_GROUP,
Expand Down Expand Up @@ -523,7 +518,9 @@ def list_experiments(self, namespace=None):
)
return result

def get_experiment_status(self, name, namespace=None):
def get_experiment_status(
self, name, namespace=utils.get_default_target_namespace()
):
"""Get the Experiment current status.

:param name: Experiment name.
Expand All @@ -533,14 +530,14 @@ def get_experiment_status(self, name, namespace=None):
:return: Current Experiment status.
:rtype: str
"""
if namespace is None:
namespace = utils.get_default_target_namespace()

katibexp = self.get_experiment(name, namespace=namespace)
last_condition = katibexp.get("status", {}).get("conditions", [])[-1]
return last_condition.get("type", "")

def is_experiment_succeeded(self, name, namespace=None):
def is_experiment_succeeded(
self, name, namespace=utils.get_default_target_namespace()
):
"""Check if Experiment has succeeded.

:param name: Experiment name.
Expand All @@ -553,7 +550,7 @@ def is_experiment_succeeded(self, name, namespace=None):
experiment_status = self.get_experiment_status(name, namespace=namespace)
return experiment_status.lower() == "succeeded"

def list_trials(self, name=None, namespace=None):
def list_trials(self, name=None, namespace=utils.get_default_target_namespace()):
"""List all Experiment's Trials.

:param name: Experiment name.
Expand All @@ -563,8 +560,6 @@ def list_trials(self, name=None, namespace=None):
:return: List of Trial objects
:rtype: list[V1beta1Trial]
"""
if namespace is None:
namespace = utils.get_default_target_namespace()

thread = self.api_instance.list_namespaced_custom_object(
constants.KUBEFLOW_GROUP,
Expand Down Expand Up @@ -609,7 +604,9 @@ def list_trials(self, name=None, namespace=None):
)
return result

def get_success_trial_details(self, name=None, namespace=None):
def get_success_trial_details(
self, name=None, namespace=utils.get_default_target_namespace()
):
"""Get the Trial details that have succeeded for an Experiment.

:param name: Experiment name.
Expand All @@ -619,8 +616,6 @@ def get_success_trial_details(self, name=None, namespace=None):
:return: Trial names with the hyperparameters and metrics.
:type: list[dict]
"""
if namespace is None:
namespace = utils.get_default_target_namespace()

thread = self.api_instance.list_namespaced_custom_object(
constants.KUBEFLOW_GROUP,
Expand Down Expand Up @@ -676,7 +671,9 @@ def get_success_trial_details(self, name=None, namespace=None):

return result

def get_optimal_hyperparameters(self, name=None, namespace=None):
def get_optimal_hyperparameters(
self, name=None, namespace=utils.get_default_target_namespace()
):
"""Get the current optimal Trial from the Experiment.

:param name: Experiment name.
Expand All @@ -685,8 +682,6 @@ def get_optimal_hyperparameters(self, name=None, namespace=None):
:return: Current optimal Trial for the Experiment.
:rtype: dict
"""
if namespace is None:
namespace = utils.get_default_target_namespace()

katibexp = self.get_experiment(name, namespace=namespace)
result = {}
Expand All @@ -695,3 +690,51 @@ def get_optimal_hyperparameters(self, name=None, namespace=None):
)

return result

def get_trial_metrics(
self,
name: str,
namespace: str = utils.get_default_target_namespace(),
db_manager_address=constants.DEFAULT_DB_MANAGER_ADDRESS,
):
"""Get the Trial Metric Results from the Katib DB.
Katib DB Manager service should be accessible while calling this API.

If you run this API in-cluster (e.g. from the Kubeflow Notebook) you can
use the default Katib DB Manager address: `katib-db-manager.kubeflow:6789`.

If you run this API outside the cluster, you have to port-forward the
Katib DB Manager before getting the Trial metrics: `kubectl port-forward svc/katib-db-manager -n kubeflow 6789`.
In that case, you can use this Katib DB Manager address: `localhost:6789`.

You can use `curl` to verify that Katib DB Manager is reachable: `curl <db-manager-address>`.

Args:
name: Name for the Trial.
namespace: Namespace for the Trial.
db-manager-address: Address for the Katib DB Manager in this format: `ip-address:port`.

Returns: List of MetricLog objects (https://github.com/kubeflow/katib/blob/4a2db414d85f29f17bc8ec6ff3462beef29585da/pkg/apis/manager/v1beta1/gen-doc/api.md#api-v1-beta1-MetricLog).
For example, to get the first metric value run the following:
`get_trial_metrics(...)[0].metric.value
"""

db_manager_address = db_manager_address.split(":")
channel = grpc.beta.implementations.insecure_channel(
db_manager_address[0], int(db_manager_address[1])
)

with katib_api_pb2.beta_create_DBManager_stub(channel) as client:

try:
# When metric name is empty, we select all logs from the Katib DB.
observation_logs = client.GetObservationLog(
katib_api_pb2.GetObservationLogRequest(trial_name=name),
timeout=constants.APISERVER_TIMEOUT,
)
except Exception as e:
raise RuntimeError(
f"Unable to get metrics for Trial {name} in namespace {namespace}. Exception: {e}"
)

return observation_logs.observation_log.metric_logs
4 changes: 3 additions & 1 deletion sdk/python/v1beta1/kubeflow/katib/constants/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import os

# How long to wait in seconds for requests to the ApiServer
# How long to wait in seconds for requests to the Kubernetes or gRPC API Server.
APISERVER_TIMEOUT = 120

# Global CRD version
Expand All @@ -36,3 +36,5 @@
BASE_IMAGE_TENSORFLOW_GPU = "docker.io/tensorflow/tensorflow:2.9.1-gpu"
BASE_IMAGE_PYTORCH = "docker.io/pytorch/pytorch:1.12.1-cuda11.3-cudnn8-runtime"
BASE_IMAGE_MXNET = "docker.io/mxnet/python:1.9.1_native_py3"

DEFAULT_DB_MANAGER_ADDRESS = "katib-db-manager.kubeflow:6789"
12 changes: 12 additions & 0 deletions sdk/python/v1beta1/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import shutil
import setuptools

REQUIRES = [
Expand All @@ -20,8 +22,18 @@
"setuptools>=21.0.0",
"urllib3>=1.15.1",
"kubernetes>=23.6.0",
"grpcio==1.41.1",
]

katib_grpc_api_file = "kubeflow/katib/katib_api_pb2.py"

# Copy Katib gRPC Python APIs to use it in the Katib SDK Client.
# We need to copy this file only on the SDK building stage, not on installation stage.
if not os.path.exists(katib_grpc_api_file):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we ensure files are uptodate and in sync?

Copy link
Member Author

@andreyvelich andreyvelich Dec 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johnugeorge It's a good point.

Maybe instead of checking that file kubeflow/katib/katib_api_pb2.py doesn't exist, we should check if ../../../pkg/apis/manager/v1beta1/python/api_pb2.py file exists and always replace it with the newest ?

katib_grpc_api_file = "../../../pkg/apis/manager/v1beta1/python/api_pb2.py"

# Copy Katib gRPC Python APIs to use it in the Katib SDK Client.
# We need to copy this file only on the SDK building stage, not on installation stage.
if os.path.exists(katib_grpc_api_file):
    shutil.copy(
        katib_grpc_api_file, "kubeflow/katib/katib_api_pb2.py",
    )

In that case, on the SDK building stage we always copy the newest gRPC APIs file ?

Any other suggestions @johnugeorge @tenzen-y ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I think, "always copy" is the safer solution

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, on the SDK building stage we always copy the newest gRPC APIs file?

sgtm

shutil.copy(
"../../../pkg/apis/manager/v1beta1/python/api_pb2.py", katib_grpc_api_file,
)

setuptools.setup(
name="kubeflow-katib",
version="0.14.0",
Expand Down