diff --git a/include/mqtt/broker/broker.hpp b/include/mqtt/broker/broker.hpp index 684f306fd..0a9ffa441 100644 --- a/include/mqtt/broker/broker.hpp +++ b/include/mqtt/broker/broker.hpp @@ -1015,7 +1015,7 @@ class broker_t { optional session_expiry_interval; optional will_expiry_interval; v5::properties connack_props; - + bool response_topic_requested = false; if (ep.get_protocol_version() == protocol_version::v5) { { auto v = get_property(props); @@ -1026,9 +1026,7 @@ class broker_t { { auto v = get_property(props); if (v && v.value().val() == 1) { - connack_props.emplace_back( - v5::property::response_topic(allocate_buffer(create_uuid_string())) - ); + response_topic_requested = true; } } @@ -1109,8 +1107,34 @@ class broker_t { break; } + auto set_response_topic = + [this, response_topic_requested, &connack_props](session_state& s) { + if (response_topic_requested) { + auto response_topic = [&] { + if (auto rt_opt = s.get_response_topic()) { + return rt_opt.value(); + } + auto rt = create_uuid_string(); + s.set_response_topic(rt); + return rt; + } (); + s.set_clean_handler( + [this, response_topic] { + std::lock_guard g(mtx_retains_); + retains_.erase(response_topic); + } + ); + connack_props.emplace_back( + v5::property::response_topic( + allocate_buffer(response_topic) + ) + ); + } + }; + auto send_connack = - [&](bool session_present, std::function finish = [](error_code){}) { + [this, &ep, &connack_props] + (bool session_present, std::function finish = [](error_code){}) { // Reply to the connect message. switch (ep.get_protocol_version()) { case protocol_version::v3_1_1: @@ -1158,6 +1182,7 @@ class broker_t { } }; + /** * http://docs.oasis-open.org/mqtt/mqtt/v5.0/cs02/mqtt-v5.0-cs02.html#_Toc514345311 * 3.1.2.4 Clean Start @@ -1192,6 +1217,8 @@ class broker_t { force_move(will_expiry_interval), force_move(session_expiry_interval) ); + // set_response_topic never modify key part + set_response_topic(const_cast(*it)); send_connack(false); } else if (it->online()) { @@ -1204,6 +1231,8 @@ class broker_t { << MQTT_ADD_VALUE(address, this) << "cid:" << client_id << "online connection exists, discard old one due to new one's clean_start and renew"; + // set_response_topic never modify key part + set_response_topic(const_cast(*it)); send_connack(false); idx.modify( it, @@ -1222,6 +1251,8 @@ class broker_t { << MQTT_ADD_VALUE(address, this) << "cid:" << client_id << "online connection exists, inherit old one and renew"; + // set_response_topic never modify key part + set_response_topic(const_cast(*it)); send_connack( true, [ @@ -1279,6 +1310,8 @@ class broker_t { force_move(session_expiry_interval) ); BOOST_ASSERT(inserted); + // set_response_topic never modify key part + set_response_topic(const_cast(*it)); send_connack(false); } } @@ -1290,6 +1323,8 @@ class broker_t { << MQTT_ADD_VALUE(address, this) << "cid:" << client_id << "offline connection exists, discard old one due to new one's clean_start and renew"; + // set_response_topic never modify key part + set_response_topic(const_cast(*it)); send_connack(false); idx.modify( it, @@ -1309,6 +1344,8 @@ class broker_t { << MQTT_ADD_VALUE(address, this) << "cid:" << client_id << "offline connection exists, inherit old one and renew"; + // set_response_topic never modify key part + set_response_topic(const_cast(*it)); send_connack( true, [ diff --git a/include/mqtt/broker/session_state.hpp b/include/mqtt/broker/session_state.hpp index 8a553cf84..dec469206 100644 --- a/include/mqtt/broker/session_state.hpp +++ b/include/mqtt/broker/session_state.hpp @@ -281,10 +281,15 @@ struct session_state { } } + void set_clean_handler(std::function handler) { + clean_handler_ = force_move(handler); + } + void clean() { MQTT_LOG("mqtt_broker", trace) << MQTT_ADD_VALUE(address, this) << "clean"; + if (clean_handler_) clean_handler_(); { std::lock_guard g(mtx_inflight_messages_); inflight_messages_.clear(); @@ -509,6 +514,14 @@ struct session_state { return session_expiry_interval_; } + void set_response_topic(std::string topic) { + response_topic_.emplace(force_move(topic)); + } + + optional get_response_topic() const { + return response_topic_; + } + private: void send_will_impl() { if (!will_value_) return; @@ -576,6 +589,9 @@ struct session_state { bool remain_after_close_; std::set qos2_publish_handled_; + + optional response_topic_; + std::function clean_handler_; }; class session_states { diff --git a/include/mqtt/broker/uuid.hpp b/include/mqtt/broker/uuid.hpp index 3956030f2..ba59a8db0 100644 --- a/include/mqtt/broker/uuid.hpp +++ b/include/mqtt/broker/uuid.hpp @@ -19,7 +19,7 @@ MQTT_BROKER_NS_BEGIN inline std::string create_uuid_string() { // See https://github.com/boostorg/uuid/issues/121 - static thread_local auto gen = boost::uuids::random_generator(); + thread_local auto gen = boost::uuids::random_generator(); return boost::uuids::to_string(gen()); } diff --git a/test/system/CMakeLists.txt b/test/system/CMakeLists.txt index 68a832ba6..4047bc6f4 100644 --- a/test/system/CMakeLists.txt +++ b/test/system/CMakeLists.txt @@ -43,6 +43,7 @@ ENDIF () IF (MQTT_TEST_5) LIST (APPEND check_PROGRAMS + st_reqres.cpp st_resend.cpp st_retain_1.cpp st_resend_new_client.cpp diff --git a/test/system/st_connect.cpp b/test/system/st_connect.cpp index aec46e4e7..c74e70cb3 100644 --- a/test/system/st_connect.cpp +++ b/test/system/st_connect.cpp @@ -1936,70 +1936,6 @@ BOOST_AUTO_TEST_CASE( connack_prop ) { do_combi_test_sync(test); } -BOOST_AUTO_TEST_CASE( request_response ) { - auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& /*b*/) { - auto& c = cs[0]; - clear_ordered(); - if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) { - finish(); - return; - } - - c->set_client_id("cid1"); - c->set_clean_session(true); - BOOST_TEST(c->connected() == false); - - checker chk = { - // connect - cont("h_connack"), - // disconnect - cont("h_close"), - }; - - c->set_v5_connack_handler( - [&chk, &c] - (bool sp, MQTT_NS::v5::connect_reason_code connect_reason_code, MQTT_NS::v5::properties props) { - MQTT_CHK("h_connack"); - BOOST_TEST(c->connected() == true); - BOOST_TEST(sp == false); - BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); - - std::size_t times = 0; - MQTT_NS::v5::visit_props( - props, - [&](MQTT_NS::v5::property::response_topic const&) { - ++times; - }, - [](auto){} - ); - BOOST_TEST(times == 1); - - c->disconnect(); - BOOST_TEST(c->connected() == true); - return true; - }); - c->set_close_handler( - [&chk, &finish] - () { - MQTT_CHK("h_close"); - finish(); - }); - c->set_error_handler( - [] - (MQTT_NS::error_code) { - BOOST_CHECK(false); - }); - c->connect( - MQTT_NS::v5::properties{ - MQTT_NS::v5::property::request_response_information(true) - } - ); - ioc.run(); - BOOST_TEST(chk.all()); - }; - do_combi_test_sync(test); -} - BOOST_AUTO_TEST_CASE( session_taken_over ) { auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& /*b*/) { auto& c1 = cs[0]; diff --git a/test/system/st_reqres.cpp b/test/system/st_reqres.cpp new file mode 100644 index 000000000..213b31c70 --- /dev/null +++ b/test/system/st_reqres.cpp @@ -0,0 +1,445 @@ +// Copyright Takatoshi Kondo 2015 +// +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include "../common/test_main.hpp" +#include "combi_test.hpp" +#include "checker.hpp" +#include "ordered_caller.hpp" +#include "test_util.hpp" +#include "../common/global_fixture.hpp" + +BOOST_AUTO_TEST_SUITE(st_reqres) + +using namespace MQTT_NS::literals; + +BOOST_AUTO_TEST_CASE( pubsub ) { + auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& /*b*/) { + auto& c = cs[0]; + clear_ordered(); + using packet_id_t = typename std::remove_reference_t::packet_id_t; + if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) { + finish(); + return; + } + + c->set_client_id("cid1"); + + std::string response_topic; + + checker chk = { + // connect + cont("h_connack"), + // subscribe + cont("h_suback"), + // publish + cont("h_publish"), + // disconnect + cont("h_close"), + }; + + c->set_v5_connack_handler( + [&] + (bool sp, MQTT_NS::v5::connect_reason_code connect_reason_code, MQTT_NS::v5::properties props) { + MQTT_CHK("h_connack"); + BOOST_TEST(sp == false); + BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); + + std::size_t times = 0; + MQTT_NS::v5::visit_props( + props, + [&](MQTT_NS::v5::property::response_topic const& v) { + ++times; + response_topic = std::string(v.val()); + }, + [](auto){} + ); + BOOST_TEST(times == 1); + + c->subscribe(response_topic, MQTT_NS::qos::at_most_once); + return true; + }); + c->set_v5_suback_handler( + [&] + (packet_id_t /*packet_id*/, + std::vector reasons, + MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_suback"); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_0); + + c->publish(response_topic, "response_contents", MQTT_NS::qos::at_most_once); + return true; + }); + c->set_v5_publish_handler( + [&] + (MQTT_NS::optional /*packet_id*/, + MQTT_NS::publish_options /*pubopts*/, + MQTT_NS::buffer topic, + MQTT_NS::buffer contents, + MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_publish"); + BOOST_TEST(topic == response_topic); + BOOST_TEST(contents == "response_contents"); + c->disconnect(); + return true; + }); + c->set_close_handler( + [&] + () { + MQTT_CHK("h_close"); + finish(); + }); + c->set_error_handler( + [] + (MQTT_NS::error_code) { + BOOST_CHECK(false); + }); + c->set_clean_session(true); + c->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::request_response_information(true), + } + ); + ioc.run(); + BOOST_TEST(chk.all()); + }; + do_combi_test_sync(test); +} + +BOOST_AUTO_TEST_CASE( session ) { + auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& /*b*/) { + auto& c = cs[0]; + clear_ordered(); + if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) { + finish(); + return; + } + + c->set_client_id("cid1"); + + std::string response_topic1; + std::string response_topic2; + + checker chk = { + // connect + cont("h_connack1"), + // disconnect + cont("h_close1"), + + // connect + cont("h_connack2"), + // disconnect + cont("h_close2"), + + // connect + cont("h_connack3"), + // disconnect + cont("h_close3"), + }; + + c->set_v5_connack_handler( + [&] + (bool sp, MQTT_NS::v5::connect_reason_code connect_reason_code, MQTT_NS::v5::properties props) { + auto ret = MQTT_ORDERED( + [&] { + MQTT_CHK("h_connack1"); + BOOST_TEST(sp == false); + BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); + + std::size_t times = 0; + MQTT_NS::v5::visit_props( + props, + [&](MQTT_NS::v5::property::response_topic const& v) { + ++times; + response_topic1 = std::string(v.val()); + }, + [](auto){} + ); + BOOST_TEST(times == 1); + }, + [&] { + MQTT_CHK("h_connack2"); + BOOST_TEST(sp == true); + BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); + + std::size_t times = 0; + MQTT_NS::v5::visit_props( + props, + [&](MQTT_NS::v5::property::response_topic const& v) { + ++times; + response_topic2 = std::string(v.val()); + }, + [](auto){} + ); + BOOST_TEST(times == 1); + BOOST_TEST(response_topic1 == response_topic2); + }, + [&] { + MQTT_CHK("h_connack3"); + BOOST_TEST(sp == false); + BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); + + std::size_t times = 0; + MQTT_NS::v5::visit_props( + props, + [&](MQTT_NS::v5::property::response_topic const& v) { + ++times; + BOOST_TEST(v.val() != response_topic1); + BOOST_TEST(v.val() != response_topic2); + }, + [](auto){} + ); + BOOST_TEST(times == 1); + } + ); + BOOST_TEST(ret); + c->disconnect(); + return true; + }); + c->set_close_handler( + [&] + () { + auto ret = MQTT_ORDERED( + [&] { + MQTT_CHK("h_close1"); + c->set_clean_session(false); + // session will expire on disconnect + c->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::request_response_information(true) + } + ); + }, + [&] { + MQTT_CHK("h_close2"); + // no session exists even if clean false + c->set_clean_session(false); + c->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::request_response_information(true) + } + ); + }, + [&] { + MQTT_CHK("h_close3"); + finish(); + } + ); + BOOST_TEST(ret); + }); + c->set_error_handler( + [] + (MQTT_NS::error_code) { + BOOST_CHECK(false); + }); + c->set_clean_session(true); + c->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::request_response_information(true), + MQTT_NS::v5::property::session_expiry_interval( + MQTT_NS::session_never_expire + ) + } + ); + ioc.run(); + BOOST_TEST(chk.all()); + }; + do_combi_test_sync(test); +} + +BOOST_AUTO_TEST_CASE( retain ) { + auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& /*b*/) { + auto& c = cs[0]; + clear_ordered(); + using packet_id_t = typename std::remove_reference_t::packet_id_t; + if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) { + finish(); + return; + } + + c->set_client_id("cid1"); + + std::string response_topic1; + std::string response_topic2; + + // postfix number is a connection number in this scenario + checker chk = { + // connect + cont("h_connack1"), + // publish retain + // disconnect + cont("h_close1"), + + // connect + cont("h_connack2"), + // subscribe + cont("h_suback2"), + // receive retained publish + cont("h_publish2"), + // disconnect + cont("h_close2"), + + // connect + cont("h_connack3"), + // subscribe + cont("h_suback3"), + // disconnect + cont("h_close3"), + }; + + c->set_v5_connack_handler( + [&] + (bool sp, MQTT_NS::v5::connect_reason_code connect_reason_code, MQTT_NS::v5::properties props) { + auto ret = MQTT_ORDERED( + [&] { + MQTT_CHK("h_connack1"); + BOOST_TEST(sp == false); + BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); + + std::size_t times = 0; + MQTT_NS::v5::visit_props( + props, + [&](MQTT_NS::v5::property::response_topic const& v) { + ++times; + response_topic1 = std::string(v.val()); + }, + [](auto){} + ); + BOOST_TEST(times == 1); + c->publish( + response_topic1, + "response_contents", + MQTT_NS::qos::at_most_once | MQTT_NS::retain::yes + ); + c->disconnect(); + }, + [&] { + MQTT_CHK("h_connack2"); + BOOST_TEST(sp == true); + BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); + + std::size_t times = 0; + MQTT_NS::v5::visit_props( + props, + [&](MQTT_NS::v5::property::response_topic const& v) { + ++times; + response_topic2 = std::string(v.val()); + }, + [](auto){} + ); + BOOST_TEST(times == 1); + BOOST_TEST(response_topic1 == response_topic2); + c->subscribe(response_topic2, MQTT_NS::qos::at_most_once); + }, + [&] { + MQTT_CHK("h_connack3"); + BOOST_TEST(sp == false); + BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); + + std::size_t times = 0; + MQTT_NS::v5::visit_props( + props, + [&](MQTT_NS::v5::property::response_topic const&) { + ++times; + }, + [](auto){} + ); + BOOST_TEST(times == 1); + // subscribe erased (previous) response topic + c->subscribe(response_topic2, MQTT_NS::qos::at_most_once); + } + ); + BOOST_TEST(ret); + return true; + }); + c->set_v5_suback_handler( + [&] + (packet_id_t /*packet_id*/, + std::vector reasons, + MQTT_NS::v5::properties /*props*/) { + auto ret = MQTT_ORDERED( + [&] { + MQTT_CHK("h_suback2"); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_0); + }, + [&] { + MQTT_CHK("h_suback3"); + BOOST_TEST(reasons.size() == 1U); + // When authorization mechanism is introduced it will ought to be failed + BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_0); + c->disconnect(); + } + ); + BOOST_TEST(ret); + return true; + }); + c->set_v5_publish_handler( + [&] + (MQTT_NS::optional /*packet_id*/, + MQTT_NS::publish_options pubopts, + MQTT_NS::buffer topic, + MQTT_NS::buffer contents, + MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_publish2"); + BOOST_TEST(topic == response_topic2); + BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::yes); + BOOST_TEST(contents == "response_contents"); + c->disconnect(); + return true; + }); + c->set_close_handler( + [&] + () { + auto ret = MQTT_ORDERED( + [&] { + MQTT_CHK("h_close1"); + c->set_clean_session(false); + // session will expire on disconnect + c->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::request_response_information(true) + } + ); + }, + [&] { + MQTT_CHK("h_close2"); + // no session exists even if clean false + c->set_clean_session(false); + c->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::request_response_information(true) + } + ); + }, + [&] { + MQTT_CHK("h_close3"); + finish(); + } + ); + BOOST_TEST(ret); + }); + c->set_error_handler( + [] + (MQTT_NS::error_code) { + BOOST_CHECK(false); + }); + c->set_clean_session(true); + c->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::request_response_information(true), + MQTT_NS::v5::property::session_expiry_interval( + MQTT_NS::session_never_expire + ) + } + ); + ioc.run(); + BOOST_TEST(chk.all()); + }; + do_combi_test_sync(test); +} + +BOOST_AUTO_TEST_SUITE_END()