Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support bayesianoptimization in v1alpha2 #595

Merged
merged 11 commits into from
Jun 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/suggestion/bayesianoptimization/v1alpha2/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Image for the v1alpha2 Bayesian optimization suggestion service.
FROM python:3

# Lay the repo out under a GOPATH-style prefix so Python import paths match
# the in-repo package layout — presumably mirrors the other suggestion images.
ADD . /usr/src/app/github.com/kubeflow/katib
WORKDIR /usr/src/app/github.com/kubeflow/katib/cmd/suggestion/bayesianoptimization/v1alpha2
RUN pip install --no-cache-dir -r requirements.txt
# Expose both the repo root and the generated v1alpha2 gRPC stubs to Python.
ENV PYTHONPATH /usr/src/app/github.com/kubeflow/katib:/usr/src/app/github.com/kubeflow/katib/pkg/api/v1alpha2/python

ENTRYPOINT ["python", "main.py"]
14 changes: 14 additions & 0 deletions cmd/suggestion/bayesianoptimization/v1alpha2/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
- start the service

```
python suggestion/bayesian/main.py
```

- start the testing client

```
python suggestion/test_client.py
```

Note: the testing client uses [Franke's function](http://www.sfu.ca/~ssurjano/franke2d.html) as the black box; the maximum of Franke's function is approximately 1.22.
Empty file.
25 changes: 25 additions & 0 deletions cmd/suggestion/bayesianoptimization/v1alpha2/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import grpc
from concurrent import futures

import time

from pkg.api.v1alpha2.python import api_pb2_grpc
from pkg.suggestion.v1alpha2.bayesian_service import BayesianService

# Sleep granularity for the serve loop; the gRPC server runs in worker threads.
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
# Bind address for the suggestion gRPC server (all interfaces, port 6789).
DEFAULT_PORT = "0.0.0.0:6789"

def _build_server():
    """Assemble the gRPC server with the Bayesian suggestion servicer attached."""
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    api_pb2_grpc.add_SuggestionServicer_to_server(BayesianService(), grpc_server)
    grpc_server.add_insecure_port(DEFAULT_PORT)
    return grpc_server


def serve():
    """Start the suggestion gRPC server and block until interrupted."""
    grpc_server = _build_server()
    print("Listening...")
    grpc_server.start()
    # grpc.server runs in background threads; park the main thread and
    # shut the server down cleanly on Ctrl-C.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        grpc_server.stop(0)


if __name__ == "__main__":
    serve()
9 changes: 9 additions & 0 deletions cmd/suggestion/bayesianoptimization/v1alpha2/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Runtime dependencies of the Bayesian optimization suggestion service.
grpcio
duecredit
cloudpickle==0.5.6
numpy>=1.13.3
scikit-learn>=0.19.0
scipy>=0.19.1
forestci
protobuf
googleapis-common-protos
64 changes: 64 additions & 0 deletions examples/v1alpha2/bayseopt-example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Example Katib Experiment using the bayesianoptimization suggestion service:
# tunes learning rate, layer count and optimizer of an MXNet MNIST job.
apiVersion: "kubeflow.org/v1alpha2"
kind: Experiment
metadata:
  namespace: kubeflow
  labels:
    controller-tools.k8s.io: "1.0"
  name: bayseopt-example
spec:
  objective:
    type: maximize
    goal: 0.99
    objectiveMetricName: Validation-accuracy
    additionalMetricNames:
      - accuracy
  algorithm:
    algorithmName: bayesianoptimization
    algorithmSettings:
      # Trials completed before the Bayesian model replaces random sampling.
      - name: "burn_in"
        value: "5"
  parallelTrialCount: 3
  maxTrialCount: 12
  maxFailedTrialCount: 3
  parameters:
    - name: --lr
      parameterType: double
      feasibleSpace:
        min: "0.01"
        max: "0.03"
    - name: --num-layers
      parameterType: int
      feasibleSpace:
        min: "2"
        max: "5"
    - name: --optimizer
      parameterType: categorical
      feasibleSpace:
        list:
          - sgd
          - adam
          - ftrl
  trialTemplate:
    # Go template instantiated once per trial; suggested hyperparameters are
    # appended as "--name=value" command-line flags.
    goTemplate:
      rawTemplate: |-
        apiVersion: batch/v1
        kind: Job
        metadata:
          name: {{.Trial}}
          namespace: {{.NameSpace}}
        spec:
          template:
            spec:
              containers:
              - name: {{.Trial}}
                image: katib/mxnet-mnist-example
                command:
                - "python"
                - "/mxnet/example/image-classification/train_mnist.py"
                - "--batch-size=64"
                {{- with .HyperParameters}}
                {{- range .}}
                - "{{.Name}}={{.Value}}"
                {{- end}}
                {{- end}}
              restartPolicy: Never
24 changes: 24 additions & 0 deletions manifests/v1alpha2/suggestion/bayesianoptimization/deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Deployment running the Bayesian optimization suggestion gRPC service.
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: katib-suggestion-bayesianoptimization
  namespace: kubeflow
  labels:
    app: katib
    component: suggestion-bayesianoptimization
spec:
  replicas: 1
  template:
    metadata:
      name: katib-suggestion-bayesianoptimization
      labels:
        app: katib
        component: suggestion-bayesianoptimization
    spec:
      containers:
      - name: katib-suggestion-bayesianoptimization
        image: katib/v1alpha2/suggestion-bayesianoptimization
        imagePullPolicy: IfNotPresent
        ports:
        - name: api
          # Must match DEFAULT_PORT ("0.0.0.0:6789") in the service's main.py.
          containerPort: 6789
17 changes: 17 additions & 0 deletions manifests/v1alpha2/suggestion/bayesianoptimization/service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Cluster-internal Service exposing the Bayesian optimization suggestion
# gRPC endpoint on port 6789.
apiVersion: v1
kind: Service
metadata:
  name: katib-suggestion-bayesianoptimization
  namespace: kubeflow
  labels:
    app: katib
    component: suggestion-bayesianoptimization
spec:
  type: ClusterIP
  ports:
    - port: 6789  # same port the suggestion container listens on
      protocol: TCP
      name: api
  selector:
    app: katib
    component: suggestion-bayesianoptimization
220 changes: 220 additions & 0 deletions pkg/suggestion/v1alpha2/bayesian_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
import random
import string

import grpc
import numpy as np

from pkg.api.v1alpha2.python import api_pb2
from pkg.api.v1alpha2.python import api_pb2_grpc
from pkg.suggestion.v1alpha2.bayesianoptimization.src.bayesian_optimization_algorithm import BOAlgorithm
from pkg.suggestion.v1alpha2.bayesianoptimization.src.algorithm_manager import AlgorithmManager
import logging
from logging import getLogger, StreamHandler, INFO, DEBUG
from . import parsing_util

timeout = 10

class BayesianService(api_pb2_grpc.SuggestionServicer):
    """Suggestion service that proposes trials via Bayesian optimization.

    Pulls the experiment definition, completed-trial history and extra
    algorithm settings from the katib manager over gRPC, then delegates to
    BOAlgorithm to compute the next parameter assignments.
    """

    def __init__(self, logger=None):
        """Initialize the service.

        Args:
            logger: optional preconfigured logger. When omitted, a DEBUG
                stream logger is built whose format string requires an
                ``experiment_name`` key in every log call's ``extra`` dict.
        """
        # Cluster-internal address of the katib manager service.
        self.manager_addr = "katib-manager"
        self.manager_port = 6789
        # Fix: identity comparison with None (was `logger == None`).
        if logger is None:
            self.logger = getLogger(__name__)
            FORMAT = '%(asctime)-15s Experiment %(experiment_name)s %(message)s'
            logging.basicConfig(format=FORMAT)
            handler = StreamHandler()
            handler.setLevel(DEBUG)
            self.logger.setLevel(DEBUG)
            self.logger.addHandler(handler)
            self.logger.propagate = False
        else:
            self.logger = logger

    def _get_experiment(self, name):
        """Fetch the Experiment proto named `name` from the katib manager."""
        channel = grpc.beta.implementations.insecure_channel(
            self.manager_addr, self.manager_port)
        with api_pb2.beta_create_Manager_stub(channel) as client:
            exp = client.GetExperiment(
                api_pb2.GetExperimentRequest(experiment_name=name), timeout)
            return exp.experiment

    def ValidateAlgorithmSettings(self, request, context):
        # Settings are validated (with fallback to defaults) in
        # parseParameters, so every request is accepted here.
        return api_pb2.ValidateAlgorithmSettingsReply()

    def GetSuggestions(self, request, context):
        """Compute ``request.request_number`` new trials for the experiment.

        Pipeline: fetch algorithm settings and evaluation history from the
        manager, map the search space onto a box-constrained numeric domain
        via AlgorithmManager, run BOAlgorithm, and convert each suggested
        point back into ParameterAssignments.
        """
        service_params = self.parseParameters(request.experiment_name)
        experiment = self._get_experiment(request.experiment_name)
        X_train, y_train = self.getEvalHistory(
            request.experiment_name,
            experiment.spec.objective.objective_metric_name,
            service_params["burn_in"])

        parameter_config = parsing_util.parse_parameter_configs(
            experiment.spec.parameter_specs.parameters)
        algo_manager = AlgorithmManager(
            experiment_name=request.experiment_name,
            experiment=experiment,
            parameter_config=parameter_config,
            X_train=X_train,
            y_train=y_train,
            logger=self.logger,
        )

        lowerbound = np.array(algo_manager.lower_bound)
        upperbound = np.array(algo_manager.upper_bound)
        self.logger.debug("lowerbound: %r", lowerbound,
                          extra={"experiment_name": request.experiment_name})
        self.logger.debug("upperbound: %r", upperbound,
                          extra={"experiment_name": request.experiment_name})
        alg = BOAlgorithm(
            experiment_name=request.experiment_name,
            dim=algo_manager.dim,
            N=int(service_params["N"]),
            lowerbound=lowerbound,
            upperbound=upperbound,
            X_train=algo_manager.X_train,
            y_train=algo_manager.y_train,
            mode=service_params["mode"],
            trade_off=service_params["trade_off"],
            # todo: support length_scale with array type
            length_scale=service_params["length_scale"],
            noise=service_params["noise"],
            nu=service_params["nu"],
            kernel_type=service_params["kernel_type"],
            n_estimators=service_params["n_estimators"],
            max_features=service_params["max_features"],
            model_type=service_params["model_type"],
            logger=self.logger,
        )
        self.logger.debug("alg: %r", alg,
                          extra={"experiment_name": request.experiment_name})
        trials = []
        x_next_list = alg.get_suggestion(request.request_number)
        self.logger.debug("x_next_list: %r", x_next_list,
                          extra={"experiment_name": request.experiment_name})
        for x_next in x_next_list:
            x_next = x_next.squeeze()
            self.logger.debug("xnext: %r ", x_next, extra={
                "experiment_name": request.experiment_name})
            # Map the numeric point back to typed, named parameter values.
            x_next = algo_manager.parse_x_next(x_next)
            x_next = algo_manager.convert_to_dict(x_next)
            trials.append(api_pb2.Trial(
                spec=api_pb2.TrialSpec(
                    experiment_name=request.experiment_name,
                    parameter_assignments=api_pb2.TrialSpec.ParameterAssignments(
                        assignments=[
                            api_pb2.ParameterAssignment(
                                name=x["name"],
                                value=str(x["value"]),
                            ) for x in x_next
                        ]
                    )
                )
            ))
        return api_pb2.GetSuggestionsReply(
            trials=trials
        )

    def getEvalHistory(self, experiment_name, obj_name, burn_in):
        """Collect (parameters, objective value) pairs from succeeded trials.

        Returns ``([], [])`` while at most `burn_in` trials have completed,
        so the caller keeps sampling during the burn-in phase.
        """
        x_train = []
        y_train = []
        channel = grpc.beta.implementations.insecure_channel(
            self.manager_addr, self.manager_port)
        with api_pb2.beta_create_Manager_stub(channel) as client:
            trialsrep = client.GetTrialList(api_pb2.GetTrialListRequest(
                experiment_name=experiment_name
            ), timeout)
            for t in trialsrep.trials:
                if t.status.condition == api_pb2.TrialStatus.TrialConditionType.SUCCEEDED:
                    gwfrep = client.GetObservationLog(
                        api_pb2.GetObservationLogRequest(
                            trial_name=t.name,
                            metric_name=obj_name), timeout)
                    w = gwfrep.observation_log
                    # NOTE(review): one x per trial but one y per metric log
                    # entry — if a trial reports the metric more than once,
                    # X/y lengths diverge. Confirm the manager returns a
                    # single log entry per metric.
                    for ml in w.metric_logs:
                        y_train.append(float(ml.metric.value))
                    x_train.append(t.spec.parameter_assignments.assignments)
        # Fix: the log format requires an `experiment_name` key in `extra`;
        # the original passed "Experiment", raising KeyError at format time.
        self.logger.info("%d completed trials are found.",
                         len(x_train), extra={"experiment_name": experiment_name})
        if len(x_train) <= burn_in:
            x_train = []
            y_train = []
            self.logger.info("Trials will be sampled until %d trials for burn-in are completed.",
                             burn_in, extra={"experiment_name": experiment_name})
        else:
            self.logger.debug("Completed trials: %r", x_train,
                              extra={"experiment_name": experiment_name})

        return x_train, y_train

    def parseParameters(self, experiment_name):
        """Merge extra algorithm settings from the manager over the defaults.

        Unknown names and malformed values are logged and the default kept.
        """
        channel = grpc.beta.implementations.insecure_channel(
            self.manager_addr, self.manager_port)
        params = []
        with api_pb2.beta_create_Manager_stub(channel) as client:
            gsprep = client.GetAlgorithmExtraSettings(
                api_pb2.GetAlgorithmExtraSettingsRequest(experiment_name=experiment_name), timeout)
            params = gsprep.extra_algorithm_settings

        # Defaults for every BOAlgorithm hyper-parameter.
        parsed_service_params = {
            "N": 100,
            "model_type": "gp",
            "max_features": "auto",
            "length_scale": 0.5,
            "noise": 0.0005,
            "nu": 1.5,
            "kernel_type": "matern",
            "n_estimators": 50,
            "mode": "pi",
            "trade_off": 0.01,
            "trial_hist": "",
            "burn_in": 10,
        }
        modes = ["pi", "ei"]
        model_types = ["gp", "rf"]
        kernel_types = ["matern", "rbf"]

        float_params = ("length_scale", "noise", "nu", "trade_off")
        int_params = ("N", "n_estimators", "burn_in")

        for param in params:
            if param.name not in parsed_service_params:
                self.logger.warning("Unknown Parameter name: %s ", param.name)
            elif param.name in float_params:
                try:
                    parsed_service_params[param.name] = float(param.value)
                except ValueError:
                    self.logger.warning(
                        "Parameter must be float for %s: %s back to default value", param.name, param.value)
            elif param.name in int_params:
                try:
                    parsed_service_params[param.name] = int(param.value)
                except ValueError:
                    self.logger.warning(
                        "Parameter must be int for %s: %s back to default value", param.name, param.value)
            # Fix: the three enum checks below were inverted in the original —
            # valid values ("matern"/"rbf", "pi"/"ei", "gp"/"rf") were rejected
            # and could never override the defaults.
            elif param.name == "kernel_type":
                if param.value in kernel_types:
                    parsed_service_params[param.name] = param.value
                else:
                    self.logger.warning(
                        "Unknown Parameter for %s: %s back to default value", param.name, param.value)
            elif param.name == "mode":
                if param.value in modes:
                    parsed_service_params[param.name] = param.value
                else:
                    self.logger.warning(
                        "Unknown Parameter for %s: %s back to default value", param.name, param.value)
            elif param.name == "model_type":
                if param.value in model_types:
                    parsed_service_params[param.name] = param.value
                else:
                    self.logger.warning(
                        "Unknown Parameter for %s: %s back to default value", param.name, param.value)
            # Remaining known keys (max_features, trial_hist) have no
            # dedicated validation; their defaults are kept as-is.

        return parsed_service_params
Empty file.
Loading