def serve():
    """Run the Hyperband suggestion gRPC server until interrupted.

    Builds a gRPC server backed by a 10-worker thread pool, registers a
    ``HyperbandService`` on it, binds it to ``DEFAULT_PORT`` without
    transport security and starts it.  The calling thread is then parked
    in day-long sleeps; a ``KeyboardInterrupt`` ends the wait and the
    server is stopped with ``stop(0)``.
    """
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)

    servicer = HyperbandService()
    api_pb2_grpc.add_SuggestionServicer_to_server(servicer, grpc_server)
    grpc_server.add_insecure_port(DEFAULT_PORT)

    print("Listening...")
    grpc_server.start()

    # Keep the main thread alive after start() returns; only an interrupt
    # breaks out of the sleep loop.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        grpc_server.stop(0)
# Example Experiment that tunes the MXNet MNIST job with the hyperband
# suggestion algorithm.
#
# "resourceName" names the hyperparameter that the suggestion service
# overwrites with the per-round resource value, so "--num-epochs" must also be
# declared under `parameters` using the same schema as the other entries
# (parameterType / feasibleSpace).
apiVersion: "kubeflow.org/v1alpha2"
kind: Experiment
metadata:
  namespace: kubeflow
  name: hyperband-example
spec:
  objective:
    type: maximize
    goal: 0.99
    objectiveMetricName: Validation-accuracy
    additionalMetricNames:
      - accuracy
  algorithm:
    algorithmName: hyperband
    algorithmSettings:
      - name: "resourceName"
        value: "--num-epochs"
      - name: "eta"
        value: "3"
      - name: "r_l"
        value: "9"
  maxFailedTrialCount: 3
  parameters:
    - name: --lr
      parameterType: double
      feasibleSpace:
        min: "0.01"
        max: "0.03"
    - name: --num-layers
      parameterType: int
      feasibleSpace:
        min: "2"
        max: "5"
    - name: --optimizer
      parameterType: categorical
      feasibleSpace:
        list:
          - sgd
          - adam
          - ftrl
    # Fixed: these keys were spelled `parametertype` / `feasible`, unlike the
    # three parameters above.
    - name: --num-epochs
      parameterType: int
      feasibleSpace:
        min: "20"
        max: "20"
  trialTemplate:
    goTemplate:
      rawTemplate: |-
        apiVersion: batch/v1
        kind: Job
        metadata:
          name: {{.Trial}}
          namespace: {{.NameSpace}}
        spec:
          template:
            spec:
              containers:
              - name: {{.Trial}}
                image: katib/mxnet-mnist-example
                command:
                - "python"
                - "/mxnet/example/image-classification/train_mnist.py"
                - "--batch-size=64"
                {{- with .HyperParameters}}
                {{- range .}}
                - "{{.Name}}={{.Value}}"
                {{- end}}
                {{- end}}
              restartPolicy: Never
metadata: + name: katib-suggestion-hyperband + labels: + app: katib + component: suggestion-hyperband + spec: + containers: + - name: katib-suggestion-hyperband + image: katib/v1alpha2/suggestion-hyperband + imagePullPolicy: IfNotPresent + ports: + - name: api + containerPort: 6789 diff --git a/manifests/v1alpha2/suggestion/hyperband/service.yaml b/manifests/v1alpha2/suggestion/hyperband/service.yaml new file mode 100644 index 00000000000..9648d9dc113 --- /dev/null +++ b/manifests/v1alpha2/suggestion/hyperband/service.yaml @@ -0,0 +1,17 @@ +apiVersion: v1 +kind: Service +metadata: + name: katib-suggestion-hyperband + namespace: kubeflow + labels: + app: katib + component: suggestion-hyperband +spec: + type: ClusterIP + ports: + - port: 6789 + protocol: TCP + name: api + selector: + app: katib + component: suggestion-hyperband diff --git a/pkg/controller/v1alpha2/experiment/experiment_controller.go b/pkg/controller/v1alpha2/experiment/experiment_controller.go index 38e44eb36b6..b15a5e420db 100644 --- a/pkg/controller/v1alpha2/experiment/experiment_controller.go +++ b/pkg/controller/v1alpha2/experiment/experiment_controller.go @@ -354,6 +354,10 @@ func (r *ReconcileExperiment) createTrials(instance *experimentsv1alpha2.Experim logger.Error(err, "Get suggestions error") return err } + if len(trials) == 0 { + // for some suggestion services, such as hyperband, it will stop generating new trial once some condition satisfied + util.UpdateExperimentStatusCondition(instance, false, true) + } for _, trial := range trials { if err = r.createTrialInstance(instance, trial); err != nil { logger.Error(err, "Create trial instance error", "trial", trial) diff --git a/pkg/controller/v1alpha2/experiment/util/status_util.go b/pkg/controller/v1alpha2/experiment/util/status_util.go index 0eed7f803e5..f3c890d43e8 100644 --- a/pkg/controller/v1alpha2/experiment/util/status_util.go +++ b/pkg/controller/v1alpha2/experiment/util/status_util.go @@ -38,7 +38,7 @@ func 
UpdateExperimentStatus(instance *experimentsv1alpha2.Experiment, trials *tr isObjectiveGoalReached := updateTrialsSummary(instance, trials) - updateExperimentStatusCondition(instance, isObjectiveGoalReached) + UpdateExperimentStatusCondition(instance, isObjectiveGoalReached, false) return nil } @@ -134,7 +134,7 @@ func getObjectiveMetricValue(trial trialsv1alpha2.Trial, objectiveMetricName str return nil } -func updateExperimentStatusCondition(instance *experimentsv1alpha2.Experiment, isObjectiveGoalReached bool) { +func UpdateExperimentStatusCondition(instance *experimentsv1alpha2.Experiment, isObjectiveGoalReached bool, getSuggestionDone bool) { completedTrialsCount := instance.Status.TrialsSucceeded + instance.Status.TrialsFailed + instance.Status.TrialsKilled failedTrialsCount := instance.Status.TrialsFailed @@ -154,6 +154,13 @@ func updateExperimentStatusCondition(instance *experimentsv1alpha2.Experiment, i return } + if getSuggestionDone && (instance.Status.TrialsPending+instance.Status.TrialsRunning) == 0 { + msg := "Experiment has succeeded because suggestion service has reached the end" + instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg) + instance.Status.CompletionTime = &now + return + } + if (instance.Spec.MaxFailedTrialCount != nil) && (failedTrialsCount >= *instance.Spec.MaxFailedTrialCount) { msg := "Experiment has failed because max failed count has reached" instance.MarkExperimentStatusFailed(ExperimentFailedReason, msg) diff --git a/pkg/controller/v1alpha2/trial/trial_controller.go b/pkg/controller/v1alpha2/trial/trial_controller.go index 3c23757f8e0..219db776313 100644 --- a/pkg/controller/v1alpha2/trial/trial_controller.go +++ b/pkg/controller/v1alpha2/trial/trial_controller.go @@ -233,7 +233,7 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1alpha2.Trial) error { return err } // Update Trial job status only if observation field is available. 
class HyperbandService(api_pb2_grpc.SuggestionServicer):
    """Suggestion servicer that generates trials with the Hyperband algorithm.

    The servicer keeps no Hyperband state on the instance.  On every
    ``GetSuggestions`` call the state (``current_s``, ``current_i``, ``n``,
    ``r``, ``evaluatingTrials`` ...) is read from the manager's algorithm
    extra settings, advanced by one step, and written back.
    """

    # Second positional argument handed to every manager RPC.
    # NOTE(review): presumably a timeout in seconds (gRPC beta stub) - confirm.
    _RPC_TIMEOUT = 10

    # Every log call of this class supplies ``experiment_name`` through
    # ``extra``; the format is therefore attached to this class's own handler
    # only, never to the root logger.
    _LOG_FORMAT = '%(asctime)-15s Experiment %(experiment_name)s %(message)s'

    # (message template, state key) pairs logged after each bracket step.
    _STATE_LOG_FIELDS = (
        ("HyperBand Param eta %d.", "eta"),
        ("HyperBand Param R %d.", "r_l"),
        ("HyperBand Param sMax %d.", "sMax"),
        ("HyperBand Param B %d.", "b_l"),
        ("HyperBand Param n %d.", "n"),
        ("HyperBand Param r %d.", "r"),
        ("HyperBand Param s %d.", "current_s"),
        ("HyperBand Param i %d.", "current_i"),
        ("HyperBand evaluating trials count %d.", "evaluatingTrials"),
    )

    def __init__(self):
        """Set the manager address and configure this module's logger."""
        self.manager_addr = "katib-manager"
        self.manager_port = 6789
        self.logger = getLogger(__name__)
        self.logger.setLevel(DEBUG)
        # The logger is shared by every instance (same name), so only install
        # the handler once; otherwise each new servicer duplicates every line.
        if not self.logger.handlers:
            handler = StreamHandler()
            handler.setLevel(DEBUG)
            # Format on the handler instead of logging.basicConfig(): a root
            # handler with this format cannot render records from other
            # modules, which carry no ``experiment_name`` attribute.
            handler.setFormatter(logging.Formatter(self._LOG_FORMAT))
            self.logger.addHandler(handler)

    def _manager_stub(self):
        """Return a Manager stub bound to a new insecure channel."""
        channel = grpc.beta.implementations.insecure_channel(
            self.manager_addr, self.manager_port)
        return api_pb2.beta_create_Manager_stub(channel)

    def _get_experiment(self, name):
        """Fetch the experiment called ``name`` from the manager."""
        with self._manager_stub() as client:
            exp = client.GetExperiment(
                api_pb2.GetExperimentRequest(experiment_name=name), self._RPC_TIMEOUT)
            return exp.experiment

    def _get_algorithm_settings(self, experiment_name):
        """Return the stored extra algorithm settings as a ``{name: value}`` dict."""
        with self._manager_stub() as client:
            alg = client.GetAlgorithmExtraSettings(api_pb2.GetAlgorithmExtraSettingsRequest(
                experiment_name=experiment_name), self._RPC_TIMEOUT)
            return {param.name: param.value for param in alg.extra_algorithm_settings}

    def _get_trials(self, experiment_name):
        """Return the trials the manager lists for ``experiment_name``."""
        with self._manager_stub() as client:
            reply = client.GetTrialList(
                api_pb2.GetTrialListRequest(experiment_name=experiment_name), self._RPC_TIMEOUT)
            return reply.trials

    def GetSuggestions(self, request, context):
        """
        Main function to provide suggestion.

        Loads the experiment and the persisted Hyperband state, generates the
        trials of the next bracket step, stores the advanced state and returns
        the trials.  An empty reply is returned once ``current_s`` is negative,
        i.e. when every bracket has been generated.

        Any exception is logged with its traceback and re-raised.
        """
        # Read before entering the try block so the error handler can always
        # reference it (it used to be assigned inside the try and could be
        # unbound when the handler ran).
        experiment_name = request.experiment_name
        try:
            reply = api_pb2.GetSuggestionsReply()
            experiment = self._get_experiment(experiment_name)
            alg_settings = self._get_algorithm_settings(experiment_name)

            sParams = self._parse_suggestionParameters(experiment, alg_settings)
            if sParams["current_s"] < 0:
                return reply

            trials = self._make_bracket(experiment, sParams)
            self._update_algorithm_extrasettings(experiment_name, sParams)
            for trial in trials:
                reply.trials.add(spec=trial)
            return reply
        except Exception:
            self.logger.error("Fail to generate trials: \n%s",
                              traceback.format_exc(), extra={"experiment_name": experiment_name})
            raise

    @staticmethod
    def _bracket_size(sParams):
        """Return ``n`` for the bracket ``current_s``: ceil((sMax+1) * eta**s / (s+1))."""
        s = sParams["current_s"]
        return int(math.ceil(float(sParams["sMax"] + 1) * (
            float(sParams["eta"]**s) / float(s + 1))))

    @staticmethod
    def _bracket_resource(sParams):
        """Return ``r`` for the bracket ``current_s``: r_l * eta**(-s)."""
        return sParams["r_l"] * sParams["eta"]**(-sParams["current_s"])

    @staticmethod
    def _max_bracket_index(r_l, eta):
        """Return ``sMax``, the integer part of log_eta(r_l).

        ``int(log(r_l) / log(eta))`` alone under-counts exact powers because
        of floating-point rounding (e.g. log(243)/log(3) evaluates to
        4.999999999999999), so the truncated quotient is checked against the
        actual powers of ``eta`` and corrected by one when needed.

        Raises:
            ValueError: if ``r_l`` is not positive (e.g. it was never set).
        """
        if r_l <= 0:
            raise ValueError(
                "HyperBand setting r_l must be a positive number, got %s" % r_l)
        s_max = int(math.log(r_l) / math.log(eta))
        # Only correct the well-defined case; other inputs keep the raw value.
        if eta > 1 and s_max >= 0:
            if eta ** (s_max + 1) <= r_l:
                s_max += 1
            elif s_max > 0 and eta ** s_max > r_l:
                s_max -= 1
        return s_max

    def _update_hbParameters(self, sParams):
        """Advance to the next round; start a new bracket once ``current_i`` exceeds ``current_s``."""
        sParams["current_i"] += 1
        if sParams["current_i"] > sParams["current_s"]:
            self._new_hbParameters(sParams)

    def _new_hbParameters(self, sParams):
        """Move to bracket ``current_s - 1`` and recompute ``n`` and ``r`` for it."""
        sParams["current_s"] -= 1
        sParams["current_i"] = 0
        if sParams["current_s"] >= 0:
            # when sParams["current_s"] < 0, hyperband algorithm reaches the end
            sParams["n"] = self._bracket_size(sParams)
            sParams["r"] = self._bracket_resource(sParams)

    def _parse_suggestionParameters(self, experiment, alg_settings):
        """Build the Hyperband state dict from the stored algorithm settings.

        Known settings are converted to numbers (``resourceName`` stays a
        string), unknown ones are logged and ignored.  Values still at their
        negative "unset" default are then derived from ``eta`` and ``r_l``.
        If the stored ``current_s`` is -1 the state is returned as parsed,
        without deriving anything.

        Raises:
            ValueError: if ``sMax`` has to be derived and ``r_l`` is not positive.
        """
        log_extra = {"experiment_name": experiment.name}
        sParams = {
            "eta": 3,
            "sMax": -1,
            "r_l": -1,
            "b_l": -1,
            "r": -1,
            "n": -1,
            "current_s": -2,
            "current_i": -1,
            "resourceName": "",
            "evaluatingTrials": 0,
        }

        for k, v in alg_settings.items():
            if k in ("eta", "r_l", "b_l"):
                sParams[k] = float(v)
            elif k in ("n", "r", "current_s", "current_i", "sMax", "evaluatingTrials"):
                # Values are persisted with str(); float-valued ones arrive as
                # e.g. "1.0", hence the float() step before int().
                sParams[k] = int(float(v))
            elif k == "resourceName":
                sParams[k] = v
            else:
                self.logger.info("Unknown HyperBand Param %s, ignore it",
                                 k, extra=log_extra)
        if sParams["current_s"] == -1:
            # Hyperband outer loop has finished
            self.logger.info("HyperBand outlerloop has finished.",
                             extra=log_extra)
            return sParams

        if sParams["eta"] <= 0:
            sParams["eta"] = 3
        if sParams["sMax"] < 0:
            sParams["sMax"] = self._max_bracket_index(sParams["r_l"], sParams["eta"])
        if sParams["b_l"] < 0:
            sParams["b_l"] = (sParams["sMax"] + 1) * sParams["r_l"]
        if sParams["current_s"] < 0:
            sParams["current_s"] = sParams["sMax"]
        if sParams["current_i"] < 0:
            sParams["current_i"] = 0
        if sParams["n"] < 0:
            sParams["n"] = self._bracket_size(sParams)
        if sParams["r"] < 0:
            sParams["r"] = self._bracket_resource(sParams)

        return sParams

    def _make_bracket(self, experiment, sParams):
        """Generate the trials of the next step and advance ``sParams`` in place.

        A master bracket (fresh random samples) is produced when no trials
        are being evaluated; otherwise a child bracket is derived from the
        best trials of the previous round.
        """
        if sParams["evaluatingTrials"] == 0:
            trialSpecs = self._make_master_bracket(experiment, sParams)
        else:
            trialSpecs = self._make_child_bracket(experiment, sParams)

        # Remember how many trials the next call has to rank, unless this was
        # the last round of the bracket.
        if sParams["current_i"] < sParams["current_s"]:
            sParams["evaluatingTrials"] = len(trialSpecs)
        else:
            sParams["evaluatingTrials"] = 0

        log_extra = {"experiment_name": experiment.name}
        for template, key in self._STATE_LOG_FIELDS:
            self.logger.info(template, sParams[key], extra=log_extra)

        if sParams["evaluatingTrials"] == 0:
            self._new_hbParameters(sParams)

        return trialSpecs

    def _make_child_bracket(self, experiment, sParams):
        """Re-run the best trials of the previous round with a larger resource value."""
        n_i = math.ceil(sParams["n"] * sParams["eta"]**(-sParams["current_i"]))
        top_trials_num = int(math.ceil(n_i / sParams["eta"]))
        # current_i must be advanced before r_i is computed: r_i is the
        # resource of the round being generated, not of the one just ranked.
        self._update_hbParameters(sParams)
        r_i = int(sParams["r"] * sParams["eta"]**sParams["current_i"])
        last_trials = self._get_top_trial(sParams["evaluatingTrials"], top_trials_num, experiment)
        trialSpecs = self._copy_trials(last_trials, r_i, sParams["resourceName"])

        self.logger.info("Generate %d trials by child bracket.",
                         top_trials_num, extra={"experiment_name": experiment.name})
        return trialSpecs

    def _get_last_trials(self, all_trials, latest_trials_num):
        """Return the ``latest_trials_num`` trials with the latest ``status.start_time``.

        All trials are returned when fewer than ``latest_trials_num`` exist;
        a non-positive count yields an empty list.
        """
        if latest_trials_num <= 0:
            # Guard: a slice of [-0:] would select every trial, not none.
            return []
        sorted_trials = sorted(all_trials, key=lambda trial: trial.status.start_time)
        return sorted_trials[-latest_trials_num:]

    def _get_top_trial(self, latest_trials_num, top_trials_num, experiment):
        """Return the best ``top_trials_num`` of the latest ``latest_trials_num`` trials.

        Trials are ranked by the experiment's objective metric, descending
        when the objective type is MAXIMIZE and ascending otherwise.

        Raises:
            Exception: if any of the latest trials has not succeeded yet.
        """
        objective_metric = experiment.spec.objective.objective_metric_name
        maximize = experiment.spec.objective.type == api_pb2.MAXIMIZE
        # A trial without the objective metric is ranked last instead of
        # yielding a None key, which cannot be compared with floats.
        worst_value = float("-inf") if maximize else float("inf")

        def get_objective_value(t):
            for m in t.status.observation.metrics:
                if m.name == objective_metric:
                    return float(m.value)
            return worst_value

        all_trials = self._get_trials(experiment.name)
        latest_trials = self._get_last_trials(all_trials, latest_trials_num)

        for t in latest_trials:
            if t.status.condition != api_pb2.TrialStatus.TrialConditionType.SUCCEEDED:
                raise Exception(
                    "There are some trials which are not completed yet for experiment %s." % experiment.name)

        ranked_trials = sorted(latest_trials, key=get_objective_value, reverse=maximize)
        return ranked_trials[:top_trials_num]

    def _copy_trials(self, trials, r_i, resourceName):
        """Clone the parameter assignments of ``trials``, setting ``resourceName`` to ``r_i``."""
        trialSpecs = []
        for t in trials:
            trial_spec = api_pb2.TrialSpec()
            for assignment in t.spec.parameter_assignments.assignments:
                value = str(r_i) if assignment.name == resourceName else assignment.value
                trial_spec.parameter_assignments.assignments.add(name=assignment.name,
                                                                 value=value)
            trialSpecs.append(trial_spec)
        return trialSpecs

    def _make_master_bracket(self, experiment, sParams):
        """Create ``n`` randomly sampled trials whose resource parameter is set to ``int(r)``."""
        n = sParams["n"]
        r = int(sParams["r"])
        parameter_config = parsing_util.parse_parameter_configs(
            experiment.spec.parameter_specs.parameters)
        trial_specs = []
        for _ in range(n):
            sample = parameter_config.random_sample()
            suggestion = parsing_util.parse_x_next_vector(
                sample,
                parameter_config.parameter_types,
                parameter_config.names,
                parameter_config.discrete_info,
                parameter_config.categorical_info)
            trial_spec = api_pb2.TrialSpec()
            trial_spec.experiment_name = experiment.name
            for param in suggestion:
                # The sampled value of the resource parameter is replaced by
                # the resource of this bracket.
                if param['name'] == sParams["resourceName"]:
                    param['value'] = str(r)
                trial_spec.parameter_assignments.assignments.add(name=param['name'],
                                                                 value=str(param['value']))
            trial_specs.append(trial_spec)
        self.logger.info("Generate %d trials by master bracket.",
                         n, extra={"experiment_name": experiment.name})
        return trial_specs

    def _update_algorithm_extrasettings(self, experiment_name, sParams):
        """Persist every entry of ``sParams`` as a string-valued extra algorithm setting."""
        as_list = [api_pb2.AlgorithmSetting(name=k, value=str(v))
                   for k, v in sParams.items()]
        with self._manager_stub() as client:
            client.UpdateAlgorithmExtraSettings(api_pb2.UpdateAlgorithmExtraSettingsRequest(
                experiment_name=experiment_name, extra_algorithm_settings=as_list),
                self._RPC_TIMEOUT)

    def ValidateAlgorithmSettings(self, request, context):
        """Return an empty reply; no validation is performed yet."""
        # TODO
        return api_pb2.ValidateAlgorithmSettingsReply()
--tag=${REGISTRY}/${REPO_NAME}/v1alpha2/suggestion-hyperband:${VERSION} --project=${PROJECT} +gcloud container images add-tag --quiet ${REGISTRY}/${REPO_NAME}/v1alpha2/suggestion-hyperband:${VERSION} ${REGISTRY}/${REPO_NAME}/v1alpha2/suggestion-hyperband:latest --verbosity=info diff --git a/test/scripts/v1alpha2/run-tests.sh b/test/scripts/v1alpha2/run-tests.sh index a2986abf98a..2333dde03ec 100755 --- a/test/scripts/v1alpha2/run-tests.sh +++ b/test/scripts/v1alpha2/run-tests.sh @@ -73,6 +73,7 @@ sed -i -e "s@image: katib\/v1alpha2\/suggestion-random@image: ${REGISTRY}\/${REP sed -i -e "s@image: katib\/v1alpha2\/suggestion-bayesianoptimization@image: ${REGISTRY}\/${REPO_NAME}\/v1alpha2\/suggestion-bayesianoptimization:${VERSION}@" manifests/v1alpha2/suggestion/bayesianoptimization/deployment.yaml sed -i -e "s@image: katib\/v1alpha2\/suggestion-nasrl@image: ${REGISTRY}\/${REPO_NAME}\/v1alpha2\/suggestion-nasrl:${VERSION}@" manifests/v1alpha2/suggestion/nasrl/deployment.yaml sed -i -e "s@image: katib\/v1alpha2\/suggestion-grid@image: ${REGISTRY}\/${REPO_NAME}\/v1alpha2\/suggestion-grid:${VERSION}@" manifests/v1alpha2/suggestion/grid/deployment.yaml +sed -i -e "s@image: katib\/v1alpha2\/suggestion-hyperband@image: ${REGISTRY}\/${REPO_NAME}\/v1alpha2\/suggestion-hyperband:${VERSION}@" manifests/v1alpha2/suggestion/hyperband/deployment.yaml ./scripts/v1alpha2/deploy.sh