Skip to content

Commit

Permalink
RayCluster Headless Worker Service Should PublishNotReadyAddresses (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanaoleary committed Sep 12, 2024
1 parent 0e1c248 commit b8f6d06
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 14 deletions.
3 changes: 1 addition & 2 deletions ray-operator/controllers/ray/common/association_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ func TestRayClusterHeadlessServiceListOptions(t *testing.T) {
Namespace: "test-ns",
},
}
headlessSvc, err := BuildHeadlessServiceForRayCluster(*instance)
assert.Nil(t, err)
headlessSvc := BuildHeadlessServiceForRayCluster(*instance)

rayClusterName := ""
for k, v := range headlessSvc.Labels {
Expand Down
7 changes: 5 additions & 2 deletions ray-operator/controllers/ray/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func BuildServeService(ctx context.Context, rayService rayv1.RayService, rayClus
}

// BuildHeadlessServiceForRayCluster builds the headless service that workers in multi-host worker groups use to communicate with each other.
func BuildHeadlessServiceForRayCluster(rayCluster rayv1.RayCluster) (*corev1.Service, error) {
func BuildHeadlessServiceForRayCluster(rayCluster rayv1.RayCluster) *corev1.Service {
name := rayCluster.Name + utils.DashSymbol + utils.HeadlessServiceSuffix
namespace := rayCluster.Namespace

Expand All @@ -310,10 +310,13 @@ func BuildHeadlessServiceForRayCluster(rayCluster rayv1.RayCluster) (*corev1.Ser
ClusterIP: "None",
Selector: selectorLabels,
Type: corev1.ServiceTypeClusterIP,
// The headless worker service is used for peer communication between multi-host workers and should not be
// dependent on Proxy Actor placement to publish DNS addresses.
PublishNotReadyAddresses: true,
},
}

return headlessService, nil
return headlessService
}

func setServiceTypeForUserProvidedService(ctx context.Context, service *corev1.Service, defaultType corev1.ServiceType) {
Expand Down
55 changes: 49 additions & 6 deletions ray-operator/controllers/ray/common/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ var (
},
},
}
instanceForServeSvc = &rayv1.RayCluster{
instanceForSvc = &rayv1.RayCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "raycluster-sample-svc",
Namespace: "default",
Expand Down Expand Up @@ -450,6 +450,49 @@ func TestBuildServiceForHeadPodPortsOrder(t *testing.T) {
}
}

// TestBuildHeadlessServiceForRayCluster verifies the headless worker service
// built from the instanceForSvc fixture: its selectors, its headless-service
// label, its type and ClusterIP, the PublishNotReadyAddresses flag, and its
// name and namespace.
func TestBuildHeadlessServiceForRayCluster(t *testing.T) {
	svc := BuildHeadlessServiceForRayCluster(*instanceForSvc)

	// The selector must target pods belonging to this RayCluster...
	actualSelector := svc.Spec.Selector[utils.RayClusterLabelKey]
	expectedSelector := instanceForSvc.Name
	if expectedSelector != actualSelector {
		t.Fatalf("Expected `%v` but got `%v`", expectedSelector, actualSelector)
	}

	// ...and only the worker nodes among them.
	actualSelector = svc.Spec.Selector[utils.RayNodeTypeLabelKey]
	expectedSelector = string(rayv1.WorkerNode)
	if expectedSelector != actualSelector {
		t.Fatalf("Expected `%v` but got `%v`", expectedSelector, actualSelector)
	}

	actualLabel := svc.Labels[utils.RayClusterHeadlessServiceLabelKey]
	expectedLabel := instanceForSvc.Name
	if expectedLabel != actualLabel {
		t.Fatalf("Expected `%v` but got `%v`", expectedLabel, actualLabel)
	}

	actualType := svc.Spec.Type
	expectedType := corev1.ServiceTypeClusterIP
	if expectedType != actualType {
		t.Fatalf("Expected `%v` but got `%v`", expectedType, actualType)
	}

	actualClusterIP := svc.Spec.ClusterIP
	expectedClusterIP := corev1.ClusterIPNone
	if expectedClusterIP != actualClusterIP {
		t.Fatalf("Expected `%v` but got `%v`", expectedClusterIP, actualClusterIP)
	}

	// Compare the PublishNotReadyAddresses values themselves. The previous
	// version re-checked the ClusterIP variables here, so this assertion
	// could never fail on the field it was meant to cover.
	actualPublishNotReadyAddresses := svc.Spec.PublishNotReadyAddresses
	expectedPublishNotReadyAddresses := true
	if expectedPublishNotReadyAddresses != actualPublishNotReadyAddresses {
		t.Fatalf("Expected `%v` but got `%v`", expectedPublishNotReadyAddresses, actualPublishNotReadyAddresses)
	}

	// The service is created in the RayCluster's own namespace, so take the
	// expected namespace from the same fixture the service was built from.
	expectedName := fmt.Sprintf("%s-%s", instanceForSvc.Name, utils.HeadlessServiceSuffix)
	validateNameAndNamespaceForUserSpecifiedService(svc, instanceForSvc.Namespace, expectedName, t)
}

func TestBuildServeServiceForRayService(t *testing.T) {
svc, err := BuildServeServiceForRayService(context.Background(), *serviceInstance, *instanceWithWrongSvc)
assert.Nil(t, err)
Expand Down Expand Up @@ -483,30 +526,30 @@ func TestBuildServeServiceForRayService(t *testing.T) {
}

func TestBuildServeServiceForRayCluster(t *testing.T) {
svc, err := BuildServeServiceForRayCluster(context.Background(), *instanceForServeSvc)
svc, err := BuildServeServiceForRayCluster(context.Background(), *instanceForSvc)
assert.Nil(t, err)

actualResult := svc.Spec.Selector[utils.RayClusterLabelKey]
expectedResult := instanceForServeSvc.Name
expectedResult := instanceForSvc.Name
if !reflect.DeepEqual(expectedResult, actualResult) {
t.Fatalf("Expected `%v` but got `%v`", expectedResult, actualResult)
}

actualLabel := svc.Labels[utils.RayOriginatedFromCRNameLabelKey]
expectedLabel := instanceForServeSvc.Name
expectedLabel := instanceForSvc.Name
assert.Equal(t, expectedLabel, actualLabel)

actualLabel = svc.Labels[utils.RayOriginatedFromCRDLabelKey]
expectedLabel = utils.RayOriginatedFromCRDLabelValue(utils.RayClusterCRD)
assert.Equal(t, expectedLabel, actualLabel)

actualType := svc.Spec.Type
expectedType := instanceForServeSvc.Spec.HeadGroupSpec.ServiceType
expectedType := instanceForSvc.Spec.HeadGroupSpec.ServiceType
if !reflect.DeepEqual(expectedType, actualType) {
t.Fatalf("Expected `%v` but got `%v`", expectedType, actualType)
}

expectedName := fmt.Sprintf("%s-%s-%s", instanceForServeSvc.Name, "serve", "svc")
expectedName := fmt.Sprintf("%s-%s-%s", instanceForSvc.Name, "serve", "svc")
validateNameAndNamespaceForUserSpecifiedService(svc, serviceInstance.ObjectMeta.Namespace, expectedName, t)
}

Expand Down
5 changes: 1 addition & 4 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,10 +597,7 @@ func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, ins
return nil
}
// Create headless tpu worker service if there's no existing one in the cluster.
headlessSvc, err := common.BuildHeadlessServiceForRayCluster(*instance)
if err != nil {
return err
}
headlessSvc := common.BuildHeadlessServiceForRayCluster(*instance)

if err := r.createService(ctx, headlessSvc, instance); err != nil {
return err
Expand Down

0 comments on commit b8f6d06

Please sign in to comment.