Commit

[Test][Autoscaler][2/n] Add Ray Autoscaler e2e tests for GPU workers (#…
rueian authored Jun 9, 2024
1 parent 93e32d0 commit 40a946a
Showing 3 changed files with 76 additions and 2 deletions.
10 changes: 8 additions & 2 deletions ray-operator/test/e2e/create_detached_actor.py
@@ -1,11 +1,17 @@
 import ray
 import sys
+import argparse

+parser = argparse.ArgumentParser()
+parser.add_argument('name')
+parser.add_argument('--num-cpus', type=float, default=1)
+parser.add_argument('--num-gpus', type=float, default=0)
+args = parser.parse_args()

-@ray.remote(num_cpus=1)
+@ray.remote(num_cpus=args.num_cpus, num_gpus=args.num_gpus)
 class Actor:
     pass


 ray.init(namespace="default_namespace")
-Actor.options(name=sys.argv[1], lifetime="detached").remote()
+Actor.options(name=args.name, lifetime="detached").remote()
58 changes: 58 additions & 0 deletions ray-operator/test/e2e/raycluster_autoscaler_test.go
@@ -75,3 +75,61 @@ func TestRayClusterAutoscaler(t *testing.T) {
		Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(0))))
	})
}

func TestRayClusterAutoscalerWithFakeGPU(t *testing.T) {
	test := With(t)

	// Create a namespace
	namespace := test.NewTestNamespace()
	test.StreamKubeRayOperatorLogs()

	// Scripts for creating and terminating detached actors to trigger autoscaling
	scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "create_detached_actor.py", "terminate_detached_actor.py"))
	scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions)
	test.Expect(err).NotTo(HaveOccurred())
	test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name)

	test.T().Run("Create a RayCluster with autoscaling enabled", func(_ *testing.T) {
		rayClusterSpecAC := rayv1ac.RayClusterSpec().
			WithEnableInTreeAutoscaling(true).
			WithRayVersion(GetRayVersion()).
			WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
				WithRayStartParams(map[string]string{"num-cpus": "0"}).
				WithTemplate(headPodTemplateApplyConfiguration())).
			WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec().
				WithReplicas(0).
				WithMinReplicas(0).
				WithMaxReplicas(3).
				WithGroupName("gpu-group").
				WithRayStartParams(map[string]string{"num-cpus": "1", "num-gpus": "1"}).
				WithTemplate(workerPodTemplateApplyConfiguration()))
		rayClusterAC := rayv1ac.RayCluster("ray-cluster", namespace.Name).
			WithSpec(apply(rayClusterSpecAC, mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts")))

		rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions)
		test.Expect(err).NotTo(HaveOccurred())
		test.T().Logf("Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name)

		// Wait for the RayCluster to become ready and verify the number of available worker replicas.
		test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
			Should(WithTransform(RayClusterState, Equal(rayv1.Ready)))
		rayCluster = GetRayCluster(test, rayCluster.Namespace, rayCluster.Name)
		test.Expect(rayCluster.Status.DesiredWorkerReplicas).To(Equal(int32(0)))

		headPod := GetHeadPod(test, rayCluster)
		test.T().Logf("Found head pod %s/%s", headPod.Namespace, headPod.Name)

		// Create a detached GPU actor; a worker in the "gpu-group" should be created.
		ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/create_detached_actor.py", "gpu_actor", "--num-gpus=1"})
		test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
			Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(1))))
		// This test does not use real Kubernetes GPU resources, so RayClusterDesiredGPU cannot be verified.
		// Check the Pod count of the "gpu-group" instead.
		test.Expect(GetGroupPods(test, rayCluster, "gpu-group")).To(HaveLen(1))

		// Terminate the detached GPU actor; the worker should be deleted.
		ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/terminate_detached_actor.py", "gpu_actor"})
		test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
			Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(0))))
	})
}
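
The test above also runs terminate_detached_actor.py from the same mounted ConfigMap (invoked on the head pod as python /home/ray/test_scripts/terminate_detached_actor.py gpu_actor), but that script is not changed by this commit and does not appear in the diff. A minimal sketch of what such a script might look like, assuming it takes the actor name as a positional argument and kills the detached actor in the same Ray namespace:

import ray
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('name')  # name of the detached actor to terminate (hypothetical sketch, not the committed file)
args = parser.parse_args()

# Connect to the namespace the detached actor was created in,
# look the actor up by name, and kill it so the autoscaler can remove the worker.
ray.init(namespace="default_namespace")
actor = ray.get_actor(args.name)
ray.kill(actor)
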
10 changes: 10 additions & 0 deletions ray-operator/test/support/ray.go
@@ -79,6 +79,16 @@ func GetHeadPod(t Test, rayCluster *rayv1.RayCluster) *corev1.Pod {
	return &pods.Items[0]
}

func GetGroupPods(t Test, rayCluster *rayv1.RayCluster, group string) []corev1.Pod {
	t.T().Helper()
	pods, err := t.Client().Core().CoreV1().Pods(rayCluster.Namespace).List(
		t.Ctx(),
		common.RayClusterGroupPodsAssociationOptions(rayCluster, group).ToMetaV1ListOptions(),
	)
	t.Expect(err).NotTo(gomega.HaveOccurred())
	return pods.Items
}

func RayService(t Test, namespace, name string) func(g gomega.Gomega) *rayv1.RayService {
	return func(g gomega.Gomega) *rayv1.RayService {
		service, err := t.Client().Ray().RayV1().RayServices(namespace).Get(t.Ctx(), name, metav1.GetOptions{})
