From 8692200faa7141d92eabf9d5a88acc6a4bf37f34 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 25 Feb 2019 16:40:14 +0100 Subject: [PATCH 1/8] Connect(): add non-async overload --- lib/base/tcpsocket.hpp | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/lib/base/tcpsocket.hpp b/lib/base/tcpsocket.hpp index 069288aad20..0f8334f0130 100644 --- a/lib/base/tcpsocket.hpp +++ b/lib/base/tcpsocket.hpp @@ -27,6 +27,35 @@ class TcpSocket final : public Socket void Connect(const String& node, const String& service); }; +template +void Connect(Socket& socket, const String& node, const String& service) +{ + using boost::asio::ip::tcp; + + tcp::resolver resolver (socket.get_io_service()); + tcp::resolver::query query (node, service); + auto result (resolver.resolve(query)); + auto current (result.begin()); + + for (;;) { + try { + socket.open(current->endpoint().protocol()); + socket.set_option(tcp::socket::keep_alive(true)); + socket.connect(current->endpoint()); + + break; + } catch (const std::exception&) { + if (++current == result.end()) { + throw; + } + + if (socket.is_open()) { + socket.close(); + } + } + } +} + template void Connect(Socket& socket, const String& node, const String& service, boost::asio::yield_context yc) { From 083cdf262c95441adf97c8b76574c2530702c4e6 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 25 Feb 2019 17:22:00 +0100 Subject: [PATCH 2/8] Introduce UnbufferedAsioTlsStream#GetPeerCertificate() --- lib/base/tlsstream.cpp | 5 +++++ lib/base/tlsstream.hpp | 1 + lib/remote/apilistener.cpp | 2 +- lib/remote/jsonrpcconnection-pki.cpp | 2 +- 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/lib/base/tlsstream.cpp b/lib/base/tlsstream.cpp index 38913f28a8b..5f6fe33cf90 100644 --- a/lib/base/tlsstream.cpp +++ b/lib/base/tlsstream.cpp @@ -465,6 +465,11 @@ String UnbufferedAsioTlsStream::GetVerifyError() const return m_VerifyError; } +std::shared_ptr UnbufferedAsioTlsStream::GetPeerCertificate() +{ + return std::shared_ptr(SSL_get_peer_certificate(native_handle()), X509_free); +} + void UnbufferedAsioTlsStream::BeforeHandshake(handshake_type type) { namespace ssl = boost::asio::ssl; diff --git a/lib/base/tlsstream.hpp b/lib/base/tlsstream.hpp index 6156a3d2f80..3974d12b887 100644 --- a/lib/base/tlsstream.hpp +++ b/lib/base/tlsstream.hpp @@ -119,6 +119,7 @@ class UnbufferedAsioTlsStream : public AsioTcpTlsStream bool IsVerifyOK() const; String GetVerifyError() const; + std::shared_ptr GetPeerCertificate(); template inline diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index de6e754c01f..c534c0969ed 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -523,7 +523,7 @@ void ApiListener::NewClientHandlerInternal(boost::asio::yield_context yc, const } }); - std::shared_ptr cert (SSL_get_peer_certificate(sslConn.native_handle()), X509_free); + std::shared_ptr cert (sslConn.GetPeerCertificate()); bool verify_ok = false; String identity; Endpoint::Ptr endpoint; diff --git a/lib/remote/jsonrpcconnection-pki.cpp b/lib/remote/jsonrpcconnection-pki.cpp index 66f88479b17..2f66eb7b555 100644 --- a/lib/remote/jsonrpcconnection-pki.cpp +++ b/lib/remote/jsonrpcconnection-pki.cpp @@ -34,7 +34,7 @@ Value RequestCertificateHandler(const MessageOrigin::Ptr& origin, const Dictiona /* Use the presented client certificate if not provided. */ if (certText.IsEmpty()) { auto stream (origin->FromClient->GetStream()); - cert = std::shared_ptr(SSL_get_peer_certificate(stream->next_layer().native_handle()), X509_free); + cert = stream->next_layer().GetPeerCertificate(); } else { cert = StringToCertificate(certText); } From aa5b21b198a403c03b23893ba2dd36eeb8e338ca Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 25 Feb 2019 18:12:32 +0100 Subject: [PATCH 3/8] Add non-async overloads for NetString::ReadStringFromStream() and NetString::WriteStringToStream() --- lib/base/netstring.cpp | 102 +++++++++++++++++++++++++++++++++++++++++ lib/base/netstring.hpp | 2 + 2 files changed, 104 insertions(+) diff --git a/lib/base/netstring.cpp b/lib/base/netstring.cpp index 489a8b40db7..2be7675a724 100644 --- a/lib/base/netstring.cpp +++ b/lib/base/netstring.cpp @@ -118,6 +118,85 @@ size_t NetString::WriteStringToStream(const Stream::Ptr& stream, const String& s return msg.GetLength(); } +/** + * Reads data from a stream in netstring format. + * + * @param stream The stream to read from. + * @returns The String that has been read from the IOQueue. + * @exception invalid_argument The input stream is invalid. + * @see https://github.com/PeterScott/netstring-c/blob/master/netstring.c + */ +String NetString::ReadStringFromStream(const std::shared_ptr& stream, + ssize_t maxMessageLength) +{ + namespace asio = boost::asio; + + size_t len = 0; + bool leadingZero = false; + + for (uint_fast8_t readBytes = 0;; ++readBytes) { + char byte = 0; + + { + asio::mutable_buffer byteBuf (&byte, 1); + asio::read(*stream, byteBuf); + } + + if (isdigit(byte)) { + if (readBytes == 9) { + BOOST_THROW_EXCEPTION(std::invalid_argument("Length specifier must not exceed 9 characters")); + } + + if (leadingZero) { + BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (leading zero)")); + } + + len = len * 10u + size_t(byte - '0'); + + if (!readBytes && byte == '0') { + leadingZero = true; + } + } else if (byte == ':') { + if (!readBytes) { + BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (no length specifier)")); + } + + break; + } else { + BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing :)")); + } + } + + if (maxMessageLength >= 0 && len > maxMessageLength) { + std::stringstream errorMessage; + errorMessage << "Max data length exceeded: " << (maxMessageLength / 1024) << " KB"; + + BOOST_THROW_EXCEPTION(std::invalid_argument(errorMessage.str())); + } + + String payload; + + if (len) { + payload.Append(len, 0); + + asio::mutable_buffer payloadBuf (&*payload.Begin(), payload.GetLength()); + asio::read(*stream, payloadBuf); + } + + char trailer = 0; + + { + asio::mutable_buffer trailerBuf (&trailer, 1); + asio::read(*stream, trailerBuf); + } + + if (trailer != ',') { + BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing ,)")); + } + + return std::move(payload); +} + /** * Reads data from a stream in netstring format. * @@ -197,6 +276,29 @@ String NetString::ReadStringFromStream(const std::shared_ptr& str return std::move(payload); } +/** + * Writes data into a stream using the netstring format and returns bytes written. + * + * @param stream The stream. + * @param str The String that is to be written. + * + * @return The amount of bytes written. + */ +size_t NetString::WriteStringToStream(const std::shared_ptr& stream, const String& str) +{ + namespace asio = boost::asio; + + std::ostringstream msgbuf; + WriteStringToStream(msgbuf, str); + + String msg = msgbuf.str(); + asio::const_buffer msgBuf (msg.CStr(), msg.GetLength()); + + asio::write(*stream, msgBuf); + + return msg.GetLength(); +} + /** * Writes data into a stream using the netstring format and returns bytes written. * diff --git a/lib/base/netstring.hpp b/lib/base/netstring.hpp index f84eac7a313..2d24359075a 100644 --- a/lib/base/netstring.hpp +++ b/lib/base/netstring.hpp @@ -26,9 +26,11 @@ class NetString public: static StreamReadStatus ReadStringFromStream(const Stream::Ptr& stream, String *message, StreamReadContext& context, bool may_wait = false, ssize_t maxMessageLength = -1); + static String ReadStringFromStream(const std::shared_ptr& stream, ssize_t maxMessageLength = -1); static String ReadStringFromStream(const std::shared_ptr& stream, boost::asio::yield_context yc, ssize_t maxMessageLength = -1); static size_t WriteStringToStream(const Stream::Ptr& stream, const String& message); + static size_t WriteStringToStream(const std::shared_ptr& stream, const String& message); static size_t WriteStringToStream(const std::shared_ptr& stream, const String& message, boost::asio::yield_context yc); static void WriteStringToStream(std::ostream& stream, const String& message); From 5367a48712e0fba53bc312a7dbb60a8a675582cd Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 25 Feb 2019 18:15:47 +0100 Subject: [PATCH 4/8] Add non-async overloads for JsonRpc::ReadMessage() and JsonRpc::SendMessage() --- lib/remote/jsonrpc.cpp | 31 +++++++++++++++++++++++++++++++ lib/remote/jsonrpc.hpp | 2 ++ 2 files changed, 33 insertions(+) diff --git a/lib/remote/jsonrpc.cpp b/lib/remote/jsonrpc.cpp index 03f3c7d0e08..63bc5ff85aa 100644 --- a/lib/remote/jsonrpc.cpp +++ b/lib/remote/jsonrpc.cpp @@ -59,6 +59,25 @@ size_t JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& me return NetString::WriteStringToStream(stream, json); } +/** + * Sends a message to the connected peer and returns the bytes sent. + * + * @param message The message. + * + * @return The amount of bytes sent. + */ +size_t JsonRpc::SendMessage(const std::shared_ptr& stream, const Dictionary::Ptr& message) +{ + String json = JsonEncode(message); + +#ifdef I2_DEBUG + if (GetDebugJsonRpcCached()) + std::cerr << ConsoleColorTag(Console_ForegroundBlue) << ">> " << json << ConsoleColorTag(Console_Normal) << "\n"; +#endif /* I2_DEBUG */ + + return NetString::WriteStringToStream(stream, json); +} + /** * Sends a message to the connected peer and returns the bytes sent. * @@ -106,6 +125,18 @@ StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, String *message return StatusNewItem; } +String JsonRpc::ReadMessage(const std::shared_ptr& stream, ssize_t maxMessageLength) +{ + String jsonString = NetString::ReadStringFromStream(stream, maxMessageLength); + +#ifdef I2_DEBUG + if (GetDebugJsonRpcCached()) + std::cerr << ConsoleColorTag(Console_ForegroundBlue) << "<< " << jsonString << ConsoleColorTag(Console_Normal) << "\n"; +#endif /* I2_DEBUG */ + + return std::move(jsonString); +} + String JsonRpc::ReadMessage(const std::shared_ptr& stream, boost::asio::yield_context yc, ssize_t maxMessageLength) { String jsonString = NetString::ReadStringFromStream(stream, yc, maxMessageLength); diff --git a/lib/remote/jsonrpc.hpp b/lib/remote/jsonrpc.hpp index faf9c07e8d2..98187fe6cf8 100644 --- a/lib/remote/jsonrpc.hpp +++ b/lib/remote/jsonrpc.hpp @@ -22,9 +22,11 @@ class JsonRpc { public: static size_t SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message); + static size_t SendMessage(const std::shared_ptr& stream, const Dictionary::Ptr& message); static size_t SendMessage(const std::shared_ptr& stream, const Dictionary::Ptr& message, boost::asio::yield_context yc); static size_t SendRawMessage(const std::shared_ptr& stream, const String& json, boost::asio::yield_context yc); static StreamReadStatus ReadMessage(const Stream::Ptr& stream, String *message, StreamReadContext& src, bool may_wait = false, ssize_t maxMessageLength = -1); + static String ReadMessage(const std::shared_ptr& stream, ssize_t maxMessageLength = -1); static String ReadMessage(const std::shared_ptr& stream, boost::asio::yield_context yc, ssize_t maxMessageLength = -1); static Dictionary::Ptr DecodeMessage(const String& message); From 30016cac758abaca8f8e640a8f00cbdf3817a09a Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 25 Feb 2019 18:58:04 +0100 Subject: [PATCH 5/8] Use new I/O engine in PkiUtility::FetchCert() and PkiUtility::RequestCertificate() --- lib/remote/pkiutility.cpp | 92 ++++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 44 deletions(-) diff --git a/lib/remote/pkiutility.cpp b/lib/remote/pkiutility.cpp index e1e78528865..c08989dd8a9 100644 --- a/lib/remote/pkiutility.cpp +++ b/lib/remote/pkiutility.cpp @@ -2,8 +2,11 @@ #include "remote/pkiutility.hpp" #include "remote/apilistener.hpp" +#include "base/defer.hpp" +#include "base/io-engine.hpp" #include "base/logger.hpp" #include "base/application.hpp" +#include "base/tcpsocket.hpp" #include "base/tlsutility.hpp" #include "base/console.hpp" #include "base/tlsstream.hpp" @@ -14,6 +17,7 @@ #include "remote/jsonrpc.hpp" #include #include +#include using namespace icinga; @@ -76,41 +80,43 @@ int PkiUtility::SignCsr(const String& csrfile, const String& certfile) std::shared_ptr PkiUtility::FetchCert(const String& host, const String& port) { - TcpSocket::Ptr client = new TcpSocket(); + std::shared_ptr sslContext; try { - client->Connect(host, port); + sslContext = MakeAsioSslContext(); } catch (const std::exception& ex) { Log(LogCritical, "pki") - << "Cannot connect to host '" << host << "' on port '" << port << "'"; + << "Cannot make SSL context."; Log(LogDebug, "pki") - << "Cannot connect to host '" << host << "' on port '" << port << "':\n" << DiagnosticInformation(ex); + << "Cannot make SSL context:\n" << DiagnosticInformation(ex); return std::shared_ptr(); } - std::shared_ptr sslContext; + auto stream (std::make_shared(IoEngine::Get().GetIoService(), *sslContext, host)); try { - sslContext = MakeSSLContext(); + Connect(stream->lowest_layer(), host, port); } catch (const std::exception& ex) { Log(LogCritical, "pki") - << "Cannot make SSL context."; + << "Cannot connect to host '" << host << "' on port '" << port << "'"; Log(LogDebug, "pki") - << "Cannot make SSL context:\n" << DiagnosticInformation(ex); + << "Cannot connect to host '" << host << "' on port '" << port << "':\n" << DiagnosticInformation(ex); return std::shared_ptr(); } - TlsStream::Ptr stream = new TlsStream(client, host, RoleClient, sslContext); + auto& sslConn (stream->next_layer()); try { - stream->Handshake(); + sslConn.handshake(sslConn.client); } catch (const std::exception& ex) { Log(LogCritical, "pki") << "Client TLS handshake failed. (" << ex.what() << ")"; return std::shared_ptr(); } - return stream->GetPeerCertificate(); + Defer shutdown ([&sslConn]() { sslConn.shutdown(); }); + + return sslConn.GetPeerCertificate(); } int PkiUtility::WriteCert(const std::shared_ptr& cert, const String& trustedfile) @@ -142,41 +148,43 @@ int PkiUtility::GenTicket(const String& cn, const String& salt, std::ostream& ti int PkiUtility::RequestCertificate(const String& host, const String& port, const String& keyfile, const String& certfile, const String& cafile, const std::shared_ptr& trustedCert, const String& ticket) { - TcpSocket::Ptr client = new TcpSocket(); + std::shared_ptr sslContext; try { - client->Connect(host, port); + sslContext = MakeAsioSslContext(certfile, keyfile); } catch (const std::exception& ex) { Log(LogCritical, "cli") - << "Cannot connect to host '" << host << "' on port '" << port << "'"; + << "Cannot make SSL context for cert path: '" << certfile << "' key path: '" << keyfile << "' ca path: '" << cafile << "'."; Log(LogDebug, "cli") - << "Cannot connect to host '" << host << "' on port '" << port << "':\n" << DiagnosticInformation(ex); + << "Cannot make SSL context for cert path: '" << certfile << "' key path: '" << keyfile << "' ca path: '" << cafile << "':\n" << DiagnosticInformation(ex); return 1; } - std::shared_ptr sslContext; + auto stream (std::make_shared(IoEngine::Get().GetIoService(), *sslContext, host)); try { - sslContext = MakeSSLContext(certfile, keyfile); + Connect(stream->lowest_layer(), host, port); } catch (const std::exception& ex) { Log(LogCritical, "cli") - << "Cannot make SSL context for cert path: '" << certfile << "' key path: '" << keyfile << "' ca path: '" << cafile << "'."; + << "Cannot connect to host '" << host << "' on port '" << port << "'"; Log(LogDebug, "cli") - << "Cannot make SSL context for cert path: '" << certfile << "' key path: '" << keyfile << "' ca path: '" << cafile << "':\n" << DiagnosticInformation(ex); + << "Cannot connect to host '" << host << "' on port '" << port << "':\n" << DiagnosticInformation(ex); return 1; } - TlsStream::Ptr stream = new TlsStream(client, host, RoleClient, sslContext); + auto& sslConn (stream->next_layer()); try { - stream->Handshake(); + sslConn.handshake(sslConn.client); } catch (const std::exception& ex) { Log(LogCritical, "cli") << "Client TLS handshake failed: " << DiagnosticInformation(ex, false); return 1; } - std::shared_ptr peerCert = stream->GetPeerCertificate(); + Defer shutdown ([&sslConn]() { sslConn.shutdown(); }); + + auto peerCert (sslConn.GetPeerCertificate()); if (X509_cmp(peerCert.get(), trustedCert.get())) { Log(LogCritical, "cli", "Peer certificate does not match trusted certificate."); @@ -196,36 +204,32 @@ int PkiUtility::RequestCertificate(const String& host, const String& port, const { "params", params } }); - JsonRpc::SendMessage(stream, request); - - String jsonString; Dictionary::Ptr response; - StreamReadContext src; - - for (;;) { - StreamReadStatus srs = JsonRpc::ReadMessage(stream, &jsonString, src); - - if (srs == StatusEof) - break; - if (srs != StatusNewItem) - continue; + try { + JsonRpc::SendMessage(stream, request); + stream->flush(); - response = JsonRpc::DecodeMessage(jsonString); + for (;;) { + response = JsonRpc::DecodeMessage(JsonRpc::ReadMessage(stream)); - if (response && response->Contains("error")) { - Log(LogCritical, "cli", "Could not fetch valid response. Please check the master log (notice or debug)."); + if (response && response->Contains("error")) { + Log(LogCritical, "cli", "Could not fetch valid response. Please check the master log (notice or debug)."); #ifdef I2_DEBUG - /* we shouldn't expose master errors to the user in production environments */ - Log(LogCritical, "cli", response->Get("error")); + /* we shouldn't expose master errors to the user in production environments */ + Log(LogCritical, "cli", response->Get("error")); #endif /* I2_DEBUG */ - return 1; - } + return 1; + } - if (response && (response->Get("id") != msgid)) - continue; + if (response && (response->Get("id") != msgid)) + continue; - break; + break; + } + } catch (...) { + Log(LogCritical, "cli", "Could not fetch valid response. Please check the master log."); + return 1; } if (!response) { From c21e705178fab2b985086441de646b81090c7982 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 28 Feb 2019 14:54:24 +0100 Subject: [PATCH 6/8] Use new I/O engine in InfluxdbWriter --- lib/perfdata/influxdbwriter.cpp | 136 +++++++++++++++++++------------- lib/perfdata/influxdbwriter.hpp | 10 ++- 2 files changed, 91 insertions(+), 55 deletions(-) diff --git a/lib/perfdata/influxdbwriter.cpp b/lib/perfdata/influxdbwriter.cpp index d732a83f219..bc7916d4e65 100644 --- a/lib/perfdata/influxdbwriter.cpp +++ b/lib/perfdata/influxdbwriter.cpp @@ -9,7 +9,9 @@ #include "icinga/macroprocessor.hpp" #include "icinga/icingaapplication.hpp" #include "icinga/checkcommand.hpp" +#include "base/application.hpp" #include "base/defer.hpp" +#include "base/io-engine.hpp" #include "base/tcpsocket.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" @@ -25,9 +27,21 @@ #include "base/tlsutility.hpp" #include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include #include +#include +#include #include using namespace icinga; @@ -156,44 +170,51 @@ void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp) //TODO: Close the connection, if we keep it open. } -Stream::Ptr InfluxdbWriter::Connect() +InfluxdbWriter::OptionalTlsStream InfluxdbWriter::Connect() { - TcpSocket::Ptr socket = new TcpSocket(); - Log(LogNotice, "InfluxdbWriter") << "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; - try { - socket->Connect(GetHost(), GetPort()); - } catch (const std::exception& ex) { - Log(LogWarning, "InfluxdbWriter") - << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw ex; - } + OptionalTlsStream stream; + bool ssl = GetSslEnable(); + + if (ssl) { + std::shared_ptr sslContext; - if (GetSslEnable()) { - std::shared_ptr sslContext; try { - sslContext = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert()); + sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert()); } catch (const std::exception& ex) { Log(LogWarning, "InfluxdbWriter") << "Unable to create SSL context."; - throw ex; + throw; } - TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext); + stream.first = std::make_shared(IoEngine::Get().GetIoService(), *sslContext, GetHost()); + } else { + stream.second = std::make_shared(IoEngine::Get().GetIoService()); + } + + try { + icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort()); + } catch (const std::exception& ex) { + Log(LogWarning, "InfluxdbWriter") + << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; + throw; + } + + if (ssl) { + auto& tlsStream (stream.first->next_layer()); + try { - tlsStream->Handshake(); + tlsStream.handshake(tlsStream.client); } catch (const std::exception& ex) { Log(LogWarning, "InfluxdbWriter") << "TLS handshake with host '" << GetHost() << "' failed."; - throw ex; + throw; } - - return tlsStream; - } else { - return new NetworkStream(socket); } + + return std::move(stream); } void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) @@ -429,6 +450,9 @@ void InfluxdbWriter::FlushTimeoutWQ() void InfluxdbWriter::Flush() { + namespace beast = boost::beast; + namespace http = beast::http; + /* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */ if (m_DataBuffer.empty()) return; @@ -439,7 +463,7 @@ void InfluxdbWriter::Flush() String body = boost::algorithm::join(m_DataBuffer, "\n"); m_DataBuffer.clear(); - Stream::Ptr stream; + OptionalTlsStream stream; try { stream = Connect(); @@ -449,10 +473,11 @@ void InfluxdbWriter::Flush() return; } - if (!stream) - return; - - Defer close ([&stream]() { stream->Close(); }); + Defer s ([&stream]() { + if (stream.first) { + stream.first->next_layer().shutdown(); + } + }); Url::Ptr url = new Url(); url->SetScheme(GetSslEnable() ? "https" : "http"); @@ -470,59 +495,64 @@ void InfluxdbWriter::Flush() if (!GetPassword().IsEmpty()) url->AddQueryElement("p", GetPassword()); - HttpRequest req(stream); - req.RequestMethod = "POST"; - req.RequestUrl = url; + http::request request (http::verb::post, std::string(url->Format(true)), 10); + + request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion()); + request.set(http::field::host, url->GetHost() + ":" + url->GetPort()); + + request.body() = body; + request.set(http::field::content_length, request.body().size()); try { - req.WriteBody(body.CStr(), body.GetLength()); - req.Finish(); + if (stream.first) { + http::write(*stream.first, request); + stream.first->flush(); + } else { + http::write(*stream.second, request); + stream.second->flush(); + } } catch (const std::exception& ex) { Log(LogWarning, "InfluxdbWriter") << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw ex; + throw; } - HttpResponse resp(stream, req); - StreamReadContext context; + http::parser parser; + beast::flat_buffer buf; try { - while (resp.Parse(context, true) && !resp.Complete) - ; /* Do nothing */ + if (stream.first) { + http::read(*stream.first, buf, parser); + } else { + http::read(*stream.second, buf, parser); + } } catch (const std::exception& ex) { Log(LogWarning, "InfluxdbWriter") << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex); - throw ex; + throw; } - if (!resp.Complete) { - Log(LogWarning, "InfluxdbWriter") - << "Failed to read a complete HTTP response from the InfluxDB server."; - return; - } + auto& response (parser.get()); - if (resp.StatusCode != 204) { + if (response.result() != http::status::no_content) { Log(LogWarning, "InfluxdbWriter") - << "Unexpected response code: " << resp.StatusCode; + << "Unexpected response code: " << response.result(); - String contentType = resp.Headers->Get("content-type"); + auto& contentType (response[http::field::content_type]); if (contentType != "application/json") { Log(LogWarning, "InfluxdbWriter") << "Unexpected Content-Type: " << contentType; return; } - size_t responseSize = resp.GetBodySize(); - boost::scoped_array buffer(new char[responseSize + 1]); - resp.ReadBody(buffer.get(), responseSize); - buffer.get()[responseSize] = '\0'; - Dictionary::Ptr jsonResponse; + auto& body (response.body()); + try { - jsonResponse = JsonDecode(buffer.get()); + jsonResponse = JsonDecode(body); } catch (...) { Log(LogWarning, "InfluxdbWriter") - << "Unable to parse JSON response:\n" << buffer.get(); + << "Unable to parse JSON response:\n" << body; return; } @@ -530,8 +560,6 @@ void InfluxdbWriter::Flush() Log(LogCritical, "InfluxdbWriter") << "InfluxDB error message:\n" << error; - - return; } } diff --git a/lib/perfdata/influxdbwriter.hpp b/lib/perfdata/influxdbwriter.hpp index b3d35d365dd..c5142c0b85d 100644 --- a/lib/perfdata/influxdbwriter.hpp +++ b/lib/perfdata/influxdbwriter.hpp @@ -8,8 +8,13 @@ #include "base/configobject.hpp" #include "base/tcpsocket.hpp" #include "base/timer.hpp" +#include "base/tlsstream.hpp" #include "base/workqueue.hpp" #include +#include +#include +#include +#include namespace icinga { @@ -36,6 +41,9 @@ class InfluxdbWriter final : public ObjectImpl void Pause() override; private: + typedef boost::asio::buffered_stream AsioTcpStream; + typedef std::pair, std::shared_ptr> OptionalTlsStream; + WorkQueue m_WorkQueue{10000000, 1}; Timer::Ptr m_FlushTimer; std::vector m_DataBuffer; @@ -51,7 +59,7 @@ class InfluxdbWriter final : public ObjectImpl static String EscapeKeyOrTagValue(const String& str); static String EscapeValue(const Value& value); - Stream::Ptr Connect(); + OptionalTlsStream Connect(); void AssertOnWorkQueue(); From 0da37d03d89ae1e28648e09166e6babf5ee42025 Mon Sep 17 00:00:00 2001 From: Michael Friedrich Date: Fri, 5 Apr 2019 09:30:26 +0200 Subject: [PATCH 7/8] Debug Console: Use our new I/O engine for HTTP requests: 1) execute-script --- lib/cli/consolecommand.cpp | 241 ++++++++++++++++++++++++++++++------- lib/cli/consolecommand.hpp | 10 +- 2 files changed, 204 insertions(+), 47 deletions(-) diff --git a/lib/cli/consolecommand.cpp b/lib/cli/consolecommand.cpp index e35aa3567a1..3e93e11157a 100644 --- a/lib/cli/consolecommand.cpp +++ b/lib/cli/consolecommand.cpp @@ -14,9 +14,26 @@ #include "base/unixsocket.hpp" #include "base/utility.hpp" #include "base/networkstream.hpp" +#include "base/defer.hpp" +#include "base/io-engine.hpp" +#include "base/stream.hpp" +#include "base/tcpsocket.hpp" /* include global icinga::Connect */ +#include #include "base/exception.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include + + #ifdef HAVE_EDITLINE #include "cli/editline.hpp" #endif /* HAVE_EDITLINE */ @@ -25,7 +42,7 @@ using namespace icinga; namespace po = boost::program_options; static ScriptFrame *l_ScriptFrame; -static ApiClient::Ptr l_ApiClient; +static std::shared_ptr l_TlsStream; static String l_Session; REGISTER_CLICOMMAND("console", ConsoleCommand); @@ -161,11 +178,55 @@ void ConsoleCommand::InitParameters(boost::program_options::options_description& ; } +/** + * Connects to host:port and performs a TLS shandshake + * + * @param host To connect to. + * @param port To connect to. + * + * @returns AsioTlsStream pointer for future HTTP connections. + */ +std::shared_ptr ConsoleCommand::Connect(const String& host, const String& port) +{ + std::shared_ptr sslContext; + + try { + sslContext = MakeAsioSslContext(Empty, Empty, Empty); //TODO: Add support for cert, key, ca parameters + } catch(const std::exception& ex) { + Log(LogCritical, "DebugConsole") + << "Cannot make SSL context: " << ex.what(); + throw; + } + + std::shared_ptr stream = std::make_shared(IoEngine::Get().GetIoService(), *sslContext, host); + + try { + icinga::Connect(stream->lowest_layer(), host, port); + } catch (const std::exception& ex) { + Log(LogWarning, "DebugConsole") + << "Cannot connect to REST API on host '" << host << "' port '" << port << "': " << ex.what(); + throw; + } + + auto& tlsStream (stream->next_layer()); + + try { + tlsStream.handshake(tlsStream.client); + } catch (const std::exception& ex) { + Log(LogWarning, "DebugConsole") + << "TLS handshake with host '" << host << "' failed: " << ex.what(); + throw; + } + + return std::move(stream); +} + + #ifdef HAVE_EDITLINE char *ConsoleCommand::ConsoleCompleteHelper(const char *word, int state) { static std::vector matches; - +/* TODO XXX if (state == 0) { if (!l_ApiClient) matches = ConsoleHandler::GetAutocompletionSuggestions(word, *l_ScriptFrame); @@ -198,6 +259,7 @@ char *ConsoleCommand::ConsoleCompleteHelper(const char *word, int state) return nullptr; return strdup(matches[state].CStr()); + */ } #endif /* HAVE_EDITLINE */ @@ -267,7 +329,8 @@ int ConsoleCommand::Run(const po::variables_map& vm, const std::vector lines; int next_line = 1; @@ -294,11 +357,15 @@ int ConsoleCommand::RunScriptConsole(ScriptFrame& scriptFrame, const String& add l_ScriptFrame = &scriptFrame; l_Session = session; - if (!addr.IsEmpty()) { - Url::Ptr url; + /* User passed --connect and wants to run the expression via REST API. + * Evaluate this now before any user input happens. + */ + Url::Ptr url; + std::shared_ptr tlsStream; + if (!connectAddr.IsEmpty()) { try { - url = new Url(addr); + url = new Url(connectAddr); } catch (const std::exception& ex) { Log(LogCritical, "ConsoleCommand", ex.what()); return EXIT_FAILURE; @@ -315,7 +382,11 @@ int ConsoleCommand::RunScriptConsole(ScriptFrame& scriptFrame, const String& add if (url->GetPort().IsEmpty()) url->SetPort("5665"); - l_ApiClient = new ApiClient(url->GetHost(), url->GetPort(), url->GetUsername(), url->GetPassword()); + try { + l_TlsStream = ConsoleCommand::Connect(url->GetHost(), url->GetPort()); + } catch (const std::exception& ex) { + return EXIT_FAILURE; + } } while (std::cin.good()) { @@ -405,7 +476,8 @@ int ConsoleCommand::RunScriptConsole(ScriptFrame& scriptFrame, const String& add Value result; - if (!l_ApiClient) { + /* Local debug console. */ + if (connectAddr.IsEmpty()) { expr = ConfigCompiler::CompileText(fileName, command); /* This relies on the fact that - for syntax errors - CompileText() @@ -417,25 +489,29 @@ int ConsoleCommand::RunScriptConsole(ScriptFrame& scriptFrame, const String& add } else result = true; } else { - boost::mutex mutex; - boost::condition_variable cv; - bool ready = false; - boost::exception_ptr eptr; - - l_ApiClient->ExecuteScript(l_Session, command, scriptFrame.Sandboxed, - std::bind(&ConsoleCommand::ExecuteScriptCompletionHandler, - std::ref(mutex), std::ref(cv), std::ref(ready), - _1, _2, - std::ref(result), std::ref(eptr))); - - { - boost::mutex::scoped_lock lock(mutex); - while (!ready) - cv.wait(lock); + /* Remote debug console. */ + try { + l_TlsStream = ConsoleCommand::Connect(url->GetHost(), url->GetPort()); + } catch (const std::exception& ex) { + return EXIT_FAILURE; } - if (eptr) - boost::rethrow_exception(eptr); + try { + result = ExecuteScript(l_Session, l_TlsStream, url, command, scriptFrame.Sandboxed); + } catch (const ScriptError&) { + /* Re-throw the exception for the outside try-catch block. */ + boost::rethrow_exception(boost::current_exception()); + } catch (const std::exception& ex) { + Log(LogCritical, "ConsoleCommand") + << "HTTP query failed: " << ex.what(); + +#ifdef HAVE_EDITLINE + /* Ensures that the terminal state is resetted */ + rl_deprep_terminal(); +#endif /* HAVE_EDITLINE */ + + return EXIT_FAILURE; + } } if (commandOnce.IsEmpty()) { @@ -504,34 +580,107 @@ int ConsoleCommand::RunScriptConsole(ScriptFrame& scriptFrame, const String& add return EXIT_SUCCESS; } -void ConsoleCommand::ExecuteScriptCompletionHandler(boost::mutex& mutex, boost::condition_variable& cv, - bool& ready, const boost::exception_ptr& eptr, const Value& result, Value& resultOut, boost::exception_ptr& eptrOut) +/** + * Executes the DSL script via HTTP and returns HTTP and user errors. + * + * @param session Local session handler. + * @param tlsStream AsioTlsStream pointer, must be constructed by the caller. + * @param url Url object, must be constructed by the caller. + * @param command The DSL string. + * @param sandboxed Whether to run this sandboxed. + * @return Result value, also contains user errors. + */ +Value ConsoleCommand::ExecuteScript(const String& session, const std::shared_ptr& tlsStream, + const Url::Ptr& url, const String& command, bool sandboxed) { - if (eptr) { - try { - boost::rethrow_exception(eptr); - } catch (const ScriptError&) { - eptrOut = boost::current_exception(); - } catch (const std::exception& ex) { - Log(LogCritical, "ConsoleCommand") - << "HTTP query failed: " << ex.what(); + namespace beast = boost::beast; + namespace http = beast::http; -#ifdef HAVE_EDITLINE - /* Ensures that the terminal state is resetted */ - rl_deprep_terminal(); -#endif /* HAVE_EDITLINE */ + /* Extend the url parameters for the request. */ + url->SetPath({ "v1", "console", "execute-script" }); - Application::Exit(EXIT_FAILURE); - } + url->SetQuery({ + {"session", session}, + {"command", command}, + {"sandboxed", sandboxed ? "1" : "0"} + }); + + http::request request (http::verb::post, std::string(url->Format(false)), 10); + + request.set(http::field::user_agent, "Icinga/DebugConsole/" + Application::GetAppVersion()); + request.set(http::field::host, url->GetHost() + ":" + url->GetPort()); + + request.set(http::field::accept, "application/json"); + request.set(http::field::authorization, "Basic " + Base64::Encode(url->GetUsername() + ":" + url->GetPassword())); + + try { + http::write(*tlsStream, request); + tlsStream->flush(); + } catch (const std::exception& ex) { + Log(LogWarning, "DebugConsole") + << "Cannot write HTTP request to REST API at URL '" << url->Format(true) << "': " << ex.what(); + throw; } - resultOut = result; + http::parser parser; + beast::flat_buffer buf; - { - boost::mutex::scoped_lock lock(mutex); - ready = true; - cv.notify_all(); + try { + http::read(*tlsStream, buf, parser); + } catch (const std::exception& ex) { + Log(LogWarning, "DebugConsole") + << "Failed to parse HTTP response from REST API at URL '" << url->Format(true) << "': " << ex.what(); + throw; + } + + auto& response (parser.get()); + + /* Handle HTTP errors first. */ + if (response.result() != http::status::ok) { + + std::string message = "HTTP request failed; Code: " + Convert::ToString(response.result()) + "; Body: " + response.body(); + BOOST_THROW_EXCEPTION(ScriptError(message)); + } + + Dictionary::Ptr jsonResponse; + auto& body (response.body()); + + try { + jsonResponse = JsonDecode(body); + } catch (...) { + std::string message = "Cannot parse JSON response body: " + response.body(); + BOOST_THROW_EXCEPTION(ScriptError(message)); } + + /* Extract the result, and handle user input errors too. */ + Array::Ptr results = jsonResponse->Get("results"); + Value result; + + if (results && results->GetLength() > 0) { + Dictionary::Ptr resultInfo = results->Get(0); + + if (resultInfo->Get("code") >= 200 && resultInfo->Get("code") <= 299) { + result = resultInfo->Get("result"); + } else { + String errorMessage = resultInfo->Get("status"); + + DebugInfo di; + Dictionary::Ptr debugInfo = resultInfo->Get("debug_info"); + + if (debugInfo) { + di.Path = debugInfo->Get("path"); + di.FirstLine = debugInfo->Get("first_line"); + di.FirstColumn = debugInfo->Get("first_column"); + di.LastLine = debugInfo->Get("last_line"); + di.LastColumn = debugInfo->Get("last_column"); + } + + bool incompleteExpression = resultInfo->Get("incomplete_expression"); + BOOST_THROW_EXCEPTION(ScriptError(errorMessage, di, incompleteExpression)); + } + } + + return result; } void ConsoleCommand::AutocompleteScriptCompletionHandler(boost::mutex& mutex, boost::condition_variable& cv, diff --git a/lib/cli/consolecommand.hpp b/lib/cli/consolecommand.hpp index f36f37cd9fd..1b76bfa8757 100644 --- a/lib/cli/consolecommand.hpp +++ b/lib/cli/consolecommand.hpp @@ -6,6 +6,9 @@ #include "cli/clicommand.hpp" #include "base/exception.hpp" #include "base/scriptframe.hpp" +#include "base/tlsstream.hpp" +#include "remote/url.hpp" + namespace icinga { @@ -29,7 +32,7 @@ class ConsoleCommand final : public CLICommand boost::program_options::options_description& hiddenDesc) const override; int Run(const boost::program_options::variables_map& vm, const std::vector& ap) const override; - static int RunScriptConsole(ScriptFrame& scriptFrame, const String& addr = String(), + static int RunScriptConsole(ScriptFrame& scriptFrame, const String& connectAddr = String(), const String& session = String(), const String& commandOnce = String(), const String& commandOnceFileName = String(), bool syntaxOnly = false); @@ -37,6 +40,11 @@ class ConsoleCommand final : public CLICommand mutable boost::mutex m_Mutex; mutable boost::condition_variable m_CV; + static std::shared_ptr Connect(const String& host, const String& port); + + static Value ExecuteScript(const String& session, const std::shared_ptr& tlsStream, + const Url::Ptr& url, const String& command, bool sandboxed); + static void ExecuteScriptCompletionHandler(boost::mutex& mutex, boost::condition_variable& cv, bool& ready, const boost::exception_ptr& eptr, const Value& result, Value& resultOut, boost::exception_ptr& eptrOut); From 62b577909109ad47b754bf430d02570d80a760e2 Mon Sep 17 00:00:00 2001 From: Michael Friedrich Date: Fri, 5 Apr 2019 16:35:35 +0200 Subject: [PATCH 8/8] Debug Console: Refactor code into generic HTTP Requests --- lib/cli/consolecommand.cpp | 84 ++++++++++++++++++++------------------ lib/cli/consolecommand.hpp | 2 + 2 files changed, 47 insertions(+), 39 deletions(-) diff --git a/lib/cli/consolecommand.cpp b/lib/cli/consolecommand.cpp index 3e93e11157a..facffdcb2c9 100644 --- a/lib/cli/consolecommand.cpp +++ b/lib/cli/consolecommand.cpp @@ -226,14 +226,11 @@ std::shared_ptr ConsoleCommand::Connect(const String& host, const char *ConsoleCommand::ConsoleCompleteHelper(const char *word, int state) { static std::vector matches; -/* TODO XXX +/* if (state == 0) { - if (!l_ApiClient) + if (!l_TlsStream) matches = ConsoleHandler::GetAutocompletionSuggestions(word, *l_ScriptFrame); else { - boost::mutex mutex; - boost::condition_variable cv; - bool ready = false; Array::Ptr suggestions; l_ApiClient->AutocompleteScript(l_Session, word, l_ScriptFrame->Sandboxed, @@ -242,11 +239,6 @@ char *ConsoleCommand::ConsoleCompleteHelper(const char *word, int state) _1, _2, std::ref(suggestions))); - { - boost::mutex::scoped_lock lock(mutex); - while (!ready) - cv.wait(lock); - } matches.clear(); @@ -581,44 +573,31 @@ int ConsoleCommand::RunScriptConsole(ScriptFrame& scriptFrame, const String& con } /** - * Executes the DSL script via HTTP and returns HTTP and user errors. + * Sends the request via REST API and returns the parsed response. * - * @param session Local session handler. - * @param tlsStream AsioTlsStream pointer, must be constructed by the caller. - * @param url Url object, must be constructed by the caller. - * @param command The DSL string. - * @param sandboxed Whether to run this sandboxed. - * @return Result value, also contains user errors. + * @param tlsStream Caller must prepare TLS stream/handshake. + * @param url Fully prepared Url object. + * @return A dictionary decoded from JSON. */ -Value ConsoleCommand::ExecuteScript(const String& session, const std::shared_ptr& tlsStream, - const Url::Ptr& url, const String& command, bool sandboxed) +Dictionary::Ptr ConsoleCommand::SendRequest(const std::shared_ptr& tlsStream, const Url::Ptr& url) { namespace beast = boost::beast; namespace http = beast::http; - /* Extend the url parameters for the request. */ - url->SetPath({ "v1", "console", "execute-script" }); - - url->SetQuery({ - {"session", session}, - {"command", command}, - {"sandboxed", sandboxed ? "1" : "0"} - }); - - http::request request (http::verb::post, std::string(url->Format(false)), 10); + http::request request(http::verb::post, std::string(url->Format(false)), 10); request.set(http::field::user_agent, "Icinga/DebugConsole/" + Application::GetAppVersion()); request.set(http::field::host, url->GetHost() + ":" + url->GetPort()); - request.set(http::field::accept, "application/json"); - request.set(http::field::authorization, "Basic " + Base64::Encode(url->GetUsername() + ":" + url->GetPassword())); + request.set(http::field::accept, "application/json"); + request.set(http::field::authorization, "Basic " + Base64::Encode(url->GetUsername() + ":" + url->GetPassword())); try { http::write(*tlsStream, request); tlsStream->flush(); - } catch (const std::exception& ex) { + } catch (const std::exception &ex) { Log(LogWarning, "DebugConsole") - << "Cannot write HTTP request to REST API at URL '" << url->Format(true) << "': " << ex.what(); + << "Cannot write HTTP request to REST API at URL '" << url->Format(true) << "': " << ex.what(); throw; } @@ -627,23 +606,23 @@ Value ConsoleCommand::ExecuteScript(const String& session, const std::shared_ptr try { http::read(*tlsStream, buf, parser); - } catch (const std::exception& ex) { + } catch (const std::exception &ex) { Log(LogWarning, "DebugConsole") - << "Failed to parse HTTP response from REST API at URL '" << url->Format(true) << "': " << ex.what(); + << "Failed to parse HTTP response from REST API at URL '" << url->Format(true) << "': " << ex.what(); throw; } - auto& response (parser.get()); + auto &response(parser.get()); /* Handle HTTP errors first. */ if (response.result() != http::status::ok) { - - std::string message = "HTTP request failed; Code: " + Convert::ToString(response.result()) + "; Body: " + response.body(); + std::string message = "HTTP request failed; Code: " + Convert::ToString(response.result()) + + "; Body: " + response.body(); BOOST_THROW_EXCEPTION(ScriptError(message)); } Dictionary::Ptr jsonResponse; - auto& body (response.body()); + auto &body(response.body()); try { jsonResponse = JsonDecode(body); @@ -652,6 +631,33 @@ Value ConsoleCommand::ExecuteScript(const String& session, const std::shared_ptr BOOST_THROW_EXCEPTION(ScriptError(message)); } + return jsonResponse; +} + +/** + * Executes the DSL script via HTTP and returns HTTP and user errors. + * + * @param session Local session handler. + * @param tlsStream AsioTlsStream pointer, must be constructed by the caller. + * @param url Url object, must be constructed by the caller. + * @param command The DSL string. + * @param sandboxed Whether to run this sandboxed. + * @return Result value, also contains user errors. + */ +Value ConsoleCommand::ExecuteScript(const String& session, const std::shared_ptr& tlsStream, + const Url::Ptr& url, const String& command, bool sandboxed) +{ + /* Extend the url parameters for the request. */ + url->SetPath({"v1", "console", "execute-script"}); + + url->SetQuery({ + {"session", session}, + {"command", command}, + {"sandboxed", sandboxed ? "1" : "0"} + }); + + Dictionary::Ptr jsonResponse = SendRequest(tlsStream, url); + /* Extract the result, and handle user input errors too. */ Array::Ptr results = jsonResponse->Get("results"); Value result; diff --git a/lib/cli/consolecommand.hpp b/lib/cli/consolecommand.hpp index 1b76bfa8757..6817b6157f4 100644 --- a/lib/cli/consolecommand.hpp +++ b/lib/cli/consolecommand.hpp @@ -45,6 +45,8 @@ class ConsoleCommand final : public CLICommand static Value ExecuteScript(const String& session, const std::shared_ptr& tlsStream, const Url::Ptr& url, const String& command, bool sandboxed); + static Dictionary::Ptr SendRequest(const std::shared_ptr& tlsStream, const Url::Ptr& url); + static void ExecuteScriptCompletionHandler(boost::mutex& mutex, boost::condition_variable& cv, bool& ready, const boost::exception_ptr& eptr, const Value& result, Value& resultOut, boost::exception_ptr& eptrOut);