From e1edb4ca17ca7a6da4fc8b3e9505dca339068921 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Sat, 7 Sep 2024 22:22:07 -0700 Subject: [PATCH] Add batch-scheduler option, deprecate enable-batch-scheduler option (#2300) --- .../templates/deployment.yaml | 5 + helm-chart/kuberay-operator/values.yaml | 25 ++++ .../apis/config/v1alpha1/config_utils.go | 34 +++++ .../apis/config/v1alpha1/config_utils_test.go | 84 ++++++++++++ .../config/v1alpha1/configuration_types.go | 4 + .../ray/batchscheduler/schedulermanager.go | 121 +++++++++-------- .../batchscheduler/schedulermanager_test.go | 125 ++++++++++++++++++ .../controllers/ray/raycluster_controller.go | 31 +++-- ray-operator/controllers/ray/suite_test.go | 4 +- ray-operator/main.go | 25 ++-- 10 files changed, 377 insertions(+), 81 deletions(-) create mode 100644 ray-operator/apis/config/v1alpha1/config_utils.go create mode 100644 ray-operator/apis/config/v1alpha1/config_utils_test.go create mode 100644 ray-operator/controllers/ray/batchscheduler/schedulermanager_test.go diff --git a/helm-chart/kuberay-operator/templates/deployment.yaml b/helm-chart/kuberay-operator/templates/deployment.yaml index 878a495b91..a9cd9b5838 100644 --- a/helm-chart/kuberay-operator/templates/deployment.yaml +++ b/helm-chart/kuberay-operator/templates/deployment.yaml @@ -57,9 +57,14 @@ spec: args: {{- $argList := list -}} {{- $argList = append $argList (include "kuberay.featureGates" . | trim) -}} + {{- if .Values.batchScheduler -}} {{- if .Values.batchScheduler.enabled -}} {{- $argList = append $argList "--enable-batch-scheduler" -}} {{- end -}} + {{- if .Values.batchScheduler.name -}} + {{- $argList = append $argList (printf "--batch-scheduler=%s" .Values.batchScheduler.name) -}} + {{- end -}} + {{- end -}} {{- $watchNamespace := "" -}} {{- if and .Values.singleNamespaceInstall (not .Values.watchNamespace) -}} {{- $watchNamespace = .Release.Namespace -}} diff --git a/helm-chart/kuberay-operator/values.yaml b/helm-chart/kuberay-operator/values.yaml index b65f4047dd..75a343c681 100644 --- a/helm-chart/kuberay-operator/values.yaml +++ b/helm-chart/kuberay-operator/values.yaml @@ -55,8 +55,33 @@ readinessProbe: periodSeconds: 5 failureThreshold: 5 +# Enable customized Kubernetes scheduler integration. If enabled, Ray workloads will be scheduled +# by the customized scheduler. +# * "enabled" is the legacy option and will be deprecated soon. +# * "name" is the standard option, expecting a scheduler name, supported values are +# "default", "volcano", and "yunikorn". +# +# Examples: +# 1. Use volcano (deprecated) +# batchScheduler: +# enabled: true +# +# 2. Use volcano +# batchScheduler: +# name: volcano +# +# 3. Use yunikorn +# batchScheduler: +# name: yunikorn +# batchScheduler: + # Deprecated. This option will be removed in the future. + # Note, for backwards compatibility. When it sets to true, it enables volcano scheduler integration. enabled: false + # Name of the scheduler, currently supported "default", "volcano" and "yunikorn", + # set the customized scheduler name, e.g "volcano" or "yunikorn", do not set + # "batchScheduler.enabled=true" at the same time as it will override this option. + name: default featureGates: - name: RayClusterStatusConditions diff --git a/ray-operator/apis/config/v1alpha1/config_utils.go b/ray-operator/apis/config/v1alpha1/config_utils.go new file mode 100644 index 0000000000..4ef3ecf303 --- /dev/null +++ b/ray-operator/apis/config/v1alpha1/config_utils.go @@ -0,0 +1,34 @@ +package v1alpha1 + +import ( + "fmt" + + "github.com/go-logr/logr" + + "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn" +) + +func ValidateBatchSchedulerConfig(logger logr.Logger, config Configuration) error { + if config.EnableBatchScheduler { + logger.Info("Feature flag enable-batch-scheduler is deprecated and will not be supported soon. " + + "Use batch-scheduler instead. ") + return nil + } + + if len(config.BatchScheduler) > 0 { + // default option, no-opt. + if config.BatchScheduler == "default" { + return nil + } + + // if a customized scheduler is configured, check it is supported + if config.BatchScheduler == volcano.GetPluginName() || config.BatchScheduler == yunikorn.GetPluginName() { + logger.Info("Feature flag batch-scheduler is enabled", + "scheduler name", config.BatchScheduler) + } else { + return fmt.Errorf("scheduler is not supported, name=%s", config.BatchScheduler) + } + } + return nil +} diff --git a/ray-operator/apis/config/v1alpha1/config_utils_test.go b/ray-operator/apis/config/v1alpha1/config_utils_test.go new file mode 100644 index 0000000000..9589a7c682 --- /dev/null +++ b/ray-operator/apis/config/v1alpha1/config_utils_test.go @@ -0,0 +1,84 @@ +package v1alpha1 + +import ( + "testing" + + "github.com/go-logr/logr" + "github.com/go-logr/logr/testr" + + "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn" +) + +func TestValidateBatchSchedulerConfig(t *testing.T) { + type args struct { + logger logr.Logger + config Configuration + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "legacy option, enable-batch-scheduler=false", + args: args{ + logger: testr.New(t), + config: Configuration{ + EnableBatchScheduler: false, + }, + }, + wantErr: false, + }, + { + name: "legacy option, enable-batch-scheduler=true", + args: args{ + logger: testr.New(t), + config: Configuration{ + EnableBatchScheduler: true, + }, + }, + wantErr: false, + }, + { + name: "valid option, batch-scheduler=yunikorn", + args: args{ + logger: testr.New(t), + config: Configuration{ + BatchScheduler: yunikorn.GetPluginName(), + }, + }, + wantErr: false, + }, + { + name: "valid option, batch-scheduler=volcano", + args: args{ + logger: testr.New(t), + config: Configuration{ + BatchScheduler: volcano.GetPluginName(), + }, + }, + wantErr: false, + }, + { + name: "invalid option, invalid scheduler name", + args: args{ + logger: testr.New(t), + config: Configuration{ + EnableBatchScheduler: false, + BatchScheduler: "unknown-scheduler-name", + }, + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Logf(tt.name) + if err := ValidateBatchSchedulerConfig(tt.args.logger, tt.args.config); (err != nil) != tt.wantErr { + t.Errorf("ValidateBatchSchedulerConfig() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/ray-operator/apis/config/v1alpha1/configuration_types.go b/ray-operator/apis/config/v1alpha1/configuration_types.go index 7f1681b417..92a8f00fff 100644 --- a/ray-operator/apis/config/v1alpha1/configuration_types.go +++ b/ray-operator/apis/config/v1alpha1/configuration_types.go @@ -43,6 +43,10 @@ type Configuration struct { // Defaults to `json` if empty. LogStdoutEncoder string `json:"logStdoutEncoder,omitempty"` + // BatchScheduler enables the batch scheduler integration with a specific scheduler + // based on the given name, currently, supported values are volcano and yunikorn. + BatchScheduler string `json:"batchScheduler,omitempty"` + // HeadSidecarContainers includes specification for a sidecar container // to inject into every Head pod. HeadSidecarContainers []corev1.Container `json:"headSidecarContainers,omitempty"` diff --git a/ray-operator/controllers/ray/batchscheduler/schedulermanager.go b/ray-operator/controllers/ray/batchscheduler/schedulermanager.go index f5c32064f7..058e0a9cfd 100644 --- a/ray-operator/controllers/ray/batchscheduler/schedulermanager.go +++ b/ray-operator/controllers/ray/batchscheduler/schedulermanager.go @@ -1,93 +1,92 @@ package batchscheduler import ( - "fmt" "sync" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/builder" - rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" - schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface" + configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano" "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn" + + "k8s.io/client-go/rest" + + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" ) -var schedulerContainers = map[string]schedulerinterface.BatchSchedulerFactory{ - schedulerinterface.GetDefaultPluginName(): &schedulerinterface.DefaultBatchSchedulerFactory{}, - volcano.GetPluginName(): &volcano.VolcanoBatchSchedulerFactory{}, - yunikorn.GetPluginName(): &yunikorn.YuniKornSchedulerFactory{}, -} - -func GetRegisteredNames() []string { - var pluginNames []string - for key := range schedulerContainers { - pluginNames = append(pluginNames, key) - } - return pluginNames +type SchedulerManager struct { + config *rest.Config + factory schedulerinterface.BatchSchedulerFactory + scheduler schedulerinterface.BatchScheduler + rayConfigs configapi.Configuration + sync.Mutex } -func ConfigureReconciler(b *builder.Builder) *builder.Builder { - for _, factory := range schedulerContainers { - b = factory.ConfigureReconciler(b) +// NewSchedulerManager maintains a specific scheduler plugin based on config +func NewSchedulerManager(rayConfigs configapi.Configuration, config *rest.Config) (*SchedulerManager, error) { + // init the scheduler factory from config + factory := getSchedulerFactory(rayConfigs) + scheduler, err := factory.New(config) + if err != nil { + return nil, err } - return b -} -func AddToScheme(scheme *runtime.Scheme) { - for _, factory := range schedulerContainers { - factory.AddToScheme(scheme) + manager := SchedulerManager{ + rayConfigs: rayConfigs, + config: config, + factory: factory, + scheduler: scheduler, } -} -type SchedulerManager struct { - config *rest.Config - plugins map[string]schedulerinterface.BatchScheduler - sync.Mutex + return &manager, nil } -func NewSchedulerManager(config *rest.Config) *SchedulerManager { - manager := SchedulerManager{ - config: config, - plugins: make(map[string]schedulerinterface.BatchScheduler), +func getSchedulerFactory(rayConfigs configapi.Configuration) schedulerinterface.BatchSchedulerFactory { + var factory schedulerinterface.BatchSchedulerFactory + // init with the default factory + factory = &schedulerinterface.DefaultBatchSchedulerFactory{} + // when a batch scheduler name is provided + if len(rayConfigs.BatchScheduler) > 0 { + switch rayConfigs.BatchScheduler { + case volcano.GetPluginName(): + factory = &volcano.VolcanoBatchSchedulerFactory{} + case yunikorn.GetPluginName(): + factory = &yunikorn.YuniKornSchedulerFactory{} + default: + factory = &schedulerinterface.DefaultBatchSchedulerFactory{} + } } - return &manager -} -func (batch *SchedulerManager) GetSchedulerForCluster(app *rayv1.RayCluster) (schedulerinterface.BatchScheduler, error) { - if schedulerName, ok := app.ObjectMeta.Labels[utils.RaySchedulerName]; ok { - return batch.GetScheduler(schedulerName) + // legacy option, if this is enabled, register volcano + // this is for backwards compatibility + if rayConfigs.EnableBatchScheduler { + factory = &volcano.VolcanoBatchSchedulerFactory{} } - // no scheduler provided - return &schedulerinterface.DefaultBatchScheduler{}, nil + return factory } -func (batch *SchedulerManager) GetScheduler(schedulerName string) (schedulerinterface.BatchScheduler, error) { - factory, registered := schedulerContainers[schedulerName] - if !registered { - return nil, fmt.Errorf("unregistered scheduler plugin %s", schedulerName) +func (batch *SchedulerManager) GetSchedulerForCluster(app *rayv1.RayCluster) (schedulerinterface.BatchScheduler, error) { + // for backwards compatibility + if batch.rayConfigs.EnableBatchScheduler { + if schedulerName, ok := app.ObjectMeta.Labels[utils.RaySchedulerName]; ok { + if schedulerName == volcano.GetPluginName() { + return batch.scheduler, nil + } + } } - batch.Lock() - defer batch.Unlock() + return batch.scheduler, nil +} - plugin, existed := batch.plugins[schedulerName] +func (batch *SchedulerManager) ConfigureReconciler(b *builder.Builder) *builder.Builder { + batch.factory.ConfigureReconciler(b) + return b +} - if existed && plugin != nil { - return plugin, nil - } - if existed && plugin == nil { - return nil, fmt.Errorf( - "failed to get scheduler plugin %s, previous initialization has failed", schedulerName) - } - plugin, err := factory.New(batch.config) - if err != nil { - batch.plugins[schedulerName] = nil - return nil, err - } - batch.plugins[schedulerName] = plugin - return plugin, nil +func (batch *SchedulerManager) AddToScheme(scheme *runtime.Scheme) { + batch.factory.AddToScheme(scheme) } diff --git a/ray-operator/controllers/ray/batchscheduler/schedulermanager_test.go b/ray-operator/controllers/ray/batchscheduler/schedulermanager_test.go new file mode 100644 index 0000000000..59f922986b --- /dev/null +++ b/ray-operator/controllers/ray/batchscheduler/schedulermanager_test.go @@ -0,0 +1,125 @@ +package batchscheduler + +import ( + "reflect" + "testing" + + "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1" + schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn" +) + +func TestGetSchedulerFactory(t *testing.T) { + DefaultFactory := &schedulerinterface.DefaultBatchSchedulerFactory{} + VolcanoFactory := &volcano.VolcanoBatchSchedulerFactory{} + YuniKornFactory := &yunikorn.YuniKornSchedulerFactory{} + + type args struct { + rayConfigs v1alpha1.Configuration + } + tests := []struct { + want reflect.Type + name string + args args + }{ + { + name: "enableBatchScheduler=false, batchScheduler set to default", + args: args{ + rayConfigs: v1alpha1.Configuration{ + EnableBatchScheduler: false, + BatchScheduler: schedulerinterface.GetDefaultPluginName(), + }, + }, + want: reflect.TypeOf(DefaultFactory), + }, + { + name: "enableBatchScheduler=false, batchScheduler not set", + args: args{ + rayConfigs: v1alpha1.Configuration{ + EnableBatchScheduler: false, + }, + }, + want: reflect.TypeOf(DefaultFactory), + }, + { + name: "enableBatchScheduler=false, batchScheduler set to yunikorn", + args: args{ + rayConfigs: v1alpha1.Configuration{ + EnableBatchScheduler: false, + BatchScheduler: yunikorn.GetPluginName(), + }, + }, + want: reflect.TypeOf(YuniKornFactory), + }, + { + name: "enableBatchScheduler=false, batchScheduler set to volcano", + args: args{ + rayConfigs: v1alpha1.Configuration{ + EnableBatchScheduler: false, + BatchScheduler: volcano.GetPluginName(), + }, + }, + want: reflect.TypeOf(VolcanoFactory), + }, + { + name: "enableBatchScheduler not set, batchScheduler set to yunikorn", + args: args{ + rayConfigs: v1alpha1.Configuration{ + BatchScheduler: yunikorn.GetPluginName(), + }, + }, + want: reflect.TypeOf(YuniKornFactory), + }, + { + name: "enableBatchScheduler not set, batchScheduler set to volcano", + args: args{ + rayConfigs: v1alpha1.Configuration{ + BatchScheduler: volcano.GetPluginName(), + }, + }, + want: reflect.TypeOf(VolcanoFactory), + }, + { + // for backwards compatibility, if enableBatchScheduler=true, always use volcano + name: "enableBatchScheduler=true, batchScheduler set to yunikorn", + args: args{ + rayConfigs: v1alpha1.Configuration{ + EnableBatchScheduler: true, + BatchScheduler: yunikorn.GetPluginName(), + }, + }, + want: reflect.TypeOf(VolcanoFactory), + }, + { + // for backwards compatibility, if enableBatchScheduler=true, always use volcano + name: "enableBatchScheduler=true, batchScheduler set to volcano", + args: args{ + rayConfigs: v1alpha1.Configuration{ + EnableBatchScheduler: true, + BatchScheduler: volcano.GetPluginName(), + }, + }, + want: reflect.TypeOf(VolcanoFactory), + }, + { + // for backwards compatibility, if enableBatchScheduler=true, always use volcano + name: "enableBatchScheduler=true, batchScheduler set to volcano", + args: args{ + rayConfigs: v1alpha1.Configuration{ + EnableBatchScheduler: true, + BatchScheduler: schedulerinterface.GetDefaultPluginName(), + }, + }, + want: reflect.TypeOf(VolcanoFactory), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := getSchedulerFactory(tt.args.rayConfigs); reflect.TypeOf(got) != tt.want { + t.Errorf("getSchedulerFactory() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 6138befdc9..ffd806ea5f 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -15,6 +15,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" + configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler" "github.com/ray-project/kuberay/ray-operator/controllers/ray/common" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" @@ -51,7 +52,6 @@ type reconcileFunc func(context.Context, *rayv1.RayCluster) error var ( DefaultRequeueDuration = 2 * time.Second - EnableBatchScheduler bool // Definition of a index field for pod name podUIDIndexField = "metadata.uid" @@ -97,7 +97,7 @@ func getClusterType(ctx context.Context) bool { } // NewReconciler returns a new reconcile.Reconciler -func NewReconciler(ctx context.Context, mgr manager.Manager, options RayClusterReconcilerOptions) *RayClusterReconciler { +func NewReconciler(ctx context.Context, mgr manager.Manager, options RayClusterReconcilerOptions, rayConfigs configapi.Configuration) *RayClusterReconciler { if err := mgr.GetFieldIndexer().IndexField(ctx, &corev1.Pod{}, podUIDIndexField, func(rawObj client.Object) []string { pod := rawObj.(*corev1.Pod) return []string{string(pod.UID)} @@ -106,11 +106,22 @@ func NewReconciler(ctx context.Context, mgr manager.Manager, options RayClusterR } isOpenShift := getClusterType(ctx) + // init the batch scheduler manager + schedulerMgr, err := batchscheduler.NewSchedulerManager(rayConfigs, mgr.GetConfig()) + if err != nil { + // fail fast if the scheduler plugin fails to init + // prevent running the controller in an undefined state + panic(err) + } + + // add schema to runtime + schedulerMgr.AddToScheme(mgr.GetScheme()) + return &RayClusterReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Recorder: mgr.GetEventRecorderFor("raycluster-controller"), - BatchSchedulerMgr: batchscheduler.NewSchedulerManager(mgr.GetConfig()), + BatchSchedulerMgr: schedulerMgr, IsOpenShift: isOpenShift, headSidecarContainers: options.HeadSidecarContainers, @@ -622,7 +633,9 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv if err := r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil { return err } - if EnableBatchScheduler { + // check if the batch scheduler integration is enabled + // call the scheduler plugin if so + if r.BatchSchedulerMgr != nil { if scheduler, err := r.BatchSchedulerMgr.GetSchedulerForCluster(instance); err == nil { if err := scheduler.DoBatchSchedulingOnSubmission(ctx, instance); err != nil { return err @@ -957,7 +970,9 @@ func (r *RayClusterReconciler) createHeadPod(ctx context.Context, instance rayv1 // build the pod then create it pod := r.buildHeadPod(ctx, instance) - if EnableBatchScheduler { + // check if the batch scheduler integration is enabled + // call the scheduler plugin if so + if r.BatchSchedulerMgr != nil { if scheduler, err := r.BatchSchedulerMgr.GetSchedulerForCluster(&instance); err == nil { scheduler.AddMetadataToPod(&instance, utils.RayNodeHeadGroupLabelValue, &pod) } else { @@ -979,7 +994,7 @@ func (r *RayClusterReconciler) createWorkerPod(ctx context.Context, instance ray // build the pod then create it pod := r.buildWorkerPod(ctx, instance, worker) - if EnableBatchScheduler { + if r.BatchSchedulerMgr != nil { if scheduler, err := r.BatchSchedulerMgr.GetSchedulerForCluster(&instance); err == nil { scheduler.AddMetadataToPod(&instance, worker.GroupName, &pod) } else { @@ -1134,8 +1149,8 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu Owns(&corev1.Pod{}). Owns(&corev1.Service{}) - if EnableBatchScheduler { - b = batchscheduler.ConfigureReconciler(b) + if r.BatchSchedulerMgr != nil { + r.BatchSchedulerMgr.ConfigureReconciler(b) } return b. diff --git a/ray-operator/controllers/ray/suite_test.go b/ray-operator/controllers/ray/suite_test.go index d614e64d4f..de73a2e1e6 100644 --- a/ray-operator/controllers/ray/suite_test.go +++ b/ray-operator/controllers/ray/suite_test.go @@ -22,6 +22,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" + configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" @@ -120,7 +121,8 @@ var _ = BeforeSuite(func(ctx SpecContext) { }, }, } - err = NewReconciler(ctx, mgr, options).SetupWithManager(mgr, 1) + configs := configapi.Configuration{} + err = NewReconciler(ctx, mgr, options, configs).SetupWithManager(mgr, 1) Expect(err).NotTo(HaveOccurred(), "failed to setup RayCluster controller") testClientProvider := TestClientProvider{} diff --git a/ray-operator/main.go b/ray-operator/main.go index 456a6afe25..172183040a 100644 --- a/ray-operator/main.go +++ b/ray-operator/main.go @@ -32,7 +32,6 @@ import ( configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray" - "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" "github.com/ray-project/kuberay/ray-operator/pkg/features" // +kubebuilder:scaffold:imports @@ -50,7 +49,6 @@ func init() { utilruntime.Must(routev1.Install(scheme)) utilruntime.Must(batchv1.AddToScheme(scheme)) utilruntime.Must(configapi.AddToScheme(scheme)) - batchscheduler.AddToScheme(scheme) // +kubebuilder:scaffold:scheme } @@ -68,6 +66,8 @@ func main() { var useKubernetesProxy bool var configFile string var featureGates string + var enableBatchScheduler bool + var batchScheduler string // TODO: remove flag-based config once Configuration API graduates to v1. flag.StringVar(&metricsAddr, "metrics-addr", configapi.DefaultMetricsAddr, "The address the metric endpoint binds to.") @@ -90,8 +90,10 @@ func main() { "Encoder to use for log file. Valid values are 'json' and 'console'. Defaults to 'json'") flag.StringVar(&logStdoutEncoder, "log-stdout-encoder", "json", "Encoder to use for logging stdout. Valid values are 'json' and 'console'. Defaults to 'json'") - flag.BoolVar(&ray.EnableBatchScheduler, "enable-batch-scheduler", false, - "Enable batch scheduler. Currently is volcano, which supports gang scheduler policy.") + flag.BoolVar(&enableBatchScheduler, "enable-batch-scheduler", false, + "(Deprecated) Enable batch scheduler. Currently is volcano, which supports gang scheduler policy.") + flag.StringVar(&batchScheduler, "batch-scheduler", "default", + "Batch scheduler name, supported values are default, volcano, yunikorn.") flag.StringVar(&configFile, "config", "", "Path to structured config file. Flags are ignored if config file is set.") flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false, "Use Kubernetes proxy subresource when connecting to the Ray Head node.") @@ -111,9 +113,6 @@ func main() { config, err = decodeConfig(configData, scheme) exitOnError(err, "failed to decode config file") - - // TODO: remove globally-scoped variables - ray.EnableBatchScheduler = config.EnableBatchScheduler } else { config.MetricsAddr = metricsAddr config.ProbeAddr = probeAddr @@ -124,7 +123,8 @@ func main() { config.LogFile = logFile config.LogFileEncoder = logFileEncoder config.LogStdoutEncoder = logStdoutEncoder - config.EnableBatchScheduler = ray.EnableBatchScheduler + config.EnableBatchScheduler = enableBatchScheduler + config.BatchScheduler = batchScheduler config.UseKubernetesProxy = useKubernetesProxy config.DeleteRayJobAfterJobFinishes = os.Getenv(utils.DELETE_RAYJOB_CR_AFTER_JOB_FINISHES) == "true" } @@ -160,8 +160,11 @@ func main() { if forcedClusterUpgrade { setupLog.Info("Deprecated feature flag forced-cluster-upgrade is enabled, which has no effect.") } - if ray.EnableBatchScheduler { - setupLog.Info("Feature flag enable-batch-scheduler is enabled.") + + // validate the batch scheduler configs, + // exit with error if the configs is invalid. + if err := configapi.ValidateBatchSchedulerConfig(setupLog, config); err != nil { + exitOnError(err, "batch scheduler configs validation failed") } if err := utilfeature.DefaultMutableFeatureGate.Set(featureGates); err != nil { @@ -220,7 +223,7 @@ func main() { WorkerSidecarContainers: config.WorkerSidecarContainers, } ctx := ctrl.SetupSignalHandler() - exitOnError(ray.NewReconciler(ctx, mgr, rayClusterOptions).SetupWithManager(mgr, config.ReconcileConcurrency), + exitOnError(ray.NewReconciler(ctx, mgr, rayClusterOptions, config).SetupWithManager(mgr, config.ReconcileConcurrency), "unable to create controller", "controller", "RayCluster") exitOnError(ray.NewRayServiceReconciler(ctx, mgr, config).SetupWithManager(mgr, config.ReconcileConcurrency), "unable to create controller", "controller", "RayService")