Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create experiment in db #509

Merged
merged 2 commits into from
May 15, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions pkg/common/v1alpha2/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package v1alpha2

import (
"os"
"context"

experimentsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2"
api_pb "github.com/kubeflow/katib/pkg/api/v1alpha2"
"google.golang.org/grpc"
)

const (
Expand All @@ -15,6 +18,11 @@ const (
ManagerAddr = KatibManagerService + ":" + KatibManagerPort
)

type katibClientAndConnection struct {
Conn *grpc.ClientConn
KatibClient api_pb.ManagerClient
hougangliu marked this conversation as resolved.
Show resolved Hide resolved
}

func GetManagerAddr() string {
ns := os.Getenv(experimentsv1alpha2.DefaultKatibNamespaceEnvName)
if len(ns) == 0 {
Expand All @@ -29,3 +37,27 @@ func GetManagerAddr() string {
return KatibManagerService + "." + ns + ":" + KatibManagerPort
}
}

func getKatibClientAndConnection() (*katibClientAndConnection, error) {
addr := GetManagerAddr()
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
return nil, err
}
kc := &katibClientAndConnection {
Conn: conn,
KatibClient: api_pb.NewManagerClient(conn),
}
return kc, nil
}

func RegisterExperiment(request *api_pb.RegisterExperimentRequest) (*api_pb.RegisterExperimentReply, error) {
hougangliu marked this conversation as resolved.
Show resolved Hide resolved
ctx := context.Background()
kcc, err := getKatibClientAndConnection()
if err != nil {
return nil, err
}
defer kcc.Conn.Close()
kc := kcc.KatibClient
return kc.RegisterExperiment(ctx, request)
}
19 changes: 13 additions & 6 deletions pkg/controller/v1alpha2/experiment/util/api_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@ import (
experimentsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2"
commonv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/common/v1alpha2"
api_pb "github.com/kubeflow/katib/pkg/api/v1alpha2"
common "github.com/kubeflow/katib/pkg/common/v1alpha2"
)

func CreateExperimentInDB(instance *experimentsv1alpha2.Experiment) error {
//TODO: Save experiment in to db
// experiment := GetExperimentConf(instance)

experiment := GetExperimentConf(instance)
request := &api_pb.RegisterExperimentRequest {
Experiment: experiment,
}
if _, err := common.RegisterExperiment(request); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -88,7 +93,9 @@ func GetExperimentConf(instance *experimentsv1alpha2.Experiment) *api_pb.Experim

//Populate HP Experiment
if instance.Spec.Parameters != nil {

parameterSpecs := &api_pb.ExperimentSpec_ParameterSpecs{
Parameters: []*api_pb.ParameterSpec{},
}
for _, p := range instance.Spec.Parameters {
parameter := &api_pb.ParameterSpec{
FeasibleSpace: &api_pb.FeasibleSpace{},
Expand All @@ -111,9 +118,9 @@ func GetExperimentConf(instance *experimentsv1alpha2.Experiment) *api_pb.Experim
case experimentsv1alpha2.ParameterTypeUnknown:
parameter.ParameterType = api_pb.ParameterType_UNKNOWN_TYPE
}
experiment.ExperimentSpec.ParameterSpecs.Parameters = append(experiment.ExperimentSpec.ParameterSpecs.Parameters, parameter)
parameterSpecs.Parameters = append(parameterSpecs.Parameters, parameter)
}

experiment.ExperimentSpec.ParameterSpecs = parameterSpecs
}

//Populate NAS Experiment
Expand Down
32 changes: 11 additions & 21 deletions pkg/db/v1alpha2/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,44 +135,33 @@ func (d *dbConn) RegisterExperiment(experiment *v1alpha2.Experiment) error {
if experiment.ExperimentSpec.ParameterSpecs != nil {
paramSpecs, err = (&jsonpb.Marshaler{}).MarshalToString(experiment.ExperimentSpec.ParameterSpecs)
if err != nil {
log.Fatalf("Error marshaling Parameters: %v", err)
return fmt.Errorf("Error marshaling Parameters: %v", err)
}
}
if experiment.ExperimentSpec.Objective != nil {
objSpec, err = (&jsonpb.Marshaler{}).MarshalToString(experiment.ExperimentSpec.Objective)
if err != nil {
log.Fatalf("Error marshaling Objective: %v", err)
return fmt.Errorf("Error marshaling Objective: %v", err)
}
}
if experiment.ExperimentSpec.Algorithm != nil {
algoSpec, err = (&jsonpb.Marshaler{}).MarshalToString(experiment.ExperimentSpec.Algorithm)
if err != nil {
log.Fatalf("Error marshaling Algorithm: %v", err)
return fmt.Errorf("Error marshaling Algorithm: %v", err)
}
}
if experiment.ExperimentSpec.NasConfig != nil {
nasConfig, err = (&jsonpb.Marshaler{}).MarshalToString(experiment.ExperimentSpec.NasConfig)
if err != nil {
log.Fatalf("Error marshaling NasConfig: %v", err)
}
}
}
if experiment.ExperimentStatus != nil {
if experiment.ExperimentStatus.StartTime != "" {
s_time, err := time.Parse(time.RFC3339Nano, experiment.ExperimentStatus.StartTime)
if err != nil {
log.Printf("Error parsing start time %s: %v", experiment.ExperimentStatus.StartTime, err)
return fmt.Errorf("Error marshaling NasConfig: %v", err)
}
start_time = s_time.UTC().Format(mysqlTimeFmt)
}
if experiment.ExperimentStatus.CompletionTime != "" {
c_time, err := time.Parse(time.RFC3339Nano, experiment.ExperimentStatus.CompletionTime)
if err != nil {
log.Printf("Error parsing completion time %s: %v", experiment.ExperimentStatus.CompletionTime, err)
}
completion_time = c_time.UTC().Format(mysqlTimeFmt)
}
} else {
return fmt.Errorf("Invalid experiment: spec is nil.")
}
now_str := time.Now().UTC().Format(mysqlTimeFmt)
start_time = now_str
completion_time = now_str
_, err = d.db.Exec(
`INSERT INTO experiments (
name,
Expand All @@ -195,13 +184,14 @@ func (d *dbConn) RegisterExperiment(experiment *v1alpha2.Experiment) error {
experiment.ExperimentSpec.MetricsCollectorSpec,
experiment.ExperimentSpec.ParallelTrialCount,
experiment.ExperimentSpec.MaxTrialCount,
experiment.ExperimentStatus.Condition,
v1alpha2.ExperimentStatus_CREATED,
start_time,
completion_time,
nasConfig,
)
return err
}

func (d *dbConn) DeleteExperiment(experimentName string) error {
_, err := d.db.Exec("DELETE FROM experiments WHERE name = ?", experimentName)
return err
Expand Down