Skip to content

Commit

Permalink
Fixed invalid timing async_shutdown handler call.
Browse files Browse the repository at this point in the history
If the 2nd async_shutdown() is called during the 1st async_shutdown
processing, then the 2nd async_shutdown() is called immediately.
It should be called the 1st handler calling timing.
  • Loading branch information
redboltz committed Sep 28, 2022
1 parent 5726b50 commit 18d8950
Showing 1 changed file with 38 additions and 19 deletions.
57 changes: 38 additions & 19 deletions include/mqtt/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5329,35 +5329,52 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "already shutdowned";
if (func) func(boost::system::errc::make_error_code(boost::system::errc::success));
if (async_shutdown_handdler_called_) {
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "async_shutdown handler immediately called";
if (func) func(boost::system::errc::make_error_code(boost::system::errc::success));
}
else {
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "enqueue async_shutdown handler";
async_shutdown_handler_queue_.emplace_back(force_move(func));
}
return;
}
shutdown_requested_ = true;
mqtt_connected_ = false;
async_shutdown_handler_called_ = false;
async_shutdown_handdler_called_ = false;

MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "enqueue async_shutdown handler";
async_shutdown_handler_queue_.emplace_back(force_move(func));

MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "async_clean_shutdown_and_close";
s.async_clean_shutdown_and_close(
[this, func, sp = this->shared_from_this(), ssp = socket_sp_ref()](error_code ec) { // *1
[this, sp = this->shared_from_this(), ssp = socket_sp_ref()](error_code ec) { // *1
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "async_clean_shutdown_and_close ec:"
<< ec.message();
tim_shutdown_.cancel();
connected_ = false;
if (!async_shutdown_handler_called_) {
async_shutdown_handler_called_ = true;
if (func) func(boost::system::errc::make_error_code(boost::system::errc::success));
auto handlers{force_move(async_shutdown_handler_queue_)};
for (auto const& h : handlers) {
if (h) h(boost::system::errc::make_error_code(boost::system::errc::success));
}
async_shutdown_handdler_called_ = true;
}
);
// timeout timer set
tim_shutdown_.expires_after(shutdown_timeout);
std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this()));
tim_shutdown_.async_wait(
[this, func, wp = force_move(wp), ssp = socket_sp_ref()](error_code ec) mutable {
[this, wp = force_move(wp), ssp = socket_sp_ref()](error_code ec) mutable {
if (auto sp = wp.lock()) {
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
Expand All @@ -5374,18 +5391,19 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
<< MQTT_ADD_VALUE(address, this)
<< "post force_shutdown_and_close";
sp->socket().post(
[this, func = force_move(func), sp] {
if (!async_shutdown_handler_called_) {
error_code ec;
socket().force_shutdown_and_close(ec);
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "force_shutdown_and_close ec:"
<< ec.message();
connected_ = false;
async_shutdown_handler_called_ = true;
if (func) func(boost::system::errc::make_error_code(boost::system::errc::success));
[this, sp] {
error_code ec;
socket().force_shutdown_and_close(ec);
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "force_shutdown_and_close ec:"
<< ec.message();
connected_ = false;
auto handlers{force_move(async_shutdown_handler_queue_)};
for (auto const& h : handlers) {
if (h) h(boost::system::errc::make_error_code(boost::system::errc::success));
}
async_shutdown_handdler_called_ = true;
}
);
}
Expand Down Expand Up @@ -11596,7 +11614,8 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
std::atomic<bool> connected_{false};
std::atomic<bool> mqtt_connected_{false};
std::atomic<bool> shutdown_requested_{false};
std::atomic<bool> async_shutdown_handler_called_{false};
std::atomic<bool> async_shutdown_handdler_called_{false};
std::vector<async_handler_t> async_shutdown_handler_queue_;

std::string client_id_;

Expand Down

0 comments on commit 18d8950

Please sign in to comment.