Skip to content

Commit

Permalink
Merge pull request #781 from akx/fix-v5-tests
Browse files Browse the repository at this point in the history
(Kind of) fix mqttv5 tests
  • Loading branch information
PierreF authored Dec 31, 2023
2 parents 24fd02d + 2133e49 commit 03f2979
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 63 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/tox.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v4
with:
repository: eclipse/paho.mqtt.testing
ref: a4dc694010217b291ee78ee13a6d1db812f9babd
path: paho.mqtt.testing
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python }}
Expand Down
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ Once you have the code, it can be installed from your repository as well:
To perform all test (including MQTT v5 test), you also need to clone paho.mqtt.testing in paho.mqtt.python folder::

git clone https://github.com/eclipse/paho.mqtt.testing.git
cd paho.mqtt.testing
git checkout a4dc694010217b291ee78ee13a6d1db812f9babd

Known limitations
-----------------
Expand Down
3 changes: 0 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,3 @@ line-length = 167
"S105",
"S106",
]
"tests/test_mqttv5.py" = [
"F841", # TODO: fix when fixing this test file
]
95 changes: 35 additions & 60 deletions tests/test_mqttv5.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import threading
import time
import unittest
import unittest.mock

import paho.mqtt
import paho.mqtt.client
Expand Down Expand Up @@ -120,9 +121,9 @@ def cleanRetained(port):
curclient.loop_start()
callback.register(curclient)
curclient.connect(host="localhost", port=port)
response = callback.wait_connected()
callback.wait_connected()
curclient.subscribe("#", options=SubscribeOptions(qos=0))
response = callback.wait_subscribed() # wait for retained messages to arrive
callback.wait_subscribed() # wait for retained messages to arrive
time.sleep(1)
for message in callback.messages:
logging.info("deleting retained message for topic", message["message"])
Expand Down Expand Up @@ -164,18 +165,22 @@ def setUpClass(cls):
except ImportError as ie:
raise unittest.SkipTest("paho.mqtt.testing not present.") from ie

cls._test_broker = threading.Thread(
target=mqtt.brokers.run,
kwargs={
"config": ["listener 0"],
},
)
cls._test_broker.daemon = True
cls._test_broker.start()
# Wait a bit for TCP server to bind to an address
time.sleep(0.5)
# Hack to find the port used by the test broker...
cls._test_broker_port = mqtt.brokers.listeners.TCPListeners.server.socket.getsockname()[1]
# Hack: we need to patch `signal.signal()` because `mqtt.brokers.run()`
# calls it to set up a signal handler; however, that won't work
# from a thread...
with unittest.mock.patch("signal.signal", unittest.mock.MagicMock()):
cls._test_broker = threading.Thread(
target=mqtt.brokers.run,
kwargs={
"config": ["listener 0"],
},
)
cls._test_broker.daemon = True
cls._test_broker.start()
# Wait a bit for TCP server to bind to an address
time.sleep(0.5)
# Hack to find the port used by the test broker...
cls._test_broker_port = mqtt.brokers.listeners.TCPListeners.server.socket.getsockname()[1]
setData()
cleanup(cls._test_broker_port)

Expand Down Expand Up @@ -239,10 +244,6 @@ def test_connect_fail(self):
fclient.loop_stop()

def test_retained_message(self):
qos0topic = "fromb/qos 0"
qos1topic = "fromb/qos 1"
qos2topic = "fromb/qos2"
wildcardtopic = "fromb/+"

publish_properties = Properties(PacketTypes.PUBLISH)
publish_properties.UserProperty = ("a", "2")
Expand Down Expand Up @@ -372,15 +373,15 @@ def test_offline_message_queueing(self):
connect_properties.SessionExpiryInterval = 99999
oclient.loop_start()
oclient.connect(host="localhost", port=self._test_broker_port, properties=connect_properties)
response = ocallback.wait_connected()
ocallback.wait_connected()
oclient.subscribe(wildtopics[5], qos=2)
response = ocallback.wait_subscribed()
ocallback.wait_subscribed()
oclient.disconnect()
oclient.loop_stop()

bclient.loop_start()
bclient.connect(host="localhost", port=self._test_broker_port)
response = callback2.wait_connected()
callback2.wait_connected()
bclient.publish(topics[1], b"qos 0", 0)
bclient.publish(topics[2], b"qos 1", 1)
bclient.publish(topics[3], b"qos 2", 2)
Expand All @@ -393,7 +394,7 @@ def test_offline_message_queueing(self):
ocallback.register(oclient)
oclient.loop_start()
oclient.connect(host="localhost", port=self._test_broker_port, clean_start=False)
response = ocallback.wait_connected()
ocallback.wait_connected()
time.sleep(2)
oclient.disconnect()
oclient.loop_stop()
Expand Down Expand Up @@ -653,10 +654,10 @@ def test_payload_format(self):
pclient, pcallback = self.new_client(clientid)
pclient.loop_start()
pclient.connect_async(host="localhost", port=self._test_broker_port)
response = pcallback.wait_connected()
pcallback.wait_connected()

pclient.subscribe(topics[0], qos=2)
response = pcallback.wait_subscribed()
pcallback.wait_subscribed()
publish_properties = Properties(PacketTypes.PUBLISH)
publish_properties.PayloadFormatIndicator = 1
publish_properties.ContentType = "My name"
Expand Down Expand Up @@ -703,9 +704,9 @@ def test_message_expiry(self):
lbclient, lbcallback = self.new_client(clientid+" b")
lbclient.loop_start()
lbclient.connect(host="localhost", port=self._test_broker_port, properties=connect_properties)
response = lbcallback.wait_connected()
lbcallback.wait_connected()
lbclient.subscribe(topics[0], qos=2)
response = lbcallback.wait_subscribed()
lbcallback.wait_subscribed()
disconnect_properties = Properties(PacketTypes.DISCONNECT)
disconnect_properties.SessionExpiryInterval = 999999999
lbclient.disconnect(properties=disconnect_properties)
Expand Down Expand Up @@ -835,7 +836,7 @@ def test_subscribe_options(self):
properties.UserProperty = ("a", "2")
properties.UserProperty = ("c", "3")
laclient.unsubscribe(wildtopics[5], properties)
response = lacallback.wait_unsubscribed()
lacallback.wait_unsubscribed()

# check that we really did remove that subscription
laclient.subscribe(
Expand All @@ -856,7 +857,7 @@ def test_subscribe_options(self):
properties.UserProperty = ("a", "2")
properties.UserProperty = ("c", "3")
laclient.unsubscribe(wildtopics[5], properties)
response = lacallback.wait_unsubscribed()
lacallback.wait_unsubscribed()

lacallback.clear()
laclient.subscribe(
Expand All @@ -870,7 +871,7 @@ def test_subscribe_options(self):

# remove that subscription
laclient.unsubscribe(wildtopics[5])
response = lacallback.wait_unsubscribed()
lacallback.wait_unsubscribed()

laclient.subscribe(
wildtopics[5], options=SubscribeOptions(2, retainHandling=0))
Expand Down Expand Up @@ -990,21 +991,6 @@ def test_request_response(self):
def test_client_topic_alias(self):
clientid = 'client topic alias'

# no server side topic aliases allowed
laclient, lacallback = self.new_client(clientid+" a")
laclient.connect(host="localhost", port=self._test_broker_port)
connack = lacallback.wait_connected()
laclient.loop_start()

publish_properties = Properties(PacketTypes.PUBLISH)
publish_properties.TopicAlias = 0 # topic alias 0 not allowed
laclient.publish(topics[0], "topic alias 0", 1,
properties=publish_properties)

# should get back a disconnect with Topic alias invalid
lacallback.wait_disconnected()
laclient.loop_stop()

connect_properties = Properties(PacketTypes.CONNECT)
connect_properties.TopicAliasMaximum = 0 # server topic aliases not allowed
connect_properties.SessionExpiryInterval = 99999
Expand Down Expand Up @@ -1067,11 +1053,8 @@ def test_server_topic_alias(self):

laclient, lacallback = self.new_client(clientid+" a")
laclient.connect(host="localhost", port=self._test_broker_port, properties=connect_properties)
connack = lacallback.wait_connected()
lacallback.wait_connected()
laclient.loop_start()
clientTopicAliasMaximum = 0
if hasattr(connack["properties"], "TopicAliasMaximum"):
clientTopicAliasMaximum = connack["properties"].TopicAliasMaximum

laclient.subscribe(topics[0], qos=2)
lacallback.wait_subscribed()
Expand Down Expand Up @@ -1106,13 +1089,9 @@ def test_server_topic_alias(self):

laclient, lacallback = self.new_client(clientid+" a")
laclient.connect(host="localhost", port=self._test_broker_port, properties=connect_properties)
connack = lacallback.wait_connected()
lacallback.wait_connected()
laclient.loop_start()

clientTopicAliasMaximum = 0
if hasattr(connack["properties"], "TopicAliasMaximum"):
clientTopicAliasMaximum = connack["properties"].TopicAliasMaximum

laclient.subscribe(topics[0], qos=2)
lacallback.wait_subscribed()

Expand All @@ -1138,13 +1117,9 @@ def test_server_topic_alias(self):

laclient, lacallback = self.new_client(clientid+" a")
laclient.connect(host="localhost", port=self._test_broker_port, properties=connect_properties)
connack = lacallback.wait_connected()
lacallback.wait_connected()
laclient.loop_start()

clientTopicAliasMaximum = 0
if hasattr(connack["properties"], "TopicAliasMaximum"):
clientTopicAliasMaximum = connack["properties"].TopicAliasMaximum

laclient.subscribe(topics[0], qos=2)
lacallback.wait_subscribed()

Expand Down Expand Up @@ -1304,7 +1279,7 @@ def test_shared_subscriptions(self):

laclient.subscribe(
[(shared_sub_topic, SubscribeOptions(2)), (topics[0], SubscribeOptions(2))])
response = lacallback.wait_subscribed()
lacallback.wait_subscribed()

lbclient, lbcallback = self.new_client(clientid+" b")
lbclient.connect(host="localhost", port=self._test_broker_port)
Expand All @@ -1316,7 +1291,7 @@ def test_shared_subscriptions(self):

lbclient.subscribe(
[(shared_sub_topic, SubscribeOptions(2)), (topics[0], 2)])
response = lbcallback.wait_subscribed()
lbcallback.wait_subscribed()

lacallback.clear()
lbcallback.clear()
Expand Down

0 comments on commit 03f2979

Please sign in to comment.