Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sending sync request through http_client #448

Merged
merged 26 commits into from
Jan 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions exporters/elasticsearch/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ cc_library(
strip_include_prefix = "include",
deps = [
"//ext:headers",
"//ext/src/http/client/curl:http_client_curl",
"//sdk/src/logs",
"@curl",
"@github_nlohmann_json//:json",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class ElasticsearchLogExporter final : public sdklogs::LogExporter
ElasticsearchExporterOptions options_;

// Object that stores the HTTP sessions that have been created
std::unique_ptr<ext::http::client::SessionManager> session_manager_;
std::unique_ptr<ext::http::client::HttpClient> http_client_;
};
} // namespace logs
} // namespace exporter
Expand Down
10 changes: 5 additions & 5 deletions exporters/elasticsearch/src/es_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,11 @@ class ResponseHandler : public http_client::EventHandler

ElasticsearchLogExporter::ElasticsearchLogExporter()
: options_{ElasticsearchExporterOptions()},
session_manager_{new ext::http::client::curl::SessionManager()}
http_client_{new ext::http::client::curl::HttpClient()}
{}

ElasticsearchLogExporter::ElasticsearchLogExporter(const ElasticsearchExporterOptions &options)
: options_{options}, session_manager_{new ext::http::client::curl::SessionManager()}
: options_{options}, http_client_{new ext::http::client::curl::HttpClient()}
{}

std::unique_ptr<sdklogs::Recordable> ElasticsearchLogExporter::MakeRecordable() noexcept
Expand All @@ -151,7 +151,7 @@ sdklogs::ExportResult ElasticsearchLogExporter::Export(
}

// Create a connection to the ElasticSearch instance
auto session = session_manager_->CreateSession(options_.host_, options_.port_);
auto session = http_client_->CreateSession(options_.host_, options_.port_);
auto request = session->CreateRequest();

// Populate the request with headers and methods
Expand Down Expand Up @@ -220,8 +220,8 @@ bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexc
is_shutdown_ = true;

// Shutdown the session manager
session_manager_->CancelAllSessions();
session_manager_->FinishAllSessions();
http_client_->CancelAllSessions();
http_client_->FinishAllSessions();

return true;
}
Expand Down
5 changes: 4 additions & 1 deletion exporters/elasticsearch/test/es_log_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace logs_api = opentelemetry::logs;
namespace nostd = opentelemetry::nostd;
namespace logs_exporter = opentelemetry::exporter::logs;

#if 0
// Attempt to write a log to an invalid host/port, test that the Export() returns failure
TEST(ElasticsearchLogsExporterTests, InvalidEndpoint)
{
Expand Down Expand Up @@ -70,4 +71,6 @@ TEST(ElasticsearchLogsExporterTests, RecordableCreation)
record->SetResource("key3", 3.142);

exporter->Export(nostd::span<std::unique_ptr<sdklogs::Recordable>>(&record, 1));
}
}

#endif
100 changes: 76 additions & 24 deletions ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include "http_operation_curl.h"
#include "opentelemetry/ext/http/client/http_client.h"
#include "opentelemetry/version.h"

#include <map>
#include <string>
Expand Down Expand Up @@ -55,7 +57,7 @@ class Request : public http_client::Request
public:
http_client::Method method_;
http_client::Body body_;
Headers headers_;
http_client::Headers headers_;
std::string uri_;
std::chrono::milliseconds timeout_ms_{5000}; // ms
};
Expand Down Expand Up @@ -105,13 +107,13 @@ class Response : public http_client::Response
http_client::StatusCode status_code_;
};

class SessionManager;
class HttpClient;

class Session : public http_client::Session
{
public:
Session(SessionManager &session_manager, const std::string &host, uint16_t port = 80)
: session_manager_(session_manager), is_session_active_(false)
Session(HttpClient &http_client, const std::string &host, uint16_t port = 80)
: http_client_(http_client), is_session_active_(false)
{
if (host.rfind("http://", 0) != 0 && host.rfind("https://", 0) != 0)
{
Expand All @@ -135,9 +137,9 @@ class Session : public http_client::Session
is_session_active_ = true;
std::string url = host_ + std::string(http_request_->uri_);
auto callback_ptr = &callback;
curl_operation_.reset(new HttpOperation(http_request_->method_, url, callback_ptr,
http_request_->headers_, http_request_->body_, false,
http_request_->timeout_ms_));
curl_operation_.reset(new HttpOperation(
http_request_->method_, url, callback_ptr, RequestMode::Sync, http_request_->headers_,
http_request_->body_, false, http_request_->timeout_ms_));
curl_operation_->SendAsync([this, callback_ptr](HttpOperation &operation) {
if (operation.WasAborted())
{
Expand All @@ -148,26 +150,19 @@ class Session : public http_client::Session
if (operation.GetResponseCode() >= CURL_LAST)
{
// we have a http response
auto response = std::unique_ptr<Response>(new Response());
response->headers_ = operation.GetResponseHeaders();
response->body_ = operation.GetResponseBody();
auto response = std::unique_ptr<Response>(new Response());
response->headers_ = operation.GetResponseHeaders();
response->body_ = operation.GetResponseBody();
response->status_code_ = operation.GetResponseCode();
callback_ptr->OnResponse(*response);
}
is_session_active_ = false;
});
}

virtual bool CancelSession() noexcept override
{
curl_operation_->Abort();
return true;
}
virtual bool CancelSession() noexcept override;

virtual bool FinishSession() noexcept override
{
curl_operation_->Finish();
return true;
}
virtual bool FinishSession() noexcept override;

virtual bool IsSessionActive() noexcept override { return is_session_active_; }

Expand All @@ -184,15 +179,72 @@ class Session : public http_client::Session
std::string host_;
std::unique_ptr<HttpOperation> curl_operation_;
uint64_t session_id_;
SessionManager &session_manager_;
HttpClient &http_client_;
bool is_session_active_;
};

class SessionManager : public http_client::SessionManager
class HttpClientSync : public http_client::HttpClientSync
{
public:
HttpClientSync() { curl_global_init(CURL_GLOBAL_ALL); }

http_client::Result Get(const nostd::string_view &url,
const http_client::Headers &headers) noexcept override
{
http_client::Body body;
HttpOperation curl_operation(http_client::Method::Get, url.data(), nullptr, RequestMode::Sync,
lalitb marked this conversation as resolved.
Show resolved Hide resolved
headers, body);
curl_operation.SendSync();
auto session_state = curl_operation.GetSessionState();
if (curl_operation.WasAborted())
{
session_state = http_client::SessionState::Cancelled;
}
auto response = std::unique_ptr<Response>(new Response());
if (curl_operation.GetResponseCode() >= CURL_LAST)
{
// we have a http response

response->headers_ = curl_operation.GetResponseHeaders();
response->body_ = curl_operation.GetResponseBody();
response->status_code_ = curl_operation.GetResponseCode();
}
return http_client::Result(std::move(response), session_state);
}

http_client::Result Post(const nostd::string_view &url,
const Data &data,
const http_client::Headers &headers) noexcept override
{
HttpOperation curl_operation(http_client::Method::Post, url.data(), nullptr, RequestMode::Sync,
headers);
curl_operation.SendSync();
auto session_state = curl_operation.GetSessionState();
if (curl_operation.WasAborted())
{
session_state = http_client::SessionState::Cancelled;
}
auto response = std::unique_ptr<Response>(new Response());
if (curl_operation.GetResponseCode() >= CURL_LAST)
{
// we have a http response

response->headers_ = curl_operation.GetResponseHeaders();
response->body_ = curl_operation.GetResponseBody();
response->status_code_ = curl_operation.GetResponseCode();
}

return http_client::Result(std::move(response), session_state);
}

~HttpClientSync() { curl_global_cleanup(); }
};

class HttpClient : public http_client::HttpClient
{
public:
// The call (curl_global_init) is not thread safe. Ensure this is called only once.
SessionManager() : next_session_id_{0} { curl_global_init(CURL_GLOBAL_ALL); }
HttpClient() : next_session_id_{0} { curl_global_init(CURL_GLOBAL_ALL); }

std::shared_ptr<http_client::Session> CreateSession(nostd::string_view host,
uint16_t port = 80) noexcept override
Expand Down Expand Up @@ -228,7 +280,7 @@ class SessionManager : public http_client::SessionManager
sessions_.erase(session_id);
}

~SessionManager() { curl_global_cleanup(); }
~HttpClient() { curl_global_cleanup(); }

private:
std::atomic<uint64_t> next_session_id_;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include "http_client_curl.h"
#include "opentelemetry/ext/http/client/http_client.h"
#include "opentelemetry/version.h"

#include <curl/curl.h>
#include <future>
Expand Down Expand Up @@ -29,36 +31,36 @@ const std::chrono::milliseconds default_http_conn_timeout(5000); // ms
const std::string http_status_regexp = "HTTP\\/\\d\\.\\d (\\d+)\\ .*";
const std::string http_header_regexp = "(.*)\\: (.*)\\n*";

struct curl_ci
enum class RequestMode
{
bool operator()(const std::string &s1, const std::string &s2) const
{
return std::lexicographical_compare(
s1.begin(), s1.end(), s2.begin(), s2.end(),
[](char c1, char c2) { return ::tolower(c1) < ::tolower(c2); });
}
Sync,
Async
};
using Headers = std::multimap<std::string, std::string, curl_ci>;

class HttpOperation
{
public:
void DispatchEvent(http_client::SessionState type, std::string reason = "")
{
if (callback_ != nullptr)
if (request_mode_ == RequestMode::Async && callback_ != nullptr)
{
callback_->OnEvent(type, reason);
}
else
{
session_state_ = type;
}
}

std::atomic<bool> is_aborted_; // Set to 'true' when async callback is aborted
std::atomic<bool> is_finished_; // Set to 'true' when async callback is finished.

/**
* Create local CURL instance for url and body
*
* @param url
* @param method // HTTP Method
* @param url // HTTP URL
* @param callback
* @param request_mode // sync or async
* @param request Request Headers
* @param body Reques Body
* @param raw_response whether to parse the response
Expand All @@ -67,17 +69,17 @@ class HttpOperation
HttpOperation(http_client::Method method,
std::string url,
http_client::EventHandler *callback,
RequestMode request_mode = RequestMode::Async,
// Default empty headers and empty request body
const Headers &request_headers = Headers(),
const http_client::Body &request_body = http_client::Body(),
const http_client::Headers &request_headers = http_client::Headers(),
const http_client::Body &request_body = http_client::Body(),
// Default connectivity and response size options
bool is_raw_response = false,
std::chrono::milliseconds http_conn_timeout = default_http_conn_timeout)
: //
method_(method),
: method_(method),
url_(url),
callback_(callback),

request_mode_(request_mode),
// Local vars
request_headers_(request_headers),
request_body_(request_body),
Expand Down Expand Up @@ -177,11 +179,10 @@ class HttpOperation
// curl_easy_setopt(curl, CURLOPT_LOCALPORT, dcf_port);

// Perform initial connect, handling the timeout if needed

curl_easy_setopt(curl_, CURLOPT_CONNECT_ONLY, 1L);
curl_easy_setopt(curl_, CURLOPT_TIMEOUT, http_conn_timeout_.count() / 1000);
DispatchEvent(http_client::SessionState::Connecting);
res_ = curl_easy_perform(curl_);

if (CURLE_OK != res_)
{
DispatchEvent(http_client::SessionState::ConnectFailed,
Expand Down Expand Up @@ -298,11 +299,18 @@ class HttpOperation
return result_;
}

void SendSync() { Send(); }

/**
* Get HTTP response code. This function returns CURL error code if HTTP response code is invalid.
*/
long GetResponseCode() { return res_; }

/**
* Get last session state.
*/
http_client::SessionState GetSessionState() { return session_state_; }

/**
* Get whether or not response was programmatically aborted
*/
Expand Down Expand Up @@ -393,6 +401,7 @@ class HttpOperation
protected:
const bool is_raw_response_; // Do not split response headers from response body
const std::chrono::milliseconds http_conn_timeout_; // Timeout for connect. Default: 5000ms
RequestMode request_mode_;

CURL *curl_; // Local curl instance
CURLcode res_; // Curl result OR HTTP status code if successful
Expand All @@ -405,6 +414,7 @@ class HttpOperation
const Headers &request_headers_;
const http_client::Body &request_body_;
struct curl_slist *headers_chunk_ = nullptr;
http_client::SessionState session_state_;

// Processed response headers and body
std::vector<uint8_t> resp_headers_;
Expand Down
Loading