diff --git a/include/mqtt/endpoint.hpp b/include/mqtt/endpoint.hpp index f5099aaf..179cfa2b 100644 --- a/include/mqtt/endpoint.hpp +++ b/include/mqtt/endpoint.hpp @@ -5329,35 +5329,76 @@ class endpoint : public std::enable_shared_from_this handlers; + handlers.reserve(async_shutdown_handler_queue_.size()); + std::move( + async_shutdown_handler_queue_.begin(), + async_shutdown_handler_queue_.end(), + std::back_inserter(handlers) + ); + async_shutdown_handler_queue_.clear(); + for (auto const& h : handlers) { + if (h) h(boost::system::errc::make_error_code(boost::system::errc::success)); + } + async_shutdown_handler_called_ = true; + }; + s.async_clean_shutdown_and_close( - [this, func, sp = this->shared_from_this(), ssp = socket_sp_ref()](error_code ec) { // *1 + [ + this, + sp = this->shared_from_this(), + ssp = socket_sp_ref(), + call_handlers + ] + (error_code ec) { // *1 MQTT_LOG("mqtt_impl", trace) << MQTT_ADD_VALUE(address, this) << "async_clean_shutdown_and_close ec:" << ec.message(); tim_shutdown_.cancel(); connected_ = false; - if (!async_shutdown_handler_called_) { - async_shutdown_handler_called_ = true; - if (func) func(boost::system::errc::make_error_code(boost::system::errc::success)); - } + call_handlers(); } ); // timeout timer set tim_shutdown_.expires_after(shutdown_timeout); std::weak_ptr wp(std::static_pointer_cast(this->shared_from_this())); tim_shutdown_.async_wait( - [this, func, wp = force_move(wp), ssp = socket_sp_ref()](error_code ec) mutable { + [ + this, + wp = force_move(wp), + ssp = socket_sp_ref(), + call_handlers + ] + (error_code ec) mutable { if (auto sp = wp.lock()) { MQTT_LOG("mqtt_impl", trace) << MQTT_ADD_VALUE(address, this) @@ -5374,18 +5415,15 @@ class endpoint : public std::enable_shared_from_thissocket().post( - [this, func = force_move(func), sp] { - if (!async_shutdown_handler_called_) { - error_code ec; - socket().force_shutdown_and_close(ec); - MQTT_LOG("mqtt_impl", trace) - << MQTT_ADD_VALUE(address, this) - << "force_shutdown_and_close ec:" - << ec.message(); - connected_ = false; - async_shutdown_handler_called_ = true; - if (func) func(boost::system::errc::make_error_code(boost::system::errc::success)); - } + [this, sp, call_handlers = force_move(call_handlers)] { + error_code ec; + socket().force_shutdown_and_close(ec); + MQTT_LOG("mqtt_impl", trace) + << MQTT_ADD_VALUE(address, this) + << "force_shutdown_and_close ec:" + << ec.message(); + call_handlers(); + connected_ = false; } ); } @@ -11597,6 +11635,7 @@ class endpoint : public std::enable_shared_from_this mqtt_connected_{false}; std::atomic shutdown_requested_{false}; std::atomic async_shutdown_handler_called_{false}; + std::vector async_shutdown_handler_queue_; std::string client_id_;