diff --git a/.github/workflows/gha.yml b/.github/workflows/gha.yml index f9fece3bd..5a200da86 100644 --- a/.github/workflows/gha.yml +++ b/.github/workflows/gha.yml @@ -52,7 +52,18 @@ jobs: - name: Checkout uses: actions/checkout@v2 - name: Configure + env: + S_CFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer -fsanitize=address + S_CXXFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer -fsanitize=address -pedantic -Wno-noexcept-type -DBOOST_MULTI_INDEX_ENABLE_SAFE_MODE -DBOOST_MULTI_INDEX_DISABLE_SERIALIZATION -DBOOST_MULTI_INDEX_ENABLE_INVARIANT_CHECKING + S_LDFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer -fsanitize=address + NS_CFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion + NS_CXXFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -pedantic -Wno-noexcept-type -DBOOST_MULTI_INDEX_ENABLE_SAFE_MODE -DBOOST_MULTI_INDEX_DISABLE_SERIALIZATION -DBOOST_MULTI_INDEX_ENABLE_INVARIANT_CHECKING + NS_LDFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion run: | + [ ${{ matrix.pattern }} == 1 ] || [ ${{ matrix.pattern }} == 2 ] || [ ${{ matrix.pattern }} == 3 ] && \ + export CFLAGS=${S_CFLAGS} && export CXXFLAGS=${S_CXXFLAGS} && export LDFLAGS=${S_LDFLAGS} + [ ${{ matrix.pattern }} == 0 ] || [ ${{ matrix.pattern }} == 4 ] || [ ${{ matrix.pattern }} == 5 ] || [ ${{ matrix.pattern }} == 6 ] || [ ${{ matrix.pattern }} == 7 ] && \ + export CFLAGS=${NS_CFLAGS} && export CXXFLAGS=${NS_CXXFLAGS} && export LDFLAGS=${NS_LDFLAGS} [ ${{ matrix.pattern }} == 0 ] && FLAGS="-DCMAKE_CXX_COMPILER=clang++ -DMQTT_TEST_1=ON -DMQTT_TEST_2=ON -DMQTT_TEST_3=OFF -DMQTT_TEST_4=OFF -DMQTT_TEST_5=OFF -DMQTT_TEST_6=OFF -DMQTT_TEST_7=OFF -DMQTT_BUILD_EXAMPLES=OFF -DMQTT_USE_TLS=OFF -DMQTT_USE_WS=ON -DMQTT_USE_STR_CHECK=ON -DMQTT_USE_LOG=ON -DMQTT_STD_ANY=OFF -DMQTT_STD_OPTIONAL=OFF -DMQTT_STD_VARIANT=OFF -DMQTT_STD_STRING_VIEW=OFF -DMQTT_STD_SHARED_PTR_ARRAY=OFF" [ ${{ matrix.pattern }} == 1 ] && FLAGS="-DCMAKE_CXX_COMPILER=clang++ -DMQTT_TEST_1=ON -DMQTT_TEST_2=ON -DMQTT_TEST_3=ON -DMQTT_TEST_4=OFF -DMQTT_TEST_5=OFF -DMQTT_TEST_6=OFF -DMQTT_TEST_7=OFF -DMQTT_BUILD_EXAMPLES=OFF -DMQTT_USE_TLS=ON -DMQTT_USE_WS=ON -DMQTT_USE_STR_CHECK=ON -DMQTT_USE_LOG=OFF -DMQTT_STD_ANY=OFF -DMQTT_STD_OPTIONAL=OFF -DMQTT_STD_VARIANT=OFF -DMQTT_STD_STRING_VIEW=OFF -DMQTT_STD_SHARED_PTR_ARRAY=OFF" [ ${{ matrix.pattern }} == 2 ] && FLAGS="-DCMAKE_CXX_COMPILER=clang++ -DMQTT_TEST_1=OFF -DMQTT_TEST_2=OFF -DMQTT_TEST_3=OFF -DMQTT_TEST_4=ON -DMQTT_TEST_5=ON -DMQTT_TEST_6=ON -DMQTT_TEST_7=OFF -DMQTT_BUILD_EXAMPLES=OFF -DMQTT_USE_TLS=ON -DMQTT_USE_WS=ON -DMQTT_USE_STR_CHECK=ON -DMQTT_USE_LOG=OFF -DMQTT_STD_ANY=ON -DMQTT_STD_OPTIONAL=ON -DMQTT_STD_VARIANT=ON -DMQTT_STD_STRING_VIEW=ON -DMQTT_STD_SHARED_PTR_ARRAY=OFF" @@ -62,32 +73,17 @@ jobs: [ ${{ matrix.pattern }} == 6 ] && FLAGS="-DCMAKE_CXX_COMPILER=g++ -DMQTT_CODECOV=ON -DMQTT_TEST_1=OFF -DMQTT_TEST_2=OFF -DMQTT_TEST_3=OFF -DMQTT_TEST_4=OFF -DMQTT_TEST_5=ON -DMQTT_TEST_6=ON -DMQTT_TEST_7=OFF -DMQTT_BUILD_EXAMPLES=OFF -DMQTT_USE_TLS=ON -DMQTT_USE_WS=ON -DMQTT_USE_STR_CHECK=ON -DMQTT_USE_LOG=OFF -DMQTT_STD_ANY=ON -DMQTT_STD_OPTIONAL=ON -DMQTT_STD_VARIANT=ON -DMQTT_STD_STRING_VIEW=ON -DMQTT_STD_SHARED_PTR_ARRAY=OFF" [ ${{ matrix.pattern }} == 7 ] && FLAGS="-DCMAKE_CXX_COMPILER=g++ -DMQTT_CODECOV=ON -DMQTT_TEST_1=OFF -DMQTT_TEST_2=OFF -DMQTT_TEST_3=OFF -DMQTT_TEST_4=OFF -DMQTT_TEST_5=OFF -DMQTT_TEST_6=OFF -DMQTT_TEST_7=ON -DMQTT_BUILD_EXAMPLES=ON -DMQTT_USE_TLS=ON -DMQTT_USE_WS=OFF -DMQTT_USE_STR_CHECK=OFF -DMQTT_USE_LOG=ON -DMQTT_STD_ANY=OFF -DMQTT_STD_OPTIONAL=OFF -DMQTT_STD_VARIANT=OFF -DMQTT_STD_STRING_VIEW=OFF -DMQTT_STD_SHARED_PTR_ARRAY=OFF" - BOOST_ROOT=$BOOST_ROOT_1_72_0 cmake -S ${{ github.workspace }} -B ${{ runner.temp }} ${FLAGS} + BOOST_ROOT=$BOOST_ROOT_1_72_0 cmake -S ${{ github.workspace }} -B ${{ runner.temp }} ${FLAGS} -DCMAKE_C_FLAGS="${CFLAGS}" -DCMAKE_CXX_FLAGS="${CXXFLAGS}" -DCMAKE_EXE_LINKER_FLAGS="${LDFLAGS}" - name: Check Header Dependencies run: | cmake --build ${{ runner.temp }} --parallel $(nproc) --clean-first --target check_deps - - name: Compile with Sanitizer - if: (matrix.pattern == 1) || (matrix.pattern == 2) || (matrix.pattern == 3) - env: - CFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer -fsanitize=address - CXXFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer -fsanitize=address - LDFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer -fsanitize=address - run: | - CXXFLAGS="${CXXFLAGS} -pedantic -DBOOST_MULTI_INDEX_ENABLE_SAFE_MODE -DBOOST_MULTI_INDEX_DISABLE_SERIALIZATION -DBOOST_MULTI_INDEX_ENABLE_INVARIANT_CHECKING" - cmake --build ${{ runner.temp }} --clean-first - - name: Compile without Sanitizer - if: (matrix.pattern == 0) || (matrix.pattern == 4) || (matrix.pattern == 5) || (matrix.pattern == 6) || (matrix.pattern == 7) - env: - CFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer - CXXFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer - LDFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer + - name: Compile run: | - CXXFLAGS="${CXXFLAGS} -pedantic -DBOOST_MULTI_INDEX_ENABLE_SAFE_MODE -DBOOST_MULTI_INDEX_DISABLE_SERIALIZATION -DBOOST_MULTI_INDEX_ENABLE_INVARIANT_CHECKING" - cmake --build ${{ runner.temp }} --clean-first + VERBOSE=1 cmake --build ${{ runner.temp }} --clean-first - name: Test working-directory: ${{ runner.temp }} run: | - ctest -VV + CTEST_ARGS="--log_level=all" ctest -VV - name: Code Coverage if: (matrix.pattern == 4) || (matrix.pattern == 5) || (matrix.pattern == 6) || (matrix.pattern == 7) run: | diff --git a/azure-pipelines.yml b/azure-pipelines.yml index b6dd5f7e8..e1dfe6940 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -151,7 +151,11 @@ steps: return Write-Error "cmake --build failed" } cd test - ctest -VV + + # If you want to debug a specific test file with logs, do as follows instead of execute ctest + # Release\resend.exe --log_level=all + ctest -VV -C Release + if (!$?) { return Write-Error "ctest -VV failed" } diff --git a/include/mqtt/endpoint.hpp b/include/mqtt/endpoint.hpp index 209aa6061..9bd15a377 100644 --- a/include/mqtt/endpoint.hpp +++ b/include/mqtt/endpoint.hpp @@ -783,8 +783,7 @@ class endpoint : public std::enable_shared_from_this; -BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(logger, global_logger_t); +BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(logger, global_logger_t) // Normal attributes BOOST_LOG_ATTRIBUTE_KEYWORD(file, "MqttFile", std::string) diff --git a/include/mqtt/topic_alias_recv.hpp b/include/mqtt/topic_alias_recv.hpp index 8f8130209..ce4ff04fa 100644 --- a/include/mqtt/topic_alias_recv.hpp +++ b/include/mqtt/topic_alias_recv.hpp @@ -26,7 +26,7 @@ namespace MQTT_NS { using topic_alias_recv_map_t = std::map; inline void register_topic_alias(topic_alias_recv_map_t& m, string_view topic, topic_alias_t alias) { - BOOST_ASSERT(alias > 0 && alias <= topic_alias_max); + BOOST_ASSERT(alias > 0); //alias <= topic_alias_max is always true MQTT_LOG("mqtt_impl", info) << MQTT_ADD_VALUE(address, &m) @@ -43,7 +43,7 @@ inline void register_topic_alias(topic_alias_recv_map_t& m, string_view topic, } inline std::string find_topic_by_alias(topic_alias_recv_map_t const& m, topic_alias_t alias) { - BOOST_ASSERT(alias > 0 && alias <= topic_alias_max); + BOOST_ASSERT(alias > 0); //alias <= topic_alias_max is always true std::string topic; auto it = m.find(alias); diff --git a/include/mqtt/v5_message.hpp b/include/mqtt/v5_message.hpp index 22f462c4f..88b6332ff 100644 --- a/include/mqtt/v5_message.hpp +++ b/include/mqtt/v5_message.hpp @@ -886,6 +886,14 @@ class basic_publish_message { return props_; } + /** + * @brief Get properties + * @return properties + */ + properties& props() { + return props_; + } + /** * @brief Set dup flag * @param dup flag value to set diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index c95bd4138..c5e505571 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -123,7 +123,13 @@ FOREACH (source_file ${check_PROGRAMS}) ENDIF () ENDIF () - ADD_TEST (${source_file_we} ${source_file_we}) + # Running test with arguments + # CTEST_ARGS="--log_level=all" ctest -V + IF ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "MSVC") + ADD_TEST (NAME ${source_file_we} COMMAND ${CMAKE_CURRENT_BINARY_DIR}/${source_file_we}) + ELSE () + ADD_TEST (NAME ${source_file_we} COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/args_provider.sh ${CMAKE_CURRENT_BINARY_DIR}/${source_file_we}) + ENDIF () set_tests_properties(${source_file_we} PROPERTIES TIMEOUT 300) ENDFOREACH () diff --git a/test/args_provider.ps1 b/test/args_provider.ps1 new file mode 100755 index 000000000..d6f6fd7bb --- /dev/null +++ b/test/args_provider.ps1 @@ -0,0 +1,2 @@ +Param( $command ) +Start-Process -FilePath $command -ArgumentList $env:CTEST_ARGS -Wait diff --git a/test/args_provider.sh b/test/args_provider.sh new file mode 100755 index 000000000..9ec10c0aa --- /dev/null +++ b/test/args_provider.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +$1 $(echo ${CTEST_ARGS}) diff --git a/test/broker_offline_message.cpp b/test/broker_offline_message.cpp index 77ce50ba5..33b0621de 100644 --- a/test/broker_offline_message.cpp +++ b/test/broker_offline_message.cpp @@ -351,7 +351,7 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) { } ); c2->set_v5_connack_handler( - [&chk, &c1, &c2] + [&chk, &c2] (bool sp, MQTT_NS::v5::connect_reason_code connack_reason_code, MQTT_NS::v5::properties /*props*/) { auto ret = chk.match( "c1_h_connack", @@ -454,7 +454,7 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) { BOOST_TEST(t.val() == "content type"); }, [&](MQTT_NS::v5::property::message_expiry_interval const& t) { - BOOST_TEST(t.val() == 0x12345678UL); + BOOST_TEST(t.val() <= 0x12345678UL); }, [&](MQTT_NS::v5::property::response_topic const& t) { BOOST_TEST(t.val() == "response topic"); @@ -645,9 +645,6 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5_timeout ) { MQTT_NS::v5::property::user_property("key2"_mb, "val2"_mb), }; - auto prop_size = ps.size(); - std::size_t user_prop_count = 0; - c1->set_v5_connack_handler( [&chk, &c2] (bool sp, MQTT_NS::v5::connect_reason_code connack_reason_code, MQTT_NS::v5::properties /*props*/) { @@ -767,13 +764,12 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5_timeout ) { } ); c2->set_v5_publish_handler( - [&chk, &c1] - (MQTT_NS::optional packet_id, - MQTT_NS::publish_options pubopts, - MQTT_NS::buffer topic, - MQTT_NS::buffer contents, - MQTT_NS::v5::properties props) { - + [] + (MQTT_NS::optional, + MQTT_NS::publish_options, + MQTT_NS::buffer, + MQTT_NS::buffer, + MQTT_NS::v5::properties) { // We should not received any published message when offline messages timeout BOOST_TEST(false); return true; diff --git a/test/resend.cpp b/test/resend.cpp index ee2bb5b5f..c83d17396 100644 --- a/test/resend.cpp +++ b/test/resend.cpp @@ -1654,6 +1654,340 @@ BOOST_AUTO_TEST_CASE( pubrel_qos2_from_broker ) { do_combi_test_sync(test); } +BOOST_AUTO_TEST_CASE( publish_message_expired_from_broker ) { + auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) { + if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) { + finish(); + return; + } + + using packet_id_t = typename std::remove_reference_t::packet_id_t; + c->set_client_id("cid1"); + c->set_clean_session(true); + c->set_auto_pub_response(false); + + packet_id_t pid_pub; + + boost::asio::steady_timer tim(ioc); + + checker chk = { + cont("start"), + // connect + cont("h_connack1"), + // disconnect + cont("h_close1"), + // connect + cont("h_connack2"), + cont("h_suback"), + // publish topic1 QoS1 + cont("h_puback"), + deps("h_publish", "h_suback"), + // force_disconnect + deps("h_error", "h_puback", "h_publish"), + // connect + cont("h_connack3"), + // disconnect + cont("h_close2"), + }; + + c->set_v5_connack_handler( + [&chk, &c] + (bool sp, MQTT_NS::v5::connect_reason_code connack_return_code, MQTT_NS::v5::properties /*props*/) { + BOOST_TEST(connack_return_code == MQTT_NS::v5::connect_reason_code::success); + auto ret = chk.match( + "start", + [&] { + MQTT_CHK("h_connack1"); + BOOST_TEST(sp == false); + c->disconnect(); + }, + "h_close1", + [&] { + MQTT_CHK("h_connack2"); + // The previous connection is not set Session Expiry Interval. + // That means session state is cleared on close. + BOOST_TEST(sp == false); + c->subscribe("topic1", MQTT_NS::qos::exactly_once); + }, + "h_error", + [&] { + MQTT_CHK("h_connack3"); + BOOST_TEST(sp == true); + c->disconnect(); + } + ); + BOOST_TEST(ret); + return true; + }); + c->set_v5_suback_handler( + [&chk, &c] + (packet_id_t /*packet_id*/, std::vector reasons, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_suback"); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_2); + c->publish( + "topic1", + "topic1_contents", + MQTT_NS::qos::at_least_once, + MQTT_NS::v5::properties { MQTT_NS::v5::property::message_expiry_interval(1) } + ); + return true; + } + ); + c->set_v5_publish_handler( + [&chk, &c, &pid_pub] + (MQTT_NS::optional packet_id, + MQTT_NS::publish_options pubopts, + MQTT_NS::buffer topic, + MQTT_NS::buffer contents, + MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_publish"); + BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no); + BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::at_least_once); + BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no); + BOOST_CHECK(packet_id); + pid_pub = packet_id.value(); + BOOST_TEST(topic == "topic1"); + BOOST_TEST(contents == "topic1_contents"); + if (chk.passed("h_puback")) { + c->force_disconnect(); + } + return true; + } + ); + c->set_v5_puback_handler( + [&chk, &c] + (packet_id_t /*packet_id*/, MQTT_NS::v5::puback_reason_code, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_puback"); + if (chk.passed("h_publish")) { + c->force_disconnect(); + } + return true; + } + ); + + c->set_close_handler( + [&chk, &c, &finish] + () { + auto ret = chk.match( + "h_connack1", + [&] { + MQTT_CHK("h_close1"); + connect_no_clean(c); + }, + "h_puback", + [&] { + MQTT_CHK("h_close2"); + finish(); + } + ); + BOOST_TEST(ret); + }); + c->set_error_handler( + [&chk, &c, &tim] + (MQTT_NS::error_code) { + MQTT_CHK("h_error"); + tim.expires_after(std::chrono::seconds(2)); + tim.async_wait( + [&c] (MQTT_NS::error_code ec) { + BOOST_ASSERT( ! ec); + connect_no_clean(c); + } + ); + }); + MQTT_CHK("start"); + c->connect(); + ioc.run(); + BOOST_TEST(chk.all()); + }; + do_combi_test_sync(test); +} + +BOOST_AUTO_TEST_CASE( publish_message_expiry_update_from_broker ) { + auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) { + if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) { + finish(); + return; + } + + using packet_id_t = typename std::remove_reference_t::packet_id_t; + c->set_client_id("cid1"); + c->set_clean_session(true); + c->set_auto_pub_response(false); + + packet_id_t pid_pub; + + boost::asio::steady_timer tim(ioc); + + checker chk = { + cont("start"), + // connect + cont("h_connack1"), + // disconnect + cont("h_close1"), + // connect + cont("h_connack2"), + cont("h_suback"), + // publish topic1 QoS1 + cont("h_puback"), + deps("h_publish1", "h_suback"), + // force_disconnect + deps("h_error", "h_puback", "h_publish1"), + // connect + cont("h_connack3"), + cont("h_publish2"), + // disconnect + cont("h_close2"), + }; + + c->set_v5_connack_handler( + [&chk, &c] + (bool sp, MQTT_NS::v5::connect_reason_code connack_return_code, MQTT_NS::v5::properties /*props*/) { + BOOST_TEST(connack_return_code == MQTT_NS::v5::connect_reason_code::success); + auto ret = chk.match( + "start", + [&] { + MQTT_CHK("h_connack1"); + BOOST_TEST(sp == false); + c->disconnect(); + }, + "h_close1", + [&] { + MQTT_CHK("h_connack2"); + // The previous connection is not set Session Expiry Interval. + // That means session state is cleared on close. + BOOST_TEST(sp == false); + c->subscribe("topic1", MQTT_NS::qos::exactly_once); + }, + "h_error", + [&] { + MQTT_CHK("h_connack3"); + BOOST_TEST(sp == true); + } + ); + BOOST_TEST(ret); + return true; + } + ); + c->set_v5_suback_handler( + [&chk, &c] + (packet_id_t /*packet_id*/, std::vector reasons, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_suback"); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_2); + c->publish( + "topic1", + "topic1_contents", + MQTT_NS::qos::at_least_once, + MQTT_NS::v5::properties { MQTT_NS::v5::property::message_expiry_interval(5) } + ); + return true; + } + ); + c->set_v5_publish_handler( + [&chk, &c, &pid_pub] + (MQTT_NS::optional packet_id, + MQTT_NS::publish_options pubopts, + MQTT_NS::buffer topic, + MQTT_NS::buffer contents, + MQTT_NS::v5::properties props) { + auto ret = chk.match( + "h_suback", + [&] { + MQTT_CHK("h_publish1"); + BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no); + BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::at_least_once); + BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no); + BOOST_CHECK(packet_id); + pid_pub = packet_id.value(); + BOOST_TEST(topic == "topic1"); + BOOST_TEST(contents == "topic1_contents"); + if (chk.passed("h_puback")) { + c->force_disconnect(); + } + }, + "h_publish1", + [&] { + MQTT_CHK("h_publish2"); + BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::yes); + BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::at_least_once); + BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no); + BOOST_CHECK(packet_id); + BOOST_TEST(packet_id.value() == pid_pub); + BOOST_TEST(topic == "topic1"); + BOOST_TEST(contents == "topic1_contents"); + + for (auto const& p : props) { + MQTT_NS::visit( + MQTT_NS::make_lambda_visitor( + [&](MQTT_NS::v5::property::message_expiry_interval const& t) { + BOOST_TEST(t.val() < 5); + }, + [&](auto&& ...) { + BOOST_TEST(false); + } + ), + p + ); + } + c->puback(packet_id.value()); + c->disconnect(); + } + ); + BOOST_TEST(ret); + return true; + } + ); + c->set_v5_puback_handler( + [&chk, &c] + (packet_id_t /*packet_id*/, MQTT_NS::v5::puback_reason_code, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_puback"); + if (chk.passed("h_publish1")) { + c->force_disconnect(); + } + return true; + } + ); + + c->set_close_handler( + [&chk, &c, &finish] + () { + auto ret = chk.match( + "h_connack1", + [&] { + MQTT_CHK("h_close1"); + connect_no_clean(c); + }, + "h_close1", + [&] { + MQTT_CHK("h_close2"); + finish(); + } + ); + BOOST_TEST(ret); + }); + c->set_error_handler( + [&chk, &c, &tim] + (MQTT_NS::error_code) { + MQTT_CHK("h_error"); + tim.expires_after(std::chrono::seconds(2)); + tim.async_wait( + [&c] (MQTT_NS::error_code ec) { + BOOST_ASSERT( ! ec); + connect_no_clean(c); + } + ); + }); + MQTT_CHK("start"); + c->connect(); + ioc.run(); + BOOST_TEST(chk.all()); + }; + do_combi_test_sync(test); +} + BOOST_AUTO_TEST_CASE( multi_publish_qos1 ) { auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) { using packet_id_t = typename std::remove_reference_t::packet_id_t; diff --git a/test/retain_2.cpp b/test/retain_2.cpp index 937de9fbf..cbfa87c6b 100644 --- a/test/retain_2.cpp +++ b/test/retain_2.cpp @@ -294,7 +294,7 @@ BOOST_AUTO_TEST_CASE( retain_and_publish_timeout ) { cont("h_close"), }; - constexpr unsigned int message_timeout = 1; + std::uint32_t message_timeout = 1; as::steady_timer timeout(ioc); switch (c->get_protocol_version()) { @@ -327,7 +327,7 @@ BOOST_AUTO_TEST_CASE( retain_and_publish_timeout ) { return true; }); c->set_v5_suback_handler( - [&chk, &c, &pid_sub, &pid_unsub, &timeout, &message_timeout] + [&chk, &c, &pid_sub, &pid_unsub, &timeout, message_timeout] (packet_id_t packet_id, std::vector reasons, MQTT_NS::v5::properties /*props*/) { BOOST_TEST(packet_id == pid_sub); BOOST_TEST(reasons.size() == 1U); @@ -1165,7 +1165,7 @@ BOOST_AUTO_TEST_CASE( prop ) { BOOST_TEST(t.val() == MQTT_NS::v5::property::payload_format_indicator::string); }, [&](MQTT_NS::v5::property::message_expiry_interval const& t) { - BOOST_TEST(t.val() == 0x12345678UL); + BOOST_TEST(t.val() <= 0x12345678UL); }, [&](MQTT_NS::v5::property::response_topic const& t) { BOOST_TEST(t.val() == "response topic"); diff --git a/test/retained_topic_map.cpp b/test/retained_topic_map.cpp index ba6daa27c..d005fb3d0 100644 --- a/test/retained_topic_map.cpp +++ b/test/retained_topic_map.cpp @@ -206,25 +206,32 @@ BOOST_AUTO_TEST_CASE(erase_upper_first) { } } +#if 0 + BOOST_AUTO_TEST_CASE(large_number_of_topics) { - retained_topic_map map; + retained_topic_map map; - constexpr size_t num_topics = 10000; - for (size_t i = 0; i < num_topics; ++i) { + constexpr std::size_t num_topics = 10000; + for (std::size_t i = 0; i < num_topics; ++i) { map.insert_or_assign((boost::format("topic/%d") % i).str(), i); } BOOST_TEST(map.size() == num_topics); - for (size_t i = 0; i < num_topics; ++i) { - map.find( - (boost::format("topic/%d") % i).str(), - [&](size_t value) { - BOOST_TEST(value == i); - } - ); - } + try { + for (std::size_t i = 0; i < num_topics; ++i) { + map.find( + (boost::format("topic/%d") % i).str(), + [&](std::size_t value) { + if (value != i) throw false; + } + ); + } + } + catch (bool) { + BOOST_TEST(false); + } - for (size_t i = 0; i < num_topics; ++i) { + for (std::size_t i = 0; i < num_topics; ++i) { map.erase((boost::format("topic/%d") % i).str()); } @@ -232,4 +239,6 @@ BOOST_AUTO_TEST_CASE(large_number_of_topics) { BOOST_TEST(map.internal_size() == 1); } +#endif + BOOST_AUTO_TEST_SUITE_END() diff --git a/test/subscription_map.hpp b/test/subscription_map.hpp index d51266637..3d2e90a3d 100644 --- a/test/subscription_map.hpp +++ b/test/subscription_map.hpp @@ -8,11 +8,14 @@ #define MQTT_SUBSCRIPTION_MAP_HPP #include + #include +#include + #include #include +#include -#include #include "topic_filter_tokenizer.hpp" /** @@ -81,37 +84,82 @@ // Compile error on other platforms (not 32 or 64 bit) template -struct count_storage -{ +struct count_storage { static_assert(N == 4 || N == 8, "Subscription map count_storage only knows how to handle architectures with 32 or 64 bit size_t: please update to support your platform."); }; template<> -struct count_storage<4> -{ - std::uint32_t value : 30; - std::uint32_t has_hash_child : 1; - std::uint32_t has_plus_child : 1; - - count_storage(std::uint32_t value = 1) - : value(value), has_hash_child(false), has_plus_child(false) +struct count_storage<4> { +public: + count_storage(std::uint32_t v = 1) + : value_(v & 0x3fffffffUL), has_hash_child_(false), has_plus_child_(false) { } static constexpr std::size_t max() { return std::numeric_limits::max() >> 2; } + + std::uint32_t value() const { return value_; } + void set_value(std::uint32_t v) { + value_ = v & 0x3fffffffUL; + } + void increment_value() { + ++value_; + } + void decrement_value() { + --value_; + } + + bool has_hash_child() const { return has_hash_child_; } + void set_hash_child(bool v) { + has_hash_child_ = v; + } + + bool has_plus_child() const { return has_plus_child_; } + void set_plus_child(bool v) { + has_plus_child_ = v; + } + +private: + std::uint32_t value_ : 30; + std::uint32_t has_hash_child_ : 1; + std::uint32_t has_plus_child_ : 1; + }; template<> -struct count_storage<8> -{ - std::uint64_t value : 62; - std::uint64_t has_hash_child : 1; - std::uint64_t has_plus_child : 1; - - count_storage(std::uint64_t value = 1) - : value(value), has_hash_child(false), has_plus_child(false) +struct count_storage<8> { +public: + count_storage(std::uint64_t v= 1) + : value_(v & 0x3fffffffffffffffULL), has_hash_child_(false), has_plus_child_(false) { } static constexpr std::uint64_t max() { return std::numeric_limits::max() >> 2; } + + std::uint64_t value() const { return value_; } + void set_value(std::uint64_t v) { + value_ = v & 0x3fffffffffffffffULL; + } + void increment_value() { + ++value_; + } + void decrement_value() { + --value_; + } + + bool has_hash_child() const { return has_hash_child_; } + void set_hash_child(bool v) { + has_hash_child_ = v; + } + + bool has_plus_child() const { return has_plus_child_; } + void set_plus_child(bool v) { + has_plus_child_ = v; + } + + +private: + std::uint64_t value_ : 62; + std::uint64_t has_hash_child_ : 1; + std::uint64_t has_plus_child_ : 1; }; template @@ -147,11 +195,11 @@ class subscription_map_base { // Increase the subscription count for a specific node static void increase_count_storage(count_storage_t &count) { - if(count.value == count_storage_t::max()) { + if(count.value() == count_storage_t::max()) { throw_max_stored_topics(); } - ++count.value; + count.increment_value(); } using this_type = subscription_map_base; @@ -230,8 +278,8 @@ class subscription_map_base { path_entry(generate_node_id(), parent->first) ).first; - parent->second.count.has_plus_child |= (t == "+"); - parent->second.count.has_hash_child |= (t == "#"); + parent->second.count.set_plus_child(parent->second.count.has_plus_child() | (t == "+")); + parent->second.count.set_hash_child(parent->second.count.has_hash_child() | (t == "#")); } else { increase_count_storage(entry->second.count); @@ -254,17 +302,17 @@ class subscription_map_base { // Go through entries to remove for (auto& entry : boost::adaptors::reverse(path)) { if (remove_plus_child_flag) { - entry->second.count.has_plus_child = false; + entry->second.count.set_plus_child(false); remove_plus_child_flag = false; } if (remove_hash_child_flag) { - entry->second.count.has_hash_child = false; + entry->second.count.set_hash_child(false); remove_hash_child_flag = false; } - --entry->second.count.value; - if (entry->second.count.value == 0) { + entry->second.count.decrement_value(); + if (entry->second.count.value() == 0) { remove_plus_child_flag = (entry->first.second == "+"); remove_hash_child_flag = (entry->first.second == "#"); @@ -276,11 +324,11 @@ class subscription_map_base { auto root = get_root(); if (remove_plus_child_flag) { - root->second.count.has_plus_child = false; + root->second.count.set_plus_child(false); } if (remove_hash_child_flag) { - root->second.count.has_hash_child = false; + root->second.count.set_hash_child(false); } } @@ -303,7 +351,7 @@ class subscription_map_base { new_entries.push_back(i); } - if (entry->second.count .has_plus_child) { + if (entry->second.count .has_plus_child()) { i = self.map.find(path_entry_key(parent, MQTT_NS::string_view("+"))); if (i != self.map.end()) { if (parent != self.root_node_id || t.empty() || t[0] != '$') { @@ -312,7 +360,7 @@ class subscription_map_base { } } - if (entry->second.count.has_hash_child) { + if (entry->second.count.has_hash_child()) { i = self.map.find(path_entry_key(parent, MQTT_NS::string_view("#"))); if (i != self.map.end()) { if (parent != self.root_node_id || t.empty() || t[0] != '$'){ diff --git a/test/test_broker.hpp b/test/test_broker.hpp index aefd052d0..0e1f206f9 100644 --- a/test/test_broker.hpp +++ b/test/test_broker.hpp @@ -708,12 +708,12 @@ class test_broker { } template - static MQTT_NS::optional get_property(MQTT_NS::v5::properties const &props) { + static MQTT_NS::optional get_property(MQTT_NS::v5::properties const& props) { MQTT_NS::optional result; auto visitor = MQTT_NS::make_lambda_visitor( [&result](T const& t) { result = t; }, - [](auto&& ...) { } + [](auto const&) { } ); for (auto const& p : props) { @@ -724,13 +724,13 @@ class test_broker { } template - static void set_property(MQTT_NS::v5::properties const &props, T&& v) { + static void set_property(MQTT_NS::v5::properties& props, T&& v) { auto visitor = MQTT_NS::make_lambda_visitor( [&v](T& t) mutable { t = std::forward(v); }, - [](auto&& ...) { } + [](auto&) { } ); - for (auto const& p : props) { + for (auto& p : props) { MQTT_NS::visit(visitor, p); } } @@ -776,7 +776,7 @@ class test_broker { if (will) { auto v = get_property(will.value().props()); - if (v && v.value().val() != 0) { + if (v) { will_expiry_interval.emplace(std::chrono::seconds(v.value().val())); } } @@ -833,7 +833,34 @@ class test_broker { auto send_inflight_messages = [&] (session_state& session) { for (auto const& ifm : session.inflight_messages) { - session.con->send_store_message(ifm.msg, ifm.life_keeper); + MQTT_NS::optional msg; + if (ifm.tim_message_expiry) { + MQTT_NS::visit( + MQTT_NS::make_lambda_visitor( + [&](MQTT_NS::v5::basic_publish_message const& m) { + auto updated_msg = m; + auto& props = updated_msg.props(); + + auto d = + std::chrono::duration_cast( + ifm.tim_message_expiry->expiry() - std::chrono::steady_clock::now() + ).count(); + if (d < 0) d = 0; + set_property( + props, + MQTT_NS::v5::property::message_expiry_interval( + static_cast(d) + ) + ); + msg.emplace(MQTT_NS::force_move(updated_msg)); + }, + [](auto const&) { + } + ), + ifm.msg + ); + } + session.con->send_store_message(msg ? msg.value() : ifm.msg, ifm.life_keeper); } }; @@ -842,23 +869,33 @@ class test_broker { try { auto &seq_idx = session.offline_messages.get(); while(!seq_idx.empty()) { - seq_idx.modify(seq_idx.begin(), [&](auto &i) { - auto props = MQTT_NS::force_move(i.props); + seq_idx.modify( + seq_idx.begin(), + [&](auto &i) { + auto props = MQTT_NS::force_move(i.props); + + if (i.tim_message_expiry) { + auto d = + std::chrono::duration_cast( + i.tim_message_expiry->expiry() - std::chrono::steady_clock::now() + ).count(); + if (d < 0) d = 0; + set_property( + props, + MQTT_NS::v5::property::message_expiry_interval( + static_cast(d) + ) + ); + } - if (i.tim_message_expiry) { - set_property(props, - MQTT_NS::v5::property::message_expiry_interval(static_cast(std::chrono::duration_cast( - i.tim_message_expiry->expiry() - std::chrono::steady_clock::now()).count()))); + session.con->publish( + MQTT_NS::force_move(i.topic), + MQTT_NS::force_move(i.contents), + MQTT_NS::force_move(i.pubopts), + MQTT_NS::force_move(props) + ); } - - session.con->publish( - MQTT_NS::force_move(i.topic), - MQTT_NS::force_move(i.contents), - MQTT_NS::force_move(i.pubopts), - MQTT_NS::force_move(props) - ); - }); - + ); seq_idx.pop_front(); } } @@ -1049,9 +1086,17 @@ class test_broker { auto props = MQTT_NS::force_move(session.will().value().props()); if (session.get_tim_will_expiry()) { - set_property(props, - MQTT_NS::v5::property::message_expiry_interval(static_cast( - std::chrono::duration_cast(session.get_tim_will_expiry()->expiry() - std::chrono::steady_clock::now()).count()))); + auto d = + std::chrono::duration_cast( + session.get_tim_will_expiry()->expiry() - std::chrono::steady_clock::now() + ).count(); + if (d < 0) d = 0; + set_property( + props, + MQTT_NS::v5::property::message_expiry_interval( + static_cast(d) + ) + ); } do_publish( @@ -1093,13 +1138,41 @@ class test_broker { do_send_will(e); e.con->for_each_store_with_life_keeper( - [&e] (MQTT_NS::store_message_variant msg, MQTT_NS::any life_keeper) { + [this, &e] (MQTT_NS::store_message_variant msg, MQTT_NS::any life_keeper) { MQTT_LOG("mqtt_broker", trace) << MQTT_ADD_VALUE(address, e.con.get()) << "store inflight message"; + + std::shared_ptr tim_message_expiry; + + MQTT_NS::visit( + MQTT_NS::make_lambda_visitor( + [&](MQTT_NS::v5::basic_publish_message const& m) { + auto v = get_property(m.props()); + if (v) { + tim_message_expiry = + std::make_shared(ioc_, std::chrono::seconds(v.value().val())); + tim_message_expiry->async_wait( + [&e, wp = std::weak_ptr(tim_message_expiry)] + (MQTT_NS::error_code ec) { + if (auto sp = wp.lock()) { + if (!ec) { + e.inflight_messages.get().erase(sp); + } + } + } + ); + } + }, + [&](auto const&) {} + ), + msg + ); + e.inflight_messages.emplace_back( MQTT_NS::force_move(msg), - MQTT_NS::force_move(life_keeper) + MQTT_NS::force_move(life_keeper), + MQTT_NS::force_move(tim_message_expiry) ); } ); @@ -1357,9 +1430,16 @@ class test_broker { props.push_back(MQTT_NS::v5::property::subscription_identifier(*sid)); } if (r.tim_message_expiry) { - set_property(props, - MQTT_NS::v5::property::message_expiry_interval(static_cast( - std::chrono::duration_cast(r.tim_message_expiry->expiry() - std::chrono::steady_clock::now()).count()))); + auto d = + std::chrono::duration_cast( + r.tim_message_expiry->expiry() - std::chrono::steady_clock::now() + ).count(); + set_property( + props, + MQTT_NS::v5::property::message_expiry_interval( + static_cast(d) + ) + ); } ep.publish( r.topic, @@ -1610,7 +1690,7 @@ class test_broker { MQTT_NS::optional message_expiry_interval; if (ep.get_protocol_version() == MQTT_NS::protocol_version::v5) { auto v = get_property(props); - if (v && v.value().val() != 0) { + if (v) { message_expiry_interval.emplace(std::chrono::seconds(v.value().val())); } @@ -1715,9 +1795,13 @@ class test_broker { using sub_con_map = multiple_subscription_map; struct inflight_message { - inflight_message(MQTT_NS::store_message_variant msg, MQTT_NS::any life_keeper) + inflight_message( + MQTT_NS::store_message_variant msg, + MQTT_NS::any life_keeper, + std::shared_ptr tim_message_expiry) :msg { MQTT_NS::force_move(msg) }, - life_keeper { MQTT_NS::force_move(life_keeper) } + life_keeper { MQTT_NS::force_move(life_keeper) }, + tim_message_expiry { MQTT_NS::force_move(tim_message_expiry) } {} packet_id_t packet_id() const { @@ -1733,6 +1817,7 @@ class test_broker { } MQTT_NS::store_message_variant msg; MQTT_NS::any life_keeper; + std::shared_ptr tim_message_expiry; }; using mi_inflight_message = mi::multi_index_container< @@ -1744,6 +1829,10 @@ class test_broker { mi::ordered_unique< mi::tag, BOOST_MULTI_INDEX_CONST_MEM_FUN(inflight_message, packet_id_t, packet_id) + >, + mi::ordered_non_unique< + mi::tag, + BOOST_MULTI_INDEX_MEMBER(inflight_message, std::shared_ptr, tim_message_expiry) > > >; @@ -1858,7 +1947,7 @@ class test_broker { MQTT_NS::optional message_expiry_interval; auto v = get_property(props); - if (v && v.value().val() != 0) { + if (v) { message_expiry_interval.emplace(std::chrono::seconds(v.value().val())); } @@ -1877,7 +1966,7 @@ class test_broker { } auto& seq_idx = offline_messages.get(); - auto i = seq_idx.emplace_back( + seq_idx.emplace_back( MQTT_NS::force_move(pub_topic), MQTT_NS::force_move(contents), MQTT_NS::force_move(props), @@ -1918,17 +2007,24 @@ class test_broker { std::set handles; // to efficient remove - void update_will(as::io_context& ioc, MQTT_NS::optional will, MQTT_NS::optional will_expiry_interval) { + void update_will( + as::io_context& ioc, + MQTT_NS::optional will, + MQTT_NS::optional will_expiry_interval) { tim_will_expiry.reset(); will_value = MQTT_NS::force_move(will); if (will_value && will_expiry_interval) { tim_will_expiry = std::make_shared(ioc, will_expiry_interval.value()); tim_will_expiry->async_wait( - [this, client_id = client_id, wp = std::weak_ptr(tim_will_expiry)](MQTT_NS::error_code ec) { + [this, client_id = client_id, wp = std::weak_ptr(tim_will_expiry)] + (MQTT_NS::error_code ec) { if (auto sp = wp.lock()) { - reset_will(); - } } + if (!ec) { + reset_will(); + } + } + } ); } } diff --git a/test/will.cpp b/test/will.cpp index 59820dd73..bbbd36061 100644 --- a/test/will.cpp +++ b/test/will.cpp @@ -195,7 +195,7 @@ BOOST_AUTO_TEST_CASE( will_qo0_timeout ) { boost::asio::io_context ioc; - constexpr uint32_t will_expiry_interval = 1; + uint32_t will_expiry_interval = 1; as::steady_timer timeout(ioc); as::steady_timer timeout_2(ioc); @@ -282,7 +282,7 @@ BOOST_AUTO_TEST_CASE( will_qo0_timeout ) { BOOST_CHECK(false); }); c2->set_v5_suback_handler( - [&chk, &c2, &c1_force_disconnect, &pid_sub2, &pid_unsub2, &timeout, &timeout_2, &will_expiry_interval] + [&chk, &c2, &c1_force_disconnect, &pid_sub2, &pid_unsub2, &timeout, &timeout_2, will_expiry_interval] (packet_id_t packet_id, std::vector reasons, MQTT_NS::v5::properties /*props*/) { MQTT_CHK("h_suback_2"); BOOST_TEST(packet_id == pid_sub2); @@ -311,18 +311,18 @@ BOOST_AUTO_TEST_CASE( will_qo0_timeout ) { }); c2->set_v5_unsuback_handler( [&chk, &c2, &pid_unsub2] - (packet_id_t packet_id, std::vector reasons, MQTT_NS::v5::properties /*props*/) { + (packet_id_t packet_id, std::vector, MQTT_NS::v5::properties /*props*/) { MQTT_CHK("h_unsuback_2"); BOOST_TEST(packet_id == pid_unsub2); c2->disconnect(); return true; }); c2->set_v5_publish_handler( - [&chk, &c2, &pid_unsub2] - (MQTT_NS::optional packet_id, - MQTT_NS::publish_options pubopts, - MQTT_NS::buffer topic, - MQTT_NS::buffer contents, + [] + (MQTT_NS::optional, + MQTT_NS::publish_options, + MQTT_NS::buffer, + MQTT_NS::buffer, MQTT_NS::v5::properties /*props*/) { // Will should not be received @@ -1044,7 +1044,7 @@ BOOST_AUTO_TEST_CASE( will_prop ) { BOOST_TEST(t.val() == MQTT_NS::v5::property::payload_format_indicator::string); }, [&](MQTT_NS::v5::property::message_expiry_interval const& t) { - BOOST_TEST(t.val() == 0x12345678UL); + BOOST_TEST(t.val() <= 0x12345678UL); }, [&](MQTT_NS::v5::property::will_delay_interval const& t) { BOOST_TEST(t.val() == 0x12345678UL);