From ad283808847916470f8f2dad9c5a8d7d8f8a6e2a Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 5 Jun 2019 10:32:20 +0200 Subject: [PATCH] Close server connections and shutdown coroutines immediately on disconnect --- lib/remote/httpserverconnection.cpp | 16 ++++++++++++---- lib/remote/httpserverconnection.hpp | 2 ++ lib/remote/jsonrpcconnection-heartbeat.cpp | 8 ++++---- lib/remote/jsonrpcconnection.cpp | 19 ++++++++++++++----- lib/remote/jsonrpcconnection.hpp | 2 ++ 5 files changed, 34 insertions(+), 13 deletions(-) diff --git a/lib/remote/httpserverconnection.cpp b/lib/remote/httpserverconnection.cpp index 9d563af7604..1d407d06285 100644 --- a/lib/remote/httpserverconnection.cpp +++ b/lib/remote/httpserverconnection.cpp @@ -31,7 +31,8 @@ using namespace icinga; auto const l_ServerHeader ("Icinga/" + Application::GetAppVersion()); HttpServerConnection::HttpServerConnection(const String& identity, bool authenticated, const std::shared_ptr& stream) - : m_Stream(stream), m_Seen(Utility::GetTime()), m_IoStrand(stream->get_executor().context()), m_ShuttingDown(false), m_HasStartedStreaming(false) + : m_Stream(stream), m_Seen(Utility::GetTime()), m_IoStrand(stream->get_executor().context()), m_ShuttingDown(false), m_HasStartedStreaming(false), + m_CheckLivenessTimer(stream->get_executor().context()) { if (authenticated) { m_ApiUser = ApiUser::GetByClientCN(identity); @@ -80,6 +81,13 @@ void HttpServerConnection::Disconnect() } catch (...) { } + try { + m_Stream->lowest_layer().cancel(); + } catch (...) { + } + + m_CheckLivenessTimer.cancel(); + auto listener (ApiListener::GetInstance()); if (listener) { @@ -529,11 +537,11 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc) void HttpServerConnection::CheckLiveness(boost::asio::yield_context yc) { - boost::asio::deadline_timer timer (m_Stream->get_executor().context()); + boost::system::error_code ec; for (;;) { - timer.expires_from_now(boost::posix_time::seconds(5)); - timer.async_wait(yc); + m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(5)); + m_CheckLivenessTimer.async_wait(yc[ec]); if (m_ShuttingDown) { break; diff --git a/lib/remote/httpserverconnection.hpp b/lib/remote/httpserverconnection.hpp index b32db86a6c9..6586e0ff1cc 100644 --- a/lib/remote/httpserverconnection.hpp +++ b/lib/remote/httpserverconnection.hpp @@ -7,6 +7,7 @@ #include "base/string.hpp" #include "base/tlsstream.hpp" #include +#include #include #include @@ -39,6 +40,7 @@ class HttpServerConnection final : public Object boost::asio::io_service::strand m_IoStrand; bool m_ShuttingDown; bool m_HasStartedStreaming; + boost::asio::deadline_timer m_CheckLivenessTimer; void ProcessMessages(boost::asio::yield_context yc); void CheckLiveness(boost::asio::yield_context yc); diff --git a/lib/remote/jsonrpcconnection-heartbeat.cpp b/lib/remote/jsonrpcconnection-heartbeat.cpp index 93f2a774c0b..993877e4e40 100644 --- a/lib/remote/jsonrpcconnection-heartbeat.cpp +++ b/lib/remote/jsonrpcconnection-heartbeat.cpp @@ -7,9 +7,9 @@ #include "base/configtype.hpp" #include "base/logger.hpp" #include "base/utility.hpp" -#include #include #include +#include using namespace icinga; @@ -17,11 +17,11 @@ REGISTER_APIFUNCTION(Heartbeat, event, &JsonRpcConnection::HeartbeatAPIHandler); void JsonRpcConnection::HandleAndWriteHeartbeats(boost::asio::yield_context yc) { - boost::asio::deadline_timer timer (m_Stream->get_executor().context()); + boost::system::error_code ec; for (;;) { - timer.expires_from_now(boost::posix_time::seconds(10)); - timer.async_wait(yc); + m_HeartbeatTimer.expires_from_now(boost::posix_time::seconds(10)); + m_HeartbeatTimer.async_wait(yc[ec]); if (m_ShuttingDown) { break; diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 251acbc0404..53c3c58e5e9 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -16,9 +16,9 @@ #include "base/tlsstream.hpp" #include #include -#include #include #include +#include #include using namespace icinga; @@ -32,7 +32,8 @@ JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated, const std::shared_ptr& stream, ConnectionRole role) : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_IoStrand(stream->get_executor().context()), - m_OutgoingMessagesQueued(stream->get_executor().context()), m_WriterDone(stream->get_executor().context()), m_ShuttingDown(false) + m_OutgoingMessagesQueued(stream->get_executor().context()), m_WriterDone(stream->get_executor().context()), m_ShuttingDown(false), + m_CheckLivenessTimer(stream->get_executor().context()), m_HeartbeatTimer(stream->get_executor().context()) { if (authenticated) m_Endpoint = Endpoint::GetByName(identity); @@ -206,6 +207,14 @@ void JsonRpcConnection::Disconnect() } catch (...) { } + try { + m_Stream->lowest_layer().cancel(); + } catch (...) { + } + + m_CheckLivenessTimer.cancel(); + m_HeartbeatTimer.cancel(); + CpuBoundWork removeClient (yc); if (m_Endpoint) { @@ -310,11 +319,11 @@ Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary:: void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc) { - boost::asio::deadline_timer timer (m_Stream->get_executor().context()); + boost::system::error_code ec; for (;;) { - timer.expires_from_now(boost::posix_time::seconds(30)); - timer.async_wait(yc); + m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(30)); + m_CheckLivenessTimer.async_wait(yc[ec]); if (m_ShuttingDown) { break; diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index 994dd736811..caca1cb72f7 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -11,6 +11,7 @@ #include "base/workqueue.hpp" #include #include +#include #include #include @@ -77,6 +78,7 @@ class JsonRpcConnection final : public Object AsioConditionVariable m_OutgoingMessagesQueued; AsioConditionVariable m_WriterDone; bool m_ShuttingDown; + boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer; void HandleIncomingMessages(boost::asio::yield_context yc); void WriteOutgoingMessages(boost::asio::yield_context yc);