diff --git a/ray-operator/controllers/ray/common/association_test.go b/ray-operator/controllers/ray/common/association_test.go
index 87e39ed6b6..af79b2e397 100644
--- a/ray-operator/controllers/ray/common/association_test.go
+++ b/ray-operator/controllers/ray/common/association_test.go
@@ -130,8 +130,7 @@ func TestRayClusterHeadlessServiceListOptions(t *testing.T) {
 			Namespace: "test-ns",
 		},
 	}
-	headlessSvc, err := BuildHeadlessServiceForRayCluster(*instance)
-	assert.Nil(t, err)
+	headlessSvc := BuildHeadlessServiceForRayCluster(*instance)
 
 	rayClusterName := ""
 	for k, v := range headlessSvc.Labels {
diff --git a/ray-operator/controllers/ray/common/service.go b/ray-operator/controllers/ray/common/service.go
index da00fbdeaf..70ddeaa428 100644
--- a/ray-operator/controllers/ray/common/service.go
+++ b/ray-operator/controllers/ray/common/service.go
@@ -288,7 +288,7 @@ func BuildServeService(ctx context.Context, rayService rayv1.RayService, rayClus
 }
 
-// BuildHeadlessService builds the headless service for workers in multi-host worker groups to communicate
-func BuildHeadlessServiceForRayCluster(rayCluster rayv1.RayCluster) (*corev1.Service, error) {
+// BuildHeadlessServiceForRayCluster builds the headless service for workers in multi-host worker groups to communicate
+func BuildHeadlessServiceForRayCluster(rayCluster rayv1.RayCluster) *corev1.Service {
 	name := rayCluster.Name + utils.DashSymbol + utils.HeadlessServiceSuffix
 	namespace := rayCluster.Namespace
 
@@ -310,10 +310,13 @@ func BuildHeadlessServiceForRayCluster(rayCluster rayv1.RayCluster) (*corev1.Ser
 			ClusterIP: "None",
 			Selector:  selectorLabels,
 			Type:      corev1.ServiceTypeClusterIP,
+			// The headless worker service is used for peer communication between multi-host workers and should not be
+			// dependent on Proxy Actor placement to publish DNS addresses.
+			PublishNotReadyAddresses: true,
 		},
 	}
 
-	return headlessService, nil
+	return headlessService
 }
 
 func setServiceTypeForUserProvidedService(ctx context.Context, service *corev1.Service, defaultType corev1.ServiceType) {
diff --git a/ray-operator/controllers/ray/common/service_test.go b/ray-operator/controllers/ray/common/service_test.go
index 45762f7a73..fcea1d2a49 100644
--- a/ray-operator/controllers/ray/common/service_test.go
+++ b/ray-operator/controllers/ray/common/service_test.go
@@ -97,7 +97,7 @@ var (
 			},
 		},
 	}
-	instanceForServeSvc = &rayv1.RayCluster{
+	instanceForSvc = &rayv1.RayCluster{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      "raycluster-sample-svc",
 			Namespace: "default",
@@ -450,6 +450,49 @@ func TestBuildServiceForHeadPodPortsOrder(t *testing.T) {
 	}
 }
 
+func TestBuildHeadlessServiceForRayCluster(t *testing.T) {
+	svc := BuildHeadlessServiceForRayCluster(*instanceForSvc)
+
+	actualSelector := svc.Spec.Selector[utils.RayClusterLabelKey]
+	expectedSelector := instanceForSvc.Name
+	if !reflect.DeepEqual(expectedSelector, actualSelector) {
+		t.Fatalf("Expected `%v` but got `%v`", expectedSelector, actualSelector)
+	}
+
+	actualSelector = svc.Spec.Selector[utils.RayNodeTypeLabelKey]
+	expectedSelector = string(rayv1.WorkerNode)
+	if !reflect.DeepEqual(expectedSelector, actualSelector) {
+		t.Fatalf("Expected `%v` but got `%v`", expectedSelector, actualSelector)
+	}
+
+	actualLabel := svc.Labels[utils.RayClusterHeadlessServiceLabelKey]
+	expectedLabel := instanceForSvc.Name
+	if !reflect.DeepEqual(expectedLabel, actualLabel) {
+		t.Fatalf("Expected `%v` but got `%v`", expectedLabel, actualLabel)
+	}
+
+	actualType := svc.Spec.Type
+	expectedType := corev1.ServiceTypeClusterIP
+	if !reflect.DeepEqual(expectedType, actualType) {
+		t.Fatalf("Expected `%v` but got `%v`", expectedType, actualType)
+	}
+
+	actualClusterIP := svc.Spec.ClusterIP
+	expectedClusterIP := corev1.ClusterIPNone
+	if !reflect.DeepEqual(expectedClusterIP, actualClusterIP) {
+		t.Fatalf("Expected `%v` but got `%v`", expectedClusterIP, actualClusterIP)
+	}
+
+	actualPublishNotReadyAddresses := svc.Spec.PublishNotReadyAddresses
+	expectedPublishNotReadyAddresses := true
+	if !reflect.DeepEqual(expectedPublishNotReadyAddresses, actualPublishNotReadyAddresses) {
+		t.Fatalf("Expected `%v` but got `%v`", expectedPublishNotReadyAddresses, actualPublishNotReadyAddresses)
+	}
+
+	expectedName := fmt.Sprintf("%s-%s", instanceForSvc.Name, utils.HeadlessServiceSuffix)
+	validateNameAndNamespaceForUserSpecifiedService(svc, instanceForSvc.Namespace, expectedName, t)
+}
+
 func TestBuildServeServiceForRayService(t *testing.T) {
 	svc, err := BuildServeServiceForRayService(context.Background(), *serviceInstance, *instanceWithWrongSvc)
 	assert.Nil(t, err)
@@ -483,17 +526,17 @@ func TestBuildServeServiceForRayService(t *testing.T) {
 }
 
 func TestBuildServeServiceForRayCluster(t *testing.T) {
-	svc, err := BuildServeServiceForRayCluster(context.Background(), *instanceForServeSvc)
+	svc, err := BuildServeServiceForRayCluster(context.Background(), *instanceForSvc)
 	assert.Nil(t, err)
 
 	actualResult := svc.Spec.Selector[utils.RayClusterLabelKey]
-	expectedResult := instanceForServeSvc.Name
+	expectedResult := instanceForSvc.Name
 	if !reflect.DeepEqual(expectedResult, actualResult) {
 		t.Fatalf("Expected `%v` but got `%v`", expectedResult, actualResult)
 	}
 
 	actualLabel := svc.Labels[utils.RayOriginatedFromCRNameLabelKey]
-	expectedLabel := instanceForServeSvc.Name
+	expectedLabel := instanceForSvc.Name
 	assert.Equal(t, expectedLabel, actualLabel)
 
 	actualLabel = svc.Labels[utils.RayOriginatedFromCRDLabelKey]
@@ -501,12 +544,12 @@ func TestBuildServeServiceForRayCluster(t *testing.T) {
 	assert.Equal(t, expectedLabel, actualLabel)
 
 	actualType := svc.Spec.Type
-	expectedType := instanceForServeSvc.Spec.HeadGroupSpec.ServiceType
+	expectedType := instanceForSvc.Spec.HeadGroupSpec.ServiceType
 	if !reflect.DeepEqual(expectedType, actualType) {
 		t.Fatalf("Expected `%v` but got `%v`", expectedType, actualType)
 	}
 
-	expectedName := fmt.Sprintf("%s-%s-%s", instanceForServeSvc.Name, "serve", "svc")
+	expectedName := fmt.Sprintf("%s-%s-%s", instanceForSvc.Name, "serve", "svc")
 	validateNameAndNamespaceForUserSpecifiedService(svc, serviceInstance.ObjectMeta.Namespace, expectedName, t)
 }
 
diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go
index 37d261ebbb..cce3c96a33 100644
--- a/ray-operator/controllers/ray/raycluster_controller.go
+++ b/ray-operator/controllers/ray/raycluster_controller.go
@@ -597,10 +597,7 @@ func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, ins
 			return nil
 		}
 		// Create headless tpu worker service if there's no existing one in the cluster.
-		headlessSvc, err := common.BuildHeadlessServiceForRayCluster(*instance)
-		if err != nil {
-			return err
-		}
+		headlessSvc := common.BuildHeadlessServiceForRayCluster(*instance)
 
 		if err := r.createService(ctx, headlessSvc, instance); err != nil {
 			return err