Skip to content

Commit

Permalink
[Feat][Kubectl-Plugin] Implement kubectl session for RayJob and RaySe…
Browse files Browse the repository at this point in the history
…rvice (#2379)
  • Loading branch information
MortalHappiness authored Sep 17, 2024
1 parent 800ac16 commit 5d3bceb
Show file tree
Hide file tree
Showing 6 changed files with 615 additions and 147 deletions.
2 changes: 1 addition & 1 deletion kubectl-plugin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
k8s.io/api v0.30.2
k8s.io/apimachinery v0.30.2
k8s.io/cli-runtime v0.30.2
k8s.io/client-go v0.30.2
Expand Down Expand Up @@ -76,7 +77,6 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.30.2 // indirect
k8s.io/component-base v0.30.2 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
Expand Down
155 changes: 111 additions & 44 deletions kubectl-plugin/pkg/cmd/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,93 @@ package session
import (
"context"
"fmt"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/client"
"github.com/spf13/cobra"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/genericiooptions"
"k8s.io/client-go/kubernetes"
"k8s.io/kubectl/pkg/cmd/portforward"
cmdutil "k8s.io/kubectl/pkg/cmd/util"

"github.com/spf13/cobra"
"k8s.io/kubectl/pkg/util/templates"
)

const (
DASHBOARD_PORT = 8265
CLIENT_PORT = 10001
)
type appPort struct {
name string
port int
}

type SessionOptions struct {
ioStreams *genericiooptions.IOStreams
configFlags *genericclioptions.ConfigFlags
ioStreams *genericiooptions.IOStreams
client client.Client
ResourceType util.ResourceType
ResourceName string
Namespace string
}

var (
dashboardPort = appPort{
name: "Ray Dashboard",
port: 8265,
}
clientPort = appPort{
name: "Ray Interactive Client",
port: 10001,
}
servePort = appPort{
name: "Ray Serve",
port: 8000,
}
)

var (
sessionLong = templates.LongDesc(`
Forward local ports to the Ray resources.
Forward different local ports depending on the resource type: RayCluster, RayJob, or RayService.
`)

sessionExample = templates.Examples(`
# Without specifying the resource type, forward local ports to the RayCluster resource
kubectl ray session my-raycluster
# Forward local ports to the RayCluster resource
kubectl ray session raycluster/my-raycluster
# Forward local ports to the RayCluster used for the RayJob resource
kubectl ray session rayjob/my-rayjob
# Forward local ports to the RayCluster used for the RayService resource
kubectl ray session rayservice/my-rayservice
`)
)

func NewSessionOptions(streams genericiooptions.IOStreams) *SessionOptions {
configFlags := genericclioptions.NewConfigFlags(true)
return &SessionOptions{
ioStreams: &streams,
configFlags: genericclioptions.NewConfigFlags(true),
configFlags: configFlags,
}
}

func NewSessionCommand(streams genericiooptions.IOStreams) *cobra.Command {
options := NewSessionOptions(streams)
factory := cmdutil.NewFactory(options.configFlags)

cmd := &cobra.Command{
Use: "session NAME",
Short: "Forward local ports to the Ray resources. Currently only supports RayCluster.",
Use: "session (RAYCLUSTER | TYPE/NAME)",
Short: "Forward local ports to the Ray resources.",
Long: sessionLong,
Example: sessionExample,
RunE: func(cmd *cobra.Command, args []string) error {
if err := options.Complete(cmd, args); err != nil {
return err
}
if err := options.Validate(); err != nil {
return err
}
return options.Run(cmd.Context(), factory)
return options.Run(cmd.Context())
},
}
options.configFlags.AddFlags(cmd.Flags())
Expand All @@ -58,14 +100,43 @@ func (options *SessionOptions) Complete(cmd *cobra.Command, args []string) error
if len(args) != 1 {
return cmdutil.UsageErrorf(cmd, "%s", cmd.Use)
}
options.ResourceName = args[0]

typeAndName := strings.Split(args[0], "/")
if len(typeAndName) == 1 {
options.ResourceType = util.RayCluster
options.ResourceName = typeAndName[0]
} else {
if len(typeAndName) != 2 || typeAndName[1] == "" {
return cmdutil.UsageErrorf(cmd, "invalid resource type/name: %s", args[0])
}

switch typeAndName[0] {
case string(util.RayCluster):
options.ResourceType = util.RayCluster
case string(util.RayJob):
options.ResourceType = util.RayJob
case string(util.RayService):
options.ResourceType = util.RayService
default:
return cmdutil.UsageErrorf(cmd, "unsupported resource type: %s", typeAndName[0])
}

options.ResourceName = typeAndName[1]
}

if *options.configFlags.Namespace == "" {
options.Namespace = "default"
} else {
options.Namespace = *options.configFlags.Namespace
}

factory := cmdutil.NewFactory(options.configFlags)
k8sClient, err := client.NewClient(factory)
if err != nil {
return fmt.Errorf("failed to create client: %w", err)
}
options.client = k8sClient

return nil
}

Expand All @@ -81,46 +152,42 @@ func (options *SessionOptions) Validate() error {
return nil
}

func (options *SessionOptions) Run(ctx context.Context, factory cmdutil.Factory) error {
kubeClientSet, err := factory.KubernetesClientSet()
if err != nil {
return fmt.Errorf("failed to initialize clientset: %w", err)
}
func (options *SessionOptions) Run(ctx context.Context) error {
factory := cmdutil.NewFactory(options.configFlags)

svcName, err := findServiceName(ctx, kubeClientSet, options.Namespace, options.ResourceName)
svcName, err := options.client.GetRayHeadSvcName(ctx, options.Namespace, options.ResourceType, options.ResourceName)
if err != nil {
return err
}
fmt.Printf("Forwarding ports to service %s\n", svcName)

var appPorts []appPort
switch options.ResourceType {
case util.RayCluster:
appPorts = []appPort{dashboardPort, clientPort}
case util.RayJob:
appPorts = []appPort{dashboardPort}
case util.RayService:
appPorts = []appPort{dashboardPort, servePort}
default:
return fmt.Errorf("unsupported resource type: %s", options.ResourceType)
}

portForwardCmd := portforward.NewCmdPortForward(factory, *options.ioStreams)
portForwardCmd.SetArgs([]string{svcName, fmt.Sprintf("%d:%d", DASHBOARD_PORT, DASHBOARD_PORT), fmt.Sprintf("%d:%d", CLIENT_PORT, CLIENT_PORT)})
args := []string{"service/" + svcName}
for _, appPort := range appPorts {
args = append(args, fmt.Sprintf("%d:%d", appPort.port, appPort.port))
}
portForwardCmd.SetArgs(args)

fmt.Printf("Ray Dashboard: http://localhost:%d\nRay Interactive Client: http://localhost:%d\n\n", DASHBOARD_PORT, CLIENT_PORT)
for _, appPort := range appPorts {
fmt.Printf("%s: http://localhost:%d\n", appPort.name, appPort.port)
}
fmt.Println()

if err := portForwardCmd.ExecuteContext(ctx); err != nil {
return fmt.Errorf("failed to port-forward: %w", err)
}

return nil
}

func findServiceName(ctx context.Context, kubeClientSet kubernetes.Interface, namespace, resourceName string) (string, error) {
listopts := metav1.ListOptions{
LabelSelector: fmt.Sprintf("ray.io/cluster=%s, ray.io/node-type=head", resourceName),
}

rayHeadSvcs, err := kubeClientSet.CoreV1().Services(namespace).List(ctx, listopts)
if err != nil {
return "", fmt.Errorf("unable to retrieve ray head services: %w", err)
}

if len(rayHeadSvcs.Items) == 0 {
return "", fmt.Errorf("no ray head services found")
}
if len(rayHeadSvcs.Items) > 1 {
return "", fmt.Errorf("more than one ray head service found")
}

rayHeadSrc := rayHeadSvcs.Items[0]
return "service/" + rayHeadSrc.Name, nil
}
Loading

0 comments on commit 5d3bceb

Please sign in to comment.