diff --git a/include/mqtt/endpoint.hpp b/include/mqtt/endpoint.hpp index f5099aaf..cfdd3ff2 100644 --- a/include/mqtt/endpoint.hpp +++ b/include/mqtt/endpoint.hpp @@ -5329,35 +5329,52 @@ class endpoint : public std::enable_shared_from_thisshared_from_this(), ssp = socket_sp_ref()](error_code ec) { // *1 + [this, sp = this->shared_from_this(), ssp = socket_sp_ref()](error_code ec) { // *1 MQTT_LOG("mqtt_impl", trace) << MQTT_ADD_VALUE(address, this) << "async_clean_shutdown_and_close ec:" << ec.message(); tim_shutdown_.cancel(); connected_ = false; - if (!async_shutdown_handler_called_) { - async_shutdown_handler_called_ = true; - if (func) func(boost::system::errc::make_error_code(boost::system::errc::success)); + auto handlers{force_move(async_shutdown_handler_queue_)}; + for (auto const& h : handlers) { + if (h) h(boost::system::errc::make_error_code(boost::system::errc::success)); } + async_shutdown_handler_called_ = true; } ); // timeout timer set tim_shutdown_.expires_after(shutdown_timeout); std::weak_ptr wp(std::static_pointer_cast(this->shared_from_this())); tim_shutdown_.async_wait( - [this, func, wp = force_move(wp), ssp = socket_sp_ref()](error_code ec) mutable { + [this, wp = force_move(wp), ssp = socket_sp_ref()](error_code ec) mutable { if (auto sp = wp.lock()) { MQTT_LOG("mqtt_impl", trace) << MQTT_ADD_VALUE(address, this) @@ -5374,18 +5391,19 @@ class endpoint : public std::enable_shared_from_thissocket().post( - [this, func = force_move(func), sp] { - if (!async_shutdown_handler_called_) { - error_code ec; - socket().force_shutdown_and_close(ec); - MQTT_LOG("mqtt_impl", trace) - << MQTT_ADD_VALUE(address, this) - << "force_shutdown_and_close ec:" - << ec.message(); - connected_ = false; - async_shutdown_handler_called_ = true; - if (func) func(boost::system::errc::make_error_code(boost::system::errc::success)); + [this, sp] { + error_code ec; + socket().force_shutdown_and_close(ec); + MQTT_LOG("mqtt_impl", trace) + << MQTT_ADD_VALUE(address, this) + << "force_shutdown_and_close ec:" + << ec.message(); + connected_ = false; + auto handlers{force_move(async_shutdown_handler_queue_)}; + for (auto const& h : handlers) { + if (h) h(boost::system::errc::make_error_code(boost::system::errc::success)); } + async_shutdown_handler_called_ = true; } ); } @@ -11597,6 +11615,7 @@ class endpoint : public std::enable_shared_from_this mqtt_connected_{false}; std::atomic shutdown_requested_{false}; std::atomic async_shutdown_handler_called_{false}; + std::vector async_shutdown_handler_queue_; std::string client_id_;