Skip to content

Commit

Permalink
Merge pull request #950 from redboltz/fix_invalid_timing_async_shutdo…
Browse files Browse the repository at this point in the history
…wn_handler_call

Fixed invalid timing async_shutdown handler call.
  • Loading branch information
redboltz authored Sep 28, 2022
2 parents 5726b50 + 6b24371 commit 0b6a831
Showing 1 changed file with 58 additions and 19 deletions.
77 changes: 58 additions & 19 deletions include/mqtt/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5329,35 +5329,76 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "already shutdowned";
if (func) func(boost::system::errc::make_error_code(boost::system::errc::success));
if (async_shutdown_handler_called_) {
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "async_shutdown handler immediately called";
if (func) func(boost::system::errc::make_error_code(boost::system::errc::success));
}
else {
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "enqueue async_shutdown handler";
async_shutdown_handler_queue_.emplace_back(force_move(func));
}
return;
}
shutdown_requested_ = true;
mqtt_connected_ = false;
async_shutdown_handler_called_ = false;

MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "enqueue async_shutdown handler";
async_shutdown_handler_queue_.emplace_back(force_move(func));

MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "async_clean_shutdown_and_close";
auto call_handlers =
[this] {
std::vector<async_handler_t> handlers;
handlers.reserve(async_shutdown_handler_queue_.size());
std::move(
async_shutdown_handler_queue_.begin(),
async_shutdown_handler_queue_.end(),
std::back_inserter(handlers)
);
async_shutdown_handler_queue_.clear();
for (auto const& h : handlers) {
if (h) h(boost::system::errc::make_error_code(boost::system::errc::success));
}
async_shutdown_handler_called_ = true;
};

s.async_clean_shutdown_and_close(
[this, func, sp = this->shared_from_this(), ssp = socket_sp_ref()](error_code ec) { // *1
[
this,
sp = this->shared_from_this(),
ssp = socket_sp_ref(),
call_handlers
]
(error_code ec) { // *1
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "async_clean_shutdown_and_close ec:"
<< ec.message();
tim_shutdown_.cancel();
connected_ = false;
if (!async_shutdown_handler_called_) {
async_shutdown_handler_called_ = true;
if (func) func(boost::system::errc::make_error_code(boost::system::errc::success));
}
call_handlers();
}
);
// timeout timer set
tim_shutdown_.expires_after(shutdown_timeout);
std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this()));
tim_shutdown_.async_wait(
[this, func, wp = force_move(wp), ssp = socket_sp_ref()](error_code ec) mutable {
[
this,
wp = force_move(wp),
ssp = socket_sp_ref(),
call_handlers
]
(error_code ec) mutable {
if (auto sp = wp.lock()) {
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
Expand All @@ -5374,18 +5415,15 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
<< MQTT_ADD_VALUE(address, this)
<< "post force_shutdown_and_close";
sp->socket().post(
[this, func = force_move(func), sp] {
if (!async_shutdown_handler_called_) {
error_code ec;
socket().force_shutdown_and_close(ec);
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "force_shutdown_and_close ec:"
<< ec.message();
connected_ = false;
async_shutdown_handler_called_ = true;
if (func) func(boost::system::errc::make_error_code(boost::system::errc::success));
}
[this, sp, call_handlers = force_move(call_handlers)] {
error_code ec;
socket().force_shutdown_and_close(ec);
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "force_shutdown_and_close ec:"
<< ec.message();
call_handlers();
connected_ = false;
}
);
}
Expand Down Expand Up @@ -11597,6 +11635,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
std::atomic<bool> mqtt_connected_{false};
std::atomic<bool> shutdown_requested_{false};
std::atomic<bool> async_shutdown_handler_called_{false};
std::vector<async_handler_t> async_shutdown_handler_queue_;

std::string client_id_;

Expand Down

0 comments on commit 0b6a831

Please sign in to comment.