Skip to content

Commit

Permalink
Use new I/O engine in InfluxdbWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
Al2Klimov committed Feb 28, 2019
1 parent 6ca5c61 commit 8b776c9
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 53 deletions.
139 changes: 87 additions & 52 deletions lib/perfdata/influxdbwriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
#include "icinga/macroprocessor.hpp"
#include "icinga/icingaapplication.hpp"
#include "icinga/checkcommand.hpp"
#include "base/application.hpp"
#include "base/defer.hpp"
#include "base/io-engine.hpp"
#include "base/tcpsocket.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
Expand All @@ -24,9 +27,21 @@
#include "base/tlsutility.hpp"
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/beast/http/field.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/parser.hpp>
#include <boost/beast/http/read.hpp>
#include <boost/beast/http/status.hpp>
#include <boost/beast/http/string_body.hpp>
#include <boost/beast/http/verb.hpp>
#include <boost/beast/http/write.hpp>
#include <boost/math/special_functions/fpclassify.hpp>
#include <boost/regex.hpp>
#include <boost/scoped_array.hpp>
#include <memory>
#include <string>
#include <utility>

using namespace icinga;
Expand Down Expand Up @@ -155,44 +170,51 @@ void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp)
//TODO: Close the connection, if we keep it open.
}

Stream::Ptr InfluxdbWriter::Connect()
InfluxdbWriter::OptionalTlsStream InfluxdbWriter::Connect()
{
TcpSocket::Ptr socket = new TcpSocket();

Log(LogNotice, "InfluxdbWriter")
<< "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";

try {
socket->Connect(GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
}
OptionalTlsStream stream;
bool ssl = GetSslEnable();

if (ssl) {
std::shared_ptr<boost::asio::ssl::context> sslContext;

if (GetSslEnable()) {
std::shared_ptr<SSL_CTX> sslContext;
try {
sslContext = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert());
sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert());
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Unable to create SSL context.";
throw ex;
throw;
}

TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext);
stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoService(), *sslContext, GetHost());
} else {
stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
}

try {
icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw;
}

if (ssl) {
auto& tlsStream (stream.first->next_layer());

try {
tlsStream->Handshake();
tlsStream.handshake(tlsStream.client);
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "TLS handshake with host '" << GetHost() << "' failed.";
throw ex;
throw;
}

return tlsStream;
} else {
return new NetworkStream(socket);
}

return std::move(stream);
}

void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
Expand Down Expand Up @@ -428,13 +450,16 @@ void InfluxdbWriter::FlushTimeoutWQ()

void InfluxdbWriter::Flush()
{
namespace beast = boost::beast;
namespace http = beast::http;

Log(LogDebug, "InfluxdbWriter")
<< "Flushing data buffer to InfluxDB.";

String body = boost::algorithm::join(m_DataBuffer, "\n");
m_DataBuffer.clear();

Stream::Ptr stream;
OptionalTlsStream stream;

try {
stream = Connect();
Expand All @@ -444,8 +469,15 @@ void InfluxdbWriter::Flush()
return;
}

if (!stream)
return;
Defer s ([&stream]() {
if (stream.first) {
try {
stream.first->next_layer().shutdown();
} catch (...) {
// https://stackoverflow.com/questions/130117/throwing-exceptions-out-of-a-destructor
}
}
});

Url::Ptr url = new Url();
url->SetScheme(GetSslEnable() ? "https" : "http");
Expand All @@ -463,68 +495,71 @@ void InfluxdbWriter::Flush()
if (!GetPassword().IsEmpty())
url->AddQueryElement("p", GetPassword());

HttpRequest req(stream);
req.RequestMethod = "POST";
req.RequestUrl = url;
http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);

request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
request.set(http::field::host, url->GetHost() + ":" + url->GetPort());

request.body() = body;
request.set(http::field::content_length, request.body().size());

try {
req.WriteBody(body.CStr(), body.GetLength());
req.Finish();
if (stream.first) {
http::write(*stream.first, request);
stream.first->flush();
} else {
http::write(*stream.second, request);
stream.second->flush();
}
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
throw;
}

HttpResponse resp(stream, req);
StreamReadContext context;
http::parser<false, http::string_body> parser;
beast::flat_buffer buf;

try {
while (resp.Parse(context, true) && !resp.Complete)
; /* Do nothing */
if (stream.first) {
http::read(*stream.first, buf, parser);
} else {
http::read(*stream.second, buf, parser);
}
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex);
throw ex;
throw;
}

if (!resp.Complete) {
Log(LogWarning, "InfluxdbWriter")
<< "Failed to read a complete HTTP response from the InfluxDB server.";
return;
}
auto& response (parser.get());

if (resp.StatusCode != 204) {
if (response.result() != http::status::no_content) {
Log(LogWarning, "InfluxdbWriter")
<< "Unexpected response code: " << resp.StatusCode;
<< "Unexpected response code: " << response.result();

String contentType = resp.Headers->Get("content-type");
auto& contentType (response[http::field::content_type]);
if (contentType != "application/json") {
Log(LogWarning, "InfluxdbWriter")
<< "Unexpected Content-Type: " << contentType;
return;
}

size_t responseSize = resp.GetBodySize();
boost::scoped_array<char> buffer(new char[responseSize + 1]);
resp.ReadBody(buffer.get(), responseSize);
buffer.get()[responseSize] = '\0';

Dictionary::Ptr jsonResponse;
auto& body (response.body());

try {
jsonResponse = JsonDecode(buffer.get());
jsonResponse = JsonDecode(body);
} catch (...) {
Log(LogWarning, "InfluxdbWriter")
<< "Unable to parse JSON response:\n" << buffer.get();
<< "Unable to parse JSON response:\n" << body;
return;
}

String error = jsonResponse->Get("error");

Log(LogCritical, "InfluxdbWriter")
<< "InfluxDB error message:\n" << error;

return;
}
}

Expand Down
10 changes: 9 additions & 1 deletion lib/perfdata/influxdbwriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@
#include "base/configobject.hpp"
#include "base/tcpsocket.hpp"
#include "base/timer.hpp"
#include "base/tlsstream.hpp"
#include "base/workqueue.hpp"
#include <fstream>
#include <memory>
#include <utility>
#include <boost/asio/buffered_stream.hpp>
#include <boost/asio/ip/tcp.hpp>

namespace icinga
{
Expand All @@ -36,6 +41,9 @@ class InfluxdbWriter final : public ObjectImpl<InfluxdbWriter>
void Pause() override;

private:
typedef boost::asio::buffered_stream<boost::asio::ip::tcp::socket> AsioTcpStream;
typedef std::pair<std::shared_ptr<AsioTlsStream>, std::shared_ptr<AsioTcpStream>> OptionalTlsStream;

WorkQueue m_WorkQueue{10000000, 1};
Timer::Ptr m_FlushTimer;
std::vector<String> m_DataBuffer;
Expand All @@ -50,7 +58,7 @@ class InfluxdbWriter final : public ObjectImpl<InfluxdbWriter>
static String EscapeKeyOrTagValue(const String& str);
static String EscapeValue(const Value& value);

Stream::Ptr Connect();
OptionalTlsStream Connect();

void AssertOnWorkQueue();

Expand Down

0 comments on commit 8b776c9

Please sign in to comment.