Skip to content

Commit

Permalink
[RayCluster] Add serviceName to status.headInfo (#2089)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewsykim authored Apr 23, 2024
1 parent b8212e5 commit ffac341
Show file tree
Hide file tree
Showing 16 changed files with 73 additions and 34 deletions.
12 changes: 6 additions & 6 deletions apiserver/pkg/server/ray_job_submission_service_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *RayJobSubmissionServiceServer) SubmitRayJob(ctx context.Context, req *a
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(*url, nil); err != nil {
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
return nil, err
}
request := &utils.RayJobRequest{Entrypoint: req.Jobsubmission.Entrypoint}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (s *RayJobSubmissionServiceServer) GetJobDetails(ctx context.Context, req *
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(*url, nil); err != nil {
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
return nil, err
}
nodeInfo, err := rayDashboardClient.GetJobInfo(ctx, req.Submissionid)
Expand All @@ -129,7 +129,7 @@ func (s *RayJobSubmissionServiceServer) GetJobLog(ctx context.Context, req *api.
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(*url, nil); err != nil {
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
return nil, err
}
jlog, err := rayDashboardClient.GetJobLog(ctx, req.Submissionid)
Expand All @@ -152,7 +152,7 @@ func (s *RayJobSubmissionServiceServer) ListJobDetails(ctx context.Context, req
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(*url, nil); err != nil {
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
return nil, err
}
nodesInfo, err := rayDashboardClient.ListJobs(ctx)
Expand All @@ -176,7 +176,7 @@ func (s *RayJobSubmissionServiceServer) StopRayJob(ctx context.Context, req *api
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(*url, nil); err != nil {
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
return nil, err
}
err = rayDashboardClient.StopJob(ctx, req.Submissionid)
Expand All @@ -196,7 +196,7 @@ func (s *RayJobSubmissionServiceServer) DeleteRayJob(ctx context.Context, req *a
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(*url, nil); err != nil {
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
return nil, err
}
err = rayDashboardClient.DeleteJob(ctx, req.Submissionid)
Expand Down
2 changes: 2 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions ray-operator/apis/ray/v1/raycluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,9 @@ type RayClusterStatus struct {

// HeadInfo reports the head pod's IP and the head service's IP and name.
type HeadInfo struct {
PodIP string `json:"podIP,omitempty"`
ServiceIP string `json:"serviceIP,omitempty"`
PodIP string `json:"podIP,omitempty"`
ServiceIP string `json:"serviceIP,omitempty"`
ServiceName string `json:"serviceName,omitempty"`
}

// RayNodeType is the type of a ray node: head or worker.
Expand Down
2 changes: 2 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayclusters.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayservices.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1309,21 +1309,21 @@ func (r *RayClusterReconciler) getHeadPodIP(ctx context.Context, instance *rayv1
return runtimePods.Items[0].Status.PodIP, nil
}

func (r *RayClusterReconciler) getHeadServiceIP(ctx context.Context, instance *rayv1.RayCluster) (string, error) {
func (r *RayClusterReconciler) getHeadServiceIPAndName(ctx context.Context, instance *rayv1.RayCluster) (string, string, error) {
runtimeServices := corev1.ServiceList{}
filterLabels := client.MatchingLabels(common.HeadServiceLabels(*instance))
if err := r.List(ctx, &runtimeServices, client.InNamespace(instance.Namespace), filterLabels); err != nil {
return "", err
return "", "", err
}
if len(runtimeServices.Items) < 1 {
return "", fmt.Errorf("unable to find head service. cluster name %s, filter labels %v", instance.Name, filterLabels)
return "", "", fmt.Errorf("unable to find head service. cluster name %s, filter labels %v", instance.Name, filterLabels)
} else if len(runtimeServices.Items) > 1 {
return "", fmt.Errorf("found multiple head services. cluster name %s, filter labels %v", instance.Name, filterLabels)
return "", "", fmt.Errorf("found multiple head services. cluster name %s, filter labels %v", instance.Name, filterLabels)
} else if runtimeServices.Items[0].Spec.ClusterIP == "" {
return "", fmt.Errorf("head service IP is empty. cluster name %s, filter labels %v", instance.Name, filterLabels)
return "", "", fmt.Errorf("head service IP is empty. cluster name %s, filter labels %v", instance.Name, filterLabels)
}

return runtimeServices.Items[0].Spec.ClusterIP, nil
return runtimeServices.Items[0].Spec.ClusterIP, runtimeServices.Items[0].Name, nil
}

func (r *RayClusterReconciler) updateEndpoints(ctx context.Context, instance *rayv1.RayCluster) error {
Expand Down Expand Up @@ -1374,10 +1374,11 @@ func (r *RayClusterReconciler) updateHeadInfo(ctx context.Context, instance *ray
instance.Status.Head.PodIP = ip
}

if ip, err := r.getHeadServiceIP(ctx, instance); err != nil {
if ip, name, err := r.getHeadServiceIPAndName(ctx, instance); err != nil {
return err
} else {
instance.Status.Head.ServiceIP = ip
instance.Status.Head.ServiceName = name
}

return nil
Expand Down
17 changes: 11 additions & 6 deletions ray-operator/controllers/ray/raycluster_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1355,7 +1355,7 @@ func TestGetHeadPodIP(t *testing.T) {
}
}

func TestGetHeadServiceIP(t *testing.T) {
func TestGetHeadServiceIPAndName(t *testing.T) {
setupTest(t)

headServiceIP := "1.2.3.4"
Expand All @@ -1379,21 +1379,25 @@ func TestGetHeadServiceIP(t *testing.T) {
tests := map[string]struct {
services []runtime.Object
expectedIP string
expectedName string
returnsError bool
}{
"get expected Service IP if there's one head Service": {
services: testServices,
expectedIP: headServiceIP,
expectedName: headService.Name,
returnsError: false,
},
"get error if there's no head Service": {
services: []runtime.Object{},
expectedIP: "",
expectedName: "",
returnsError: true,
},
"get error if there's more than one head Service": {
services: append(testServices, extraHeadService),
expectedIP: "",
expectedName: "",
returnsError: true,
},
}
Expand All @@ -1408,15 +1412,15 @@ func TestGetHeadServiceIP(t *testing.T) {
Scheme: scheme.Scheme,
}

ip, err := testRayClusterReconciler.getHeadServiceIP(context.TODO(), testRayCluster)

ip, name, err := testRayClusterReconciler.getHeadServiceIPAndName(context.TODO(), testRayCluster)
if tc.returnsError {
assert.NotNil(t, err, "getHeadServiceIP should return error")
assert.NotNil(t, err, "getHeadServiceIPAndName should return error")
} else {
assert.Nil(t, err, "getHeadServiceIP should not return error")
assert.Nil(t, err, "getHeadServiceIPAndName should not return error")
}

assert.Equal(t, tc.expectedIP, ip, "getHeadServiceIP returned unexpected IP")
assert.Equal(t, tc.expectedIP, ip, "getHeadServiceIPAndName returned unexpected IP")
assert.Equal(t, tc.expectedName, name, "getHeadServiceIPAndName returned unexpected name")
})
}
}
Expand Down Expand Up @@ -1645,6 +1649,7 @@ func TestCalculateStatus(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, headNodeIP, newInstance.Status.Head.PodIP)
assert.Equal(t, headServiceIP, newInstance.Status.Head.ServiceIP)
assert.Equal(t, headService.Name, newInstance.Status.Head.ServiceName)
}

func Test_TerminatedWorkers_NoAutoscaler(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
}

rayDashboardClient := r.dashboardClientFunc()
err = rayDashboardClient.InitClient(rayJobInstance.Status.DashboardURL, rayClusterInstance)
err = rayDashboardClient.InitClient(ctx, rayJobInstance.Status.DashboardURL, rayClusterInstance)
if err != nil {
logger.Error(err, "Failed to initialize dashboard client")
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)

// Check the current status of ray jobs
rayDashboardClient := r.dashboardClientFunc()
if err := rayDashboardClient.InitClient(rayJobInstance.Status.DashboardURL, rayClusterInstance); err != nil {
if err := rayDashboardClient.InitClient(ctx, rayJobInstance.Status.DashboardURL, rayClusterInstance); err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

Expand Down
4 changes: 2 additions & 2 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ func (r *RayServiceReconciler) updateStatusForActiveCluster(ctx context.Context,
}

rayDashboardClient := r.dashboardClientFunc()
if err := rayDashboardClient.InitClient(clientURL, rayClusterInstance); err != nil {
if err := rayDashboardClient.InitClient(ctx, clientURL, rayClusterInstance); err != nil {
return err
}

Expand Down Expand Up @@ -1081,7 +1081,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
}

rayDashboardClient := r.dashboardClientFunc()
if err := rayDashboardClient.InitClient(clientURL, rayClusterInstance); err != nil {
if err := rayDashboardClient.InitClient(ctx, clientURL, rayClusterInstance); err != nil {
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
}

Expand Down
17 changes: 12 additions & 5 deletions ray-operator/controllers/ray/utils/dashboard_httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
)

type RayDashboardClientInterface interface {
InitClient(url string, rayCluster *rayv1.RayCluster) error
InitClient(ctx context.Context, url string, rayCluster *rayv1.RayCluster) error
UpdateDeployments(ctx context.Context, configJson []byte) error
// V2/multi-app Rest API
GetServeDetails(ctx context.Context) (*ServeDetails, error)
Expand Down Expand Up @@ -109,11 +109,18 @@ func FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray
return headServiceURL, nil
}

func (r *RayDashboardClient) InitClient(url string, rayCluster *rayv1.RayCluster) error {
func (r *RayDashboardClient) InitClient(ctx context.Context, url string, rayCluster *rayv1.RayCluster) error {
log := ctrl.LoggerFrom(ctx)

if r.useProxy {
headSvcName, err := GenerateHeadServiceName(RayClusterCRD, rayCluster.Spec, rayCluster.Name)
if err != nil {
return err
var err error
headSvcName := rayCluster.Status.Head.ServiceName
if headSvcName == "" {
log.Info("RayCluster is missing .status.head.serviceName, calling GenerateHeadServiceName instead...", "RayCluster name", rayCluster.Name, "namespace", rayCluster.Namespace)
headSvcName, err = GenerateHeadServiceName(RayClusterCRD, rayCluster.Spec, rayCluster.Name)
if err != nil {
return err
}
}

r.client = r.mgr.GetHTTPClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var _ = Describe("RayFrameworkGenerator", func() {
}

rayDashboardClient = &RayDashboardClient{}
err := rayDashboardClient.InitClient("127.0.0.1:8090", nil)
err := rayDashboardClient.InitClient(context.Background(), "127.0.0.1:8090", nil)
Expect(err).To(BeNil())
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type FakeRayDashboardClient struct {

var _ RayDashboardClientInterface = (*FakeRayDashboardClient)(nil)

func (r *FakeRayDashboardClient) InitClient(url string, rayCluster *rayv1.RayCluster) error {
func (r *FakeRayDashboardClient) InitClient(ctx context.Context, url string, rayCluster *rayv1.RayCluster) error {
r.client = &http.Client{}
r.dashboardURL = "http://" + url
return nil
Expand Down
13 changes: 11 additions & 2 deletions ray-operator/pkg/client/applyconfiguration/ray/v1/headinfo.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ffac341

Please sign in to comment.