-
Notifications
You must be signed in to change notification settings - Fork 107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MQTT v5 PUBACK with reason code 16 triggers error handler with 'message too long' error #650
Comments
I recommend that the first thing you do is create a minimal reproduction. Could you add a new unit test to the existing test suite? Second: You're probably going to want to look at the properties.hpp code. There are some areas there that I identified as being non-conformant, and they had backwards compatibility issues. EDIT: There are comments in the code indicating the risky areas. |
No. Here is v5 example code: You can modify both client and server side. In order to modify the server return
Here is diff: index 978d20a..ffc035d 100644
--- a/example/v5_no_tls_both.cpp
+++ b/example/v5_no_tls_both.cpp
@@ -208,6 +208,8 @@ void server_proc(Server& s, std::set<con_sp_t>& connections, mi_sub_con& subs) {
// including close_handler and error_handler.
ep.start_session(std::make_tuple(std::move(spep), std::move(g)));
+ ep.set_auto_pub_response(false, false);
+
// set connection (lower than MQTT) level handlers
ep.set_close_handler(
[&connections, &subs, wp]
@@ -292,7 +294,7 @@ void server_proc(Server& s, std::set<con_sp_t>& connections, mi_sub_con& subs) {
return true;
});
ep.set_v5_publish_handler( // use v5 handler
- [&subs]
+ [&ep, &subs]
(MQTT_NS::optional<packet_id_t> packet_id,
MQTT_NS::publish_options pubopts,
MQTT_NS::buffer topic_name,
@@ -323,8 +325,32 @@ void server_proc(Server& s, std::set<con_sp_t>& connections, mi_sub_con& subs) {
std::move(props)
);
}
+
+ switch (pubopts.get_qos()) {
+ case MQTT_NS::qos::at_least_once:
+ ep.puback(packet_id.get(), MQTT_NS::v5::puback_reason_code::no_matching_subscribers);
+ break;
+ case MQTT_NS::qos::exactly_once:
+ ep.pubrec(packet_id.get(), MQTT_NS::v5::pubrec_reason_code::no_matching_subscribers);
+ break;
+ default:
+ break;
+ };
+
return true;
});
+ ep.set_v5_pubrel_handler(
+ [&ep]
+ (packet_id_t packet_id,
+ MQTT_NS::v5::pubrel_reason_code reason_code,
+ MQTT_NS::v5::properties /*props*/) {
+ locked_cout() << "[server] pubrel received."
+ << " packet_id: " << packet_id
+ << " reason_code: " << reason_code << std::endl;
+ ep.pubcomp(packet_id);
+ return true;
+ }
+ );
ep.set_v5_subscribe_handler( // use v5 handler
[&subs, wp]
(packet_id_t packet_id, And here is whole updated code: // Copyright Takatoshi Kondo 2019
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// no_tls client and server
#include <iostream>
#include <iomanip>
#include <map>
#include <mqtt_client_cpp.hpp>
#include <boost/lexical_cast.hpp>
#include "locked_cout.hpp"
template <typename Client, typename Disconnect>
void client_proc(
Client& c,
std::uint16_t& pid_sub1,
std::uint16_t& pid_sub2,
Disconnect const& disconnect) {
using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;
// Setup client
c->set_client_id("cid1");
c->set_clean_start(true);
// Setup handlers
c->set_v5_connack_handler( // use v5 handler
[&c, &pid_sub1, &pid_sub2]
(bool sp, MQTT_NS::v5::connect_reason_code reason_code, MQTT_NS::v5::properties /*props*/){
locked_cout() << "[client] Connack handler called" << std::endl;
locked_cout() << "[client] Session Present: " << std::boolalpha << sp << std::endl;
locked_cout() << "[client] Connect Reason Code: " << reason_code << std::endl;
if (reason_code == MQTT_NS::v5::connect_reason_code::success) {
pid_sub1 = c->subscribe("mqtt_client_cpp/topic1", MQTT_NS::qos::at_most_once);
pid_sub2 = c->subscribe(
std::vector<std::tuple<MQTT_NS::string_view, MQTT_NS::subscribe_options>>
{
{ "mqtt_client_cpp/topic2_1", MQTT_NS::qos::at_least_once },
{ "mqtt_client_cpp/topic2_2", MQTT_NS::qos::exactly_once }
}
);
}
return true;
});
c->set_close_handler( // this handler doesn't depend on MQTT protocol version
[]
(){
locked_cout() << "[client] closed." << std::endl;
});
c->set_error_handler( // this handler doesn't depend on MQTT protocol version
[]
(MQTT_NS::error_code ec){
locked_cout() << "[client] error: " << ec.message() << std::endl;
});
c->set_v5_puback_handler( // use v5 handler
[&]
(packet_id_t packet_id, MQTT_NS::v5::puback_reason_code reason_code, MQTT_NS::v5::properties /*props*/){
locked_cout() <<
"[client] puback received. packet_id: " << packet_id <<
" reason_code: " << reason_code << std::endl;
disconnect();
return true;
});
c->set_v5_pubrec_handler( // use v5 handler
[&]
(packet_id_t packet_id, MQTT_NS::v5::pubrec_reason_code reason_code, MQTT_NS::v5::properties /*props*/){
locked_cout() <<
"[client] pubrec received. packet_id: " << packet_id <<
" reason_code: " << reason_code << std::endl;
return true;
});
c->set_v5_pubcomp_handler( // use v5 handler
[&]
(packet_id_t packet_id, MQTT_NS::v5::pubcomp_reason_code reason_code, MQTT_NS::v5::properties /*props*/){
locked_cout() <<
"[client] pubcomp received. packet_id: " << packet_id <<
" reason_code: " << reason_code << std::endl;
disconnect();
return true;
});
c->set_v5_suback_handler( // use v5 handler
[&]
(packet_id_t packet_id,
std::vector<MQTT_NS::v5::suback_reason_code> reasons,
MQTT_NS::v5::properties /*props*/){
locked_cout() << "[client] suback received. packet_id: " << packet_id << std::endl;
for (auto const& e : reasons) {
switch (e) {
case MQTT_NS::v5::suback_reason_code::granted_qos_0:
locked_cout() << "[client] subscribe success: qos0" << std::endl;
break;
case MQTT_NS::v5::suback_reason_code::granted_qos_1:
locked_cout() << "[client] subscribe success: qos1" << std::endl;
break;
case MQTT_NS::v5::suback_reason_code::granted_qos_2:
locked_cout() << "[client] subscribe success: qos2" << std::endl;
break;
default:
locked_cout() << "[client] subscribe failed: reason_code = " << static_cast<int>(e) << std::endl;
break;
}
}
if (packet_id == pid_sub1) {
c->publish("mqtt_client_cpp/topic1", "test1", MQTT_NS::qos::at_most_once);
}
else if (packet_id == pid_sub2) {
c->publish("mqtt_client_cpp/topic2_1", "test2_1", MQTT_NS::qos::at_least_once);
c->publish("mqtt_client_cpp/topic2_2", "test2_2", MQTT_NS::qos::exactly_once);
}
return true;
});
c->set_v5_publish_handler( // use v5 handler
[&]
(MQTT_NS::optional<packet_id_t> packet_id,
MQTT_NS::publish_options pubopts,
MQTT_NS::buffer topic_name,
MQTT_NS::buffer contents,
MQTT_NS::v5::properties /*props*/){
locked_cout() << "[client] publish received. "
<< " dup: " << pubopts.get_dup()
<< " qos: " << pubopts.get_qos()
<< " retain: " << pubopts.get_retain() << std::endl;
if (packet_id)
locked_cout() << "[client] packet_id: " << *packet_id << std::endl;
locked_cout() << "[client] topic_name: " << topic_name << std::endl;
locked_cout() << "[client] contents: " << contents << std::endl;
disconnect();
return true;
});
// Connect
c->connect();
}
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/member.hpp>
#include <mqtt_server_cpp.hpp>
namespace mi = boost::multi_index;
using con_t = MQTT_NS::server<>::endpoint_t;
using con_sp_t = std::shared_ptr<con_t>;
struct sub_con {
sub_con(MQTT_NS::buffer topic, con_sp_t con, MQTT_NS::qos qos_value, MQTT_NS::rap rap_value)
:topic(std::move(topic)), con(std::move(con)), qos_value(qos_value), rap_value(rap_value) {}
MQTT_NS::buffer topic;
con_sp_t con;
MQTT_NS::qos qos_value;
MQTT_NS::rap rap_value;
};
struct tag_topic {};
struct tag_con {};
using mi_sub_con = mi::multi_index_container<
sub_con,
mi::indexed_by<
mi::ordered_non_unique<
mi::tag<tag_topic>,
BOOST_MULTI_INDEX_MEMBER(sub_con, MQTT_NS::buffer, topic)
>,
mi::ordered_non_unique<
mi::tag<tag_con>,
BOOST_MULTI_INDEX_MEMBER(sub_con, con_sp_t, con)
>
>
>;
inline void close_proc(std::set<con_sp_t>& cons, mi_sub_con& subs, con_sp_t const& con) {
cons.erase(con);
auto& idx = subs.get<tag_con>();
auto r = idx.equal_range(con);
idx.erase(r.first, r.second);
}
template <typename Server>
void server_proc(Server& s, std::set<con_sp_t>& connections, mi_sub_con& subs) {
s.set_error_handler( // this handler doesn't depend on MQTT protocol version
[](MQTT_NS::error_code ec) {
locked_cout() << "[server] error: " << ec.message() << std::endl;
}
);
s.set_accept_handler( // this handler doesn't depend on MQTT protocol version
[&s, &connections, &subs](con_sp_t spep) {
auto& ep = *spep;
std::weak_ptr<con_t> wp(spep);
using packet_id_t = typename std::remove_reference_t<decltype(ep)>::packet_id_t;
locked_cout() << "[server] accept" << std::endl;
// For server close if ep is closed.
auto g = MQTT_NS::shared_scope_guard(
[&s] {
locked_cout() << "[server] session end" << std::endl;
s.close();
}
);
// Pass spep to keep lifetime.
// It makes sure wp.lock() never return nullptr in the handlers below
// including close_handler and error_handler.
ep.start_session(std::make_tuple(std::move(spep), std::move(g)));
ep.set_auto_pub_response(false, false);
// set connection (lower than MQTT) level handlers
ep.set_close_handler(
[&connections, &subs, wp]
(){
locked_cout() << "[server] closed." << std::endl;
auto sp = wp.lock();
BOOST_ASSERT(sp);
close_proc(connections, subs, sp);
});
ep.set_error_handler(
[&connections, &subs, wp]
(MQTT_NS::error_code ec){
locked_cout() << "[server] error: " << ec.message() << std::endl;
auto sp = wp.lock();
BOOST_ASSERT(sp);
close_proc(connections, subs, sp);
});
// set MQTT level handlers
ep.set_v5_connect_handler( // use v5 handler
[&connections, wp]
(MQTT_NS::buffer client_id,
MQTT_NS::optional<MQTT_NS::buffer> const& username,
MQTT_NS::optional<MQTT_NS::buffer> const& password,
MQTT_NS::optional<MQTT_NS::will>,
bool clean_start,
std::uint16_t keep_alive,
MQTT_NS::v5::properties /*props*/){
using namespace MQTT_NS::literals;
locked_cout() << "[server] client_id : " << client_id << std::endl;
locked_cout() << "[server] username : " << (username ? username.value() : "none"_mb) << std::endl;
locked_cout() << "[server] password : " << (password ? password.value() : "none"_mb) << std::endl;
locked_cout() << "[server] clean_start : " << std::boolalpha << clean_start << std::endl;
locked_cout() << "[server] keep_alive : " << keep_alive << std::endl;
auto sp = wp.lock();
BOOST_ASSERT(sp);
connections.insert(sp);
sp->connack(false, MQTT_NS::v5::connect_reason_code::success);
return true;
}
);
ep.set_v5_disconnect_handler( // use v5 handler
[&connections, &subs, wp]
(MQTT_NS::v5::disconnect_reason_code reason_code, MQTT_NS::v5::properties /*props*/) {
locked_cout() <<
"[server] disconnect received." <<
" reason_code: " << reason_code << std::endl;
auto sp = wp.lock();
BOOST_ASSERT(sp);
close_proc(connections, subs, sp);
});
ep.set_v5_puback_handler( // use v5 handler
[]
(packet_id_t packet_id, MQTT_NS::v5::puback_reason_code reason_code, MQTT_NS::v5::properties /*props*/){
locked_cout() <<
"[server] puback received. packet_id: " << packet_id <<
" reason_code: " << reason_code << std::endl;
return true;
});
ep.set_v5_pubrec_handler( // use v5 handler
[]
(packet_id_t packet_id, MQTT_NS::v5::pubrec_reason_code reason_code, MQTT_NS::v5::properties /*props*/){
locked_cout() <<
"[server] pubrec received. packet_id: " << packet_id <<
" reason_code: " << reason_code << std::endl;
return true;
});
ep.set_v5_pubrel_handler( // use v5 handler
[]
(packet_id_t packet_id, MQTT_NS::v5::pubrel_reason_code reason_code, MQTT_NS::v5::properties /*props*/){
locked_cout() <<
"[server] pubrel received. packet_id: " << packet_id <<
" reason_code: " << reason_code << std::endl;
return true;
});
ep.set_v5_pubcomp_handler( // use v5 handler
[]
(packet_id_t packet_id, MQTT_NS::v5::pubcomp_reason_code reason_code, MQTT_NS::v5::properties /*props*/){
locked_cout() <<
"[server] pubcomp received. packet_id: " << packet_id <<
" reason_code: " << reason_code << std::endl;
return true;
});
ep.set_v5_publish_handler( // use v5 handler
[&ep, &subs]
(MQTT_NS::optional<packet_id_t> packet_id,
MQTT_NS::publish_options pubopts,
MQTT_NS::buffer topic_name,
MQTT_NS::buffer contents,
MQTT_NS::v5::properties props){
locked_cout() << "[server] publish received."
<< " dup: " << pubopts.get_dup()
<< " qos: " << pubopts.get_qos()
<< " retain: " << pubopts.get_retain() << std::endl;
if (packet_id)
locked_cout() << "[server] packet_id: " << *packet_id << std::endl;
locked_cout() << "[server] topic_name: " << topic_name << std::endl;
locked_cout() << "[server] contents: " << contents << std::endl;
auto const& idx = subs.get<tag_topic>();
auto r = idx.equal_range(topic_name);
for (; r.first != r.second; ++r.first) {
auto retain =
[&] {
if (r.first->rap_value == MQTT_NS::rap::retain) {
return pubopts.get_retain();
}
return MQTT_NS::retain::no;
} ();
r.first->con->publish(
topic_name,
contents,
std::min(r.first->qos_value, pubopts.get_qos()) | retain,
std::move(props)
);
}
switch (pubopts.get_qos()) {
case MQTT_NS::qos::at_least_once:
ep.puback(packet_id.get(), MQTT_NS::v5::puback_reason_code::no_matching_subscribers);
break;
case MQTT_NS::qos::exactly_once:
ep.pubrec(packet_id.get(), MQTT_NS::v5::pubrec_reason_code::no_matching_subscribers);
break;
default:
break;
};
return true;
});
ep.set_v5_pubrel_handler(
[&ep]
(packet_id_t packet_id,
MQTT_NS::v5::pubrel_reason_code reason_code,
MQTT_NS::v5::properties /*props*/) {
locked_cout() << "[server] pubrel received."
<< " packet_id: " << packet_id
<< " reason_code: " << reason_code << std::endl;
ep.pubcomp(packet_id);
return true;
}
);
ep.set_v5_subscribe_handler( // use v5 handler
[&subs, wp]
(packet_id_t packet_id,
std::vector<std::tuple<MQTT_NS::buffer, MQTT_NS::subscribe_options>> entries,
MQTT_NS::v5::properties /*props*/) {
locked_cout() << "[server] subscribe received. packet_id: " << packet_id << std::endl;
std::vector<MQTT_NS::v5::suback_reason_code> res;
res.reserve(entries.size());
auto sp = wp.lock();
BOOST_ASSERT(sp);
for (auto const& e : entries) {
MQTT_NS::buffer topic = std::get<0>(e);
MQTT_NS::qos qos_value = std::get<1>(e).get_qos();
MQTT_NS::rap rap_value = std::get<1>(e).get_rap();
locked_cout() << "[server] topic: " << topic
<< " qos: " << qos_value
<< " rap: " << rap_value
<< std::endl;
res.emplace_back(MQTT_NS::v5::qos_to_suback_reason_code(qos_value));
subs.emplace(std::move(topic), sp, qos_value, rap_value);
}
sp->suback(packet_id, res);
return true;
}
);
ep.set_v5_unsubscribe_handler( // use v5 handler
[&subs, wp]
(packet_id_t packet_id,
std::vector<MQTT_NS::buffer> topics,
MQTT_NS::v5::properties /*props*/) {
locked_cout() << "[server] unsubscribe received. packet_id: " << packet_id << std::endl;
for (auto const& topic : topics) {
subs.erase(topic);
}
auto sp = wp.lock();
BOOST_ASSERT(sp);
sp->unsuback(packet_id);
return true;
}
);
}
);
s.listen();
}
int main(int argc, char** argv) {
if (argc != 2) {
locked_cout() << argv[0] << " port" << std::endl;
return -1;
}
boost::asio::io_context ioc;
std::uint16_t port = boost::lexical_cast<std::uint16_t>(argv[1]);
// server
boost::asio::io_context iocs;
auto s = MQTT_NS::server<>(
boost::asio::ip::tcp::endpoint(
boost::asio::ip::tcp::v4(),
boost::lexical_cast<std::uint16_t>(argv[1])
),
iocs
);
// You can set a specific protocol_version if you want to limit accepting version.
// Otherwise, all protocols are accepted.
s.set_protocol_version(MQTT_NS::protocol_version::v5);
std::set<con_sp_t> connections;
mi_sub_con subs;
std::thread th(
[&] {
server_proc(s, connections, subs);
iocs.run();
}
);
// client
std::uint16_t pid_sub1;
std::uint16_t pid_sub2;
// You can set the protocol_version to connect. If you don't set it, v3_1_1 is used.
auto c = MQTT_NS::make_sync_client(ioc, "localhost", port, MQTT_NS::protocol_version::v5);
int count = 0;
auto disconnect = [&] {
if (++count == 5) c->disconnect();
};
client_proc(c, pid_sub1, pid_sub2, disconnect);
ioc.run();
th.join();
} The part of output:
It seems that the client receives puback/pubrec with no_matching_subscribers reason code correctly. "Message too long" is corresponding to See https://wandbox.org/permlink/gZlwBzZRuJNBzo9W I guess that there is message length related errors. The reason of the error is mqtt_cpp or mosquitto. |
NOTE: I use |
Here's a TCP dump using tshark
The PUBACKs with ID 2 and 3 are not being received by the PUBACK handler. Does the packet dump help debug why the parsing of the message is not working as expected? |
No, I need hex dump of the puback packet that is sent by mosquitto. See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901121 If the packet_id is 1, reason_code is 0x10 (No matching subscribers), and no properties, then the packet should be
Please show me the packet hex dump. |
All the packet details are below. |
The last part of the dump(image) is My example code also #650 (comment) the same (packet_id is different but it is not essential) packet dump. Please double check it. So puback doesn't seems to cause the error. As #650 (comment) mentioned, you need to write minimal and complete code that reproduces the issue. NOTE: |
mqtt_cpp works well with mosquitto 1.6.10 even if puback reason code is no_matching_subscribers. Here is example code: #include <iostream>
#include <iomanip>
#include <map>
#include <mqtt_client_cpp.hpp>
int main(int argc, char** argv) {
if (argc != 3) {
std::cout << argv[0] << " host port" << std::endl;
return -1;
}
boost::asio::io_context ioc;
// Create no TLS client
// You can set the protocol_version to connect. If you don't set it, v3_1_1 is used.
auto c = MQTT_NS::make_sync_client(ioc, argv[1], argv[2], MQTT_NS::protocol_version::v5);
using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;
// Setup client
c->set_client_id("cid1");
c->set_clean_start(true);
// Setup handlers
c->set_v5_connack_handler( // use v5 handler
[&]
(bool sp, MQTT_NS::v5::connect_reason_code reason_code, MQTT_NS::v5::properties /*props*/){
std::cout << "[client] Connack handler called" << std::endl;
std::cout << "[client] Session Present: " << std::boolalpha << sp << std::endl;
std::cout << "[client] Connect Reason Code: " << reason_code << std::endl;
if (reason_code == MQTT_NS::v5::connect_reason_code::success) {
c->publish("mqtt_client_cpp/topic2_1", "test2_1", MQTT_NS::qos::at_least_once);
}
return true;
});
c->set_close_handler( // this handler doesn't depend on MQTT protocol version
[]
(){
std::cout << "[client] closed." << std::endl;
});
c->set_error_handler( // this handler doesn't depend on MQTT protocol version
[]
(MQTT_NS::error_code ec){
std::cout << "[client] error: " << ec.message() << std::endl;
});
c->set_v5_puback_handler( // use v5 handler
[&]
(packet_id_t packet_id, MQTT_NS::v5::puback_reason_code reason_code, MQTT_NS::v5::properties /*props*/){
std::cout <<
"[client] puback received. packet_id: " << packet_id <<
" reason_code: " << reason_code << std::endl;
c->disconnect();
return true;
});
// Connect
c->connect();
ioc.run();
} mosquitto log
My example code log
|
Here's the sample code I used with Mosquitto version 1.6.8
Mosquitto log
Code log
|
@shantanu1singh That's very helpful. Have you been able to catch in the debugger where this error is being generated in the mqtt_cpp code? If we knew specifically which check was failing, that would help a lot. Any possibility of identifying the exact packet that causes the problem? A very helpful test would be to, instead of using mosquitto, directly write the problematic packet to the socket and observe the mqtt_cpp code throwing an error. |
@jonesmz I haven't been able to do the above. FWIW, I just tried building the sample code above with the
So, to summarize, the problem seems to be scoped to the v7.0.1 release. |
Thanks you for reporting. It is fixed at v8.0.0. So I close the issue. |
Invalid length on MQTT protocol was `boost::system::errc::message_size`. But it was not appropriate. See #650 (comment) These errors are replaced with `boost::system::errc::protocol_error` because it is MQTT protocol violation. Unexpected boost asio transferred_size error was also mapped `boost::system::errc::message_size`. It is replaced with `boost::system::errc::bad_message`.
Invalid length on MQTT protocol was `boost::system::errc::message_size`. But it was not appropriate. See redboltz#650 (comment) These errors are replaced with `boost::system::errc::protocol_error` because it is MQTT protocol violation. Unexpected boost asio transferred_size error was also mapped `boost::system::errc::message_size`. It is replaced with `boost::system::errc::bad_message`.
redboltz/mqtt_cpp Version: v7.0.1
I'm using the v5 client of the library to connect to the Mosquitto MQTT broker and publish a message.
The Mosquitto broker returns a PUBACK with a reason code of 16 i.e., no matching subscribers found.
Related entry from mosquitto broker's logs:
The client library triggers the error_handler set using
set_error_handler
with a boost error_code of 90 i.e., 'Message too long'.It should instead have invoked the pub_ack_handler (set using
set_v5_puback_handler
) and provided thepuback_reason_code
of 16 as per the Mqtt v5 specification.Is this a known issue?
I tried going through the code to see where the library returns the 'message too long' error code and found a few places so wasn't sure how to pinpoint the location where it invoked the error_handler from. Any tips to debug will be appreciated!
The text was updated successfully, but these errors were encountered: