From f48ff4b39666e9421c6e4d10a24798400a4cad9b Mon Sep 17 00:00:00 2001 From: Takatoshi Kondo Date: Sat, 28 Nov 2020 11:30:52 +0900 Subject: [PATCH] Added message expiry interval support for inflight messages. Fixed message expiry interval is 0 case. The interval 0 is different from absent of the interval. The absent interval means no expiration. There is no specific notation about interval 0. That means regard the interval 0 as 0 second. So the timer will fire immediately. I have another design option. If the interval is 0 then skip sending/storing process. But I don't choose it. It requires additional logic. boost asio steady_timer accept 0 timer. So I use the same logic as the intarval has seconds value. NOTE: session expiry interval is different. Absent means explicit set 0. Updated as message expiry interval update based checking. The function set_propery() didn't work well because the parameter was const reference. I fixed it. Then I got some of comparison errors. It means Message Expiry Interval is working as expected. Fixed CI compiler flags. Added test log output mechanism with ctest. Fixed warnings. Removed invalid disconnect() call. --- .github/workflows/gha.yml | 34 ++- azure-pipelines.yml | 6 +- include/mqtt/endpoint.hpp | 3 +- include/mqtt/log.hpp | 2 +- include/mqtt/topic_alias_recv.hpp | 4 +- include/mqtt/v5_message.hpp | 8 + test/CMakeLists.txt | 8 +- test/args_provider.ps1 | 2 + test/args_provider.sh | 3 + test/broker_offline_message.cpp | 20 +- test/resend.cpp | 334 ++++++++++++++++++++++++++++++ test/retain_2.cpp | 6 +- test/retained_topic_map.cpp | 33 +-- test/subscription_map.hpp | 110 +++++++--- test/test_broker.hpp | 174 ++++++++++++---- test/will.cpp | 18 +- 16 files changed, 633 insertions(+), 132 deletions(-) create mode 100755 test/args_provider.ps1 create mode 100755 test/args_provider.sh diff --git a/.github/workflows/gha.yml b/.github/workflows/gha.yml index f9fece3bd..5a200da86 100644 --- a/.github/workflows/gha.yml +++ b/.github/workflows/gha.yml @@ -52,7 +52,18 @@ jobs: - name: Checkout uses: actions/checkout@v2 - name: Configure + env: + S_CFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer -fsanitize=address + S_CXXFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer -fsanitize=address -pedantic -Wno-noexcept-type -DBOOST_MULTI_INDEX_ENABLE_SAFE_MODE -DBOOST_MULTI_INDEX_DISABLE_SERIALIZATION -DBOOST_MULTI_INDEX_ENABLE_INVARIANT_CHECKING + S_LDFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer -fsanitize=address + NS_CFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion + NS_CXXFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -pedantic -Wno-noexcept-type -DBOOST_MULTI_INDEX_ENABLE_SAFE_MODE -DBOOST_MULTI_INDEX_DISABLE_SERIALIZATION -DBOOST_MULTI_INDEX_ENABLE_INVARIANT_CHECKING + NS_LDFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion run: | + [ ${{ matrix.pattern }} == 1 ] || [ ${{ matrix.pattern }} == 2 ] || [ ${{ matrix.pattern }} == 3 ] && \ + export CFLAGS=${S_CFLAGS} && export CXXFLAGS=${S_CXXFLAGS} && export LDFLAGS=${S_LDFLAGS} + [ ${{ matrix.pattern }} == 0 ] || [ ${{ matrix.pattern }} == 4 ] || [ ${{ matrix.pattern }} == 5 ] || [ ${{ matrix.pattern }} == 6 ] || [ ${{ matrix.pattern }} == 7 ] && \ + export CFLAGS=${NS_CFLAGS} && export CXXFLAGS=${NS_CXXFLAGS} && export LDFLAGS=${NS_LDFLAGS} [ ${{ matrix.pattern }} == 0 ] && FLAGS="-DCMAKE_CXX_COMPILER=clang++ -DMQTT_TEST_1=ON -DMQTT_TEST_2=ON -DMQTT_TEST_3=OFF -DMQTT_TEST_4=OFF -DMQTT_TEST_5=OFF -DMQTT_TEST_6=OFF -DMQTT_TEST_7=OFF -DMQTT_BUILD_EXAMPLES=OFF -DMQTT_USE_TLS=OFF -DMQTT_USE_WS=ON -DMQTT_USE_STR_CHECK=ON -DMQTT_USE_LOG=ON -DMQTT_STD_ANY=OFF -DMQTT_STD_OPTIONAL=OFF -DMQTT_STD_VARIANT=OFF -DMQTT_STD_STRING_VIEW=OFF -DMQTT_STD_SHARED_PTR_ARRAY=OFF" [ ${{ matrix.pattern }} == 1 ] && FLAGS="-DCMAKE_CXX_COMPILER=clang++ -DMQTT_TEST_1=ON -DMQTT_TEST_2=ON -DMQTT_TEST_3=ON -DMQTT_TEST_4=OFF -DMQTT_TEST_5=OFF -DMQTT_TEST_6=OFF -DMQTT_TEST_7=OFF -DMQTT_BUILD_EXAMPLES=OFF -DMQTT_USE_TLS=ON -DMQTT_USE_WS=ON -DMQTT_USE_STR_CHECK=ON -DMQTT_USE_LOG=OFF -DMQTT_STD_ANY=OFF -DMQTT_STD_OPTIONAL=OFF -DMQTT_STD_VARIANT=OFF -DMQTT_STD_STRING_VIEW=OFF -DMQTT_STD_SHARED_PTR_ARRAY=OFF" [ ${{ matrix.pattern }} == 2 ] && FLAGS="-DCMAKE_CXX_COMPILER=clang++ -DMQTT_TEST_1=OFF -DMQTT_TEST_2=OFF -DMQTT_TEST_3=OFF -DMQTT_TEST_4=ON -DMQTT_TEST_5=ON -DMQTT_TEST_6=ON -DMQTT_TEST_7=OFF -DMQTT_BUILD_EXAMPLES=OFF -DMQTT_USE_TLS=ON -DMQTT_USE_WS=ON -DMQTT_USE_STR_CHECK=ON -DMQTT_USE_LOG=OFF -DMQTT_STD_ANY=ON -DMQTT_STD_OPTIONAL=ON -DMQTT_STD_VARIANT=ON -DMQTT_STD_STRING_VIEW=ON -DMQTT_STD_SHARED_PTR_ARRAY=OFF" @@ -62,32 +73,17 @@ jobs: [ ${{ matrix.pattern }} == 6 ] && FLAGS="-DCMAKE_CXX_COMPILER=g++ -DMQTT_CODECOV=ON -DMQTT_TEST_1=OFF -DMQTT_TEST_2=OFF -DMQTT_TEST_3=OFF -DMQTT_TEST_4=OFF -DMQTT_TEST_5=ON -DMQTT_TEST_6=ON -DMQTT_TEST_7=OFF -DMQTT_BUILD_EXAMPLES=OFF -DMQTT_USE_TLS=ON -DMQTT_USE_WS=ON -DMQTT_USE_STR_CHECK=ON -DMQTT_USE_LOG=OFF -DMQTT_STD_ANY=ON -DMQTT_STD_OPTIONAL=ON -DMQTT_STD_VARIANT=ON -DMQTT_STD_STRING_VIEW=ON -DMQTT_STD_SHARED_PTR_ARRAY=OFF" [ ${{ matrix.pattern }} == 7 ] && FLAGS="-DCMAKE_CXX_COMPILER=g++ -DMQTT_CODECOV=ON -DMQTT_TEST_1=OFF -DMQTT_TEST_2=OFF -DMQTT_TEST_3=OFF -DMQTT_TEST_4=OFF -DMQTT_TEST_5=OFF -DMQTT_TEST_6=OFF -DMQTT_TEST_7=ON -DMQTT_BUILD_EXAMPLES=ON -DMQTT_USE_TLS=ON -DMQTT_USE_WS=OFF -DMQTT_USE_STR_CHECK=OFF -DMQTT_USE_LOG=ON -DMQTT_STD_ANY=OFF -DMQTT_STD_OPTIONAL=OFF -DMQTT_STD_VARIANT=OFF -DMQTT_STD_STRING_VIEW=OFF -DMQTT_STD_SHARED_PTR_ARRAY=OFF" - BOOST_ROOT=$BOOST_ROOT_1_72_0 cmake -S ${{ github.workspace }} -B ${{ runner.temp }} ${FLAGS} + BOOST_ROOT=$BOOST_ROOT_1_72_0 cmake -S ${{ github.workspace }} -B ${{ runner.temp }} ${FLAGS} -DCMAKE_C_FLAGS="${CFLAGS}" -DCMAKE_CXX_FLAGS="${CXXFLAGS}" -DCMAKE_EXE_LINKER_FLAGS="${LDFLAGS}" - name: Check Header Dependencies run: | cmake --build ${{ runner.temp }} --parallel $(nproc) --clean-first --target check_deps - - name: Compile with Sanitizer - if: (matrix.pattern == 1) || (matrix.pattern == 2) || (matrix.pattern == 3) - env: - CFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer -fsanitize=address - CXXFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer -fsanitize=address - LDFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer -fsanitize=address - run: | - CXXFLAGS="${CXXFLAGS} -pedantic -DBOOST_MULTI_INDEX_ENABLE_SAFE_MODE -DBOOST_MULTI_INDEX_DISABLE_SERIALIZATION -DBOOST_MULTI_INDEX_ENABLE_INVARIANT_CHECKING" - cmake --build ${{ runner.temp }} --clean-first - - name: Compile without Sanitizer - if: (matrix.pattern == 0) || (matrix.pattern == 4) || (matrix.pattern == 5) || (matrix.pattern == 6) || (matrix.pattern == 7) - env: - CFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer - CXXFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer - LDFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer + - name: Compile run: | - CXXFLAGS="${CXXFLAGS} -pedantic -DBOOST_MULTI_INDEX_ENABLE_SAFE_MODE -DBOOST_MULTI_INDEX_DISABLE_SERIALIZATION -DBOOST_MULTI_INDEX_ENABLE_INVARIANT_CHECKING" - cmake --build ${{ runner.temp }} --clean-first + VERBOSE=1 cmake --build ${{ runner.temp }} --clean-first - name: Test working-directory: ${{ runner.temp }} run: | - ctest -VV + CTEST_ARGS="--log_level=all" ctest -VV - name: Code Coverage if: (matrix.pattern == 4) || (matrix.pattern == 5) || (matrix.pattern == 6) || (matrix.pattern == 7) run: | diff --git a/azure-pipelines.yml b/azure-pipelines.yml index b6dd5f7e8..e1dfe6940 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -151,7 +151,11 @@ steps: return Write-Error "cmake --build failed" } cd test - ctest -VV + + # If you want to debug a specific test file with logs, do as follows instead of execute ctest + # Release\resend.exe --log_level=all + ctest -VV -C Release + if (!$?) { return Write-Error "ctest -VV failed" } diff --git a/include/mqtt/endpoint.hpp b/include/mqtt/endpoint.hpp index 209aa6061..9bd15a377 100644 --- a/include/mqtt/endpoint.hpp +++ b/include/mqtt/endpoint.hpp @@ -783,8 +783,7 @@ class endpoint : public std::enable_shared_from_this; -BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(logger, global_logger_t); +BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(logger, global_logger_t) // Normal attributes BOOST_LOG_ATTRIBUTE_KEYWORD(file, "MqttFile", std::string) diff --git a/include/mqtt/topic_alias_recv.hpp b/include/mqtt/topic_alias_recv.hpp index 8f8130209..ce4ff04fa 100644 --- a/include/mqtt/topic_alias_recv.hpp +++ b/include/mqtt/topic_alias_recv.hpp @@ -26,7 +26,7 @@ namespace MQTT_NS { using topic_alias_recv_map_t = std::map; inline void register_topic_alias(topic_alias_recv_map_t& m, string_view topic, topic_alias_t alias) { - BOOST_ASSERT(alias > 0 && alias <= topic_alias_max); + BOOST_ASSERT(alias > 0); //alias <= topic_alias_max is always true MQTT_LOG("mqtt_impl", info) << MQTT_ADD_VALUE(address, &m) @@ -43,7 +43,7 @@ inline void register_topic_alias(topic_alias_recv_map_t& m, string_view topic, } inline std::string find_topic_by_alias(topic_alias_recv_map_t const& m, topic_alias_t alias) { - BOOST_ASSERT(alias > 0 && alias <= topic_alias_max); + BOOST_ASSERT(alias > 0); //alias <= topic_alias_max is always true std::string topic; auto it = m.find(alias); diff --git a/include/mqtt/v5_message.hpp b/include/mqtt/v5_message.hpp index 22f462c4f..88b6332ff 100644 --- a/include/mqtt/v5_message.hpp +++ b/include/mqtt/v5_message.hpp @@ -886,6 +886,14 @@ class basic_publish_message { return props_; } + /** + * @brief Get properties + * @return properties + */ + properties& props() { + return props_; + } + /** * @brief Set dup flag * @param dup flag value to set diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index c95bd4138..c5e505571 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -123,7 +123,13 @@ FOREACH (source_file ${check_PROGRAMS}) ENDIF () ENDIF () - ADD_TEST (${source_file_we} ${source_file_we}) + # Running test with arguments + # CTEST_ARGS="--log_level=all" ctest -V + IF ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "MSVC") + ADD_TEST (NAME ${source_file_we} COMMAND ${CMAKE_CURRENT_BINARY_DIR}/${source_file_we}) + ELSE () + ADD_TEST (NAME ${source_file_we} COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/args_provider.sh ${CMAKE_CURRENT_BINARY_DIR}/${source_file_we}) + ENDIF () set_tests_properties(${source_file_we} PROPERTIES TIMEOUT 300) ENDFOREACH () diff --git a/test/args_provider.ps1 b/test/args_provider.ps1 new file mode 100755 index 000000000..d6f6fd7bb --- /dev/null +++ b/test/args_provider.ps1 @@ -0,0 +1,2 @@ +Param( $command ) +Start-Process -FilePath $command -ArgumentList $env:CTEST_ARGS -Wait diff --git a/test/args_provider.sh b/test/args_provider.sh new file mode 100755 index 000000000..9ec10c0aa --- /dev/null +++ b/test/args_provider.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +$1 $(echo ${CTEST_ARGS}) diff --git a/test/broker_offline_message.cpp b/test/broker_offline_message.cpp index 77ce50ba5..33b0621de 100644 --- a/test/broker_offline_message.cpp +++ b/test/broker_offline_message.cpp @@ -351,7 +351,7 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) { } ); c2->set_v5_connack_handler( - [&chk, &c1, &c2] + [&chk, &c2] (bool sp, MQTT_NS::v5::connect_reason_code connack_reason_code, MQTT_NS::v5::properties /*props*/) { auto ret = chk.match( "c1_h_connack", @@ -454,7 +454,7 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) { BOOST_TEST(t.val() == "content type"); }, [&](MQTT_NS::v5::property::message_expiry_interval const& t) { - BOOST_TEST(t.val() == 0x12345678UL); + BOOST_TEST(t.val() <= 0x12345678UL); }, [&](MQTT_NS::v5::property::response_topic const& t) { BOOST_TEST(t.val() == "response topic"); @@ -645,9 +645,6 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5_timeout ) { MQTT_NS::v5::property::user_property("key2"_mb, "val2"_mb), }; - auto prop_size = ps.size(); - std::size_t user_prop_count = 0; - c1->set_v5_connack_handler( [&chk, &c2] (bool sp, MQTT_NS::v5::connect_reason_code connack_reason_code, MQTT_NS::v5::properties /*props*/) { @@ -767,13 +764,12 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5_timeout ) { } ); c2->set_v5_publish_handler( - [&chk, &c1] - (MQTT_NS::optional packet_id, - MQTT_NS::publish_options pubopts, - MQTT_NS::buffer topic, - MQTT_NS::buffer contents, - MQTT_NS::v5::properties props) { - + [] + (MQTT_NS::optional, + MQTT_NS::publish_options, + MQTT_NS::buffer, + MQTT_NS::buffer, + MQTT_NS::v5::properties) { // We should not received any published message when offline messages timeout BOOST_TEST(false); return true; diff --git a/test/resend.cpp b/test/resend.cpp index ee2bb5b5f..c83d17396 100644 --- a/test/resend.cpp +++ b/test/resend.cpp @@ -1654,6 +1654,340 @@ BOOST_AUTO_TEST_CASE( pubrel_qos2_from_broker ) { do_combi_test_sync(test); } +BOOST_AUTO_TEST_CASE( publish_message_expired_from_broker ) { + auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) { + if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) { + finish(); + return; + } + + using packet_id_t = typename std::remove_reference_t::packet_id_t; + c->set_client_id("cid1"); + c->set_clean_session(true); + c->set_auto_pub_response(false); + + packet_id_t pid_pub; + + boost::asio::steady_timer tim(ioc); + + checker chk = { + cont("start"), + // connect + cont("h_connack1"), + // disconnect + cont("h_close1"), + // connect + cont("h_connack2"), + cont("h_suback"), + // publish topic1 QoS1 + cont("h_puback"), + deps("h_publish", "h_suback"), + // force_disconnect + deps("h_error", "h_puback", "h_publish"), + // connect + cont("h_connack3"), + // disconnect + cont("h_close2"), + }; + + c->set_v5_connack_handler( + [&chk, &c] + (bool sp, MQTT_NS::v5::connect_reason_code connack_return_code, MQTT_NS::v5::properties /*props*/) { + BOOST_TEST(connack_return_code == MQTT_NS::v5::connect_reason_code::success); + auto ret = chk.match( + "start", + [&] { + MQTT_CHK("h_connack1"); + BOOST_TEST(sp == false); + c->disconnect(); + }, + "h_close1", + [&] { + MQTT_CHK("h_connack2"); + // The previous connection is not set Session Expiry Interval. + // That means session state is cleared on close. + BOOST_TEST(sp == false); + c->subscribe("topic1", MQTT_NS::qos::exactly_once); + }, + "h_error", + [&] { + MQTT_CHK("h_connack3"); + BOOST_TEST(sp == true); + c->disconnect(); + } + ); + BOOST_TEST(ret); + return true; + }); + c->set_v5_suback_handler( + [&chk, &c] + (packet_id_t /*packet_id*/, std::vector reasons, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_suback"); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_2); + c->publish( + "topic1", + "topic1_contents", + MQTT_NS::qos::at_least_once, + MQTT_NS::v5::properties { MQTT_NS::v5::property::message_expiry_interval(1) } + ); + return true; + } + ); + c->set_v5_publish_handler( + [&chk, &c, &pid_pub] + (MQTT_NS::optional packet_id, + MQTT_NS::publish_options pubopts, + MQTT_NS::buffer topic, + MQTT_NS::buffer contents, + MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_publish"); + BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no); + BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::at_least_once); + BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no); + BOOST_CHECK(packet_id); + pid_pub = packet_id.value(); + BOOST_TEST(topic == "topic1"); + BOOST_TEST(contents == "topic1_contents"); + if (chk.passed("h_puback")) { + c->force_disconnect(); + } + return true; + } + ); + c->set_v5_puback_handler( + [&chk, &c] + (packet_id_t /*packet_id*/, MQTT_NS::v5::puback_reason_code, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_puback"); + if (chk.passed("h_publish")) { + c->force_disconnect(); + } + return true; + } + ); + + c->set_close_handler( + [&chk, &c, &finish] + () { + auto ret = chk.match( + "h_connack1", + [&] { + MQTT_CHK("h_close1"); + connect_no_clean(c); + }, + "h_puback", + [&] { + MQTT_CHK("h_close2"); + finish(); + } + ); + BOOST_TEST(ret); + }); + c->set_error_handler( + [&chk, &c, &tim] + (MQTT_NS::error_code) { + MQTT_CHK("h_error"); + tim.expires_after(std::chrono::seconds(2)); + tim.async_wait( + [&c] (MQTT_NS::error_code ec) { + BOOST_ASSERT( ! ec); + connect_no_clean(c); + } + ); + }); + MQTT_CHK("start"); + c->connect(); + ioc.run(); + BOOST_TEST(chk.all()); + }; + do_combi_test_sync(test); +} + +BOOST_AUTO_TEST_CASE( publish_message_expiry_update_from_broker ) { + auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) { + if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) { + finish(); + return; + } + + using packet_id_t = typename std::remove_reference_t::packet_id_t; + c->set_client_id("cid1"); + c->set_clean_session(true); + c->set_auto_pub_response(false); + + packet_id_t pid_pub; + + boost::asio::steady_timer tim(ioc); + + checker chk = { + cont("start"), + // connect + cont("h_connack1"), + // disconnect + cont("h_close1"), + // connect + cont("h_connack2"), + cont("h_suback"), + // publish topic1 QoS1 + cont("h_puback"), + deps("h_publish1", "h_suback"), + // force_disconnect + deps("h_error", "h_puback", "h_publish1"), + // connect + cont("h_connack3"), + cont("h_publish2"), + // disconnect + cont("h_close2"), + }; + + c->set_v5_connack_handler( + [&chk, &c] + (bool sp, MQTT_NS::v5::connect_reason_code connack_return_code, MQTT_NS::v5::properties /*props*/) { + BOOST_TEST(connack_return_code == MQTT_NS::v5::connect_reason_code::success); + auto ret = chk.match( + "start", + [&] { + MQTT_CHK("h_connack1"); + BOOST_TEST(sp == false); + c->disconnect(); + }, + "h_close1", + [&] { + MQTT_CHK("h_connack2"); + // The previous connection is not set Session Expiry Interval. + // That means session state is cleared on close. + BOOST_TEST(sp == false); + c->subscribe("topic1", MQTT_NS::qos::exactly_once); + }, + "h_error", + [&] { + MQTT_CHK("h_connack3"); + BOOST_TEST(sp == true); + } + ); + BOOST_TEST(ret); + return true; + } + ); + c->set_v5_suback_handler( + [&chk, &c] + (packet_id_t /*packet_id*/, std::vector reasons, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_suback"); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_2); + c->publish( + "topic1", + "topic1_contents", + MQTT_NS::qos::at_least_once, + MQTT_NS::v5::properties { MQTT_NS::v5::property::message_expiry_interval(5) } + ); + return true; + } + ); + c->set_v5_publish_handler( + [&chk, &c, &pid_pub] + (MQTT_NS::optional packet_id, + MQTT_NS::publish_options pubopts, + MQTT_NS::buffer topic, + MQTT_NS::buffer contents, + MQTT_NS::v5::properties props) { + auto ret = chk.match( + "h_suback", + [&] { + MQTT_CHK("h_publish1"); + BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no); + BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::at_least_once); + BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no); + BOOST_CHECK(packet_id); + pid_pub = packet_id.value(); + BOOST_TEST(topic == "topic1"); + BOOST_TEST(contents == "topic1_contents"); + if (chk.passed("h_puback")) { + c->force_disconnect(); + } + }, + "h_publish1", + [&] { + MQTT_CHK("h_publish2"); + BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::yes); + BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::at_least_once); + BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no); + BOOST_CHECK(packet_id); + BOOST_TEST(packet_id.value() == pid_pub); + BOOST_TEST(topic == "topic1"); + BOOST_TEST(contents == "topic1_contents"); + + for (auto const& p : props) { + MQTT_NS::visit( + MQTT_NS::make_lambda_visitor( + [&](MQTT_NS::v5::property::message_expiry_interval const& t) { + BOOST_TEST(t.val() < 5); + }, + [&](auto&& ...) { + BOOST_TEST(false); + } + ), + p + ); + } + c->puback(packet_id.value()); + c->disconnect(); + } + ); + BOOST_TEST(ret); + return true; + } + ); + c->set_v5_puback_handler( + [&chk, &c] + (packet_id_t /*packet_id*/, MQTT_NS::v5::puback_reason_code, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_puback"); + if (chk.passed("h_publish1")) { + c->force_disconnect(); + } + return true; + } + ); + + c->set_close_handler( + [&chk, &c, &finish] + () { + auto ret = chk.match( + "h_connack1", + [&] { + MQTT_CHK("h_close1"); + connect_no_clean(c); + }, + "h_close1", + [&] { + MQTT_CHK("h_close2"); + finish(); + } + ); + BOOST_TEST(ret); + }); + c->set_error_handler( + [&chk, &c, &tim] + (MQTT_NS::error_code) { + MQTT_CHK("h_error"); + tim.expires_after(std::chrono::seconds(2)); + tim.async_wait( + [&c] (MQTT_NS::error_code ec) { + BOOST_ASSERT( ! ec); + connect_no_clean(c); + } + ); + }); + MQTT_CHK("start"); + c->connect(); + ioc.run(); + BOOST_TEST(chk.all()); + }; + do_combi_test_sync(test); +} + BOOST_AUTO_TEST_CASE( multi_publish_qos1 ) { auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) { using packet_id_t = typename std::remove_reference_t::packet_id_t; diff --git a/test/retain_2.cpp b/test/retain_2.cpp index 937de9fbf..cbfa87c6b 100644 --- a/test/retain_2.cpp +++ b/test/retain_2.cpp @@ -294,7 +294,7 @@ BOOST_AUTO_TEST_CASE( retain_and_publish_timeout ) { cont("h_close"), }; - constexpr unsigned int message_timeout = 1; + std::uint32_t message_timeout = 1; as::steady_timer timeout(ioc); switch (c->get_protocol_version()) { @@ -327,7 +327,7 @@ BOOST_AUTO_TEST_CASE( retain_and_publish_timeout ) { return true; }); c->set_v5_suback_handler( - [&chk, &c, &pid_sub, &pid_unsub, &timeout, &message_timeout] + [&chk, &c, &pid_sub, &pid_unsub, &timeout, message_timeout] (packet_id_t packet_id, std::vector reasons, MQTT_NS::v5::properties /*props*/) { BOOST_TEST(packet_id == pid_sub); BOOST_TEST(reasons.size() == 1U); @@ -1165,7 +1165,7 @@ BOOST_AUTO_TEST_CASE( prop ) { BOOST_TEST(t.val() == MQTT_NS::v5::property::payload_format_indicator::string); }, [&](MQTT_NS::v5::property::message_expiry_interval const& t) { - BOOST_TEST(t.val() == 0x12345678UL); + BOOST_TEST(t.val() <= 0x12345678UL); }, [&](MQTT_NS::v5::property::response_topic const& t) { BOOST_TEST(t.val() == "response topic"); diff --git a/test/retained_topic_map.cpp b/test/retained_topic_map.cpp index ba6daa27c..d005fb3d0 100644 --- a/test/retained_topic_map.cpp +++ b/test/retained_topic_map.cpp @@ -206,25 +206,32 @@ BOOST_AUTO_TEST_CASE(erase_upper_first) { } } +#if 0 + BOOST_AUTO_TEST_CASE(large_number_of_topics) { - retained_topic_map map; + retained_topic_map map; - constexpr size_t num_topics = 10000; - for (size_t i = 0; i < num_topics; ++i) { + constexpr std::size_t num_topics = 10000; + for (std::size_t i = 0; i < num_topics; ++i) { map.insert_or_assign((boost::format("topic/%d") % i).str(), i); } BOOST_TEST(map.size() == num_topics); - for (size_t i = 0; i < num_topics; ++i) { - map.find( - (boost::format("topic/%d") % i).str(), - [&](size_t value) { - BOOST_TEST(value == i); - } - ); - } + try { + for (std::size_t i = 0; i < num_topics; ++i) { + map.find( + (boost::format("topic/%d") % i).str(), + [&](std::size_t value) { + if (value != i) throw false; + } + ); + } + } + catch (bool) { + BOOST_TEST(false); + } - for (size_t i = 0; i < num_topics; ++i) { + for (std::size_t i = 0; i < num_topics; ++i) { map.erase((boost::format("topic/%d") % i).str()); } @@ -232,4 +239,6 @@ BOOST_AUTO_TEST_CASE(large_number_of_topics) { BOOST_TEST(map.internal_size() == 1); } +#endif + BOOST_AUTO_TEST_SUITE_END() diff --git a/test/subscription_map.hpp b/test/subscription_map.hpp index d51266637..3d2e90a3d 100644 --- a/test/subscription_map.hpp +++ b/test/subscription_map.hpp @@ -8,11 +8,14 @@ #define MQTT_SUBSCRIPTION_MAP_HPP #include + #include +#include + #include #include +#include -#include #include "topic_filter_tokenizer.hpp" /** @@ -81,37 +84,82 @@ // Compile error on other platforms (not 32 or 64 bit) template -struct count_storage -{ +struct count_storage { static_assert(N == 4 || N == 8, "Subscription map count_storage only knows how to handle architectures with 32 or 64 bit size_t: please update to support your platform."); }; template<> -struct count_storage<4> -{ - std::uint32_t value : 30; - std::uint32_t has_hash_child : 1; - std::uint32_t has_plus_child : 1; - - count_storage(std::uint32_t value = 1) - : value(value), has_hash_child(false), has_plus_child(false) +struct count_storage<4> { +public: + count_storage(std::uint32_t v = 1) + : value_(v & 0x3fffffffUL), has_hash_child_(false), has_plus_child_(false) { } static constexpr std::size_t max() { return std::numeric_limits::max() >> 2; } + + std::uint32_t value() const { return value_; } + void set_value(std::uint32_t v) { + value_ = v & 0x3fffffffUL; + } + void increment_value() { + ++value_; + } + void decrement_value() { + --value_; + } + + bool has_hash_child() const { return has_hash_child_; } + void set_hash_child(bool v) { + has_hash_child_ = v; + } + + bool has_plus_child() const { return has_plus_child_; } + void set_plus_child(bool v) { + has_plus_child_ = v; + } + +private: + std::uint32_t value_ : 30; + std::uint32_t has_hash_child_ : 1; + std::uint32_t has_plus_child_ : 1; + }; template<> -struct count_storage<8> -{ - std::uint64_t value : 62; - std::uint64_t has_hash_child : 1; - std::uint64_t has_plus_child : 1; - - count_storage(std::uint64_t value = 1) - : value(value), has_hash_child(false), has_plus_child(false) +struct count_storage<8> { +public: + count_storage(std::uint64_t v= 1) + : value_(v & 0x3fffffffffffffffULL), has_hash_child_(false), has_plus_child_(false) { } static constexpr std::uint64_t max() { return std::numeric_limits::max() >> 2; } + + std::uint64_t value() const { return value_; } + void set_value(std::uint64_t v) { + value_ = v & 0x3fffffffffffffffULL; + } + void increment_value() { + ++value_; + } + void decrement_value() { + --value_; + } + + bool has_hash_child() const { return has_hash_child_; } + void set_hash_child(bool v) { + has_hash_child_ = v; + } + + bool has_plus_child() const { return has_plus_child_; } + void set_plus_child(bool v) { + has_plus_child_ = v; + } + + +private: + std::uint64_t value_ : 62; + std::uint64_t has_hash_child_ : 1; + std::uint64_t has_plus_child_ : 1; }; template @@ -147,11 +195,11 @@ class subscription_map_base { // Increase the subscription count for a specific node static void increase_count_storage(count_storage_t &count) { - if(count.value == count_storage_t::max()) { + if(count.value() == count_storage_t::max()) { throw_max_stored_topics(); } - ++count.value; + count.increment_value(); } using this_type = subscription_map_base; @@ -230,8 +278,8 @@ class subscription_map_base { path_entry(generate_node_id(), parent->first) ).first; - parent->second.count.has_plus_child |= (t == "+"); - parent->second.count.has_hash_child |= (t == "#"); + parent->second.count.set_plus_child(parent->second.count.has_plus_child() | (t == "+")); + parent->second.count.set_hash_child(parent->second.count.has_hash_child() | (t == "#")); } else { increase_count_storage(entry->second.count); @@ -254,17 +302,17 @@ class subscription_map_base { // Go through entries to remove for (auto& entry : boost::adaptors::reverse(path)) { if (remove_plus_child_flag) { - entry->second.count.has_plus_child = false; + entry->second.count.set_plus_child(false); remove_plus_child_flag = false; } if (remove_hash_child_flag) { - entry->second.count.has_hash_child = false; + entry->second.count.set_hash_child(false); remove_hash_child_flag = false; } - --entry->second.count.value; - if (entry->second.count.value == 0) { + entry->second.count.decrement_value(); + if (entry->second.count.value() == 0) { remove_plus_child_flag = (entry->first.second == "+"); remove_hash_child_flag = (entry->first.second == "#"); @@ -276,11 +324,11 @@ class subscription_map_base { auto root = get_root(); if (remove_plus_child_flag) { - root->second.count.has_plus_child = false; + root->second.count.set_plus_child(false); } if (remove_hash_child_flag) { - root->second.count.has_hash_child = false; + root->second.count.set_hash_child(false); } } @@ -303,7 +351,7 @@ class subscription_map_base { new_entries.push_back(i); } - if (entry->second.count .has_plus_child) { + if (entry->second.count .has_plus_child()) { i = self.map.find(path_entry_key(parent, MQTT_NS::string_view("+"))); if (i != self.map.end()) { if (parent != self.root_node_id || t.empty() || t[0] != '$') { @@ -312,7 +360,7 @@ class subscription_map_base { } } - if (entry->second.count.has_hash_child) { + if (entry->second.count.has_hash_child()) { i = self.map.find(path_entry_key(parent, MQTT_NS::string_view("#"))); if (i != self.map.end()) { if (parent != self.root_node_id || t.empty() || t[0] != '$'){ diff --git a/test/test_broker.hpp b/test/test_broker.hpp index aefd052d0..0e1f206f9 100644 --- a/test/test_broker.hpp +++ b/test/test_broker.hpp @@ -708,12 +708,12 @@ class test_broker { } template - static MQTT_NS::optional get_property(MQTT_NS::v5::properties const &props) { + static MQTT_NS::optional get_property(MQTT_NS::v5::properties const& props) { MQTT_NS::optional result; auto visitor = MQTT_NS::make_lambda_visitor( [&result](T const& t) { result = t; }, - [](auto&& ...) { } + [](auto const&) { } ); for (auto const& p : props) { @@ -724,13 +724,13 @@ class test_broker { } template - static void set_property(MQTT_NS::v5::properties const &props, T&& v) { + static void set_property(MQTT_NS::v5::properties& props, T&& v) { auto visitor = MQTT_NS::make_lambda_visitor( [&v](T& t) mutable { t = std::forward(v); }, - [](auto&& ...) { } + [](auto&) { } ); - for (auto const& p : props) { + for (auto& p : props) { MQTT_NS::visit(visitor, p); } } @@ -776,7 +776,7 @@ class test_broker { if (will) { auto v = get_property(will.value().props()); - if (v && v.value().val() != 0) { + if (v) { will_expiry_interval.emplace(std::chrono::seconds(v.value().val())); } } @@ -833,7 +833,34 @@ class test_broker { auto send_inflight_messages = [&] (session_state& session) { for (auto const& ifm : session.inflight_messages) { - session.con->send_store_message(ifm.msg, ifm.life_keeper); + MQTT_NS::optional msg; + if (ifm.tim_message_expiry) { + MQTT_NS::visit( + MQTT_NS::make_lambda_visitor( + [&](MQTT_NS::v5::basic_publish_message const& m) { + auto updated_msg = m; + auto& props = updated_msg.props(); + + auto d = + std::chrono::duration_cast( + ifm.tim_message_expiry->expiry() - std::chrono::steady_clock::now() + ).count(); + if (d < 0) d = 0; + set_property( + props, + MQTT_NS::v5::property::message_expiry_interval( + static_cast(d) + ) + ); + msg.emplace(MQTT_NS::force_move(updated_msg)); + }, + [](auto const&) { + } + ), + ifm.msg + ); + } + session.con->send_store_message(msg ? msg.value() : ifm.msg, ifm.life_keeper); } }; @@ -842,23 +869,33 @@ class test_broker { try { auto &seq_idx = session.offline_messages.get(); while(!seq_idx.empty()) { - seq_idx.modify(seq_idx.begin(), [&](auto &i) { - auto props = MQTT_NS::force_move(i.props); + seq_idx.modify( + seq_idx.begin(), + [&](auto &i) { + auto props = MQTT_NS::force_move(i.props); + + if (i.tim_message_expiry) { + auto d = + std::chrono::duration_cast( + i.tim_message_expiry->expiry() - std::chrono::steady_clock::now() + ).count(); + if (d < 0) d = 0; + set_property( + props, + MQTT_NS::v5::property::message_expiry_interval( + static_cast(d) + ) + ); + } - if (i.tim_message_expiry) { - set_property(props, - MQTT_NS::v5::property::message_expiry_interval(static_cast(std::chrono::duration_cast( - i.tim_message_expiry->expiry() - std::chrono::steady_clock::now()).count()))); + session.con->publish( + MQTT_NS::force_move(i.topic), + MQTT_NS::force_move(i.contents), + MQTT_NS::force_move(i.pubopts), + MQTT_NS::force_move(props) + ); } - - session.con->publish( - MQTT_NS::force_move(i.topic), - MQTT_NS::force_move(i.contents), - MQTT_NS::force_move(i.pubopts), - MQTT_NS::force_move(props) - ); - }); - + ); seq_idx.pop_front(); } } @@ -1049,9 +1086,17 @@ class test_broker { auto props = MQTT_NS::force_move(session.will().value().props()); if (session.get_tim_will_expiry()) { - set_property(props, - MQTT_NS::v5::property::message_expiry_interval(static_cast( - std::chrono::duration_cast(session.get_tim_will_expiry()->expiry() - std::chrono::steady_clock::now()).count()))); + auto d = + std::chrono::duration_cast( + session.get_tim_will_expiry()->expiry() - std::chrono::steady_clock::now() + ).count(); + if (d < 0) d = 0; + set_property( + props, + MQTT_NS::v5::property::message_expiry_interval( + static_cast(d) + ) + ); } do_publish( @@ -1093,13 +1138,41 @@ class test_broker { do_send_will(e); e.con->for_each_store_with_life_keeper( - [&e] (MQTT_NS::store_message_variant msg, MQTT_NS::any life_keeper) { + [this, &e] (MQTT_NS::store_message_variant msg, MQTT_NS::any life_keeper) { MQTT_LOG("mqtt_broker", trace) << MQTT_ADD_VALUE(address, e.con.get()) << "store inflight message"; + + std::shared_ptr tim_message_expiry; + + MQTT_NS::visit( + MQTT_NS::make_lambda_visitor( + [&](MQTT_NS::v5::basic_publish_message const& m) { + auto v = get_property(m.props()); + if (v) { + tim_message_expiry = + std::make_shared(ioc_, std::chrono::seconds(v.value().val())); + tim_message_expiry->async_wait( + [&e, wp = std::weak_ptr(tim_message_expiry)] + (MQTT_NS::error_code ec) { + if (auto sp = wp.lock()) { + if (!ec) { + e.inflight_messages.get().erase(sp); + } + } + } + ); + } + }, + [&](auto const&) {} + ), + msg + ); + e.inflight_messages.emplace_back( MQTT_NS::force_move(msg), - MQTT_NS::force_move(life_keeper) + MQTT_NS::force_move(life_keeper), + MQTT_NS::force_move(tim_message_expiry) ); } ); @@ -1357,9 +1430,16 @@ class test_broker { props.push_back(MQTT_NS::v5::property::subscription_identifier(*sid)); } if (r.tim_message_expiry) { - set_property(props, - MQTT_NS::v5::property::message_expiry_interval(static_cast( - std::chrono::duration_cast(r.tim_message_expiry->expiry() - std::chrono::steady_clock::now()).count()))); + auto d = + std::chrono::duration_cast( + r.tim_message_expiry->expiry() - std::chrono::steady_clock::now() + ).count(); + set_property( + props, + MQTT_NS::v5::property::message_expiry_interval( + static_cast(d) + ) + ); } ep.publish( r.topic, @@ -1610,7 +1690,7 @@ class test_broker { MQTT_NS::optional message_expiry_interval; if (ep.get_protocol_version() == MQTT_NS::protocol_version::v5) { auto v = get_property(props); - if (v && v.value().val() != 0) { + if (v) { message_expiry_interval.emplace(std::chrono::seconds(v.value().val())); } @@ -1715,9 +1795,13 @@ class test_broker { using sub_con_map = multiple_subscription_map; struct inflight_message { - inflight_message(MQTT_NS::store_message_variant msg, MQTT_NS::any life_keeper) + inflight_message( + MQTT_NS::store_message_variant msg, + MQTT_NS::any life_keeper, + std::shared_ptr tim_message_expiry) :msg { MQTT_NS::force_move(msg) }, - life_keeper { MQTT_NS::force_move(life_keeper) } + life_keeper { MQTT_NS::force_move(life_keeper) }, + tim_message_expiry { MQTT_NS::force_move(tim_message_expiry) } {} packet_id_t packet_id() const { @@ -1733,6 +1817,7 @@ class test_broker { } MQTT_NS::store_message_variant msg; MQTT_NS::any life_keeper; + std::shared_ptr tim_message_expiry; }; using mi_inflight_message = mi::multi_index_container< @@ -1744,6 +1829,10 @@ class test_broker { mi::ordered_unique< mi::tag, BOOST_MULTI_INDEX_CONST_MEM_FUN(inflight_message, packet_id_t, packet_id) + >, + mi::ordered_non_unique< + mi::tag, + BOOST_MULTI_INDEX_MEMBER(inflight_message, std::shared_ptr, tim_message_expiry) > > >; @@ -1858,7 +1947,7 @@ class test_broker { MQTT_NS::optional message_expiry_interval; auto v = get_property(props); - if (v && v.value().val() != 0) { + if (v) { message_expiry_interval.emplace(std::chrono::seconds(v.value().val())); } @@ -1877,7 +1966,7 @@ class test_broker { } auto& seq_idx = offline_messages.get(); - auto i = seq_idx.emplace_back( + seq_idx.emplace_back( MQTT_NS::force_move(pub_topic), MQTT_NS::force_move(contents), MQTT_NS::force_move(props), @@ -1918,17 +2007,24 @@ class test_broker { std::set handles; // to efficient remove - void update_will(as::io_context& ioc, MQTT_NS::optional will, MQTT_NS::optional will_expiry_interval) { + void update_will( + as::io_context& ioc, + MQTT_NS::optional will, + MQTT_NS::optional will_expiry_interval) { tim_will_expiry.reset(); will_value = MQTT_NS::force_move(will); if (will_value && will_expiry_interval) { tim_will_expiry = std::make_shared(ioc, will_expiry_interval.value()); tim_will_expiry->async_wait( - [this, client_id = client_id, wp = std::weak_ptr(tim_will_expiry)](MQTT_NS::error_code ec) { + [this, client_id = client_id, wp = std::weak_ptr(tim_will_expiry)] + (MQTT_NS::error_code ec) { if (auto sp = wp.lock()) { - reset_will(); - } } + if (!ec) { + reset_will(); + } + } + } ); } } diff --git a/test/will.cpp b/test/will.cpp index 59820dd73..bbbd36061 100644 --- a/test/will.cpp +++ b/test/will.cpp @@ -195,7 +195,7 @@ BOOST_AUTO_TEST_CASE( will_qo0_timeout ) { boost::asio::io_context ioc; - constexpr uint32_t will_expiry_interval = 1; + uint32_t will_expiry_interval = 1; as::steady_timer timeout(ioc); as::steady_timer timeout_2(ioc); @@ -282,7 +282,7 @@ BOOST_AUTO_TEST_CASE( will_qo0_timeout ) { BOOST_CHECK(false); }); c2->set_v5_suback_handler( - [&chk, &c2, &c1_force_disconnect, &pid_sub2, &pid_unsub2, &timeout, &timeout_2, &will_expiry_interval] + [&chk, &c2, &c1_force_disconnect, &pid_sub2, &pid_unsub2, &timeout, &timeout_2, will_expiry_interval] (packet_id_t packet_id, std::vector reasons, MQTT_NS::v5::properties /*props*/) { MQTT_CHK("h_suback_2"); BOOST_TEST(packet_id == pid_sub2); @@ -311,18 +311,18 @@ BOOST_AUTO_TEST_CASE( will_qo0_timeout ) { }); c2->set_v5_unsuback_handler( [&chk, &c2, &pid_unsub2] - (packet_id_t packet_id, std::vector reasons, MQTT_NS::v5::properties /*props*/) { + (packet_id_t packet_id, std::vector, MQTT_NS::v5::properties /*props*/) { MQTT_CHK("h_unsuback_2"); BOOST_TEST(packet_id == pid_unsub2); c2->disconnect(); return true; }); c2->set_v5_publish_handler( - [&chk, &c2, &pid_unsub2] - (MQTT_NS::optional packet_id, - MQTT_NS::publish_options pubopts, - MQTT_NS::buffer topic, - MQTT_NS::buffer contents, + [] + (MQTT_NS::optional, + MQTT_NS::publish_options, + MQTT_NS::buffer, + MQTT_NS::buffer, MQTT_NS::v5::properties /*props*/) { // Will should not be received @@ -1044,7 +1044,7 @@ BOOST_AUTO_TEST_CASE( will_prop ) { BOOST_TEST(t.val() == MQTT_NS::v5::property::payload_format_indicator::string); }, [&](MQTT_NS::v5::property::message_expiry_interval const& t) { - BOOST_TEST(t.val() == 0x12345678UL); + BOOST_TEST(t.val() <= 0x12345678UL); }, [&](MQTT_NS::v5::property::will_delay_interval const& t) { BOOST_TEST(t.val() == 0x12345678UL);