Implement validation for optuna suggestion service #1924

Merged
99 changes: 99 additions & 0 deletions pkg/suggestion/v1beta1/optuna/base_service.py
@@ -0,0 +1,99 @@
# Copyright 2022 The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import optuna
from collections import defaultdict

from pkg.suggestion.v1beta1.internal.constant import INTEGER, DOUBLE, CATEGORICAL, DISCRETE, MAX_GOAL
from pkg.suggestion.v1beta1.internal.trial import Assignment


class BaseOptunaService(object):
    def __init__(self,
                 algorithm_name="",
                 algorithm_config=None,
                 search_space=None):
        self.algorithm_name = algorithm_name
        self.algorithm_config = algorithm_config
        self.search_space = search_space
        self.assignments_to_optuna_number = defaultdict(list)
        self.recorded_trial_names = set()
        self.study = None
        self._create_study()

    def _create_study(self):
        sampler = self._create_sampler()
        direction = "maximize" if self.search_space.goal == MAX_GOAL else "minimize"

        self.study = optuna.create_study(sampler=sampler, direction=direction)

    def _create_sampler(self):
        if self.algorithm_name == "tpe" or self.algorithm_name == "multivariate-tpe":
            return optuna.samplers.TPESampler(**self.algorithm_config)

        elif self.algorithm_name == "cmaes":
            return optuna.samplers.CmaEsSampler(**self.algorithm_config)

        elif self.algorithm_name == "random":
            return optuna.samplers.RandomSampler(**self.algorithm_config)

    def get_suggestions(self, trials, current_request_number):
        if len(trials) != 0:
            self._tell(trials)
        return self._ask(current_request_number)

    def _ask(self, current_request_number):
        list_of_assignments = []
        for _ in range(current_request_number):
            optuna_trial = self.study.ask(fixed_distributions=self._get_optuna_search_space())

            assignments = [Assignment(k, v) for k, v in optuna_trial.params.items()]
            list_of_assignments.append(assignments)

            assignments_key = self._get_assignments_key(assignments)
            self.assignments_to_optuna_number[assignments_key].append(optuna_trial.number)

        return list_of_assignments

    def _tell(self, trials):
        for trial in trials:
            if trial.name not in self.recorded_trial_names:
                self.recorded_trial_names.add(trial.name)

                value = float(trial.target_metric.value)
                assignments_key = self._get_assignments_key(trial.assignments)
                optuna_trial_numbers = self.assignments_to_optuna_number[assignments_key]

                if len(optuna_trial_numbers) != 0:
                    trial_number = optuna_trial_numbers.pop(0)
                    self.study.tell(trial_number, value)
                else:
                    raise ValueError("An unknown trial has been passed in the GetSuggestion request.")

    @staticmethod
    def _get_assignments_key(assignments):
        assignments = sorted(assignments, key=lambda a: a.name)
        assignments_str = [f"{a.name}:{a.value}" for a in assignments]
        return ",".join(assignments_str)

    def _get_optuna_search_space(self):
        search_space = {}
        for param in self.search_space.params:
            if param.type == INTEGER:
                search_space[param.name] = optuna.distributions.IntUniformDistribution(int(param.min), int(param.max))
            elif param.type == DOUBLE:
                search_space[param.name] = optuna.distributions.UniformDistribution(float(param.min), float(param.max))
            elif param.type == CATEGORICAL or param.type == DISCRETE:
                search_space[param.name] = optuna.distributions.CategoricalDistribution(param.list)
        return search_space
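
For reviewers unfamiliar with Optuna's ask-and-tell interface, the pattern BaseOptunaService builds on can be sketched in plain Optuna (a minimal standalone example; the x/y parameter names and the quadratic objective are illustrative only, not part of this PR):

import optuna

# The same two-phase pattern the service uses: ask() draws assignments from
# fixed distributions, tell() later reports the observed metric by trial
# number, so suggesting and evaluating can happen in separate gRPC calls.
search_space = {
    "x": optuna.distributions.UniformDistribution(-10.0, 10.0),
    "y": optuna.distributions.IntUniformDistribution(0, 10),
}
study = optuna.create_study(sampler=optuna.samplers.TPESampler(seed=42), direction="minimize")

pending = []
for _ in range(3):
    trial = study.ask(fixed_distributions=search_space)  # suggestion phase
    pending.append((trial.number, trial.params))

for number, params in pending:
    value = params["x"] ** 2 + params["y"]  # in Katib the metric comes from a finished trial
    study.tell(number, value)               # observation phase

print(study.best_params)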
260 changes: 135 additions & 125 deletions pkg/suggestion/v1beta1/optuna/service.py
@@ -12,159 +12,169 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import threading

import grpc
import logging

from pkg.apis.manager.v1beta1.python import api_pb2
from pkg.apis.manager.v1beta1.python import api_pb2_grpc
from pkg.suggestion.v1beta1.internal.constant import INTEGER, DOUBLE
from pkg.suggestion.v1beta1.internal.search_space import HyperParameterSearchSpace
from pkg.suggestion.v1beta1.internal.trial import Trial, Assignment
from pkg.suggestion.v1beta1.optuna.base_service import BaseOptunaService
from pkg.suggestion.v1beta1.internal.base_health_service import HealthServicer

logger = logging.getLogger(__name__)


class OptunaService(api_pb2_grpc.SuggestionServicer, HealthServicer):

    def __init__(self):
        super(OptunaService, self).__init__()
        self.lock = threading.Lock()
        self.base_service = None

    def GetSuggestions(self, request, context):
        """
        Main function to provide suggestion.
        """
        with self.lock:
            name, config = OptimizerConfiguration.convert_algorithm_spec(
                request.experiment.spec.algorithm)
            if self.base_service is None:
                search_space = HyperParameterSearchSpace.convert(request.experiment)
                self.base_service = BaseOptunaService(
                    algorithm_name=name,
                    algorithm_config=config,
                    search_space=search_space)

            trials = Trial.convert(request.trials)

            list_of_assignments = self.base_service.get_suggestions(trials, request.current_request_number)
            return api_pb2.GetSuggestionsReply(
                parameter_assignments=Assignment.generate(list_of_assignments)
            )

    def ValidateAlgorithmSettings(self, request, context):
        is_valid, message = OptimizerConfiguration.validate_algorithm_spec(
            request.experiment.spec.algorithm)
        if not is_valid:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details(message)
            logger.error(message)
        return api_pb2.ValidateAlgorithmSettingsReply()


class OptimizerConfiguration(object):
    __conversion_dict = {
        "tpe": {
            "n_startup_trials": lambda x: int(x),
            "n_ei_candidates": lambda x: int(x),
            "seed": lambda x: int(x),
            "constant_liar": True,
        },
        "multivariate-tpe": {
            "n_startup_trials": lambda x: int(x),
            "n_ei_candidates": lambda x: int(x),
            "seed": lambda x: int(x),
            "multivariate": "multivariate-tpe",
            "constant_liar": True,
        },
        "cmaes": {
            "restart_strategy": lambda x: None if x == "None" or x == "none" else x,
            "sigma0": lambda x: float(x),
            "seed": lambda x: int(x),
        },
        "random": {
            "seed": lambda x: int(x),
        },
    }

    @classmethod
    def convert_algorithm_spec(cls, algorithm_spec):
        config = {}

        algorithm_name = algorithm_spec.algorithm_name
        setting_schema = cls.__conversion_dict[algorithm_name]
        for s in algorithm_spec.algorithm_settings:
            if s.name in setting_schema:
                config[s.name] = setting_schema[s.name](s.value)
            elif s.name == "sigma":
                config["sigma0"] = setting_schema["sigma0"](s.value)
            elif s.name == "random_state":
                config["seed"] = setting_schema["seed"](s.value)

        if algorithm_name == "tpe" or algorithm_name == "multivariate-tpe":
            config["constant_liar"] = setting_schema["constant_liar"]
            if algorithm_name == "multivariate-tpe":
                config["multivariate"] = setting_schema["multivariate"]

        return algorithm_spec.algorithm_name, config

    @classmethod
    def validate_algorithm_spec(cls, algorithm_spec):
        algorithm_name = algorithm_spec.algorithm_name
        algorithm_settings = algorithm_spec.algorithm_settings

        if algorithm_name == "tpe" or algorithm_name == "multivariate-tpe":
            return cls._validate_tpe_setting(algorithm_spec)
        elif algorithm_name == "cmaes":
            return cls._validate_cmaes_setting(algorithm_settings)
        elif algorithm_name == "random":
            return cls._validate_random_setting(algorithm_settings)
        else:
            return False, "unknown algorithm name {}".format(algorithm_name)

    @classmethod
    def _validate_tpe_setting(cls, algorithm_spec):
        algorithm_name = algorithm_spec.algorithm_name
        algorithm_settings = algorithm_spec.algorithm_settings

        for s in algorithm_settings:
            try:
                if s.name in ["n_startup_trials", "n_ei_candidates", "random_state"]:
                    if not int(s.value) >= 0:
                        return False, "{} should be greater than or equal to zero".format(s.name)
                else:
                    return False, "unknown setting {} for algorithm {}".format(s.name, algorithm_name)
            except Exception as e:
                return False, "failed to validate {name}({value}): {exception}".format(
                    name=s.name, value=s.value, exception=e)

        return True, ""

    @classmethod
    def _validate_cmaes_setting(cls, algorithm_settings):
        if len(algorithm_settings) < 2:
            return False, "cmaes only supports two or more dimensional continuous search space."

        for s in algorithm_settings:
            try:
                if s.name == "restart_strategy":
                    if s.value not in ["ipop", "None", "none"]:
                        return False, "restart_strategy {} is not supported in CMAES optimization".format(s.value)
                elif s.name == "sigma":
                    if not float(s.value) >= 0:
                        return False, "sigma should be greater than or equal to zero"
                elif s.name == "random_state":
                    if not int(s.value) >= 0:
                        return False, "random_state should be greater than or equal to zero"
                else:
                    return False, "unknown setting {} for algorithm cmaes".format(s.name)
            except Exception as e:
                return False, "failed to validate {name}({value}): {exception}".format(
                    name=s.name, value=s.value, exception=e)
        return True, ""

    @classmethod
    def _validate_random_setting(cls, algorithm_settings):
        for s in algorithm_settings:
            try:
                if s.name == "random_state":
                    if not int(s.value) >= 0:
                        return False, "random_state should be greater than or equal to zero"
                else:
                    return False, "unknown setting {} for algorithm random".format(s.name)
            except Exception as e:
                return False, "failed to validate {name}({value}): {exception}".format(
                    name=s.name, value=s.value, exception=e)

        return True, ""

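
The validation path can be exercised without a running gRPC server by calling OptimizerConfiguration.validate_algorithm_spec directly. A minimal sketch (FakeSetting and FakeSpec are hypothetical stand-ins mirroring the protobuf fields the method reads; they are not part of this PR):

from collections import namedtuple

# Hypothetical test doubles exposing only the fields validate_algorithm_spec touches.
FakeSetting = namedtuple("FakeSetting", ["name", "value"])
FakeSpec = namedtuple("FakeSpec", ["algorithm_name", "algorithm_settings"])

spec = FakeSpec(
    algorithm_name="tpe",
    algorithm_settings=[
        FakeSetting("n_startup_trials", "10"),
        FakeSetting("random_state", "-1"),  # invalid: must be >= 0
    ],
)

is_valid, message = OptimizerConfiguration.validate_algorithm_spec(spec)
assert not is_valid
print(message)  # random_state should be greater than or equal to zero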