diff --git a/examples/mqtt_custom_client_ex.py b/examples/mqtt_custom_client_ex.py new file mode 100644 index 0000000..c7b4085 --- /dev/null +++ b/examples/mqtt_custom_client_ex.py @@ -0,0 +1,33 @@ +import time +import typing + +from locust import task, TaskSet +from locust_plugins.users.mqtt import MqttUser +from locust_plugins.users.mqtt import MqttClient + + +# extend the MqttClient class with your own custom implementation +class MyMqttClient(MqttClient): + + # you can override the event name with your custom implementation + def _generate_event_name(self, event_type: str, qos: int, topic: str): + return f"mqtt:{event_type}:{qos}" + + +class MyUser(MqttUser): + # override the client_cls with your custom MqttClient implementation + client_cls: typing.Type[MyMqttClient] = MyMqttClient + + @task + class MyTasks(TaskSet): + # Sleep for a while to allow the client time to connect. + # This is probably not the most "correct" way to do this: a better method + # might be to add a gevent.event.Event to the MqttClient's on_connect + # callback and wait for that (with a timeout) here. + # However, this works well enough for the sake of an example. + def on_start(self): + time.sleep(5) + + @task + def say_hello(self): + self.client.publish("hello/locust", b"hello world") diff --git a/locust_plugins/users/mqtt.py b/locust_plugins/users/mqtt.py index 21826a1..2068445 100644 --- a/locust_plugins/users/mqtt.py +++ b/locust_plugins/users/mqtt.py @@ -117,6 +117,9 @@ def __init__( self._publish_requests: dict[int, PublishedMessageContext] = {} self._subscribe_requests: dict[int, SubscribeContext] = {} + def _generate_event_name(self, event_type: str, qos: int, topic: str): + return _generate_mqtt_event_name(event_type, qos, topic) + def _on_publish_cb( self, client: mqtt.Client, @@ -143,7 +146,7 @@ def _on_publish_cb( # fire successful publish event self.environment.events.request.fire( request_type=REQUEST_TYPE, - name=_generate_mqtt_event_name("publish", request_context.qos, request_context.topic), + name=self._generate_event_name("publish", request_context.qos, request_context.topic), response_time=(cb_time - request_context.start_time) * 1000, response_length=request_context.payload_size, exception=None, @@ -180,7 +183,7 @@ def _on_subscribe_cb( if SUBACK_FAILURE in granted_qos: self.environment.events.request.fire( request_type=REQUEST_TYPE, - name=_generate_mqtt_event_name("subscribe", request_context.qos, request_context.topic), + name=self._generate_event_name("subscribe", request_context.qos, request_context.topic), response_time=(cb_time - request_context.start_time) * 1000, response_length=0, exception=AssertionError(f"Broker returned an error response during subscription: {granted_qos}"), @@ -193,7 +196,7 @@ def _on_subscribe_cb( # fire successful subscribe event self.environment.events.request.fire( request_type=REQUEST_TYPE, - name=_generate_mqtt_event_name("subscribe", request_context.qos, request_context.topic), + name=self._generate_event_name("subscribe", request_context.qos, request_context.topic), response_time=(cb_time - request_context.start_time) * 1000, response_length=0, exception=None, @@ -287,7 +290,7 @@ def publish( if publish_info.rc != mqtt.MQTT_ERR_SUCCESS: self.environment.events.request.fire( request_type=REQUEST_TYPE, - name=_generate_mqtt_event_name("publish", request_context.qos, request_context.topic), + name=self._generate_event_name("publish", request_context.qos, request_context.topic), response_time=0, response_length=0, exception=publish_info.rc, @@ -325,7 +328,7 @@ def subscribe( if result != mqtt.MQTT_ERR_SUCCESS: self.environment.events.request.fire( request_type=REQUEST_TYPE, - name=_generate_mqtt_event_name("subscribe", request_context.qos, request_context.topic), + name=self._generate_event_name("subscribe", request_context.qos, request_context.topic), response_time=0, response_length=0, exception=result,