diff --git a/docs/running-distributed.rst b/docs/running-distributed.rst index 1baf20cf47..a23d657f50 100644 --- a/docs/running-distributed.rst +++ b/docs/running-distributed.rst @@ -131,7 +131,20 @@ order to coordinate the test. This can be easily accomplished with custom messag environment.runner.send_message('test_users', users) Note that when running locally (i.e. non-distributed), this functionality will be preserved; -the messages will simply be handled by the runner that sends them. +the messages will simply be handled by the runner that sends them. + +.. note:: + Using the default options while registering a message handler will run the listener function + in a **blocking** way, resulting in the heartbeat and other messages being delayed for the amount + of the execution. + If it is known that the listener function will handle time-intensive tasks, it is possible to register the + function as **concurrent** (as a separate greenlet). + + .. code-block:: + environment.runner.register_message('test_users', setup_test_users, concurrent=True) + + Please use this feature with care, as otherwise it could result in greenlets running and influencing + the running loadtest. For more details, see the `complete example `_. diff --git a/examples/custom_messages.py b/examples/custom_messages.py index bd679de04a..2cbc6af795 100644 --- a/examples/custom_messages.py +++ b/examples/custom_messages.py @@ -1,12 +1,18 @@ from locust import HttpUser, between, events, task from locust.runners import MasterRunner, WorkerRunner +import gevent + usernames = [] def setup_test_users(environment, msg, **kwargs): # Fired when the worker receives a message of type 'test_users' usernames.extend(map(lambda u: u["name"], msg.data)) + # Even though "acknowledge_concurrent_users" was sent first, "acknowledge_users" + # will print its statement first, as "acknowledge_concurrent_users" was registered + # running concurrently, and therefore not blocking other messages. + environment.runner.send_message("concurrent_message", "This is a non blocking message") environment.runner.send_message("acknowledge_users", f"Thanks for the {len(msg.data)} users!") @@ -15,12 +21,18 @@ def on_acknowledge(msg, **kwargs): print(msg.data) +def on_concurrent_message(msg, **kwargs): + gevent.sleep(10) + print(msg.data) + + @events.init.add_listener def on_locust_init(environment, **_kwargs): if not isinstance(environment.runner, MasterRunner): environment.runner.register_message("test_users", setup_test_users) if not isinstance(environment.runner, WorkerRunner): environment.runner.register_message("acknowledge_users", on_acknowledge) + environment.runner.register_message("concurrent_message", on_concurrent_message, concurrent=True) @events.test_start.add_listener diff --git a/locust/runners.py b/locust/runners.py index 03b7c210ae..3f546f6066 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -119,7 +119,7 @@ def __init__(self, environment: Environment) -> None: self.target_user_classes_count: dict[str, int] = {} # target_user_count is set before the ramp-up/ramp-down occurs. self.target_user_count: int = 0 - self.custom_messages: dict[str, Callable] = {} + self.custom_messages: dict[str, tuple[Callable, bool]] = {} self._users_dispatcher: UsersDispatcher | None = None @@ -420,7 +420,7 @@ def log_exception(self, node_id: str, msg: str, formatted_tb: str) -> None: row["nodes"].add(node_id) self.exceptions[key] = row - def register_message(self, msg_type: str, listener: Callable) -> None: + def register_message(self, msg_type: str, listener: Callable, concurrent=False) -> None: """ Register a listener for a custom message from another node @@ -429,7 +429,7 @@ def register_message(self, msg_type: str, listener: Callable) -> None: """ if msg_type in self.custom_messages: raise Exception(f"Tried to register listener method for {msg_type}, but it already had a listener!") - self.custom_messages[msg_type] = listener + self.custom_messages[msg_type] = (listener, concurrent) class LocalRunner(Runner): @@ -568,7 +568,7 @@ def send_message(self, msg_type: str, data: Any | None = None, client_id: str | """ logger.debug("Running locally: sending %s message to self" % msg_type) if msg_type in self.custom_messages: - listener = self.custom_messages[msg_type] + listener, concurrent = self.custom_messages[msg_type] msg = Message(msg_type, data, "local") listener(environment=self.environment, msg=msg) else: @@ -1139,7 +1139,11 @@ def client_listener(self) -> NoReturn: f"Received {msg.type} message from worker {msg.node_id} (index {self.get_worker_index(msg.node_id)})" ) try: - self.custom_messages[msg.type](environment=self.environment, msg=msg) + listener, concurrent = self.custom_messages[msg.type] + if not concurrent: + listener(environment=self.environment, msg=msg) + else: + gevent.spawn(listener, self.environment, msg) except Exception: logging.error(f"Uncaught exception in handler for {msg.type}\n{traceback.format_exc()}") @@ -1393,7 +1397,11 @@ def worker(self) -> NoReturn: self.last_heartbeat_timestamp = time.time() elif msg.type in self.custom_messages: logger.debug("Received %s message from master" % msg.type) - self.custom_messages[msg.type](environment=self.environment, msg=msg) + listener, concurrent = self.custom_messages[msg.type] + if not concurrent: + listener(environment=self.environment, msg=msg) + else: + gevent.spawn(listener, self.environment, msg) else: logger.warning(f"Unknown message type received: {msg.type}") diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index 735e016ba9..cd5c1e6fe1 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -606,6 +606,31 @@ def on_custom_msg(msg, **kw): self.assertTrue(test_custom_msg[0]) self.assertEqual(123, test_custom_msg_data[0]["test_data"]) + def test_concurrent_custom_message(self): + class MyUser(User): + wait_time = constant(1) + + @task + def my_task(self): + pass + + test_custom_msg = [False] + test_custom_msg_data = [{}] + + def on_custom_msg(msg, **kw): + test_custom_msg[0] = True + test_custom_msg_data[0] = msg.data + + environment = Environment(user_classes=[MyUser]) + runner = LocalRunner(environment) + + runner.register_message("test_custom_msg", on_custom_msg, concurrent=True) + runner.send_message("test_custom_msg", {"test_data": 123}) + + gevent.sleep(0.5) + self.assertTrue(test_custom_msg[0]) + self.assertEqual(123, test_custom_msg_data[0]["test_data"]) + def test_undefined_custom_message(self): class MyUser(User): wait_time = constant(1)