Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allowing more extensibility to customize the mqtt events logged in the database #173

Merged
merged 5 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions examples/mqtt_custom_client_ex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import time
import typing

from locust import task, TaskSet
from locust_plugins.users.mqtt import MqttUser
from locust_plugins.users.mqtt import MqttClient


# extend the MqttClient class with your own custom implementation
class MyMqttClient(MqttClient):

# you can override the event name with your custom implementation
def _generate_event_name(self, event_type: str, qos: int, topic: str):
return f"mqtt:{event_type}:{qos}"


class MyUser(MqttUser):
# override the client_cls with your custom MqttClient implementation
client_cls: typing.Type[MyMqttClient] = MyMqttClient

@task
class MyTasks(TaskSet):
# Sleep for a while to allow the client time to connect.
# This is probably not the most "correct" way to do this: a better method
# might be to add a gevent.event.Event to the MqttClient's on_connect
# callback and wait for that (with a timeout) here.
# However, this works well enough for the sake of an example.
def on_start(self):
time.sleep(5)

@task
def say_hello(self):
self.client.publish("hello/locust", b"hello world")
13 changes: 8 additions & 5 deletions locust_plugins/users/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ def __init__(
self._publish_requests: dict[int, PublishedMessageContext] = {}
self._subscribe_requests: dict[int, SubscribeContext] = {}

def _generate_event_name(self, event_type: str, qos: int, topic: str):
return _generate_mqtt_event_name(event_type, qos, topic)

def _on_publish_cb(
self,
client: mqtt.Client,
Expand All @@ -143,7 +146,7 @@ def _on_publish_cb(
# fire successful publish event
self.environment.events.request.fire(
request_type=REQUEST_TYPE,
name=_generate_mqtt_event_name("publish", request_context.qos, request_context.topic),
name=self._generate_event_name("publish", request_context.qos, request_context.topic),
response_time=(cb_time - request_context.start_time) * 1000,
response_length=request_context.payload_size,
exception=None,
Expand Down Expand Up @@ -180,7 +183,7 @@ def _on_subscribe_cb(
if SUBACK_FAILURE in granted_qos:
self.environment.events.request.fire(
request_type=REQUEST_TYPE,
name=_generate_mqtt_event_name("subscribe", request_context.qos, request_context.topic),
name=self._generate_event_name("subscribe", request_context.qos, request_context.topic),
response_time=(cb_time - request_context.start_time) * 1000,
response_length=0,
exception=AssertionError(f"Broker returned an error response during subscription: {granted_qos}"),
Expand All @@ -193,7 +196,7 @@ def _on_subscribe_cb(
# fire successful subscribe event
self.environment.events.request.fire(
request_type=REQUEST_TYPE,
name=_generate_mqtt_event_name("subscribe", request_context.qos, request_context.topic),
name=self._generate_event_name("subscribe", request_context.qos, request_context.topic),
response_time=(cb_time - request_context.start_time) * 1000,
response_length=0,
exception=None,
Expand Down Expand Up @@ -287,7 +290,7 @@ def publish(
if publish_info.rc != mqtt.MQTT_ERR_SUCCESS:
self.environment.events.request.fire(
request_type=REQUEST_TYPE,
name=_generate_mqtt_event_name("publish", request_context.qos, request_context.topic),
name=self._generate_event_name("publish", request_context.qos, request_context.topic),
response_time=0,
response_length=0,
exception=publish_info.rc,
Expand Down Expand Up @@ -325,7 +328,7 @@ def subscribe(
if result != mqtt.MQTT_ERR_SUCCESS:
self.environment.events.request.fire(
request_type=REQUEST_TYPE,
name=_generate_mqtt_event_name("subscribe", request_context.qos, request_context.topic),
name=self._generate_event_name("subscribe", request_context.qos, request_context.topic),
response_time=0,
response_length=0,
exception=result,
Expand Down
Loading