Skip to content

Commit

Permalink
Merge pull request #2650 from samuelspagl/fix/run-custom-messages-in-…
Browse files Browse the repository at this point in the history
…greenlet

Add functionality to run listener functions for `custom_messages` concurrently
  • Loading branch information
cyberw authored Mar 25, 2024
2 parents af8c067 + 78dc88e commit 18b19e0
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 7 deletions.
15 changes: 14 additions & 1 deletion docs/running-distributed.rst
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,20 @@ order to coordinate the test. This can be easily accomplished with custom messag
environment.runner.send_message('test_users', users)
Note that when running locally (i.e. non-distributed), this functionality will be preserved;
the messages will simply be handled by the runner that sends them.
the messages will simply be handled by the runner that sends them.

.. note::
Using the default options while registering a message handler will run the listener function
in a **blocking** way, resulting in the heartbeat and other messages being delayed for the amount
of the execution.
If it is known that the listener function will handle time-intensive tasks, it is possible to register the
function as **concurrent** (as a separate greenlet).

.. code-block::
environment.runner.register_message('test_users', setup_test_users, concurrent=True)
Please use this feature with care, as otherwise it could result in greenlets running and influencing
the running loadtest.

For more details, see the `complete example <https://github.com/locustio/locust/tree/master/examples/custom_messages.py>`_.

Expand Down
12 changes: 12 additions & 0 deletions examples/custom_messages.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
from locust import HttpUser, between, events, task
from locust.runners import MasterRunner, WorkerRunner

import gevent

usernames = []


def setup_test_users(environment, msg, **kwargs):
# Fired when the worker receives a message of type 'test_users'
usernames.extend(map(lambda u: u["name"], msg.data))
# Even though "acknowledge_concurrent_users" was sent first, "acknowledge_users"
# will print its statement first, as "acknowledge_concurrent_users" was registered
# running concurrently, and therefore not blocking other messages.
environment.runner.send_message("concurrent_message", "This is a non blocking message")
environment.runner.send_message("acknowledge_users", f"Thanks for the {len(msg.data)} users!")


Expand All @@ -15,12 +21,18 @@ def on_acknowledge(msg, **kwargs):
print(msg.data)


def on_concurrent_message(msg, **kwargs):
gevent.sleep(10)
print(msg.data)


@events.init.add_listener
def on_locust_init(environment, **_kwargs):
if not isinstance(environment.runner, MasterRunner):
environment.runner.register_message("test_users", setup_test_users)
if not isinstance(environment.runner, WorkerRunner):
environment.runner.register_message("acknowledge_users", on_acknowledge)
environment.runner.register_message("concurrent_message", on_concurrent_message, concurrent=True)


@events.test_start.add_listener
Expand Down
20 changes: 14 additions & 6 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def __init__(self, environment: Environment) -> None:
self.target_user_classes_count: dict[str, int] = {}
# target_user_count is set before the ramp-up/ramp-down occurs.
self.target_user_count: int = 0
self.custom_messages: dict[str, Callable] = {}
self.custom_messages: dict[str, tuple[Callable, bool]] = {}

self._users_dispatcher: UsersDispatcher | None = None

Expand Down Expand Up @@ -420,7 +420,7 @@ def log_exception(self, node_id: str, msg: str, formatted_tb: str) -> None:
row["nodes"].add(node_id)
self.exceptions[key] = row

def register_message(self, msg_type: str, listener: Callable) -> None:
def register_message(self, msg_type: str, listener: Callable, concurrent=False) -> None:
"""
Register a listener for a custom message from another node
Expand All @@ -429,7 +429,7 @@ def register_message(self, msg_type: str, listener: Callable) -> None:
"""
if msg_type in self.custom_messages:
raise Exception(f"Tried to register listener method for {msg_type}, but it already had a listener!")
self.custom_messages[msg_type] = listener
self.custom_messages[msg_type] = (listener, concurrent)


class LocalRunner(Runner):
Expand Down Expand Up @@ -568,7 +568,7 @@ def send_message(self, msg_type: str, data: Any | None = None, client_id: str |
"""
logger.debug("Running locally: sending %s message to self" % msg_type)
if msg_type in self.custom_messages:
listener = self.custom_messages[msg_type]
listener, concurrent = self.custom_messages[msg_type]
msg = Message(msg_type, data, "local")
listener(environment=self.environment, msg=msg)
else:
Expand Down Expand Up @@ -1139,7 +1139,11 @@ def client_listener(self) -> NoReturn:
f"Received {msg.type} message from worker {msg.node_id} (index {self.get_worker_index(msg.node_id)})"
)
try:
self.custom_messages[msg.type](environment=self.environment, msg=msg)
listener, concurrent = self.custom_messages[msg.type]
if not concurrent:
listener(environment=self.environment, msg=msg)
else:
gevent.spawn(listener, self.environment, msg)
except Exception:
logging.error(f"Uncaught exception in handler for {msg.type}\n{traceback.format_exc()}")

Expand Down Expand Up @@ -1393,7 +1397,11 @@ def worker(self) -> NoReturn:
self.last_heartbeat_timestamp = time.time()
elif msg.type in self.custom_messages:
logger.debug("Received %s message from master" % msg.type)
self.custom_messages[msg.type](environment=self.environment, msg=msg)
listener, concurrent = self.custom_messages[msg.type]
if not concurrent:
listener(environment=self.environment, msg=msg)
else:
gevent.spawn(listener, self.environment, msg)
else:
logger.warning(f"Unknown message type received: {msg.type}")

Expand Down
25 changes: 25 additions & 0 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,31 @@ def on_custom_msg(msg, **kw):
self.assertTrue(test_custom_msg[0])
self.assertEqual(123, test_custom_msg_data[0]["test_data"])

def test_concurrent_custom_message(self):
class MyUser(User):
wait_time = constant(1)

@task
def my_task(self):
pass

test_custom_msg = [False]
test_custom_msg_data = [{}]

def on_custom_msg(msg, **kw):
test_custom_msg[0] = True
test_custom_msg_data[0] = msg.data

environment = Environment(user_classes=[MyUser])
runner = LocalRunner(environment)

runner.register_message("test_custom_msg", on_custom_msg, concurrent=True)
runner.send_message("test_custom_msg", {"test_data": 123})

gevent.sleep(0.5)
self.assertTrue(test_custom_msg[0])
self.assertEqual(123, test_custom_msg_data[0]["test_data"])

def test_undefined_custom_message(self):
class MyUser(User):
wait_time = constant(1)
Expand Down

0 comments on commit 18b19e0

Please sign in to comment.