From 89f04bc3b33ebe551c6e5f08eb20c406f60094e2 Mon Sep 17 00:00:00 2001 From: Takatoshi Kondo Date: Thu, 12 Nov 2020 07:59:32 +0900 Subject: [PATCH] Supported ConstBufferSequence payload publish. Single boost::asio::const_buffer payload is originally supported. This PR added scattered payload publish. It is very useful for request - modify - response ( or forward ) pattern. --- include/mqtt/endpoint.hpp | 82 +++++++++----- include/mqtt/message.hpp | 83 +++++++++----- include/mqtt/v5_message.hpp | 48 ++++++-- test/async_pubsub_2.cpp | 218 ++++++++++++++++++++++++++++++++++++ test/message.cpp | 4 +- test/pubsub_1.cpp | 207 ++++++++++++++++++++++++++++++++++ 6 files changed, 570 insertions(+), 72 deletions(-) diff --git a/include/mqtt/endpoint.hpp b/include/mqtt/endpoint.hpp index 192cc1900..9f4301cd7 100644 --- a/include/mqtt/endpoint.hpp +++ b/include/mqtt/endpoint.hpp @@ -1076,7 +1076,7 @@ class endpoint : public std::enable_shared_from_this + typename std::enable_if< + as::is_const_buffer_sequence::value + >::type + publish( packet_id_t packet_id, as::const_buffer topic_name, - as::const_buffer contents, + ConstBufferSequence contents, publish_options pubopts, v5::properties props, any life_keeper @@ -1114,7 +1118,7 @@ class endpoint : public std::enable_shared_from_this + typename std::enable_if< + as::is_const_buffer_sequence::value + >::type + publish( packet_id_t packet_id, as::const_buffer topic_name, - as::const_buffer contents, + ConstBufferSequence contents, publish_options pubopts, any life_keeper ) { @@ -1163,7 +1171,7 @@ class endpoint : public std::enable_shared_from_this + typename std::enable_if< + as::is_const_buffer_sequence::value + >::type + async_publish( packet_id_t packet_id, as::const_buffer topic_name, - as::const_buffer contents, + ConstBufferSequence contents, publish_options pubopts = {}, any life_keeper = {}, async_handler_t func = {} @@ -2310,7 +2322,7 @@ class endpoint : public std::enable_shared_from_this + typename std::enable_if< + as::is_const_buffer_sequence::value + >::type + async_publish( packet_id_t packet_id, as::const_buffer topic_name, - as::const_buffer contents, + ConstBufferSequence contents, publish_options pubopts, v5::properties props, any life_keeper = {}, @@ -2364,7 +2380,7 @@ class endpoint : public std::enable_shared_from_this + typename std::enable_if< + as::is_const_buffer_sequence::value + >::type + send_publish( + packet_id_t packet_id, + as::const_buffer topic_name, + ConstBufferSequence payloads, + publish_options pubopts, + v5::properties props, + any life_keeper) { auto do_send_publish = [&](auto msg, auto const& serialize_publish) { @@ -8854,7 +8874,7 @@ class endpoint : public std::enable_shared_from_this( packet_id, topic_name, - payload, + force_move(payloads), pubopts ), &endpoint::on_serialize_publish_message @@ -8865,7 +8885,7 @@ class endpoint : public std::enable_shared_from_this( packet_id, topic_name, - payload, + force_move(payloads), pubopts, force_move(props) ), @@ -9296,10 +9316,14 @@ class endpoint : public std::enable_shared_from_this + typename std::enable_if< + as::is_const_buffer_sequence::value + >::type + async_send_publish( packet_id_t packet_id, as::const_buffer topic_name, - as::const_buffer payload, + ConstBufferSequence payloads, publish_options pubopts, v5::properties props, any life_keeper, @@ -9339,7 +9363,7 @@ class endpoint : public std::enable_shared_from_this( packet_id, topic_name, - payload, + force_move(payloads), pubopts ), &endpoint::on_serialize_publish_message @@ -9350,7 +9374,7 @@ class endpoint : public std::enable_shared_from_this( packet_id, topic_name, - payload, + force_move(payloads), pubopts, force_move(props) ), diff --git a/include/mqtt/message.hpp b/include/mqtt/message.hpp index 0cab39a33..7e885a4d9 100644 --- a/include/mqtt/message.hpp +++ b/include/mqtt/message.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -503,25 +504,40 @@ class connect_message { template class basic_publish_message { public: + template < + typename ConstBufferSequence, + typename std::enable_if< + as::is_const_buffer_sequence::value, + std::nullptr_t + >::type = nullptr + > basic_publish_message( typename packet_id_type::type packet_id, as::const_buffer topic_name, - as::const_buffer payload, + ConstBufferSequence payloads, publish_options pubopts ) : fixed_header_(make_fixed_header(control_packet_type::publish, 0b0000) | pubopts.operator std::uint8_t()), topic_name_(topic_name), topic_name_length_buf_ { num_to_2bytes(boost::numeric_cast(topic_name.size())) }, - payload_(payload), remaining_length_( 2 // topic name length + topic_name_.size() // topic name - + payload_.size() // payload + ( (pubopts.get_qos() == qos::at_least_once || pubopts.get_qos() == qos::exactly_once) ? PacketIdBytes // packet_id : 0) ) { + auto b = as::buffer_sequence_begin(payloads); + auto e = as::buffer_sequence_end(payloads); + auto num_of_payloads = static_cast(std::distance(b, e)); + payloads_.reserve(num_of_payloads); + for (; b != e; ++b) { + auto const& payload = *b; + remaining_length_ += payload.size(); + payloads_.push_back(payload); + } + utf8string_check(topic_name_); auto rb = remaining_bytes(remaining_length_); @@ -578,7 +594,9 @@ class basic_publish_message { break; }; - payload_ = as::buffer(buf); + if (!buf.empty()) { + payloads_.emplace_back(as::buffer(buf)); + } } /** @@ -587,27 +605,20 @@ class basic_publish_message { * @return const buffer sequence */ std::vector const_buffer_sequence() const { - if (packet_id_.empty()) { - return - { - as::buffer(&fixed_header_, 1), - as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()), - as::buffer(topic_name_length_buf_.data(), topic_name_length_buf_.size()), - as::buffer(topic_name_), - as::buffer(payload_) - }; - } - else { - return - { - as::buffer(&fixed_header_, 1), - as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()), - as::buffer(topic_name_length_buf_.data(), topic_name_length_buf_.size()), - as::buffer(topic_name_), - as::buffer(packet_id_.data(), packet_id_.size()), - as::buffer(payload_) - }; + std::vector ret; + ret.reserve( + 5 + // fixed_header, remaining_length_buf, topic_name_length_buf, topic_name, packet_id + payloads_.size() + ); + ret.emplace_back(as::buffer(&fixed_header_, 1)); + ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size())); + ret.emplace_back(as::buffer(topic_name_length_buf_.data(), topic_name_length_buf_.size())); + ret.emplace_back(as::buffer(topic_name_)); + if (!packet_id_.empty()) { + ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size())); } + std::copy(payloads_.begin(), payloads_.end(), std::back_inserter(ret)); + return ret; } /** @@ -631,7 +642,14 @@ class basic_publish_message { 1 + // remaining length 2 + // topic name length, topic name (packet_id_.empty() ? 0 : 1) + // packet_id - 1; // payload + std::accumulate( + payloads_.begin(), + payloads_.end(), + std::size_t(0), + [](std::size_t s, as::const_buffer const& payload) { + return s + payload.size(); + } + ); } /** @@ -652,7 +670,9 @@ class basic_publish_message { ret.append(get_pointer(topic_name_), get_size(topic_name_)); ret.append(packet_id_.data(), packet_id_.size()); - ret.append(get_pointer(payload_), get_size(payload_)); + for (auto const& payload : payloads_) { + ret.append(get_pointer(payload), get_size(payload)); + } return ret; } @@ -709,8 +729,13 @@ class basic_publish_message { * @brief Get payload * @return payload */ - constexpr string_view payload() const { - return string_view(get_pointer(payload_), get_size(payload_)); + std::vector payload() const { + std::vector ret; + ret.reserve(payloads_.size()); + for (auto const& payload : payloads_) { + ret.emplace_back(get_pointer(payload), get_size(payload)); + } + return ret; } /** @@ -726,7 +751,7 @@ class basic_publish_message { as::const_buffer topic_name_; boost::container::static_vector topic_name_length_buf_; boost::container::static_vector packet_id_; - as::const_buffer payload_; + std::vector payloads_; std::size_t remaining_length_; boost::container::static_vector remaining_length_buf_; }; diff --git a/include/mqtt/v5_message.hpp b/include/mqtt/v5_message.hpp index 210db21d3..7cb6ab75f 100644 --- a/include/mqtt/v5_message.hpp +++ b/include/mqtt/v5_message.hpp @@ -543,10 +543,17 @@ class connack_message { template class basic_publish_message { public: + template < + typename ConstBufferSequence, + typename std::enable_if< + as::is_const_buffer_sequence::value, + std::nullptr_t + >::type = nullptr + > basic_publish_message( typename packet_id_type::type packet_id, as::const_buffer topic_name, - as::const_buffer payload, + ConstBufferSequence payloads, publish_options pubopts, properties props ) @@ -564,11 +571,9 @@ class basic_publish_message { ) ), props_(force_move(props)), - payload_(payload), remaining_length_( 2 // topic name length + topic_name_.size() // topic name - + payload_.size() // payload + ( (pubopts.get_qos() == qos::at_least_once || pubopts.get_qos() == qos::exactly_once) ? PacketIdBytes // packet_id : 0) @@ -587,10 +592,20 @@ class basic_publish_message { [](std::size_t total, property_variant const& pv) { return total + v5::num_of_const_buffer_sequence(pv); } - ) + - 1 // payload + ) ) { + auto b = as::buffer_sequence_begin(payloads); + auto e = as::buffer_sequence_end(payloads); + auto num_of_payloads = static_cast(std::distance(b, e)); + payloads_.reserve(num_of_payloads); + for (; b != e; ++b) { + auto const& payload = *b; + remaining_length_ += payload.size(); + payloads_.push_back(payload); + } + num_of_const_buffer_sequence_ += num_of_payloads; + utf8string_check(topic_name_); auto pb = variable_bytes(property_length_); @@ -670,7 +685,9 @@ class basic_publish_message { props_ = property::parse(buf.substr(0, property_length_)); buf.remove_prefix(property_length_); - payload_ = as::buffer(buf); + if (!buf.empty()) { + payloads_.emplace_back(as::buffer(buf)); + } num_of_const_buffer_sequence_ = 1 + // fixed header 1 + // remaining length @@ -686,7 +703,7 @@ class basic_publish_message { return total + v5::num_of_const_buffer_sequence(pv); } ) + - 1; // payload + payloads_.size(); // payload } /** @@ -712,7 +729,7 @@ class basic_publish_message { v5::add_const_buffer_sequence(ret, p); } - ret.emplace_back(as::buffer(payload_)); + std::copy(payloads_.begin(), payloads_.end(), std::back_inserter(ret)); return ret; } @@ -765,7 +782,9 @@ class basic_publish_message { it += static_cast(v5::size(p)); } - ret.append(get_pointer(payload_), get_size(payload_)); + for (auto const& payload : payloads_) { + ret.append(get_pointer(payload), get_size(payload)); + } return ret; } @@ -822,8 +841,13 @@ class basic_publish_message { * @brief Get payload * @return payload */ - string_view payload() const { - return string_view(get_pointer(payload_), get_size(payload_)); + std::vector payload() const { + std::vector ret; + ret.reserve(payloads_.size()); + for (auto const& payload : payloads_) { + ret.emplace_back(get_pointer(payload), get_size(payload)); + } + return ret; } /** @@ -850,7 +874,7 @@ class basic_publish_message { std::size_t property_length_; boost::container::static_vector property_length_buf_; properties props_; - as::const_buffer payload_; + std::vector payloads_; std::size_t remaining_length_; boost::container::static_vector remaining_length_buf_; std::size_t num_of_const_buffer_sequence_; diff --git a/test/async_pubsub_2.cpp b/test/async_pubsub_2.cpp index ae0396737..3d0bf9644 100644 --- a/test/async_pubsub_2.cpp +++ b/test/async_pubsub_2.cpp @@ -1145,6 +1145,224 @@ BOOST_AUTO_TEST_CASE( publish_function_buffer ) { do_combi_test_async(test); } +BOOST_AUTO_TEST_CASE( publish_function_const_buffer_sequence ) { + auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) { + using packet_id_t = typename std::remove_reference_t::packet_id_t; + c->set_client_id("cid1"); + c->set_clean_session(true); + + std::uint16_t pid_sub; + std::uint16_t pid_unsub; + + + checker chk = { + // connect + cont("h_connack"), + // subscribe topic1 QoS1 + cont("h_suback"), + // publish topic1 QoS1 + cont("h_publish"), + cont("h_puback"), + cont("h_unsuback"), + // disconnect + cont("h_close"), + }; + + switch (c->get_protocol_version()) { + case MQTT_NS::protocol_version::v3_1_1: + c->set_connack_handler( + [&chk, &c, &pid_sub] + (bool sp, MQTT_NS::connect_return_code connack_return_code) { + MQTT_CHK("h_connack"); + BOOST_TEST(sp == false); + BOOST_TEST(connack_return_code == MQTT_NS::connect_return_code::accepted); + pid_sub = c->acquire_unique_packet_id(); + c->async_subscribe(pid_sub, "topic1"_mb, MQTT_NS::qos::at_least_once); + return true; + }); + c->set_puback_handler( + [&chk, &c, &pid_unsub] + (packet_id_t packet_id) { + MQTT_CHK("h_puback"); + BOOST_TEST(packet_id == 1); + pid_unsub = c->acquire_unique_packet_id(); + c->async_unsubscribe(pid_unsub, "topic1"); + return true; + }); + c->set_pubrec_handler( + [] + (std::uint16_t) { + BOOST_CHECK(false); + return true; + }); + c->set_pubcomp_handler( + [] + (std::uint16_t) { + BOOST_CHECK(false); + return true; + }); + c->set_suback_handler( + [&chk, &c, &pid_sub] + (packet_id_t packet_id, std::vector results) { + MQTT_CHK("h_suback"); + BOOST_TEST(packet_id == pid_sub); + BOOST_TEST(results.size() == 1U); + BOOST_TEST(results[0] == MQTT_NS::suback_return_code::success_maximum_qos_1); + c->register_packet_id(1); + auto topic_name = std::make_shared("topic1"); + auto s1 = std::make_shared("topic"); + auto s2 = std::make_shared("1"); + auto s3 = std::make_shared("_"); + auto s4 = std::make_shared("contents"); + std::vector cbs { + as::buffer(*s1), + as::buffer(*s2), + as::buffer(*s3), + as::buffer(*s4) + }; + c->async_publish( + 1, + as::buffer(*topic_name), + cbs, + MQTT_NS::qos::at_least_once | MQTT_NS::dup::yes, + std::make_tuple(topic_name, s1, s2, s3, s4) + ); + return true; + }); + c->set_unsuback_handler( + [&chk, &c, &pid_unsub] + (packet_id_t packet_id) { + MQTT_CHK("h_unsuback"); + BOOST_TEST(packet_id == pid_unsub); + c->async_disconnect(); + return true; + }); + c->set_publish_handler( + [&chk] + (MQTT_NS::optional packet_id, + MQTT_NS::publish_options pubopts, + MQTT_NS::buffer topic, + MQTT_NS::buffer contents) { + MQTT_CHK("h_publish"); + BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no); + BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::at_least_once); + BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no); + BOOST_CHECK(packet_id.value() == 1); + BOOST_TEST(topic == "topic1"); + BOOST_TEST(contents == "topic1_contents"); + return true; + }); + break; + case MQTT_NS::protocol_version::v5: + c->set_v5_connack_handler( + [&chk, &c, &pid_sub] + (bool sp, MQTT_NS::v5::connect_reason_code connack_return_code, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_connack"); + BOOST_TEST(sp == false); + BOOST_TEST(connack_return_code == MQTT_NS::v5::connect_reason_code::success); + pid_sub = c->acquire_unique_packet_id(); + c->async_subscribe(pid_sub, "topic1"_mb, MQTT_NS::qos::at_least_once); + return true; + }); + c->set_v5_puback_handler( + [&chk, &c, &pid_unsub] + (packet_id_t packet_id, MQTT_NS::v5::puback_reason_code, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_puback"); + BOOST_TEST(packet_id == 1); + pid_unsub = c->acquire_unique_packet_id(); + c->async_unsubscribe(pid_unsub, "topic1"); + return true; + }); + c->set_v5_pubrec_handler( + [] + (packet_id_t, MQTT_NS::v5::pubrec_reason_code, MQTT_NS::v5::properties /*props*/) { + BOOST_CHECK(false); + return true; + }); + c->set_v5_pubcomp_handler( + [] + (packet_id_t, MQTT_NS::v5::pubcomp_reason_code, MQTT_NS::v5::properties /*props*/) { + BOOST_CHECK(false); + return true; + }); + c->set_v5_suback_handler( + [&chk, &c, &pid_sub] + (packet_id_t packet_id, std::vector reasons, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_suback"); + BOOST_TEST(packet_id == pid_sub); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_1); + c->register_packet_id(1); + auto topic_name = std::make_shared("topic1"); + auto s1 = std::make_shared("topic"); + auto s2 = std::make_shared("1"); + auto s3 = std::make_shared("_"); + auto s4 = std::make_shared("contents"); + std::vector cbs { + as::buffer(*s1), + as::buffer(*s2), + as::buffer(*s3), + as::buffer(*s4) + }; + c->async_publish( + 1, + as::buffer(*topic_name), + cbs, + MQTT_NS::qos::at_least_once | MQTT_NS::dup::yes, + std::make_tuple(topic_name, s1, s2, s3, s4) + ); + return true; + }); + c->set_v5_unsuback_handler( + [&chk, &c, &pid_unsub] + (packet_id_t packet_id, std::vector reasons, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_unsuback"); + BOOST_TEST(packet_id == pid_unsub); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons[0] == MQTT_NS::v5::unsuback_reason_code::success); + c->async_disconnect(); + return true; + }); + c->set_v5_publish_handler( + [&chk] + (MQTT_NS::optional packet_id, + MQTT_NS::publish_options pubopts, + MQTT_NS::buffer topic, + MQTT_NS::buffer contents, + MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_publish"); + BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no); + BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::at_least_once); + BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no); + BOOST_CHECK(packet_id.value() == 1); + BOOST_TEST(topic == "topic1"); + BOOST_TEST(contents == "topic1_contents"); + return true; + }); + break; + default: + BOOST_CHECK(false); + break; + } + + c->set_close_handler( + [&chk, &finish] + () { + MQTT_CHK("h_close"); + finish(); + }); + c->set_error_handler( + [] + (MQTT_NS::error_code) { + BOOST_CHECK(false); + }); + c->async_connect(); + ioc.run(); + BOOST_TEST(chk.all()); + }; + do_combi_test_async(test); +} + BOOST_AUTO_TEST_CASE( pub_sub_prop ) { auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) { if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) { diff --git a/test/message.cpp b/test/message.cpp index 27563f307..d85bbb182 100644 --- a/test/message.cpp +++ b/test/message.cpp @@ -255,7 +255,7 @@ BOOST_AUTO_TEST_CASE( publish_get_attributes1 ) { BOOST_TEST(m.is_retain() == true); BOOST_TEST(m.is_dup() == false); BOOST_TEST(m.topic() == "1234"); - BOOST_TEST(m.payload() == ""); + BOOST_TEST(m.payload().empty()); } catch (...) { BOOST_TEST(false); @@ -283,7 +283,7 @@ BOOST_AUTO_TEST_CASE( publish_get_attributes2 ) { BOOST_TEST(m.is_retain() == false); BOOST_TEST(m.is_dup() == true); BOOST_TEST(m.topic() == "1234"); - BOOST_TEST(m.payload() == "AB"); + BOOST_TEST(m.payload().front() == "AB"); BOOST_TEST(m.continuous_buffer() == buf); } catch (...) { diff --git a/test/pubsub_1.cpp b/test/pubsub_1.cpp index 8cc571a71..5c476463b 100644 --- a/test/pubsub_1.cpp +++ b/test/pubsub_1.cpp @@ -2072,6 +2072,213 @@ BOOST_AUTO_TEST_CASE( publish_function_buffer ) { do_combi_test_sync(test); } +BOOST_AUTO_TEST_CASE( publish_function_const_buffer_sequence ) { + auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) { + using packet_id_t = typename std::remove_reference_t::packet_id_t; + c->set_client_id("cid1"); + c->set_clean_session(true); + + packet_id_t pid_sub; + packet_id_t pid_unsub; + + + checker chk = { + // connect + cont("h_connack"), + // subscribe topic1 QoS0 + cont("h_suback"), + // publish topic1 QoS0 + cont("h_publish"), + cont("h_unsuback"), + // disconnect + cont("h_close"), + }; + + switch (c->get_protocol_version()) { + case MQTT_NS::protocol_version::v3_1_1: + c->set_connack_handler( + [&chk, &c, &pid_sub] + (bool sp, MQTT_NS::connect_return_code connack_return_code) { + MQTT_CHK("h_connack"); + BOOST_TEST(sp == false); + BOOST_TEST(connack_return_code == MQTT_NS::connect_return_code::accepted); + pid_sub = c->subscribe("topic1"_mb, MQTT_NS::qos::at_most_once); + return true; + }); + c->set_puback_handler( + [] + (packet_id_t) { + BOOST_CHECK(false); + return true; + }); + c->set_pubrec_handler( + [] + (packet_id_t) { + BOOST_CHECK(false); + return true; + }); + c->set_pubcomp_handler( + [] + (packet_id_t) { + BOOST_CHECK(false); + return true; + }); + c->set_suback_handler( + [&chk, &c, &pid_sub] + (packet_id_t packet_id, std::vector results) { + MQTT_CHK("h_suback"); + BOOST_TEST(packet_id == pid_sub); + BOOST_TEST(results.size() == 1U); + BOOST_TEST(results[0] == MQTT_NS::suback_return_code::success_maximum_qos_0); + auto topic_name = std::make_shared("topic1"); + auto s1 = std::make_shared("topic"); + auto s2 = std::make_shared("1"); + auto s3 = std::make_shared("_"); + auto s4 = std::make_shared("contents"); + std::vector cbs { + as::buffer(*s1), + as::buffer(*s2), + as::buffer(*s3), + as::buffer(*s4) + }; + c->publish( + as::buffer(*topic_name), + cbs, + MQTT_NS::qos::at_most_once, + std::make_tuple(topic_name, s1, s2, s3, s4) + ); + return true; + }); + c->set_unsuback_handler( + [&chk, &c, &pid_unsub] + (packet_id_t packet_id) { + MQTT_CHK("h_unsuback"); + BOOST_TEST(packet_id == pid_unsub); + c->disconnect(); + return true; + }); + c->set_publish_handler( + [&chk, &c, &pid_unsub] + (MQTT_NS::optional packet_id, + MQTT_NS::publish_options pubopts, + MQTT_NS::buffer topic, + MQTT_NS::buffer contents) { + MQTT_CHK("h_publish"); + BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no); + BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::at_most_once); + BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no); + BOOST_CHECK(!packet_id); + BOOST_TEST(topic == "topic1"); + BOOST_TEST(contents == "topic1_contents"); + pid_unsub = c->unsubscribe("topic1"_mb); + return true; + }); + break; + case MQTT_NS::protocol_version::v5: + c->set_v5_connack_handler( + [&chk, &c, &pid_sub] + (bool sp, MQTT_NS::v5::connect_reason_code connack_return_code, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_connack"); + BOOST_TEST(sp == false); + BOOST_TEST(connack_return_code == MQTT_NS::v5::connect_reason_code::success); + pid_sub = c->subscribe("topic1"_mb, MQTT_NS::qos::at_most_once); + return true; + }); + c->set_v5_puback_handler( + [] + (packet_id_t, MQTT_NS::v5::puback_reason_code, MQTT_NS::v5::properties /*props*/) { + BOOST_CHECK(false); + return true; + }); + c->set_v5_pubrec_handler( + [] + (packet_id_t, MQTT_NS::v5::pubrec_reason_code, MQTT_NS::v5::properties /*props*/) { + BOOST_CHECK(false); + return true; + }); + c->set_v5_pubcomp_handler( + [] + (packet_id_t, MQTT_NS::v5::pubcomp_reason_code, MQTT_NS::v5::properties /*props*/) { + BOOST_CHECK(false); + return true; + }); + c->set_v5_suback_handler( + [&chk, &c, &pid_sub] + (packet_id_t packet_id, std::vector reasons, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_suback"); + BOOST_TEST(packet_id == pid_sub); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_0); + auto topic_name = std::make_shared("topic1"); + auto s1 = std::make_shared("topic"); + auto s2 = std::make_shared("1"); + auto s3 = std::make_shared("_"); + auto s4 = std::make_shared("contents"); + std::vector cbs { + as::buffer(*s1), + as::buffer(*s2), + as::buffer(*s3), + as::buffer(*s4) + }; + c->publish( + as::buffer(*topic_name), + cbs, + MQTT_NS::qos::at_most_once, + std::make_tuple(topic_name, s1, s2, s3, s4) + ); + return true; + }); + c->set_v5_unsuback_handler( + [&chk, &c, &pid_unsub] + (packet_id_t packet_id, std::vector reasons, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_unsuback"); + BOOST_TEST(packet_id == pid_unsub); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons[0] == MQTT_NS::v5::unsuback_reason_code::success); + c->disconnect(); + return true; + }); + c->set_v5_publish_handler( + [&chk, &c, &pid_unsub] + (MQTT_NS::optional packet_id, + MQTT_NS::publish_options pubopts, + MQTT_NS::buffer topic, + MQTT_NS::buffer contents, + MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_publish"); + BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no); + BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::at_most_once); + BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no); + BOOST_CHECK(!packet_id); + BOOST_TEST(topic == "topic1"); + BOOST_TEST(contents == "topic1_contents"); + pid_unsub = c->unsubscribe("topic1"_mb); + return true; + }); + break; + default: + BOOST_CHECK(false); + break; + } + + c->set_close_handler( + [&chk, &finish] + () { + MQTT_CHK("h_close"); + finish(); + }); + c->set_error_handler( + [] + (MQTT_NS::error_code) { + BOOST_CHECK(false); + }); + c->connect(); + ioc.run(); + BOOST_TEST(chk.all()); + }; + do_combi_test_sync(test); +} + BOOST_AUTO_TEST_CASE( publish_dup_function ) { auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) { using packet_id_t = typename std::remove_reference_t::packet_id_t;