Skip to content

Commit

Permalink
Context manager and polling output in test_run_headless_with_multiple…
Browse files Browse the repository at this point in the history
…_locustfiles_with_shape
  • Loading branch information
fletelli42 committed Sep 17, 2024
1 parent 63dd993 commit afb29e3
Showing 1 changed file with 77 additions and 52 deletions.
129 changes: 77 additions & 52 deletions locust/test/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -972,69 +972,94 @@ def tick(self):
)
self.assertNotIn("Traceback", combined_output, msg="Unexpected traceback found in output.")

@unittest.skipIf(os.name == "nt", reason="Signal handling on Windows is hard")
def test_run_headless_with_multiple_locustfiles_with_shape(self):
content = textwrap.dedent(
"""
from locust import User, task, between
class TestUser2(User):
wait_time = between(2, 4)
@task
def my_task(self):
print("running my_task() again")
"""
)
with mock_locustfile(content=content) as mocked1:
with temporary_file(
content=textwrap.dedent(
"""
from locust import User, task, between, LoadTestShape
class LoadTestShape(LoadTestShape):
def tick(self):
run_time = self.get_run_time()
if run_time < 2:
return (10, 1)
return None
with TemporaryDirectory() as temp_dir:
with mock_locustfile(
content=textwrap.dedent("""
from locust import User, task, between
class TestUser2(User):
wait_time = between(2, 4)
@task
def my_task(self):
print("running my_task() again")
"""),
dir=temp_dir,
) as mocked1:
with temporary_file(
content=textwrap.dedent("""
from locust import User, task, between, LoadTestShape
class MyLoadTestShape(LoadTestShape):
def tick(self):
run_time = self.get_run_time()
if run_time < 2:
return (10, 1) # (users, spawn rate)
return None # Stop the test
class TestUser(User):
wait_time = between(2, 4)
@task
def my_task(self):
print("running my_task()")
"""
)
) as mocked2:
proc = subprocess.Popen(
[
"locust",
class TestUser(User):
wait_time = between(2, 4)
@task
def my_task(self):
print("running my_task()")
"""),
dir=temp_dir,
) as mocked2:
args = [
"-f",
f"{mocked1.file_path},{mocked2}",
"--host",
"https://test.com/",
"--headless",
"--exit-code-on-error",
"0",
],
stdout=PIPE,
stderr=PIPE,
text=True,
env=os.environ.copy(),
)
]

try:
success = True
_, stderr = proc.communicate(timeout=5)
except subprocess.TimeoutExpired:
success = False
with run_locust_process(file_content=None, args=args, port=None) as manager:
shape_start_message = "Shape test starting"
shape_update_message = "Shape test updating to 10 users at 1.00 spawn rate"
shape_stop_message = "Shape test stopping"
shutdown_message = "Shutting down (exit code 0)"

messages_to_check = [
shape_start_message,
shape_update_message,
shape_stop_message,
shutdown_message,
]

proc.send_signal(signal.SIGTERM)
_, stderr = proc.communicate()
self.assertIn("Shape test updating to 10 users at 1.00 spawn rate", stderr)
self.assertTrue(success, "Got timeout and had to kill the process")
# ensure stats printer printed at least one report before shutting down and that there was a final report printed as well
self.assertRegex(stderr, r".*Aggregated[\S\s]*Shutting down[\S\s]*Aggregated.*")
self.assertIn("Shutting down (exit code 0)", stderr)
self.assertEqual(0, proc.returncode)
for message in messages_to_check:
if not wait_for_output_condition_non_threading(
manager.proc, manager.output_lines, message, timeout=10
):
print(f"Locust output after expected '{message}' time:")
print("\n".join(manager.output_lines))
manager.proc.terminate()
manager.proc.wait(timeout=5)
self.fail(f"Timeout waiting for Locust to output: {message}")

try:
manager.proc.wait(timeout=10)
except subprocess.TimeoutExpired:
print("Locust did not terminate within the expected time.")
manager.proc.terminate()
manager.proc.wait(timeout=10)
self.fail("Locust process did not terminate gracefully after SIGTERM.")

combined_output = "\n".join(manager.output_lines)

self.assertIn("running my_task()", combined_output, msg="TestUser task output not found.")
self.assertIn("running my_task() again", combined_output, msg="TestUser2 task output not found.")
self.assertRegex(
combined_output,
r".*Aggregated[\S\s]*Shutting down[\S\s]*Aggregated.*",
msg="Aggregated output not found before shutdown.",
)
self.assertEqual(
0,
manager.proc.returncode,
msg=f"Locust process exited with return code {manager.proc.returncode}.",
)
self.assertNotIn("Traceback", combined_output, msg="Unexpected traceback found in output.")

@unittest.skipIf(os.name == "nt", reason="Signal handling on windows is hard")
def test_autostart_wo_run_time(self):
Expand Down

0 comments on commit afb29e3

Please sign in to comment.