From 024a2254212955ce641c0d223e094c22ac77cb6f Mon Sep 17 00:00:00 2001 From: Michael Jones Date: Thu, 25 Apr 2019 15:59:18 -0500 Subject: [PATCH] Resolves #222 --- include/mqtt/endpoint.hpp | 443 +++++++++++++++--------------- test/as_buffer_async_pubsub_1.cpp | 40 ++- test/as_buffer_async_pubsub_2.cpp | 20 +- 3 files changed, 249 insertions(+), 254 deletions(-) diff --git a/include/mqtt/endpoint.hpp b/include/mqtt/endpoint.hpp index aebf817ce..432e085e9 100644 --- a/include/mqtt/endpoint.hpp +++ b/include/mqtt/endpoint.hpp @@ -1756,14 +1756,7 @@ class endpoint : public std::enable_shared_from_this(args)...); - return packet_id; + return subscribe(as::buffer(topic_name.data(), topic_name.size()), option, std::forward(args)...); } /** @@ -1789,11 +1782,6 @@ class endpoint : public std::enable_shared_from_this(args)...); return packet_id; @@ -1979,6 +1967,62 @@ class endpoint : public std::enable_shared_from_this + * See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901104
+ * 3.3.1.3 RETAIN + * @param props + * Properties
+ * See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901109
+ * 3.3.2.3 PUBLISH Properties + */ + bool publish_at_most_once( + packet_id_t packet_id, + mqtt::string_view topic_name, + mqtt::string_view contents, + bool retain = false, + std::vector props = {} + ) { + return publish_at_most_once(packet_id, + as::buffer(topic_name.data(), topic_name.size()), + as::buffer(contents.data(), contents.size()), + retain, + std::move(props)); + } + + /** + * @brief Publish QoS0 + * @param topic_name + * A topic name to publish + * @param contents + * The contents to publish + * @param retain + * A retain flag. If set it to true, the contents is retained.
+ * See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901104
+ * @param props + * Properties
+ * See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901109
+ * 3.3.2.3 PUBLISH Properties + * 3.3.1.3 RETAIN + */ + bool publish_at_most_once( + packet_id_t packet_id, + as::const_buffer topic_name, + as::const_buffer contents, + bool retain = false, + std::vector props = {} + ) { + acquired_publish_at_most_once(packet_id, topic_name, contents, mqtt::any(), retain, std::move(props)); + return true; + } + /** * @brief Publish QoS1 with a manual set packet identifier * @param packet_id @@ -2281,44 +2325,9 @@ class endpoint : public std::enable_shared_from_this - * 3.8.2.1 SUBSCRIBE Properties - * @return If packet_id is used in the publishing/subscribing sequence, then returns false and - * doesn't subscribe, otherwise return true and subscribes. - * You can subscribe multiple topics all at once.
- * See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901161
- */ - template - bool subscribe( - packet_id_t packet_id, - mqtt::string_view topic_name, - std::uint8_t qos, - Args&&... args) { - BOOST_ASSERT(qos == qos::at_most_once || qos == qos::at_least_once || qos == qos::exactly_once); - if (register_packet_id(packet_id)) { - acquired_subscribe(packet_id, as::buffer(topic_name.data(), topic_name.size()), qos, std::forward(args)...); - return true; - } - return false; - } - - /** - * @brief Subscribe with a manual set packet identifier - * @param packet_id - * packet identifier - * @param topic_name - * A topic name to subscribe - * @param qos - * mqtt::qos - * @param args - * args should be zero or more pairs of topic_name and qos. + * args should be one or more pairs of topic_name and qos. + * * You can set props as the last argument optionally. * See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901164
* 3.8.2.1 SUBSCRIBE Properties @@ -2330,12 +2339,9 @@ class endpoint : public std::enable_shared_from_this bool subscribe( packet_id_t packet_id, - as::const_buffer topic_name, - std::uint8_t qos, Args&&... args) { - BOOST_ASSERT(qos == qos::at_most_once || qos == qos::at_least_once || qos == qos::exactly_once); if (register_packet_id(packet_id)) { - acquired_subscribe(packet_id, topic_name, qos, std::forward(args)...); + acquired_subscribe(packet_id, std::forward(args)...); return true; } return false; @@ -2403,38 +2409,8 @@ class endpoint : public std::enable_shared_from_this - * 3.10.2.1 UNSUBSCRIBE Properties - * @return If packet_id is used in the publishing/subscribing sequence, then returns false and - * doesn't unsubscribe, otherwise return true and unsubscribes. - * You can subscribe multiple topics all at once.
- * See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901179 - */ - template - bool unsubscribe( - packet_id_t packet_id, - mqtt::string_view topic_name, - Args&&... args) { - if (register_packet_id(packet_id)) { - acquired_unsubscribe(packet_id, topic_name, std::forward(args)...); - return true; - } - return false; - } - - /** - * @brief Unsubscribe with a manual set packet identifier - * @param packet_id - * packet identifier - * @param topic_name - * A topic name to subscribe * @param args - * args should be zero or more topics + * args should be one or more topics * You can set props as the last argument optionally. * See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901182
* 3.10.2.1 UNSUBSCRIBE Properties @@ -2446,10 +2422,9 @@ class endpoint : public std::enable_shared_from_this bool unsubscribe( packet_id_t packet_id, - as::const_buffer topic_name, Args&&... args) { if (register_packet_id(packet_id)) { - acquired_unsubscribe(packet_id, topic_name, std::forward(args)...); + acquired_unsubscribe(packet_id, std::forward(args)...); return true; } return false; @@ -2515,6 +2490,81 @@ class endpoint : public std::enable_shared_from_this + * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901104
+ * 3.3.1.3 RETAIN + * @param props + * Properties
+ * See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901109
+ * 3.3.2.3 PUBLISH Properties + */ + void acquired_publish_at_most_once( + packet_id_t packet_id, + mqtt::string_view topic_name, + mqtt::string_view contents, + bool retain = false, + std::vector props = {} + ) { + acquired_publish_at_most_once( + packet_id, + as::buffer(topic_name.data(), topic_name.size()), + as::buffer(contents.data(), contents.size()), + mqtt::any(), + retain, + std::move(props) + ); + } + + /** + * @brief Publish QoS0 with already acquired packet identifier + * @param packet_id + * packet identifier. It should be acquired by acquire_unique_packet_id, or register_packet_id. + * The ownership of the packet_id moves to the library. + * @param topic_name + * A topic name to publish + * @param contents + * The contents to publish + * @param life_keeper + * An object that stays alive (but is moved with std::move()) until the async operation is finished. + * @param retain + * A retain flag. If set it to true, the contents is retained.
+ * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901104
+ * 3.3.1.3 RETAIN + * @param props + * Properties
+ * See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901109
+ * 3.3.2.3 PUBLISH Properties + */ + void acquired_publish_at_most_once( + packet_id_t packet_id, + as::const_buffer topic_name, + as::const_buffer contents, + mqtt::any life_keeper, + bool retain = false, + std::vector props = {} + ) { + BOOST_ASSERT(packet_id == 0); + send_publish( + topic_name, + qos::at_most_once, + retain, + false, + packet_id, + std::move(props), + contents, + std::move(life_keeper) + ); + } /** * @brief Publish QoS1 with already acquired packet identifier * @param packet_id @@ -2540,22 +2590,19 @@ class endpoint : public std::enable_shared_from_this props = {} ) { - auto sp_topic_name = std::make_shared(std::move(topic_name)); auto sp_contents = std::make_shared(std::move(contents)); auto topic_buf = as::buffer(*sp_topic_name); auto contents_buf = as::buffer(*sp_contents); - send_publish( - topic_buf, - qos::at_least_once, - retain, - false, + acquired_publish_at_least_once( packet_id, - std::move(props), + topic_buf, contents_buf, - std::make_pair(std::move(sp_topic_name), std::move(sp_contents)) + std::make_pair(std::move(sp_topic_name), std::move(sp_contents)), + retain, + std::move(props) ); } @@ -2625,22 +2672,19 @@ class endpoint : public std::enable_shared_from_this props = {} ) { - auto sp_topic_name = std::make_shared(std::move(topic_name)); auto sp_contents = std::make_shared(std::move(contents)); auto topic_buf = as::buffer(*sp_topic_name); auto contents_buf = as::buffer(*sp_contents); - send_publish( - topic_buf, - qos::exactly_once, - retain, - false, + acquired_publish_exactly_once( packet_id, - std::move(props), + topic_buf, contents_buf, - std::make_pair(std::move(sp_topic_name), std::move(sp_contents)) + std::make_pair(std::move(sp_topic_name), std::move(sp_contents)), + retain, + std::move(props) ); } @@ -2672,7 +2716,7 @@ class endpoint : public std::enable_shared_from_this props = {} ) { - + BOOST_ASSERT(packet_id != 0); send_publish( topic_name, qos::exactly_once, @@ -2714,25 +2758,33 @@ class endpoint : public std::enable_shared_from_this props = {} ) { - BOOST_ASSERT(qos == qos::at_most_once || qos == qos::at_least_once || qos == qos::exactly_once); - BOOST_ASSERT((qos == qos::at_most_once && packet_id == 0) || (qos != qos::at_most_once && packet_id != 0)); - - auto sp_topic_name = std::make_shared(std::move(topic_name)); - auto sp_contents = std::make_shared(std::move(contents)); + if(qos == qos::at_most_once) + { + // In the at_most_once case, we know a priori that send_publish won't track the lifetime. + acquired_publish(packet_id, + as::buffer(topic_name), + as::buffer(contents), + mqtt::any(), + qos, + retain, + std::move(props)); + } + else + { + auto sp_topic_name = std::make_shared(std::move(topic_name)); + auto sp_contents = std::make_shared(std::move(contents)); - auto topic_buf = as::buffer(*sp_topic_name); - auto contents_buf = as::buffer(*sp_contents); + auto topic_buf = as::buffer(*sp_topic_name); + auto contents_buf = as::buffer(*sp_contents); - send_publish( - topic_buf, - qos, - retain, - false, - packet_id, - std::move(props), - contents_buf, - std::make_pair(std::move(sp_topic_name), std::move(sp_contents)) - ); + acquired_publish(packet_id, + topic_buf, + contents_buf, + std::make_pair(std::move(sp_topic_name), std::move(sp_contents)), + qos, + retain, + std::move(props)); + } } /** @@ -2811,25 +2863,33 @@ class endpoint : public std::enable_shared_from_this props = {} ) { - BOOST_ASSERT(qos == qos::at_most_once || qos == qos::at_least_once || qos == qos::exactly_once); - BOOST_ASSERT((qos == qos::at_most_once && packet_id == 0) || (qos != qos::at_most_once && packet_id != 0)); - - auto sp_topic_name = std::make_shared(std::move(topic_name)); - auto sp_contents = std::make_shared(std::move(contents)); + if(qos == qos::at_most_once) + { + // In the at_most_once case, we know a priori that send_publish won't track the lifetime. + acquired_publish_dup(packet_id, + as::buffer(topic_name), + as::buffer(contents), + mqtt::any(), + qos, + retain, + std::move(props)); + } + else + { + auto sp_topic_name = std::make_shared(std::move(topic_name)); + auto sp_contents = std::make_shared(std::move(contents)); - auto topic_buf = as::buffer(*sp_topic_name); - auto contents_buf = as::buffer(*sp_contents); + auto topic_buf = as::buffer(*sp_topic_name); + auto contents_buf = as::buffer(*sp_contents); - send_publish( - topic_buf, - qos, - retain, - true, - packet_id, - std::move(props), - contents_buf, - std::make_pair(std::move(sp_topic_name), std::move(sp_contents)) - ); + acquired_publish_dup(packet_id, + topic_buf, + contents_buf, + std::make_pair(std::move(sp_topic_name), std::move(sp_contents)), + qos, + retain, + std::move(props)); + } } /** @@ -2884,12 +2944,8 @@ class endpoint : public std::enable_shared_from_this @@ -2900,40 +2956,10 @@ class endpoint : public std::enable_shared_from_this void acquired_subscribe( packet_id_t packet_id, - mqtt::string_view topic_name, - std::uint8_t qos, Args&&... args) { - BOOST_ASSERT(qos == qos::at_most_once || qos == qos::at_least_once || qos == qos::exactly_once); - std::vector> params; - params.reserve((sizeof...(args) + 2) / 2); - send_subscribe(std::move(params), packet_id, as::buffer(topic_name.data(), topic_name.size()), qos, std::forward(args)...); - } - - /** - * @brief Subscribe with already acquired packet identifier - * @param packet_id - * packet identifier. It should be acquired by acquire_unique_packet_id, or register_packet_id. - * The ownership of the packet_id moves to the library. - * @param topic_name - * A topic name to subscribe - * @param qos - * mqtt::qos - * @param args - * args should be zero or more pairs of topic_name and qos. - * You can set props as the last argument optionally. - * See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901164
- * 3.8.2.1 SUBSCRIBE Properties - * You can subscribe multiple topics all at once.
- * See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901161
- */ - template - void acquired_subscribe( - packet_id_t packet_id, - as::const_buffer topic_name, - std::uint8_t qos, Args&&... args) { - BOOST_ASSERT(qos == qos::at_most_once || qos == qos::at_least_once || qos == qos::exactly_once); + Args&&... args) { std::vector> params; - params.reserve((sizeof...(args) + 2) / 2); - send_subscribe(std::move(params), packet_id, topic_name, qos, std::forward(args)...); + params.reserve(sizeof...(args) / 2); + send_subscribe(std::move(params), packet_id, std::forward(args)...); } /** @@ -2988,10 +3014,9 @@ class endpoint : public std::enable_shared_from_this * 3.10.2.1 UNSUBSCRIBE Properties @@ -3001,36 +3026,12 @@ class endpoint : public std::enable_shared_from_this void acquired_unsubscribe( packet_id_t packet_id, - mqtt::string_view topic_name, Args&&... args) { - std::vector params; - params.reserve(sizeof...(args) + 1); - send_unsubscribe(std::move(params), packet_id, as::buffer(topic_name.data(), topic_name.size()), std::forward(args)...); - } - /** - * @brief Unsubscribe with already acquired packet identifier - * @param packet_id - * packet identifier. It should be acquired by acquire_unique_packet_id, or register_packet_id. - * The ownership of the packet_id moves to the library. - * @param topic_name - * A topic name to subscribe - * @param args - * args should be zero or more topics - * You can set props as the last argument optionally. - * See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901182
- * 3.10.2.1 UNSUBSCRIBE Properties - * You can subscribe multiple topics all at once.
- * See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901179 - */ - template - void acquired_unsubscribe( - packet_id_t packet_id, - as::const_buffer topic_name, - Args&&... args) { std::vector params; - params.reserve(sizeof...(args) + 1); - send_unsubscribe(std::move(params), packet_id, topic_name, std::forward(args)...); + params.reserve(sizeof...(args)); + + send_unsubscribe(std::move(params), packet_id, std::forward(args)...); } /** @@ -3405,6 +3406,8 @@ class endpoint : public std::enable_shared_from_this * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901104
@@ -3415,10 +3418,11 @@ class endpoint : public std::enable_shared_from_this * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901104
@@ -3443,11 +3449,12 @@ class endpoint : public std::enable_shared_from_this props, async_handler_t func = async_handler_t() ) { - acquired_async_publish(0, topic_name, contents, mqtt::any(), qos::at_most_once, retain, std::move(props), std::move(func)); + acquired_async_publish(0, topic_name, contents, std::move(life_keeper), qos::at_most_once, retain, std::move(props), std::move(func)); } /** @@ -5943,7 +5950,7 @@ class endpoint : public std::enable_shared_from_this{}, contents, std::move(func), - life_keeper + std::move(life_keeper) ); } @@ -6139,7 +6146,7 @@ class endpoint : public std::enable_shared_from_this{}, contents, std::move(func), - life_keeper + std::move(life_keeper) ); } @@ -8010,7 +8017,7 @@ class endpoint : public std::enable_shared_from_this(), packet_id, qos, std::forward(args)...); + async_send_suback(std::vector{}, packet_id, qos, std::forward(args)...); } template @@ -8037,7 +8044,7 @@ class endpoint : public std::enable_shared_from_this(), packet_id, qos, std::forward(args)...); + async_send_unsuback(std::vector{}, packet_id, qos, std::forward(args)...); } template @@ -9119,12 +9126,12 @@ class endpoint : public std::enable_shared_from_this(payload_[i]); - if ((options & 0b11111100) != 0) { + std::uint8_t option = static_cast(payload_[i]); + if ((option & 0b11111100) != 0) { if (func) func(boost::system::errc::make_error_code(boost::system::errc::protocol_error)); return false; } - entries.emplace_back(std::move(topic_filter), options); + entries.emplace_back(std::move(topic_filter), option); ++i; } if (h_subscribe_) return h_subscribe_(packet_id, std::move(entries)); @@ -9168,12 +9175,12 @@ class endpoint : public std::enable_shared_from_this(payload_[i]); - if ((options & 0b11000000) != 0) { + std::uint8_t option = static_cast(payload_[i]); + if ((option & 0b11000000) != 0) { if (func) func(boost::system::errc::make_error_code(boost::system::errc::protocol_error)); return false; } - entries.emplace_back(std::move(topic_filter), options); + entries.emplace_back(std::move(topic_filter), option); ++i; } if (h_v5_subscribe_) return h_v5_subscribe_(packet_id, std::move(entries), std::move(props)); diff --git a/test/as_buffer_async_pubsub_1.cpp b/test/as_buffer_async_pubsub_1.cpp index 3a6a2cf46..024f8d7ad 100644 --- a/test/as_buffer_async_pubsub_1.cpp +++ b/test/as_buffer_async_pubsub_1.cpp @@ -76,13 +76,11 @@ BOOST_AUTO_TEST_CASE( pub_qos0_sub_qos0 ) { BOOST_TEST(packet_id == pid_sub); BOOST_TEST(results.size() == 1U); BOOST_TEST(*results[0] == mqtt::qos::at_most_once); - auto topic1 = std::make_shared("topic1"); - auto contents = std::make_shared("topic1_contents"); c->async_publish_at_most_once( - as::buffer(*topic1), - as::buffer(*contents), - false, - [topic1, contents](boost::system::error_code const&) {} + as::buffer("topic1", sizeof("topic1") - 1), + as::buffer("topic1_contents", sizeof("topic1_contents") - 1), + mqtt::any(), + false ); return true; }); @@ -155,13 +153,11 @@ BOOST_AUTO_TEST_CASE( pub_qos0_sub_qos0 ) { BOOST_TEST(packet_id == pid_sub); BOOST_TEST(reasons.size() == 1U); BOOST_TEST(reasons[0] == mqtt::v5::reason_code::granted_qos_0); - auto topic1 = std::make_shared("topic1"); - auto contents = std::make_shared("topic1_contents"); c->async_publish_at_most_once( - as::buffer(*topic1), - as::buffer(*contents), - false, - [topic1, contents](boost::system::error_code const&) {} + as::buffer("topic1", sizeof("topic1") - 1), + as::buffer("topic1_contents", sizeof("topic1_contents") - 1), + mqtt::any(), + false ); return true; }); @@ -712,13 +708,11 @@ BOOST_AUTO_TEST_CASE( pub_qos0_sub_qos1 ) { BOOST_TEST(packet_id == pid_sub); BOOST_TEST(results.size() == 1U); BOOST_TEST(*results[0] == mqtt::qos::at_least_once); - auto topic1 = std::make_shared("topic1"); - auto contents = std::make_shared("topic1_contents"); c->async_publish_at_most_once( - as::buffer(*topic1), - as::buffer(*contents), - false, - [topic1, contents](boost::system::error_code const&) {} + as::buffer("topic1", sizeof("topic1") - 1), + as::buffer("topic1_contents", sizeof("topic1_contents") - 1), + mqtt::any(), + false ); return true; }); @@ -791,13 +785,11 @@ BOOST_AUTO_TEST_CASE( pub_qos0_sub_qos1 ) { BOOST_TEST(packet_id == pid_sub); BOOST_TEST(reasons.size() == 1U); BOOST_TEST(reasons[0] == mqtt::v5::reason_code::granted_qos_1); - auto topic1 = std::make_shared("topic1"); - auto contents = std::make_shared("topic1_contents"); c->async_publish_at_most_once( - as::buffer(*topic1), - as::buffer(*contents), - false, - [topic1, contents](boost::system::error_code const&) {} + as::buffer("topic1", sizeof("topic1") - 1), + as::buffer("topic1_contents", sizeof("topic1_contents") - 1), + mqtt::any(), + false ); return true; }); diff --git a/test/as_buffer_async_pubsub_2.cpp b/test/as_buffer_async_pubsub_2.cpp index 5b1fc6c12..a8adae5b1 100644 --- a/test/as_buffer_async_pubsub_2.cpp +++ b/test/as_buffer_async_pubsub_2.cpp @@ -77,13 +77,11 @@ BOOST_AUTO_TEST_CASE( pub_qos0_sub_qos2 ) { BOOST_TEST(packet_id == pid_sub); BOOST_TEST(results.size() == 1U); BOOST_TEST(*results[0] == mqtt::qos::exactly_once); - auto topic1 = std::make_shared("topic1"); - auto contents = std::make_shared("topic1_contents"); c->async_publish_at_most_once( - as::buffer(*topic1), - as::buffer(*contents), - false, - [topic1, contents](boost::system::error_code const&) {} + as::buffer("topic1", sizeof("topic1") - 1), + as::buffer("topic1_contents", sizeof("topic1_contents") - 1), + mqtt::any(), + false ); return true; }); @@ -156,13 +154,11 @@ BOOST_AUTO_TEST_CASE( pub_qos0_sub_qos2 ) { BOOST_TEST(packet_id == pid_sub); BOOST_TEST(reasons.size() == 1U); BOOST_TEST(reasons[0] == mqtt::v5::reason_code::granted_qos_2); - auto topic1 = std::make_shared("topic1"); - auto contents = std::make_shared("topic1_contents"); c->async_publish_at_most_once( - as::buffer(*topic1), - as::buffer(*contents), - false, - [topic1, contents](boost::system::error_code const&) {} + as::buffer("topic1", sizeof("topic1") - 1), + as::buffer("topic1_contents", sizeof("topic1_contents") - 1), + mqtt::any(), + false ); return true; });