Skip to content

Commit

Permalink
Merge pull request #894 from redboltz/refine_res_topic_life_management
Browse files Browse the repository at this point in the history
Refine res topic life management
  • Loading branch information
redboltz authored Oct 19, 2021
2 parents 0a86d6f + 21ebcce commit c1c3d9e
Show file tree
Hide file tree
Showing 6 changed files with 505 additions and 70 deletions.
47 changes: 42 additions & 5 deletions include/mqtt/broker/broker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ class broker_t {
optional<std::chrono::steady_clock::duration> session_expiry_interval;
optional<std::chrono::steady_clock::duration> will_expiry_interval;
v5::properties connack_props;

bool response_topic_requested = false;
if (ep.get_protocol_version() == protocol_version::v5) {
{
auto v = get_property<v5::property::session_expiry_interval>(props);
Expand All @@ -1026,9 +1026,7 @@ class broker_t {
{
auto v = get_property<v5::property::request_response_information>(props);
if (v && v.value().val() == 1) {
connack_props.emplace_back(
v5::property::response_topic(allocate_buffer(create_uuid_string()))
);
response_topic_requested = true;
}
}

Expand Down Expand Up @@ -1109,8 +1107,34 @@ class broker_t {
break;
}

auto set_response_topic =
[this, response_topic_requested, &connack_props](session_state& s) {
if (response_topic_requested) {
auto response_topic = [&] {
if (auto rt_opt = s.get_response_topic()) {
return rt_opt.value();
}
auto rt = create_uuid_string();
s.set_response_topic(rt);
return rt;
} ();
s.set_clean_handler(
[this, response_topic] {
std::lock_guard<mutex> g(mtx_retains_);
retains_.erase(response_topic);
}
);
connack_props.emplace_back(
v5::property::response_topic(
allocate_buffer(response_topic)
)
);
}
};

auto send_connack =
[&](bool session_present, std::function<void(error_code)> finish = [](error_code){}) {
[this, &ep, &connack_props]
(bool session_present, std::function<void(error_code)> finish = [](error_code){}) {
// Reply to the connect message.
switch (ep.get_protocol_version()) {
case protocol_version::v3_1_1:
Expand Down Expand Up @@ -1158,6 +1182,7 @@ class broker_t {
}
};


/**
* http://docs.oasis-open.org/mqtt/mqtt/v5.0/cs02/mqtt-v5.0-cs02.html#_Toc514345311
* 3.1.2.4 Clean Start
Expand Down Expand Up @@ -1192,6 +1217,8 @@ class broker_t {
force_move(will_expiry_interval),
force_move(session_expiry_interval)
);
// set_response_topic never modify key part
set_response_topic(const_cast<session_state&>(*it));
send_connack(false);
}
else if (it->online()) {
Expand All @@ -1204,6 +1231,8 @@ class broker_t {
<< MQTT_ADD_VALUE(address, this)
<< "cid:" << client_id
<< "online connection exists, discard old one due to new one's clean_start and renew";
// set_response_topic never modify key part
set_response_topic(const_cast<session_state&>(*it));
send_connack(false);
idx.modify(
it,
Expand All @@ -1222,6 +1251,8 @@ class broker_t {
<< MQTT_ADD_VALUE(address, this)
<< "cid:" << client_id
<< "online connection exists, inherit old one and renew";
// set_response_topic never modify key part
set_response_topic(const_cast<session_state&>(*it));
send_connack(
true,
[
Expand Down Expand Up @@ -1279,6 +1310,8 @@ class broker_t {
force_move(session_expiry_interval)
);
BOOST_ASSERT(inserted);
// set_response_topic never modify key part
set_response_topic(const_cast<session_state&>(*it));
send_connack(false);
}
}
Expand All @@ -1290,6 +1323,8 @@ class broker_t {
<< MQTT_ADD_VALUE(address, this)
<< "cid:" << client_id
<< "offline connection exists, discard old one due to new one's clean_start and renew";
// set_response_topic never modify key part
set_response_topic(const_cast<session_state&>(*it));
send_connack(false);
idx.modify(
it,
Expand All @@ -1309,6 +1344,8 @@ class broker_t {
<< MQTT_ADD_VALUE(address, this)
<< "cid:" << client_id
<< "offline connection exists, inherit old one and renew";
// set_response_topic never modify key part
set_response_topic(const_cast<session_state&>(*it));
send_connack(
true,
[
Expand Down
16 changes: 16 additions & 0 deletions include/mqtt/broker/session_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,15 @@ struct session_state {
}
}

void set_clean_handler(std::function<void()> handler) {
clean_handler_ = force_move(handler);
}

void clean() {
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, this)
<< "clean";
if (clean_handler_) clean_handler_();
{
std::lock_guard<mutex> g(mtx_inflight_messages_);
inflight_messages_.clear();
Expand Down Expand Up @@ -509,6 +514,14 @@ struct session_state {
return session_expiry_interval_;
}

void set_response_topic(std::string topic) {
response_topic_.emplace(force_move(topic));
}

optional<std::string> get_response_topic() const {
return response_topic_;
}

private:
void send_will_impl() {
if (!will_value_) return;
Expand Down Expand Up @@ -576,6 +589,9 @@ struct session_state {
bool remain_after_close_;

std::set<packet_id_t> qos2_publish_handled_;

optional<std::string> response_topic_;
std::function<void()> clean_handler_;
};

class session_states {
Expand Down
2 changes: 1 addition & 1 deletion include/mqtt/broker/uuid.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ MQTT_BROKER_NS_BEGIN

inline std::string create_uuid_string() {
// See https://github.com/boostorg/uuid/issues/121
static thread_local auto gen = boost::uuids::random_generator();
thread_local auto gen = boost::uuids::random_generator();
return boost::uuids::to_string(gen());
}

Expand Down
1 change: 1 addition & 0 deletions test/system/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ ENDIF ()

IF (MQTT_TEST_5)
LIST (APPEND check_PROGRAMS
st_reqres.cpp
st_resend.cpp
st_retain_1.cpp
st_resend_new_client.cpp
Expand Down
64 changes: 0 additions & 64 deletions test/system/st_connect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1936,70 +1936,6 @@ BOOST_AUTO_TEST_CASE( connack_prop ) {
do_combi_test_sync(test);
}

BOOST_AUTO_TEST_CASE( request_response ) {
auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& /*b*/) {
auto& c = cs[0];
clear_ordered();
if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) {
finish();
return;
}

c->set_client_id("cid1");
c->set_clean_session(true);
BOOST_TEST(c->connected() == false);

checker chk = {
// connect
cont("h_connack"),
// disconnect
cont("h_close"),
};

c->set_v5_connack_handler(
[&chk, &c]
(bool sp, MQTT_NS::v5::connect_reason_code connect_reason_code, MQTT_NS::v5::properties props) {
MQTT_CHK("h_connack");
BOOST_TEST(c->connected() == true);
BOOST_TEST(sp == false);
BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success);

std::size_t times = 0;
MQTT_NS::v5::visit_props(
props,
[&](MQTT_NS::v5::property::response_topic const&) {
++times;
},
[](auto){}
);
BOOST_TEST(times == 1);

c->disconnect();
BOOST_TEST(c->connected() == true);
return true;
});
c->set_close_handler(
[&chk, &finish]
() {
MQTT_CHK("h_close");
finish();
});
c->set_error_handler(
[]
(MQTT_NS::error_code) {
BOOST_CHECK(false);
});
c->connect(
MQTT_NS::v5::properties{
MQTT_NS::v5::property::request_response_information(true)
}
);
ioc.run();
BOOST_TEST(chk.all());
};
do_combi_test_sync(test);
}

BOOST_AUTO_TEST_CASE( session_taken_over ) {
auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& /*b*/) {
auto& c1 = cs[0];
Expand Down
Loading

0 comments on commit c1c3d9e

Please sign in to comment.