Skip to content

Commit

Permalink
Merge branch 'message-expiry-timer' of https://github.com/kleunen/mqt…
Browse files Browse the repository at this point in the history
…t_cpp into kleunen-message-expiry-timer
  • Loading branch information
redboltz committed Nov 27, 2020
2 parents 04a74e1 + 6357928 commit 091098a
Show file tree
Hide file tree
Showing 6 changed files with 858 additions and 81 deletions.
275 changes: 266 additions & 9 deletions test/broker_offline_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v3_1_1 ) {

BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) {

//
// Process with no timeout:
// c1 ---- broker ----- c2 (CleanSession: false)
//
// 1. c2 subscribe t1 QoS2
Expand All @@ -256,6 +256,8 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) {
// 4. c1 publish t1 QoS1
// 5. c1 publish t1 QoS2
// 6. c2 connect again
// Received published messages
// 7. Disconnect
//

boost::asio::io_context iocb;
Expand Down Expand Up @@ -349,7 +351,7 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) {
}
);
c2->set_v5_connack_handler(
[&chk, &c2]
[&chk, &c1, &c2]
(bool sp, MQTT_NS::v5::connect_reason_code connack_reason_code, MQTT_NS::v5::properties /*props*/) {
auto ret = chk.match(
"c1_h_connack",
Expand Down Expand Up @@ -419,13 +421,16 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) {
[&chk, &c2]
(packet_id_t, MQTT_NS::v5::pubcomp_reason_code, MQTT_NS::v5::properties /*props*/) {
MQTT_CHK("c1_h_pubcomp");
c2->connect(
MQTT_NS::v5::properties{
MQTT_NS::v5::property::session_expiry_interval(
MQTT_NS::session_never_expire
)
}
);

// Reconnect immediatly
c2->connect(
MQTT_NS::v5::properties{
MQTT_NS::v5::property::session_expiry_interval(
MQTT_NS::session_never_expire
)
}
);

return true;
}
);
Expand Down Expand Up @@ -551,4 +556,256 @@ BOOST_AUTO_TEST_CASE( offline_pubsub_v5 ) {
th.join();
}

BOOST_AUTO_TEST_CASE( offline_pubsub_v5_timeout ) {

// Process with timeout:
// c1 ---- broker ----- c2 (CleanSession: false)
//
// 1. c2 subscribe t1 QoS2
// 2. c2 disconnect
// 3. c1 publish t1 QoS0
// 4. c1 publish t1 QoS1
// 5. c1 publish t1 QoS2
// * Wait for timeout of messages
// 6. c2 connect again
// Do not receive published messages
// * Wait for possible published messages
// 7. Disconnect
//

boost::asio::io_context iocb;
test_broker b(iocb);
MQTT_NS::optional<test_server_no_tls> s;
std::promise<void> p;
auto f = p.get_future();
std::thread th(
[&] {
s.emplace(iocb, b);
p.set_value();
iocb.run();
}
);
f.wait();
auto finish =
[&] {
as::post(
iocb,
[&] {
s->close();
}
);
};

boost::asio::io_context ioc;

auto c1 = MQTT_NS::make_client(ioc, broker_url, broker_notls_port, MQTT_NS::protocol_version::v5);
c1->set_clean_session(true);
c1->set_client_id("cid1");

auto c2 = MQTT_NS::make_client(ioc, broker_url, broker_notls_port, MQTT_NS::protocol_version::v5);
c2->set_clean_session(false);
c2->set_client_id("cid2");

using packet_id_t = typename std::remove_reference_t<decltype(*c1)>::packet_id_t;

checker chk = {
cont("c1_h_connack"),
cont("c2_h_connack1"),

// c2 subscribe t1 qos2
cont("c2_h_suback"),
cont("c2_h_close1"),

// c1 publish t1 qos0
// c1 publish t1 qos1
// c1 publish t1 qos2
cont("c1_h_puback"),
cont("c1_h_pubrec"),
cont("c1_h_pubcomp"),

// c2 connect again
cont("c2_h_connack2"),

cont("c1_h_close"),
cont("c2_h_close2"),
};

// Following is used to perform timeout of the message
unsigned int message_timeout = 1;
as::steady_timer timeout(ioc);

MQTT_NS::v5::properties ps {
MQTT_NS::v5::property::payload_format_indicator(MQTT_NS::v5::property::payload_format_indicator::string),
MQTT_NS::v5::property::message_expiry_interval(message_timeout),
MQTT_NS::v5::property::content_type("content type"_mb),
MQTT_NS::v5::property::topic_alias(0x1234U),
MQTT_NS::v5::property::response_topic("response topic"_mb),
MQTT_NS::v5::property::correlation_data("correlation data"_mb),
MQTT_NS::v5::property::user_property("key1"_mb, "val1"_mb),
MQTT_NS::v5::property::user_property("key2"_mb, "val2"_mb),
};

auto prop_size = ps.size();
std::size_t user_prop_count = 0;

c1->set_v5_connack_handler(
[&chk, &c2]
(bool sp, MQTT_NS::v5::connect_reason_code connack_reason_code, MQTT_NS::v5::properties /*props*/) {
MQTT_CHK("c1_h_connack");
BOOST_TEST(sp == false);
BOOST_TEST(connack_reason_code == MQTT_NS::v5::connect_reason_code::success);
c2->connect(
MQTT_NS::v5::properties{
MQTT_NS::v5::property::session_expiry_interval(
MQTT_NS::session_never_expire
)
}
);
return true;
}
);
c2->set_v5_connack_handler(
[&chk, &c1, &c2, &timeout, &message_timeout]
(bool sp, MQTT_NS::v5::connect_reason_code connack_reason_code, MQTT_NS::v5::properties /*props*/) {
auto ret = chk.match(
"c1_h_connack",
[&] {
MQTT_CHK("c2_h_connack1");
BOOST_TEST(sp == false);
BOOST_TEST(connack_reason_code == MQTT_NS::v5::connect_reason_code::success);
c2->subscribe("topic1", MQTT_NS::qos::exactly_once);
},
"c2_h_connack1",
[&] {
MQTT_CHK("c2_h_connack2");
BOOST_TEST(sp == true);
BOOST_TEST(connack_reason_code == MQTT_NS::v5::connect_reason_code::success);

// Disconnect after timeout
timeout.expires_after(std::chrono::seconds(1 + message_timeout));
timeout.async_wait(
[&](MQTT_NS::error_code ec) {
if (!ec) {
c1->disconnect();
}
}
);

}
);
BOOST_TEST(ret);
return true;
}
);
c2->set_v5_suback_handler(
[&chk, &c2]
(packet_id_t, std::vector<MQTT_NS::v5::suback_reason_code> reasons, MQTT_NS::v5::properties /*props*/) mutable {
MQTT_CHK("c2_h_suback");
BOOST_TEST(reasons.size() == 1U);
BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_2);
c2->disconnect();
return true;
}
);
c2->set_close_handler(
[&chk, &c1, &finish, &ps]
() {
auto ret = chk.match(
"c2_h_suback",
[&] {
MQTT_CHK("c2_h_close1");
c1->publish("topic1", "topic1_contents1", MQTT_NS::qos::at_most_once, ps);
c1->publish("topic1", "topic1_contents2", MQTT_NS::qos::at_least_once, ps);
c1->publish("topic1", "topic1_contents3", MQTT_NS::qos::exactly_once, ps);
},
"c2_h_close1",
[&] {
MQTT_CHK("c2_h_close2");
finish();
}
);
BOOST_TEST(ret);

}
);
c1->set_v5_puback_handler(
[&chk]
(packet_id_t, MQTT_NS::v5::puback_reason_code, MQTT_NS::v5::properties /*props*/) {
MQTT_CHK("c1_h_puback");
return true;
}
);
c1->set_v5_pubrec_handler(
[&chk]
(packet_id_t, MQTT_NS::v5::pubrec_reason_code, MQTT_NS::v5::properties /*props*/) {
MQTT_CHK("c1_h_pubrec");
return true;
}
);
c1->set_v5_pubcomp_handler(
[&chk, &c2, &message_timeout, &timeout]
(packet_id_t, MQTT_NS::v5::pubcomp_reason_code, MQTT_NS::v5::properties /*props*/) {
MQTT_CHK("c1_h_pubcomp");

// Reconnect when messages are timed out
timeout.expires_after(std::chrono::seconds(1 + message_timeout));
timeout.async_wait(
[&c2](MQTT_NS::error_code ec) {
if (!ec) {
c2->connect(
MQTT_NS::v5::properties{
MQTT_NS::v5::property::session_expiry_interval(
MQTT_NS::session_never_expire
)
}
);
}
}
);

return true;
}
);
c2->set_v5_publish_handler(
[&chk, &c1]
(MQTT_NS::optional<packet_id_t> packet_id,
MQTT_NS::publish_options pubopts,
MQTT_NS::buffer topic,
MQTT_NS::buffer contents,
MQTT_NS::v5::properties props) {

// We should not received any published message when offline messages timeout
BOOST_TEST(false);
return true;
}
);
c1->set_close_handler(
[&chk, &c2]
() {
MQTT_CHK("c1_h_close");
c2->disconnect();
}
);

// error cases
c1->set_error_handler(
[]
(MQTT_NS::error_code) {
BOOST_CHECK(false);
}
);
c2->set_error_handler(
[]
(MQTT_NS::error_code) {
BOOST_CHECK(false);
}
);

c1->connect();

ioc.run();
BOOST_TEST(chk.all());
th.join();
}

BOOST_AUTO_TEST_SUITE_END()
4 changes: 4 additions & 0 deletions test/combi_test.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ inline void do_test(
[&] {
s->close();
b.clear_all_sessions();
b.clear_all_retained_topics();
}
);
},
Expand Down Expand Up @@ -118,6 +119,7 @@ inline void do_tls_test(
[&] {
s->close();
b.clear_all_sessions();
b.clear_all_retained_topics();
}
);
},
Expand Down Expand Up @@ -167,6 +169,7 @@ inline void do_ws_test(
[&] {
s->close();
b.clear_all_sessions();
b.clear_all_retained_topics();
}
);
},
Expand Down Expand Up @@ -218,6 +221,7 @@ inline void do_tls_ws_test(
[&] {
s->close();
b.clear_all_sessions();
b.clear_all_retained_topics();
}
);
},
Expand Down
Loading

0 comments on commit 091098a

Please sign in to comment.