diff --git a/cmd/suggestion/optuna/v1beta1/Dockerfile b/cmd/suggestion/optuna/v1beta1/Dockerfile new file mode 100644 index 00000000000..bd7e43ecaf2 --- /dev/null +++ b/cmd/suggestion/optuna/v1beta1/Dockerfile @@ -0,0 +1,31 @@ +FROM python:3.9 + +ENV TARGET_DIR /opt/katib +ENV SUGGESTION_DIR cmd/suggestion/optuna/v1beta1 + +RUN if [ "$(uname -m)" = "ppc64le" ] || [ "$(uname -m)" = "aarch64" ]; then \ + apt-get -y update && \ + apt-get -y install gfortran libopenblas-dev liblapack-dev; \ + fi + +RUN GRPC_HEALTH_PROBE_VERSION=v0.3.1 && \ + if [ "$(uname -m)" = "ppc64le" ]; then \ + wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-ppc64le; \ + elif [ "$(uname -m)" = "aarch64" ]; then \ + wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-arm64; \ + else \ + wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64; \ + fi && \ + chmod +x /bin/grpc_health_probe + +ADD ./pkg/ ${TARGET_DIR}/pkg/ +ADD ./${SUGGESTION_DIR}/ ${TARGET_DIR}/${SUGGESTION_DIR}/ +WORKDIR ${TARGET_DIR}/${SUGGESTION_DIR} +RUN pip install --no-cache-dir -r requirements.txt + +RUN chgrp -R 0 ${TARGET_DIR} \ + && chmod -R g+rwX ${TARGET_DIR} + +ENV PYTHONPATH ${TARGET_DIR}:${TARGET_DIR}/pkg/apis/manager/v1beta1/python:${TARGET_DIR}/pkg/apis/manager/health/python + +ENTRYPOINT ["python", "main.py"] diff --git a/cmd/suggestion/optuna/v1beta1/main.py b/cmd/suggestion/optuna/v1beta1/main.py new file mode 100644 index 00000000000..1290f16ced3 --- /dev/null +++ b/cmd/suggestion/optuna/v1beta1/main.py @@ -0,0 +1,42 @@ +# Copyright 2021 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc +import time +from pkg.apis.manager.v1beta1.python import api_pb2_grpc +from pkg.apis.manager.health.python import health_pb2_grpc +from pkg.suggestion.v1beta1.optuna.service import OptunaService +from concurrent import futures + +_ONE_DAY_IN_SECONDS = 60 * 60 * 24 +DEFAULT_PORT = "0.0.0.0:6789" + + +def serve(): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + service = OptunaService() + api_pb2_grpc.add_SuggestionServicer_to_server(service, server) + health_pb2_grpc.add_HealthServicer_to_server(service, server) + server.add_insecure_port(DEFAULT_PORT) + print("Listening...") + server.start() + try: + while True: + time.sleep(_ONE_DAY_IN_SECONDS) + except KeyboardInterrupt: + server.stop(0) + + +if __name__ == "__main__": + serve() diff --git a/cmd/suggestion/optuna/v1beta1/requirements.txt b/cmd/suggestion/optuna/v1beta1/requirements.txt new file mode 100644 index 00000000000..09b0692fa06 --- /dev/null +++ b/cmd/suggestion/optuna/v1beta1/requirements.txt @@ -0,0 +1,4 @@ +grpcio==1.39.0 +protobuf==3.17.3 +googleapis-common-protos==1.53.0 +optuna>=2.8.0 \ No newline at end of file diff --git a/examples/v1beta1/multivariate-tpe-example.yaml b/examples/v1beta1/multivariate-tpe-example.yaml new file mode 100644 index 00000000000..d729e540bd1 --- /dev/null +++ b/examples/v1beta1/multivariate-tpe-example.yaml @@ -0,0 +1,65 @@ +apiVersion: "kubeflow.org/v1beta1" +kind: Experiment +metadata: + namespace: kubeflow + name: multivariate-tpe-example +spec: + objective: + type: maximize + goal: 0.99 + objectiveMetricName: Validation-accuracy + additionalMetricNames: + - Train-accuracy + algorithm: + algorithmName: multivariate-tpe + parallelTrialCount: 3 + maxTrialCount: 12 + maxFailedTrialCount: 3 + parameters: + - name: lr + parameterType: double + feasibleSpace: + min: "0.01" + max: "0.03" + - name: num-layers + parameterType: int + feasibleSpace: + min: "2" + max: "5" + - name: optimizer + parameterType: categorical + feasibleSpace: + list: + - sgd + - adam + - ftrl + trialTemplate: + primaryContainerName: training-container + trialParameters: + - name: learningRate + description: Learning rate for the training model + reference: lr + - name: numberLayers + description: Number of training model layers + reference: num-layers + - name: optimizer + description: Training model optimizer (sdg, adam or ftrl) + reference: optimizer + trialSpec: + apiVersion: batch/v1 + kind: Job + spec: + template: + spec: + containers: + - name: training-container + image: docker.io/kubeflowkatib/mxnet-mnist:v1beta1-45c5727 + command: + - "python3" + - "/opt/mxnet-mnist/mnist.py" + - "--batch-size=64" + - "--lr=${trialParameters.learningRate}" + - "--num-layers=${trialParameters.numberLayers}" + - "--optimizer=${trialParameters.optimizer}" + restartPolicy: Never + diff --git a/pkg/suggestion/v1beta1/optuna/service.py b/pkg/suggestion/v1beta1/optuna/service.py new file mode 100644 index 00000000000..68d12d3cb3f --- /dev/null +++ b/pkg/suggestion/v1beta1/optuna/service.py @@ -0,0 +1,170 @@ +# Copyright 2021 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import defaultdict +import threading + +import optuna + +from pkg.apis.manager.v1beta1.python import api_pb2 +from pkg.apis.manager.v1beta1.python import api_pb2_grpc +from pkg.suggestion.v1beta1.internal.constant import INTEGER, DOUBLE, CATEGORICAL, DISCRETE, MAX_GOAL +from pkg.suggestion.v1beta1.internal.search_space import HyperParameterSearchSpace +from pkg.suggestion.v1beta1.internal.trial import Trial, Assignment +from pkg.suggestion.v1beta1.internal.base_health_service import HealthServicer + + +class OptunaService(api_pb2_grpc.SuggestionServicer, HealthServicer): + + def __init__(self): + super(OptunaService, self).__init__() + self.study = None + self.search_space = None + self.recorded_trial_names = set() + self.assignments_to_optuna_number = defaultdict(list) + self.lock = threading.Lock() + + def GetSuggestions(self, request, context): + """ + Main function to provide suggestion. + """ + with self.lock: + if self.study is None: + self.search_space = HyperParameterSearchSpace.convert(request.experiment) + self.study = self._create_study(request.experiment.spec.algorithm, self.search_space) + + trials = Trial.convert(request.trials) + + if len(trials) != 0: + self._tell(trials) + list_of_assignments = self._ask(request.request_number) + + return api_pb2.GetSuggestionsReply( + parameter_assignments=Assignment.generate(list_of_assignments) + ) + + def _create_study(self, algorithm_spec, search_space): + sampler = self._create_sampler(algorithm_spec) + direction = "maximize" if search_space.goal == MAX_GOAL else "minimize" + + study = optuna.create_study(sampler=sampler, direction=direction) + + return study + + def _create_sampler(self, algorithm_spec): + name = algorithm_spec.algorithm_name + settings = {s.name:s.value for s in algorithm_spec.algorithm_settings} + + if name == "tpe" or name == "multivariate-tpe": + kwargs = {} + for k, v in settings.items(): + if k == "startup_trials": + kwargs["n_startup_trials"] = int(v) + elif k == "ei_candidates": + kwargs["n_ei_candidates"] = int(v) + elif k == "random_state": + kwargs["seed"] = int(v) + else: + raise ValueError("Unknown name for {}: {}".format(name, k)) + + kwargs["multivariate"] = name == "multivariate-tpe" + kwargs["constant_liar"] = True + + sampler = optuna.samplers.TPESampler(**kwargs) + + elif name == "cmaes": + kwargs = {} + for k, v in settings.items(): + if k == "restart_strategy": + kwargs["restart_strategy"] = v + elif k == "sigma": + kwargs["sigma0"] = float(v) + elif k == "random_state": + kwargs["seed"] = int(v) + else: + raise ValueError("Unknown name for {}: {}".format(name, k)) + + sampler = optuna.samplers.CmaEsSampler(**kwargs) + + elif name == "random": + kwargs = {} + for k, v in settings.items(): + if k == "random_state": + kwargs["seed"] = int(v) + else: + raise ValueError("Unknown name for {}: {}".format(name, k)) + + sampler = optuna.samplers.RandomSampler(**kwargs) + + else: + raise ValueError("Unknown algorithm name: {}".format(name)) + + return sampler + + def _ask(self, request_number): + list_of_assignments = [] + for _ in range(request_number): + optuna_trial = self.study.ask(fixed_distributions=self._get_optuna_search_space()) + + assignments = [Assignment(k,v) for k,v in optuna_trial.params.items()] + list_of_assignments.append(assignments) + + assignments_key = self._get_assignments_key(assignments) + self.assignments_to_optuna_number[assignments_key].append(optuna_trial.number) + + return list_of_assignments + + def _tell(self, trials): + for trial in trials: + if trial.name not in self.recorded_trial_names: + self.recorded_trial_names.add(trial.name) + + value = float(trial.target_metric.value) + assignments_key = self._get_assignments_key(trial.assignments) + optuna_trial_numbers = self.assignments_to_optuna_number[assignments_key] + + if len(optuna_trial_numbers) != 0: + trial_number = optuna_trial_numbers.pop(0) + self.study.tell(trial_number, value) + else: + raise ValueError("An unknown trial has been passed in the GetSuggestion request.") + + def _get_assignments_key(self, assignments): + assignments = sorted(assignments, key=lambda a: a.name) + assignments_str = [f"{a.name}:{a.value}" for a in assignments] + return ",".join(assignments_str) + + def _get_optuna_search_space(self): + search_space = {} + for param in self.search_space.params: + if param.type == INTEGER: + search_space[param.name] = optuna.distributions.IntUniformDistribution(int(param.min), int(param.max)) + elif param.type == DOUBLE: + search_space[param.name] = optuna.distributions.UniformDistribution(float(param.min), float(param.max)) + elif param.type == CATEGORICAL or param.type == DISCRETE: + search_space[param.name] = optuna.distributions.CategoricalDistribution(param.list) + return search_space + + def _get_casted_assignment_value(self, assignment): + for param in self.search_space.params: + if param.name == assignment.name: + if param.type == INTEGER: + return int(assignment.value) + elif param.type == DOUBLE: + return float(assignment.value) + elif param.type == CATEGORICAL or param.type == DISCRETE: + return assignment.value + else: + raise ValueError("Unknown parameter type: {}".format(param.type)) + raise ValueError("Parameter not found in the search space: {}".format(param.name)) diff --git a/scripts/v1beta1/build.sh b/scripts/v1beta1/build.sh index 573d6812ed9..625cf9ceca4 100755 --- a/scripts/v1beta1/build.sh +++ b/scripts/v1beta1/build.sh @@ -83,6 +83,9 @@ docker build -t ${REGISTRY}/suggestion-skopt:${TAG} -f ${CMD_PREFIX}/suggestion/ echo -e "\nBuilding goptuna suggestion...\n" docker build -t ${REGISTRY}/suggestion-goptuna:${TAG} -f ${CMD_PREFIX}/suggestion/goptuna/${VERSION}/Dockerfile . +echo -e "\nBuilding optuna suggestion...\n" +docker build -t ${REGISTRY}/suggestion-optuna:${TAG} -f ${CMD_PREFIX}/suggestion/optuna/${VERSION}/Dockerfile . + echo -e "\nBuilding ENAS suggestion...\n" if [ $MACHINE_ARCH == "aarch64" ]; then docker build -t ${REGISTRY}/suggestion-enas:${TAG} -f ${CMD_PREFIX}/suggestion/nas/enas/${VERSION}/Dockerfile.aarch64 . diff --git a/test/scripts/v1beta1/python-tests.sh b/test/scripts/v1beta1/python-tests.sh index 9e10814fdb5..3179491c585 100755 --- a/test/scripts/v1beta1/python-tests.sh +++ b/test/scripts/v1beta1/python-tests.sh @@ -25,6 +25,7 @@ pip install -r test/suggestion/v1beta1/test_requirements.txt pip install -r cmd/suggestion/chocolate/v1beta1/requirements.txt pip install -r cmd/suggestion/hyperopt/v1beta1/requirements.txt pip install -r cmd/suggestion/skopt/v1beta1/requirements.txt +pip install -r cmd/suggestion/optuna/v1beta1/requirements.txt pip install -r cmd/suggestion/nas/enas/v1beta1/requirements.txt pip install -r cmd/suggestion/hyperband/v1beta1/requirements.txt pip install -r cmd/suggestion/nas/darts/v1beta1/requirements.txt diff --git a/test/suggestion/v1beta1/test_optuna_service.py b/test/suggestion/v1beta1/test_optuna_service.py new file mode 100644 index 00000000000..17f061c5cf5 --- /dev/null +++ b/test/suggestion/v1beta1/test_optuna_service.py @@ -0,0 +1,189 @@ +# Copyright 2021 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc +import grpc_testing + +import pytest + +from pkg.apis.manager.v1beta1.python import api_pb2 + +from pkg.suggestion.v1beta1.optuna.service import OptunaService + + +class TestOptuna: + def setup_method(self): + servicers = { + api_pb2.DESCRIPTOR.services_by_name['Suggestion']: OptunaService( + ) + } + + self.test_server = grpc_testing.server_from_dictionary( + servicers, grpc_testing.strict_real_time()) + + @pytest.mark.parametrize( + ["algorithm_name", "algorithm_settings"], + [ + ["tpe", {"startup_trials": "20", "ei_candidates": "10", "random_state": "71"}], + ["multivariate-tpe", {"startup_trials": "20", "ei_candidates": "10", "random_state": "71"}], + ["cmaes", {"restart_strategy": "ipop", "sigma": "2", "random_state": "71"}], + ["random", {"random_state": "71"}], + ], + ) + def test_get_suggestion(self, algorithm_name, algorithm_settings): + experiment = api_pb2.Experiment( + name="test", + spec=api_pb2.ExperimentSpec( + algorithm=api_pb2.AlgorithmSpec( + algorithm_name=algorithm_name, + algorithm_settings = [ + api_pb2.AlgorithmSetting( + name=name, + value=value + ) for name, value in algorithm_settings.items() + ], + ), + objective=api_pb2.ObjectiveSpec( + type=api_pb2.MAXIMIZE, + goal=0.9 + ), + parameter_specs=api_pb2.ExperimentSpec.ParameterSpecs( + parameters=[ + api_pb2.ParameterSpec( + name="param-1", + parameter_type=api_pb2.INT, + feasible_space=api_pb2.FeasibleSpace( + max="5", min="1", list=[]), + ), + api_pb2.ParameterSpec( + name="param-2", + parameter_type=api_pb2.CATEGORICAL, + feasible_space=api_pb2.FeasibleSpace( + max=None, min=None, list=["cat1", "cat2", "cat3"]) + ), + api_pb2.ParameterSpec( + name="param-3", + parameter_type=api_pb2.DISCRETE, + feasible_space=api_pb2.FeasibleSpace( + max=None, min=None, list=["3", "2", "6"]) + ), + api_pb2.ParameterSpec( + name="param-4", + parameter_type=api_pb2.DOUBLE, + feasible_space=api_pb2.FeasibleSpace( + max="5", min="1", list=[]) + ) + ] + ) + ) + ) + + # Run the first suggestion with no previous trials in the request + request = api_pb2.GetSuggestionsRequest( + experiment=experiment, + trials=[], + request_number=2, + ) + + get_suggestion = self.test_server.invoke_unary_unary( + method_descriptor=(api_pb2.DESCRIPTOR + .services_by_name['Suggestion'] + .methods_by_name['GetSuggestions']), + invocation_metadata={}, + request=request, timeout=1) + + response, metadata, code, details = get_suggestion.termination() + assert code == grpc.StatusCode.OK + assert 2 == len(response.parameter_assignments) + + # Run the second suggestion with trials whose parameters are assigned in the first request + trials = [ + api_pb2.Trial( + name="test-asfjh", + spec=api_pb2.TrialSpec( + objective=api_pb2.ObjectiveSpec( + type=api_pb2.MAXIMIZE, + objective_metric_name="metric-2", + goal=0.9 + ), + parameter_assignments=api_pb2.TrialSpec.ParameterAssignments( + assignments=response.parameter_assignments[0].assignments + ) + ), + status=api_pb2.TrialStatus( + condition=api_pb2.TrialStatus.TrialConditionType.SUCCEEDED, + observation=api_pb2.Observation( + metrics=[ + api_pb2.Metric( + name="metric-1", + value="435" + ), + api_pb2.Metric( + name="metric-2", + value="5643" + ), + ] + ) + ) + ), + api_pb2.Trial( + name="test-234hs", + spec=api_pb2.TrialSpec( + objective=api_pb2.ObjectiveSpec( + type=api_pb2.MAXIMIZE, + objective_metric_name="metric-2", + goal=0.9 + ), + parameter_assignments=api_pb2.TrialSpec.ParameterAssignments( + assignments=response.parameter_assignments[1].assignments + ) + ), + status=api_pb2.TrialStatus( + condition=api_pb2.TrialStatus.TrialConditionType.SUCCEEDED, + observation=api_pb2.Observation( + metrics=[ + api_pb2.Metric( + name="metric-1", + value="123" + ), + api_pb2.Metric( + name="metric-2", + value="3028" + ), + ] + ) + ) + ) + ] + + request = api_pb2.GetSuggestionsRequest( + experiment=experiment, + trials=trials, + request_number=2, + ) + + get_suggestion = self.test_server.invoke_unary_unary( + method_descriptor=(api_pb2.DESCRIPTOR + .services_by_name['Suggestion'] + .methods_by_name['GetSuggestions']), + invocation_metadata={}, + request=request, timeout=1) + + response, metadata, code, details = get_suggestion.termination() + assert code == grpc.StatusCode.OK + assert 2 == len(response.parameter_assignments) + + +if __name__ == '__main__': + pytest.main()