Skip to content

Commit

Permalink
dynamic jobProvider and suggestionComposer registration (#1069)
Browse files Browse the repository at this point in the history
* dynamic jobProvider and suggestionComposer registration

* enforce gofmt

* fix ut

* fix ut

* fix gofmt

* fix

* refine imports

* contain providers into package(v1alpha3)

* move job kubeflow into where providerBase in

* add consts
  • Loading branch information
sperlingxx authored Feb 28, 2020
1 parent 3a19e1c commit 8267929
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 95 deletions.
19 changes: 8 additions & 11 deletions pkg/controller.v1alpha3/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (

// DefaultKatibNamespaceEnvName is the default env name of katib namespace
DefaultKatibNamespaceEnvName = "KATIB_CORE_NAMESPACE"
// DefaultKatibComposerEnvName is the default env name of katib suggestion composer
DefaultKatibComposerEnvName = "KATIB_SUGGESTION_COMPOSER"

// KatibConfigMapName is the config map constants
// Configmap name which includes Katib's configuration
Expand Down Expand Up @@ -102,17 +104,10 @@ const (
// JobKindPyTorch is the kind of PyTorchJob.
JobKindPyTorch = "PyTorchJob"

// JobVersionJob is the api version of Kubernetes Job.
JobVersionJob = "v1"
// JobVersionTF is the api version of TFJob.
JobVersionTF = "v1"
// JobVersionPyTorch is the api version of PyTorchJob.
JobVersionPyTorch = "v1"

// JobGroupJob is the group name of Kubernetes Job.
JobGroupJob = "batch"
// JobGroupKubeflow is the group name of Kubeflow.
JobGroupKubeflow = "kubeflow.org"
// built-in JobRoles
JobRole = "job-role"
JobRoleTF = "tf-job-role"
JobRolePyTorch = "pytorch-job-role"

// AnnotationIstioSidecarInjectName is the annotation of Istio Sidecar
AnnotationIstioSidecarInjectName = "sidecar.istio.io/inject"
Expand All @@ -124,4 +119,6 @@ const (
var (
// DefaultKatibNamespace is the default namespace of katib deployment.
DefaultKatibNamespace = env.GetEnvOrDefault(DefaultKatibNamespaceEnvName, "kubeflow")
// DefaultComposer is the default composer of katib suggestion.
DefaultComposer = env.GetEnvOrDefault(DefaultKatibComposerEnvName, "General")
)
24 changes: 18 additions & 6 deletions pkg/controller.v1alpha3/suggestion/composer/composer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"

suggestionsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1alpha3"
Expand All @@ -28,23 +29,26 @@ const (
defaultGRPCHealthCheckProbe = "/bin/grpc_health_probe"
)

var log = logf.Log.WithName("suggestion-composer")
var (
log = logf.Log.WithName("suggestion-composer")
ComposerRegistry = make(map[string]Composer)
)

type Composer interface {
DesiredDeployment(s *suggestionsv1alpha3.Suggestion) (*appsv1.Deployment, error)
DesiredService(s *suggestionsv1alpha3.Suggestion) (*corev1.Service, error)
CreateComposer(mgr manager.Manager) Composer
}

type General struct {
scheme *runtime.Scheme
client.Client
}

func New(scheme *runtime.Scheme, client client.Client) Composer {
return &General{
scheme: scheme,
Client: client,
}
func New(mgr manager.Manager) Composer {
// We assume DefaultComposer always exists in ComposerRegistry.
ptr, _ := ComposerRegistry[consts.DefaultComposer]
return ptr.CreateComposer(mgr)
}

func (g *General) DesiredDeployment(s *suggestionsv1alpha3.Suggestion) (*appsv1.Deployment, error) {
Expand Down Expand Up @@ -208,3 +212,11 @@ func (g *General) desiredContainer(s *suggestionsv1alpha3.Suggestion) (*corev1.C
}
return c, nil
}

func (g *General) CreateComposer(mgr manager.Manager) Composer {
return &General{mgr.GetScheme(), mgr.GetClient()}
}

func init() {
ComposerRegistry[consts.DefaultComposer] = &General{}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
Client: mgr.GetClient(),
SuggestionClient: suggestionclient.New(),
scheme: mgr.GetScheme(),
Composer: composer.New(mgr.GetScheme(), mgr.GetClient()),
Composer: composer.New(mgr),
recorder: mgr.GetRecorder(ControllerName),
}
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/controller.v1alpha3/trial/trial_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

for _, gvk := range jobv1alpha3.GetSupportedJobList() {
for _, gvk := range jobv1alpha3.SupportedJobList {
unstructuredJob := &unstructured.Unstructured{}
unstructuredJob.SetGroupVersionKind(gvk)
err = c.Watch(
Expand Down Expand Up @@ -274,6 +274,15 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1alpha3.Trial, desiredJob
if instance.IsCompleted() {
return nil, nil
}
jobProvider, err := jobv1alpha3.New(desiredJob.GetKind())
if err != nil {
return nil, err
}
// mutate desiredJob according to provider
if err := jobProvider.MutateJob(instance, desiredJob); err != nil {
logger.Error(err, "Mutating desiredSpec of km.Training error")
return nil, err
}
logger.Info("Creating Job", "kind", kind,
"name", desiredJob.GetName())
err = r.Create(context.TODO(), desiredJob)
Expand Down
48 changes: 0 additions & 48 deletions pkg/job/v1alpha3/consts.go

This file was deleted.

31 changes: 26 additions & 5 deletions pkg/job/v1alpha3/job/job.go → pkg/job/v1alpha3/job.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package job
package v1alpha3

import (
commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"

"github.com/kubeflow/katib/pkg/apis/controller/trials/v1alpha3"
"github.com/kubeflow/katib/pkg/controller.v1alpha3/consts"
)

var (
log = logf.Log.WithName("provider-job")
jobLogger = logf.Log.WithName("provider-job")
)

// Job is the provider of Job kind.
Expand All @@ -25,20 +29,20 @@ func (j Job) GetDeployedJobStatus(
status, ok, unerr := unstructured.NestedFieldCopy(deployedJob.Object, "status")
if !ok {
if unerr != nil {
log.Error(unerr, "NestedFieldCopy unstructured to status error")
jobLogger.Error(unerr, "NestedFieldCopy unstructured to status error")
return nil, unerr
}
// Job does not have the running condition in status, thus we think
// the job is running when it is created.
log.Info("NestedFieldCopy", "err", "status cannot be found in job")
jobLogger.Info("NestedFieldCopy", "err", "status cannot be found in job")
return nil, nil
}

statusMap := status.(map[string]interface{})
jobStatus := batchv1.JobStatus{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus)
if err != nil {
log.Error(err, "Convert unstructured to status error")
jobLogger.Error(err, "Convert unstructured to status error")
return nil, err
}
for _, cond := range jobStatus.Conditions {
Expand All @@ -65,3 +69,20 @@ func (j Job) IsTrainingContainer(index int, c corev1.Container) bool {
}
return false
}
func (j Job) MutateJob(*v1alpha3.Trial, *unstructured.Unstructured) error {
return nil
}

func (j *Job) Create(kind string) Provider {
return &Job{}
}

func init() {
ProviderRegistry[consts.JobKindJob] = &Job{}
SupportedJobList[consts.JobKindJob] = schema.GroupVersionKind{
Group: "batch",
Version: "v1",
Kind: consts.JobKindJob,
}
JobRoleMap[consts.JobKindJob] = []string{}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package kubeflow
package v1alpha3

import (
pytorchv1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1"
commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1"
tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"

"github.com/kubeflow/katib/pkg/apis/controller/trials/v1alpha3"
"github.com/kubeflow/katib/pkg/controller.v1alpha3/consts"
)

var (
log = logf.Log.WithName("provider-kubeflow")
kfLogger = logf.Log.WithName("provider-kubeflow")
)

// Kubeflow is the provider of Kubeflow kinds.
Expand All @@ -31,10 +32,10 @@ func (k Kubeflow) GetDeployedJobStatus(
status, ok, unerr := unstructured.NestedFieldCopy(deployedJob.Object, "status")
if !ok {
if unerr != nil {
log.Error(unerr, "NestedFieldCopy unstructured to status error")
kfLogger.Error(unerr, "NestedFieldCopy unstructured to status error")
return nil, unerr
}
log.Info("NestedFieldCopy unstructured to status error",
kfLogger.Info("NestedFieldCopy unstructured to status error",
"err", "Status is not found in job")
return nil, nil
}
Expand All @@ -43,7 +44,7 @@ func (k Kubeflow) GetDeployedJobStatus(
jobStatus := commonv1.JobStatus{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus)
if err != nil {
log.Error(err, "Convert unstructured to status error")
kfLogger.Error(err, "Convert unstructured to status error")
return nil, err
}
// Get the latest condition and set it to jobCondition.
Expand All @@ -70,8 +71,33 @@ func (k Kubeflow) IsTrainingContainer(index int, c corev1.Container) bool {
return true
}
default:
log.Info("Invalid Katib worker kind", "JobKind", k.Kind)
kfLogger.Info("Invalid Katib worker kind", "JobKind", k.Kind)
return false
}
return false
}

func (k Kubeflow) MutateJob(*v1alpha3.Trial, *unstructured.Unstructured) error {
return nil
}

func (k *Kubeflow) Create(kind string) Provider {
return &Kubeflow{Kind: kind}
}

func init() {
ProviderRegistry[consts.JobKindTF] = &Kubeflow{}
SupportedJobList[consts.JobKindTF] = schema.GroupVersionKind{
Group: "kubeflow.org",
Version: "v1",
Kind: consts.JobKindTF,
}
JobRoleMap[consts.JobKindTF] = []string{consts.JobRole, consts.JobRoleTF}
ProviderRegistry[consts.JobKindPyTorch] = &Kubeflow{}
SupportedJobList[consts.JobKindPyTorch] = schema.GroupVersionKind{
Group: "kubeflow.org",
Version: "v1",
Kind: consts.JobKindPyTorch,
}
JobRoleMap[consts.JobKindPyTorch] = []string{consts.JobRole, consts.JobRolePyTorch}
}
33 changes: 20 additions & 13 deletions pkg/job/v1alpha3/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@ package v1alpha3
import (
"fmt"

commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/kubeflow/katib/pkg/controller.v1alpha3/consts"
"github.com/kubeflow/katib/pkg/job/v1alpha3/job"
"github.com/kubeflow/katib/pkg/job/v1alpha3/kubeflow"
commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1"
"github.com/kubeflow/katib/pkg/apis/controller/trials/v1alpha3"
)

var (
ProviderRegistry = make(map[string]Provider)
// JobRoleMap is the map which is used to determin if the replica is master.
// Katib will inject metrics collector into master replica.
JobRoleMap = make(map[string][]string)
// SupportedJobList returns the list of the supported jobs' GVK.
SupportedJobList = make(map[string]schema.GroupVersionKind)
)

// Provider provides utilities for different jobs.
Expand All @@ -19,19 +27,18 @@ type Provider interface {
deployedJob *unstructured.Unstructured) (*commonv1.JobCondition, error)
// IsTrainingContainer returns if the c is the actual training container.
IsTrainingContainer(index int, c corev1.Container) bool
// Mutate jobSpec before creation if necessary
MutateJob(*v1alpha3.Trial, *unstructured.Unstructured) error
// Recreate Provider from kind
Create(kind string) Provider
}

// New creates a new Provider.
func New(kind string) (Provider, error) {
switch kind {
case consts.JobKindJob:
return &job.Job{}, nil
case consts.JobKindPyTorch, consts.JobKindTF:
return &kubeflow.Kubeflow{
Kind: kind,
}, nil
default:
if ptr, ok := ProviderRegistry[kind]; ok {
return ptr.Create(kind), nil
} else {
return nil, fmt.Errorf(
"Failed to create the provider: Unknown kind %s", kind)
"failed to create the provider: Unknown kind %s", kind)
}
}
2 changes: 1 addition & 1 deletion pkg/webhook/v1alpha3/experiment/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (g *DefaultValidator) validateTrialTemplate(instance *experimentsv1alpha3.E

func (g *DefaultValidator) validateSupportedJob(job *unstructured.Unstructured) error {
gvk := job.GroupVersionKind()
supportedJobs := jobv1alpha3.GetSupportedJobList()
supportedJobs := jobv1alpha3.SupportedJobList
for _, sJob := range supportedJobs {
if gvk == sJob {
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/webhook/v1alpha3/pod/inject_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"

common "github.com/kubeflow/katib/pkg/apis/controller/common/v1alpha3"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
)

Expand Down
Loading

0 comments on commit 8267929

Please sign in to comment.