Skip to content

Commit

Permalink
[Test][HA] RayService high-availability test without autoscaling enab…
Browse files Browse the repository at this point in the history
…led (#2176)

Signed-off-by: kaihsun <kaihsun@anyscale.com>
  • Loading branch information
kevin85421 authored Jun 7, 2024
1 parent dc8be94 commit 8875c8b
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ data:
import os
class ConstantUser(FastHttpUser):
wait_time = constant(float(os.environ.get("LOCUS_WAIT_TIME", "1")))
wait_time = constant(float(os.environ.get("LOCUST_WAIT_TIME", "1")))
network_timeout = None
connection_timeout = None
@task
Expand Down
132 changes: 132 additions & 0 deletions ray-operator/test/e2e/locust_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""
This script is based on: https://raw.githubusercontent.com/ray-project/serve_workloads/main/microbenchmarks/locust_runner.py
Run Locust on Ray cluster.
Run this script on a Ray cluster's head node to launch one Locust worker per
CPU across the Ray cluster's nodes.
Example command:
$ python locust_runner.py -f locustfile.py -u 200 -r 50 --host [HOST_URL]
"""

import os
import ray
import json
import time
import argparse
import sys
import subprocess
from tqdm import tqdm


ray.init()

HTML_RESULTS_DIR = os.environ.get("HTML_RESULTS_DIR", "locust_results")
DEFAULT_RESULT_FILENAME = \
f"{time.strftime('%Y-%m-%d-%p-%H-%M-%S-results.html')}"

parser = argparse.ArgumentParser()
parser.add_argument(
"--html",
default=DEFAULT_RESULT_FILENAME,
type=str,
help="HTML file to save results to.",
)
parser.add_argument(
"-t",
"--run-time",
default=None,
type=str,
help="Test duration. Same option as Locust's --run-time.",
)

args, locust_args = parser.parse_known_args()

num_locust_workers = int(ray.available_resources()["CPU"])
master_address = ray.util.get_node_ip_address()

if not os.path.exists(HTML_RESULTS_DIR):
os.mkdir(HTML_RESULTS_DIR)

# Required locust args: -f, -u, -r, --host, and any custom locustfile args
base_locust_cmd = [
"locust",
"--headless",
f"--html={HTML_RESULTS_DIR}/{args.html}",
*locust_args,
]


@ray.remote(num_cpus=1)
class LocustWorker:
def __init__(self):
self.proc = None

def start(self):
worker_locust_cmd = base_locust_cmd + [
"--worker",
f"--master-host={master_address}",
]
self.proc = subprocess.Popen(worker_locust_cmd)


print(f"Spawning {num_locust_workers} Locust worker Ray tasks.")

# Hold reference to each locust worker to prevent them from being torn down
locust_workers = []
start_refs = []
for _ in tqdm(range(num_locust_workers)):
locust_worker = LocustWorker.remote()
locust_workers.append(locust_worker)
start_refs.append(locust_worker.start.remote())

print("Waiting for Locust worker processes to start.")


def wait_for_locust_workers(start_refs):
"""Generator that yields whenever a worker process starts.
Use with tqdm to track how many workers have started. If you don't need
tqdm, use ray.get(start_refs) instead of calling this function.
"""

remaining_start_refs = start_refs
while remaining_start_refs:
finished_start_refs, remaining_start_refs = \
ray.wait(remaining_start_refs)
for ref in finished_start_refs:
yield ray.get(ref)


# No-op for-loop to let tqdm track wait_for_locust_workers() progress
for _ in tqdm(wait_for_locust_workers(start_refs), total=num_locust_workers):
pass

master_locust_cmd = base_locust_cmd + [
"--master",
f"--expect-workers={num_locust_workers}",
"--json",
]
print(f"Locust command: {master_locust_cmd}")

if args.run_time is not None:
master_locust_cmd += [f"--run-time={args.run_time}"]
proc = subprocess.Popen(master_locust_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = proc.communicate()

print("STDOUT:", stdout.decode())
print("STDERR:", stderr.decode())

data = json.loads(stdout.decode())
assert len(data) == 1, f"data_len: {len(data)}"

num_failures = data[0]["num_failures"]
num_requests = data[0]["num_requests"]

assert num_failures == 0, f"num_failures: {num_failures}"
assert num_requests != 0, f"num_requests: {num_requests}"

print("returncode:", proc.returncode)
sys.exit(proc.returncode)
26 changes: 26 additions & 0 deletions ray-operator/test/e2e/locustfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from locust import FastHttpUser, task, constant, LoadTestShape


class ConstantUser(FastHttpUser):
wait_time = constant(1)
network_timeout = None
connection_timeout = None

@task
def hello_world(self):
self.client.post("/")


# Derived from https://github.com/locustio/locust/blob/master/examples/custom_shape/stages.py
class StagesShape(LoadTestShape):
stages = [
{"duration": 60, "users": 10, "spawn_rate": 10},
]

def tick(self):
run_time = self.get_run_time()
for stage in self.stages:
if run_time < stage["duration"]:
tick_data = (stage["users"], stage["spawn_rate"])
return tick_data
return None
86 changes: 86 additions & 0 deletions ray-operator/test/e2e/rayservice_ha_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package e2e

import (
"testing"

. "github.com/onsi/gomega"

corev1ac "k8s.io/client-go/applyconfigurations/core/v1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1"
. "github.com/ray-project/kuberay/ray-operator/test/support"
)

func TestRayService(t *testing.T) {
test := With(t)

// Create a namespace
namespace := test.NewTestNamespace()
test.StreamKubeRayOperatorLogs()

// Scripts for creating and terminating detached actors to trigger autoscaling
scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "locustfile.py", "locust_runner.py"))
scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name)

test.T().Run("Static RayService", func(t *testing.T) {
rayServiceAC := rayv1ac.RayService("static-raysvc", namespace.Name).
WithSpec(rayv1ac.RayServiceSpec().
WithServeConfigV2(`
proxy_location: EveryNode
applications:
- name: no_ops
route_prefix: /
import_path: microbenchmarks.no_ops:app_builder
args:
num_forwards: 0
runtime_env:
working_dir: https://github.com/ray-project/serve_workloads/archive/a2e2405f3117f1b4134b6924b5f44c4ff0710c00.zip
deployments:
- name: NoOp
num_replicas: 2
max_replicas_per_node: 1
ray_actor_options:
num_cpus: 1
`).
WithRayClusterSpec(newRayClusterSpec()))

rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created RayService %s/%s successfully", rayService.Namespace, rayService.Name)

test.T().Logf("Waiting for RayService %s/%s to running", rayService.Namespace, rayService.Name)
test.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutMedium).
Should(WithTransform(RayServiceStatus, Equal(rayv1.Running)))

locustClusterAC := rayv1ac.RayCluster("locust-cluster", namespace.Name).
WithSpec(rayv1ac.RayClusterSpec().
WithRayVersion(GetRayVersion()).
WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
WithRayStartParams(map[string]string{"dashboard-host": "0.0.0.0"}).
WithTemplate(apply(headPodTemplateApplyConfiguration(), mountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](scripts, "/home/ray/test_scripts")))))
locustCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), locustClusterAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created Locust RayCluster %s/%s successfully", locustCluster.Namespace, locustCluster.Name)

// Wait for RayCluster to become ready and verify the number of available worker replicas.
test.Eventually(RayCluster(test, locustCluster.Namespace, locustCluster.Name), TestTimeoutMedium).
Should(WithTransform(RayClusterState, Equal(rayv1.Ready)))
locustCluster = GetRayCluster(test, locustCluster.Namespace, locustCluster.Name)
test.Expect(locustCluster.Status.DesiredWorkerReplicas).To(Equal(int32(0)))

headPod := GetHeadPod(test, locustCluster)
test.T().Logf("Found head pod %s/%s", headPod.Namespace, headPod.Name)

// Install Locust in the head Pod
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"pip", "install", "locust"})

// Run Locust test
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{
"python", "/home/ray/test_scripts/locust_runner.py", "-f", "/home/ray/test_scripts/locustfile.py", "--host", "http://static-raysvc-serve-svc:8000",
})
})
}
9 changes: 1 addition & 8 deletions ray-operator/test/e2e/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,10 @@ func headPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfigura
WithImage(GetRayImage()).
WithPorts(
corev1ac.ContainerPort().WithName("gcs").WithContainerPort(6379),
corev1ac.ContainerPort().WithName("serve").WithContainerPort(8000),
corev1ac.ContainerPort().WithName("dashboard").WithContainerPort(8265),
corev1ac.ContainerPort().WithName("client").WithContainerPort(10001),
).
WithLifecycle(corev1ac.Lifecycle().
WithPreStop(corev1ac.LifecycleHandler().
WithExec(corev1ac.ExecAction().
WithCommand("/bin/sh", "-c", "ray stop")))).
WithResources(corev1ac.ResourceRequirements().
WithRequests(corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("300m"),
Expand All @@ -150,10 +147,6 @@ func workerPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfigu
WithContainers(corev1ac.Container().
WithName("ray-worker").
WithImage(GetRayImage()).
WithLifecycle(corev1ac.Lifecycle().
WithPreStop(corev1ac.LifecycleHandler().
WithExec(corev1ac.ExecAction().
WithCommand("/bin/sh", "-c", "ray stop")))).
WithResources(corev1ac.ResourceRequirements().
WithRequests(corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("300m"),
Expand Down
2 changes: 1 addition & 1 deletion ray-operator/test/support/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,6 @@ func ExecPodCmd(t Test, pod *corev1.Pod, containerName string, cmd []string) {

t.T().Logf("Executing command: kubectl %s", kubectlCmd)
output, err := exec.Command("kubectl", kubectlCmd...).CombinedOutput()
t.Expect(err).NotTo(gomega.HaveOccurred())
t.T().Logf("Command output: %s", output)
t.Expect(err).NotTo(gomega.HaveOccurred())
}
12 changes: 12 additions & 0 deletions ray-operator/test/support/ray.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,15 @@ func GetHeadPod(t Test, rayCluster *rayv1.RayCluster) *corev1.Pod {
t.Expect(len(pods.Items)).To(gomega.Equal(1))
return &pods.Items[0]
}

func RayService(t Test, namespace, name string) func(g gomega.Gomega) *rayv1.RayService {
return func(g gomega.Gomega) *rayv1.RayService {
service, err := t.Client().Ray().RayV1().RayServices(namespace).Get(t.Ctx(), name, metav1.GetOptions{})
g.Expect(err).NotTo(gomega.HaveOccurred())
return service
}
}

func RayServiceStatus(service *rayv1.RayService) rayv1.ServiceStatus {
return service.Status.ServiceStatus
}

0 comments on commit 8875c8b

Please sign in to comment.