From 23f1b8c85151e41d77955f80b32ab29dbab6f0e8 Mon Sep 17 00:00:00 2001 From: hougang liu Date: Mon, 13 May 2019 22:55:52 +0800 Subject: [PATCH 1/2] create experiment in db --- pkg/common/v1alpha2/common.go | 32 +++++++++++++++++++ .../v1alpha2/experiment/util/api_util.go | 19 +++++++---- pkg/db/v1alpha2/interface.go | 32 +++++++------------ 3 files changed, 56 insertions(+), 27 deletions(-) diff --git a/pkg/common/v1alpha2/common.go b/pkg/common/v1alpha2/common.go index d0ff11559d6..19b5000fff8 100644 --- a/pkg/common/v1alpha2/common.go +++ b/pkg/common/v1alpha2/common.go @@ -2,8 +2,11 @@ package v1alpha2 import ( "os" + "context" experimentsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2" + api_pb "github.com/kubeflow/katib/pkg/api/v1alpha2" + "google.golang.org/grpc" ) const ( @@ -15,6 +18,11 @@ const ( ManagerAddr = KatibManagerService + ":" + KatibManagerPort ) +type katibClientAndConnection struct { + Conn *grpc.ClientConn + KatibClient api_pb.ManagerClient +} + func GetManagerAddr() string { ns := os.Getenv(experimentsv1alpha2.DefaultKatibNamespaceEnvName) if len(ns) == 0 { @@ -29,3 +37,27 @@ func GetManagerAddr() string { return KatibManagerService + "." + ns + ":" + KatibManagerPort } } + +func getKatibClientAndConnection() (*katibClientAndConnection, error) { + addr := GetManagerAddr() + conn, err := grpc.Dial(addr, grpc.WithInsecure()) + if err != nil { + return nil, err + } + kc := &katibClientAndConnection { + Conn: conn, + KatibClient: api_pb.NewManagerClient(conn), + } + return kc, nil +} + +func RegisterExperiment(request *api_pb.RegisterExperimentRequest) (*api_pb.RegisterExperimentReply, error) { + ctx := context.Background() + kcc, err := getKatibClientAndConnection() + if err != nil { + return nil, err + } + defer kcc.Conn.Close() + kc := kcc.KatibClient + return kc.RegisterExperiment(ctx, request) +} diff --git a/pkg/controller/v1alpha2/experiment/util/api_util.go b/pkg/controller/v1alpha2/experiment/util/api_util.go index 816d956475c..ffb2a36cc6a 100644 --- a/pkg/controller/v1alpha2/experiment/util/api_util.go +++ b/pkg/controller/v1alpha2/experiment/util/api_util.go @@ -21,12 +21,17 @@ import ( experimentsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2" commonv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/common/v1alpha2" api_pb "github.com/kubeflow/katib/pkg/api/v1alpha2" + common "github.com/kubeflow/katib/pkg/common/v1alpha2" ) func CreateExperimentInDB(instance *experimentsv1alpha2.Experiment) error { - //TODO: Save experiment in to db - // experiment := GetExperimentConf(instance) - + experiment := GetExperimentConf(instance) + request := &api_pb.RegisterExperimentRequest { + Experiment: experiment, + } + if _, err := common.RegisterExperiment(request); err != nil { + return err + } return nil } @@ -88,7 +93,9 @@ func GetExperimentConf(instance *experimentsv1alpha2.Experiment) *api_pb.Experim //Populate HP Experiment if instance.Spec.Parameters != nil { - + parameterSpecs := &api_pb.ExperimentSpec_ParameterSpecs{ + Parameters: []*api_pb.ParameterSpec{}, + } for _, p := range instance.Spec.Parameters { parameter := &api_pb.ParameterSpec{ FeasibleSpace: &api_pb.FeasibleSpace{}, @@ -111,9 +118,9 @@ func GetExperimentConf(instance *experimentsv1alpha2.Experiment) *api_pb.Experim case experimentsv1alpha2.ParameterTypeUnknown: parameter.ParameterType = api_pb.ParameterType_UNKNOWN_TYPE } - experiment.ExperimentSpec.ParameterSpecs.Parameters = append(experiment.ExperimentSpec.ParameterSpecs.Parameters, parameter) + parameterSpecs.Parameters = append(parameterSpecs.Parameters, parameter) } - + experiment.ExperimentSpec.ParameterSpecs = parameterSpecs } //Populate NAS Experiment diff --git a/pkg/db/v1alpha2/interface.go b/pkg/db/v1alpha2/interface.go index 0d960279fd0..4a8054a3e0c 100644 --- a/pkg/db/v1alpha2/interface.go +++ b/pkg/db/v1alpha2/interface.go @@ -135,44 +135,33 @@ func (d *dbConn) RegisterExperiment(experiment *v1alpha2.Experiment) error { if experiment.ExperimentSpec.ParameterSpecs != nil { paramSpecs, err = (&jsonpb.Marshaler{}).MarshalToString(experiment.ExperimentSpec.ParameterSpecs) if err != nil { - log.Fatalf("Error marshaling Parameters: %v", err) + return fmt.Errorf("Error marshaling Parameters: %v", err) } } if experiment.ExperimentSpec.Objective != nil { objSpec, err = (&jsonpb.Marshaler{}).MarshalToString(experiment.ExperimentSpec.Objective) if err != nil { - log.Fatalf("Error marshaling Objective: %v", err) + return fmt.Errorf("Error marshaling Objective: %v", err) } } if experiment.ExperimentSpec.Algorithm != nil { algoSpec, err = (&jsonpb.Marshaler{}).MarshalToString(experiment.ExperimentSpec.Algorithm) if err != nil { - log.Fatalf("Error marshaling Algorithm: %v", err) + return fmt.Errorf("Error marshaling Algorithm: %v", err) } } if experiment.ExperimentSpec.NasConfig != nil { nasConfig, err = (&jsonpb.Marshaler{}).MarshalToString(experiment.ExperimentSpec.NasConfig) if err != nil { - log.Fatalf("Error marshaling NasConfig: %v", err) - } - } - } - if experiment.ExperimentStatus != nil { - if experiment.ExperimentStatus.StartTime != "" { - s_time, err := time.Parse(time.RFC3339Nano, experiment.ExperimentStatus.StartTime) - if err != nil { - log.Printf("Error parsing start time %s: %v", experiment.ExperimentStatus.StartTime, err) + return fmt.Errorf("Error marshaling NasConfig: %v", err) } - start_time = s_time.UTC().Format(mysqlTimeFmt) - } - if experiment.ExperimentStatus.CompletionTime != "" { - c_time, err := time.Parse(time.RFC3339Nano, experiment.ExperimentStatus.CompletionTime) - if err != nil { - log.Printf("Error parsing completion time %s: %v", experiment.ExperimentStatus.CompletionTime, err) - } - completion_time = c_time.UTC().Format(mysqlTimeFmt) } + } else { + return fmt.Errorf("Invalid experiment: spec is nil.") } + now_str := time.Now().UTC().Format(mysqlTimeFmt) + start_time = now_str + completion_time = now_str _, err = d.db.Exec( `INSERT INTO experiments ( name, @@ -195,13 +184,14 @@ func (d *dbConn) RegisterExperiment(experiment *v1alpha2.Experiment) error { experiment.ExperimentSpec.MetricsCollectorSpec, experiment.ExperimentSpec.ParallelTrialCount, experiment.ExperimentSpec.MaxTrialCount, - experiment.ExperimentStatus.Condition, + v1alpha2.ExperimentStatus_CREATED, start_time, completion_time, nasConfig, ) return err } + func (d *dbConn) DeleteExperiment(experimentName string) error { _, err := d.db.Exec("DELETE FROM experiments WHERE name = ?", experimentName) return err From b5e480b6b5bdb85e7290f11a08079cff341d9a48 Mon Sep 17 00:00:00 2001 From: hougang liu Date: Wed, 15 May 2019 07:07:43 +0800 Subject: [PATCH 2/2] rename katibClient to katibManagerClient --- .../{common.go => katib_manager_util.go} | 33 ++++++++++++++----- 1 file changed, 24 insertions(+), 9 deletions(-) rename pkg/common/v1alpha2/{common.go => katib_manager_util.go} (58%) diff --git a/pkg/common/v1alpha2/common.go b/pkg/common/v1alpha2/katib_manager_util.go similarity index 58% rename from pkg/common/v1alpha2/common.go rename to pkg/common/v1alpha2/katib_manager_util.go index 19b5000fff8..b477c872029 100644 --- a/pkg/common/v1alpha2/common.go +++ b/pkg/common/v1alpha2/katib_manager_util.go @@ -1,3 +1,18 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package v1alpha2 import ( @@ -15,12 +30,12 @@ const ( KatibManagerServiceNamespaceEnvName = "KATIB_MANAGER_NAMESPACE" KatibManagerService = "katib-manager" KatibManagerPort = "6789" - ManagerAddr = KatibManagerService + ":" + KatibManagerPort + ManagerAddr = KatibManagerService + ":" + KatibManagerPort ) -type katibClientAndConnection struct { +type katibManagerClientAndConn struct { Conn *grpc.ClientConn - KatibClient api_pb.ManagerClient + KatibManagerClient api_pb.ManagerClient } func GetManagerAddr() string { @@ -38,26 +53,26 @@ func GetManagerAddr() string { } } -func getKatibClientAndConnection() (*katibClientAndConnection, error) { +func getKatibManagerClientAndConn() (*katibManagerClientAndConn, error) { addr := GetManagerAddr() conn, err := grpc.Dial(addr, grpc.WithInsecure()) if err != nil { return nil, err } - kc := &katibClientAndConnection { + kcc := &katibManagerClientAndConn { Conn: conn, - KatibClient: api_pb.NewManagerClient(conn), + KatibManagerClient: api_pb.NewManagerClient(conn), } - return kc, nil + return kcc, nil } func RegisterExperiment(request *api_pb.RegisterExperimentRequest) (*api_pb.RegisterExperimentReply, error) { ctx := context.Background() - kcc, err := getKatibClientAndConnection() + kcc, err := getKatibManagerClientAndConn() if err != nil { return nil, err } defer kcc.Conn.Close() - kc := kcc.KatibClient + kc := kcc.KatibManagerClient return kc.RegisterExperiment(ctx, request) }