Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dynamic jobProvider and suggestionComposer registration #1069

Merged
merged 10 commits into from
Feb 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions pkg/controller.v1alpha3/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (

// DefaultKatibNamespaceEnvName is the default env name of katib namespace
DefaultKatibNamespaceEnvName = "KATIB_CORE_NAMESPACE"
// DefaultKatibComposerEnvName is the default env name of katib suggestion composer
DefaultKatibComposerEnvName = "KATIB_SUGGESTION_COMPOSER"

// KatibConfigMapName is the config map constants
// Configmap name which includes Katib's configuration
Expand Down Expand Up @@ -102,17 +104,10 @@ const (
// JobKindPyTorch is the kind of PyTorchJob.
JobKindPyTorch = "PyTorchJob"

// JobVersionJob is the api version of Kubernetes Job.
JobVersionJob = "v1"
// JobVersionTF is the api version of TFJob.
JobVersionTF = "v1"
// JobVersionPyTorch is the api version of PyTorchJob.
JobVersionPyTorch = "v1"

// JobGroupJob is the group name of Kubernetes Job.
JobGroupJob = "batch"
// JobGroupKubeflow is the group name of Kubeflow.
JobGroupKubeflow = "kubeflow.org"
// built-in JobRoles
JobRole = "job-role"
JobRoleTF = "tf-job-role"
JobRolePyTorch = "pytorch-job-role"

// AnnotationIstioSidecarInjectName is the annotation of Istio Sidecar
AnnotationIstioSidecarInjectName = "sidecar.istio.io/inject"
Expand All @@ -124,4 +119,6 @@ const (
var (
// DefaultKatibNamespace is the default namespace of katib deployment.
// It is obtained from env.GetEnvOrDefault, called with
// DefaultKatibNamespaceEnvName and "kubeflow" as the default argument.
DefaultKatibNamespace = env.GetEnvOrDefault(DefaultKatibNamespaceEnvName, "kubeflow")
// DefaultComposer is the name of the default composer of katib suggestion.
// It is obtained from env.GetEnvOrDefault, called with
// DefaultKatibComposerEnvName and "General" as the default argument.
DefaultComposer = env.GetEnvOrDefault(DefaultKatibComposerEnvName, "General")
)
24 changes: 18 additions & 6 deletions pkg/controller.v1alpha3/suggestion/composer/composer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"

suggestionsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1alpha3"
Expand All @@ -28,23 +29,26 @@ const (
defaultGRPCHealthCheckProbe = "/bin/grpc_health_probe"
)

var log = logf.Log.WithName("suggestion-composer")
var (
// log is the package logger, named "suggestion-composer".
log = logf.Log.WithName("suggestion-composer")
// ComposerRegistry maps a composer name to a registered Composer.
// New looks up consts.DefaultComposer in this map and calls
// CreateComposer on the entry it finds.
ComposerRegistry = make(map[string]Composer)
)

// Composer produces the desired Kubernetes objects for a Suggestion and can
// create a Composer bound to a controller manager.
type Composer interface {
// DesiredDeployment returns the Deployment desired for the suggestion s.
DesiredDeployment(s *suggestionsv1alpha3.Suggestion) (*appsv1.Deployment, error)
// DesiredService returns the Service desired for the suggestion s.
DesiredService(s *suggestionsv1alpha3.Suggestion) (*corev1.Service, error)
// CreateComposer returns a Composer built from the manager mgr.
CreateComposer(mgr manager.Manager) Composer
}

// General is the Composer implementation that this package registers in
// ComposerRegistry under consts.DefaultComposer.
// It holds a runtime scheme and embeds a controller-runtime client.
type General struct {
scheme *runtime.Scheme
client.Client
}

func New(scheme *runtime.Scheme, client client.Client) Composer {
return &General{
scheme: scheme,
Client: client,
}
// New returns the Composer registered under consts.DefaultComposer,
// instantiated for mgr through the registered entry's CreateComposer.
//
// It panics with a message naming the composer when nothing is registered
// under consts.DefaultComposer. The signature has no error return, and calling
// CreateComposer on the missing (nil) entry would panic anyway, only without
// telling the operator which composer name could not be resolved.
func New(mgr manager.Manager) Composer {
	registered, ok := ComposerRegistry[consts.DefaultComposer]
	if !ok || registered == nil {
		panic("suggestion composer \"" + consts.DefaultComposer + "\" is not registered in ComposerRegistry")
	}
	return registered.CreateComposer(mgr)
}

func (g *General) DesiredDeployment(s *suggestionsv1alpha3.Suggestion) (*appsv1.Deployment, error) {
Expand Down Expand Up @@ -208,3 +212,11 @@ func (g *General) desiredContainer(s *suggestionsv1alpha3.Suggestion) (*corev1.C
}
return c, nil
}

// CreateComposer returns a new General whose scheme and client are taken
// from mgr. The receiver's own fields are not read.
func (g *General) CreateComposer(mgr manager.Manager) Composer {
	composer := &General{
		scheme: mgr.GetScheme(),
		Client: mgr.GetClient(),
	}
	return composer
}

func init() {
ComposerRegistry[consts.DefaultComposer] = &General{}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
Client: mgr.GetClient(),
SuggestionClient: suggestionclient.New(),
scheme: mgr.GetScheme(),
Composer: composer.New(mgr.GetScheme(), mgr.GetClient()),
Composer: composer.New(mgr),
recorder: mgr.GetRecorder(ControllerName),
}
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/controller.v1alpha3/trial/trial_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

for _, gvk := range jobv1alpha3.GetSupportedJobList() {
for _, gvk := range jobv1alpha3.SupportedJobList {
unstructuredJob := &unstructured.Unstructured{}
unstructuredJob.SetGroupVersionKind(gvk)
err = c.Watch(
Expand Down Expand Up @@ -274,6 +274,15 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1alpha3.Trial, desiredJob
if instance.IsCompleted() {
return nil, nil
}
jobProvider, err := jobv1alpha3.New(desiredJob.GetKind())
if err != nil {
return nil, err
}
// mutate desiredJob according to provider
if err := jobProvider.MutateJob(instance, desiredJob); err != nil {
logger.Error(err, "Mutating desiredSpec of km.Training error")
return nil, err
}
logger.Info("Creating Job", "kind", kind,
"name", desiredJob.GetName())
err = r.Create(context.TODO(), desiredJob)
Expand Down
48 changes: 0 additions & 48 deletions pkg/job/v1alpha3/consts.go

This file was deleted.

31 changes: 26 additions & 5 deletions pkg/job/v1alpha3/job/job.go → pkg/job/v1alpha3/job.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package job
package v1alpha3

import (
commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"

"github.com/kubeflow/katib/pkg/apis/controller/trials/v1alpha3"
"github.com/kubeflow/katib/pkg/controller.v1alpha3/consts"
)

var (
log = logf.Log.WithName("provider-job")
jobLogger = logf.Log.WithName("provider-job")
)

// Job is the provider of Job kind.
Expand All @@ -25,20 +29,20 @@ func (j Job) GetDeployedJobStatus(
status, ok, unerr := unstructured.NestedFieldCopy(deployedJob.Object, "status")
if !ok {
if unerr != nil {
log.Error(unerr, "NestedFieldCopy unstructured to status error")
jobLogger.Error(unerr, "NestedFieldCopy unstructured to status error")
return nil, unerr
}
// Job does not have the running condition in status, thus we think
// the job is running when it is created.
log.Info("NestedFieldCopy", "err", "status cannot be found in job")
jobLogger.Info("NestedFieldCopy", "err", "status cannot be found in job")
return nil, nil
}

statusMap := status.(map[string]interface{})
jobStatus := batchv1.JobStatus{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus)
if err != nil {
log.Error(err, "Convert unstructured to status error")
jobLogger.Error(err, "Convert unstructured to status error")
return nil, err
}
for _, cond := range jobStatus.Conditions {
Expand All @@ -65,3 +69,20 @@ func (j Job) IsTrainingContainer(index int, c corev1.Container) bool {
}
return false
}
// MutateJob performs no mutation for the Job provider: both arguments are
// ignored and the result is always nil.
func (j Job) MutateJob(_ *v1alpha3.Trial, _ *unstructured.Unstructured) error {
	return nil
}

// Create returns a fresh Job provider. The kind argument is part of the
// Provider interface and is not used by Job.
//
// A value receiver is used so that Create matches the other methods of Job
// (GetDeployedJobStatus, IsTrainingContainer, MutateJob) instead of mixing
// receiver kinds; *Job still has this method in its method set, so values
// registered as &Job{} keep working.
func (j Job) Create(kind string) Provider {
	return &Job{}
}

func init() {
ProviderRegistry[consts.JobKindJob] = &Job{}
SupportedJobList[consts.JobKindJob] = schema.GroupVersionKind{
Group: "batch",
Version: "v1",
Kind: consts.JobKindJob,
}
JobRoleMap[consts.JobKindJob] = []string{}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package kubeflow
package v1alpha3

import (
pytorchv1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1"
commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1"
tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"

"github.com/kubeflow/katib/pkg/apis/controller/trials/v1alpha3"
"github.com/kubeflow/katib/pkg/controller.v1alpha3/consts"
)

var (
log = logf.Log.WithName("provider-kubeflow")
kfLogger = logf.Log.WithName("provider-kubeflow")
)

// Kubeflow is the provider of Kubeflow kinds.
Expand All @@ -31,10 +32,10 @@ func (k Kubeflow) GetDeployedJobStatus(
status, ok, unerr := unstructured.NestedFieldCopy(deployedJob.Object, "status")
if !ok {
if unerr != nil {
log.Error(unerr, "NestedFieldCopy unstructured to status error")
kfLogger.Error(unerr, "NestedFieldCopy unstructured to status error")
return nil, unerr
}
log.Info("NestedFieldCopy unstructured to status error",
kfLogger.Info("NestedFieldCopy unstructured to status error",
"err", "Status is not found in job")
return nil, nil
}
Expand All @@ -43,7 +44,7 @@ func (k Kubeflow) GetDeployedJobStatus(
jobStatus := commonv1.JobStatus{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus)
if err != nil {
log.Error(err, "Convert unstructured to status error")
kfLogger.Error(err, "Convert unstructured to status error")
return nil, err
}
// Get the latest condition and set it to jobCondition.
Expand All @@ -70,8 +71,33 @@ func (k Kubeflow) IsTrainingContainer(index int, c corev1.Container) bool {
return true
}
default:
log.Info("Invalid Katib worker kind", "JobKind", k.Kind)
kfLogger.Info("Invalid Katib worker kind", "JobKind", k.Kind)
return false
}
return false
}

// MutateJob performs no mutation for the Kubeflow provider: both arguments
// are ignored and the result is always nil.
func (k Kubeflow) MutateJob(_ *v1alpha3.Trial, _ *unstructured.Unstructured) error {
	return nil
}

// Create returns a new Kubeflow provider whose Kind is set to kind.
//
// A value receiver is used so that Create matches the other methods of
// Kubeflow (GetDeployedJobStatus, IsTrainingContainer, MutateJob) instead of
// mixing receiver kinds; *Kubeflow still has this method in its method set, so
// values registered as &Kubeflow{} keep working.
func (k Kubeflow) Create(kind string) Provider {
	return &Kubeflow{Kind: kind}
}

func init() {
ProviderRegistry[consts.JobKindTF] = &Kubeflow{}
SupportedJobList[consts.JobKindTF] = schema.GroupVersionKind{
Group: "kubeflow.org",
Version: "v1",
Kind: consts.JobKindTF,
}
JobRoleMap[consts.JobKindTF] = []string{consts.JobRole, consts.JobRoleTF}
ProviderRegistry[consts.JobKindPyTorch] = &Kubeflow{}
SupportedJobList[consts.JobKindPyTorch] = schema.GroupVersionKind{
Group: "kubeflow.org",
Version: "v1",
Kind: consts.JobKindPyTorch,
}
JobRoleMap[consts.JobKindPyTorch] = []string{consts.JobRole, consts.JobRolePyTorch}
}
33 changes: 20 additions & 13 deletions pkg/job/v1alpha3/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@ package v1alpha3
import (
"fmt"

commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/kubeflow/katib/pkg/controller.v1alpha3/consts"
"github.com/kubeflow/katib/pkg/job/v1alpha3/job"
"github.com/kubeflow/katib/pkg/job/v1alpha3/kubeflow"
commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1"
"github.com/kubeflow/katib/pkg/apis/controller/trials/v1alpha3"
)

var (
// ProviderRegistry maps a job kind to its registered Provider.
// New looks up the kind here and calls Create on the entry it finds.
ProviderRegistry = make(map[string]Provider)
// JobRoleMap is the map which is used to determine if the replica is master.
// Katib will inject metrics collector into master replica.
JobRoleMap = make(map[string][]string)
// SupportedJobList maps a job kind to the GroupVersionKind of the supported job.
SupportedJobList = make(map[string]schema.GroupVersionKind)
)

// Provider provides utilities for different jobs.
Expand All @@ -19,19 +27,18 @@ type Provider interface {
deployedJob *unstructured.Unstructured) (*commonv1.JobCondition, error)
// IsTrainingContainer returns if the c is the actual training container.
IsTrainingContainer(index int, c corev1.Container) bool
// MutateJob mutates the job spec before creation if necessary.
MutateJob(*v1alpha3.Trial, *unstructured.Unstructured) error
// Create returns a new Provider for the given kind.
Create(kind string) Provider
}

// New creates a new Provider.
func New(kind string) (Provider, error) {
switch kind {
case consts.JobKindJob:
return &job.Job{}, nil
case consts.JobKindPyTorch, consts.JobKindTF:
return &kubeflow.Kubeflow{
Kind: kind,
}, nil
default:
if ptr, ok := ProviderRegistry[kind]; ok {
return ptr.Create(kind), nil
} else {
return nil, fmt.Errorf(
"Failed to create the provider: Unknown kind %s", kind)
"failed to create the provider: Unknown kind %s", kind)
}
}
2 changes: 1 addition & 1 deletion pkg/webhook/v1alpha3/experiment/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (g *DefaultValidator) validateTrialTemplate(instance *experimentsv1alpha3.E

func (g *DefaultValidator) validateSupportedJob(job *unstructured.Unstructured) error {
gvk := job.GroupVersionKind()
supportedJobs := jobv1alpha3.GetSupportedJobList()
supportedJobs := jobv1alpha3.SupportedJobList
for _, sJob := range supportedJobs {
if gvk == sJob {
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/webhook/v1alpha3/pod/inject_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"

common "github.com/kubeflow/katib/pkg/apis/controller/common/v1alpha3"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
)

Expand Down
Loading