From 22e19d5854dfedcc30e1a4da83c5af25b3973c19 Mon Sep 17 00:00:00 2001 From: Alexandru Ionut Bujdei Date: Tue, 20 Feb 2024 08:44:08 +0000 Subject: [PATCH 1/5] allowing more extensibility in order to be able to customize the mqtt events logged in the database --- locust_plugins/users/mqtt.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/locust_plugins/users/mqtt.py b/locust_plugins/users/mqtt.py index 21826a1f..510c6c08 100644 --- a/locust_plugins/users/mqtt.py +++ b/locust_plugins/users/mqtt.py @@ -117,6 +117,10 @@ def __init__( self._publish_requests: dict[int, PublishedMessageContext] = {} self._subscribe_requests: dict[int, SubscribeContext] = {} + def _generate_event_name(self, event_type: str, qos: int, topic: str){ + return _generate_mqtt_event_name(event_type, qos, topic) + } + def _on_publish_cb( self, client: mqtt.Client, @@ -143,7 +147,7 @@ def _on_publish_cb( # fire successful publish event self.environment.events.request.fire( request_type=REQUEST_TYPE, - name=_generate_mqtt_event_name("publish", request_context.qos, request_context.topic), + name=self._generate_event_name("publish", request_context.qos, request_context.topic), response_time=(cb_time - request_context.start_time) * 1000, response_length=request_context.payload_size, exception=None, @@ -180,7 +184,7 @@ def _on_subscribe_cb( if SUBACK_FAILURE in granted_qos: self.environment.events.request.fire( request_type=REQUEST_TYPE, - name=_generate_mqtt_event_name("subscribe", request_context.qos, request_context.topic), + name=self._generate_event_name("subscribe", request_context.qos, request_context.topic), response_time=(cb_time - request_context.start_time) * 1000, response_length=0, exception=AssertionError(f"Broker returned an error response during subscription: {granted_qos}"), @@ -193,7 +197,7 @@ def _on_subscribe_cb( # fire successful subscribe event self.environment.events.request.fire( request_type=REQUEST_TYPE, - name=_generate_mqtt_event_name("subscribe", request_context.qos, request_context.topic), + name=self._generate_event_name("subscribe", request_context.qos, request_context.topic), response_time=(cb_time - request_context.start_time) * 1000, response_length=0, exception=None, @@ -287,7 +291,7 @@ def publish( if publish_info.rc != mqtt.MQTT_ERR_SUCCESS: self.environment.events.request.fire( request_type=REQUEST_TYPE, - name=_generate_mqtt_event_name("publish", request_context.qos, request_context.topic), + name=self._generate_event_name("publish", request_context.qos, request_context.topic), response_time=0, response_length=0, exception=publish_info.rc, @@ -325,7 +329,7 @@ def subscribe( if result != mqtt.MQTT_ERR_SUCCESS: self.environment.events.request.fire( request_type=REQUEST_TYPE, - name=_generate_mqtt_event_name("subscribe", request_context.qos, request_context.topic), + name=self._generate_event_name("subscribe", request_context.qos, request_context.topic), response_time=0, response_length=0, exception=result, From 99cb2d24374fb6d3b0d1365094c01d2c559e373d Mon Sep 17 00:00:00 2001 From: Alexandru Ionut Bujdei Date: Tue, 20 Feb 2024 09:38:51 +0000 Subject: [PATCH 2/5] fixed syntax issue --- locust_plugins/users/mqtt.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/locust_plugins/users/mqtt.py b/locust_plugins/users/mqtt.py index 510c6c08..2068445e 100644 --- a/locust_plugins/users/mqtt.py +++ b/locust_plugins/users/mqtt.py @@ -117,9 +117,8 @@ def __init__( self._publish_requests: dict[int, PublishedMessageContext] = {} self._subscribe_requests: dict[int, SubscribeContext] = {} - def _generate_event_name(self, event_type: str, qos: int, topic: str){ + def _generate_event_name(self, event_type: str, qos: int, topic: str): return _generate_mqtt_event_name(event_type, qos, topic) - } def _on_publish_cb( self, From 3d51443f786660b0b3ba8d106af03b5e62167855 Mon Sep 17 00:00:00 2001 From: Alexandru Ionut Bujdei Date: Tue, 20 Feb 2024 10:34:52 +0000 Subject: [PATCH 3/5] added example with simple custom Mqtt client implementation --- examples/mqtt_custom_client_ex.py | 49 +++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 examples/mqtt_custom_client_ex.py diff --git a/examples/mqtt_custom_client_ex.py b/examples/mqtt_custom_client_ex.py new file mode 100644 index 00000000..05d65325 --- /dev/null +++ b/examples/mqtt_custom_client_ex.py @@ -0,0 +1,49 @@ +import os +import ssl +import time +import typing + +from locust import task, TaskSet +from locust.env import Environment +from locust.user.wait_time import between +from locust_plugins.users.mqtt import MqttUser +from locust_plugins.users.mqtt import MqttClient + + + +tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS) +tls_context.load_verify_locations(os.environ["LOCUST_MQTT_CAFILE"]) + +# extend the MqttClient class with your own custom implementation +class MyMqttClient(MqttClient): + def __init__( + self, + *args, + environment: Environment, + client_id: typing.Optional[str] = None, + **kwargs, + ): + super().__init__(*args, environment, client_id, **kwargs) + + # you can override the event name with your custom implementation + def _generate_event_name(self, event_type: str, qos: int, topic: str): + return f"mqtt:{event_type}:{qos}" + + +class MyUser(MqttUser): + # override the client_cls with your custom MqttClient implementation + client_cls: typing.Type[MyMqttClient] = MyMqttClient + + @task + class MyTasks(TaskSet): + # Sleep for a while to allow the client time to connect. + # This is probably not the most "correct" way to do this: a better method + # might be to add a gevent.event.Event to the MqttClient's on_connect + # callback and wait for that (with a timeout) here. + # However, this works well enough for the sake of an example. + def on_start(self): + time.sleep(5) + + @task + def say_hello(self): + self.client.publish("hello/locust", b"hello world") From f646950c510b2cf98c1d8ba8903617452d07a79f Mon Sep 17 00:00:00 2001 From: Alexandru Ionut Bujdei Date: Tue, 20 Feb 2024 11:00:38 +0000 Subject: [PATCH 4/5] removed unrequired override of initializer --- examples/mqtt_custom_client_ex.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/examples/mqtt_custom_client_ex.py b/examples/mqtt_custom_client_ex.py index 05d65325..919aaf73 100644 --- a/examples/mqtt_custom_client_ex.py +++ b/examples/mqtt_custom_client_ex.py @@ -10,20 +10,8 @@ from locust_plugins.users.mqtt import MqttClient - -tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS) -tls_context.load_verify_locations(os.environ["LOCUST_MQTT_CAFILE"]) - # extend the MqttClient class with your own custom implementation class MyMqttClient(MqttClient): - def __init__( - self, - *args, - environment: Environment, - client_id: typing.Optional[str] = None, - **kwargs, - ): - super().__init__(*args, environment, client_id, **kwargs) # you can override the event name with your custom implementation def _generate_event_name(self, event_type: str, qos: int, topic: str): From 39640ddae65e56cc121dd28efe7c388cfb7fd746 Mon Sep 17 00:00:00 2001 From: Alexandru Ionut Bujdei Date: Tue, 20 Feb 2024 11:02:11 +0000 Subject: [PATCH 5/5] removed unrequired dependencies --- examples/mqtt_custom_client_ex.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/examples/mqtt_custom_client_ex.py b/examples/mqtt_custom_client_ex.py index 919aaf73..c7b4085a 100644 --- a/examples/mqtt_custom_client_ex.py +++ b/examples/mqtt_custom_client_ex.py @@ -1,11 +1,7 @@ -import os -import ssl import time import typing from locust import task, TaskSet -from locust.env import Environment -from locust.user.wait_time import between from locust_plugins.users.mqtt import MqttUser from locust_plugins.users.mqtt import MqttClient