diff --git a/test/broker_offline_message.cpp b/test/broker_offline_message.cpp index 96b343242..77ce50ba5 100644 --- a/test/broker_offline_message.cpp +++ b/test/broker_offline_message.cpp @@ -247,7 +247,7 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v3_1_1 ) { BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) { - // + // Process with no timeout: // c1 ---- broker ----- c2 (CleanSession: false) // // 1. c2 subscribe t1 QoS2 @@ -256,6 +256,8 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) { // 4. c1 publish t1 QoS1 // 5. c1 publish t1 QoS2 // 6. c2 connect again + // Received published messages + // 7. Disconnect // boost::asio::io_context iocb; @@ -349,7 +351,7 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) { } ); c2->set_v5_connack_handler( - [&chk, &c2] + [&chk, &c1, &c2] (bool sp, MQTT_NS::v5::connect_reason_code connack_reason_code, MQTT_NS::v5::properties /*props*/) { auto ret = chk.match( "c1_h_connack", @@ -419,13 +421,16 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) { [&chk, &c2] (packet_id_t, MQTT_NS::v5::pubcomp_reason_code, MQTT_NS::v5::properties /*props*/) { MQTT_CHK("c1_h_pubcomp"); - c2->connect( - MQTT_NS::v5::properties{ - MQTT_NS::v5::property::session_expiry_interval( - MQTT_NS::session_never_expire - ) - } - ); + + // Reconnect immediatly + c2->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::session_expiry_interval( + MQTT_NS::session_never_expire + ) + } + ); + return true; } ); @@ -551,4 +556,256 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) { th.join(); } +BOOST_AUTO_TEST_CASE( offline_pubsub_v5_timeout ) { + + // Process with timeout: + // c1 ---- broker ----- c2 (CleanSession: false) + // + // 1. c2 subscribe t1 QoS2 + // 2. c2 disconnect + // 3. c1 publish t1 QoS0 + // 4. c1 publish t1 QoS1 + // 5. c1 publish t1 QoS2 + // * Wait for timeout of messages + // 6. c2 connect again + // Do not receive published messages + // * Wait for possible published messages + // 7. Disconnect + // + + boost::asio::io_context iocb; + test_broker b(iocb); + MQTT_NS::optional s; + std::promise p; + auto f = p.get_future(); + std::thread th( + [&] { + s.emplace(iocb, b); + p.set_value(); + iocb.run(); + } + ); + f.wait(); + auto finish = + [&] { + as::post( + iocb, + [&] { + s->close(); + } + ); + }; + + boost::asio::io_context ioc; + + auto c1 = MQTT_NS::make_client(ioc, broker_url, broker_notls_port, MQTT_NS::protocol_version::v5); + c1->set_clean_session(true); + c1->set_client_id("cid1"); + + auto c2 = MQTT_NS::make_client(ioc, broker_url, broker_notls_port, MQTT_NS::protocol_version::v5); + c2->set_clean_session(false); + c2->set_client_id("cid2"); + + using packet_id_t = typename std::remove_reference_t::packet_id_t; + + checker chk = { + cont("c1_h_connack"), + cont("c2_h_connack1"), + + // c2 subscribe t1 qos2 + cont("c2_h_suback"), + cont("c2_h_close1"), + + // c1 publish t1 qos0 + // c1 publish t1 qos1 + // c1 publish t1 qos2 + cont("c1_h_puback"), + cont("c1_h_pubrec"), + cont("c1_h_pubcomp"), + + // c2 connect again + cont("c2_h_connack2"), + + cont("c1_h_close"), + cont("c2_h_close2"), + }; + + // Following is used to perform timeout of the message + unsigned int message_timeout = 1; + as::steady_timer timeout(ioc); + + MQTT_NS::v5::properties ps { + MQTT_NS::v5::property::payload_format_indicator(MQTT_NS::v5::property::payload_format_indicator::string), + MQTT_NS::v5::property::message_expiry_interval(message_timeout), + MQTT_NS::v5::property::content_type("content type"_mb), + MQTT_NS::v5::property::topic_alias(0x1234U), + MQTT_NS::v5::property::response_topic("response topic"_mb), + MQTT_NS::v5::property::correlation_data("correlation data"_mb), + MQTT_NS::v5::property::user_property("key1"_mb, "val1"_mb), + MQTT_NS::v5::property::user_property("key2"_mb, "val2"_mb), + }; + + auto prop_size = ps.size(); + std::size_t user_prop_count = 0; + + c1->set_v5_connack_handler( + [&chk, &c2] + (bool sp, MQTT_NS::v5::connect_reason_code connack_reason_code, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("c1_h_connack"); + BOOST_TEST(sp == false); + BOOST_TEST(connack_reason_code == MQTT_NS::v5::connect_reason_code::success); + c2->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::session_expiry_interval( + MQTT_NS::session_never_expire + ) + } + ); + return true; + } + ); + c2->set_v5_connack_handler( + [&chk, &c1, &c2, &timeout, &message_timeout] + (bool sp, MQTT_NS::v5::connect_reason_code connack_reason_code, MQTT_NS::v5::properties /*props*/) { + auto ret = chk.match( + "c1_h_connack", + [&] { + MQTT_CHK("c2_h_connack1"); + BOOST_TEST(sp == false); + BOOST_TEST(connack_reason_code == MQTT_NS::v5::connect_reason_code::success); + c2->subscribe("topic1", MQTT_NS::qos::exactly_once); + }, + "c2_h_connack1", + [&] { + MQTT_CHK("c2_h_connack2"); + BOOST_TEST(sp == true); + BOOST_TEST(connack_reason_code == MQTT_NS::v5::connect_reason_code::success); + + // Disconnect after timeout + timeout.expires_after(std::chrono::seconds(1 + message_timeout)); + timeout.async_wait( + [&](MQTT_NS::error_code ec) { + if (!ec) { + c1->disconnect(); + } + } + ); + + } + ); + BOOST_TEST(ret); + return true; + } + ); + c2->set_v5_suback_handler( + [&chk, &c2] + (packet_id_t, std::vector reasons, MQTT_NS::v5::properties /*props*/) mutable { + MQTT_CHK("c2_h_suback"); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_2); + c2->disconnect(); + return true; + } + ); + c2->set_close_handler( + [&chk, &c1, &finish, &ps] + () { + auto ret = chk.match( + "c2_h_suback", + [&] { + MQTT_CHK("c2_h_close1"); + c1->publish("topic1", "topic1_contents1", MQTT_NS::qos::at_most_once, ps); + c1->publish("topic1", "topic1_contents2", MQTT_NS::qos::at_least_once, ps); + c1->publish("topic1", "topic1_contents3", MQTT_NS::qos::exactly_once, ps); + }, + "c2_h_close1", + [&] { + MQTT_CHK("c2_h_close2"); + finish(); + } + ); + BOOST_TEST(ret); + + } + ); + c1->set_v5_puback_handler( + [&chk] + (packet_id_t, MQTT_NS::v5::puback_reason_code, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("c1_h_puback"); + return true; + } + ); + c1->set_v5_pubrec_handler( + [&chk] + (packet_id_t, MQTT_NS::v5::pubrec_reason_code, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("c1_h_pubrec"); + return true; + } + ); + c1->set_v5_pubcomp_handler( + [&chk, &c2, &message_timeout, &timeout] + (packet_id_t, MQTT_NS::v5::pubcomp_reason_code, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("c1_h_pubcomp"); + + // Reconnect when messages are timed out + timeout.expires_after(std::chrono::seconds(1 + message_timeout)); + timeout.async_wait( + [&c2](MQTT_NS::error_code ec) { + if (!ec) { + c2->connect( + MQTT_NS::v5::properties{ + MQTT_NS::v5::property::session_expiry_interval( + MQTT_NS::session_never_expire + ) + } + ); + } + } + ); + + return true; + } + ); + c2->set_v5_publish_handler( + [&chk, &c1] + (MQTT_NS::optional packet_id, + MQTT_NS::publish_options pubopts, + MQTT_NS::buffer topic, + MQTT_NS::buffer contents, + MQTT_NS::v5::properties props) { + + // We should not received any published message when offline messages timeout + BOOST_TEST(false); + return true; + } + ); + c1->set_close_handler( + [&chk, &c2] + () { + MQTT_CHK("c1_h_close"); + c2->disconnect(); + } + ); + + // error cases + c1->set_error_handler( + [] + (MQTT_NS::error_code) { + BOOST_CHECK(false); + } + ); + c2->set_error_handler( + [] + (MQTT_NS::error_code) { + BOOST_CHECK(false); + } + ); + + c1->connect(); + + ioc.run(); + BOOST_TEST(chk.all()); + th.join(); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/test/combi_test.hpp b/test/combi_test.hpp index f4c63b154..0aa642088 100644 --- a/test/combi_test.hpp +++ b/test/combi_test.hpp @@ -67,6 +67,7 @@ inline void do_test( [&] { s->close(); b.clear_all_sessions(); + b.clear_all_retained_topics(); } ); }, @@ -118,6 +119,7 @@ inline void do_tls_test( [&] { s->close(); b.clear_all_sessions(); + b.clear_all_retained_topics(); } ); }, @@ -167,6 +169,7 @@ inline void do_ws_test( [&] { s->close(); b.clear_all_sessions(); + b.clear_all_retained_topics(); } ); }, @@ -218,6 +221,7 @@ inline void do_tls_ws_test( [&] { s->close(); b.clear_all_sessions(); + b.clear_all_retained_topics(); } ); }, diff --git a/test/retain_2.cpp b/test/retain_2.cpp index 923b0f816..937de9fbf 100644 --- a/test/retain_2.cpp +++ b/test/retain_2.cpp @@ -260,6 +260,187 @@ BOOST_AUTO_TEST_CASE( retain_and_publish ) { do_combi_test_sync(test); } +BOOST_AUTO_TEST_CASE( retain_and_publish_timeout ) { + auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) { + if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) { + finish(); + return; + } + + using packet_id_t = typename std::remove_reference_t::packet_id_t; + c->set_client_id("cid1"); + c->set_clean_session(true); + + std::uint16_t pid_sub; + std::uint16_t pid_unsub; + + checker chk = { + // connect + cont("h_connack"), + // subscribe topic1 QoS0 + cont("h_suback1"), + // publish topic1 QoS0 retain + cont("h_publish1"), + // unsubscribe topic1 + cont("h_unsuback1"), + // subscribe topic1 QoS0 + cont("h_suback2"), + + // cont("h_publish2"), retained message should timeout now + + // unsubscribe topic1 + cont("h_unsuback2"), + // disconnect + cont("h_close"), + }; + + constexpr unsigned int message_timeout = 1; + as::steady_timer timeout(ioc); + + switch (c->get_protocol_version()) { + case MQTT_NS::protocol_version::v5: + c->set_v5_connack_handler( + [&chk, &c, &pid_sub] + (bool sp, MQTT_NS::v5::connect_reason_code connack_return_code, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_connack"); + BOOST_TEST(sp == false); + BOOST_TEST(connack_return_code == MQTT_NS::v5::connect_reason_code::success); + pid_sub = c->subscribe("topic1", MQTT_NS::qos::at_most_once); + return true; + }); + c->set_v5_puback_handler( + [] + (packet_id_t, MQTT_NS::v5::puback_reason_code, MQTT_NS::v5::properties /*props*/) { + BOOST_CHECK(false); + return true; + }); + c->set_v5_pubrec_handler( + [] + (packet_id_t, MQTT_NS::v5::pubrec_reason_code, MQTT_NS::v5::properties /*props*/) { + BOOST_CHECK(false); + return true; + }); + c->set_v5_pubcomp_handler( + [] + (packet_id_t, MQTT_NS::v5::pubcomp_reason_code, MQTT_NS::v5::properties /*props*/) { + BOOST_CHECK(false); + return true; + }); + c->set_v5_suback_handler( + [&chk, &c, &pid_sub, &pid_unsub, &timeout, &message_timeout] + (packet_id_t packet_id, std::vector reasons, MQTT_NS::v5::properties /*props*/) { + BOOST_TEST(packet_id == pid_sub); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_0); + auto ret = chk.match( + "h_connack", + [&] { + MQTT_NS::v5::properties ps { + MQTT_NS::v5::property::payload_format_indicator(MQTT_NS::v5::property::payload_format_indicator::string), + MQTT_NS::v5::property::message_expiry_interval(message_timeout) + }; + + MQTT_CHK("h_suback1"); + c->publish("topic1", "topic1_contents", MQTT_NS::qos::at_most_once | MQTT_NS::retain::yes, ps); + }, + "h_unsuback1", + [&] { + MQTT_CHK("h_suback2"); + + timeout.expires_after(std::chrono::seconds(1 + message_timeout)); + timeout.async_wait( + [&c, &pid_unsub](MQTT_NS::error_code ec) { + if (!ec) { + pid_unsub = c->unsubscribe("topic1"); + } + } + ); + } + ); + BOOST_TEST(ret); + return true; + }); + c->set_v5_unsuback_handler( + [&chk, &c, &pid_sub, &pid_unsub, &timeout, message_timeout] + (packet_id_t packet_id, std::vector reasons, MQTT_NS::v5::properties /*props*/) { + BOOST_TEST(packet_id == pid_unsub); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons[0] == MQTT_NS::v5::unsuback_reason_code::success); + auto ret = chk.match( + "h_publish1", + [&] { + MQTT_CHK("h_unsuback1"); + + timeout.expires_after(std::chrono::seconds(1 + message_timeout)); + timeout.async_wait( + [&c, &pid_sub](MQTT_NS::error_code ec) { + if (!ec) { + pid_sub = c->subscribe("topic1", MQTT_NS::qos::at_most_once); + } + } + ); + }, + "h_suback2", + [&] { + MQTT_CHK("h_unsuback2"); + c->disconnect(); + } + ); + BOOST_TEST(ret); + return true; + }); + c->set_v5_publish_handler( + [&chk, &c, &pid_unsub] + (MQTT_NS::optional packet_id, + MQTT_NS::publish_options pubopts, + MQTT_NS::buffer topic, + MQTT_NS::buffer contents, + MQTT_NS::v5::properties /*props*/) { + BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no); + BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::at_most_once); + BOOST_CHECK(!packet_id); + BOOST_TEST(topic == "topic1"); + BOOST_TEST(contents == "topic1_contents"); + pid_unsub = c->unsubscribe("topic1"); + auto ret = chk.match( + "h_suback1", + [&] { + MQTT_CHK("h_publish1"); + BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no); + }, + "h_suback2", + [&] { + // Retained message should timeout and not be received + BOOST_TEST(false); + } + ); + BOOST_TEST(ret); + return true; + }); + break; + default: + BOOST_CHECK(false); + break; + } + + c->set_close_handler( + [&chk, &finish] + () { + MQTT_CHK("h_close"); + finish(); + }); + c->set_error_handler( + [] + (MQTT_NS::error_code) { + BOOST_CHECK(false); + }); + c->connect(); + ioc.run(); + BOOST_TEST(chk.all()); + }; + do_combi_test_sync(test); +} + BOOST_AUTO_TEST_CASE( retain_rap ) { auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) { if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) { diff --git a/test/retained_topic_map.hpp b/test/retained_topic_map.hpp index 5f17e1dab..a7d2fd834 100644 --- a/test/retained_topic_map.hpp +++ b/test/retained_topic_map.hpp @@ -71,8 +71,8 @@ class retained_topic_map { using wildcard_const_iterator = typename path_entry_set::template index::type::const_iterator; path_entry_set map; - size_t map_size = 0; - node_id_t next_node_id = root_node_id + 1; + size_t map_size; + node_id_t next_node_id; direct_const_iterator root; @@ -271,11 +271,17 @@ class retained_topic_map { map_size -= count; } + void init_map() { + map_size = 0; + // Create the root node + root = map.insert(path_entry(root_parent_id, "", root_node_id)).first; + next_node_id = root_node_id + 1; + } + public: retained_topic_map() { - // Create the root node - root = map.insert(path_entry(root_parent_id, "", root_node_id)).first; + init_map(); } // Insert a value at the specified topic @@ -322,6 +328,12 @@ class retained_topic_map { // Get the number of entries in the map (for debugging purpose only) std::size_t internal_size() const { return map.size(); } + // Clear all topics + void clear() { + map.clear(); + init_map(); + } + // Dump debug information template void dump(Output &out) { diff --git a/test/test_broker.hpp b/test/test_broker.hpp index e02da08b6..04ee7b2b6 100644 --- a/test/test_broker.hpp +++ b/test/test_broker.hpp @@ -703,6 +703,38 @@ class test_broker { sessions_.clear(); } + void clear_all_retained_topics() { + retains_.clear(); + } + + template + static MQTT_NS::optional get_property(MQTT_NS::v5::properties const &props) { + MQTT_NS::optional result; + + auto visitor = MQTT_NS::make_lambda_visitor( + [&result](T const& t) { result = t; }, + [](auto&& ...) { } + ); + + for (auto const& p : props) { + MQTT_NS::visit(visitor, p); + } + + return result; + } + + template + static void set_property(MQTT_NS::v5::properties const &props, T&& v) { + auto visitor = MQTT_NS::make_lambda_visitor( + [&v](T& t) mutable { t = std::forward(v); }, + [](auto&& ...) { } + ); + + for (auto const& p : props) { + MQTT_NS::visit(visitor, p); + } + } + private: /** * @brief connect_proc Process an incoming CONNECT packet @@ -734,21 +766,19 @@ class test_broker { auto& ep = *spep; MQTT_NS::optional session_expiry_interval; + MQTT_NS::optional will_expiry_interval; if (ep.get_protocol_version() == MQTT_NS::protocol_version::v5) { - for (auto const& p : props) { - MQTT_NS::visit( - MQTT_NS::make_lambda_visitor( - [&session_expiry_interval](MQTT_NS::v5::property::session_expiry_interval const& t) { - if (t.val() != 0) { - session_expiry_interval.emplace(std::chrono::seconds(t.val())); - } - }, - [](auto&& ...) { - } - ), - p - ); + auto v = get_property(props); + if (v && v.value().val() != 0) { + session_expiry_interval.emplace(std::chrono::seconds(v.value().val())); + } + + if(will) { + auto v = get_property(will.value().props()); + if (v && v.value().val() != 0) { + will_expiry_interval.emplace(std::chrono::seconds(v.value().val())); + } } if (h_connect_props_) { @@ -810,15 +840,26 @@ class test_broker { auto send_offline_messages = [&] (session_state& session) { try { - while (!session.offline_messages.empty()) { - auto const& msg = session.offline_messages.front(); - session.con->publish( - msg.topic, - msg.contents, - msg.pubopts, - msg.props - ); - session.offline_messages.pop_front(); + auto &seq_idx = session.offline_messages.get(); + while(!seq_idx.empty()) { + seq_idx.modify(seq_idx.begin(), [&](auto &i) { + auto props = MQTT_NS::force_move(i.props); + + if(i.tim_message_expiry) { + set_property(props, + MQTT_NS::v5::property::message_expiry_interval(static_cast(std::chrono::duration_cast( + i.tim_message_expiry->expiry() - std::chrono::steady_clock::now()).count()))); + } + + session.con->publish( + MQTT_NS::force_move(i.topic), + MQTT_NS::force_move(i.contents), + MQTT_NS::force_move(i.pubopts), + MQTT_NS::force_move(props) + ); + }); + + seq_idx.pop_front(); } } catch (MQTT_NS::packet_id_exhausted_error const& e) { @@ -844,12 +885,15 @@ class test_broker { // new connection it = idx.emplace_hint( it, + ioc_, subs_map_, spep, client_id, MQTT_NS::force_move(will), + MQTT_NS::force_move(will_expiry_interval), MQTT_NS::force_move(session_expiry_interval) ); + send_connack(false); } else if (it->online()) { @@ -863,7 +907,7 @@ class test_broker { it, [&](auto& e) { e.clean(); - e.will = MQTT_NS::force_move(will); + e.update_will(ioc_, MQTT_NS::force_move(will), will_expiry_interval); // TODO: e.will_delay = MQTT_NS::force_move(will_delay); e.session_expiry_interval = MQTT_NS::force_move(session_expiry_interval); e.tim_session_expiry.reset(); @@ -882,7 +926,7 @@ class test_broker { ep.restore_topic_alias_recv_container(MQTT_NS::force_move(e.topic_alias_recv.value())); e.topic_alias_recv = MQTT_NS::nullopt; } - e.will = MQTT_NS::force_move(will); + e.update_will(ioc_, MQTT_NS::force_move(will), will_expiry_interval); // TODO: e.will_delay = MQTT_NS::force_move(will_delay); e.session_expiry_interval = MQTT_NS::force_move(session_expiry_interval); e.tim_session_expiry.reset(); @@ -898,10 +942,12 @@ class test_broker { // new connection it = idx.emplace_hint( it, + ioc_, subs_map_, spep, client_id, MQTT_NS::force_move(will), + MQTT_NS::force_move(will_expiry_interval), MQTT_NS::force_move(session_expiry_interval) ); send_connack(false); @@ -917,7 +963,7 @@ class test_broker { [&](auto& e) { e.clean(); e.con = spep; - e.will = MQTT_NS::force_move(will); + e.update_will(ioc_, MQTT_NS::force_move(will), will_expiry_interval); // TODO: e.will_delay = MQTT_NS::force_move(will_delay); e.session_expiry_interval = MQTT_NS::force_move(session_expiry_interval); e.tim_session_expiry.reset(); @@ -936,7 +982,7 @@ class test_broker { ep.restore_topic_alias_recv_container(MQTT_NS::force_move(e.topic_alias_recv.value())); e.topic_alias_recv = MQTT_NS::nullopt; } - e.will = MQTT_NS::force_move(will); + e.update_will(ioc_, MQTT_NS::force_move(will), will_expiry_interval); // TODO: e.will_delay = MQTT_NS::force_move(will_delay); e.session_expiry_interval = MQTT_NS::force_move(session_expiry_interval); e.tim_session_expiry.reset(); @@ -995,17 +1041,25 @@ class test_broker { auto do_send_will = [&](session_state& session) { - if (session.will) { + if (session.will()) { if (send_will) { // TODO: This should be triggered by the will delay // Not sent immediately. try { + auto props = MQTT_NS::force_move(session.will().value().props()); + + if(session.get_tim_will_expiry()) { + set_property(props, + MQTT_NS::v5::property::message_expiry_interval(static_cast( + std::chrono::duration_cast(session.get_tim_will_expiry()->expiry() - std::chrono::steady_clock::now()).count()))); + } + do_publish( ep, - MQTT_NS::force_move(session.will.value().topic()), - MQTT_NS::force_move(session.will.value().message()), - session.will.value().get_qos() | session.will.value().get_retain(), - MQTT_NS::force_move(session.will.value().props()) + MQTT_NS::force_move(session.will().value().topic()), + MQTT_NS::force_move(session.will().value().message()), + session.will().value().get_qos() | session.will().value().get_retain(), + props ); } catch (MQTT_NS::packet_id_exhausted_error const& e) { @@ -1015,7 +1069,7 @@ class test_broker { } } else { - session.will = MQTT_NS::nullopt; + session.reset_will(); } } }; @@ -1052,22 +1106,19 @@ class test_broker { e.con.reset(); - auto const& sei_opt = e.session_expiry_interval; - if (sei_opt && sei_opt.value() != - std::chrono::seconds(MQTT_NS::session_never_expire)) { - e.tim_session_expiry = std::make_shared(ioc_); - e.tim_session_expiry->expires_after(sei_opt.value()); + if (e.session_expiry_interval && e.session_expiry_interval.value() != std::chrono::seconds(MQTT_NS::session_never_expire)) { + e.tim_session_expiry = std::make_shared(ioc_, e.session_expiry_interval.value()); e.tim_session_expiry->async_wait( - [&, wp = std::weak_ptr(e.tim_session_expiry)] - (MQTT_NS::error_code ec) { - auto sp = wp.lock(); - if (!ec) { - auto& idx = sessions_.get(); - idx.erase(sp); + [this, wp = std::weak_ptr(e.tim_session_expiry)](MQTT_NS::error_code ec) { + if (auto sp = wp.lock()) { + if (!ec) { + sessions_.get().erase(sp); + } } } ); } + // TopicAlias lifetime is the same as Session lifetime // It is different from MQTT v5 spec but practical choice. // See @@ -1305,6 +1356,11 @@ class test_broker { if (sid) { props.push_back(MQTT_NS::v5::property::subscription_identifier(*sid)); } + if (r.tim_message_expiry) { + set_property(props, + MQTT_NS::v5::property::message_expiry_interval(static_cast( + std::chrono::duration_cast(r.tim_message_expiry->expiry() - std::chrono::steady_clock::now()).count()))); + } ep.publish( r.topic, r.contents, @@ -1413,19 +1469,9 @@ class test_broker { } break; case MQTT_NS::protocol_version::v5: { // Get subscription identifier - for (auto const& p : props) { - MQTT_NS::visit( - MQTT_NS::make_lambda_visitor( - [&sid](MQTT_NS::v5::property::subscription_identifier const& p) { - if (p.val() != 0) { - sid.emplace(p.val()); - } - }, - [](auto&& ...) { - } - ), - p - ); + auto v = get_property(props); + if (v && v.value().val() != 0) { + sid.emplace(v.value().val()); } std::vector res; @@ -1540,6 +1586,7 @@ class test_broker { if (sub.sid) { props.push_back(MQTT_NS::v5::property::subscription_identifier(sub.sid.value())); sub.ss.get().deliver( + ioc_, topic, contents, new_pubopts, @@ -1549,6 +1596,7 @@ class test_broker { } else { sub.ss.get().deliver( + ioc_, topic, contents, new_pubopts, @@ -1559,6 +1607,15 @@ class test_broker { } ); + MQTT_NS::optional message_expiry_interval; + if (ep.get_protocol_version() == MQTT_NS::protocol_version::v5) { + auto v = get_property(props); + if (v && v.value().val() != 0) { + message_expiry_interval.emplace(std::chrono::seconds(v.value().val())); + } + + } + /* * If the message is marked as being retained, then we * keep it in case a new subscription is added that matches @@ -1585,13 +1642,28 @@ class test_broker { retains_.erase(topic); } else { + std::shared_ptr tim_message_expiry; + if(message_expiry_interval) { + tim_message_expiry = std::make_shared(ioc_, message_expiry_interval.value()); + tim_message_expiry->async_wait( + [this, topic = topic, wp = std::weak_ptr(tim_message_expiry)] + (boost::system::error_code const& ec) { + if (auto sp = wp.lock()) { + if( ! ec ) { + retains_.erase(topic); + } + } + }); + } + retains_.insert_or_assign( topic, retain { MQTT_NS::force_move(topic), MQTT_NS::force_move(contents), MQTT_NS::force_move(props), - pubopts.get_qos() + pubopts.get_qos(), + tim_message_expiry } ); } @@ -1599,6 +1671,7 @@ class test_broker { } private: + struct tag_seq {}; struct tag_con {}; struct tag_topic{}; struct tag_topic_filter{}; @@ -1607,7 +1680,6 @@ class test_broker { struct tag_cid_topic_filter {}; struct tag_tim {}; struct tag_pid {}; - struct tag_seq {}; struct session_state; using session_state_ref = std::reference_wrapper; @@ -1684,17 +1756,36 @@ class test_broker { MQTT_NS::buffer topic, MQTT_NS::buffer contents, MQTT_NS::v5::properties props, - MQTT_NS::publish_options pubopts) + MQTT_NS::publish_options pubopts, + std::shared_ptr tim_message_expiry) : topic(MQTT_NS::force_move(topic)), contents(MQTT_NS::force_move(contents)), props(MQTT_NS::force_move(props)), - pubopts(pubopts) {} + pubopts(pubopts), + tim_message_expiry(MQTT_NS::force_move(tim_message_expiry)) + { } + MQTT_NS::buffer topic; MQTT_NS::buffer contents; MQTT_NS::v5::properties props; MQTT_NS::publish_options pubopts; + + std::shared_ptr tim_message_expiry; }; + using mi_offline_message = mi::multi_index_container< + offline_message, + mi::indexed_by< + mi::sequenced< + mi::tag + >, + mi::ordered_non_unique< + mi::tag, + BOOST_MULTI_INDEX_MEMBER(offline_message, std::shared_ptr, tim_message_expiry) + > + > + >; + /** * http://docs.oasis-open.org/mqtt/mqtt/v5.0/cs02/mqtt-v5.0-cs02.html#_Session_State * @@ -1715,17 +1806,20 @@ class test_broker { struct session_state { // TODO: Currently not fully implemented... session_state( + as::io_context& ioc, sub_con_map& subs_map, con_sp_t con, MQTT_NS::buffer client_id, MQTT_NS::optional will, + MQTT_NS::optional will_expiry_interval, MQTT_NS::optional session_expiry_interval = MQTT_NS::nullopt) :subs_map_(subs_map), con(MQTT_NS::force_move(con)), client_id(MQTT_NS::force_move(client_id)), - will(MQTT_NS::force_move(will)), session_expiry_interval(MQTT_NS::force_move(session_expiry_interval)) - {} + { + update_will(ioc, will, will_expiry_interval); + } session_state() = default; session_state(session_state&&) = default; @@ -1739,6 +1833,7 @@ class test_broker { } void deliver( + as::io_context& ioc, MQTT_NS::buffer pub_topic, MQTT_NS::buffer contents, MQTT_NS::publish_options pubopts, @@ -1759,11 +1854,34 @@ class test_broker { ); } else { - offline_messages.emplace_back( + MQTT_NS::optional message_expiry_interval; + + auto v = get_property(props); + if (v && v.value().val() != 0) { + message_expiry_interval.emplace(std::chrono::seconds(v.value().val())); + } + + std::shared_ptr tim_message_expiry; + if (message_expiry_interval) { + tim_message_expiry = std::make_shared(ioc, message_expiry_interval.value()); + tim_message_expiry->async_wait( + [this, wp = std::weak_ptr(tim_message_expiry)](MQTT_NS::error_code ec) mutable { + if (auto sp = wp.lock()) { + if (!ec) { + offline_messages.get().erase(sp); + } + } + } + ); + } + + auto& seq_idx = offline_messages.get(); + auto i = seq_idx.emplace_back( MQTT_NS::force_move(pub_topic), MQTT_NS::force_move(contents), MQTT_NS::force_move(props), - pubopts + pubopts, + MQTT_NS::force_move(tim_message_expiry) ); } } @@ -1787,15 +1905,46 @@ class test_broker { con_sp_t con; MQTT_NS::buffer client_id; - MQTT_NS::optional will; MQTT_NS::optional will_delay; MQTT_NS::optional session_expiry_interval; std::shared_ptr tim_session_expiry; MQTT_NS::optional topic_alias_recv; + mi_inflight_message inflight_messages; - std::deque offline_messages; std::set qos2_publish_processed; + + mi_offline_message offline_messages; + std::set handles; // to efficient remove + + void update_will(as::io_context& ioc, MQTT_NS::optional will, MQTT_NS::optional will_expiry_interval) { + tim_will_expiry.reset(); + will_value = MQTT_NS::force_move(will); + + if (will_value && will_expiry_interval) { + tim_will_expiry = std::make_shared(ioc, will_expiry_interval.value()); + tim_will_expiry->async_wait( + [this, client_id = client_id, wp = std::weak_ptr(tim_will_expiry)](MQTT_NS::error_code ec) { + if (auto sp = wp.lock()) { + reset_will(); + } } + ); + } + } + + void reset_will() { + tim_will_expiry.reset(); + will_value = MQTT_NS::nullopt; + } + + MQTT_NS::optional& will() { return will_value; } + MQTT_NS::optional const& will() const { return will_value; } + + std::shared_ptr& get_tim_will_expiry() { return tim_will_expiry; } + private: + std::shared_ptr tim_will_expiry; + MQTT_NS::optional will_value; + }; // The mi_session_online container holds the relevant data about an active connection with the broker. @@ -1826,16 +1975,20 @@ class test_broker { MQTT_NS::buffer topic, MQTT_NS::buffer contents, MQTT_NS::v5::properties props, - MQTT_NS::qos qos_value) + MQTT_NS::qos qos_value, + std::shared_ptr tim_message_expiry = std::shared_ptr()) :topic(MQTT_NS::force_move(topic)), contents(MQTT_NS::force_move(contents)), props(MQTT_NS::force_move(props)), - qos_value(qos_value) + qos_value(qos_value), + tim_message_expiry(MQTT_NS::force_move(tim_message_expiry)) { } + MQTT_NS::buffer topic; MQTT_NS::buffer contents; MQTT_NS::v5::properties props; MQTT_NS::qos qos_value; + std::shared_ptr tim_message_expiry; }; using retained_messages = retained_topic_map; diff --git a/test/will.cpp b/test/will.cpp index 573cbdd1b..59820dd73 100644 --- a/test/will.cpp +++ b/test/will.cpp @@ -169,6 +169,175 @@ BOOST_AUTO_TEST_CASE( will_qos0 ) { th.join(); } +BOOST_AUTO_TEST_CASE( will_qo0_timeout ) { + boost::asio::io_context iocb; + test_broker b(iocb); + MQTT_NS::optional s; + std::promise p; + auto f = p.get_future(); + std::thread th( + [&] { + s.emplace(iocb, b); + p.set_value(); + iocb.run(); + } + ); + f.wait(); + auto finish = + [&] { + as::post( + iocb, + [&] { + s->close(); + } + ); + }; + + boost::asio::io_context ioc; + + constexpr uint32_t will_expiry_interval = 1; + as::steady_timer timeout(ioc); + as::steady_timer timeout_2(ioc); + + MQTT_NS::v5::properties ps { + MQTT_NS::v5::property::message_expiry_interval(will_expiry_interval), + }; + + auto c1 = MQTT_NS::make_client(ioc, broker_url, broker_notls_port, MQTT_NS::protocol_version::v5); + c1->set_client_id("cid1"); + c1->set_clean_session(true); + c1->set_will(MQTT_NS::will("topic1"_mb, "will_contents"_mb, MQTT_NS::retain::yes, MQTT_NS::force_move(ps))); + + int c1fd_count = 0; + auto c1_force_disconnect = [&c1, &c1fd_count] { + if (++c1fd_count == 2) c1->force_disconnect(); + }; + + auto c2 = MQTT_NS::make_client(ioc, broker_url, broker_notls_port, MQTT_NS::protocol_version::v5); + c2->set_client_id("cid2"); + c2->set_clean_session(true); + + using packet_id_t = typename std::remove_reference_t::packet_id_t; + + + checker chk = { + // connect + cont("h_connack_1"), + // force_disconnect + cont("h_error_1"), + + // connect + deps("h_connack_2"), + // subscribe topic1 QoS0 + cont("h_suback_2"), + // cont("h_publish_2"), // will receive + // unsubscribe topic1 + cont("h_unsuback_2"), + // disconnect + cont("h_close_2"), + + }; + + c1->set_v5_connack_handler( + [&chk, &c1_force_disconnect] + (bool sp, MQTT_NS::v5::connect_reason_code connack_return_code, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_connack_1"); + BOOST_TEST(sp == false); + BOOST_TEST(connack_return_code == MQTT_NS::v5::connect_reason_code::success); + c1_force_disconnect(); + return true; + }); + c1->set_close_handler( + [] + () { + BOOST_CHECK(false); + }); + c1->set_error_handler( + [&chk] + (MQTT_NS::error_code) { + MQTT_CHK("h_error_1"); + }); + + std::uint16_t pid_sub2; + std::uint16_t pid_unsub2; + + c2->set_v5_connack_handler( + [&chk, &c2, &pid_sub2] + (bool sp, MQTT_NS::v5::connect_reason_code connack_return_code, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_connack_2"); + BOOST_TEST(sp == false); + BOOST_TEST(connack_return_code == MQTT_NS::v5::connect_reason_code::success); + pid_sub2 = c2->subscribe("topic1", MQTT_NS::qos::at_most_once); + return true; + }); + c2->set_close_handler( + [&chk, &finish] + () { + MQTT_CHK("h_close_2"); + finish(); + }); + c2->set_error_handler( + [] + (MQTT_NS::error_code) { + BOOST_CHECK(false); + }); + c2->set_v5_suback_handler( + [&chk, &c2, &c1_force_disconnect, &pid_sub2, &pid_unsub2, &timeout, &timeout_2, &will_expiry_interval] + (packet_id_t packet_id, std::vector reasons, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_suback_2"); + BOOST_TEST(packet_id == pid_sub2); + BOOST_TEST(reasons.size() == 1U); + BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_0); + + timeout.expires_after(std::chrono::seconds(1 + will_expiry_interval)); + timeout.async_wait( + [&c1_force_disconnect](MQTT_NS::error_code ec) { + if (!ec) { + c1_force_disconnect(); + } + } + ); + + timeout_2.expires_after(std::chrono::seconds(2 + will_expiry_interval)); + timeout_2.async_wait( + [&c2, &pid_unsub2](MQTT_NS::error_code ec) { + if (!ec) { + pid_unsub2 = c2->unsubscribe("topic1"); + } + } + ); + + return true; + }); + c2->set_v5_unsuback_handler( + [&chk, &c2, &pid_unsub2] + (packet_id_t packet_id, std::vector reasons, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_unsuback_2"); + BOOST_TEST(packet_id == pid_unsub2); + c2->disconnect(); + return true; + }); + c2->set_v5_publish_handler( + [&chk, &c2, &pid_unsub2] + (MQTT_NS::optional packet_id, + MQTT_NS::publish_options pubopts, + MQTT_NS::buffer topic, + MQTT_NS::buffer contents, + MQTT_NS::v5::properties /*props*/) { + + // Will should not be received + BOOST_TEST(false); + return true; + }); + + c1->connect(); + c2->connect(); + + ioc.run(); + BOOST_TEST(chk.all()); + th.join(); +} + BOOST_AUTO_TEST_CASE( will_qos1 ) { boost::asio::io_context iocb; test_broker b(iocb); @@ -719,6 +888,7 @@ BOOST_AUTO_TEST_CASE( will_prop ) { iocb, [&] { s->close(); + b.clear_all_retained_topics(); } ); };