From 361487322cfcf1c3c2b5ac59d0b5197a2d530644 Mon Sep 17 00:00:00 2001 From: Takatoshi Kondo Date: Tue, 19 Oct 2021 09:20:42 +0900 Subject: [PATCH 1/2] Removed redundant static specifier. --- include/mqtt/broker/uuid.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/mqtt/broker/uuid.hpp b/include/mqtt/broker/uuid.hpp index 3956030f2..ba59a8db0 100644 --- a/include/mqtt/broker/uuid.hpp +++ b/include/mqtt/broker/uuid.hpp @@ -19,7 +19,7 @@ MQTT_BROKER_NS_BEGIN inline std::string create_uuid_string() { // See https://github.com/boostorg/uuid/issues/121 - static thread_local auto gen = boost::uuids::random_generator(); + thread_local auto gen = boost::uuids::random_generator(); return boost::uuids::to_string(gen()); } From 21ebcce13ce9692af7da9b138c72f470924c4f55 Mon Sep 17 00:00:00 2001 From: Takatoshi Kondo Date: Tue, 19 Oct 2021 11:33:59 +0900 Subject: [PATCH 2/2] Refined response topic life time. 1. If the previous session doesn't exist then response topic is newly generated. The previous session is erased by session expiry interval of the previous session and clean start true of the new session. 2. If the presivous session exists and the session has response topic, then reuse it. --- include/mqtt/broker/broker.hpp | 47 ++- include/mqtt/broker/session_state.hpp | 16 + test/system/CMakeLists.txt | 1 + test/system/st_connect.cpp | 64 ---- test/system/st_reqres.cpp | 445 ++++++++++++++++++++++++++ 5 files changed, 504 insertions(+), 69 deletions(-) create mode 100644 test/system/st_reqres.cpp diff --git a/include/mqtt/broker/broker.hpp b/include/mqtt/broker/broker.hpp index 684f306fd..0a9ffa441 100644 --- a/include/mqtt/broker/broker.hpp +++ b/include/mqtt/broker/broker.hpp @@ -1015,7 +1015,7 @@ class broker_t { optional session_expiry_interval; optional will_expiry_interval; v5::properties connack_props; - + bool response_topic_requested = false; if (ep.get_protocol_version() == protocol_version::v5) { { auto v = get_property(props); @@ -1026,9 +1026,7 @@ class broker_t { { auto v = get_property(props); if (v && v.value().val() == 1) { - connack_props.emplace_back( - v5::property::response_topic(allocate_buffer(create_uuid_string())) - ); + response_topic_requested = true; } } @@ -1109,8 +1107,34 @@ class broker_t { break; } + auto set_response_topic = + [this, response_topic_requested, &connack_props](session_state& s) { + if (response_topic_requested) { + auto response_topic = [&] { + if (auto rt_opt = s.get_response_topic()) { + return rt_opt.value(); + } + auto rt = create_uuid_string(); + s.set_response_topic(rt); + return rt; + } (); + s.set_clean_handler( + [this, response_topic] { + std::lock_guard g(mtx_retains_); + retains_.erase(response_topic); + } + ); + connack_props.emplace_back( + v5::property::response_topic( + allocate_buffer(response_topic) + ) + ); + } + }; + auto send_connack = - [&](bool session_present, std::function finish = [](error_code){}) { + [this, &ep, &connack_props] + (bool session_present, std::function finish = [](error_code){}) { // Reply to the connect message. switch (ep.get_protocol_version()) { case protocol_version::v3_1_1: @@ -1158,6 +1182,7 @@ class broker_t { } }; + /** * http://docs.oasis-open.org/mqtt/mqtt/v5.0/cs02/mqtt-v5.0-cs02.html#_Toc514345311 * 3.1.2.4 Clean Start @@ -1192,6 +1217,8 @@ class broker_t { force_move(will_expiry_interval), force_move(session_expiry_interval) ); + // set_response_topic never modify key part + set_response_topic(const_cast(*it)); send_connack(false); } else if (it->online()) { @@ -1204,6 +1231,8 @@ class broker_t { << MQTT_ADD_VALUE(address, this) << "cid:" << client_id << "online connection exists, discard old one due to new one's clean_start and renew"; + // set_response_topic never modify key part + set_response_topic(const_cast(*it)); send_connack(false); idx.modify( it, @@ -1222,6 +1251,8 @@ class broker_t { << MQTT_ADD_VALUE(address, this) << "cid:" << client_id << "online connection exists, inherit old one and renew"; + // set_response_topic never modify key part + set_response_topic(const_cast(*it)); send_connack( true, [ @@ -1279,6 +1310,8 @@ class broker_t { force_move(session_expiry_interval) ); BOOST_ASSERT(inserted); + // set_response_topic never modify key part + set_response_topic(const_cast(*it)); send_connack(false); } } @@ -1290,6 +1323,8 @@ class broker_t { << MQTT_ADD_VALUE(address, this) << "cid:" << client_id << "offline connection exists, discard old one due to new one's clean_start and renew"; + // set_response_topic never modify key part + set_response_topic(const_cast(*it)); send_connack(false); idx.modify( it, @@ -1309,6 +1344,8 @@ class broker_t { << MQTT_ADD_VALUE(address, this) << "cid:" << client_id << "offline connection exists, inherit old one and renew"; + // set_response_topic never modify key part + set_response_topic(const_cast(*it)); send_connack( true, [ diff --git a/include/mqtt/broker/session_state.hpp b/include/mqtt/broker/session_state.hpp index 8a553cf84..dec469206 100644 --- a/include/mqtt/broker/session_state.hpp +++ b/include/mqtt/broker/session_state.hpp @@ -281,10 +281,15 @@ struct session_state { } } + void set_clean_handler(std::function handler) { + clean_handler_ = force_move(handler); + } + void clean() { MQTT_LOG("mqtt_broker", trace) << MQTT_ADD_VALUE(address, this) << "clean"; + if (clean_handler_) clean_handler_(); { std::lock_guard g(mtx_inflight_messages_); inflight_messages_.clear(); @@ -509,6 +514,14 @@ struct session_state { return session_expiry_interval_; } + void set_response_topic(std::string topic) { + response_topic_.emplace(force_move(topic)); + } + + optional get_response_topic() const { + return response_topic_; + } + private: void send_will_impl() { if (!will_value_) return; @@ -576,6 +589,9 @@ struct session_state { bool remain_after_close_; std::set qos2_publish_handled_; + + optional response_topic_; + std::function clean_handler_; }; class session_states { diff --git a/test/system/CMakeLists.txt b/test/system/CMakeLists.txt index 68a832ba6..4047bc6f4 100644 --- a/test/system/CMakeLists.txt +++ b/test/system/CMakeLists.txt @@ -43,6 +43,7 @@ ENDIF () IF (MQTT_TEST_5) LIST (APPEND check_PROGRAMS + st_reqres.cpp st_resend.cpp st_retain_1.cpp st_resend_new_client.cpp diff --git a/test/system/st_connect.cpp b/test/system/st_connect.cpp index aec46e4e7..c74e70cb3 100644 --- a/test/system/st_connect.cpp +++ b/test/system/st_connect.cpp @@ -1936,70 +1936,6 @@ BOOST_AUTO_TEST_CASE( connack_prop ) { do_combi_test_sync(test); } -BOOST_AUTO_TEST_CASE( request_response ) { - auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& /*b*/) { - auto& c = cs[0]; - clear_ordered(); - if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) { - finish(); - return; - } - - c->set_client_id("cid1"); - c->set_clean_session(true); - BOOST_TEST(c->connected() == false); - - checker chk = { - // connect - cont("h_connack"), - // disconnect - cont("h_close"), - }; - - c->set_v5_connack_handler( - [&chk, &c] - (bool sp, MQTT_NS::v5::connect_reason_code connect_reason_code, MQTT_NS::v5::properties props) { - MQTT_CHK("h_connack"); - BOOST_TEST(c->connected() == true); - BOOST_TEST(sp == false); - BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); - - std::size_t times = 0; - MQTT_NS::v5::visit_props( - props, - [&](MQTT_NS::v5::property::response_topic const&) { - ++times; - }, - [](auto){} - ); - BOOST_TEST(times == 1); - - c->disconnect(); - BOOST_TEST(c->connected() == true); - return true; - }); - c->set_close_handler( - [&chk, &finish] - () { - MQTT_CHK("h_close"); - finish(); - }); - c->set_error_handler( - [] - (MQTT_NS::error_code) { - BOOST_CHECK(false); - }); - c->connect( - MQTT_NS::v5::properties{ - MQTT_NS::v5::property::request_response_information(true) - } - ); - ioc.run(); - BOOST_TEST(chk.all()); - }; - do_combi_test_sync(test); -} - BOOST_AUTO_TEST_CASE( session_taken_over ) { auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& /*b*/) { auto& c1 = cs[0]; diff --git a/test/system/st_reqres.cpp b/test/system/st_reqres.cpp new file mode 100644 index 000000000..213b31c70 --- /dev/null +++ b/test/system/st_reqres.cpp @@ -0,0 +1,445 @@ +// Copyright Takatoshi Kondo 2015 +// +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include "../common/test_main.hpp" +#include "combi_test.hpp" +#include "checker.hpp" +#include "ordered_caller.hpp" +#include "test_util.hpp" +#include "../common/global_fixture.hpp" + +BOOST_AUTO_TEST_SUITE(st_reqres) + +using namespace MQTT_NS::literals; + +BOOST_AUTO_TEST_CASE( pubsub ) { + auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& /*b*/) { + auto& c = cs[0]; + clear_ordered(); + using packet_id_t = typename std::remove_reference_t::packet_id_t; + if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) { + finish(); + return; + } + + c->set_client_id("cid1"); + + std::string response_topic; + + checker chk = { + // connect + cont("h_connack"), + // subscribe + cont("h_suback"), + // publish + cont("h_publish"), + // disconnect + cont("h_close"), + }; + + c->set_v5_connack_handler( + [&] + (bool sp, MQTT_NS::v5::connect_reason_code connect_reason_code, MQTT_NS::v5::properties props) { + MQTT_CHK("h_connack"); + BOOST_TEST(sp == false); + BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); + + std::size_t times = 0; + MQTT_NS::v5::visit_props( + props, + [&](MQTT_NS::v5::property::response_topic const& v) { + ++times; + response_topic = std::string(v.val()); + }, + [](auto){} + ); + BOOST_TEST(times == 1); + + c->subscribe(response_topic, MQTT_NS::qos::at_most_once); + return true; + }); + c->set_v5_suback_handler( + [&] + (packet_id_t /*packet_id*/, + std::vector reasons, + MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_suback"); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_0); + + c->publish(response_topic, "response_contents", MQTT_NS::qos::at_most_once); + return true; + }); + c->set_v5_publish_handler( + [&] + (MQTT_NS::optional /*packet_id*/, + MQTT_NS::publish_options /*pubopts*/, + MQTT_NS::buffer topic, + MQTT_NS::buffer contents, + MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_publish"); + BOOST_TEST(topic == response_topic); + BOOST_TEST(contents == "response_contents"); + c->disconnect(); + return true; + }); + c->set_close_handler( + [&] + () { + MQTT_CHK("h_close"); + finish(); + }); + c->set_error_handler( + [] + (MQTT_NS::error_code) { + BOOST_CHECK(false); + }); + c->set_clean_session(true); + c->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::request_response_information(true), + } + ); + ioc.run(); + BOOST_TEST(chk.all()); + }; + do_combi_test_sync(test); +} + +BOOST_AUTO_TEST_CASE( session ) { + auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& /*b*/) { + auto& c = cs[0]; + clear_ordered(); + if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) { + finish(); + return; + } + + c->set_client_id("cid1"); + + std::string response_topic1; + std::string response_topic2; + + checker chk = { + // connect + cont("h_connack1"), + // disconnect + cont("h_close1"), + + // connect + cont("h_connack2"), + // disconnect + cont("h_close2"), + + // connect + cont("h_connack3"), + // disconnect + cont("h_close3"), + }; + + c->set_v5_connack_handler( + [&] + (bool sp, MQTT_NS::v5::connect_reason_code connect_reason_code, MQTT_NS::v5::properties props) { + auto ret = MQTT_ORDERED( + [&] { + MQTT_CHK("h_connack1"); + BOOST_TEST(sp == false); + BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); + + std::size_t times = 0; + MQTT_NS::v5::visit_props( + props, + [&](MQTT_NS::v5::property::response_topic const& v) { + ++times; + response_topic1 = std::string(v.val()); + }, + [](auto){} + ); + BOOST_TEST(times == 1); + }, + [&] { + MQTT_CHK("h_connack2"); + BOOST_TEST(sp == true); + BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); + + std::size_t times = 0; + MQTT_NS::v5::visit_props( + props, + [&](MQTT_NS::v5::property::response_topic const& v) { + ++times; + response_topic2 = std::string(v.val()); + }, + [](auto){} + ); + BOOST_TEST(times == 1); + BOOST_TEST(response_topic1 == response_topic2); + }, + [&] { + MQTT_CHK("h_connack3"); + BOOST_TEST(sp == false); + BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); + + std::size_t times = 0; + MQTT_NS::v5::visit_props( + props, + [&](MQTT_NS::v5::property::response_topic const& v) { + ++times; + BOOST_TEST(v.val() != response_topic1); + BOOST_TEST(v.val() != response_topic2); + }, + [](auto){} + ); + BOOST_TEST(times == 1); + } + ); + BOOST_TEST(ret); + c->disconnect(); + return true; + }); + c->set_close_handler( + [&] + () { + auto ret = MQTT_ORDERED( + [&] { + MQTT_CHK("h_close1"); + c->set_clean_session(false); + // session will expire on disconnect + c->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::request_response_information(true) + } + ); + }, + [&] { + MQTT_CHK("h_close2"); + // no session exists even if clean false + c->set_clean_session(false); + c->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::request_response_information(true) + } + ); + }, + [&] { + MQTT_CHK("h_close3"); + finish(); + } + ); + BOOST_TEST(ret); + }); + c->set_error_handler( + [] + (MQTT_NS::error_code) { + BOOST_CHECK(false); + }); + c->set_clean_session(true); + c->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::request_response_information(true), + MQTT_NS::v5::property::session_expiry_interval( + MQTT_NS::session_never_expire + ) + } + ); + ioc.run(); + BOOST_TEST(chk.all()); + }; + do_combi_test_sync(test); +} + +BOOST_AUTO_TEST_CASE( retain ) { + auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& /*b*/) { + auto& c = cs[0]; + clear_ordered(); + using packet_id_t = typename std::remove_reference_t::packet_id_t; + if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) { + finish(); + return; + } + + c->set_client_id("cid1"); + + std::string response_topic1; + std::string response_topic2; + + // postfix number is a connection number in this scenario + checker chk = { + // connect + cont("h_connack1"), + // publish retain + // disconnect + cont("h_close1"), + + // connect + cont("h_connack2"), + // subscribe + cont("h_suback2"), + // receive retained publish + cont("h_publish2"), + // disconnect + cont("h_close2"), + + // connect + cont("h_connack3"), + // subscribe + cont("h_suback3"), + // disconnect + cont("h_close3"), + }; + + c->set_v5_connack_handler( + [&] + (bool sp, MQTT_NS::v5::connect_reason_code connect_reason_code, MQTT_NS::v5::properties props) { + auto ret = MQTT_ORDERED( + [&] { + MQTT_CHK("h_connack1"); + BOOST_TEST(sp == false); + BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); + + std::size_t times = 0; + MQTT_NS::v5::visit_props( + props, + [&](MQTT_NS::v5::property::response_topic const& v) { + ++times; + response_topic1 = std::string(v.val()); + }, + [](auto){} + ); + BOOST_TEST(times == 1); + c->publish( + response_topic1, + "response_contents", + MQTT_NS::qos::at_most_once | MQTT_NS::retain::yes + ); + c->disconnect(); + }, + [&] { + MQTT_CHK("h_connack2"); + BOOST_TEST(sp == true); + BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); + + std::size_t times = 0; + MQTT_NS::v5::visit_props( + props, + [&](MQTT_NS::v5::property::response_topic const& v) { + ++times; + response_topic2 = std::string(v.val()); + }, + [](auto){} + ); + BOOST_TEST(times == 1); + BOOST_TEST(response_topic1 == response_topic2); + c->subscribe(response_topic2, MQTT_NS::qos::at_most_once); + }, + [&] { + MQTT_CHK("h_connack3"); + BOOST_TEST(sp == false); + BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); + + std::size_t times = 0; + MQTT_NS::v5::visit_props( + props, + [&](MQTT_NS::v5::property::response_topic const&) { + ++times; + }, + [](auto){} + ); + BOOST_TEST(times == 1); + // subscribe erased (previous) response topic + c->subscribe(response_topic2, MQTT_NS::qos::at_most_once); + } + ); + BOOST_TEST(ret); + return true; + }); + c->set_v5_suback_handler( + [&] + (packet_id_t /*packet_id*/, + std::vector reasons, + MQTT_NS::v5::properties /*props*/) { + auto ret = MQTT_ORDERED( + [&] { + MQTT_CHK("h_suback2"); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_0); + }, + [&] { + MQTT_CHK("h_suback3"); + BOOST_TEST(reasons.size() == 1U); + // When authorization mechanism is introduced it will ought to be failed + BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_0); + c->disconnect(); + } + ); + BOOST_TEST(ret); + return true; + }); + c->set_v5_publish_handler( + [&] + (MQTT_NS::optional /*packet_id*/, + MQTT_NS::publish_options pubopts, + MQTT_NS::buffer topic, + MQTT_NS::buffer contents, + MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_publish2"); + BOOST_TEST(topic == response_topic2); + BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::yes); + BOOST_TEST(contents == "response_contents"); + c->disconnect(); + return true; + }); + c->set_close_handler( + [&] + () { + auto ret = MQTT_ORDERED( + [&] { + MQTT_CHK("h_close1"); + c->set_clean_session(false); + // session will expire on disconnect + c->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::request_response_information(true) + } + ); + }, + [&] { + MQTT_CHK("h_close2"); + // no session exists even if clean false + c->set_clean_session(false); + c->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::request_response_information(true) + } + ); + }, + [&] { + MQTT_CHK("h_close3"); + finish(); + } + ); + BOOST_TEST(ret); + }); + c->set_error_handler( + [] + (MQTT_NS::error_code) { + BOOST_CHECK(false); + }); + c->set_clean_session(true); + c->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::request_response_information(true), + MQTT_NS::v5::property::session_expiry_interval( + MQTT_NS::session_never_expire + ) + } + ); + ioc.run(); + BOOST_TEST(chk.all()); + }; + do_combi_test_sync(test); +} + +BOOST_AUTO_TEST_SUITE_END()