diff --git a/tests/test_mqttv5.py b/tests/test_mqttv5.py index 29bc2feb..83a1fdd8 100644 --- a/tests/test_mqttv5.py +++ b/tests/test_mqttv5.py @@ -17,6 +17,7 @@ """ import logging +import queue import sys import threading import time @@ -30,76 +31,72 @@ from paho.mqtt.properties import Properties from paho.mqtt.subscribeoptions import SubscribeOptions +DEFAULT_TIMEOUT = 5 +# timeout for something that should not happen but we wait to +# give it time to happen if it does due to a bug. +WAIT_NON_EVENT_TIMEOUT = 1 class Callbacks: def __init__(self): - self.messages = [] - self.publisheds = [] - self.subscribeds = [] - self.unsubscribeds = [] - self.disconnecteds = [] - self.connecteds = [] - self.conn_failures = [] + self.messages = queue.Queue() + self.publisheds = queue.Queue() + self.subscribeds = queue.Queue() + self.unsubscribeds = queue.Queue() + self.disconnecteds = queue.Queue() + self.connecteds = queue.Queue() + self.conn_failures = queue.Queue() def __str__(self): - return str(self.messages) + str(self.messagedicts) + str(self.publisheds) + \ - str(self.subscribeds) + \ - str(self.unsubscribeds) + str(self.disconnects) + return str(self.messages.queue) + str(self.messagedicts.queue) + str(self.publisheds.queue) + \ + str(self.subscribeds.queue) + \ + str(self.unsubscribeds.queue) + str(self.disconnects.queue) def clear(self): self.__init__() def on_connect(self, client, userdata, flags, reasonCode, properties): - self.connecteds.append({"userdata": userdata, "flags": flags, - "reasonCode": reasonCode, "properties": properties}) + self.connecteds.put({"userdata": userdata, "flags": flags, + "reasonCode": reasonCode, "properties": properties}) def on_connect_fail(self, client, userdata): - self.conn_failures.append({"userdata": userdata}) - - def wait(self, alist, timeout=2): - interval = 0.2 - total = 0 - while len(alist) == 0 and total < timeout: - time.sleep(interval) - total += interval - return alist.pop(0) # if len(alist) > 0 else None + self.conn_failures.put({"userdata": userdata}) def wait_connect_fail(self): - return self.wait(self.conn_failures, timeout=10) + return self.conn_failures.get(timeout=10) def wait_connected(self): - return self.wait(self.connecteds) + return self.connecteds.get(timeout=2) def on_disconnect(self, client, userdata, reasonCode, properties=None): - self.disconnecteds.append( + self.disconnecteds.put( {"reasonCode": reasonCode, "properties": properties}) def wait_disconnected(self): - return self.wait(self.disconnecteds) + return self.disconnecteds.get(timeout=2) def on_message(self, client, userdata, message): - self.messages.append({"userdata": userdata, "message": message}) + self.messages.put({"userdata": userdata, "message": message}) def published(self, client, userdata, msgid): - self.publisheds.append(msgid) + self.publisheds.put(msgid) def wait_published(self): - return self.wait(self.publisheds) + return self.publisheds.get(timeout=2) def on_subscribe(self, client, userdata, mid, reasonCodes, properties): - self.subscribeds.append({"mid": mid, "userdata": userdata, - "properties": properties, "reasonCodes": reasonCodes}) + self.subscribeds.put({"mid": mid, "userdata": userdata, + "properties": properties, "reasonCodes": reasonCodes}) def wait_subscribed(self): - return self.wait(self.subscribeds) + return self.subscribeds.get(timeout=2) def unsubscribed(self, client, userdata, mid, properties, reasonCodes): - self.unsubscribeds.append({"mid": mid, "userdata": userdata, - "properties": properties, "reasonCodes": reasonCodes}) + self.unsubscribeds.put({"mid": mid, "userdata": userdata, + "properties": properties, "reasonCodes": reasonCodes}) def wait_unsubscribed(self): - return self.wait(self.unsubscribeds) + return self.unsubscribeds.get(timeout=2) def on_log(self, client, userdata, level, buf): print(buf) @@ -114,6 +111,33 @@ def register(self, client): client.on_connect_fail = self.on_connect_fail client.on_log = self.on_log + def get_messages(self, count: int, timeout: float = DEFAULT_TIMEOUT): + result = [] + deadline = time.time() + timeout + while len(result) < count: + get_timeout = deadline - time.time() + if get_timeout <= 0: + result.append(self.messages.get_nowait()) + else: + result.append(self.messages.get(timeout=get_timeout)) + + return result + + def get_at_most_messages(self, count: int, timeout: float = DEFAULT_TIMEOUT): + result = [] + deadline = time.time() + timeout + try: + while len(result) < count: + get_timeout = deadline - time.time() + if get_timeout <= 0: + result.append(self.messages.get_nowait()) + else: + result.append(self.messages.get(timeout=get_timeout)) + except queue.Empty: + pass + + return result + def cleanRetained(port): callback = Callbacks() @@ -122,19 +146,22 @@ def cleanRetained(port): b"clean retained", protocol=paho.mqtt.client.MQTTv5, ) - curclient.loop_start() callback.register(curclient) curclient.connect(host="localhost", port=port) + curclient.loop_start() callback.wait_connected() curclient.subscribe("#", options=SubscribeOptions(qos=0)) callback.wait_subscribed() # wait for retained messages to arrive - time.sleep(1) - for message in callback.messages: - logging.info("deleting retained message for topic", message["message"]) - curclient.publish(message["message"].topic, b"", 0, retain=True) + try: + while True: + message = callback.messages.get(timeout=WAIT_NON_EVENT_TIMEOUT) + if message["message"].payload != b"": + logging.info("deleting retained message for topic", message["message"]) + curclient.publish(message["message"].topic, b"", 0, retain=True) + except queue.Empty: + pass curclient.disconnect() curclient.loop_stop() - time.sleep(.1) def cleanup(port): @@ -142,18 +169,18 @@ def cleanup(port): print("clean up starting") clientids = ("aclient", "bclient") + def _on_connect(client, *args): + client.disconnect() + for clientid in clientids: curclient = paho.mqtt.client.Client( CallbackAPIVersion.VERSION1, clientid.encode("utf-8"), protocol=paho.mqtt.client.MQTTv5, ) - curclient.loop_start() + curclient.on_connect = _on_connect curclient.connect(host="localhost", port=port, clean_start=True) - time.sleep(.1) - curclient.disconnect() - time.sleep(.1) - curclient.loop_stop() + curclient.loop_forever() # clean retained messages cleanRetained(port) @@ -185,9 +212,15 @@ def setUpClass(cls): cls._test_broker.daemon = True cls._test_broker.start() # Wait a bit for TCP server to bind to an address - time.sleep(0.5) - # Hack to find the port used by the test broker... - cls._test_broker_port = mqtt.brokers.listeners.TCPListeners.server.socket.getsockname()[1] + for _ in range(20): + time.sleep(0.1) + if mqtt.brokers.listeners.TCPListeners.server is not None: + port = mqtt.brokers.listeners.TCPListeners.server.socket.getsockname()[1] + if port != 0: + cls._test_broker_port = port + break + else: + raise ValueError("can't find the test broker port") setData() cleanup(cls._test_broker_port) @@ -209,31 +242,33 @@ def tearDownClass(cls): mqtt.brokers.listeners.TCPListeners.server.shutdown() cls._test_broker.join(5) - def waitfor(self, queue, depth, limit): - total = 0 - while len(queue) < depth and total < limit: - interval = .5 - total += interval - time.sleep(interval) - def test_basic(self): + import datetime + print(datetime.datetime.now(), "start") aclient.connect(host="localhost", port=self._test_broker_port) aclient.loop_start() + print(datetime.datetime.now(), "loop_start") response = callback.wait_connected() + print(datetime.datetime.now(), "connected") self.assertEqual(response["reasonCode"].getName(), "Success") aclient.subscribe(topics[0], options=SubscribeOptions(qos=2)) response = callback.wait_subscribed() + print(datetime.datetime.now(), "wait_subscribed") self.assertEqual(response["reasonCodes"][0].getName(), "Granted QoS 2") aclient.publish(topics[0], b"qos 0") aclient.publish(topics[0], b"qos 1", 1) aclient.publish(topics[0], b"qos 2", 2) - i = 0 - while len(callback.messages) < 3 and i < 10: - time.sleep(.2) - i += 1 - self.assertEqual(len(callback.messages), 3) + + msgs = callback.get_messages(3) + print(datetime.datetime.now(), "publish get") + got_payload = { + x["message"].payload + for x in msgs + } + + self.assertEqual(got_payload, {b"qos 0", b"qos 1", b"qos 2"}) aclient.disconnect() callback.clear() @@ -268,26 +303,27 @@ def test_retained_message(self): aclient.publish(topics[3], b"qos 2", 2, retain=True, properties=publish_properties) # wait until those messages are published - time.sleep(1) + time.sleep(WAIT_NON_EVENT_TIMEOUT) aclient.subscribe(wildtopics[5], options=SubscribeOptions(qos=2)) response = callback.wait_subscribed() self.assertEqual(response["reasonCodes"][0].getName(), "Granted QoS 2") + msgs = callback.get_messages(3) - time.sleep(1) aclient.disconnect() aclient.loop_stop() - self.assertEqual(len(callback.messages), 3) - userprops = callback.messages[0]["message"].properties.UserProperty + self.assertTrue(callback.messages.empty()) + + userprops = msgs[0]["message"].properties.UserProperty self.assertTrue(userprops in [[("a", "2"), ("c", "3")], [ ("c", "3"), ("a", "2")]], userprops) - userprops = callback.messages[1]["message"].properties.UserProperty + userprops = msgs[1]["message"].properties.UserProperty self.assertTrue(userprops in [[("a", "2"), ("c", "3")], [ ("c", "3"), ("a", "2")]], userprops) - userprops = callback.messages[2]["message"].properties.UserProperty + userprops = msgs[2]["message"].properties.UserProperty self.assertTrue(userprops in [[("a", "2"), ("c", "3")], [ ("c", "3"), ("a", "2")]], userprops) - qoss = [callback.messages[i]["message"].qos for i in range(3)] + qoss = [x["message"].qos for x in msgs] self.assertTrue(1 in qoss and 2 in qoss and 0 in qoss, qoss) cleanRetained(self._test_broker_port) @@ -296,7 +332,7 @@ def test_will_message(self): # will messages and keep alive callback.clear() callback2.clear() - self.assertEqual(len(callback2.messages), 0, callback2.messages) + self.assertTrue(callback2.messages.empty(), callback2.messages.queue) will_properties = Properties(PacketTypes.WILLMESSAGE) will_properties.WillDelayInterval = 0 # this is the default anyway @@ -318,12 +354,11 @@ def test_will_message(self): # keep alive timeout ought to be triggered so the will message is received aclient.loop_stop() # so that pings aren't sent - self.waitfor(callback2.messages, 1, 10) + msg = callback2.messages.get(timeout=10) bclient.disconnect() bclient.loop_stop() - # should have the will message - self.assertEqual(len(callback2.messages), 1, callback2.messages) - props = callback2.messages[0]["message"].properties + + props = msg["message"].properties self.assertEqual(props.UserProperty, [("a", "2"), ("c", "3")]) def test_zero_length_clientid(self): @@ -391,10 +426,14 @@ def test_offline_message_queueing(self): bclient.loop_start() bclient.connect(host="localhost", port=self._test_broker_port) callback2.wait_connected() - bclient.publish(topics[1], b"qos 0", 0) - bclient.publish(topics[2], b"qos 1", 1) - bclient.publish(topics[3], b"qos 2", 2) - time.sleep(2) + msg1 = bclient.publish(topics[1], b"qos 0", 0) + msg2 = bclient.publish(topics[2], b"qos 1", 1) + msg3 = bclient.publish(topics[3], b"qos 2", 2) + + msg1.wait_for_publish() + msg2.wait_for_publish() + msg3.wait_for_publish() + bclient.disconnect() bclient.loop_stop() @@ -405,14 +444,16 @@ def test_offline_message_queueing(self): oclient.loop_start() oclient.connect(host="localhost", port=self._test_broker_port, clean_start=False) ocallback.wait_connected() - time.sleep(2) + + msgs = ocallback.get_at_most_messages(3) + oclient.disconnect() oclient.loop_stop() - self.assertTrue(len(ocallback.messages) in [ - 2, 3], len(ocallback.messages)) + self.assertTrue(len(msgs) in [ + 2, 3], ocallback.messages.qsize()) logging.info("This server %s queueing QoS 0 messages for offline clients" % - ("is" if len(ocallback.messages) == 3 else "is not")) + ("is" if len(msgs) == 3 else "is not")) def test_overlapping_subscriptions(self): # overlapping subscriptions. When there is more than one matching subscription for the same client for a topic, @@ -434,18 +475,18 @@ def test_overlapping_subscriptions(self): ocallback.wait_subscribed() oclient.publish(topics[3], b"overlapping topic filters", 2) ocallback.wait_published() - time.sleep(1) - self.assertTrue(len(ocallback.messages) in [1, 2], ocallback.messages) - if len(ocallback.messages) == 1: + + msgs = ocallback.get_at_most_messages(2) + if len(msgs) == 1: logging.info( "This server is publishing one message for all matching overlapping subscriptions, not one for each.") self.assertEqual( - ocallback.messages[0]["message"].qos, 2, ocallback.messages[0]["message"].qos) + msgs[0]["message"].qos, 2, msgs[0]["message"].qos) else: logging.info( "This server is publishing one message per each matching overlapping subscription.") - self.assertTrue((ocallback.messages[0]["message"].qos == 2 and ocallback.messages[1]["message"].qos == 1) or - (ocallback.messages[0]["message"].qos == 1 and ocallback.messages[1]["message"].qos == 2), callback.messages) + self.assertTrue((msgs[0]["message"].qos == 2 and msgs[1]["message"].qos == 1) or + (msgs[0]["message"].qos == 1 and msgs[1]["message"].qos == 2), msgs) oclient.disconnect() oclient.loop_stop() ocallback.clear() @@ -495,13 +536,14 @@ def test_unsubscribe(self): aclient.publish(topics[0], b"topic 0 - unsubscribed", 1, retain=False) aclient.publish(topics[1], b"topic 1", 1, retain=False) aclient.publish(topics[2], b"topic 2", 1, retain=False) - time.sleep(2) + + msgs = callback2.get_messages(2) bclient.disconnect() bclient.loop_stop() aclient.disconnect() aclient.loop_stop() - self.assertEqual(len(callback2.messages), 2, callback2.messages) + self.assertEqual(len(msgs), 2) def new_client(self, clientid): callback = Callbacks() @@ -644,24 +686,23 @@ def test_user_properties(self): properties=publish_properties) uclient.publish(topics[0], b"", 2, retain=False, properties=publish_properties) - count = 0 - while len(ucallback.messages) < 3 and count < 50: - time.sleep(.1) - count += 1 + + msgs = ucallback.get_messages(3) + uclient.disconnect() ucallback.wait_disconnected() uclient.loop_stop() - self.assertEqual(len(ucallback.messages), 3, ucallback.messages) - userprops = ucallback.messages[0]["message"].properties.UserProperty + self.assertTrue(ucallback.messages.empty(), ucallback.messages.queue) + userprops = msgs[0]["message"].properties.UserProperty self.assertTrue(userprops in [[("a", "2"), ("c", "3")], [ ("c", "3"), ("a", "2")]], userprops) - userprops = ucallback.messages[1]["message"].properties.UserProperty + userprops = msgs[1]["message"].properties.UserProperty self.assertTrue(userprops in [[("a", "2"), ("c", "3")], [ ("c", "3"), ("a", "2")]], userprops) - userprops = ucallback.messages[2]["message"].properties.UserProperty + userprops = msgs[2]["message"].properties.UserProperty self.assertTrue(userprops in [[("a", "2"), ("c", "3")], [ ("c", "3"), ("a", "2")]], userprops) - qoss = [ucallback.messages[i]["message"].qos for i in range(3)] + qoss = [x["message"].qos for x in msgs] self.assertTrue(1 in qoss and 2 in qoss and 0 in qoss, qoss) def test_payload_format(self): @@ -686,28 +727,26 @@ def test_payload_format(self): topics[0], b"qos 2", 2, retain=False, properties=publish_properties) info.wait_for_publish() - count = 0 - while len(pcallback.messages) < 3 and count < 50: - time.sleep(.1) - count += 1 + msgs = pcallback.get_messages(3) + pclient.disconnect() pcallback.wait_disconnected() pclient.loop_stop() - self.assertEqual(len(pcallback.messages), 3, pcallback.messages) - props = pcallback.messages[0]["message"].properties + self.assertTrue(pcallback.messages.empty(), pcallback.messages.queue) + props = msgs[0]["message"].properties self.assertEqual(props.ContentType, "My name", props.ContentType) self.assertEqual(props.PayloadFormatIndicator, 1, props.PayloadFormatIndicator) - props = pcallback.messages[1]["message"].properties + props = msgs[1]["message"].properties self.assertEqual(props.ContentType, "My name", props.ContentType) self.assertEqual(props.PayloadFormatIndicator, 1, props.PayloadFormatIndicator) - props = pcallback.messages[2]["message"].properties + props = msgs[2]["message"].properties self.assertEqual(props.ContentType, "My name", props.ContentType) self.assertEqual(props.PayloadFormatIndicator, 1, props.PayloadFormatIndicator) - qoss = [pcallback.messages[i]["message"].qos for i in range(3)] + qoss = [x["message"].qos for x in msgs] self.assertTrue(1 in qoss and 2 in qoss and 0 in qoss, qoss) def test_message_expiry(self): @@ -750,13 +789,14 @@ def test_message_expiry(self): lbclient.loop_start() lbclient.connect(host="localhost", port=self._test_broker_port, clean_start=False) lbcallback.wait_connected() - self.waitfor(lbcallback.messages, 1, 3) - time.sleep(1) - self.assertEqual(len(lbcallback.messages), 2, lbcallback.messages) - self.assertTrue(lbcallback.messages[0]["message"].properties.MessageExpiryInterval < 6, - lbcallback.messages[0]["message"].properties.MessageExpiryInterval) - self.assertTrue(lbcallback.messages[1]["message"].properties.MessageExpiryInterval < 6, - lbcallback.messages[1]["message"].properties.MessageExpiryInterval) + + msgs = lbcallback.get_messages(2) + + self.assertTrue(lbcallback.messages.empty(), lbcallback.messages.queue) + self.assertTrue(msgs[0]["message"].properties.MessageExpiryInterval < 6, + msgs[0]["message"].properties.MessageExpiryInterval) + self.assertTrue(msgs[1]["message"].properties.MessageExpiryInterval < 6, + msgs[1]["message"].properties.MessageExpiryInterval) laclient.disconnect() lacallback.wait_disconnected() laclient.loop_stop() @@ -786,11 +826,16 @@ def test_subscribe_options(self): lbcallback.wait_subscribed() laclient.publish(topics[0], b"noLocal test", 1, retain=False) - self.waitfor(lbcallback.messages, 1, 3) - time.sleep(1) - self.assertEqual(lacallback.messages, [], lacallback.messages) - self.assertEqual(len(lbcallback.messages), 1, lbcallback.messages) + lbcallback.messages.get(timeout=DEFAULT_TIMEOUT) + try: + lacallback.messages.get(timeout=WAIT_NON_EVENT_TIMEOUT) + raise ValueError("unexpected message received") + except queue.Empty: + pass + + self.assertTrue(lacallback.messages.empty(), lacallback.messages.queue) + self.assertTrue(lbcallback.messages.empty(), lbcallback.messages.queue) laclient.disconnect() lacallback.wait_disconnected() lbclient.disconnect() @@ -806,21 +851,19 @@ def test_subscribe_options(self): laclient.subscribe(topics[0], options=SubscribeOptions( qos=2, retainAsPublished=True)) lacallback.wait_subscribed() - self.waitfor(lacallback.subscribeds, 1, 3) laclient.publish( topics[0], b"retain as published false", 1, retain=False) laclient.publish( topics[0], b"retain as published true", 1, retain=True) - self.waitfor(lacallback.messages, 2, 3) - time.sleep(1) + msgs = lacallback.get_messages(2) - self.assertEqual(len(lacallback.messages), 2, lacallback.messages) + self.assertTrue(lacallback.messages.empty(), lacallback.messages.queue) laclient.disconnect() lacallback.wait_disconnected() laclient.loop_stop() - self.assertEqual(lacallback.messages[0]["message"].retain, False) - self.assertEqual(lacallback.messages[1]["message"].retain, True) + self.assertEqual(msgs[0]["message"].retain, False) + self.assertEqual(msgs[1]["message"].retain, True) # retainHandling clientid = 'subscribe options - retain handling' @@ -836,15 +879,18 @@ def test_subscribe_options(self): laclient.subscribe( wildtopics[5], options=SubscribeOptions(2, retainHandling=1)) lacallback.wait_subscribed() - self.assertEqual(len(lacallback.messages), 3) - qoss = [lacallback.messages[i]["message"].qos for i in range(3)] + + msgs = lacallback.get_messages(3) + + self.assertTrue(lacallback.messages.empty()) + qoss = [x["message"].qos for x in msgs] self.assertTrue(1 in qoss and 2 in qoss and 0 in qoss, qoss) lacallback.clear() laclient.subscribe( wildtopics[5], options=SubscribeOptions(2, retainHandling=1)) lacallback.wait_subscribed() time.sleep(1) - self.assertEqual(len(lacallback.messages), 0) + self.assertTrue(lacallback.messages.empty()) # remove that subscription properties = Properties(PacketTypes.UNSUBSCRIBE) @@ -857,15 +903,15 @@ def test_subscribe_options(self): laclient.subscribe( wildtopics[5], options=SubscribeOptions(2, retainHandling=1)) lacallback.wait_subscribed() - self.assertEqual(len(lacallback.messages), 3) - qoss = [lacallback.messages[i]["message"].qos for i in range(3)] + msgs = lacallback.get_messages(3) + qoss = [x["message"].qos for x in msgs] self.assertTrue(1 in qoss and 2 in qoss and 0 in qoss, qoss) lacallback.clear() laclient.subscribe( wildtopics[5], options=SubscribeOptions(2, retainHandling=1)) lacallback.wait_subscribed() - time.sleep(1) - self.assertEqual(len(lacallback.messages), 0) + time.sleep(WAIT_NON_EVENT_TIMEOUT) + self.assertTrue(lacallback.messages.empty()) # remove that subscription properties = Properties(PacketTypes.UNSUBSCRIBE) @@ -878,11 +924,11 @@ def test_subscribe_options(self): laclient.subscribe( wildtopics[5], options=SubscribeOptions(2, retainHandling=2)) lacallback.wait_subscribed() - self.assertEqual(len(lacallback.messages), 0) + self.assertTrue(lacallback.messages.empty()) laclient.subscribe( wildtopics[5], options=SubscribeOptions(2, retainHandling=2)) lacallback.wait_subscribed() - self.assertEqual(len(lacallback.messages), 0) + self.assertTrue(lacallback.messages.empty()) # remove that subscription laclient.unsubscribe(wildtopics[5]) @@ -891,15 +937,14 @@ def test_subscribe_options(self): laclient.subscribe( wildtopics[5], options=SubscribeOptions(2, retainHandling=0)) lacallback.wait_subscribed() - self.assertEqual(len(lacallback.messages), 3) - qoss = [lacallback.messages[i]["message"].qos for i in range(3)] + msgs = lacallback.get_messages(3) + qoss = [x["message"].qos for x in msgs] self.assertTrue(1 in qoss and 2 in qoss and 0 in qoss, qoss) lacallback.clear() laclient.subscribe( wildtopics[5], options=SubscribeOptions(2, retainHandling=0)) - time.sleep(1) - self.assertEqual(len(lacallback.messages), 3) - qoss = [lacallback.messages[i]["message"].qos for i in range(3)] + msgs = lacallback.get_messages(3) + qoss = [x["message"].qos for x in msgs] self.assertTrue(1 in qoss and 2 in qoss and 0 in qoss, qoss) laclient.disconnect() lacallback.wait_disconnected() @@ -935,19 +980,19 @@ def test_subscription_identifiers(self): lbclient.publish(topics[0], b"sub identifier test", 1, retain=False) - self.waitfor(lacallback.messages, 1, 3) - self.assertEqual(len(lacallback.messages), 1, lacallback.messages) - self.assertEqual(lacallback.messages[0]["message"].properties.SubscriptionIdentifier[0], - 456789, lacallback.messages[0]["message"].properties.SubscriptionIdentifier) + msg = lacallback.messages.get(timeout=DEFAULT_TIMEOUT) + self.assertTrue(lacallback.messages.empty(), lacallback.messages.queue) + self.assertEqual(msg["message"].properties.SubscriptionIdentifier[0], + 456789, msg["message"].properties.SubscriptionIdentifier) laclient.disconnect() lacallback.wait_disconnected() laclient.loop_stop() - self.waitfor(lbcallback.messages, 1, 3) - self.assertEqual(len(lbcallback.messages), 1, lbcallback.messages) + msg = lbcallback.messages.get(timeout=DEFAULT_TIMEOUT) + self.assertTrue(lbcallback.messages.empty(), lbcallback.messages.queue) expected_subsids = {2, 3} received_subsids = set( - lbcallback.messages[0]["message"].properties.SubscriptionIdentifier) + msg["message"].properties.SubscriptionIdentifier) self.assertEqual(received_subsids, expected_subsids, received_subsids) lbclient.disconnect() lbcallback.wait_disconnected() @@ -982,19 +1027,17 @@ def test_request_response(self): properties=publish_properties) # client b is the responder - self.waitfor(lbcallback.messages, 1, 3) - self.assertEqual(len(lbcallback.messages), 1, lbcallback.messages) - self.assertEqual(lbcallback.messages[0]["message"].properties.ResponseTopic, topics[0], - lbcallback.messages[0]["message"].properties) - self.assertEqual(lbcallback.messages[0]["message"].properties.CorrelationData, b"334", - lbcallback.messages[0]["message"].properties) + msg = lbcallback.messages.get(timeout=DEFAULT_TIMEOUT) + self.assertEqual(msg["message"].properties.ResponseTopic, topics[0], + msg["message"].properties) + self.assertEqual(msg["message"].properties.CorrelationData, b"334", + msg["message"].properties) - lbclient.publish(lbcallback.messages[0]["message"].properties.ResponseTopic, b"response", 1, - properties=lbcallback.messages[0]["message"].properties) + lbclient.publish(msg["message"].properties.ResponseTopic, b"response", 1, + properties=msg["message"].properties) # client a gets the response - self.waitfor(lacallback.messages, 1, 3) - self.assertEqual(len(lacallback.messages), 1, lacallback.messages) + lacallback.messages.get(timeout=DEFAULT_TIMEOUT) laclient.disconnect() lacallback.wait_disconnected() @@ -1029,13 +1072,11 @@ def test_client_topic_alias(self): publish_properties.TopicAlias = 1 laclient.publish(topics[0], b"topic alias 1", 1, properties=publish_properties) - self.waitfor(lacallback.messages, 1, 3) - self.assertEqual(len(lacallback.messages), 1, lacallback.messages) + lacallback.messages.get(timeout=DEFAULT_TIMEOUT) laclient.publish("", b"topic alias 2", 1, properties=publish_properties) - self.waitfor(lacallback.messages, 2, 3) - self.assertEqual(len(lacallback.messages), 2, lacallback.messages) + lacallback.messages.get(timeout=DEFAULT_TIMEOUT) laclient.disconnect() # should get rid of the topic aliases but not subscriptions lacallback.wait_disconnected() @@ -1047,8 +1088,7 @@ def test_client_topic_alias(self): properties=connect_properties) laclient.publish(topics[0], b"topic alias 3", 1) - self.waitfor(lacallback.messages, 1, 3) - self.assertEqual(len(lacallback.messages), 1, lacallback.messages) + lacallback.messages.get(timeout=DEFAULT_TIMEOUT) publish_properties = Properties(PacketTypes.PUBLISH) publish_properties.TopicAlias = 1 @@ -1076,27 +1116,26 @@ def test_server_topic_alias(self): for qos in range(3): laclient.publish(topics[0], b"topic alias 1", qos) - self.waitfor(lacallback.messages, 3, 3) - self.assertEqual(len(lacallback.messages), 3, lacallback.messages) + msgs = lacallback.get_messages(3) laclient.disconnect() lacallback.wait_disconnected() laclient.loop_stop() # first message should set the topic alias self.assertTrue(hasattr( - lacallback.messages[0]["message"].properties, "TopicAlias"), lacallback.messages[0]["message"].properties) - topicalias = lacallback.messages[0]["message"].properties.TopicAlias + msgs[0]["message"].properties, "TopicAlias"), msgs[0]["message"].properties) + topicalias = msgs[0]["message"].properties.TopicAlias self.assertTrue(topicalias > 0) - self.assertEqual(lacallback.messages[0]["message"].topic, topics[0]) + self.assertEqual(msgs[0]["message"].topic, topics[0]) self.assertEqual( - lacallback.messages[1]["message"].properties.TopicAlias, topicalias) - self.assertEqual(lacallback.messages[1]["message"].topic, "") + msgs[1]["message"].properties.TopicAlias, topicalias) + self.assertEqual(msgs[1]["message"].topic, "") self.assertEqual( - lacallback.messages[2]["message"].properties.TopicAlias, topicalias) - self.assertEqual(lacallback.messages[2]["message"].topic, "") + msgs[2]["message"].properties.TopicAlias, topicalias) + self.assertEqual(msgs[2]["message"].topic, "") serverTopicAliasMaximum = 0 # no server topic alias allowed connect_properties = Properties(PacketTypes.CONNECT) @@ -1112,19 +1151,18 @@ def test_server_topic_alias(self): for qos in range(3): laclient.publish(topics[0], b"topic alias 2", qos) - self.waitfor(lacallback.messages, 3, 3) - self.assertEqual(len(lacallback.messages), 3, lacallback.messages) + msgs = lacallback.get_messages(3) laclient.disconnect() lacallback.wait_disconnected() laclient.loop_stop() # No topic aliases self.assertFalse(hasattr( - lacallback.messages[0]["message"].properties, "TopicAlias"), lacallback.messages[0]["message"].properties) + msgs[0]["message"].properties, "TopicAlias"), msgs[0]["message"].properties) self.assertFalse(hasattr( - lacallback.messages[1]["message"].properties, "TopicAlias"), lacallback.messages[1]["message"].properties) + msgs[1]["message"].properties, "TopicAlias"), msgs[1]["message"].properties) self.assertFalse(hasattr( - lacallback.messages[2]["message"].properties, "TopicAlias"), lacallback.messages[2]["message"].properties) + msgs[2]["message"].properties, "TopicAlias"), msgs[2]["message"].properties) serverTopicAliasMaximum = 0 # no server topic alias allowed connect_properties = Properties(PacketTypes.CONNECT) @@ -1140,19 +1178,18 @@ def test_server_topic_alias(self): for qos in range(3): laclient.publish(topics[0], b"topic alias 3", qos) - self.waitfor(lacallback.messages, 3, 3) - self.assertEqual(len(lacallback.messages), 3, lacallback.messages) + msgs = lacallback.get_messages(3) laclient.disconnect() lacallback.wait_disconnected() laclient.loop_stop() # No topic aliases self.assertFalse(hasattr( - lacallback.messages[0]["message"].properties, "TopicAlias"), lacallback.messages[0]["message"].properties) + msgs[0]["message"].properties, "TopicAlias"), msgs[0]["message"].properties) self.assertFalse(hasattr( - lacallback.messages[1]["message"].properties, "TopicAlias"), lacallback.messages[1]["message"].properties) + msgs[1]["message"].properties, "TopicAlias"), msgs[1]["message"].properties) self.assertFalse(hasattr( - lacallback.messages[2]["message"].properties, "TopicAlias"), lacallback.messages[2]["message"].properties) + msgs[2]["message"].properties, "TopicAlias"), msgs[2]["message"].properties) def test_maximum_packet_size(self): clientid = 'maximum packet size' @@ -1173,8 +1210,6 @@ def test_maximum_packet_size(self): laclient.publish(topics[0], payload, 0) # should get back a disconnect with packet size too big response = lacallback.wait_disconnected() - self.assertEqual(len(lacallback.disconnecteds), - 0, lacallback.disconnecteds) self.assertEqual(response["reasonCode"].getName(), "Packet too large", response["reasonCode"].getName()) else: @@ -1202,14 +1237,16 @@ def test_maximum_packet_size(self): # send a small enough packet, should get this one back payload = b"."*(int(maximumPacketSize/2)) laclient.publish(topics[0], payload, 0) - self.waitfor(lacallback.messages, 1, 3) - self.assertEqual(len(lacallback.messages), 1, lacallback.messages) + lacallback.messages.get(timeout=DEFAULT_TIMEOUT) # send a packet too big to receive payload = b"."*maximumPacketSize laclient.publish(topics[0], payload, 1) - self.waitfor(lacallback.messages, 2, 3) - self.assertEqual(len(lacallback.messages), 1, lacallback.messages) + try: + lacallback.messages.get(timeout=WAIT_NON_EVENT_TIMEOUT) + raise ValueError("unexpected message received") + except queue.Empty: + pass laclient.disconnect() lacallback.wait_disconnected() @@ -1266,13 +1303,12 @@ def test_will_delay(self): laclient.loop_stop() laclient.socket().close() start = time.time() - while lbcallback.messages == []: - time.sleep(.1) + msg = lbcallback.messages.get(DEFAULT_TIMEOUT) duration = time.time() - start self.assertAlmostEqual(duration, 4, delta=1) - self.assertEqual(lbcallback.messages[0]["message"].topic, topics[0]) + self.assertEqual(msg["message"].topic, topics[0]) self.assertEqual( - lbcallback.messages[0]["message"].payload, b"test_will_delay will message") + msg["message"].payload, b"test_will_delay will message") lbclient.disconnect() lbcallback.wait_disconnected() @@ -1314,27 +1350,48 @@ def test_shared_subscriptions(self): count = 1 for i in range(count): lbclient.publish(topics[0], f"message {i}", 0) - j = 0 - while len(lacallback.messages) + len(lbcallback.messages) < 2*count and j < 20: - time.sleep(.1) - j += 1 - time.sleep(1) - self.assertEqual(len(lacallback.messages), count) - self.assertEqual(len(lbcallback.messages), count) + + lacallback.get_messages(count) + lbcallback.get_messages(count) + + self.assertTrue(lacallback.messages.empty()) + self.assertTrue(lbcallback.messages.empty()) lacallback.clear() lbcallback.clear() for i in range(count): lbclient.publish(shared_pub_topic, f"message {i}", 0) - j = 0 - while len(lacallback.messages) + len(lbcallback.messages) < count and j < 20: - time.sleep(.1) - j += 1 - time.sleep(1) # Each message should only be received once - self.assertEqual(len(lacallback.messages) + - len(lbcallback.messages), count) + result = [] + deadline = time.time() + DEFAULT_TIMEOUT + while len(result) < count and time.time() < deadline: + get_timeout = deadline - time.time() + try: + if get_timeout <= 0: + result.append(lacallback.messages.get_nowait()) + else: + result.append(lacallback.messages.get(timeout=get_timeout)) + except queue.Empty: + # The message could be sent to other client, so empty queue + # could be normal + pass + + try: + get_timeout = deadline - time.time() + if get_timeout <= 0: + result.append(lbcallback.messages.get_nowait()) + else: + result.append(lbcallback.messages.get(timeout=get_timeout)) + except queue.Empty: + # The message could be sent to other client, so empty queue + # could be normal + pass + + self.assertEqual( + {x["message"].payload for x in result}, + {f"message {i}".encode() for i in range(count)} + ) laclient.disconnect() lacallback.wait_disconnected()