MQTTv5 and QoS 2 does not work #696

Closed
thatsdone opened this issue Dec 24, 2022 · 16 comments
Labels
Status: Completed Nothing further to be done with this issue, it can be closed by the requestor or committer. Type: Bug

Comments

@thatsdone

thatsdone commented Dec 24, 2022

Hi,

I noticed that MQTTv5 with QoS 2 currently does not work.
I found a similar issue from the MQTTv3 days (6 years ago!): #103.
Is there a similar problem with MQTTv5?

Distro: Ubuntu 22.04(amd64)
Python: 3.10.6
Paho: paho-mqtt 1.6.1
MQTT Broker : EMQX Community Edition docker image (emqx/emqx:4.4.11)

$ python3 server.py 2
qos = 2
on_log(): server : 16 Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b'' properties=None
on_log(): server : 16 Sending SUBSCRIBE (d0, m1) [(b'topic1', {QoS=2, noLocal=False, retainAsPublished=False, retainHandling=0})]
on_log(): server : 16 Received CONNACK (0, Success) properties=[AssignedClientIdentifier : MzA4NDA4NjYwMzY0NDMzNTU1MTkzMzQ3NTUwNzU0ODk3OTC, TopicAliasMaximum : 65535, RetainAvailable : 1, MaximumPacketSize : 1048576, WildcardSubscriptionAvailable : 1, SubscriptionIdentifierAvailable : 1, SharedSubscriptionAvailable : 1]
on_log(): server : 16 Received SUBACK
on_log(): server : 16 Received PUBLISH (d0, q2, r0, m1), 'topic1', properties=[], ...  (18 bytes)
on_log(): server : 16 Sending PUBREC (Mid: 1)
Traceback (most recent call last):
  File "server.py", line 27, in <module>
    mqttc.loop_forever()
  File "/usr/local/lib/python3.10/dist-packages/paho/mqtt/client.py", line 1756, in loop_forever
    rc = self._loop(timeout)
  File "/usr/local/lib/python3.10/dist-packages/paho/mqtt/client.py", line 1164, in _loop
    rc = self.loop_read()
  File "/usr/local/lib/python3.10/dist-packages/paho/mqtt/client.py", line 1556, in loop_read
    rc = self._packet_read()
  File "/usr/local/lib/python3.10/dist-packages/paho/mqtt/client.py", line 2439, in _packet_read
    rc = self._packet_handle()
  File "/usr/local/lib/python3.10/dist-packages/paho/mqtt/client.py", line 3037, in _packet_handle
    return self._handle_pubrel()
  File "/usr/local/lib/python3.10/dist-packages/paho/mqtt/client.py", line 3348, in _handle_pubrel
    mid, = struct.unpack("!H", self._in_packet['packet'])
struct.error: unpack requires a buffer of 2 bytes
$

Here are my stupid programs.

If I give '1' as the argument, asking to use QoS 1, it works.
So I'm wondering whether there is something wrong in the QoS 2 code path.

server side

$ cat server.py
import sys
import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    print('on_message(): %s : %s %s %s %s / %s' % (userdata, msg.topic, msg.mid, msg.timestamp, msg.retain, msg.payload.decode()))

def on_log(mqttc, userdata, level, string):
    print('on_log(): %s : %s %s' % (userdata, level, string))

def message(client, userdata, msg):
    print('message(): %s : %s %s %s %s / %s' % (userdata, msg.topic, msg.mid, msg.timestamp, msg.retain, msg.payload.decode()))

mqttc = mqtt.Client(protocol=mqtt.MQTTv5, userdata='server')
mqttc.on_message = on_message
mqttc.on_log = on_log
host = '192.168.241.12'
port = 31883
topic = 'topic1'
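qos = 2  # default QoS when no argument is given
if len(sys.argv) > 1:  # sys.argv[0] is the script name, so an argument means length > 1
    qos = int(sys.argv[1])
    print(f'qos = {qos}')
mqttc.connect(host, port, 60)
mqttc.message_callback_add(topic, message)

mqttc.subscribe(topic, qos=qos)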

mqttc.loop_forever()

client side

$ cat client.py
import sys
import paho.mqtt.client as mqtt

def on_log(mqttc, userdata, level, string):
    print('on_log(): %s : %s %s' % (userdata, level, string))

def on_message(client, userdata, msg):
    print('on_message(): %s : %s %s %s %s / %s' % (userdata, msg.topic, msg.mid, msg.timestamp, msg.retain, msg.payload.decode()))

def message(client, userdata, msg):
    print('message(): %s : %s %s %s %s / %s' % (userdata, msg.topic, msg.mid, msg.timestamp, msg.retain, msg.payload.decode()))


host = '192.168.241.12'
port = 31883
topic = 'topic1'
interval = 60
client_id = 'client'
mqttc = mqtt.Client(client_id=client_id, protocol=mqtt.MQTTv5, userdata='client')

mqttc.connect(host=host, port=port, keepalive=interval, clean_start=False)
mqttc.message_callback_add(topic, message)

qos = 2  # default QoS when no argument is given
if len(sys.argv) > 1:  # sys.argv[0] is the script name, so an argument means length > 1
    qos = int(sys.argv[1])
    print(f'qos = {qos}')
msg = 'Hello from client!'
print('publish(): publishing a message to: %s payload: %s' % (topic, msg))
mqttc.publish(topic, msg.encode('utf-8'), qos=qos)

mqttc.loop_forever()
@github-actions github-actions bot added the Status: Available No one has claimed responsibility for resolving this issue. label Dec 24, 2022
@naknz

naknz commented Jan 9, 2023

Looks like this is a bug in the handling of PUBREL messages: with MQTTv5, the packet identifier is now followed by a reason code and a property length. The unpack error occurs because of these new trailing bytes after the identifier.

A quick workaround is to pass only the first 2 bytes to unpack (client.py:3348):
mid, = struct.unpack("!H", self._in_packet['packet'][:2])

A better long-term fix would be to parse the PUBREL reason code, since it can carry an error value.
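
A minimal sketch of what parsing the reason code could look like, assuming the PUBREL variable header layout from the MQTT v5 specification (a 2-byte packet identifier, then an optional 1-byte reason code, then properties). The names `parse_pubrel_v5` and `PUBREL_REASONS` are hypothetical and not part of paho-mqtt; this is not the fix that was actually committed.

```python
import struct

# PUBREL reason codes as listed in the MQTT v5 specification.
PUBREL_REASONS = {
    0x00: "Success",
    0x92: "Packet Identifier not found",
}


def parse_pubrel_v5(packet: bytes):
    """Return (mid, reason_code) from a PUBREL variable header.

    `packet` is assumed to hold everything after the fixed header,
    as self._in_packet['packet'] does in the traceback above.
    """
    if len(packet) < 2:
        raise ValueError("PUBREL needs at least a 2-byte packet identifier")
    # Only the first two bytes are the packet identifier.
    (mid,) = struct.unpack("!H", packet[:2])
    # The reason code may be omitted when it is 0x00 and there are no properties.
    reason_code = packet[2] if len(packet) > 2 else 0x00
    return mid, reason_code


# A 4-byte PUBREL body: mid=1, reason code 0x00, property length 0.
mid, reason = parse_pubrel_v5(b"\x00\x01\x00\x00")
print(mid, PUBREL_REASONS.get(reason, "unknown"))
```

Passing the same 4-byte body straight to `struct.unpack("!H", ...)` raises `struct.error`, because `unpack` requires the buffer length to match the format exactly.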

@Lenormju

I have the same problem when publishing my messages with QoS=2, while QoS=1 works fine. The solution provided by @naknz works fine.
I am still using 1.5.1, and to check whether I have the problem I use:

python -c 'line_number=3238; import paho.mqtt.client as target; from pathlib import Path; print(target.__file__); line_text = Path(target.__file__).read_text().split("\n")[line_number-1]; expected = "        mid, = struct.unpack(\"!H\", self._in_packet['\''packet'\''])"; print(line_text); print(expected); print(line_text == expected)'

You may have to adjust the line_number for other versions.
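
A possible variant of the same check that does not depend on a line number. It is only a sketch, assuming the handler is declared as `def _handle_pubrel(self):` and that methods are indented by four spaces; `pubrel_handler_is_unpatched` is a hypothetical helper name. It returns False both when the handler slices the packet and when the handler is not found at all.

```python
import re

# The unsliced call shown in the traceback; the patched line ends in ['packet'][:2]).
UNPATCHED = "struct.unpack(\"!H\", self._in_packet['packet'])"


def pubrel_handler_is_unpatched(source: str) -> bool:
    """True if the _handle_pubrel body unpacks the whole packet without slicing."""
    # Capture the method body up to the next method definition or end of file.
    match = re.search(
        r"def _handle_pubrel\([^)]*\):(.*?)(?=\n    def |\Z)", source, re.S
    )
    return bool(match) and UNPATCHED in match.group(1)


# Usage against an installed paho-mqtt (not executed here):
#   import paho.mqtt.client as target
#   from pathlib import Path
#   print(pubrel_handler_is_unpatched(Path(target.__file__).read_text()))
```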

@ralight ralight added Type: Bug Status: Completed Nothing further to be done with this issue, it can be closed by the requestor or committer. and removed Status: Available No one has claimed responsibility for resolving this issue. labels Jan 20, 2023
@ralight
Contributor

ralight commented Jan 20, 2023

Thank you, this is now fixed and will be in the next release.

@matheuscandido

Hey @ralight, when will the next release happen? I really needed this QoS 2 for a product launch.

@NSchrading

I also hit this issue. @ralight when can we expect a release to occur with this fix? It looks like there hasn't been a release in quite some time. Is this project still actively maintained?

@drsantos89

Same here. Happy to help with anything needed to close this.

@KonssnoK

KonssnoK commented Sep 8, 2023

@ralight
we faced the same issue as soon as we switched to mqtt5.

When is a new release expected?

@drsantos89

FYI, installing from branch 1.6.x fixed the issues for me.

@matiasAS

@drsantos89 How do I install the module from the 1.6.x branch? I don't understand, what version is it?
(With 1.6.1 and broker EMQX 5.4, I get the error when I receive a message on a topic I am subscribed to with qos=2.)

Do you speak Spanish? I'm from Chile!

@hostmasterpontuax

I ran an update in my project and noticed that the 1.6.x branch was removed.
Now, what is the solution?

@KonssnoK

I guess it was removed because it was merged:
ab5e7da

@hostmasterpontuax

Great. Can't wait to see 2.0 become a stable release.
Till then, I managed to fork an already forked repo (with 1.6.x branch, of course).

@PierreF
Contributor

PierreF commented Jan 26, 2024

You can also use the 2.0.0rc2 available on PyPI: https://pypi.org/project/paho-mqtt/2.0.0rc2/

If no regressions are found in this release candidate, 2.0 will be released in February.

@matiasAS

@PierreF what day in February? The 1st?

@matiasAS

@PierreF is 2.0.0rc2 stable?

@Erickrk

Erickrk commented Apr 28, 2024

Hi,

I've been testing both version 1.3 and 2.0 for a project and it looks like the issue persists.
I am currently running Mosquitto version 2.0.18 in a Docker container and have configured publishers using both versions 1.3 and 2.0 of Paho.

Paho 2.0 publisher:

import time
import random

import paho.mqtt.client as mqtt

# Based on: https://eclipse.dev/paho/files/paho.mqtt.python/html/migrations.html
# This should be static
broker_address = "192.168.122.48" 
topic = "sensor/data"

# create new client instance
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="P1")
client.connect(broker_address) 

# Publish sensor data with QoS 2 and retain flag
counter = 100
MAX_SIZE = 60 * 1024  # 60 KB
message = "A" * MAX_SIZE # This generates an interesting behavior in the broker, exchanging a lot of ACKs

for i in range(counter):
    # sensor_data = random.randint(0, 999)
    client.publish(topic, message, qos=2, retain=True)
    print(f"Message {i} published to {topic}")
    time.sleep(0.01)  # Wait before next publish; increasing this did not make it work

I noticed that when publishing with QoS 2, the current code still doesn't reply to PUBREL, as shown below:

Version 1.3

[Image: pubrec-till-20]

Version 2.0

[Image: pubrec-till-20-v2]

Is my client somehow wrong for QoS 2?
