Skip to content

Commit

Permalink
Add JSON recordable
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkSeufert committed Dec 15, 2020
1 parent 47cfbfc commit 79e6591
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 248 deletions.
1 change: 1 addition & 0 deletions exporters/elasticsearch/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ cc_library(
],
hdrs = [
"include/opentelemetry/exporters/elasticsearch/es_log_exporter.h",
"include/opentelemetry/exporters/elasticsearch/es_log_recordable.h",
],
copts = [
"-DCURL_STATICLIB",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

#include "nlohmann/json.hpp"
#include "opentelemetry/ext/http/client/curl/http_client_curl.h"
#include "opentelemetry/logs/log_record.h"
#include "opentelemetry/nostd/type_traits.h"
#include "opentelemetry/sdk/logs/exporter.h"
#include "opentelemetry/sdk/logs/log_record.h"

#include <time.h>
#include <iostream>
Expand Down Expand Up @@ -90,13 +90,18 @@ class ElasticsearchLogExporter final : public sdklogs::LogExporter
*/
ElasticsearchLogExporter(const ElasticsearchExporterOptions &options);

/**
* Creates a recordable that stores the data in a JSON object
*/
std::unique_ptr<sdk::logs::Recordable> MakeRecordable() noexcept override;

/**
* Exports a vector of log records to the Elasticsearch instance. Guaranteed to return after a
* timeout specified from the options passed from the constructor.
* @param records A list of log records to send to Elasticsearch.
*/
sdklogs::ExportResult Export(const nostd::span<std::unique_ptr<opentelemetry::logs::LogRecord>>
&records) noexcept override;
sdklogs::ExportResult Export(
const nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records) noexcept override;

/**
* Shutdown this exporter.
Expand All @@ -113,47 +118,6 @@ class ElasticsearchLogExporter final : public sdklogs::LogExporter

// Object that stores the HTTP sessions that have been created
std::unique_ptr<ext::http::client::SessionManager> session_manager_;

/**
* Converts a log record into a nlohmann::json object.
*/
json RecordToJSON(std::unique_ptr<opentelemetry::logs::LogRecord> record);

/**
* Converts a common::AttributeValue into a string, which is used for parsing the attributes
* and resource KeyValueIterables
*/
const std::string ValueToString(const common::AttributeValue &value)
{
switch (value.index())
{
case common::AttributeType::TYPE_BOOL:
return (opentelemetry::nostd::get<bool>(value) ? "true" : "false");
break;
case common::AttributeType::TYPE_INT:
return std::to_string(opentelemetry::nostd::get<int>(value));
break;
case common::AttributeType::TYPE_INT64:
return std::to_string(opentelemetry::nostd::get<int64_t>(value));
break;
case common::AttributeType::TYPE_UINT:
return std::to_string(opentelemetry::nostd::get<unsigned int>(value));
break;
case common::AttributeType::TYPE_UINT64:
return std::to_string(opentelemetry::nostd::get<uint64_t>(value));
break;
case common::AttributeType::TYPE_DOUBLE:
return std::to_string(opentelemetry::nostd::get<double>(value));
break;
case common::AttributeType::TYPE_STRING:
case common::AttributeType::TYPE_CSTRING:
return opentelemetry::nostd::get<opentelemetry::nostd::string_view>(value).data();
break;
default:
return "Invalid type";
break;
}
}
};
} // namespace logs
} // namespace exporter
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <map>
#include <unordered_map>
#include "nlohmann/json.hpp"
#include "opentelemetry/sdk/common/attribute_utils.h" // same as traces/attribute_utils
#include "opentelemetry/sdk/logs/recordable.h"
#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
{
namespace logs
{

/**
* An Elasticsearch Recordable implemenation that stores the 10 fields of the Log Data Model inside
* a JSON object,
*/
class ElasticSearchRecordable final : public sdk::logs::Recordable
{
private:
// Define a JSON object that will be populated with the log data
nlohmann::json json_;

/**
* Converts a common::AttributeValue into a string, which is used for converting a
* common::AttributeValue into a string
*/
const std::string ValueToString(const common::AttributeValue &value)
{
switch (value.index())
{
case common::AttributeType::TYPE_BOOL:
return (opentelemetry::nostd::get<bool>(value) ? "true" : "false");
break;
case common::AttributeType::TYPE_INT:
return std::to_string(opentelemetry::nostd::get<int>(value));
break;
case common::AttributeType::TYPE_INT64:
return std::to_string(opentelemetry::nostd::get<int64_t>(value));
break;
case common::AttributeType::TYPE_UINT:
return std::to_string(opentelemetry::nostd::get<unsigned int>(value));
break;
case common::AttributeType::TYPE_UINT64:
return std::to_string(opentelemetry::nostd::get<uint64_t>(value));
break;
case common::AttributeType::TYPE_DOUBLE:
return std::to_string(opentelemetry::nostd::get<double>(value));
break;
case common::AttributeType::TYPE_STRING:
case common::AttributeType::TYPE_CSTRING:
return opentelemetry::nostd::get<opentelemetry::nostd::string_view>(value).data();
break;
default:
return "Invalid type";
break;
}
}

public:
/**
* Set the severity for this log.
* @param severity the severity of the event
*/
void SetSeverity(opentelemetry::logs::Severity severity) noexcept override
{
// Convert the severity enum to a string
static opentelemetry::nostd::string_view severityStringMap_[25] = {
"kInvalid", "kTrace", "kTrace2", "kTrace3", "kTrace4", "kDebug", "kDebug2",
"kDebug3", "kDebug4", "kInfo", "kInfo2", "kInfo3", "kInfo4", "kWarn",
"kWarn2", "kWarn3", "kWarn4", "kError", "kError2", "kError3", "kError4",
"kFatal", "kFatal2", "kFatal3", "kFatal4"};
json_["severity"] = severityStringMap_[static_cast<int>(severity)];
}

/**
* Set name for this log
* @param name the name to set
*/
void SetName(nostd::string_view name) noexcept override { json_["name"] = name.data(); }

/**
* Set body field for this log.
* @param message the body to set
*/
void SetBody(nostd::string_view message) noexcept override { json_["body"] = message.data(); }

/**
* Set a resource for this log.
* @param name the name of the resource
* @param value the resource value
*/
void SetResource(nostd::string_view key,
const opentelemetry::common::AttributeValue &value) noexcept override
{
json_["resource"][key.data()] = ValueToString(value);
}

/**
* Set an attribute of a log.
* @param key the key of the attribute
* @param value the attribute value
*/
void SetAttribute(nostd::string_view key,
const opentelemetry::common::AttributeValue &value) noexcept override
{
json_["attributes"][key.data()] = ValueToString(value);
}

/**
* Set trace id for this log.
* @param trace_id the trace id to set
*/
void SetTraceId(opentelemetry::trace::TraceId trace_id) noexcept override
{
char trace_buf[32];
trace_id.ToLowerBase16(trace_buf);
json_["traceid"] = std::string(trace_buf, sizeof(trace_buf));
}

/**
* Set span id for this log.
* @param span_id the span id to set
*/
virtual void SetSpanId(opentelemetry::trace::SpanId span_id) noexcept override
{
char span_buf[16];
span_id.ToLowerBase16(span_buf);
json_["spanid"] = std::string(span_buf, sizeof(span_buf));
}

/**
* Inject a trace_flags for this log.
* @param trace_flags the span id to set
*/
void SetTraceFlags(opentelemetry::trace::TraceFlags trace_flags) noexcept override
{
char flag_buf[2];
trace_flags.ToLowerBase16(flag_buf);
json_["traceflags"] = std::string(flag_buf, sizeof(flag_buf));
}

/**
* Set the timestamp for this log.
* @param timestamp the timestamp of the event
*/
void SetTimestamp(core::SystemTimestamp timestamp) noexcept override
{
json_["timestamp"] = timestamp.time_since_epoch().count();
}

/**
* Returns a JSON object contain the log information
*/
nlohmann::json GetJSON() noexcept { return json_; };
};
} // namespace logs
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
82 changes: 13 additions & 69 deletions exporters/elasticsearch/src/es_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "opentelemetry/exporters/elasticsearch/es_log_exporter.h"
#include "opentelemetry/exporters/elasticsearch/es_log_recordable.h"

namespace nostd = opentelemetry::nostd;
namespace sdklogs = opentelemetry::sdk::logs;
Expand Down Expand Up @@ -93,8 +94,13 @@ ElasticsearchLogExporter::ElasticsearchLogExporter(const ElasticsearchExporterOp
: options_{options}, session_manager_{new ext::http::client::curl::SessionManager()}
{}

std::unique_ptr<sdklogs::Recordable> ElasticsearchLogExporter::MakeRecordable() noexcept
{
return std::unique_ptr<sdklogs::Recordable>(new ElasticSearchRecordable);
}

sdklogs::ExportResult ElasticsearchLogExporter::Export(
const nostd::span<std::unique_ptr<opentelemetry::logs::LogRecord>> &records) noexcept
const nostd::span<std::unique_ptr<sdklogs::Recordable>> &records) noexcept
{
// Return failure if this exporter has been shutdown
if (isShutdown_)
Expand All @@ -111,7 +117,7 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export(
for (auto &record : records)
{
// Convert the log record to a JSON object, and store in json array
logs.emplace_back(RecordToJSON(std::move(record)));
// logs.emplace_back(RecordToJSON(std::move(record)));
}

// Create a connection to the ElasticSearch instance
Expand All @@ -126,14 +132,16 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export(

// Add the request body
std::string body = "";
for (int i = 0; i < logs.size(); i++)
for (auto &record : records)
{
// Append {"index":{}} before JSON body, which tells Elasticsearch to write to index specified
// in URI
body += "{\"index\" : {}}\n";

// Add the context of the Log Record
body += logs[i].dump() + "\n";
// Add the context of the Recordable
auto json_record = std::unique_ptr<ElasticSearchRecordable>(
static_cast<ElasticSearchRecordable *>(record.release()));
body += json_record->GetJSON().dump() + "\n";
}
std::vector<uint8_t> body_vec(body.begin(), body.end());
request->SetBody(body_vec);
Expand Down Expand Up @@ -164,7 +172,6 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export(

// Parse the response output to determine if the request wasen't successful
std::string responseBody = handler->GetResponseBody();
std::cout << responseBody << std::endl;
if (responseBody.find("\"failed\" : 0") == std::string::npos)
{
if (options_.console_debug_)
Expand All @@ -190,69 +197,6 @@ bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexc

return true;
}

json ElasticsearchLogExporter::RecordToJSON(std::unique_ptr<opentelemetry::logs::LogRecord> record)
{
// Create a json object to store the LogRecord information in
json log;

// Write the simple fields
log["timestamp"] = record->timestamp.time_since_epoch().count();
log["name"] = record->name;
log["body"] = record->body;

// Write the severity by converting it from its enum to a string
static opentelemetry::nostd::string_view severityStringMap_[25] = {
"kInvalid", "kTrace", "kTrace2", "kTrace3", "kTrace4", "kDebug", "kDebug2",
"kDebug3", "kDebug4", "kInfo", "kInfo2", "kInfo3", "kInfo4", "kWarn",
"kWarn2", "kWarn3", "kWarn4", "kError", "kError2", "kError3", "kError4",
"kFatal", "kFatal2", "kFatal3", "kFatal4"};
log["severity"] = severityStringMap_[static_cast<int>(record->severity)];

// Convert the resources into a JSON object
json resource;
if (record->resource != nullptr)
{
record->resource->ForEachKeyValue([&](nostd::string_view key,
common::AttributeValue value) noexcept {
resource[key.data()] = ValueToString(value);
return true;
});

// Push the attributes JSON object into the main JSON under the "resource" key
log.push_back({"resource", resource});
}

// Convert the attributes into a JSON object
json attributes;
if (record->attributes != nullptr)
{
record->attributes->ForEachKeyValue([&](nostd::string_view key,
common::AttributeValue value) noexcept {
attributes[key.data()] = ValueToString(value);
return true;
});

// Push the attributes JSON object into the main JSON under the "attributes" key
log.push_back({"attributes", attributes});
}

// Convert traceid, spanid, and traceflags into strings
char trace_buf[32];
record->trace_id.ToLowerBase16(trace_buf);
log["traceid"] = std::string(trace_buf, sizeof(trace_buf));

char span_buf[16];
record->span_id.ToLowerBase16(span_buf);
log["spanid"] = std::string(span_buf, sizeof(span_buf));

char flag_buf[2];
record->trace_flags.ToLowerBase16(flag_buf);
log["traceflags"] = std::string(flag_buf, sizeof(flag_buf));

return log;
}

} // namespace logs
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Loading

0 comments on commit 79e6591

Please sign in to comment.