diff --git a/pkg/controller.v1alpha3/consts/const.go b/pkg/controller.v1alpha3/consts/const.go index 9eea03b3a22..ff07bc7cc59 100644 --- a/pkg/controller.v1alpha3/consts/const.go +++ b/pkg/controller.v1alpha3/consts/const.go @@ -37,6 +37,8 @@ const ( // DefaultKatibNamespaceEnvName is the default env name of katib namespace DefaultKatibNamespaceEnvName = "KATIB_CORE_NAMESPACE" + // DefaultKatibComposerEnvName is the default env name of katib suggestion composer + DefaultKatibComposerEnvName = "KATIB_SUGGESTION_COMPOSER" // KatibConfigMapName is the config map constants // Configmap name which includes Katib's configuration @@ -102,17 +104,10 @@ const ( // JobKindPyTorch is the kind of PyTorchJob. JobKindPyTorch = "PyTorchJob" - // JobVersionJob is the api version of Kubernetes Job. - JobVersionJob = "v1" - // JobVersionTF is the api version of TFJob. - JobVersionTF = "v1" - // JobVersionPyTorch is the api version of PyTorchJob. - JobVersionPyTorch = "v1" - - // JobGroupJob is the group name of Kubernetes Job. - JobGroupJob = "batch" - // JobGroupKubeflow is the group name of Kubeflow. - JobGroupKubeflow = "kubeflow.org" + // built-in JobRoles + JobRole = "job-role" + JobRoleTF = "tf-job-role" + JobRolePyTorch = "pytorch-job-role" // AnnotationIstioSidecarInjectName is the annotation of Istio Sidecar AnnotationIstioSidecarInjectName = "sidecar.istio.io/inject" @@ -124,4 +119,6 @@ const ( var ( // DefaultKatibNamespace is the default namespace of katib deployment. DefaultKatibNamespace = env.GetEnvOrDefault(DefaultKatibNamespaceEnvName, "kubeflow") + // DefaultComposer is the default composer of katib suggestion. + DefaultComposer = env.GetEnvOrDefault(DefaultKatibComposerEnvName, "General") ) diff --git a/pkg/controller.v1alpha3/suggestion/composer/composer.go b/pkg/controller.v1alpha3/suggestion/composer/composer.go index c8f4e8428a8..8fa7bdc18b5 100644 --- a/pkg/controller.v1alpha3/suggestion/composer/composer.go +++ b/pkg/controller.v1alpha3/suggestion/composer/composer.go @@ -11,6 +11,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/manager" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" suggestionsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1alpha3" @@ -28,11 +29,15 @@ const ( defaultGRPCHealthCheckProbe = "/bin/grpc_health_probe" ) -var log = logf.Log.WithName("suggestion-composer") +var ( + log = logf.Log.WithName("suggestion-composer") + ComposerRegistry = make(map[string]Composer) +) type Composer interface { DesiredDeployment(s *suggestionsv1alpha3.Suggestion) (*appsv1.Deployment, error) DesiredService(s *suggestionsv1alpha3.Suggestion) (*corev1.Service, error) + CreateComposer(mgr manager.Manager) Composer } type General struct { @@ -40,11 +45,10 @@ type General struct { client.Client } -func New(scheme *runtime.Scheme, client client.Client) Composer { - return &General{ - scheme: scheme, - Client: client, - } +func New(mgr manager.Manager) Composer { + // We assume DefaultComposer always exists in ComposerRegistry. + ptr, _ := ComposerRegistry[consts.DefaultComposer] + return ptr.CreateComposer(mgr) } func (g *General) DesiredDeployment(s *suggestionsv1alpha3.Suggestion) (*appsv1.Deployment, error) { @@ -208,3 +212,11 @@ func (g *General) desiredContainer(s *suggestionsv1alpha3.Suggestion) (*corev1.C } return c, nil } + +func (g *General) CreateComposer(mgr manager.Manager) Composer { + return &General{mgr.GetScheme(), mgr.GetClient()} +} + +func init() { + ComposerRegistry[consts.DefaultComposer] = &General{} +} diff --git a/pkg/controller.v1alpha3/suggestion/suggestion_controller.go b/pkg/controller.v1alpha3/suggestion/suggestion_controller.go index 6564f5d485b..6194d0fd24a 100644 --- a/pkg/controller.v1alpha3/suggestion/suggestion_controller.go +++ b/pkg/controller.v1alpha3/suggestion/suggestion_controller.go @@ -62,7 +62,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler { Client: mgr.GetClient(), SuggestionClient: suggestionclient.New(), scheme: mgr.GetScheme(), - Composer: composer.New(mgr.GetScheme(), mgr.GetClient()), + Composer: composer.New(mgr), recorder: mgr.GetRecorder(ControllerName), } } diff --git a/pkg/controller.v1alpha3/trial/trial_controller.go b/pkg/controller.v1alpha3/trial/trial_controller.go index c43ac23a450..37dc3c5bc3e 100644 --- a/pkg/controller.v1alpha3/trial/trial_controller.go +++ b/pkg/controller.v1alpha3/trial/trial_controller.go @@ -106,7 +106,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { return err } - for _, gvk := range jobv1alpha3.GetSupportedJobList() { + for _, gvk := range jobv1alpha3.SupportedJobList { unstructuredJob := &unstructured.Unstructured{} unstructuredJob.SetGroupVersionKind(gvk) err = c.Watch( @@ -274,6 +274,15 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1alpha3.Trial, desiredJob if instance.IsCompleted() { return nil, nil } + jobProvider, err := jobv1alpha3.New(desiredJob.GetKind()) + if err != nil { + return nil, err + } + // mutate desiredJob according to provider + if err := jobProvider.MutateJob(instance, desiredJob); err != nil { + logger.Error(err, "Mutating desiredSpec of km.Training error") + return nil, err + } logger.Info("Creating Job", "kind", kind, "name", desiredJob.GetName()) err = r.Create(context.TODO(), desiredJob) diff --git a/pkg/job/v1alpha3/consts.go b/pkg/job/v1alpha3/consts.go deleted file mode 100644 index 5010b415cea..00000000000 --- a/pkg/job/v1alpha3/consts.go +++ /dev/null @@ -1,48 +0,0 @@ -package v1alpha3 - -import ( - "github.com/kubeflow/katib/pkg/controller.v1alpha3/consts" - "k8s.io/apimachinery/pkg/runtime/schema" -) - -const ( - // JobNameLabel represents the label key for the job name, the value is job name - JobNameLabel = "job-name" - // JobRoleLabel represents the label key for the job role, e.g. the value is master - JobRoleLabel = "job-role" - // TFJobRoleLabel is deprecated in kubeflow 0.7, but we need to be compatible. - TFJobRoleLabel = "tf-job-role" - // PyTorchJobRoleLabel is deprecated in kubeflow 0.7, but we need to be compatible. - PyTorchJobRoleLabel = "pytorch-job-role" -) - -// JobRoleMap is the map which is used to determin if the replica is master. -// Katib will inject metrics collector into master replica. -var JobRoleMap = map[string][]string{ - // Job kind does not support distributed training, thus no master. - consts.JobKindJob: {}, - consts.JobKindTF: {JobRoleLabel, TFJobRoleLabel}, - consts.JobKindPyTorch: {JobRoleLabel, PyTorchJobRoleLabel}, -} - -// GetSupportedJobList returns the list of the supported jobs' GVK. -func GetSupportedJobList() []schema.GroupVersionKind { - supportedJobList := []schema.GroupVersionKind{ - { - Group: consts.JobGroupJob, - Version: consts.JobVersionJob, - Kind: consts.JobKindJob, - }, - { - Group: consts.JobGroupKubeflow, - Version: consts.JobVersionTF, - Kind: consts.JobKindTF, - }, - { - Group: consts.JobGroupKubeflow, - Version: consts.JobVersionPyTorch, - Kind: consts.JobKindPyTorch, - }, - } - return supportedJobList -} diff --git a/pkg/job/v1alpha3/job/job.go b/pkg/job/v1alpha3/job.go similarity index 68% rename from pkg/job/v1alpha3/job/job.go rename to pkg/job/v1alpha3/job.go index 1574383ed2f..063fba0bc4e 100644 --- a/pkg/job/v1alpha3/job/job.go +++ b/pkg/job/v1alpha3/job.go @@ -1,4 +1,4 @@ -package job +package v1alpha3 import ( commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1" @@ -6,11 +6,15 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" + + "github.com/kubeflow/katib/pkg/apis/controller/trials/v1alpha3" + "github.com/kubeflow/katib/pkg/controller.v1alpha3/consts" ) var ( - log = logf.Log.WithName("provider-job") + jobLogger = logf.Log.WithName("provider-job") ) // Job is the provider of Job kind. @@ -25,12 +29,12 @@ func (j Job) GetDeployedJobStatus( status, ok, unerr := unstructured.NestedFieldCopy(deployedJob.Object, "status") if !ok { if unerr != nil { - log.Error(unerr, "NestedFieldCopy unstructured to status error") + jobLogger.Error(unerr, "NestedFieldCopy unstructured to status error") return nil, unerr } // Job does not have the running condition in status, thus we think // the job is running when it is created. - log.Info("NestedFieldCopy", "err", "status cannot be found in job") + jobLogger.Info("NestedFieldCopy", "err", "status cannot be found in job") return nil, nil } @@ -38,7 +42,7 @@ func (j Job) GetDeployedJobStatus( jobStatus := batchv1.JobStatus{} err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus) if err != nil { - log.Error(err, "Convert unstructured to status error") + jobLogger.Error(err, "Convert unstructured to status error") return nil, err } for _, cond := range jobStatus.Conditions { @@ -65,3 +69,20 @@ func (j Job) IsTrainingContainer(index int, c corev1.Container) bool { } return false } +func (j Job) MutateJob(*v1alpha3.Trial, *unstructured.Unstructured) error { + return nil +} + +func (j *Job) Create(kind string) Provider { + return &Job{} +} + +func init() { + ProviderRegistry[consts.JobKindJob] = &Job{} + SupportedJobList[consts.JobKindJob] = schema.GroupVersionKind{ + Group: "batch", + Version: "v1", + Kind: consts.JobKindJob, + } + JobRoleMap[consts.JobKindJob] = []string{} +} diff --git a/pkg/job/v1alpha3/kubeflow/kubeflow.go b/pkg/job/v1alpha3/kubeflow.go similarity index 61% rename from pkg/job/v1alpha3/kubeflow/kubeflow.go rename to pkg/job/v1alpha3/kubeflow.go index 686bb46446f..2c9c539236a 100644 --- a/pkg/job/v1alpha3/kubeflow/kubeflow.go +++ b/pkg/job/v1alpha3/kubeflow.go @@ -1,20 +1,21 @@ -package kubeflow +package v1alpha3 import ( pytorchv1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1" commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1" tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" + "github.com/kubeflow/katib/pkg/apis/controller/trials/v1alpha3" "github.com/kubeflow/katib/pkg/controller.v1alpha3/consts" ) var ( - log = logf.Log.WithName("provider-kubeflow") + kfLogger = logf.Log.WithName("provider-kubeflow") ) // Kubeflow is the provider of Kubeflow kinds. @@ -31,10 +32,10 @@ func (k Kubeflow) GetDeployedJobStatus( status, ok, unerr := unstructured.NestedFieldCopy(deployedJob.Object, "status") if !ok { if unerr != nil { - log.Error(unerr, "NestedFieldCopy unstructured to status error") + kfLogger.Error(unerr, "NestedFieldCopy unstructured to status error") return nil, unerr } - log.Info("NestedFieldCopy unstructured to status error", + kfLogger.Info("NestedFieldCopy unstructured to status error", "err", "Status is not found in job") return nil, nil } @@ -43,7 +44,7 @@ func (k Kubeflow) GetDeployedJobStatus( jobStatus := commonv1.JobStatus{} err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus) if err != nil { - log.Error(err, "Convert unstructured to status error") + kfLogger.Error(err, "Convert unstructured to status error") return nil, err } // Get the latest condition and set it to jobCondition. @@ -70,8 +71,33 @@ func (k Kubeflow) IsTrainingContainer(index int, c corev1.Container) bool { return true } default: - log.Info("Invalid Katib worker kind", "JobKind", k.Kind) + kfLogger.Info("Invalid Katib worker kind", "JobKind", k.Kind) return false } return false } + +func (k Kubeflow) MutateJob(*v1alpha3.Trial, *unstructured.Unstructured) error { + return nil +} + +func (k *Kubeflow) Create(kind string) Provider { + return &Kubeflow{Kind: kind} +} + +func init() { + ProviderRegistry[consts.JobKindTF] = &Kubeflow{} + SupportedJobList[consts.JobKindTF] = schema.GroupVersionKind{ + Group: "kubeflow.org", + Version: "v1", + Kind: consts.JobKindTF, + } + JobRoleMap[consts.JobKindTF] = []string{consts.JobRole, consts.JobRoleTF} + ProviderRegistry[consts.JobKindPyTorch] = &Kubeflow{} + SupportedJobList[consts.JobKindPyTorch] = schema.GroupVersionKind{ + Group: "kubeflow.org", + Version: "v1", + Kind: consts.JobKindPyTorch, + } + JobRoleMap[consts.JobKindPyTorch] = []string{consts.JobRole, consts.JobRolePyTorch} +} diff --git a/pkg/job/v1alpha3/provider.go b/pkg/job/v1alpha3/provider.go index 1326479fcf8..4daa137c0b1 100644 --- a/pkg/job/v1alpha3/provider.go +++ b/pkg/job/v1alpha3/provider.go @@ -3,13 +3,21 @@ package v1alpha3 import ( "fmt" + commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" - "github.com/kubeflow/katib/pkg/controller.v1alpha3/consts" - "github.com/kubeflow/katib/pkg/job/v1alpha3/job" - "github.com/kubeflow/katib/pkg/job/v1alpha3/kubeflow" - commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1" + "github.com/kubeflow/katib/pkg/apis/controller/trials/v1alpha3" +) + +var ( + ProviderRegistry = make(map[string]Provider) + // JobRoleMap is the map which is used to determin if the replica is master. + // Katib will inject metrics collector into master replica. + JobRoleMap = make(map[string][]string) + // SupportedJobList returns the list of the supported jobs' GVK. + SupportedJobList = make(map[string]schema.GroupVersionKind) ) // Provider provides utilities for different jobs. @@ -19,19 +27,18 @@ type Provider interface { deployedJob *unstructured.Unstructured) (*commonv1.JobCondition, error) // IsTrainingContainer returns if the c is the actual training container. IsTrainingContainer(index int, c corev1.Container) bool + // Mutate jobSpec before creation if necessary + MutateJob(*v1alpha3.Trial, *unstructured.Unstructured) error + // Recreate Provider from kind + Create(kind string) Provider } // New creates a new Provider. func New(kind string) (Provider, error) { - switch kind { - case consts.JobKindJob: - return &job.Job{}, nil - case consts.JobKindPyTorch, consts.JobKindTF: - return &kubeflow.Kubeflow{ - Kind: kind, - }, nil - default: + if ptr, ok := ProviderRegistry[kind]; ok { + return ptr.Create(kind), nil + } else { return nil, fmt.Errorf( - "Failed to create the provider: Unknown kind %s", kind) + "failed to create the provider: Unknown kind %s", kind) } } diff --git a/pkg/webhook/v1alpha3/experiment/validator/validator.go b/pkg/webhook/v1alpha3/experiment/validator/validator.go index d0cefb08b28..fdcff19dc26 100644 --- a/pkg/webhook/v1alpha3/experiment/validator/validator.go +++ b/pkg/webhook/v1alpha3/experiment/validator/validator.go @@ -145,7 +145,7 @@ func (g *DefaultValidator) validateTrialTemplate(instance *experimentsv1alpha3.E func (g *DefaultValidator) validateSupportedJob(job *unstructured.Unstructured) error { gvk := job.GroupVersionKind() - supportedJobs := jobv1alpha3.GetSupportedJobList() + supportedJobs := jobv1alpha3.SupportedJobList for _, sJob := range supportedJobs { if gvk == sJob { return nil diff --git a/pkg/webhook/v1alpha3/pod/inject_webhook_test.go b/pkg/webhook/v1alpha3/pod/inject_webhook_test.go index 51fcc9ee785..83195e5406a 100644 --- a/pkg/webhook/v1alpha3/pod/inject_webhook_test.go +++ b/pkg/webhook/v1alpha3/pod/inject_webhook_test.go @@ -4,7 +4,7 @@ import ( "testing" common "github.com/kubeflow/katib/pkg/apis/controller/common/v1alpha3" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" ) diff --git a/pkg/webhook/v1alpha3/pod/utils.go b/pkg/webhook/v1alpha3/pod/utils.go index d9dd8f3b5cb..af852f87b62 100644 --- a/pkg/webhook/v1alpha3/pod/utils.go +++ b/pkg/webhook/v1alpha3/pod/utils.go @@ -33,7 +33,7 @@ import ( ) func getKatibJob(pod *v1.Pod) (string, string, error) { - for _, gvk := range jobv1alpha3.GetSupportedJobList() { + for _, gvk := range jobv1alpha3.SupportedJobList { owners := pod.GetOwnerReferences() for _, owner := range owners { if isMatchGVK(owner, gvk) {